1 /// 2 module rdkafkad.handlers; 3 import rdkafkad; 4 import rdkafkad.iodriver; 5 6 import std.datetime: Clock, UTC; 7 8 /** 9 * Base handle, super class for specific clients. 10 */ 11 class Handle 12 { 13 14 /** 15 * Returns the client's broker-assigned group member id 16 * 17 * Note: This currently requires the high-level KafkaConsumer 18 * 19 * Last assigned member id, or empty string if not currently 20 * a group member. 21 */ 22 auto memberid() const 23 { 24 char* str; 25 mixin(IO!q{ 26 str = rd_kafka_memberid(rk_); 27 }); 28 string memberid = str.fromStringz.idup; 29 if (str) 30 rd_kafka_mem_free(cast(rd_kafka_s*) rk_, str); 31 return memberid; 32 } 33 34 /** 35 * Request Metadata from broker. 36 * 37 * Parameters: 38 * \p all_topics - if non-zero: request info about all topics in cluster, 39 * if zero: only request info about locally known topics. 40 * \p only_rkt - only request info about this topic 41 * \p metadatap - pointer to hold metadata result. 42 * The \p *metadatap pointer must be released with \c delete. 43 * \p timeout_ms - maximum response time before failing. 44 * 45 * ErrorCode.no_error on success (in which case \p *metadatap 46 * will be set), else timed_out on timeout or 47 * other error code on error. 48 */ 49 Metadata metadata(bool all_topics, const Topic only_rkt, int timeout_ms = 60_000) 50 { 51 const rd_kafka_metadata_t* cmetadatap = null; 52 53 rd_kafka_topic_t* topic = only_rkt ? 
cast(rd_kafka_topic_t*) only_rkt.rkt_ : null; 54 55 ErrorCode error; 56 mixin(IO!q{ 57 error = cast(ErrorCode)rd_kafka_metadata(rk_, all_topics, topic, &cmetadatap, timeout_ms); 58 }); 59 if (error) 60 { 61 throw new Exception(error.err2str); 62 } 63 return new Metadata(cmetadatap); 64 } 65 66 static if (!have_vibed) 67 mixin("@nogc nothrow:"); 68 69 package void setCommonConfig(GlobalConf conf) 70 { 71 rd_kafka_conf_set_opaque(conf.rk_conf_, cast(void*) this); 72 73 if (conf.event_cb_) 74 { 75 rd_kafka_conf_set_log_cb(conf.rk_conf_, &log_cb_trampoline); 76 rd_kafka_conf_set_error_cb(conf.rk_conf_, &error_cb_trampoline); 77 rd_kafka_conf_set_throttle_cb(conf.rk_conf_, &throttle_cb_trampoline); 78 rd_kafka_conf_set_stats_cb(conf.rk_conf_, &stats_cb_trampoline); 79 event_cb_ = conf.event_cb_; 80 } 81 82 if (conf.socket_cb_) 83 { 84 rd_kafka_conf_set_socket_cb(conf.rk_conf_, &socket_cb_trampoline); 85 socket_cb_ = conf.socket_cb_; 86 } 87 88 version (Windows) 89 { 90 } 91 else if (conf.open_cb_) 92 { 93 rd_kafka_conf_set_open_cb(conf.rk_conf_, &open_cb_trampoline); 94 open_cb_ = conf.open_cb_; 95 } 96 97 if (conf.rebalance_cb_) 98 { 99 rd_kafka_conf_set_rebalance_cb(conf.rk_conf_, &rebalance_cb_trampoline); 100 rebalance_cb_ = conf.rebalance_cb_; 101 } 102 103 if (conf.offset_commit_cb_) 104 { 105 rd_kafka_conf_set_offset_commit_cb(conf.rk_conf_, &offset_commit_cb_trampoline); 106 offset_commit_cb_ = conf.offset_commit_cb_; 107 } 108 } 109 110 /** 111 * Query broker for low (oldest/beginning) 112 * and high (newest/end) offsets for partition. 113 * 114 * Offsets are returned in \p *low and \p *high respectively. 115 * 116 * ErrorCode.no_error on success or an error code on failure. 
117 */ 118 auto queryWatermarkOffsets(const(char)* topic, int partition, 119 ref long low, ref long high, int timeout_ms) 120 { 121 ErrorCode ret; 122 mixin(IO!q{ 123 ret = cast(ErrorCode) rd_kafka_query_watermark_offsets(rk_, topic, 124 partition, &low, &high, timeout_ms); 125 }); 126 return ret; 127 } 128 129 package rd_kafka_t* rk_; 130 /* All Producer and Consumer callbacks must reside in HandleImpl and 131 * the opaque provided to rdkafka must be a pointer to HandleImpl, since 132 * ProducerImpl and ConsumerImpl classes cannot be safely directly cast to 133 * HandleImpl due to the skewed diamond inheritance. */ 134 package EventCb event_cb_; 135 package SocketCb socket_cb_; 136 package OpenCb open_cb_; 137 package DeliveryReportCb dr_cb_; 138 package PartitionerCb partitioner_cb_; 139 package PartitionerKeyPointerCb partitioner_kp_cb_; 140 package RebalanceCb rebalance_cb_; 141 package OffsetCommitCb offset_commit_cb_; 142 143 /** the name of the handle */ 144 const(char)[] name() const 145 { 146 return rd_kafka_name(rk_).fromStringz; 147 } 148 149 /** 150 * Polls the provided kafka handle for events. 151 * 152 * Events will trigger application provided callbacks to be called. 153 * 154 * The \p timeout_ms argument specifies the maximum amount of time 155 * (in milliseconds) that the call will block waiting for events. 156 * For non-blocking calls, provide 0 as \p timeout_ms. 157 * To wait indefinately for events, provide -1. 158 * 159 * Events: 160 * - delivery report callbacks (if an DeliveryCb is configured) [producer] 161 * - event callbacks (if an EventCb is configured) [producer & consumer] 162 * 163 * Note: An application should make sure to call poll() at regular 164 * intervals to serve any queued callbacks waiting to be called. 165 * 166 * Warning: This method MUST NOT be used with the KafkaConsumer, 167 * use its consume() instead. 168 * 169 * the number of events served. 
170 */ 171 int poll(int timeout_ms = 10) 172 { 173 typeof(return) ret; 174 static if (have_vibed == true) 175 { 176 import vibe.core.core: sleep; 177 import core.time: msecs; 178 ret = rd_kafka_poll(rk_, 0); 179 if (ret == 0 && timeout_ms) 180 sleep(timeout_ms.msecs); 181 } 182 else 183 { 184 ret = rd_kafka_poll(rk_, timeout_ms); 185 } 186 return ret; 187 } 188 189 /** 190 * Returns the current out queue length 191 * 192 * The out queue contains messages and requests waiting to be sent to, 193 * or acknowledged by, the broker. 194 */ 195 int outqLen() 196 { 197 typeof(return) ret; 198 mixin(IO!q{ 199 ret = rd_kafka_outq_len(rk_); 200 }); 201 return ret; 202 } 203 204 /** 205 * Convert a list of C partitions to C++ partitions 206 */ 207 nothrow @nogc private static TopicPartition[] c_parts_to_partitions( 208 const rd_kafka_topic_partition_list_t* c_parts) 209 { 210 auto partitions = (cast(TopicPartition*) malloc(c_parts.cnt * TopicPartition.sizeof))[0 211 .. c_parts.cnt]; 212 foreach (i, ref p; partitions) 213 partitions[i] = TopicPartition(&c_parts.elems[i]); 214 return partitions; 215 } 216 217 nothrow @nogc static void free_partition_vector(ref TopicPartition[] v) 218 { 219 foreach (ref p; v) 220 p.destroy; 221 free(v.ptr); 222 v = null; 223 } 224 225 nothrow @nogc private static rd_kafka_topic_partition_list_t* partitions_to_c_parts(const TopicPartition[] partitions) 226 { 227 rd_kafka_topic_partition_list_t* c_parts = rd_kafka_topic_partition_list_new(cast(int) partitions.length); 228 229 foreach (ref tpi; partitions) 230 { 231 rd_kafka_topic_partition_t* rktpar = rd_kafka_topic_partition_list_add(c_parts, tpi.topic_, tpi.partition_); 232 rktpar.offset = tpi.offset_; 233 } 234 235 return c_parts; 236 } 237 238 /** 239 * @brief Update the application provided 'partitions' with info from 'c_parts' 240 */ 241 nothrow @nogc private static void update_partitions_from_c_parts(TopicPartition[] partitions, 242 const rd_kafka_topic_partition_list_t* c_parts) 243 { 244 
foreach (i; 0 .. c_parts.cnt) 245 { 246 const rd_kafka_topic_partition_t* p = &c_parts.elems[i]; 247 248 /* Find corresponding C++ entry */ 249 foreach (pp; partitions) 250 { 251 if (!strcmp(p.topic, pp.topic_) && p.partition == pp.partition_) 252 { 253 pp.offset_ = p.offset; 254 pp.err_ = cast(ErrorCode) p.err; 255 } 256 } 257 } 258 } 259 260 nothrow @nogc private static void log_cb_trampoline(const rd_kafka_t* rk, int level, 261 const(char)* fac, const(char)* buf) 262 { 263 if (!rk) 264 { 265 rd_kafka_log_print(rk, level, fac, buf); 266 return; 267 } 268 269 void* opaque = rd_kafka_opaque(rk); 270 Handle handle = cast(Handle)(opaque); 271 272 if (!handle.event_cb_) 273 { 274 rd_kafka_log_print(rk, level, fac, buf); 275 return; 276 } 277 278 auto event = Event(Event.Type.log, ErrorCode.no_error, 279 cast(Event.Severity)(level), fac, buf); 280 281 handle.event_cb_(event); 282 } 283 284 nothrow @nogc private static void error_cb_trampoline(rd_kafka_t* rk, int err, 285 const(char)* reason, void* opaque) 286 { 287 Handle handle = cast(Handle)(opaque); 288 289 auto event = Event(Event.Type.error, cast(ErrorCode) err, 290 Event.Severity.error, null, reason); 291 292 handle.event_cb_(event); 293 } 294 295 nothrow @nogc private static void throttle_cb_trampoline(rd_kafka_t* rk, 296 const(char)* broker_name, int broker_id, int throttle_time_ms, void* opaque) 297 { 298 Handle handle = cast(Handle)(opaque); 299 300 auto event = Event(Event.Type.throttle); 301 event.str_ = broker_name.fromStringz; 302 event.id_ = broker_id; 303 event.throttle_time_ = throttle_time_ms; 304 305 handle.event_cb_(event); 306 } 307 308 nothrow @nogc private static int stats_cb_trampoline(rd_kafka_t* rk, char* json, 309 size_t json_len, void* opaque) 310 { 311 Handle handle = cast(Handle)(opaque); 312 auto event = Event(Event.Type.stats, ErrorCode.no_error, Event.Severity.info, 313 null, json); 314 315 handle.event_cb_(event); 316 317 return 0; 318 } 319 320 nothrow @nogc private static int 
socket_cb_trampoline(int domain, int type, int protocol, void* opaque) 321 { 322 Handle handle = cast(Handle)(opaque); 323 324 return handle.socket_cb_(domain, type, protocol); 325 } 326 327 nothrow @nogc private static int open_cb_trampoline(const(char)* pathname, int flags, 328 mode_t mode, void* opaque) 329 { 330 Handle handle = cast(Handle)(opaque); 331 332 return handle.open_cb_(pathname.fromStringz, flags, cast(int)(mode)); 333 } 334 335 nothrow @nogc private static void rebalance_cb_trampoline(rd_kafka_t* rk, 336 rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t* c_partitions, void* opaque) 337 { 338 auto handle = cast(KafkaConsumer)(opaque); 339 TopicPartition[] partitions = c_parts_to_partitions(c_partitions); 340 341 handle.rebalance_cb_(handle, cast(ErrorCode) err, partitions); 342 343 free_partition_vector(partitions); 344 } 345 346 nothrow @nogc private static void offset_commit_cb_trampoline(rd_kafka_t* rk, 347 rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t* c_offsets, void* opaque) 348 { 349 Handle handle = cast(Handle)(opaque); 350 TopicPartition[] offsets; 351 352 if (c_offsets) 353 offsets = c_parts_to_partitions(c_offsets); 354 355 handle.offset_commit_cb_(cast(ErrorCode) err, offsets); 356 357 free_partition_vector(offsets); 358 } 359 360 /** 361 * Pause producing or consumption for the provided list of partitions. 362 * 363 * Success or error is returned per-partition in the \p partitions list. 
364 * 365 * ErrorCode::no_error 366 * 367 * See_also: resume() 368 */ 369 ErrorCode pause(TopicPartition[] partitions) 370 { 371 rd_kafka_topic_partition_list_t* c_parts; 372 rd_kafka_resp_err_t err; 373 374 c_parts = partitions_to_c_parts(partitions); 375 376 err = rd_kafka_pause_partitions(rk_, c_parts); 377 378 if (!err) 379 update_partitions_from_c_parts(partitions, c_parts); 380 381 rd_kafka_topic_partition_list_destroy(c_parts); 382 383 return cast(ErrorCode) err; 384 385 } 386 387 /** 388 * Resume producing or consumption for the provided list of partitions. 389 * 390 * Success or error is returned per-partition in the \p partitions list. 391 * 392 * ErrorCode::no_error 393 * 394 * See_also: pause() 395 */ 396 ErrorCode resume(TopicPartition[] partitions) 397 { 398 rd_kafka_topic_partition_list_t* c_parts; 399 rd_kafka_resp_err_t err; 400 401 c_parts = partitions_to_c_parts(partitions); 402 403 err = rd_kafka_resume_partitions(rk_, c_parts); 404 405 if (!err) 406 update_partitions_from_c_parts(partitions, c_parts); 407 408 rd_kafka_topic_partition_list_destroy(c_parts); 409 410 return cast(ErrorCode) err; 411 } 412 413 /** 414 * Get last known low (oldest/beginning) 415 * and high (newest/end) offsets for partition. 416 * 417 * The low offset is updated periodically (if statistics.interval.ms is set) 418 * while the high offset is updated on each fetched message set from the 419 * broker. 420 * 421 * If there is no cached offset (either low or high, or both) then 422 * OFFSET_INVALID will be returned for the respective offset. 423 * 424 * Offsets are returned in \p *low and \p *high respectively. 425 * 426 * ErrorCode.no_error on success or an error code on failure. 427 * 428 * Note: Shall only be used with an active consumer instance. 
429 */ 430 ErrorCode getWatermarkOffsets(const(char)* topic, int partition, ref long low, 431 ref long high) 432 { 433 return cast(ErrorCode) rd_kafka_get_watermark_offsets(rk_, topic, 434 partition, &low, &high); 435 } 436 } 437 438 /** 439 * Queue interface 440 * 441 * Create a new message queue. Message queues allows the application 442 * to re-route consumed messages from multiple topic+partitions into 443 * one single queue point. This queue point, containing messages from 444 * a number of topic+partitions, may then be served by a single 445 * consume() method, rather than one per topic+partition combination. 446 * 447 * See the Consumer::start(), Consumer::consume(), and 448 * Consumer::consumeCallback() methods that take a queue as the first 449 * parameter for more information. 450 */ 451 class Queue 452 { 453 nothrow @nogc: 454 /** 455 * Create Queue object 456 */ 457 this(Handle handle) 458 { 459 queue_ = rd_kafka_queue_new(handle.rk_); 460 } 461 462 ~this() 463 { 464 rd_kafka_queue_destroy(queue_); 465 } 466 467 private: 468 rd_kafka_queue_t* queue_; 469 } 470 471 /** 472 * High-level KafkaConsumer (for brokers 0.9 and later) 473 * 474 * Note: Requires Apache Kafka >= 0.9.0 brokers 475 * 476 * Currently supports the \c range and \c roundrobin partition assignment 477 * strategies (see \c partition.assignment.strategy) 478 */ 479 class KafkaConsumer : Handle 480 { 481 /** 482 * Creates a KafkaConsumer. 483 * 484 * The \p conf object must have \c group.id set to the consumer group to join. 485 * 486 * Use close() to shut down the consumer. 487 * 488 * See_also: RebalanceCb 489 * See_also: CONFIGURATION.md for \c group.id, \c session.timeout.ms, 490 * \c partition.assignment.strategy, etc. 
491 */ 492 this(GlobalConf conf) 493 { 494 char[512] errbuf = void; 495 rd_kafka_conf_t* rk_conf = null; 496 size_t grlen; 497 498 if (!conf.rk_conf_) 499 { 500 throw new Exception("Requires Conf::CONF_GLOBAL object"); 501 } 502 503 if (rd_kafka_conf_get(conf.rk_conf_, "group.id", null, 504 &grlen) != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK || grlen <= 1 /* terminating null only */ ) 505 { 506 throw new Exception("\"group.id\" must be configured"); 507 } 508 509 this.setCommonConfig(conf); 510 511 rk_conf = rd_kafka_conf_dup(conf.rk_conf_); 512 513 rd_kafka_t* rk; 514 if (null is(rk = rd_kafka_new(rd_kafka_type_t.RD_KAFKA_CONSUMER, rk_conf, 515 errbuf.ptr, errbuf.sizeof))) 516 { 517 throw new Exception(errbuf.ptr.fromStringz.idup); 518 } 519 520 this.rk_ = rk; 521 522 /* Redirect handle queue to cgrp's queue to provide a single queue point */ 523 rd_kafka_poll_set_consumer(rk); 524 } 525 526 static if (!have_vibed) 527 mixin("nothrow:"); 528 529 /** Returns the current partition assignment as set by 530 * assign() */ 531 ErrorCode assignment(ref TopicPartition[] partitions) 532 { 533 rd_kafka_topic_partition_list_t* c_parts; 534 rd_kafka_resp_err_t err; 535 536 mixin(IO!q{ 537 err = rd_kafka_assignment(rk_, &c_parts); 538 }); 539 if (err) 540 return cast(ErrorCode) err; 541 542 partitions.length = c_parts.cnt; 543 544 foreach (i, ref p; partitions) 545 p = TopicPartition(&c_parts.elems[i]); 546 547 rd_kafka_topic_partition_list_destroy(c_parts); 548 549 return ErrorCode.no_error; 550 551 } 552 553 static if (!have_vibed) 554 mixin("@nogc:"); 555 556 /** 557 * Update the subscription set to \p topics. 558 * 559 * Any previous subscription will be unassigned and unsubscribed first. 
560 * 561 * The subscription set denotes the desired topics to consume and this 562 * set is provided to the partition assignor (one of the elected group 563 * members) for all clients which then uses the configured 564 * \c partition.assignment.strategy to assign the subscription sets's 565 * topics's partitions to the consumers, depending on their subscription. 566 * 567 * The result of such an assignment is a rebalancing which is either 568 * handled automatically in librdkafka or can be overriden by the application 569 * by providing a RebalanceCb. 570 * 571 * The rebalancing passes the assigned partition set to 572 * assign() to update what partitions are actually 573 * being fetched by the KafkaConsumer. 574 * 575 * Regex pattern matching automatically performed for topics prefixed 576 * with \c \"^\" (e.g. \c \"^myPfx[0-9]_.*\" 577 */ 578 ErrorCode subscribe(const(char)*[] topics...) 579 { 580 rd_kafka_topic_partition_list_t* c_topics; 581 rd_kafka_resp_err_t err; 582 583 c_topics = rd_kafka_topic_partition_list_new(cast(int) topics.length); 584 585 foreach (t; topics) 586 rd_kafka_topic_partition_list_add(c_topics, t, RD_KAFKA_PARTITION_UA); 587 588 mixin(IO!q{ 589 err = rd_kafka_subscribe(rk_, c_topics); 590 }); 591 592 rd_kafka_topic_partition_list_destroy(c_topics); 593 594 return cast(ErrorCode) err; 595 } 596 /** Unsubscribe from the current subscription set. */ 597 auto unsubscribe() 598 { 599 ErrorCode ret; 600 mixin(IO!q{ 601 ret = cast(ErrorCode)(rd_kafka_unsubscribe(this.rk_)); 602 }); 603 return ret; 604 } 605 606 /** 607 * Consume message or get error event, triggers callbacks. 608 * 609 * Will automatically call registered callbacks for any such queued events, 610 * including RebalanceCb, EventCb, OffsetCommitCb, 611 * etc. 612 * 613 * Note: An application should make sure to call consume() at regular 614 * intervals, even if no messages are expected, to serve any 615 * queued callbacks waiting to be called. 
This is especially 616 * important when a RebalanceCb has been registered as it needs 617 * to be called and handled properly to synchronize internal 618 * consumer state. 619 * 620 * Note: Application MUST NOT call \p poll() on KafkaConsumer objects. 621 * 622 * One of: 623 * - proper message (Message::err() is ErrorCode.no_error) 624 * - error event (Message::err() is != ErrorCode.no_error) 625 * - timeout due to no message or event in \p timeout_ms 626 * (Message::err() is timed_out) 627 */ 628 /++ 629 Params: 630 msg = message to fill. Use `msg.err` to check errors. 631 timeout_ms = time to to wait if no incomming msgs in queue. 632 +/ 633 auto consume(ref Message msg, int timeout_ms = 10) 634 { 635 rd_kafka_message_t* rkmessage; 636 637 mixin(IO!q{ 638 rkmessage = rd_kafka_consumer_poll(this.rk_, timeout_ms); 639 }); 640 641 if (!rkmessage) 642 { 643 msg = Message(null, ErrorCode.timed_out); 644 return; 645 } 646 647 msg = Message(rkmessage); 648 } 649 650 /** 651 * Update the assignment set to \p partitions. 652 * 653 * The assignment set is the set of partitions actually being consumed 654 * by the KafkaConsumer. 655 */ 656 ErrorCode assign(const TopicPartition[] partitions) 657 { 658 rd_kafka_topic_partition_list_t* c_parts; 659 rd_kafka_resp_err_t err; 660 661 c_parts = partitions_to_c_parts(partitions); 662 663 mixin(IO!q{ 664 err = rd_kafka_assign(rk_, c_parts); 665 }); 666 667 rd_kafka_topic_partition_list_destroy(c_parts); 668 return cast(ErrorCode) err; 669 } 670 671 /** 672 * Stop consumption and remove the current assignment. 673 */ 674 ErrorCode unassign() 675 { 676 typeof(return) ret; 677 mixin(IO!q{ 678 ret = cast(ErrorCode) rd_kafka_assign(rk_, null); 679 }); 680 return ret; 681 } 682 683 /** 684 * Commit offsets for the current assignment. 685 * 686 * Note: This is the synchronous variant that blocks until offsets 687 * are committed or the commit fails (see return value). 
688 * 689 * Note: If a OffsetCommitCb callback is registered it will 690 * be called with commit details on a future call to 691 * consume() 692 * 693 * ErrorCode.no_error or error code. 694 */ 695 ErrorCode commitSync() 696 { 697 typeof(return) ret; 698 mixin(IO!q{ 699 ret = cast(ErrorCode) rd_kafka_commit(rk_, null, 0 /*sync*/ ); 700 }); 701 return ret; 702 } 703 704 /** 705 * Asynchronous version of CommitSync() 706 * 707 * See_also: KafkaConsummer::commitSync() 708 */ 709 ErrorCode commitAsync() 710 { 711 typeof(return) ret; 712 mixin(IO!q{ 713 ret = cast(ErrorCode) rd_kafka_commit(rk_, null, 1 /*async*/ ); 714 }); 715 return ret; 716 } 717 718 /** 719 * Commit offset for a single topic+partition based on \p message 720 * 721 * Note: This is the synchronous variant. 722 * 723 * See_also: KafkaConsummer::commitSync() 724 */ 725 ErrorCode commitSync(ref Message message) 726 { 727 typeof(return) ret; 728 mixin(IO!q{ 729 ret = cast(ErrorCode) rd_kafka_commit_message(rk_, message.rkmessage_, 0 /*sync*/ ); 730 }); 731 return ret; 732 } 733 734 /** 735 * Commit offset for a single topic+partition based on \p message 736 * 737 * Note: This is the asynchronous variant. 738 * 739 * See_also: KafkaConsummer::commitSync() 740 */ 741 ErrorCode commitAsync(ref Message message) 742 { 743 typeof(return) ret; 744 mixin(IO!q{ 745 ret = cast(ErrorCode) rd_kafka_commit_message(rk_, message.rkmessage_, 1 /*async*/ ); 746 }); 747 return ret; 748 } 749 750 /** 751 * Commit offsets for the provided list of partitions. 752 * 753 * Note: This is the synchronous variant. 
754 */ 755 ErrorCode commitSync(TopicPartition[] offsets) 756 { 757 rd_kafka_topic_partition_list_t* c_parts = partitions_to_c_parts(offsets); 758 rd_kafka_resp_err_t err; 759 mixin(IO!q{ 760 err = rd_kafka_commit(rk_, c_parts, 0); 761 }); 762 if (!err) 763 update_partitions_from_c_parts(offsets, c_parts); 764 rd_kafka_topic_partition_list_destroy(c_parts); 765 return cast(ErrorCode) err; 766 } 767 768 /** 769 * Commit offset for the provided list of partitions. 770 * 771 * Note: This is the asynchronous variant. 772 */ 773 ErrorCode commitAsync(const TopicPartition[] offsets) 774 { 775 rd_kafka_topic_partition_list_t* c_parts = partitions_to_c_parts(offsets); 776 rd_kafka_resp_err_t err; 777 mixin(IO!q{ 778 err = rd_kafka_commit(rk_, c_parts, 1); 779 }); 780 rd_kafka_topic_partition_list_destroy(c_parts); 781 return cast(ErrorCode) err; 782 } 783 784 /** 785 * Retrieve committed offsets for topics+partitions. 786 * 787 * RD_KAFKA_RESP_ErrorCode.no_error on success in which case the 788 * \p offset or \p err field of each \p partitions' element is filled 789 * in with the stored offset, or a partition specific error. 790 * Else returns an error code. 791 */ 792 ErrorCode committed(TopicPartition[] partitions, int timeout_ms) 793 { 794 rd_kafka_topic_partition_list_t* c_parts; 795 rd_kafka_resp_err_t err; 796 797 c_parts = partitions_to_c_parts(partitions); 798 799 mixin(IO!q{ 800 err = rd_kafka_committed(rk_, c_parts, timeout_ms); 801 }); 802 803 if (!err) 804 { 805 update_partitions_from_c_parts(partitions, c_parts); 806 } 807 808 rd_kafka_topic_partition_list_destroy(c_parts); 809 810 return cast(ErrorCode) err; 811 } 812 813 /** 814 * Retrieve current positions (offsets) for topics+partitions. 815 * 816 * RD_KAFKA_RESP_ErrorCode.no_error on success in which case the 817 * \p offset or \p err field of each \p partitions' element is filled 818 * in with the stored offset, or a partition specific error. 819 * Else returns an error code. 
820 */ 821 ErrorCode position(TopicPartition[] partitions) 822 { 823 rd_kafka_topic_partition_list_t* c_parts; 824 rd_kafka_resp_err_t err; 825 826 c_parts = partitions_to_c_parts(partitions); 827 828 mixin(IO!q{ 829 err = rd_kafka_position(rk_, c_parts); 830 }); 831 832 if (!err) 833 { 834 update_partitions_from_c_parts(partitions, c_parts); 835 } 836 837 rd_kafka_topic_partition_list_destroy(c_parts); 838 839 return cast(ErrorCode) err; 840 841 } 842 843 /** 844 * Close and shut down the proper. 845 * 846 * This call will block until the following operations are finished: 847 * - Trigger a local rebalance to void the current assignment 848 * - Stop consumption for current assignment 849 * - Commit offsets 850 * - Leave group 851 * 852 * The maximum blocking time is roughly limited to session.timeout.ms. 853 * 854 * Note: Callbacks, such as RebalanceCb and 855 * OffsetCommitCb, etc, may be called. 856 * 857 * Note: The consumer object must later be freed with \c delete 858 */ 859 ErrorCode close() 860 { 861 rd_kafka_resp_err_t err; 862 mixin(IO!q{ 863 err = rd_kafka_consumer_close(rk_); 864 }); 865 if (err) 866 return cast(ErrorCode) err; 867 868 mixin(IO!q{ 869 while (rd_kafka_outq_len(rk_) > 0) 870 rd_kafka_poll(rk_, 10); 871 rd_kafka_destroy(rk_); 872 }); 873 874 return cast(ErrorCode) err; 875 } 876 } 877 878 /** 879 * Simple Consumer (legacy) 880 * 881 * A simple non-balanced, non-group-aware, consumer. 882 */ 883 class Consumer : Handle 884 { 885 /** 886 * Creates a new Kafka consumer handle. 887 * 888 * \p conf is an optional object that will be used instead of the default 889 * configuration. 890 * The \p conf object is reusable after this call. 891 * 892 * the new handle on success or null on error in which case 893 * \p errstr is set to a human readable error message. 
894 */ 895 this(GlobalConf conf) 896 { 897 char[512] errbuf = void; 898 rd_kafka_conf_t* rk_conf = null; 899 900 if (conf) 901 { 902 if (!conf.rk_conf_) 903 { 904 throw new Exception("Requires Conf::CONF_GLOBAL object"); 905 } 906 907 this.setCommonConfig(conf); 908 909 rk_conf = rd_kafka_conf_dup(conf.rk_conf_); 910 } 911 912 mixin(IO!q{ 913 rk_ = rd_kafka_new(rd_kafka_type_t.RD_KAFKA_CONSUMER, 914 rk_conf, errbuf.ptr, errbuf.sizeof); 915 }); 916 if (null is rk_) 917 { 918 throw new Exception(errbuf.ptr.fromStringz.idup); 919 } 920 } 921 922 static if (!have_vibed) 923 mixin("nothrow @nogc:"); 924 925 ~this() 926 { 927 mixin(IO!q{ 928 rd_kafka_destroy(rk_); 929 }); 930 } 931 932 static: 933 934 /** 935 * Start consuming messages for topic and \p partition 936 * at offset \p offset which may either be a proper offset (0..N) 937 * or one of the the special offsets: \p OFFSET_BEGINNING or \p OFFSET_END. 938 * 939 * rdkafka will attempt to keep \p queued.min.messages (config property) 940 * messages in the local queue by repeatedly fetching batches of messages 941 * from the broker until the threshold is reached. 942 * 943 * The application shall use one of the \p ...consume*() functions 944 * to consume messages from the local queue, each kafka message being 945 * represented as a `Message ` object. 946 * 947 * \p ...start() must not be called multiple times for the same 948 * topic and partition without stopping consumption first with 949 * \p ...stop(). 950 * 951 * an ErrorCode to indicate success or failure. 952 */ 953 ErrorCode start(Topic topic, int partition, long offset, Queue queue) 954 { 955 int err; 956 mixin(IO!q{ 957 err = rd_kafka_consume_start(topic.rkt_, partition, offset); 958 }); 959 if (err == -1) 960 return cast(ErrorCode)(rd_kafka_errno2err(errno)); 961 return ErrorCode.no_error; 962 } 963 964 /** 965 * Stop consuming messages for topic and \p partition, purging 966 * all messages currently in the local queue. 
967 * 968 * The application needs to be stop all consumers before destroying 969 * the Consumer handle. 970 * 971 * an ErrorCode to indicate success or failure. 972 */ 973 ErrorCode stop(Topic topic, int partition) 974 { 975 int err; 976 mixin(IO!q{ 977 err = rd_kafka_consume_stop(topic.rkt_, partition); 978 }); 979 if (err == -1) 980 return cast(ErrorCode)(rd_kafka_errno2err(errno)); 981 return ErrorCode.no_error; 982 } 983 984 /** 985 * Seek consumer for topic+partition to \p offset which is either an 986 * absolute or logical offset. 987 * 988 * If \p timeout_ms is not 0 the call will wait this long for the 989 * seek to be performed. If the timeout is reached the internal state 990 * will be unknown and this function returns `timed_out`. 991 * If \p timeout_ms is 0 it will initiate the seek but return 992 * immediately without any error reporting (e.g., async). 993 * 994 * This call triggers a fetch queue barrier flush. 995 * 996 * an ErrorCode to indicate success or failure. 997 */ 998 ErrorCode seek(Topic topic, int partition, long offset, int timeout_ms) 999 { 1000 int err; 1001 mixin(IO!q{ 1002 err = rd_kafka_seek(topic.rkt_, partition, offset, timeout_ms); 1003 }); 1004 if (err == -1) 1005 return cast(ErrorCode)(rd_kafka_errno2err(errno)); 1006 return ErrorCode.no_error; 1007 } 1008 1009 /** 1010 * Consume a single message from \p topic and \p partition. 1011 * 1012 * \p timeout_ms is maximum amount of time to wait for a message to be 1013 * received. 1014 * Consumer must have been previously started with \p ...start(). 1015 * 1016 * a Message object, the application needs to check if message 1017 * is an error or a proper message Message::err() and checking for 1018 * \p ErrorCode.no_error. 1019 * 1020 * The message object must be destroyed when the application is done with it. 1021 * 1022 * Errors (in Message::err()): 1023 * - timed_out - \p timeout_ms was reached with no new messages fetched. 
1024 * - PARTITION_EOF - End of partition reached, not an error. 1025 */ 1026 void consume(Topic topic, int partition, int timeout_ms, ref Message msg) 1027 { 1028 rd_kafka_message_t* rkmessage; 1029 1030 mixin(IO!q{ 1031 rkmessage = rd_kafka_consume(topic.rkt_, partition, timeout_ms); 1032 }); 1033 if (!rkmessage) 1034 msg = Message(topic, cast(ErrorCode) rd_kafka_errno2err(errno)); 1035 1036 msg = Message(topic, rkmessage); 1037 } 1038 1039 /** 1040 * Consume a single message from the specified queue. 1041 * 1042 * \p timeout_ms is maximum amount of time to wait for a message to be 1043 * received. 1044 * Consumer must have been previously started on the queue with 1045 * \p ...start(). 1046 * 1047 * a Message object, the application needs to check if message 1048 * is an error or a proper message \p Message.err() and checking for 1049 * \p ErrorCode.no_error. 1050 * 1051 * The message object must be destroyed when the application is done with it. 1052 * 1053 * Errors (in Message::err()): 1054 * - timed_out - \p timeout_ms was reached with no new messages fetched 1055 * 1056 * Note that Message.topic() may be nullptr after certain kinds of 1057 * errors, so applications should check that it isn't null before 1058 * dereferencing it. 1059 */ 1060 void consume(Queue queue, int timeout_ms, ref Message msg) 1061 { 1062 rd_kafka_message_t* rkmessage; 1063 mixin(IO!q{ 1064 rkmessage = rd_kafka_consume_queue(queue.queue_, timeout_ms); 1065 }); 1066 if (!rkmessage) 1067 msg = Message(null, cast(ErrorCode) rd_kafka_errno2err(errno)); 1068 /* 1069 * Recover our Topic from the topic conf's opaque field, which we 1070 * set in Topic::create() for just this kind of situation. 1071 */ 1072 void* opaque = rd_kafka_topic_opaque(rkmessage.rkt); 1073 Topic topic = cast(Topic)(opaque); 1074 1075 msg = Message(topic, rkmessage); 1076 } 1077 1078 /* Helper struct for `consume_callback'. 
1079 * Encapsulates the values we need in order to call `rd_kafka_consume_callback' 1080 * and keep track of the C++ callback function and `opaque' value. 1081 */ 1082 private static struct ConsumerCallback 1083 { 1084 /* This function is the one we give to `rd_kafka_consume_callback', with 1085 * the `opaque' pointer pointing to an instance of this struct, in which 1086 * we can find the C++ callback and `cb_data'. 1087 */ 1088 static nothrow @nogc void consume_cb_trampoline(rd_kafka_message_t* msg, void* opaque) 1089 { 1090 ConsumerCallback* instance = cast(ConsumerCallback*) opaque; 1091 Message message = Message(instance.topic, msg, false /*don't free*/ ); 1092 instance.cb_cls(message); 1093 } 1094 1095 Topic topic; 1096 ConsumeCb cb_cls; 1097 } 1098 1099 /** 1100 * Consumes messages from \p topic and \p partition, calling 1101 * the provided callback for each consumed messsage. 1102 * 1103 * \p consumeCallback() provides higher throughput performance 1104 * than \p consume(). 1105 * 1106 * \p timeout_ms is the maximum amount of time to wait for one or 1107 * more messages to arrive. 1108 * 1109 * The provided \p consume_cb instance has its \p consume_cb function 1110 * called for every message received. 1111 * 1112 * The \p opaque argument is passed to the \p consume_cb as \p opaque. 1113 * 1114 * the number of messages processed or -1 on error. 1115 * 1116 * See_also: Consumer::consume() 1117 */ 1118 int consumeCallback(Topic topic, int partition, int timeout_ms, ConsumeCb consume_cb) 1119 { 1120 auto context = ConsumerCallback(topic, consume_cb); 1121 int ret; 1122 mixin(IO!q{ 1123 ret = rd_kafka_consume_callback(topic.rkt_, partition, timeout_ms, 1124 &ConsumerCallback.consume_cb_trampoline, &context); 1125 }); 1126 return ret; 1127 } 1128 1129 /* Helper struct for `consume_callback' with a Queue. 1130 * Encapsulates the values we need in order to call `rd_kafka_consume_callback' 1131 * and keep track of the C++ callback function and `opaque' value. 
*/
    private static struct ConsumerQueueCallback
    {
        // User callback invoked for each message delivered by the queue.
        ConsumeCb cb_cls;

        /* Entry point handed to `rd_kafka_consume_callback_queue'. The
         * `opaque' pointer is the address of a ConsumerQueueCallback
         * instance, from which the user callback is read.
         */
        static nothrow @nogc void consume_cb_trampoline(rd_kafka_message_t* msg, void* opaque)
        {
            auto self = cast(ConsumerQueueCallback*) opaque;
            /*
             * Recover our Topic from the topic's opaque field, which is
             * expected to have been set when the Topic was created for just
             * this kind of situation.
             */
            auto owner = cast(Topic) rd_kafka_topic_opaque(msg.rkt);
            auto wrapped = Message(owner, msg, false /*don't free*/ );
            self.cb_cls(wrapped);
        }
    }

    /**
     * Consumes messages from \p queue, calling the provided callback for
     * each consumed message.
     *
     * \p timeout_ms is forwarded to `rd_kafka_consume_callback_queue'.
     *
     * Returns: the value returned by `rd_kafka_consume_callback_queue'.
     *
     * See_also: Consumer::consumeCallback()
     */
    int consumeCallback(Queue queue, int timeout_ms, ConsumeCb consume_cb)
    {
        // Lives on the stack for the duration of the call; its address is
        // passed through librdkafka as the opaque pointer of the trampoline.
        ConsumerQueueCallback state = ConsumerQueueCallback(consume_cb);
        int processed;
        mixin(IO!q{
            processed = rd_kafka_consume_callback_queue(queue.queue_, timeout_ms,
                &ConsumerQueueCallback.consume_cb_trampoline, &state);
        });
        return processed;
    }

    /**
     * Converts an offset into the logical offset from the tail of a topic.
     *
     * \p offset is the (positive) number of items from the end.
     *
     * Returns: the logical offset for message \p offset from the tail, this
     * value may be passed to Consumer::start, et.al.
     * Note: The returned logical offset is specific to librdkafka.
     */
    long offsetTail(long offset)
    {
        const long logical = RD_KAFKA_OFFSET_TAIL(offset);
        return logical;
    }
}

/**
 * Producer handle, built on top of Handle.
 */
class Producer : Handle
{
    // Configuration object given to the constructor (may be null); consulted
    // by newTopic() for the default topic configuration.
    package GlobalConf _conf;
    /**
     * Creates a new Kafka producer handle.
*
     * \p conf is an optional object that will be used instead of the default
     * configuration.
     * The \p conf object is reusable after this call.
     *
     * Throws: Exception if \p conf carries no underlying global configuration,
     * or if librdkafka fails to create the handle (the message is the error
     * string reported by `rd_kafka_new').
     */
    this(GlobalConf conf)
    {
        _conf = conf;
        char[512] errbuf = void;
        rd_kafka_conf_t* rk_conf = null;

        if (conf)
        {
            if (!conf.rk_conf_)
            {
                throw new Exception("Requires Conf::CONF_GLOBAL object");
            }

            this.setCommonConfig(conf);

            // Work on a duplicate so the caller's conf stays reusable.
            rk_conf = rd_kafka_conf_dup(conf.rk_conf_);

            if (conf.dr_cb_)
            {
                rd_kafka_conf_set_dr_msg_cb(rk_conf, &dr_msg_cb_trampoline);
                this.dr_cb_ = conf.dr_cb_;
            }
        }

        rk_ = rd_kafka_new(rd_kafka_type_t.RD_KAFKA_PRODUCER,
                rk_conf, errbuf.ptr, errbuf.sizeof);
        if (rk_ is null)
        {
            // rd_kafka_new() takes ownership of the conf object only on
            // success; on failure the duplicate is still ours and must be
            // released here to avoid leaking it.
            if (rk_conf)
                rd_kafka_conf_destroy(rk_conf);
            throw new Exception(errbuf.ptr.fromStringz.idup);
        }
    }

    /++
    Returns: new topic for this producer
    Params:
        topic = topic name
        topicConf = TopicConf; if null, the `defaultTopicConf` of the global
            configuration passed to the constructor is used, so it must have
            been set there.
    +/
    Topic newTopic(const(char)[] topic, TopicConf topicConf = null)
    {
        if(!topicConf)
            topicConf = _conf.defaultTopicConf;
        assert(topicConf);
        return new Topic(this, topic, topicConf);
    }

    static if (!have_vibed)
        mixin("nothrow @nogc:");

    /// Destroys the underlying librdkafka handle, if one was created.
    ~this()
    {
        if (rk_)
        {
            mixin(IO!q{
                rd_kafka_destroy(rk_);
            });
        }
    }

    /**
     * Producer::produce() \p msgflags
     *
     * These flags are optional and may be OR:ed together, except that
     * \c free and \c copy are mutually exclusive.
     */
    enum MsgOpt
    {
        free = 0x1, /**< rdkafka will free(3) \p payload
                                            * when it is done with it. */
        copy = 0x2, /**< the \p payload data will be copied
                                           * and the \p payload pointer will not
                                           * be used by rdkafka after the
                                           * call returns. */
        block = 0x4, /**< Block produce*() on message queue
                  *   full.
                  *   WARNING:
                  *   If a delivery report callback
                  *   is used the application MUST
                  *   call rd_kafka_poll() (or equiv.)
                  *   to make sure delivered messages
                  *   are drained from the internal
                  *   delivery report queue.
                  *   Failure to do so will result
                  *   in indefinitely blocking on
                  *   the produce() call when the
                  *   message queue is full.
                  */
    }

    /* Delivery report trampoline registered via `rd_kafka_conf_set_dr_msg_cb'.
     * \p opaque is cast back to the Handle, the raw message is wrapped in a
     * Message without a topic, and the handle's `dr_cb_' is invoked with it.
     */
    nothrow @nogc private static void dr_msg_cb_trampoline(rd_kafka_t* rk,
        const rd_kafka_message_t* rkmessage, void* opaque)
    {
        auto handle = cast(Handle) opaque;
        auto message = Message(null, rkmessage);
        handle.dr_cb_(message);
        message.destroy;
    }

    /**
     * Produce and send a single message to broker.
     *
     * This is an asynchronous non-blocking API.
     *
     * \p partition is the target partition, either:
     *   - Topic::PARTITION_UA (unassigned) for
     *     automatic partitioning using the topic's partitioner function, or
     *   - a fixed partition (0..N)
     *
     * \p msgflags is zero or more of the following flags OR:ed together:
     *    block - block \p produce*() call if
     *                   \p queue.buffering.max.messages or
     *                   \p queue.buffering.max.kbytes are exceeded.
     *                   Messages are considered in-queue from the point they
     *                   are accepted by produce() until their corresponding
     *                   delivery report callback/event returns.
     *                   It is thus a requirement to call
     *                   poll() (or equiv.) from a separate
     *                   thread when block is used.
     *                   See WARNING on \c block above.
     *    free - rdkafka will free(3) \p payload when it is done with it.
     *    copy - the \p payload data will be copied and the \p payload
     *               pointer will not be used by rdkafka after the
     *               call returns.
     *
     *  NOTE: free and copy are mutually exclusive.
     *
     *  If the function returns -1 and free was specified, then
     *  the memory associated with the payload is still the caller's
     *  responsibility.
*
     * \p payload is the message payload; its length is passed as the size.
     *
     * \p key is an optional message key, if non-null it
     * will be passed to the topic partitioner as well as be sent with the
     * message to the broker and passed on to the consumer.
     *
     * \p timestamp is accepted by this signature but is not forwarded by the
     * `rd_kafka_produce' call currently in use (only the disabled
     * `rd_kafka_producev' variant below could carry extra fields).
     *
     * \p msg_opaque is an optional application-provided per-message opaque
     * pointer that will be provided in the delivery report callback
     * (\p dr_cb) for referencing this message.
     *
     * Returns: an ErrorCode to indicate success or failure; on failure the
     * code is derived from errno, for example:
     *  - _QUEUE_FULL - maximum number of outstanding messages has been
     *                      reached: \c queue.buffering.max.message
     *
     *  - MSG_SIZE_TOO_LARGE - message is larger than configured max size:
     *                            \c messages.max.bytes
     *
     *  - _UNKNOWN_PARTITION - requested \p partition is unknown in the
     *                           Kafka cluster.
     *
     *  - _UNKNOWN_TOPIC     - topic is unknown in the Kafka cluster.
     */
    ErrorCode produce(Topic topic, int partition, void[] payload,
        const(void)[] key = null,
        long timestamp = Clock.currTime(UTC()).toUnixTime!long,
        int msgflags = MsgOpt.copy,
        void* msg_opaque = null)
    {
        int status;
        mixin(IO!q{
            // Disabled rd_kafka_producev() based variant, kept for reference:
            //with(rd_kafka_vtype_t)
            //status = rd_kafka_producev(
            //        rk_,
            //        RD_KAFKA_VTYPE_RKT,
            //        topic.rkt_,
            //        RD_KAFKA_VTYPE_PARTITION,
            //        partition,
            //        RD_KAFKA_VTYPE_MSGFLAGS,
            //        msgflags,
            //        RD_KAFKA_VTYPE_VALUE,
            //        payload.ptr,
            //        payload.length,
            //        RD_KAFKA_VTYPE_KEY,
            //        key.ptr,
            //        key.length,
            //        //RD_KAFKA_VTYPE_OPAQUE,
            //        //msg_opaque,
            //        RD_KAFKA_VTYPE_END,
            //        );
            status = rd_kafka_produce(topic.rkt_, partition, msgflags, payload.ptr,
                    payload.length, key.ptr, key.length, msg_opaque);
        });
        // -1 signals failure; the actual reason is carried by errno.
        return status == -1
            ? cast(ErrorCode) rd_kafka_errno2err(errno)
            : ErrorCode.no_error;
    }

    /**
     * Wait until all outstanding produce requests, et.al, are completed.
* This should typically be done prior to destroying a producer instance
     * to make sure all queued and in-flight produce requests are completed
     * before terminating.
     *
     * Note: This function will call poll() and thus trigger callbacks.
     *
     * Returns: the result of `rd_kafka_flush' as an ErrorCode: timed_out if
     * \p timeout_ms was reached before all outstanding requests were
     * completed, else ErrorCode.no_error
     */
    ErrorCode flush(int timeout_ms = 60_000)
    {
        ErrorCode outcome;
        mixin(IO!q{
            outcome = cast(ErrorCode) rd_kafka_flush(rk_, timeout_ms);
        });
        return outcome;
    }
}