1 /* 2 * librdkafka - Apache Kafka C/C++ library 3 * 4 * Copyright (c) 2014 Magnus Edenhill 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions are met: 9 * 10 * 1. Redistributions of source code must retain the above copyright notice, 11 * this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright notice, 13 * this list of conditions and the following disclaimer in the documentation 14 * and/or other materials provided with the distribution. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 26 * POSSIBILITY OF SUCH DAMAGE. 27 */ 28 29 /** 30 * Apache Kafka C/C++ consumer and producer client library. 31 * 32 * rdkafkacpp.h contains the public C++ API for librdkafka. 33 * The API is documented in this file as comments prefixing the class, 34 * function, type, enum, define, etc. 35 * For more information, see the C interface in rdkafka.h and read the 36 * manual in INTRODUCTION.md. 37 * The C++ interface is STD C++ '03 compliant and adheres to the 38 * Google C++ Style Guide. 39 40 * See_also: For the C interface see rdkafka.h 41 * 42 */ 43 module rdkafkad; 44 45 package import std.string : fromStringz, toStringz; 46 package import core.stdc.errno; 47 package import core.stdc.string; 48 package import core.stdc.stdlib; 49 package import core.stdc.ctype; 50 package import core.sys.posix.sys.types; 51 package import deimos.rdkafka; 52 53 public import rdkafkad.config; 54 public import rdkafkad.handlers; 55 public import rdkafkad.metadata; 56 public import rdkafkad.topic; 57 58 /** 59 * librdkafka version 60 * 61 * Interpreted as hex \c MM.mm.rr.xx: 62 * - MM = Major 63 * - mm = minor 64 * - rr = revision 65 * - xx = pre-release id (0xff is the final release) 66 * 67 * E.g.: \c 0x000801ff.8.1 68 * 69 * Note: This value should only be used during compile time, 70 * for runtime checks of version use version() 71 */ 72 enum RD_KAFKA_VERSION = 0x00090200; 73 74 /** 75 * Returns the librdkafka version as integer. 76 * 77 * See_also: See RD_KAFKA_VERSION for how to parse the integer format. 78 */ 79 int version_() nothrow @nogc 80 { 81 return rd_kafka_version(); 82 } 83 84 /** 85 * Returns the librdkafka version as string. 86 */ 87 const(char)[] versionStr() nothrow @nogc 88 { 89 return fromStringz(rd_kafka_version_str); 90 } 91 92 /** 93 * Returns a CSV list of the supported debug contexts 94 * for use with Conf::Set("debug", ..). 95 */ 96 const(char)[] getDebugContexts() nothrow @nogc 97 { 98 return rd_kafka_get_debug_contexts().fromStringz; 99 } 100 101 /** 102 * Wait for all rd_kafka_t objects to be destroyed. 103 * 104 * 0 if all kafka objects are now destroyed, or -1 if the 105 * timeout was reached. 106 * Since RdKafka handle deletion is an asynch operation the 107 * \p wait_destroyed() function can be used for applications where 108 * a clean shutdown is required. 109 */ 110 int waitDestroyed(int timeout_ms) nothrow @nogc 111 { 112 return rd_kafka_wait_destroyed(timeout_ms); 113 } 114 115 /** 116 * Error codes. 117 * 118 * The negative error codes delimited by two underscores 119 * (\c _..) denotes errors internal to librdkafka and are 120 * displayed as \c \"Local: \<error string..\>\", while the error codes 121 * delimited by a single underscore (\c ERR_..) denote broker 122 * errors and are displayed as \c \"Broker: \<error string..\>\". 123 * 124 * See_also: Use err2str() to translate an error code a human readable string 125 */ 126 enum ErrorCode 127 { 128 /* Internal errors to rdkafka: */ 129 /** Begin internal error codes */ 130 begin = -200, 131 /** Received message is incorrect */ 132 bad_msg = -199, 133 /** Bad/unknown compression */ 134 bad_compression = -198, 135 /** Broker is going away */ 136 destroy = -197, 137 /** Generic failure */ 138 fail = -196, 139 /** Broker transport failure */ 140 transport = -195, 141 /** Critical system resource */ 142 crit_sys_resource = -194, 143 /** Failed to resolve broker */ 144 resolve = -193, 145 /** Produced message timed out*/ 146 msg_timed_out = -192, 147 /** Reached the end of the topic+partition queue on 148 * the broker. Not really an error. */ 149 partition_eof = -191, 150 /** Permanent: Partition does not exist in cluster. */ 151 unknown_partition = -190, 152 /** File or filesystem error */ 153 fs = -189, 154 /** Permanent: Topic does not exist in cluster. */ 155 unknown_topic = -188, 156 /** All broker connections are down. */ 157 all_brokers_down = -187, 158 /** Invalid argument, or invalid configuration */ 159 invalid_arg = -186, 160 /** Operation timed out */ 161 timed_out = -185, 162 /** Queue is full */ 163 queue_full = -184, 164 /** ISR count < required.acks */ 165 isr_insuff = -183, 166 /** Broker node update */ 167 node_update = -182, 168 /** SSL error */ 169 ssl = -181, 170 /** Waiting for coordinator to become available. */ 171 wait_coord = -180, 172 /** Unknown client group */ 173 unknown_group = -179, 174 /** Operation in progress */ 175 in_progress = -178, 176 /** Previous operation in progress, wait for it to finish. */ 177 prev_in_progress = -177, 178 /** This operation would interfere with an existing subscription */ 179 existing_subscription = -176, 180 /** Assigned partitions (rebalance_cb) */ 181 assign_partitions = -175, 182 /** Revoked partitions (rebalance_cb) */ 183 revoke_partitions = -174, 184 /** Conflicting use */ 185 conflict = -173, 186 /** Wrong state */ 187 state = -172, 188 /** Unknown protocol */ 189 unknown_protocol = -171, 190 /** Not implemented */ 191 not_implemented = -170, 192 /** Authentication failure*/ 193 authentication = -169, 194 /** No stored offset */ 195 no_offset = -168, 196 /** Outdated */ 197 outdated = -167, 198 /** Timed out in queue */ 199 timed_out_queue = -166, 200 201 /** End internal error codes */ 202 end = -100, 203 204 /* Kafka broker errors: */ 205 /** Unknown broker error */ 206 unknown = -1, 207 /** Success */ 208 no_error, 209 /** Offset out of range */ 210 offset_out_of_range = 1, 211 /** Invalid message */ 212 invalid_msg = 2, 213 /** Unknown topic or partition */ 214 unknown_topic_or_part = 3, 215 /** Invalid message size */ 216 invalid_msg_size = 4, 217 /** Leader not available */ 218 leader_not_available = 5, 219 /** Not leader for partition */ 220 not_leader_for_partition = 6, 221 /** Request timed out */ 222 request_timed_out = 7, 223 /** Broker not available */ 224 broker_not_available = 8, 225 /** Replica not available */ 226 replica_not_available = 9, 227 /** Message size too large */ 228 msg_size_too_large = 10, 229 /** StaleControllerEpochCode */ 230 stale_ctrl_epoch = 11, 231 /** Offset metadata string too large */ 232 offset_metadata_too_large = 12, 233 /** Broker disconnected before response received */ 234 network_exception = 13, 235 /** Group coordinator load in progress */ 236 group_load_in_progress = 14, 237 /** Group coordinator not available */ 238 group_coordinator_not_available = 15, 239 /** Not coordinator for group */ 240 not_coordinator_for_group = 16, 241 /** Invalid topic */ 242 topic_exception = 17, 243 /** Message batch larger than configured server segment size */ 244 record_list_too_large = 18, 245 /** Not enough in-sync replicas */ 246 not_enough_replicas = 19, 247 /** Message(s) written to insufficient number of in-sync replicas */ 248 not_enough_replicas_after_append = 20, 249 /** Invalid required acks value */ 250 invalid_required_acks = 21, 251 /** Specified group generation id is not valid */ 252 illegal_generation = 22, 253 /** Inconsistent group protocol */ 254 inconsistent_group_protocol = 23, 255 /** Invalid group.id */ 256 invalid_group_id = 24, 257 /** Unknown member */ 258 unknown_member_id = 25, 259 /** Invalid session timeout */ 260 invalid_session_timeout = 26, 261 /** Group rebalance in progress */ 262 rebalance_in_progress = 27, 263 /** Commit offset data size is not valid */ 264 invalid_commit_offset_size = 28, 265 /** Topic authorization failed */ 266 topic_authorization_failed = 29, 267 /** Group authorization failed */ 268 group_authorization_failed = 30, 269 /** Cluster authorization failed */ 270 cluster_authorization_failed = 31 271 } 272 273 /** 274 * Returns a human readable representation of a kafka error. 275 */ 276 277 string err2str(ErrorCode err) nothrow @nogc 278 { 279 return cast(string)rd_kafka_err2str(cast(rd_kafka_resp_err_t) err).fromStringz; 280 } 281 282 /** 283 * Delivery Report callback class 284 * 285 * The delivery report callback will be called once for each message 286 * accepted by Producer::produce() (et.al) with 287 * Message::err() set to indicate the result of the produce request. 288 * 289 * The callback is called when a message is succesfully produced or 290 * if librdkafka encountered a permanent failure, or the retry counter for 291 * temporary errors has been exhausted. 292 * 293 * An application must call poll() at regular intervals to 294 * serve queued delivery report callbacks. 295 296 */ 297 alias DeliveryReportCb = void delegate(ref Message message) nothrow @nogc; 298 299 /** 300 * Partitioner callback class 301 * 302 * Generic partitioner callback class for implementing custom partitioners. 303 * 304 * See_also: Conf::set() \c "partitioner_cb" 305 */ 306 /** 307 * Partitioner callback 308 * 309 * Return the partition to use for \p key in \p topic. 310 * 311 * The \p msg_opaque is the same \p msg_opaque provided in the 312 * Producer::produce() call. 313 * 314 * Note: \p key may be null or the empty. 315 * 316 * Must return a value between 0 and \p partition_cnt (non-inclusive). 317 * May return RD_KAFKA_PARTITION_UA (-1) if partitioning failed. 318 * 319 * See_also: The callback may use Topic::partition_available() to check 320 * if a partition has an active leader broker. 321 */ 322 alias PartitionerCb = int delegate(const Topic topic, const(void)[] key, 323 int partition_cnt, void* msg_opaque) nothrow @nogc; 324 325 /** 326 * Variant partitioner with key pointer 327 * 328 */ 329 alias PartitionerKeyPointerCb = int delegate(const Topic topic, const(void)* key, 330 size_t key_len, int partition_cnt, void* msg_opaque) nothrow @nogc; 331 332 /** 333 * Event callback class 334 * 335 * Events are a generic interface for propagating errors, statistics, logs, etc 336 * from librdkafka to the application. 337 * 338 * See_also: Event 339 */ 340 alias EventCb = void delegate(ref Event event) nothrow @nogc; 341 342 /** 343 * Event object class as passed to the EventCb callback. 344 */ 345 struct Event 346 { 347 nothrow @nogc: 348 /** Event type */ 349 enum Type 350 { 351 error, /**< Event is an error condition */ 352 stats, /**< Event is a statistics JSON document */ 353 log, /**< Event is a log message */ 354 throttle /**< Event is a throttle level signaling from the broker */ 355 } 356 357 /** LOG severities (conforms to syslog(3) severities) */ 358 enum Severity 359 { 360 emerg, 361 alert = 1, 362 critical = 2, 363 error = 3, 364 warning = 4, 365 notice = 5, 366 info = 6, 367 debug_ = 7 368 } 369 370 package this(Type type, ErrorCode err, Severity severity, const char* fac, const char* str) 371 { 372 type_ = type; 373 err_ = err; 374 severity_ = severity; 375 fac_ = fac.fromStringz; 376 str_ = str.fromStringz; 377 } 378 379 package this(Type type) 380 { 381 type_ = type; 382 err_ = ErrorCode.no_error; 383 severity_ = Severity.emerg; 384 fac_ = ""; 385 str_ = ""; 386 } 387 388 package Type type_; 389 package ErrorCode err_; 390 package Severity severity_; 391 package const(char)[] fac_; 392 package const(char)[] str_; /* reused for throttle broker_name */ 393 package int id_; 394 package int throttle_time_; 395 396 @property: 397 398 /* 399 * Event Accessor methods 400 */ 401 402 /** 403 * The event type 404 * Note: Applies to all event types 405 */ 406 Type type() const 407 { 408 return type_; 409 } 410 411 /** 412 * Event error, if any. 413 * Note: Applies to all event types except throttle 414 */ 415 ErrorCode err() const 416 { 417 return err_; 418 } 419 420 /** 421 * Log severity level. 422 * Note: Applies to LOG event type. 423 */ 424 Severity severity() const 425 { 426 return severity_; 427 } 428 429 /** 430 * Log facility string. 431 * Note: Applies to LOG event type. 432 */ 433 const(char)[] fac() const 434 { 435 return fac_; 436 } 437 438 /** 439 * Log message string. 440 * 441 * \c LOG: Log message string. 442 * \c STATS: JSON object (as string). 443 * 444 * Note: Applies to LOG event type. 445 */ 446 const(char)[] str() const 447 { 448 return str_; 449 } 450 451 /** 452 * throttle time in milliseconds. 453 * Note: Applies to throttle event type. 454 */ 455 int throttleTime() const 456 { 457 return throttle_time_; 458 } 459 460 /** 461 * Throttling broker's name. 462 * Note: Applies to throttle event type. 463 */ 464 const(char)[] brokerName() const 465 { 466 if (type_ == Type.throttle) 467 return str_; 468 else 469 return ""; 470 } 471 472 /** 473 * Throttling broker's id. 474 * Note: Applies to throttle event type. 475 */ 476 int brokerId() const 477 { 478 return id_; 479 } 480 } 481 482 /** 483 * Consume callback class 484 */ 485 /** 486 * The consume callback is used with 487 * Consumer::consumeCallback() 488 * methods and will be called for each consumed \p message. 489 * 490 * The callback interface is optional but provides increased performance. 491 */ 492 alias ConsumeCb = void delegate(ref Message message) nothrow @nogc; 493 494 /** 495 * \b KafkaConsunmer: Rebalance callback class 496 */ 497 /** 498 * Group rebalance callback for use with KafkaConsunmer 499 * 500 * Registering a \p rebalance_cb turns off librdkafka's automatic 501 * partition assignment/revocation and instead delegates that responsibility 502 * to the application's \p rebalance_cb. 503 * 504 * The rebalance callback is responsible for updating librdkafka's 505 * assignment set based on the two events: ASSIGN_PARTITIONS 506 * and REVOKE_PARTITIONS but should also be able to handle 507 * arbitrary rebalancing failures where \p err is neither of those. 508 * Note: In this latter case (arbitrary error), the application must 509 * call unassign() to synchronize state. 510 * 511 * Without a rebalance callback this is done automatically by librdkafka 512 * but registering a rebalance callback gives the application flexibility 513 * in performing other operations along with the assinging/revocation, 514 * such as fetching offsets from an alternate location (on assign) 515 * or manually committing offsets (on revoke). 516 */ 517 alias RebalanceCb = void delegate(KafkaConsumer consumer, ErrorCode err, 518 ref TopicPartition[] partitions) nothrow @nogc; 519 520 /** 521 * Offset Commit callback class 522 */ 523 /** 524 * Set offset commit callback for use with consumer groups 525 * 526 * The results of automatic or manual offset commits will be scheduled 527 * for this callback and is served by consume() 528 * or commitSync() 529 * 530 * If no partitions had valid offsets to commit this callback will be called 531 * with \p err == NO_OFFSET which is not to be considered an error. 532 * 533 * The \p offsets list contains per-partition information: 534 * - \c topic The topic committed 535 * - \c partition The partition committed 536 * - \c offset: Committed offset (attempted) 537 * - \c err: Commit error 538 */ 539 540 alias OffsetCommitCb = void delegate(ErrorCode err, ref TopicPartition[] offsets) nothrow @nogc; 541 542 /** 543 * \b Portability: SocketCb callback class 544 * 545 */ 546 /** 547 * Socket callback 548 * 549 * The socket callback is responsible for opening a socket 550 * according to the supplied \p domain, \p type and \p protocol. 551 * The socket shall be created with \c CLOEXEC set in a racefree fashion, if 552 * possible. 553 * 554 * It is typically not required to register an alternative socket 555 * implementation 556 * 557 * The socket file descriptor or -1 on error (\c errno must be set) 558 */ 559 alias SocketCb = int function(int domain, int type, int protocol) nothrow @nogc; 560 561 562 /** 563 * Message object 564 * 565 * This object represents either a single consumed or produced message, 566 * or an event (\p err() is set). 567 * 568 * An application must check Message::err() to see if the 569 * object is a proper message (error is ErrorCode.no_error) or a 570 * an error event. 571 * 572 */ 573 struct Message 574 { 575 /** 576 * Message timestamp object 577 * 578 * Represents the number of milliseconds since the epoch (UTC). 579 * 580 * The Type dictates the timestamp type or origin. 581 * 582 * Note: Requires Apache Kafka broker version >= 0.10.0 583 * 584 */ 585 struct Timestamp 586 { 587 enum Type 588 { 589 not_available, /**< Timestamp not available */ 590 create_time, /**< Message creation time (source) */ 591 log_append_time /**< Message log append time (broker) */ 592 } 593 594 long timestamp; /**< Milliseconds since epoch (UTC). */ 595 Type type; /**< Timestamp type */ 596 } 597 598 599 @disable this(this); 600 nothrow @nogc: 601 ~this() 602 { 603 if (free_rkmessage_ && rkmessage_) 604 rd_kafka_message_destroy(cast(rd_kafka_message_t*) rkmessage_); 605 } 606 607 this(Topic topic, rd_kafka_message_t* rkmessage, bool dofree = true) 608 { 609 topic_ = topic; 610 rkmessage_ = rkmessage; 611 free_rkmessage_ = dofree; 612 } 613 614 this(Topic topic, const rd_kafka_message_t* rkmessage) 615 { 616 topic_ = topic; 617 rkmessage_ = rkmessage; 618 } 619 620 this(rd_kafka_message_t* rkmessage) 621 { 622 623 rkmessage_ = rkmessage; 624 free_rkmessage_ = true; 625 626 if (rkmessage.rkt) 627 { 628 /* Possibly null */ 629 topic_ = cast(Topic) rd_kafka_topic_opaque(rkmessage.rkt); 630 } 631 } 632 633 /* Create errored message */ 634 this(Topic topic, ErrorCode err) 635 { 636 topic_ = topic; 637 rkmessage_ = &rkmessage_err_; 638 memset(&rkmessage_err_, 0, rkmessage_err_.sizeof); 639 rkmessage_err_.err = cast(rd_kafka_resp_err_t)(err); 640 } 641 642 /** The error string if object represent an error event, 643 * else an empty string. */ 644 string errstr() const 645 { 646 /* FIXME: If there is an error string in payload (for consume_cb) 647 * it wont be shown since 'payload' is reused for errstr 648 * and we cant distinguish between consumer and producer. 649 * For the producer case the payload needs to be the original 650 * payload pointer. */ 651 return err2str(cast(ErrorCode)rkmessage_.err); 652 } 653 654 /** The error code if object represents an error event, else 0. */ 655 ErrorCode err() const 656 { 657 return cast(ErrorCode) rkmessage_.err; 658 } 659 660 /** the Topic object for a message (if applicable), 661 * or null if a corresponding Topic object has not been 662 * explicitly created with Topic::create(). 663 * In this case use topic_name() instead. */ 664 const(Topic) topic() const 665 { 666 return topic_; 667 } 668 /** Topic name (if applicable, else empty string) */ 669 const(char)[] topicName() const 670 { 671 if (rkmessage_.rkt) 672 return rd_kafka_topic_name(rkmessage_.rkt).fromStringz; 673 else 674 return ""; 675 } 676 /** Partition (if applicable) */ 677 int partition() const 678 { 679 return rkmessage_.partition; 680 } 681 /** Message payload (if applicable) */ 682 const(void)[] payload() const 683 { 684 return rkmessage_.payload[0 .. rkmessage_.len]; 685 } 686 687 /** Message key as string (if applicable) */ 688 const(void)[] key() const 689 { 690 return (cast(const(char)*) rkmessage_.key)[0 .. rkmessage_.key_len]; 691 } 692 693 /** Message or error offset (if applicable) */ 694 long offset() const 695 { 696 return rkmessage_.offset; 697 } 698 699 /** Message timestamp (if applicable) */ 700 Timestamp timestamp() const 701 { 702 Timestamp ts; 703 rd_kafka_timestamp_type_t tstype; 704 ts.timestamp = rd_kafka_message_timestamp(rkmessage_, &tstype); 705 ts.type = cast(Timestamp.Type) tstype; 706 return ts; 707 } 708 709 /** The \p msg_opaque as provided to Producer::produce() */ 710 const(void)* msgOpaque() const 711 { 712 return rkmessage_._private; 713 } 714 715 package: 716 Topic topic_; 717 const (rd_kafka_message_t)* rkmessage_; 718 bool free_rkmessage_; 719 /* For error signalling by the C++ layer the .._err_ message is 720 * used as a place holder and rkmessage_ is set to point to it. */ 721 rd_kafka_message_t rkmessage_err_; 722 }