1 /*
2  * librdkafka - Apache Kafka C/C++ library
3  *
4  * Copyright (c) 2014 Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 /**
30  * Apache Kafka C/C++ consumer and producer client library.
31  *
32  * rdkafkacpp.h contains the public C++ API for librdkafka.
33  * The API is documented in this file as comments prefixing the class,
34  * function, type, enum, define, etc.
35  * For more information, see the C interface in rdkafka.h and read the
36  * manual in INTRODUCTION.md.
37  * The C++ interface is STD C++ '03 compliant and adheres to the
38  * Google C++ Style Guide.
39 
40  * See_also: For the C interface see rdkafka.h
41  *
42  */
43 module rdkafkad;
44 
45 package import std.string : fromStringz, toStringz;
46 package import core.stdc.errno;
47 package import core.stdc.string;
48 package import core.stdc.stdlib;
49 package import core.stdc.ctype;
50 package import core.sys.posix.sys.types;
51 package import deimos.rdkafka;
52 
53 public import rdkafkad.config;
54 public import rdkafkad.handlers;
55 public import rdkafkad.metadata;
56 public import rdkafkad.topic;
57 import rdkafkad.iodriver;
58 
59 /**
60  * librdkafka version
61  *
62  * Interpreted as hex \c MM.mm.rr.xx:
63  *  - MM = Major
64  *  - mm = minor
65  *  - rr = revision
66  *  - xx = pre-release id (0xff is the final release)
67  *
68  * E.g.: \c 0x000801ff.8.1
69  *
70  * Note: This value should only be used during compile time,
71  *         for runtime checks of version use version()
72  */
73 enum RD_KAFKA_VERSION = 0x00090200;
74 
75 /**
76  * Returns the librdkafka version as integer.
77  *
78  * See_also: See RD_KAFKA_VERSION for how to parse the integer format.
79  */
80 int version_() nothrow @nogc
81 {
82     return rd_kafka_version();
83 }
84 
85 /**
86  * Returns the librdkafka version as string.
87  */
88 const(char)[] versionStr() nothrow @nogc
89 {
90     return fromStringz(rd_kafka_version_str);
91 }
92 
93 /**
94  * Returns a CSV list of the supported debug contexts
95  *        for use with Conf::Set("debug", ..).
96  */
97 const(char)[] getDebugContexts() nothrow @nogc
98 {
99     return rd_kafka_get_debug_contexts().fromStringz;
100 }
101 
102 /**
103  * Wait for all rd_kafka_t objects to be destroyed.
104  *
105  * 0 if all kafka objects are now destroyed, or -1 if the
106  * timeout was reached.
107  * Since RdKafka handle deletion is an asynch operation the
108  * \p wait_destroyed() function can be used for applications where
109  * a clean shutdown is required.
110  */
111 auto waitDestroyed(int timeout_ms)
112 {
113     int ret;
114     mixin(IO!q{
115     ret = rd_kafka_wait_destroyed(timeout_ms);
116     });
117     return ret;
118 }
119 
120 /**
121  * Error codes.
122  *
123  * The negative error codes delimited by two underscores
124  * (\c _..) denotes errors internal to librdkafka and are
125  * displayed as \c \"Local: \<error string..\>\", while the error codes
126  * delimited by a single underscore (\c ERR_..) denote broker
127  * errors and are displayed as \c \"Broker: \<error string..\>\".
128  *
129  * See_also: Use err2str() to translate an error code a human readable string
130  */
131 enum ErrorCode
132 {
133     /* Internal errors to rdkafka: */
134     /** Begin internal error codes */
135     begin = -200,
136     /** Received message is incorrect */
137     bad_msg = -199,
138     /** Bad/unknown compression */
139     bad_compression = -198,
140     /** Broker is going away */
141     destroy = -197,
142     /** Generic failure */
143     fail = -196,
144     /** Broker transport failure */
145     transport = -195,
146     /** Critical system resource */
147     crit_sys_resource = -194,
148     /** Failed to resolve broker */
149     resolve = -193,
    /** Produced message timed out */
    msg_timed_out = -192,
    /** Reached the end of the topic+partition queue on
     * the broker. Not really an error. */
154     partition_eof = -191,
155     /** Permanent: Partition does not exist in cluster. */
156     unknown_partition = -190,
157     /** File or filesystem error */
158     fs = -189,
159     /** Permanent: Topic does not exist in cluster. */
160     unknown_topic = -188,
161     /** All broker connections are down. */
162     all_brokers_down = -187,
163     /** Invalid argument, or invalid configuration */
164     invalid_arg = -186,
165     /** Operation timed out */
166     timed_out = -185,
167     /** Queue is full */
168     queue_full = -184,
169     /** ISR count < required.acks */
170     isr_insuff = -183,
171     /** Broker node update */
172     node_update = -182,
173     /** SSL error */
174     ssl = -181,
175     /** Waiting for coordinator to become available. */
176     wait_coord = -180,
177     /** Unknown client group */
178     unknown_group = -179,
179     /** Operation in progress */
180     in_progress = -178,
181     /** Previous operation in progress, wait for it to finish. */
182     prev_in_progress = -177,
183     /** This operation would interfere with an existing subscription */
184     existing_subscription = -176,
185     /** Assigned partitions (rebalance_cb) */
186     assign_partitions = -175,
187     /** Revoked partitions (rebalance_cb) */
188     revoke_partitions = -174,
189     /** Conflicting use */
190     conflict = -173,
191     /** Wrong state */
192     state = -172,
193     /** Unknown protocol */
194     unknown_protocol = -171,
195     /** Not implemented */
196     not_implemented = -170,
    /** Authentication failure */
    authentication = -169,
199     /** No stored offset */
200     no_offset = -168,
201     /** Outdated */
202     outdated = -167,
203     /** Timed out in queue */
204     timed_out_queue = -166,
205 
206     /** End internal error codes */
207     end = -100,
208 
209     /* Kafka broker errors: */
210     /** Unknown broker error */
211     unknown = -1,
212     /** Success */
213     no_error,
214     /** Offset out of range */
215     offset_out_of_range = 1,
216     /** Invalid message */
217     invalid_msg = 2,
218     /** Unknown topic or partition */
219     unknown_topic_or_part = 3,
220     /** Invalid message size */
221     invalid_msg_size = 4,
222     /** Leader not available */
223     leader_not_available = 5,
224     /** Not leader for partition */
225     not_leader_for_partition = 6,
226     /** Request timed out */
227     request_timed_out = 7,
228     /** Broker not available */
229     broker_not_available = 8,
230     /** Replica not available */
231     replica_not_available = 9,
232     /** Message size too large */
233     msg_size_too_large = 10,
234     /** StaleControllerEpochCode */
235     stale_ctrl_epoch = 11,
236     /** Offset metadata string too large */
237     offset_metadata_too_large = 12,
238     /** Broker disconnected before response received */
239     network_exception = 13,
240     /** Group coordinator load in progress */
241     group_load_in_progress = 14,
242     /** Group coordinator not available */
243     group_coordinator_not_available = 15,
244     /** Not coordinator for group */
245     not_coordinator_for_group = 16,
246     /** Invalid topic */
247     topic_exception = 17,
248     /** Message batch larger than configured server segment size */
249     record_list_too_large = 18,
250     /** Not enough in-sync replicas */
251     not_enough_replicas = 19,
252     /** Message(s) written to insufficient number of in-sync replicas */
253     not_enough_replicas_after_append = 20,
254     /** Invalid required acks value */
255     invalid_required_acks = 21,
256     /** Specified group generation id is not valid */
257     illegal_generation = 22,
258     /** Inconsistent group protocol */
259     inconsistent_group_protocol = 23,
260     /** Invalid group.id */
261     invalid_group_id = 24,
262     /** Unknown member */
263     unknown_member_id = 25,
264     /** Invalid session timeout */
265     invalid_session_timeout = 26,
266     /** Group rebalance in progress */
267     rebalance_in_progress = 27,
268     /** Commit offset data size is not valid */
269     invalid_commit_offset_size = 28,
270     /** Topic authorization failed */
271     topic_authorization_failed = 29,
272     /** Group authorization failed */
273     group_authorization_failed = 30,
274     /** Cluster authorization failed */
275     cluster_authorization_failed = 31
276 }
277 
278 /**
279  * Returns a human readable representation of a kafka error.
280  */
281 
282 string err2str(ErrorCode err) nothrow @nogc
283 {
284     return cast(string)rd_kafka_err2str(cast(rd_kafka_resp_err_t) err).fromStringz;
285 }
286 
287 /**
288  * Delivery Report callback class
289  *
290  * The delivery report callback will be called once for each message
291  * accepted by Producer::produce() (et.al) with
292  * Message::err() set to indicate the result of the produce request.
293  *
294  * The callback is called when a message is succesfully produced or
295  * if librdkafka encountered a permanent failure, or the retry counter for
296  * temporary errors has been exhausted.
297  *
298  * An application must call poll() at regular intervals to
299  * serve queued delivery report callbacks.
300 
301  */
302 alias DeliveryReportCb = void delegate(ref Message message) nothrow @nogc;
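
/* A minimal sketch of a delivery report handler; printf-based logging keeps
 * the delegate nothrow @nogc as the alias requires. The name `dr` is
 * illustrative; registration goes through the producer's configuration
 * (see rdkafkad.config). */
unittest
{
    DeliveryReportCb dr = (ref Message message) nothrow @nogc
    {
        import core.stdc.stdio : printf;
        if (message.err != ErrorCode.no_error)
            printf("delivery failed: %.*s\n",
                cast(int) message.errstr.length, message.errstr.ptr);
    };
}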
303 
304 /**
305  * Partitioner callback class
306  *
307  * Generic partitioner callback class for implementing custom partitioners.
308  *
309  * See_also: Conf::set() \c "partitioner_cb"
310  */
311 /**
312    * Partitioner callback
313    *
314    * Return the partition to use for \p key in \p topic.
315    *
316    * The \p msg_opaque is the same \p msg_opaque provided in the
317    * Producer::produce() call.
318    *
319    * Note: \p key may be null or the empty.
320    *
321    * Must return a value between 0 and \p partition_cnt (non-inclusive).
322    *          May return RD_KAFKA_PARTITION_UA (-1) if partitioning failed.
323    *
324    * See_also: The callback may use Topic::partition_available() to check
325    *     if a partition has an active leader broker.
326    */
327 alias PartitionerCb = int delegate(const Topic topic, const(void)[] key,
328     int partition_cnt, void* msg_opaque) nothrow @nogc;
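
/* A minimal sketch of a custom partitioner: a deterministic byte-hash modulo
 * partition_cnt, falling back to -1 (RD_KAFKA_PARTITION_UA) when no partition
 * is available. */
unittest
{
    PartitionerCb part = (const Topic topic, const(void)[] key,
        int partition_cnt, void* msg_opaque) nothrow @nogc
    {
        if (partition_cnt <= 0)
            return -1; // RD_KAFKA_PARTITION_UA: partitioning failed
        uint hash;
        foreach (b; cast(const(ubyte)[]) key)
            hash = hash * 31 + b;
        return cast(int)(hash % partition_cnt);
    };
}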
329 
330 /**
331  *  Variant partitioner with key pointer
332  *
333  */
334 alias PartitionerKeyPointerCb = int delegate(const Topic topic, const(void)* key,
335     size_t key_len, int partition_cnt, void* msg_opaque) nothrow @nogc;
336 
337 /**
338  * Event callback class
339  *
340  * Events are a generic interface for propagating errors, statistics, logs, etc
341  * from librdkafka to the application.
342  *
343  * See_also: Event
344  */
345 alias EventCb = void delegate(ref Event event) nothrow @nogc;
346 
347 /**
348  * Event object class as passed to the EventCb callback.
349  */
350 struct Event
351 {
352 nothrow @nogc:
353     /** Event type */
354     enum Type
355     {
356         error, /**< Event is an error condition */
357         stats, /**< Event is a statistics JSON document */
358         log, /**< Event is a log message */
359         throttle /**< Event is a throttle level signaling from the broker */
360     }
361 
362     /** LOG severities (conforms to syslog(3) severities) */
363     enum Severity
364     {
365         emerg,
366         alert = 1,
367         critical = 2,
368         error = 3,
369         warning = 4,
370         notice = 5,
371         info = 6,
372         debug_ = 7
373     }
374 
375     package this(Type type, ErrorCode err, Severity severity, const char* fac, const char* str)
376     {
377         type_ = type;
378         err_ = err;
379         severity_ = severity;
380         fac_ = fac.fromStringz;
381         str_ = str.fromStringz;
382     }
383 
384     package this(Type type)
385     {
386         type_ = type;
387         err_ = ErrorCode.no_error;
388         severity_ = Severity.emerg;
389         fac_ = "";
390         str_ = "";
391     }
392 
393     package Type type_;
394     package ErrorCode err_;
395     package Severity severity_;
396     package const(char)[] fac_;
397     package const(char)[] str_; /* reused for throttle broker_name */
398     package int id_;
399     package int throttle_time_;
400 
401 @property:
402 
403     /*
404    * Event Accessor methods
405    */
406 
407     /**
408    * The event type
409    * Note: Applies to all event types
410    */
411     Type type() const
412     {
413         return type_;
414     }
415 
416     /**
417    * Event error, if any.
418    * Note: Applies to all event types except throttle
419    */
420     ErrorCode err() const
421     {
422         return err_;
423     }
424 
425     /**
426    * Log severity level.
427    * Note: Applies to LOG event type.
428    */
429     Severity severity() const
430     {
431         return severity_;
432     }
433 
434     /**
435    * Log facility string.
436    * Note: Applies to LOG event type.
437    */
438     const(char)[] fac() const
439     {
440         return fac_;
441     }
442 
443     /**
444    * Log message string.
445    *
446    * \c LOG: Log message string.
447    * \c STATS: JSON object (as string).
448    *
449    * Note: Applies to LOG event type.
450    */
451     const(char)[] str() const
452     {
453         return str_;
454     }
455 
456     /**
457    * throttle time in milliseconds.
458    * Note: Applies to throttle event type.
459    */
460     int throttleTime() const
461     {
462         return throttle_time_;
463     }
464 
465     /**
466    * Throttling broker's name.
467    * Note: Applies to throttle event type.
468    */
469     const(char)[] brokerName() const
470     {
471         if (type_ == Type.throttle)
472             return str_;
473         else
474             return "";
475     }
476 
477     /**
478    * Throttling broker's id.
479    * Note: Applies to throttle event type.
480    */
481     int brokerId() const
482     {
483         return id_;
484     }
485 }
486 
487 /**
488  * Consume callback class
489  */
490 /**
491 * The consume callback is used with
492 *        Consumer::consumeCallback()
493 *        methods and will be called for each consumed \p message.
494 *
495 * The callback interface is optional but provides increased performance.
496 */
497 alias ConsumeCb = void delegate(ref Message message) nothrow @nogc;
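
/* A minimal sketch of a consume handler: check Message.err() before touching
 * the payload; partition_eof is informational, not a real error. */
unittest
{
    ConsumeCb consumeCb = (ref Message message) nothrow @nogc
    {
        import core.stdc.stdio : printf;
        if (message.err == ErrorCode.no_error)
            printf("offset %lld: %.*s\n", message.offset,
                cast(int) message.payload.length,
                cast(const(char)*) message.payload.ptr);
        else if (message.err != ErrorCode.partition_eof)
            printf("consume error: %.*s\n",
                cast(int) message.errstr.length, message.errstr.ptr);
    };
}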
498 
499 /**
500  * \b KafkaConsunmer: Rebalance callback class
501  */
502 /**
503 * Group rebalance callback for use with KafkaConsunmer
504 *
505 * Registering a \p rebalance_cb turns off librdkafka's automatic
506 * partition assignment/revocation and instead delegates that responsibility
507 * to the application's \p rebalance_cb.
508 *
509 * The rebalance callback is responsible for updating librdkafka's
510 * assignment set based on the two events: ASSIGN_PARTITIONS
511 * and REVOKE_PARTITIONS but should also be able to handle
512 * arbitrary rebalancing failures where \p err is neither of those.
513 * Note: In this latter case (arbitrary error), the application must
514 *         call unassign() to synchronize state.
515 *
516 * Without a rebalance callback this is done automatically by librdkafka
517 * but registering a rebalance callback gives the application flexibility
518 * in performing other operations along with the assinging/revocation,
519 * such as fetching offsets from an alternate location (on assign)
520 * or manually committing offsets (on revoke).
521 */
522 alias RebalanceCb = void delegate(KafkaConsumer consumer, ErrorCode err,
523     ref TopicPartition[] partitions) nothrow @nogc;
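
/* A hedged sketch of a manual rebalance handler; it assumes KafkaConsumer
 * exposes assign()/unassign() as in the librdkafka C++ API. */
unittest
{
    RebalanceCb rebalanceCb = (KafkaConsumer consumer, ErrorCode err,
        ref TopicPartition[] partitions) nothrow @nogc
    {
        if (err == ErrorCode.assign_partitions)
            consumer.assign(partitions); // start fetching the assigned set
        else if (err == ErrorCode.revoke_partitions)
            consumer.unassign();         // stop fetching; commit first if needed
        else
            consumer.unassign();         // arbitrary error: synchronize state
    };
}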
524 
525 /**
526  * Offset Commit callback class
527  */
528 /**
529 * Set offset commit callback for use with consumer groups
530 *
531 * The results of automatic or manual offset commits will be scheduled
532 * for this callback and is served by consume()
533 * or commitSync()
534 *
535 * If no partitions had valid offsets to commit this callback will be called
536 * with \p err == NO_OFFSET which is not to be considered an error.
537 *
538 * The \p offsets list contains per-partition information:
539 *   - \c topic      The topic committed
540 *   - \c partition  The partition committed
541 *   - \c offset:    Committed offset (attempted)
542 *   - \c err:       Commit error
543 */
544 
545 alias OffsetCommitCb = void delegate(ErrorCode err, ref TopicPartition[] offsets) nothrow @nogc;
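
/* A minimal sketch of an offset commit handler: ErrorCode.no_offset means
 * there was nothing to commit and is not treated as a failure. */
unittest
{
    OffsetCommitCb commitCb = (ErrorCode err, ref TopicPartition[] offsets) nothrow @nogc
    {
        import core.stdc.stdio : printf;
        if (err != ErrorCode.no_error && err != ErrorCode.no_offset)
        {
            string s = err2str(err);
            printf("offset commit failed: %.*s\n", cast(int) s.length, s.ptr);
        }
    };
}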
546 
547 /**
548  * \b Portability: SocketCb callback class
549  *
550  */
551 /**
552 * Socket callback
553 *
554 * The socket callback is responsible for opening a socket
555 * according to the supplied \p domain, \p type and \p protocol.
556 * The socket shall be created with \c CLOEXEC set in a racefree fashion, if
557 * possible.
558 *
559 * It is typically not required to register an alternative socket
560 * implementation
561 *
562 * The socket file descriptor or -1 on error (\c errno must be set)
563 */
564 alias SocketCb = int function(int domain, int type, int protocol) nothrow @nogc;
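
/* A hedged sketch of a socket callback for POSIX systems: it opens the socket
 * and then sets FD_CLOEXEC, which is not race-free; platforms with
 * SOCK_CLOEXEC can set the flag atomically at creation instead. */
version (Posix) unittest
{
    SocketCb socketCb = function int(int domain, int type, int protocol) nothrow @nogc
    {
        import core.sys.posix.sys.socket : socket;
        import core.sys.posix.fcntl : fcntl, F_SETFD, FD_CLOEXEC;
        int fd = socket(domain, type, protocol);
        if (fd == -1)
            return -1; // errno was set by socket()
        fcntl(fd, F_SETFD, FD_CLOEXEC);
        return fd;
    };
}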
565 
566 
567 /**
568  * Message object
569  *
570  * This object represents either a single consumed or produced message,
571  * or an event (\p err() is set).
572  *
573  * An application must check Message::err() to see if the
574  * object is a proper message (error is ErrorCode.no_error) or a
575  * an error event.
576  *
577  */
578 struct Message
579 {
580     /**
581      * Message timestamp object
582      *
583      * Represents the number of milliseconds since the epoch (UTC).
584      *
585      * The Type dictates the timestamp type or origin.
586      *
587      * Note: Requires Apache Kafka broker version >= 0.10.0
588      *
589      */
590     static struct Timestamp
591     {
592         enum Type
593         {
594             not_available, /**< Timestamp not available */
595             create_time, /**< Message creation time (source) */
596             log_append_time /**< Message log append time (broker) */
597         }
598 
599         long timestamp; /**< Milliseconds since epoch (UTC). */
600         Type type; /**< Timestamp type */
601     }
602 
603 
    @disable this(this);
nothrow @nogc:
    ~this()
    {
        if (free_rkmessage_ && rkmessage_)
            rd_kafka_message_destroy(cast(rd_kafka_message_t*) rkmessage_);
    }
611 
612     this(Topic topic, rd_kafka_message_t* rkmessage, bool dofree = true)
613     {
614         topic_ = topic;
615         rkmessage_ = rkmessage;
616         free_rkmessage_ = dofree;
617     }
618 
619     this(Topic topic, const rd_kafka_message_t* rkmessage)
620     {
621         topic_ = topic;
622         rkmessage_ = rkmessage;
623     }
624 
    this(rd_kafka_message_t* rkmessage)
    {
        rkmessage_ = rkmessage;
        free_rkmessage_ = true;

        if (rkmessage.rkt)
        {
            /* Possibly null */
            topic_ = cast(Topic) rd_kafka_topic_opaque(rkmessage.rkt);
        }
    }
637 
638     /* Create errored message */
639     this(Topic topic, ErrorCode err)
640     {
641         topic_ = topic;
642         rkmessage_ = &rkmessage_err_;
643         memset(&rkmessage_err_, 0, rkmessage_err_.sizeof);
644         rkmessage_err_.err = cast(rd_kafka_resp_err_t)(err);
645     }
646 
    /** The error string if the object represents an error event,
     *  else an empty string. */
    string errstr() const
    {
        /* FIXME: If there is an error string in the payload (for consume_cb)
         * it won't be shown, since 'payload' is reused for errstr
         * and we can't distinguish between consumer and producer.
         * For the producer case the payload needs to be the original
         * payload pointer. */
        return err2str(cast(ErrorCode)rkmessage_.err);
    }
658 
659     /** The error code if object represents an error event, else 0. */
660     ErrorCode err() const
661     {
662         return cast(ErrorCode) rkmessage_.err;
663     }
664 
    /** The Topic object for a message (if applicable),
     *  or null if a corresponding Topic object has not been
     *  explicitly created with Topic::create().
     *  In this case use topicName() instead. */
669     const(Topic) topic() const
670     {
671         return topic_;
672     }
673     /** Topic name (if applicable, else empty string) */
674     const(char)[] topicName() const
675     {
676         if (rkmessage_.rkt)
677             return rd_kafka_topic_name(rkmessage_.rkt).fromStringz;
678         else
679             return "";
680     }
681     /** Partition (if applicable) */
682     int partition() const
683     {
684         return rkmessage_.partition;
685     }
686     /** Message payload (if applicable) */
687     const(void)[] payload() const
688     {
689         return rkmessage_.payload[0 .. rkmessage_.len];
690     }
691 
692     /** Message key as string (if applicable) */
693     const(void)[] key() const
694     {
695         return (cast(const(char)*) rkmessage_.key)[0 .. rkmessage_.key_len];
696     }
697 
698     /** Message or error offset (if applicable) */
699     long offset() const
700     {
701         return rkmessage_.offset;
702     }
703 
704     /** Message timestamp (if applicable) */
705     Timestamp timestamp() const
706     {
707         Timestamp ts;
708         rd_kafka_timestamp_type_t tstype;
709         ts.timestamp = rd_kafka_message_timestamp(rkmessage_, &tstype);
710         ts.type = cast(Timestamp.Type) tstype;
711         return ts;
712     }
713 
714     /** The \p msg_opaque as provided to Producer::produce() */
715     const(void)* msgOpaque() const
716     {
717         return rkmessage_._private;
718     }
719 
720 package:
721     Topic topic_;
722     const (rd_kafka_message_t)* rkmessage_;
723     bool free_rkmessage_;
    /* For error signalling by the D wrapper the rkmessage_err_ message is
     * used as a placeholder and rkmessage_ is set to point to it. */
726     rd_kafka_message_t rkmessage_err_;
727 }