///
module rdkafkad.handlers;
import rdkafkad;

/**
 * Base handle, super class for specific clients.
 */
class Handle
{

    /**
     * Returns the client's broker-assigned group member id.
     *
     * Note: This currently requires the high-level KafkaConsumer.
     *
     * Returns: the last assigned member id, or an empty string if not
     * currently a group member.
     */
    string memberid() const nothrow
    {
        char* raw = rd_kafka_memberid(rk_);
        // Duplicate into GC-owned memory before the C buffer is released.
        string id = raw.fromStringz.idup;
        if (raw !is null)
            rd_kafka_mem_free(cast(rd_kafka_s*) rk_, raw);
        return id;
    }

    /**
     * Request Metadata from broker.
     *
     * Params:
     *  all_topics = if true: request info about all topics in cluster,
     *               if false: only request info about locally known topics.
     *  only_rkt   = only request info about this topic (may be null).
     *  timeout_ms = maximum response time before failing.
     *
     * Returns: a newly allocated Metadata object wrapping the result.
     *
     * Throws: Exception carrying the error string when the request
     * yields anything other than a zero (no error) code.
     */
    Metadata metadata(bool all_topics, const Topic only_rkt, int timeout_ms = 60_000)
    {
        const rd_kafka_metadata_t* cmetadatap = null;

        // A null topic asks for no specific topic.
        rd_kafka_topic_t* topic = null;
        if (only_rkt)
            topic = cast(rd_kafka_topic_t*) only_rkt.rkt_;

        auto error = cast(ErrorCode) rd_kafka_metadata(rk_, all_topics, topic, &cmetadatap, timeout_ms);
        if (error)
            throw new Exception(error.err2str);

        return new Metadata(cmetadatap);
    }

nothrow @nogc:

    /**
     * Registers this handle as the opaque pointer of \p conf and, for each
     * callback that is set on \p conf, installs the matching trampoline and
     * remembers the callback on this handle.
     */
    package void setCommonConfig(GlobalConf conf)
    {
        auto rkConf = conf.rk_conf_;

        // The trampolines recover the Handle from this opaque pointer.
        rd_kafka_conf_set_opaque(rkConf, cast(void*) this);

        if (conf.event_cb_)
        {
            // One event callback serves log, error, throttle and stats events.
            rd_kafka_conf_set_log_cb(rkConf, &log_cb_trampoline);
            rd_kafka_conf_set_error_cb(rkConf, &error_cb_trampoline);
            rd_kafka_conf_set_throttle_cb(rkConf, &throttle_cb_trampoline);
            rd_kafka_conf_set_stats_cb(rkConf, &stats_cb_trampoline);
            event_cb_ = conf.event_cb_;
        }

        if (conf.socket_cb_)
        {
            rd_kafka_conf_set_socket_cb(rkConf, &socket_cb_trampoline);
            socket_cb_ = conf.socket_cb_;
        }

        // The open callback is not installed on Windows builds.
        version (Windows)
        {
        }
        else
        {
            if (conf.open_cb_)
            {
                rd_kafka_conf_set_open_cb(rkConf, &open_cb_trampoline);
                open_cb_ = conf.open_cb_;
            }
        }

        if (conf.rebalance_cb_)
        {
            rd_kafka_conf_set_rebalance_cb(rkConf, &rebalance_cb_trampoline);
            rebalance_cb_ = conf.rebalance_cb_;
        }

        if (conf.offset_commit_cb_)
        {
            rd_kafka_conf_set_offset_commit_cb(rkConf, &offset_commit_cb_trampoline);
            offset_commit_cb_ = conf.offset_commit_cb_;
        }
    }

    /**
     * Query broker for low (oldest/beginning)
     * and high (newest/end) offsets for partition.
     *
     * Offsets are returned in \p low and \p high respectively.
     *
     * ErrorCode.no_error on success or an error code on failure.
*/
    ErrorCode queryWatermarkOffsets(const(char)* topic, int partition,
        ref long low, ref long high, int timeout_ms)
    {
        auto status = rd_kafka_query_watermark_offsets(rk_, topic,
            partition, &low, &high, timeout_ms);
        return cast(ErrorCode) status;
    }

    /// Underlying librdkafka handle.
    package rd_kafka_t* rk_;
    /* All Producer and Consumer callbacks must reside in HandleImpl and
     * the opaque provided to rdkafka must be a pointer to HandleImpl, since
     * ProducerImpl and ConsumerImpl classes cannot be safely directly cast to
     * HandleImpl due to the skewed diamond inheritance. */
    package EventCb event_cb_;
    package SocketCb socket_cb_;
    package OpenCb open_cb_;
    package DeliveryReportCb dr_cb_;
    package PartitionerCb partitioner_cb_;
    package PartitionerKeyPointerCb partitioner_kp_cb_;
    package RebalanceCb rebalance_cb_;
    package OffsetCommitCb offset_commit_cb_;

    /**
     * The name of the handle.
     *
     * The returned slice refers to the C string handed out by
     * rd_kafka_name(); it is not copied.
     */
    const(char)[] name() const
    {
        auto cname = rd_kafka_name(rk_);
        return cname.fromStringz;
    }

    /**
     * Polls the provided kafka handle for events.
     *
     * Events will trigger application provided callbacks to be called.
     *
     * The \p timeout_ms argument specifies the maximum amount of time
     * (in milliseconds) that the call will block waiting for events.
     * For non-blocking calls, provide 0 as \p timeout_ms.
     * To wait indefinitely for events, provide -1.
     *
     * Events:
     *   - delivery report callbacks (if a DeliveryCb is configured) [producer]
     *   - event callbacks (if an EventCb is configured) [producer & consumer]
     *
     * Note: An application should make sure to call poll() at regular
     *       intervals to serve any queued callbacks waiting to be called.
     *
     * Warning: This method MUST NOT be used with the KafkaConsumer,
     *          use its consume() instead.
     *
     * the number of events served.
*/
    int poll(int timeout_ms = 10)
    {
        return rd_kafka_poll(rk_, timeout_ms);
    }

    /**
     * Returns the current out queue length
     *
     * The out queue contains messages and requests waiting to be sent to,
     * or acknowledged by, the broker.
     */
    int outqLen()
    {
        return rd_kafka_outq_len(rk_);
    }

    /**
     * Convert a list of C partitions to an array of TopicPartition.
     *
     * The array is allocated with malloc; release it with
     * free_partition_vector().
     */
    private static TopicPartition[] c_parts_to_partitions(
        const rd_kafka_topic_partition_list_t* c_parts)
    {
        auto partitions = (cast(TopicPartition*) malloc(c_parts.cnt * TopicPartition.sizeof))[0
            .. c_parts.cnt];
        foreach (i, ref p; partitions)
            partitions[i] = TopicPartition(&c_parts.elems[i]);
        return partitions;
    }

    /**
     * Destroys every element of \p v, frees the backing memory and resets
     * \p v to null. Counterpart of c_parts_to_partitions().
     */
    static void free_partition_vector(ref TopicPartition[] v)
    {
        foreach (ref p; v)
            p.destroy;
        free(v.ptr);
        v = null;
    }

    /**
     * Builds a new C partition list holding topic, partition and offset of
     * every entry in \p partitions.
     *
     * The caller must release the result with
     * rd_kafka_topic_partition_list_destroy().
     */
    private static rd_kafka_topic_partition_list_t* partitions_to_c_parts(const TopicPartition[] partitions)
    {
        rd_kafka_topic_partition_list_t* c_parts = rd_kafka_topic_partition_list_new(cast(int) partitions.length);

        foreach (ref tpi; partitions)
        {
            rd_kafka_topic_partition_t* rktpar = rd_kafka_topic_partition_list_add(c_parts, tpi.topic_, tpi.partition_);
            rktpar.offset = tpi.offset_;
        }

        return c_parts;
    }

    /**
     * Update the application provided 'partitions' with info from 'c_parts'.
     *
     * For every C entry, each element of \p partitions with the same topic
     * name and partition number receives the entry's offset and error code.
     */
    private static void update_partitions_from_c_parts(TopicPartition[] partitions,
        const rd_kafka_topic_partition_list_t* c_parts)
    {
        foreach (i; 0 .. c_parts.cnt)
        {
            const rd_kafka_topic_partition_t* p = &c_parts.elems[i];

            /* Find the corresponding application entry.
             * Iterate by reference: a by-value loop variable would be a copy
             * of the element and the updates below would be thrown away. */
            foreach (ref pp; partitions)
            {
                if (!strcmp(p.topic, pp.topic_) && p.partition == pp.partition_)
                {
                    pp.offset_ = p.offset;
                    pp.err_ = cast(ErrorCode) p.err;
                }
            }
        }
    }

    /**
     * Log callback handed to librdkafka.
     *
     * Falls back to rd_kafka_log_print() when there is no handle yet or the
     * handle has no event callback; otherwise wraps the log line in an
     * Event of type log and passes it to the event callback.
     */
    private static void log_cb_trampoline(const rd_kafka_t* rk, int level,
        const(char)* fac, const(char)* buf)
    {
        if (!rk)
        {
            rd_kafka_log_print(rk, level, fac, buf);
            return;
        }

        void* opaque = rd_kafka_opaque(rk);
        Handle handle = cast(Handle)(opaque);

        if (!handle.event_cb_)
        {
            rd_kafka_log_print(rk, level, fac, buf);
            return;
        }

        auto event = Event(Event.Type.log, ErrorCode.no_error,
            cast(Event.Severity)(level), fac, buf);

        handle.event_cb_(event);
    }

    /** Error callback: forwards \p err and \p reason as an error Event. */
    private static void error_cb_trampoline(rd_kafka_t* rk, int err,
        const(char)* reason, void* opaque)
    {
        Handle handle = cast(Handle)(opaque);

        auto event = Event(Event.Type.error, cast(ErrorCode) err,
            Event.Severity.error, null, reason);

        handle.event_cb_(event);
    }

    /**
     * Throttle callback: forwards broker name, broker id and throttle time
     * as a throttle Event.
     */
    private static void throttle_cb_trampoline(rd_kafka_t* rk,
        const(char)* broker_name, int broker_id, int throttle_time_ms, void* opaque)
    {
        Handle handle = cast(Handle)(opaque);

        auto event = Event(Event.Type.throttle);
        event.str_ = broker_name.fromStringz;
        event.id_ = broker_id;
        event.throttle_time_ = throttle_time_ms;

        handle.event_cb_(event);
    }

    /**
     * Statistics callback: forwards the JSON document as a stats Event.
     * Always returns 0.
     */
    private static int stats_cb_trampoline(rd_kafka_t* rk, char* json,
        size_t json_len, void* opaque)
    {
        Handle handle = cast(Handle)(opaque);
        auto event = Event(Event.Type.stats, ErrorCode.no_error, Event.Severity.info,
            null, json);

        handle.event_cb_(event);

        return 0;
    }

    /** Socket callback: delegates to the handle's socket_cb_. */
    private static int socket_cb_trampoline(int domain, int type, int protocol, void* opaque)
    {
        Handle handle = cast(Handle)(opaque);

        return handle.socket_cb_(domain, type, protocol);
    }

    /** Open callback: delegates to the handle's open_cb_. */
    private static int open_cb_trampoline(const(char)* pathname, int flags,
        mode_t mode, void* opaque)
    {
        Handle handle = cast(Handle)(opaque);

        return handle.open_cb_(pathname.fromStringz, flags, cast(int)(mode));
    }

    /**
     * Rebalance callback: converts the C partition list, calls the
     * consumer's rebalance_cb_ and frees the converted list afterwards.
     */
    private static void rebalance_cb_trampoline(rd_kafka_t* rk,
        rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t* c_partitions, void* opaque)
    {
        auto handle = cast(KafkaConsumer)(opaque);
        TopicPartition[] partitions = c_parts_to_partitions(c_partitions);

        handle.rebalance_cb_(handle, cast(ErrorCode) err, partitions);

        free_partition_vector(partitions);
    }

    /**
     * Offset commit callback: converts the C offsets (when present), calls
     * the handle's offset_commit_cb_ and frees the converted list afterwards.
     */
    private static void offset_commit_cb_trampoline(rd_kafka_t* rk,
        rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t* c_offsets, void* opaque)
    {
        Handle handle = cast(Handle)(opaque);
        TopicPartition[] offsets;

        // c_offsets may be null; the callback then receives an empty array.
        if (c_offsets)
            offsets = c_parts_to_partitions(c_offsets);

        handle.offset_commit_cb_(cast(ErrorCode) err, offsets);

        free_partition_vector(offsets);
    }

    /**
     * Pause producing or consumption for the provided list of partitions.
     *
     * Success or error is returned per-partition in the \p partitions list.
     *
     * Returns: ErrorCode.no_error on success, otherwise the error reported
     * by the pause call.
     *
     * See_also: resume()
     */
    ErrorCode pause(TopicPartition[] partitions)
    {
        rd_kafka_topic_partition_list_t* c_parts;
        rd_kafka_resp_err_t err;

        c_parts = partitions_to_c_parts(partitions);

        err = rd_kafka_pause_partitions(rk_, c_parts);

        // Copy the per-partition results back only when the call succeeded.
        if (!err)
            update_partitions_from_c_parts(partitions, c_parts);

        rd_kafka_topic_partition_list_destroy(c_parts);

        return cast(ErrorCode) err;

    }

    /**
     * Resume producing or consumption for the provided list of partitions.
     *
     * Success or error is returned per-partition in the \p partitions list.
*
     * Returns: ErrorCode.no_error on success, otherwise the error reported
     * by the resume call.
     *
     * See_also: pause()
     */
    ErrorCode resume(TopicPartition[] partitions)
    {
        auto c_list = partitions_to_c_parts(partitions);
        // The temporary C list is released on every exit path.
        scope (exit)
            rd_kafka_topic_partition_list_destroy(c_list);

        rd_kafka_resp_err_t status = rd_kafka_resume_partitions(rk_, c_list);

        // Copy the per-partition results back only when the call succeeded.
        if (!status)
            update_partitions_from_c_parts(partitions, c_list);

        return cast(ErrorCode) status;
    }

    /**
     * Get last known low (oldest/beginning)
     * and high (newest/end) offsets for partition.
     *
     * The low offset is updated periodically (if statistics.interval.ms is set)
     * while the high offset is updated on each fetched message set from the
     * broker.
     *
     * If there is no cached offset (either low or high, or both) then
     * OFFSET_INVALID will be returned for the respective offset.
     *
     * Offsets are returned in \p low and \p high respectively.
     *
     * Returns: ErrorCode.no_error on success or an error code on failure.
     *
     * Note: Shall only be used with an active consumer instance.
     */
    ErrorCode getWatermarkOffsets(const(char)* topic, int partition, ref long low,
        ref long high)
    {
        auto status = rd_kafka_get_watermark_offsets(rk_, topic,
            partition, &low, &high);
        return cast(ErrorCode) status;
    }
}

/**
 * Queue interface
 *
 * Create a new message queue.  Message queues allow the application
 * to re-route consumed messages from multiple topic+partitions into
 * one single queue point.  This queue point, containing messages from
 * a number of topic+partitions, may then be served by a single
 * consume() method, rather than one per topic+partition combination.
 *
 * See the Consumer::start(), Consumer::consume(), and
 * Consumer::consumeCallback() methods that take a queue as the first
 * parameter for more information.
*/
class Queue
{
nothrow @nogc:
    /**
     * Create Queue object
     */
    this(Handle handle)
    {
        queue_ = rd_kafka_queue_new(handle.rk_);
    }

    /// Releases the underlying librdkafka queue.
    ~this()
    {
        rd_kafka_queue_destroy(queue_);
    }

private:
    rd_kafka_queue_t* queue_;
}

/**
 * High-level KafkaConsumer (for brokers 0.9 and later)
 *
 * Note: Requires Apache Kafka >= 0.9.0 brokers
 *
 * Currently supports the \c range and \c roundrobin partition assignment
 * strategies (see \c partition.assignment.strategy)
 */
class KafkaConsumer : Handle
{
    /**
     * Creates a KafkaConsumer.
     *
     * The \p conf object must have \c group.id set to the consumer group to join.
     *
     * Use close() to shut down the consumer.
     *
     * Throws: Exception when \p conf carries no global configuration, when
     * \c group.id is not configured, or when the handle cannot be created
     * (the message is then the error text produced by rd_kafka_new()).
     *
     * See_also: RebalanceCb
     * See_also: CONFIGURATION.md for \c group.id, \c session.timeout.ms,
     *     \c partition.assignment.strategy, etc.
     */
    this(GlobalConf conf)
    {
        char[512] errbuf = void;
        rd_kafka_conf_t* rk_conf = null;
        size_t grlen;

        if (!conf.rk_conf_)
        {
            throw new Exception("Requires Conf::CONF_GLOBAL object");
        }

        // Passing a null destination only queries the length of the value.
        if (rd_kafka_conf_get(conf.rk_conf_, "group.id", null,
                &grlen) != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK || grlen <= 1 /* terminating null only */ )
        {
            throw new Exception("\"group.id\" must be configured");
        }

        this.setCommonConfig(conf);

        // Work on a copy so the caller's conf object is left untouched.
        rk_conf = rd_kafka_conf_dup(conf.rk_conf_);

        rd_kafka_t* rk = rd_kafka_new(rd_kafka_type_t.RD_KAFKA_CONSUMER, rk_conf,
            errbuf.ptr, errbuf.sizeof);
        if (rk is null)
        {
            // rd_kafka_new() takes ownership of the conf only on success,
            // so the duplicate has to be released here to avoid a leak.
            rd_kafka_conf_destroy(rk_conf);
            throw new Exception(errbuf.ptr.fromStringz.idup);
        }

        this.rk_ = rk;

        /* Redirect handle queue to cgrp's queue to provide a single queue point */
        rd_kafka_poll_set_consumer(rk);
    }

    /**
     * Returns the current partition assignment as set by assign().
     *
     * On success \p partitions is resized and filled with one entry per
     * assigned partition; on failure it is left untouched.
     *
     * Returns: ErrorCode.no_error on success, otherwise the error reported
     * by rd_kafka_assignment().
     */
    ErrorCode assignment(ref TopicPartition[] partitions) nothrow
    {
        rd_kafka_topic_partition_list_t* c_parts;
        rd_kafka_resp_err_t err;

        if (0 != (err = rd_kafka_assignment(rk_, &c_parts)))
            return cast(ErrorCode) err;

        partitions.length = c_parts.cnt;

        foreach (i, ref p; partitions)
            p = TopicPartition(&c_parts.elems[i]);

        rd_kafka_topic_partition_list_destroy(c_parts);

        return ErrorCode.no_error;

    }

nothrow @nogc:

    /**
     * Update the subscription set to \p topics.
     *
     * Any previous subscription will be unassigned and unsubscribed first.
     *
     * The subscription set denotes the desired topics to consume and this
     * set is provided to the partition assignor (one of the elected group
     * members) for all clients which then uses the configured
     * \c partition.assignment.strategy to assign the subscription sets's
     * topics's partitions to the consumers, depending on their subscription.
     *
     * The result of such an assignment is a rebalancing which is either
     * handled automatically in librdkafka or can be overridden by the application
     * by providing a RebalanceCb.
     *
     * The rebalancing passes the assigned partition set to
     * assign() to update what partitions are actually
     * being fetched by the KafkaConsumer.
     *
     * Regex pattern matching automatically performed for topics prefixed
     * with \c \"^\" (e.g. \c \"^myPfx[0-9]_.*\"
     */
    ErrorCode subscribe(const(char)*[] topics...)
    {
        rd_kafka_topic_partition_list_t* c_topics;
        rd_kafka_resp_err_t err;

        c_topics = rd_kafka_topic_partition_list_new(cast(int) topics.length);

        // Subscriptions name topics only, so the partition is left unassigned.
        foreach (t; topics)
            rd_kafka_topic_partition_list_add(c_topics, t, RD_KAFKA_PARTITION_UA);

        err = rd_kafka_subscribe(rk_, c_topics);

        rd_kafka_topic_partition_list_destroy(c_topics);

        return cast(ErrorCode) err;
    }
    /** Unsubscribe from the current subscription set.
*/
    nothrow @nogc ErrorCode unsubscribe()
    {
        auto status = rd_kafka_unsubscribe(this.rk_);
        return cast(ErrorCode)(status);
    }

    /**
     * Consume message or get error event, triggers callbacks.
     *
     * Will automatically call registered callbacks for any such queued events,
     * including RebalanceCb, EventCb, OffsetCommitCb,
     * etc.
     *
     * Note:  An application should make sure to call consume() at regular
     *          intervals, even if no messages are expected, to serve any
     *          queued callbacks waiting to be called. This is especially
     *          important when a RebalanceCb has been registered as it needs
     *          to be called and handled properly to synchronize internal
     *          consumer state.
     *
     * Note: Application MUST NOT call \p poll() on KafkaConsumer objects.
     *
     * One of:
     *  - proper message (Message::err() is ErrorCode.no_error)
     *  - error event (Message::err() is != ErrorCode.no_error)
     *  - timeout due to no message or event in \p timeout_ms
     *    (Message::err() is timed_out)
     */
    /++
    Params:
        msg = message to fill. Use `msg.err` to check errors.
        timeout_ms = time to wait if there are no incoming messages in the queue.
    +/
    nothrow @nogc void consume(ref Message msg, int timeout_ms = 10)
    {
        if (auto polled = rd_kafka_consumer_poll(this.rk_, timeout_ms))
        {
            msg = Message(polled);
        }
        else
        {
            // Nothing arrived within timeout_ms: report it as a timeout.
            msg = Message(null, ErrorCode.timed_out);
        }
    }

    /**
     * Update the assignment set to \p partitions.
     *
     * The assignment set is the set of partitions actually being consumed
     * by the KafkaConsumer.
*/
    ErrorCode assign(const TopicPartition[] partitions)
    {
        auto c_list = partitions_to_c_parts(partitions);
        // The temporary C list is released on every exit path.
        scope (exit)
            rd_kafka_topic_partition_list_destroy(c_list);

        return cast(ErrorCode) rd_kafka_assign(rk_, c_list);
    }

    /**
     * Stop consumption and remove the current assignment.
     */
    ErrorCode unassign()
    {
        // Assigning a null list clears the assignment.
        auto status = rd_kafka_assign(rk_, null);
        return cast(ErrorCode) status;
    }

    /**
     * Commit offsets for the current assignment.
     *
     * Note: This is the synchronous variant that blocks until offsets
     *         are committed or the commit fails (see return value).
     *
     * Note: If an OffsetCommitCb callback is registered it will
     *         be called with commit details on a future call to
     *         consume()
     *
     * Returns: ErrorCode.no_error or error code.
     */
    ErrorCode commitSync()
    {
        auto status = rd_kafka_commit(rk_, null, 0 /*sync*/ );
        return cast(ErrorCode) status;
    }

    /**
     * Asynchronous version of commitSync()
     *
     * See_also: KafkaConsumer::commitSync()
     */
    ErrorCode commitAsync()
    {
        auto status = rd_kafka_commit(rk_, null, 1 /*async*/ );
        return cast(ErrorCode) status;
    }

    /**
     * Commit offset for a single topic+partition based on \p message
     *
     * Note: This is the synchronous variant.
     *
     * See_also: KafkaConsumer::commitSync()
     */
    ErrorCode commitSync(ref Message message)
    {
        auto status = rd_kafka_commit_message(rk_, message.rkmessage_, 0 /*sync*/ );
        return cast(ErrorCode) status;
    }

    /**
     * Commit offset for a single topic+partition based on \p message
     *
     * Note: This is the asynchronous variant.
     *
     * See_also: KafkaConsumer::commitSync()
     */
    ErrorCode commitAsync(ref Message message)
    {
        auto status = rd_kafka_commit_message(rk_, message.rkmessage_, 1 /*async*/ );
        return cast(ErrorCode) status;
    }

    /**
     * Commit offsets for the provided list of partitions.
*
     * Note: This is the synchronous variant.
     */
    ErrorCode commitSync(TopicPartition[] offsets)
    {
        auto c_list = partitions_to_c_parts(offsets);
        // The temporary C list is released on every exit path.
        scope (exit)
            rd_kafka_topic_partition_list_destroy(c_list);

        rd_kafka_resp_err_t status = rd_kafka_commit(rk_, c_list, 0);
        // Copy the per-partition results back only when the call succeeded.
        if (!status)
            update_partitions_from_c_parts(offsets, c_list);
        return cast(ErrorCode) status;
    }

    /**
     * Commit offset for the provided list of partitions.
     *
     * Note: This is the asynchronous variant.
     */
    ErrorCode commitAsync(const TopicPartition[] offsets)
    {
        auto c_list = partitions_to_c_parts(offsets);
        scope (exit)
            rd_kafka_topic_partition_list_destroy(c_list);

        return cast(ErrorCode) rd_kafka_commit(rk_, c_list, 1);
    }

    /**
     * Retrieve committed offsets for topics+partitions.
     *
     * Returns: ErrorCode.no_error on success in which case the
     *          \p offset or \p err field of each \p partitions' element is filled
     *          in with the stored offset, or a partition specific error.
     *          Else returns an error code.
     */
    ErrorCode committed(TopicPartition[] partitions, int timeout_ms)
    {
        auto c_list = partitions_to_c_parts(partitions);
        scope (exit)
            rd_kafka_topic_partition_list_destroy(c_list);

        rd_kafka_resp_err_t status = rd_kafka_committed(rk_, c_list, timeout_ms);
        if (!status)
            update_partitions_from_c_parts(partitions, c_list);

        return cast(ErrorCode) status;
    }

    /**
     * Retrieve current positions (offsets) for topics+partitions.
     *
     * Returns: ErrorCode.no_error on success in which case the
     *          \p offset or \p err field of each \p partitions' element is filled
     *          in with the stored offset, or a partition specific error.
     *          Else returns an error code.
*/
    ErrorCode position(TopicPartition[] partitions)
    {
        auto c_list = partitions_to_c_parts(partitions);
        // The temporary C list is released on every exit path.
        scope (exit)
            rd_kafka_topic_partition_list_destroy(c_list);

        rd_kafka_resp_err_t status = rd_kafka_position(rk_, c_list);
        // Copy the per-partition results back only when the call succeeded.
        if (!status)
            update_partitions_from_c_parts(partitions, c_list);

        return cast(ErrorCode) status;
    }

    /**
     * Close and shut down the consumer.
     *
     * This call will block until the following operations are finished:
     *  - Trigger a local rebalance to void the current assignment
     *  - Stop consumption for current assignment
     *  - Commit offsets
     *  - Leave group
     *
     * The maximum blocking time is roughly limited to session.timeout.ms.
     *
     * Note: Callbacks, such as RebalanceCb and
     *         OffsetCommitCb, etc, may be called.
     *
     * When the consumer close call fails its error is returned and the
     * handle is NOT destroyed; otherwise the out queue is drained and the
     * underlying handle is destroyed.
     */
    ErrorCode close()
    {
        rd_kafka_resp_err_t closeErr = rd_kafka_consumer_close(rk_);
        if (closeErr)
            return cast(ErrorCode) closeErr;

        // Keep serving events until nothing is left in the out queue.
        while (rd_kafka_outq_len(rk_) > 0)
            rd_kafka_poll(rk_, 10);
        rd_kafka_destroy(rk_);

        return cast(ErrorCode) closeErr;
    }
}

/**
 * Simple Consumer (legacy)
 *
 * A simple non-balanced, non-group-aware, consumer.
 */
class Consumer : Handle
{
    /**
     * Creates a new Kafka consumer handle.
     *
     * \p conf is an optional object that will be used instead of the default
     * configuration.
     * The \p conf object is reusable after this call.
     *
     * Throws: Exception with a human readable error message when \p conf
     * carries no global configuration or the handle cannot be created.
*/
    this(GlobalConf conf)
    {
        char[512] errbuf = void;
        rd_kafka_conf_t* rk_conf = null;

        if (conf)
        {
            if (!conf.rk_conf_)
            {
                throw new Exception("Requires Conf::CONF_GLOBAL object");
            }

            this.setCommonConfig(conf);

            // Work on a copy so the caller's conf object stays reusable.
            rk_conf = rd_kafka_conf_dup(conf.rk_conf_);
        }

        rk_ = rd_kafka_new(rd_kafka_type_t.RD_KAFKA_CONSUMER,
            rk_conf, errbuf.ptr, errbuf.sizeof);
        if (rk_ is null)
        {
            // rd_kafka_new() takes ownership of the conf only on success,
            // so the duplicate has to be released here to avoid a leak.
            if (rk_conf)
                rd_kafka_conf_destroy(rk_conf);
            throw new Exception(errbuf.ptr.fromStringz.idup);
        }
    }

nothrow @nogc:

    /// Destroys the underlying handle, if one was created.
    ~this()
    {
        // Guard against a null handle (e.g. the constructor threw), the
        // same way Producer's destructor does.
        if (rk_)
        {
            rd_kafka_destroy(rk_);
        }
    }

static:

    /**
     * Start consuming messages for topic and \p partition
     * at offset \p offset which may either be a proper offset (0..N)
     * or one of the special offsets: \p OFFSET_BEGINNING or \p OFFSET_END.
     *
     * rdkafka will attempt to keep \p queued.min.messages (config property)
     * messages in the local queue by repeatedly fetching batches of messages
     * from the broker until the threshold is reached.
     *
     * When \p queue is non-null the fetched messages are routed to that
     * queue; when it is null the topic+partition's own queue is used.
     *
     * The application shall use one of the \p ...consume*() functions
     * to consume messages from the local queue, each kafka message being
     * represented as a `Message` object.
     *
     * \p ...start() must not be called multiple times for the same
     * topic and partition without stopping consumption first with
     * \p ...stop().
     *
     * Returns: an ErrorCode to indicate success or failure.
     */
    ErrorCode start(Topic topic, int partition, long offset, Queue queue)
    {
        // Honour the queue argument; it used to be silently ignored.
        const int rc = queue is null
            ? rd_kafka_consume_start(topic.rkt_, partition, offset)
            : rd_kafka_consume_start_queue(topic.rkt_, partition, offset, queue.queue_);
        if (rc == -1)
            return cast(ErrorCode)(rd_kafka_errno2err(errno));
        return ErrorCode.no_error;
    }

    /**
     * Stop consuming messages for topic and \p partition, purging
     * all messages currently in the local queue.
     *
     * The application needs to stop all consumers before destroying
     * the Consumer handle.
     *
     * an ErrorCode to indicate success or failure.
*/
    ErrorCode stop(Topic topic, int partition)
    {
        // rd_kafka_consume_stop() signals failure with -1 and errno.
        if (rd_kafka_consume_stop(topic.rkt_, partition) == -1)
            return cast(ErrorCode)(rd_kafka_errno2err(errno));
        return ErrorCode.no_error;
    }

    /**
     * Seek consumer for topic+partition to \p offset which is either an
     * absolute or logical offset.
     *
     * If \p timeout_ms is not 0 the call will wait this long for the
     * seek to be performed. If the timeout is reached the internal state
     * will be unknown and this function returns `timed_out`.
     * If \p timeout_ms is 0 it will initiate the seek but return
     * immediately without any error reporting (e.g., async).
     *
     * This call triggers a fetch queue barrier flush.
     *
     * Returns: an ErrorCode to indicate success or failure.
     */
    ErrorCode seek(Topic topic, int partition, long offset, int timeout_ms)
    {
        // Unlike consume_start/stop, rd_kafka_seek() returns the error code
        // itself rather than -1 plus errno, so the result is passed through.
        // Comparing it with -1 hid every error except the "unknown" code.
        return cast(ErrorCode) rd_kafka_seek(topic.rkt_, partition, offset, timeout_ms);
    }

    /**
     * Consume a single message from \p topic and \p partition.
     *
     * \p timeout_ms is maximum amount of time to wait for a message to be
     * received.
     * Consumer must have been previously started with \p ...start().
     *
     * a Message object, the application needs to check if message
     * is an error or a proper message Message::err() and checking for
     * \p ErrorCode.no_error.
     *
     * The message object must be destroyed when the application is done with it.
     *
     * Errors (in Message::err()):
     *  - timed_out - \p timeout_ms was reached with no new messages fetched.
     *  - PARTITION_EOF - End of partition reached, not an error.
*/
    void consume(Topic topic, int partition, int timeout_ms, ref Message msg)
    {
        rd_kafka_message_t* rkmessage = rd_kafka_consume(topic.rkt_, partition, timeout_ms);
        if (!rkmessage)
        {
            // No message: report the errno-derived error and stop here so
            // the error is not overwritten by a message built from null.
            msg = Message(topic, cast(ErrorCode) rd_kafka_errno2err(errno));
            return;
        }

        msg = Message(topic, rkmessage);
    }

    /**
     * Consume a single message from the specified queue.
     *
     * \p timeout_ms is maximum amount of time to wait for a message to be
     * received.
     * Consumer must have been previously started on the queue with
     * \p ...start().
     *
     * \p msg is filled with the result; the application needs to check if
     * it is an error or a proper message via \p Message.err() and checking
     * for \p ErrorCode.no_error.
     *
     * The message object must be destroyed when the application is done with it.
     *
     * Errors (in Message::err()):
     *   - timed_out - \p timeout_ms was reached with no new messages fetched
     *
     * Note that Message.topic() may be null after certain kinds of
     * errors, so applications should check that it isn't null before
     * dereferencing it.
     */
    void consume(Queue queue, int timeout_ms, ref Message msg)
    {
        rd_kafka_message_t* rkmessage = rd_kafka_consume_queue(queue.queue_, timeout_ms);
        if (!rkmessage)
        {
            // No message: report the error and return before rkmessage
            // would be dereferenced below.
            msg = Message(null, cast(ErrorCode) rd_kafka_errno2err(errno));
            return;
        }

        /*
         * Recover our Topic from the topic conf's opaque field, which we
         * set in Topic::create() for just this kind of situation.
         * The message may carry no topic handle, in which case the topic
         * stays null.
         */
        Topic topic = null;
        if (rkmessage.rkt)
            topic = cast(Topic) rd_kafka_topic_opaque(rkmessage.rkt);

        msg = Message(topic, rkmessage);
    }

    /* Helper struct for `consume_callback'.
     * Encapsulates the values we need in order to call `rd_kafka_consume_callback'
     * and keep track of the C++ callback function and `opaque' value.
*/
    private static struct ConsumerCallback
    {
        Topic topic;
        ConsumeCb cb_cls;

        /* This function is the one we give to `rd_kafka_consume_callback', with
         * the `opaque' pointer pointing to an instance of this struct, in which
         * we can find the application callback and the topic.
         */
        static nothrow @nogc void consume_cb_trampoline(rd_kafka_message_t* msg, void* opaque)
        {
            auto self = cast(ConsumerCallback*) opaque;
            auto wrapped = Message(self.topic, msg, false /*don't free*/ );
            self.cb_cls(wrapped);
        }
    }

    /**
     * Consumes messages from \p topic and \p partition, calling
     *        the provided callback for each consumed message.
     *
     * \p consumeCallback() provides higher throughput performance
     * than \p consume().
     *
     * \p timeout_ms is the maximum amount of time to wait for one or
     * more messages to arrive.
     *
     * The provided \p consume_cb is called for every message received.
     *
     * Returns: the number of messages processed or -1 on error.
     *
     * See_also: Consumer::consume()
     */
    int consumeCallback(Topic topic, int partition, int timeout_ms, ConsumeCb consume_cb)
    {
        // The context lives on the stack for the duration of the call and is
        // handed to the trampoline through the opaque pointer.
        auto ctx = ConsumerCallback(topic, consume_cb);
        const int served = rd_kafka_consume_callback(topic.rkt_, partition, timeout_ms,
            &ConsumerCallback.consume_cb_trampoline, &ctx);
        return served;
    }

    /* Helper struct for `consume_callback' with a Queue.
     * Encapsulates the values we need in order to call `rd_kafka_consume_callback'
     * and keep track of the C++ callback function and `opaque' value.
*/
    private static struct ConsumerQueueCallback
    {
        ConsumeCb cb_cls;

        /* This function is the one we give to `rd_kafka_consume_callback_queue',
         * with the `opaque' pointer pointing to an instance of this struct, in
         * which we can find the application callback.
         */
        static nothrow @nogc void consume_cb_trampoline(rd_kafka_message_t* msg, void* opaque)
        {
            auto self = cast(ConsumerQueueCallback*) opaque;
            /*
             * Recover our Topic from the topic conf's opaque field, which we
             * set in Topic::create() for just this kind of situation.
             */
            auto owner = cast(Topic) rd_kafka_topic_opaque(msg.rkt);
            auto wrapped = Message(owner, msg, false /*don't free*/ );
            self.cb_cls(wrapped);
        }
    }

    /**
     * Consumes messages from \p queue, calling the provided callback for
     *        each consumed message.
     *
     * See_also: Consumer::consumeCallback()
     */

    int consumeCallback(Queue queue, int timeout_ms, ConsumeCb consume_cb)
    {
        // The context lives on the stack for the duration of the call and is
        // handed to the trampoline through the opaque pointer.
        auto ctx = ConsumerQueueCallback(consume_cb);
        const int served = rd_kafka_consume_callback_queue(queue.queue_, timeout_ms,
            &ConsumerQueueCallback.consume_cb_trampoline, &ctx);
        return served;
    }

    /**
     * Converts an offset into the logical offset from the tail of a topic.
     *
     * \p offset is the (positive) number of items from the end.
     *
     * Returns: the logical offset for message \p offset from the tail, this value
     *          may be passed to Consumer::start, et.al.
     * Note: The returned logical offset is specific to librdkafka.
     */
    long offsetTail(long offset)
    {
        auto logical = RD_KAFKA_OFFSET_TAIL(offset);
        return logical;
    }
}

/**
 * Producer
 */
class Producer : Handle
{
    package GlobalConf _conf;
    /**
     * Creates a new Kafka producer handle.
*
    * \p conf is an optional object that will be used instead of the default
    * configuration.
    * The \p conf object is reusable after this call.
    *
    * Throws: Exception if \p conf is given but holds no global
    * configuration handle, or if `rd_kafka_new` fails (the message is the
    * error string reported by librdkafka).
    */
    this(GlobalConf conf)
    {
        // Kept (possibly null) so that newTopic can fall back to its
        // default topic configuration.
        _conf = conf;
        // Only read after rd_kafka_new reports failure, so it is left
        // uninitialized.
        char[512] errbuf = void;
        rd_kafka_conf_t* rk_conf = null;

        if (conf)
        {
            if (!conf.rk_conf_)
            {
                throw new Exception("Requires Conf::CONF_GLOBAL object");
            }

            this.setCommonConfig(conf);

            // Work on a duplicate so the caller's conf object stays usable.
            rk_conf = rd_kafka_conf_dup(conf.rk_conf_);

            if (conf.dr_cb_)
            {
                rd_kafka_conf_set_dr_msg_cb(rk_conf, &dr_msg_cb_trampoline);
                this.dr_cb_ = conf.dr_cb_;
            }
        }

        if (null is(rk_ = rd_kafka_new(rd_kafka_type_t.RD_KAFKA_PRODUCER,
                rk_conf, errbuf.ptr, errbuf.sizeof)))
        {
            throw new Exception(errbuf.ptr.fromStringz.idup);
        }
    }

    /++
    Returns: new topic for this producer
    Params:
        topic = topic name
        topicConf = TopicConf; if null, the `defaultTopicConf` of the global
                    configuration given to the constructor is used, so it
                    should have been set there.
    +/
    Topic newTopic(const(char)[] topic, TopicConf topicConf = null)
    {
        // The constructor accepts a null GlobalConf, so `_conf` may be null
        // here; only consult it when it exists instead of dereferencing it
        // unconditionally.
        if (!topicConf && _conf)
            topicConf = _conf.defaultTopicConf;
        assert(topicConf, "newTopic requires a TopicConf or a GlobalConf with defaultTopicConf set");
        return new Topic(this, topic, topicConf);
    }

    nothrow @nogc:

    /// Destroys the underlying librdkafka handle, if one was created.
    ~this()
    {
        if (rk_)
        {
            rd_kafka_destroy(rk_);
        }
    }

    /**
    * Producer::produce() \p msgflags
    *
    * These flags are optional and mutually exclusive.
    */
    enum MsgOpt
    {
        free = 0x1, /**< rdkafka will free(3) \p payload
                     * when it is done with it. */
        copy = 0x2, /**< the \p payload data will be copied
                     * and the \p payload pointer will not
                     * be used by rdkafka after the
                     * call returns. */
        block = 0x4, /**< Block produce*() on message queue
                      * full.
                      * WARNING:
                      * If a delivery report callback
                      * is used the application MUST
                      * call rd_kafka_poll() (or equiv.)
                      * to make sure delivered messages
                      * are drained from the internal
                      * delivery report queue.
                      * Failure to do so will result
                      * in indefinitely blocking on
                      * the produce() call when the
                      * message queue is full.
                      */
    }

    /* Delivery-report trampoline registered with
     * `rd_kafka_conf_set_dr_msg_cb'. `opaque' is cast back to the Handle,
     * the raw message is wrapped in a Message without a Topic, the handle's
     * `dr_cb_' is invoked with it, and the wrapper is destroyed afterwards.
     */
    private static void dr_msg_cb_trampoline(rd_kafka_t* rk,
        const rd_kafka_message_t* rkmessage, void* opaque)
    {
        auto handle = cast(Handle) opaque;
        auto message = Message(null, rkmessage);
        handle.dr_cb_(message);
        message.destroy;
    }

    /**
    * Produce and send a single message to broker.
    *
    * This is an asynchronous non-blocking API.
    *
    * \p partition is the target partition, either:
    *   - Topic::PARTITION_UA (unassigned) for
    *     automatic partitioning using the topic's partitioner function, or
    *   - a fixed partition (0..N)
    *
    * \p msgflags is zero or more of the following flags OR:ed together:
    *    block - block \p produce*() call if
    *            \p queue.buffering.max.messages or
    *            \p queue.buffering.max.kbytes are exceeded.
    *            Messages are considered in-queue from the point they
    *            are accepted by produce() until their corresponding
    *            delivery report callback/event returns.
    *            It is thus a requirement to call
    *            poll() (or equiv.) from a separate
    *            thread when block is used.
    *            See WARNING on \c block above.
    *    free - rdkafka will free(3) \p payload when it is done with it.
    *    copy - the \p payload data will be copied and the \p payload
    *            pointer will not be used by rdkafka after the
    *            call returns.
    *
    *  NOTE: free and copy are mutually exclusive.
    *
    *  If the function returns -1 and free was specified, then
    *  the memory associated with the payload is still the caller's
    *  responsibility.
    *
    * \p payload is the message payload of size \p len bytes.
*
    * \p key is an optional message key, if non-null it
    * will be passed to the topic partitioner as well as be sent with the
    * message to the broker and passed on to the consumer.
    *
    * \p msg_opaque is an optional application-provided per-message opaque
    * pointer that will be provided in the delivery report callback (\p dr_cb) for
    * referencing this message.
    *
    * an ErrorCode to indicate success or failure:
    *  - _QUEUE_FULL - maximum number of outstanding messages has been
    *                      reached: \c queue.buffering.max.messages
    *
    *  - MSG_SIZE_TOO_LARGE - message is larger than configured max size:
    *                            \c message.max.bytes
    *
    *  - _UNKNOWN_PARTITION - requested \p partition is unknown in the
    *                           Kafka cluster.
    *
    *  - _UNKNOWN_TOPIC     - topic is unknown in the Kafka cluster.
    */
    ErrorCode produce(Topic topic, int partition, void[] payload,
        const(void)[] key = null, int msgflags = MsgOpt.copy, void* msg_opaque = null)
    {
        const rc = rd_kafka_produce(topic.rkt_, partition, msgflags, payload.ptr,
            payload.length, key.ptr, key.length, msg_opaque);
        if (rc == -1)
        {
            // Ask librdkafka for the error it recorded for this thread rather
            // than translating the D runtime's errno: errno is not reliable
            // when the library is built against a different C runtime (e.g.
            // as a Windows DLL), and rd_kafka_errno2err is deprecated in
            // favour of rd_kafka_last_error.
            return cast(ErrorCode) rd_kafka_last_error();
        }
        return ErrorCode.no_error;
    }

    /**
    * Wait until all outstanding produce requests, et al., are completed.
    * This should typically be done prior to destroying a producer instance
    * to make sure all queued and in-flight produce requests are completed
    * before terminating.
    *
    * Note: This function will call poll() and thus trigger callbacks.
    *
    * \p timeout_ms is the maximum time to wait, in milliseconds.
    *
    * Returns the result of `rd_kafka_flush` as an ErrorCode: timed_out if
    * \p timeout_ms was reached before all outstanding requests were
    * completed, else ErrorCode.no_error
    */
    ErrorCode flush(int timeout_ms = 60_000)
    {
        return cast(ErrorCode) rd_kafka_flush(rk_, timeout_ms);
    }
}