1 /* 2 * librdkafka - Apache Kafka C/C++ library 3 * 4 * Copyright (c) 2014 Magnus Edenhill 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions are met: 9 * 10 * 1. Redistributions of source code must retain the above copyright notice, 11 * this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright notice, 13 * this list of conditions and the following disclaimer in the documentation 14 * and/or other materials provided with the distribution. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 26 * POSSIBILITY OF SUCH DAMAGE. 27 */ 28 29 /** 30 * Apache Kafka C/C++ consumer and producer client library. 31 * 32 * rdkafkacpp.h contains the public C++ API for librdkafka. 33 * The API is documented in this file as comments prefixing the class, 34 * function, type, enum, define, etc. 35 * For more information, see the C interface in rdkafka.h and read the 36 * manual in INTRODUCTION.md. 37 * The C++ interface is STD C++ '03 compliant and adheres to the 38 * Google C++ Style Guide. 39 40 * See_also: For the C interface see rdkafka.h 41 * 42 */ 43 module rdkafkad; 44 45 package import std.string : fromStringz, toStringz; 46 package import core.stdc.errno; 47 package import core.stdc.string; 48 package import core.stdc.stdlib; 49 package import core.stdc.ctype; 50 package import core.sys.posix.sys.types; 51 package import deimos.rdkafka; 52 53 public import rdkafkad.config; 54 public import rdkafkad.handlers; 55 public import rdkafkad.metadata; 56 public import rdkafkad.topic; 57 import rdkafkad.iodriver; 58 59 /** 60 * librdkafka version 61 * 62 * Interpreted as hex \c MM.mm.rr.xx: 63 * - MM = Major 64 * - mm = minor 65 * - rr = revision 66 * - xx = pre-release id (0xff is the final release) 67 * 68 * E.g.: \c 0x000801ff.8.1 69 * 70 * Note: This value should only be used during compile time, 71 * for runtime checks of version use version() 72 */ 73 enum RD_KAFKA_VERSION = 0x00090200; 74 75 /** 76 * Returns the librdkafka version as integer. 77 * 78 * See_also: See RD_KAFKA_VERSION for how to parse the integer format. 79 */ 80 int version_() nothrow @nogc 81 { 82 return rd_kafka_version(); 83 } 84 85 /** 86 * Returns the librdkafka version as string. 87 */ 88 const(char)[] versionStr() nothrow @nogc 89 { 90 return fromStringz(rd_kafka_version_str); 91 } 92 93 /** 94 * Returns a CSV list of the supported debug contexts 95 * for use with Conf::Set("debug", ..). 96 */ 97 const(char)[] getDebugContexts() nothrow @nogc 98 { 99 return rd_kafka_get_debug_contexts().fromStringz; 100 } 101 102 /** 103 * Wait for all rd_kafka_t objects to be destroyed. 104 * 105 * 0 if all kafka objects are now destroyed, or -1 if the 106 * timeout was reached. 107 * Since RdKafka handle deletion is an asynch operation the 108 * \p wait_destroyed() function can be used for applications where 109 * a clean shutdown is required. 110 */ 111 auto waitDestroyed(int timeout_ms) 112 { 113 int ret; 114 mixin(IO!q{ 115 ret = rd_kafka_wait_destroyed(timeout_ms); 116 }); 117 return ret; 118 } 119 120 /** 121 * Error codes. 122 * 123 * The negative error codes delimited by two underscores 124 * (\c _..) denotes errors internal to librdkafka and are 125 * displayed as \c \"Local: \<error string..\>\", while the error codes 126 * delimited by a single underscore (\c ERR_..) denote broker 127 * errors and are displayed as \c \"Broker: \<error string..\>\". 128 * 129 * See_also: Use err2str() to translate an error code a human readable string 130 */ 131 enum ErrorCode 132 { 133 /* Internal errors to rdkafka: */ 134 /** Begin internal error codes */ 135 begin = -200, 136 /** Received message is incorrect */ 137 bad_msg = -199, 138 /** Bad/unknown compression */ 139 bad_compression = -198, 140 /** Broker is going away */ 141 destroy = -197, 142 /** Generic failure */ 143 fail = -196, 144 /** Broker transport failure */ 145 transport = -195, 146 /** Critical system resource */ 147 crit_sys_resource = -194, 148 /** Failed to resolve broker */ 149 resolve = -193, 150 /** Produced message timed out*/ 151 msg_timed_out = -192, 152 /** Reached the end of the topic+partition queue on 153 * the broker. Not really an error. */ 154 partition_eof = -191, 155 /** Permanent: Partition does not exist in cluster. */ 156 unknown_partition = -190, 157 /** File or filesystem error */ 158 fs = -189, 159 /** Permanent: Topic does not exist in cluster. */ 160 unknown_topic = -188, 161 /** All broker connections are down. */ 162 all_brokers_down = -187, 163 /** Invalid argument, or invalid configuration */ 164 invalid_arg = -186, 165 /** Operation timed out */ 166 timed_out = -185, 167 /** Queue is full */ 168 queue_full = -184, 169 /** ISR count < required.acks */ 170 isr_insuff = -183, 171 /** Broker node update */ 172 node_update = -182, 173 /** SSL error */ 174 ssl = -181, 175 /** Waiting for coordinator to become available. */ 176 wait_coord = -180, 177 /** Unknown client group */ 178 unknown_group = -179, 179 /** Operation in progress */ 180 in_progress = -178, 181 /** Previous operation in progress, wait for it to finish. */ 182 prev_in_progress = -177, 183 /** This operation would interfere with an existing subscription */ 184 existing_subscription = -176, 185 /** Assigned partitions (rebalance_cb) */ 186 assign_partitions = -175, 187 /** Revoked partitions (rebalance_cb) */ 188 revoke_partitions = -174, 189 /** Conflicting use */ 190 conflict = -173, 191 /** Wrong state */ 192 state = -172, 193 /** Unknown protocol */ 194 unknown_protocol = -171, 195 /** Not implemented */ 196 not_implemented = -170, 197 /** Authentication failure*/ 198 authentication = -169, 199 /** No stored offset */ 200 no_offset = -168, 201 /** Outdated */ 202 outdated = -167, 203 /** Timed out in queue */ 204 timed_out_queue = -166, 205 206 /** End internal error codes */ 207 end = -100, 208 209 /* Kafka broker errors: */ 210 /** Unknown broker error */ 211 unknown = -1, 212 /** Success */ 213 no_error, 214 /** Offset out of range */ 215 offset_out_of_range = 1, 216 /** Invalid message */ 217 invalid_msg = 2, 218 /** Unknown topic or partition */ 219 unknown_topic_or_part = 3, 220 /** Invalid message size */ 221 invalid_msg_size = 4, 222 /** Leader not available */ 223 leader_not_available = 5, 224 /** Not leader for partition */ 225 not_leader_for_partition = 6, 226 /** Request timed out */ 227 request_timed_out = 7, 228 /** Broker not available */ 229 broker_not_available = 8, 230 /** Replica not available */ 231 replica_not_available = 9, 232 /** Message size too large */ 233 msg_size_too_large = 10, 234 /** StaleControllerEpochCode */ 235 stale_ctrl_epoch = 11, 236 /** Offset metadata string too large */ 237 offset_metadata_too_large = 12, 238 /** Broker disconnected before response received */ 239 network_exception = 13, 240 /** Group coordinator load in progress */ 241 group_load_in_progress = 14, 242 /** Group coordinator not available */ 243 group_coordinator_not_available = 15, 244 /** Not coordinator for group */ 245 not_coordinator_for_group = 16, 246 /** Invalid topic */ 247 topic_exception = 17, 248 /** Message batch larger than configured server segment size */ 249 record_list_too_large = 18, 250 /** Not enough in-sync replicas */ 251 not_enough_replicas = 19, 252 /** Message(s) written to insufficient number of in-sync replicas */ 253 not_enough_replicas_after_append = 20, 254 /** Invalid required acks value */ 255 invalid_required_acks = 21, 256 /** Specified group generation id is not valid */ 257 illegal_generation = 22, 258 /** Inconsistent group protocol */ 259 inconsistent_group_protocol = 23, 260 /** Invalid group.id */ 261 invalid_group_id = 24, 262 /** Unknown member */ 263 unknown_member_id = 25, 264 /** Invalid session timeout */ 265 invalid_session_timeout = 26, 266 /** Group rebalance in progress */ 267 rebalance_in_progress = 27, 268 /** Commit offset data size is not valid */ 269 invalid_commit_offset_size = 28, 270 /** Topic authorization failed */ 271 topic_authorization_failed = 29, 272 /** Group authorization failed */ 273 group_authorization_failed = 30, 274 /** Cluster authorization failed */ 275 cluster_authorization_failed = 31 276 } 277 278 /** 279 * Returns a human readable representation of a kafka error. 280 */ 281 282 string err2str(ErrorCode err) nothrow @nogc 283 { 284 return cast(string)rd_kafka_err2str(cast(rd_kafka_resp_err_t) err).fromStringz; 285 } 286 287 /** 288 * Delivery Report callback class 289 * 290 * The delivery report callback will be called once for each message 291 * accepted by Producer::produce() (et.al) with 292 * Message::err() set to indicate the result of the produce request. 293 * 294 * The callback is called when a message is succesfully produced or 295 * if librdkafka encountered a permanent failure, or the retry counter for 296 * temporary errors has been exhausted. 297 * 298 * An application must call poll() at regular intervals to 299 * serve queued delivery report callbacks. 300 301 */ 302 alias DeliveryReportCb = void delegate(ref Message message) nothrow @nogc; 303 304 /** 305 * Partitioner callback class 306 * 307 * Generic partitioner callback class for implementing custom partitioners. 308 * 309 * See_also: Conf::set() \c "partitioner_cb" 310 */ 311 /** 312 * Partitioner callback 313 * 314 * Return the partition to use for \p key in \p topic. 315 * 316 * The \p msg_opaque is the same \p msg_opaque provided in the 317 * Producer::produce() call. 318 * 319 * Note: \p key may be null or the empty. 320 * 321 * Must return a value between 0 and \p partition_cnt (non-inclusive). 322 * May return RD_KAFKA_PARTITION_UA (-1) if partitioning failed. 323 * 324 * See_also: The callback may use Topic::partition_available() to check 325 * if a partition has an active leader broker. 326 */ 327 alias PartitionerCb = int delegate(const Topic topic, const(void)[] key, 328 int partition_cnt, void* msg_opaque) nothrow @nogc; 329 330 /** 331 * Variant partitioner with key pointer 332 * 333 */ 334 alias PartitionerKeyPointerCb = int delegate(const Topic topic, const(void)* key, 335 size_t key_len, int partition_cnt, void* msg_opaque) nothrow @nogc; 336 337 /** 338 * Event callback class 339 * 340 * Events are a generic interface for propagating errors, statistics, logs, etc 341 * from librdkafka to the application. 342 * 343 * See_also: Event 344 */ 345 alias EventCb = void delegate(ref Event event) nothrow @nogc; 346 347 /** 348 * Event object class as passed to the EventCb callback. 349 */ 350 struct Event 351 { 352 nothrow @nogc: 353 /** Event type */ 354 enum Type 355 { 356 error, /**< Event is an error condition */ 357 stats, /**< Event is a statistics JSON document */ 358 log, /**< Event is a log message */ 359 throttle /**< Event is a throttle level signaling from the broker */ 360 } 361 362 /** LOG severities (conforms to syslog(3) severities) */ 363 enum Severity 364 { 365 emerg, 366 alert = 1, 367 critical = 2, 368 error = 3, 369 warning = 4, 370 notice = 5, 371 info = 6, 372 debug_ = 7 373 } 374 375 package this(Type type, ErrorCode err, Severity severity, const char* fac, const char* str) 376 { 377 type_ = type; 378 err_ = err; 379 severity_ = severity; 380 fac_ = fac.fromStringz; 381 str_ = str.fromStringz; 382 } 383 384 package this(Type type) 385 { 386 type_ = type; 387 err_ = ErrorCode.no_error; 388 severity_ = Severity.emerg; 389 fac_ = ""; 390 str_ = ""; 391 } 392 393 package Type type_; 394 package ErrorCode err_; 395 package Severity severity_; 396 package const(char)[] fac_; 397 package const(char)[] str_; /* reused for throttle broker_name */ 398 package int id_; 399 package int throttle_time_; 400 401 @property: 402 403 /* 404 * Event Accessor methods 405 */ 406 407 /** 408 * The event type 409 * Note: Applies to all event types 410 */ 411 Type type() const 412 { 413 return type_; 414 } 415 416 /** 417 * Event error, if any. 418 * Note: Applies to all event types except throttle 419 */ 420 ErrorCode err() const 421 { 422 return err_; 423 } 424 425 /** 426 * Log severity level. 427 * Note: Applies to LOG event type. 428 */ 429 Severity severity() const 430 { 431 return severity_; 432 } 433 434 /** 435 * Log facility string. 436 * Note: Applies to LOG event type. 437 */ 438 const(char)[] fac() const 439 { 440 return fac_; 441 } 442 443 /** 444 * Log message string. 445 * 446 * \c LOG: Log message string. 447 * \c STATS: JSON object (as string). 448 * 449 * Note: Applies to LOG event type. 450 */ 451 const(char)[] str() const 452 { 453 return str_; 454 } 455 456 /** 457 * throttle time in milliseconds. 458 * Note: Applies to throttle event type. 459 */ 460 int throttleTime() const 461 { 462 return throttle_time_; 463 } 464 465 /** 466 * Throttling broker's name. 467 * Note: Applies to throttle event type. 468 */ 469 const(char)[] brokerName() const 470 { 471 if (type_ == Type.throttle) 472 return str_; 473 else 474 return ""; 475 } 476 477 /** 478 * Throttling broker's id. 479 * Note: Applies to throttle event type. 480 */ 481 int brokerId() const 482 { 483 return id_; 484 } 485 } 486 487 /** 488 * Consume callback class 489 */ 490 /** 491 * The consume callback is used with 492 * Consumer::consumeCallback() 493 * methods and will be called for each consumed \p message. 494 * 495 * The callback interface is optional but provides increased performance. 496 */ 497 alias ConsumeCb = void delegate(ref Message message) nothrow @nogc; 498 499 /** 500 * \b KafkaConsunmer: Rebalance callback class 501 */ 502 /** 503 * Group rebalance callback for use with KafkaConsunmer 504 * 505 * Registering a \p rebalance_cb turns off librdkafka's automatic 506 * partition assignment/revocation and instead delegates that responsibility 507 * to the application's \p rebalance_cb. 508 * 509 * The rebalance callback is responsible for updating librdkafka's 510 * assignment set based on the two events: ASSIGN_PARTITIONS 511 * and REVOKE_PARTITIONS but should also be able to handle 512 * arbitrary rebalancing failures where \p err is neither of those. 513 * Note: In this latter case (arbitrary error), the application must 514 * call unassign() to synchronize state. 515 * 516 * Without a rebalance callback this is done automatically by librdkafka 517 * but registering a rebalance callback gives the application flexibility 518 * in performing other operations along with the assinging/revocation, 519 * such as fetching offsets from an alternate location (on assign) 520 * or manually committing offsets (on revoke). 521 */ 522 alias RebalanceCb = void delegate(KafkaConsumer consumer, ErrorCode err, 523 ref TopicPartition[] partitions) nothrow @nogc; 524 525 /** 526 * Offset Commit callback class 527 */ 528 /** 529 * Set offset commit callback for use with consumer groups 530 * 531 * The results of automatic or manual offset commits will be scheduled 532 * for this callback and is served by consume() 533 * or commitSync() 534 * 535 * If no partitions had valid offsets to commit this callback will be called 536 * with \p err == NO_OFFSET which is not to be considered an error. 537 * 538 * The \p offsets list contains per-partition information: 539 * - \c topic The topic committed 540 * - \c partition The partition committed 541 * - \c offset: Committed offset (attempted) 542 * - \c err: Commit error 543 */ 544 545 alias OffsetCommitCb = void delegate(ErrorCode err, ref TopicPartition[] offsets) nothrow @nogc; 546 547 /** 548 * \b Portability: SocketCb callback class 549 * 550 */ 551 /** 552 * Socket callback 553 * 554 * The socket callback is responsible for opening a socket 555 * according to the supplied \p domain, \p type and \p protocol. 556 * The socket shall be created with \c CLOEXEC set in a racefree fashion, if 557 * possible. 558 * 559 * It is typically not required to register an alternative socket 560 * implementation 561 * 562 * The socket file descriptor or -1 on error (\c errno must be set) 563 */ 564 alias SocketCb = int function(int domain, int type, int protocol) nothrow @nogc; 565 566 567 /** 568 * Message object 569 * 570 * This object represents either a single consumed or produced message, 571 * or an event (\p err() is set). 572 * 573 * An application must check Message::err() to see if the 574 * object is a proper message (error is ErrorCode.no_error) or a 575 * an error event. 576 * 577 */ 578 struct Message 579 { 580 /** 581 * Message timestamp object 582 * 583 * Represents the number of milliseconds since the epoch (UTC). 584 * 585 * The Type dictates the timestamp type or origin. 586 * 587 * Note: Requires Apache Kafka broker version >= 0.10.0 588 * 589 */ 590 static struct Timestamp 591 { 592 enum Type 593 { 594 not_available, /**< Timestamp not available */ 595 create_time, /**< Message creation time (source) */ 596 log_append_time /**< Message log append time (broker) */ 597 } 598 599 long timestamp; /**< Milliseconds since epoch (UTC). */ 600 Type type; /**< Timestamp type */ 601 } 602 603 604 @disable this(this); 605 nothrow @nogc: 606 ~this() 607 { 608 if (free_rkmessage_ && rkmessage_) 609 rd_kafka_message_destroy(cast(rd_kafka_message_t*) rkmessage_); 610 } 611 612 this(Topic topic, rd_kafka_message_t* rkmessage, bool dofree = true) 613 { 614 topic_ = topic; 615 rkmessage_ = rkmessage; 616 free_rkmessage_ = dofree; 617 } 618 619 this(Topic topic, const rd_kafka_message_t* rkmessage) 620 { 621 topic_ = topic; 622 rkmessage_ = rkmessage; 623 } 624 625 this(rd_kafka_message_t* rkmessage) 626 { 627 628 rkmessage_ = rkmessage; 629 free_rkmessage_ = true; 630 631 if (rkmessage.rkt) 632 { 633 /* Possibly null */ 634 topic_ = cast(Topic) rd_kafka_topic_opaque(rkmessage.rkt); 635 } 636 } 637 638 /* Create errored message */ 639 this(Topic topic, ErrorCode err) 640 { 641 topic_ = topic; 642 rkmessage_ = &rkmessage_err_; 643 memset(&rkmessage_err_, 0, rkmessage_err_.sizeof); 644 rkmessage_err_.err = cast(rd_kafka_resp_err_t)(err); 645 } 646 647 /** The error string if object represent an error event, 648 * else an empty string. */ 649 string errstr() const 650 { 651 /* FIXME: If there is an error string in payload (for consume_cb) 652 * it wont be shown since 'payload' is reused for errstr 653 * and we cant distinguish between consumer and producer. 654 * For the producer case the payload needs to be the original 655 * payload pointer. */ 656 return err2str(cast(ErrorCode)rkmessage_.err); 657 } 658 659 /** The error code if object represents an error event, else 0. */ 660 ErrorCode err() const 661 { 662 return cast(ErrorCode) rkmessage_.err; 663 } 664 665 /** the Topic object for a message (if applicable), 666 * or null if a corresponding Topic object has not been 667 * explicitly created with Topic::create(). 668 * In this case use topic_name() instead. */ 669 const(Topic) topic() const 670 { 671 return topic_; 672 } 673 /** Topic name (if applicable, else empty string) */ 674 const(char)[] topicName() const 675 { 676 if (rkmessage_.rkt) 677 return rd_kafka_topic_name(rkmessage_.rkt).fromStringz; 678 else 679 return ""; 680 } 681 /** Partition (if applicable) */ 682 int partition() const 683 { 684 return rkmessage_.partition; 685 } 686 /** Message payload (if applicable) */ 687 const(void)[] payload() const 688 { 689 return rkmessage_.payload[0 .. rkmessage_.len]; 690 } 691 692 /** Message key as string (if applicable) */ 693 const(void)[] key() const 694 { 695 return (cast(const(char)*) rkmessage_.key)[0 .. rkmessage_.key_len]; 696 } 697 698 /** Message or error offset (if applicable) */ 699 long offset() const 700 { 701 return rkmessage_.offset; 702 } 703 704 /** Message timestamp (if applicable) */ 705 Timestamp timestamp() const 706 { 707 Timestamp ts; 708 rd_kafka_timestamp_type_t tstype; 709 ts.timestamp = rd_kafka_message_timestamp(rkmessage_, &tstype); 710 ts.type = cast(Timestamp.Type) tstype; 711 return ts; 712 } 713 714 /** The \p msg_opaque as provided to Producer::produce() */ 715 const(void)* msgOpaque() const 716 { 717 return rkmessage_._private; 718 } 719 720 package: 721 Topic topic_; 722 const (rd_kafka_message_t)* rkmessage_; 723 bool free_rkmessage_; 724 /* For error signalling by the C++ layer the .._err_ message is 725 * used as a place holder and rkmessage_ is set to point to it. */ 726 rd_kafka_message_t rkmessage_err_; 727 }