/*
 * librdkafka - Apache Kafka C/C++ library
 *
 * Copyright (c) 2014 Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/**
 * Apache Kafka C/C++ consumer and producer client library.
 *
 * rdkafkad contains the public D API for librdkafka, ported from the
 * C++ API in rdkafkacpp.h.
 * The API is documented in this file as comments prefixing the class,
 * function, type, enum, define, etc.
 * For more information, see the C interface in rdkafka.h and read the
 * manual in INTRODUCTION.md.
 *
 * See_also: For the C interface see rdkafka.h
 */
module rdkafkad;

package import std.string : fromStringz, toStringz;
package import core.stdc.errno;
package import core.stdc.string;
package import core.stdc.stdlib;
package import core.stdc.ctype;
package import core.sys.posix.sys.types;
package import deimos.rdkafka;

public import rdkafkad.config;
public import rdkafkad.handlers;
public import rdkafkad.metadata;
public import rdkafkad.topic;

/**
 * librdkafka version
 *
 * Interpreted as hex \c MM.mm.rr.xx:
 *  - MM = Major
 *  - mm = minor
 *  - rr = revision
 *  - xx = pre-release id (0xff is the final release)
 *
 * E.g.: \c 0x000801ff = 0.8.1
 *
 * Note: This value should only be used at compile time;
 *       for runtime version checks use version_().
 */
enum RD_KAFKA_VERSION = 0x00090200;

/**
 * Returns the librdkafka version as integer.
 *
 * See_also: RD_KAFKA_VERSION for how to parse the integer format.
 */
int version_() nothrow @nogc
{
    return rd_kafka_version();
}
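
/* A minimal sketch (variable names are illustrative) of decoding the hex
 * \c MM.mm.rr.xx format described at RD_KAFKA_VERSION: */
unittest
{
    immutable v = version_();
    immutable major = (v >> 24) & 0xff;
    immutable minor = (v >> 16) & 0xff;
    immutable revision = (v >> 8) & 0xff;
    immutable prerelease = v & 0xff; /* 0xff denotes a final release */
    assert(major >= 0 && minor >= 0 && revision >= 0 && prerelease >= 0);
}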

/**
 * Returns the librdkafka version as string.
 */
const(char)[] versionStr() nothrow @nogc
{
    return fromStringz(rd_kafka_version_str);
}

/**
 * Returns a CSV list of the supported debug contexts
 * for use with Conf::set("debug", ..).
 */
const(char)[] getDebugContexts() nothrow @nogc
{
    return rd_kafka_get_debug_contexts().fromStringz;
}

/**
 * Wait for all rd_kafka_t objects to be destroyed.
 *
 * Returns 0 if all Kafka objects are now destroyed, or -1 if the
 * timeout was reached.
 * Since RdKafka handle deletion is an asynchronous operation the
 * \p wait_destroyed() function can be used by applications where
 * a clean shutdown is required.
 */
int waitDestroyed(int timeout_ms) nothrow @nogc
{
    return rd_kafka_wait_destroyed(timeout_ms);
}
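
/* A shutdown sketch: after all handles have been destroyed, wait up to
 * five seconds (an arbitrary choice here) for librdkafka's background
 * threads to finish before the process exits. */
unittest
{
    if (waitDestroyed(5000) == -1)
    {
        /* timed out: some rd_kafka_t object is still alive */
    }
}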

/**
 * Error codes.
 *
 * The negative error codes delimited by two underscores
 * (\c _..) denote errors internal to librdkafka and are
 * displayed as \c \"Local: \<error string..\>\", while the error codes
 * delimited by a single underscore (\c ERR_..) denote broker
 * errors and are displayed as \c \"Broker: \<error string..\>\".
 *
 * See_also: Use err2str() to translate an error code into a human readable string
 */
enum ErrorCode
{
    /* Internal errors to rdkafka: */
    /** Begin internal error codes */
    begin = -200,
    /** Received message is incorrect */
    bad_msg = -199,
    /** Bad/unknown compression */
    bad_compression = -198,
    /** Broker is going away */
    destroy = -197,
    /** Generic failure */
    fail = -196,
    /** Broker transport failure */
    transport = -195,
    /** Critical system resource */
    crit_sys_resource = -194,
    /** Failed to resolve broker */
    resolve = -193,
    /** Produced message timed out */
    msg_timed_out = -192,
    /** Reached the end of the topic+partition queue on
     * the broker. Not really an error. */
    partition_eof = -191,
    /** Permanent: Partition does not exist in cluster. */
    unknown_partition = -190,
    /** File or filesystem error */
    fs = -189,
    /** Permanent: Topic does not exist in cluster. */
    unknown_topic = -188,
    /** All broker connections are down. */
    all_brokers_down = -187,
    /** Invalid argument, or invalid configuration */
    invalid_arg = -186,
    /** Operation timed out */
    timed_out = -185,
    /** Queue is full */
    queue_full = -184,
    /** ISR count < required.acks */
    isr_insuff = -183,
    /** Broker node update */
    node_update = -182,
    /** SSL error */
    ssl = -181,
    /** Waiting for coordinator to become available. */
    wait_coord = -180,
    /** Unknown client group */
    unknown_group = -179,
    /** Operation in progress */
    in_progress = -178,
    /** Previous operation in progress, wait for it to finish. */
    prev_in_progress = -177,
    /** This operation would interfere with an existing subscription */
    existing_subscription = -176,
    /** Assigned partitions (rebalance_cb) */
    assign_partitions = -175,
    /** Revoked partitions (rebalance_cb) */
    revoke_partitions = -174,
    /** Conflicting use */
    conflict = -173,
    /** Wrong state */
    state = -172,
    /** Unknown protocol */
    unknown_protocol = -171,
    /** Not implemented */
    not_implemented = -170,
    /** Authentication failure */
    authentication = -169,
    /** No stored offset */
    no_offset = -168,
    /** Outdated */
    outdated = -167,
    /** Timed out in queue */
    timed_out_queue = -166,

    /** End internal error codes */
    end = -100,

    /* Kafka broker errors: */
    /** Unknown broker error */
    unknown = -1,
    /** Success */
    no_error,
    /** Offset out of range */
    offset_out_of_range = 1,
    /** Invalid message */
    invalid_msg = 2,
    /** Unknown topic or partition */
    unknown_topic_or_part = 3,
    /** Invalid message size */
    invalid_msg_size = 4,
    /** Leader not available */
    leader_not_available = 5,
    /** Not leader for partition */
    not_leader_for_partition = 6,
    /** Request timed out */
    request_timed_out = 7,
    /** Broker not available */
    broker_not_available = 8,
    /** Replica not available */
    replica_not_available = 9,
    /** Message size too large */
    msg_size_too_large = 10,
    /** StaleControllerEpochCode */
    stale_ctrl_epoch = 11,
    /** Offset metadata string too large */
    offset_metadata_too_large = 12,
    /** Broker disconnected before response received */
    network_exception = 13,
    /** Group coordinator load in progress */
    group_load_in_progress = 14,
    /** Group coordinator not available */
    group_coordinator_not_available = 15,
    /** Not coordinator for group */
    not_coordinator_for_group = 16,
    /** Invalid topic */
    topic_exception = 17,
    /** Message batch larger than configured server segment size */
    record_list_too_large = 18,
    /** Not enough in-sync replicas */
    not_enough_replicas = 19,
    /** Message(s) written to insufficient number of in-sync replicas */
    not_enough_replicas_after_append = 20,
    /** Invalid required acks value */
    invalid_required_acks = 21,
    /** Specified group generation id is not valid */
    illegal_generation = 22,
    /** Inconsistent group protocol */
    inconsistent_group_protocol = 23,
    /** Invalid group.id */
    invalid_group_id = 24,
    /** Unknown member */
    unknown_member_id = 25,
    /** Invalid session timeout */
    invalid_session_timeout = 26,
    /** Group rebalance in progress */
    rebalance_in_progress = 27,
    /** Commit offset data size is not valid */
    invalid_commit_offset_size = 28,
    /** Topic authorization failed */
    topic_authorization_failed = 29,
    /** Group authorization failed */
    group_authorization_failed = 30,
    /** Cluster authorization failed */
    cluster_authorization_failed = 31
}

/**
 * Returns a human readable representation of a Kafka error.
 */
string err2str(ErrorCode err) nothrow @nogc
{
    return cast(string) rd_kafka_err2str(cast(rd_kafka_resp_err_t) err).fromStringz;
}
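
/* Usage sketch: error codes translate to short human readable strings,
 * prefixed "Local:" for internal codes and "Broker:" for broker codes. */
unittest
{
    import core.stdc.stdio : printf;

    auto s = err2str(ErrorCode.partition_eof);
    printf("%.*s\n", cast(int) s.length, s.ptr);
}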

/**
 * Delivery Report callback class
 *
 * The delivery report callback will be called once for each message
 * accepted by Producer::produce() (et al.) with
 * Message::err() set to indicate the result of the produce request.
 *
 * The callback is called when a message is successfully produced,
 * when librdkafka encountered a permanent failure, or when the retry
 * counter for temporary errors has been exhausted.
 *
 * An application must call poll() at regular intervals to
 * serve queued delivery report callbacks.
 */
alias DeliveryReportCb = void delegate(ref Message message) nothrow @nogc;
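
/* A minimal sketch of a delivery report callback; registering it with
 * the producer configuration is assumed (see Conf::set("dr_cb", ..)). */
unittest
{
    import core.stdc.stdio : printf;

    DeliveryReportCb dr = delegate(ref Message message) nothrow @nogc {
        if (message.err)
        {
            auto s = message.errstr;
            printf("delivery failed: %.*s\n", cast(int) s.length, s.ptr);
        }
    };
}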

/**
 * Partitioner callback class
 *
 * Generic partitioner callback class for implementing custom partitioners.
 *
 * Return the partition to use for \p key in \p topic.
 *
 * The \p msg_opaque is the same \p msg_opaque provided in the
 * Producer::produce() call.
 *
 * Note: \p key may be null or empty.
 *
 * Must return a value between 0 and \p partition_cnt (non-inclusive).
 * May return RD_KAFKA_PARTITION_UA (-1) if partitioning failed.
 *
 * See_also: Conf::set() \c "partitioner_cb"
 * See_also: The callback may use Topic::partition_available() to check
 *     if a partition has an active leader broker.
 */
alias PartitionerCb = int delegate(const Topic topic, const(void)[] key,
    int partition_cnt, void* msg_opaque) nothrow @nogc;
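
/* A minimal sketch of a custom partitioner: a naive byte-sum hash
 * modulo the partition count (illustrative only; a real partitioner
 * should use a stronger hash and check partition availability). */
unittest
{
    PartitionerCb partitioner = delegate(const Topic topic, const(void)[] key,
        int partition_cnt, void* msg_opaque) nothrow @nogc {
        if (partition_cnt <= 0)
            return -1; /* RD_KAFKA_PARTITION_UA: partitioning failed */
        uint h;
        foreach (b; cast(const(ubyte)[]) key)
            h = h * 31 + b;
        return cast(int)(h % partition_cnt);
    };
}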

/**
 * Variant partitioner with key pointer
 */
alias PartitionerKeyPointerCb = int delegate(const Topic topic, const(void)* key,
    size_t key_len, int partition_cnt, void* msg_opaque) nothrow @nogc;

/**
 * Event callback class
 *
 * Events are a generic interface for propagating errors, statistics, logs, etc.
 * from librdkafka to the application.
 *
 * See_also: Event
 */
alias EventCb = void delegate(ref Event event) nothrow @nogc;
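
/* A sketch of an event callback dispatching on the event type
 * (registration via Conf::set("event_cb", ..) is assumed). */
unittest
{
    import core.stdc.stdio : printf;

    EventCb on_event = delegate(ref Event event) nothrow @nogc {
        final switch (event.type)
        {
        case Event.Type.error:
            printf("error: %d\n", cast(int) event.err);
            break;
        case Event.Type.stats:
            /* event.str holds a statistics JSON document */
            break;
        case Event.Type.log:
            /* event.severity, event.fac and event.str describe the log line */
            break;
        case Event.Type.throttle:
            /* event.throttleTime, event.brokerName, event.brokerId */
            break;
        }
    };
}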

/**
 * Event object class as passed to the EventCb callback.
 */
struct Event
{
nothrow @nogc:
    /** Event type */
    enum Type
    {
        error, /**< Event is an error condition */
        stats, /**< Event is a statistics JSON document */
        log, /**< Event is a log message */
        throttle /**< Event is a throttle level signaling from the broker */
    }

    /** LOG severities (conforms to syslog(3) severities) */
    enum Severity
    {
        emerg,
        alert = 1,
        critical = 2,
        error = 3,
        warning = 4,
        notice = 5,
        info = 6,
        debug_ = 7
    }

    package this(Type type, ErrorCode err, Severity severity, const char* fac, const char* str)
    {
        type_ = type;
        err_ = err;
        severity_ = severity;
        fac_ = fac.fromStringz;
        str_ = str.fromStringz;
    }

    package this(Type type)
    {
        type_ = type;
        err_ = ErrorCode.no_error;
        severity_ = Severity.emerg;
        fac_ = "";
        str_ = "";
    }

    package Type type_;
    package ErrorCode err_;
    package Severity severity_;
    package const(char)[] fac_;
    package const(char)[] str_; /* reused for throttle broker_name */
    package int id_;
    package int throttle_time_;

@property:

    /*
     * Event Accessor methods
     */

    /**
     * The event type
     * Note: Applies to all event types
     */
    Type type() const
    {
        return type_;
    }

    /**
     * Event error, if any.
     * Note: Applies to all event types except throttle
     */
    ErrorCode err() const
    {
        return err_;
    }

    /**
     * Log severity level.
     * Note: Applies to LOG event type.
     */
    Severity severity() const
    {
        return severity_;
    }

    /**
     * Log facility string.
     * Note: Applies to LOG event type.
     */
    const(char)[] fac() const
    {
        return fac_;
    }

    /**
     * Log message string.
     *
     * \c LOG: Log message string.
     * \c STATS: JSON object (as string).
     *
     * Note: Applies to LOG and STATS event types.
     */
    const(char)[] str() const
    {
        return str_;
    }

    /**
     * Throttle time in milliseconds.
     * Note: Applies to throttle event type.
     */
    int throttleTime() const
    {
        return throttle_time_;
    }

    /**
     * Throttling broker's name.
     * Note: Applies to throttle event type.
     */
    const(char)[] brokerName() const
    {
        if (type_ == Type.throttle)
            return str_;
        else
            return "";
    }

    /**
     * Throttling broker's id.
     * Note: Applies to throttle event type.
     */
    int brokerId() const
    {
        return id_;
    }
}

/**
 * Consume callback class
 *
 * The consume callback is used with the Consumer::consumeCallback()
 * methods and will be called for each consumed \p message.
 *
 * The callback interface is optional but provides increased performance.
 */
alias ConsumeCb = void delegate(ref Message message) nothrow @nogc;
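
/* A minimal consume callback sketch: print the payload of proper
 * messages and ignore error events (message.err != no_error). */
unittest
{
    import core.stdc.stdio : printf;

    ConsumeCb on_msg = delegate(ref Message message) nothrow @nogc {
        if (message.err == ErrorCode.no_error)
        {
            auto p = message.payload;
            printf("%.*s\n", cast(int) p.length, cast(const(char)*) p.ptr);
        }
    };
}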

/**
 * \b KafkaConsumer: Rebalance callback class
 *
 * Group rebalance callback for use with KafkaConsumer.
 *
 * Registering a \p rebalance_cb turns off librdkafka's automatic
 * partition assignment/revocation and instead delegates that responsibility
 * to the application's \p rebalance_cb.
 *
 * The rebalance callback is responsible for updating librdkafka's
 * assignment set based on the two events ASSIGN_PARTITIONS
 * and REVOKE_PARTITIONS, but should also be able to handle
 * arbitrary rebalancing failures where \p err is neither of those.
 * Note: In this latter case (arbitrary error), the application must
 *       call unassign() to synchronize state.
 *
 * Without a rebalance callback this is done automatically by librdkafka,
 * but registering a rebalance callback gives the application flexibility
 * in performing other operations along with the assignment/revocation,
 * such as fetching offsets from an alternate location (on assign)
 * or manually committing offsets (on revoke).
 */
alias RebalanceCb = void delegate(KafkaConsumer consumer, ErrorCode err,
    ref TopicPartition[] partitions) nothrow @nogc;
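
/* A skeleton rebalance callback: branch on the two rebalance events and
 * fall through to the error case. The assign()/unassign() consumer calls
 * it would make are left as comments since their exact D signatures are
 * not shown in this file. */
unittest
{
    RebalanceCb on_rebalance = delegate(KafkaConsumer consumer, ErrorCode err,
        ref TopicPartition[] partitions) nothrow @nogc {
        if (err == ErrorCode.assign_partitions)
        {
            /* start consuming: hand `partitions` back to the consumer,
             * optionally after fetching offsets from an alternate store */
        }
        else
        {
            /* revoke_partitions or an arbitrary error: release the
             * assignment (per the doc above, call unassign() to
             * synchronize state) */
        }
    };
}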

/**
 * Offset Commit callback class
 *
 * Offset commit callback for use with consumer groups.
 *
 * The results of automatic or manual offset commits will be scheduled
 * for this callback and are served by consume()
 * or commitSync().
 *
 * If no partitions had valid offsets to commit this callback will be called
 * with \p err == NO_OFFSET, which is not to be considered an error.
 *
 * The \p offsets list contains per-partition information:
 *   - \c topic      The topic committed
 *   - \c partition  The partition committed
 *   - \c offset:    Committed offset (attempted)
 *   - \c err:       Commit error
 */
alias OffsetCommitCb = void delegate(ErrorCode err, ref TopicPartition[] offsets) nothrow @nogc;
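
/* A minimal offset commit callback sketch: treat NO_OFFSET as success
 * and report anything else. */
unittest
{
    import core.stdc.stdio : printf;

    OffsetCommitCb on_commit = delegate(ErrorCode err, ref TopicPartition[] offsets) nothrow @nogc {
        if (err != ErrorCode.no_error && err != ErrorCode.no_offset)
            printf("offset commit failed: %d\n", cast(int) err);
    };
}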

/**
 * \b Portability: SocketCb callback class
 *
 * The socket callback is responsible for opening a socket
 * according to the supplied \p domain, \p type and \p protocol.
 * The socket shall be created with \c CLOEXEC set in a race-free fashion, if
 * possible.
 *
 * It is typically not required to register an alternative socket
 * implementation.
 *
 * Returns the socket file descriptor or -1 on error (\c errno must be set).
 */
alias SocketCb = int function(int domain, int type, int protocol) nothrow @nogc;
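
/* A Posix sketch of a conforming socket callback: open the socket and
 * set FD_CLOEXEC afterwards (not race-free; platforms with SOCK_CLOEXEC
 * can set it atomically in the socket() call instead). */
version (Posix) unittest
{
    import core.sys.posix.fcntl : fcntl, F_SETFD, FD_CLOEXEC;
    import core.sys.posix.sys.socket : socket;

    SocketCb open_socket = function(int domain, int type, int protocol) nothrow @nogc {
        int fd = socket(domain, type, protocol);
        if (fd != -1)
            fcntl(fd, F_SETFD, FD_CLOEXEC);
        return fd;
    };
}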

/**
 * Message object
 *
 * This object represents either a single consumed or produced message,
 * or an event (\p err() is set).
 *
 * An application must check Message::err() to see if the
 * object is a proper message (error is ErrorCode.no_error) or
 * an error event.
 */
struct Message
{
    /**
     * Message timestamp object
     *
     * Represents the number of milliseconds since the epoch (UTC).
     *
     * The Type dictates the timestamp type or origin.
     *
     * Note: Requires Apache Kafka broker version >= 0.10.0
     */
    struct Timestamp
    {
        enum Type
        {
            not_available, /**< Timestamp not available */
            create_time, /**< Message creation time (source) */
            log_append_time /**< Message log append time (broker) */
        }

        long timestamp; /**< Milliseconds since epoch (UTC). */
        Type type; /**< Timestamp type */
    }

    @disable this(this);
nothrow @nogc:
    ~this()
    {
        if (free_rkmessage_ && rkmessage_)
            rd_kafka_message_destroy(cast(rd_kafka_message_t*) rkmessage_);
    }

    this(Topic topic, rd_kafka_message_t* rkmessage, bool dofree = true)
    {
        topic_ = topic;
        rkmessage_ = rkmessage;
        free_rkmessage_ = dofree;
    }

    this(Topic topic, const rd_kafka_message_t* rkmessage)
    {
        topic_ = topic;
        rkmessage_ = rkmessage;
    }

    this(rd_kafka_message_t* rkmessage)
    {
        rkmessage_ = rkmessage;
        free_rkmessage_ = true;

        if (rkmessage.rkt)
        {
            /* Possibly null */
            topic_ = cast(Topic) rd_kafka_topic_opaque(rkmessage.rkt);
        }
    }

    /* Create an errored message */
    this(Topic topic, ErrorCode err)
    {
        topic_ = topic;
        rkmessage_ = &rkmessage_err_;
        memset(&rkmessage_err_, 0, rkmessage_err_.sizeof);
        rkmessage_err_.err = cast(rd_kafka_resp_err_t) err;
    }

    /** The error string if this object represents an error event,
     *  else an empty string. */
    string errstr() const
    {
        /* FIXME: If there is an error string in the payload (for consume_cb)
         * it won't be shown since 'payload' is reused for errstr
         * and we can't distinguish between consumer and producer.
         * For the producer case the payload needs to be the original
         * payload pointer. */
        return err2str(cast(ErrorCode) rkmessage_.err);
    }

    /** The error code if this object represents an error event, else 0. */
    ErrorCode err() const
    {
        return cast(ErrorCode) rkmessage_.err;
    }

    /** The Topic object for a message (if applicable),
     *  or null if a corresponding Topic object has not been
     *  explicitly created with Topic::create().
     *  In this case use topicName() instead. */
    const(Topic) topic() const
    {
        return topic_;
    }
    /** Topic name (if applicable, else empty string) */
    const(char)[] topicName() const
    {
        if (rkmessage_.rkt)
            return rd_kafka_topic_name(rkmessage_.rkt).fromStringz;
        else
            return "";
    }
    /** Partition (if applicable) */
    int partition() const
    {
        return rkmessage_.partition;
    }
    /** Message payload (if applicable) */
    const(void)[] payload() const
    {
        return rkmessage_.payload[0 .. rkmessage_.len];
    }

    /** Message key as string (if applicable) */
    const(void)[] key() const
    {
        return (cast(const(char)*) rkmessage_.key)[0 .. rkmessage_.key_len];
    }

    /** Message or error offset (if applicable) */
    long offset() const
    {
        return rkmessage_.offset;
    }

    /** Message timestamp (if applicable) */
    Timestamp timestamp() const
    {
        Timestamp ts;
        rd_kafka_timestamp_type_t tstype;
        ts.timestamp = rd_kafka_message_timestamp(rkmessage_, &tstype);
        ts.type = cast(Timestamp.Type) tstype;
        return ts;
    }

    /** The \p msg_opaque as provided to Producer::produce() */
    const(void)* msgOpaque() const
    {
        return rkmessage_._private;
    }

package:
    Topic topic_;
    const(rd_kafka_message_t)* rkmessage_;
    bool free_rkmessage_;
    /* For error signalling by the D layer the .._err_ message is
     * used as a place holder and rkmessage_ is set to point to it. */
    rd_kafka_message_t rkmessage_err_;
}
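
/* A usage sketch: build an errored message the way the library does
 * internally and inspect it as application code inspects consumed
 * messages: always check err() before touching payload/key. */
unittest
{
    auto msg = Message(null, ErrorCode.timed_out);
    assert(msg.err == ErrorCode.timed_out);
    assert(msg.errstr.length > 0);
    assert(msg.topicName == "");
}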