///
module rdkafkad.handlers;
import rdkafkad;
import rdkafkad.iodriver;

import std.datetime: Clock, UTC;

/**
 * Base handle, super class for specific clients.
 */
class Handle
{

    /**
    * Returns the client's broker-assigned group member id.
    *
    * Note: This currently requires the high-level KafkaConsumer.
    *
    * Returns: the last assigned member id, or an empty string if the
    *          client is not currently a group member.
    */
    auto memberid() const
    {
        char* str;
        mixin(IO!q{
        str = rd_kafka_memberid(rk_);
        });
        string memberid = str.fromStringz.idup;
        if (str)
            rd_kafka_mem_free(cast(rd_kafka_s*) rk_, str);
        return memberid;
    }

    /**
    * Request metadata from the broker.
    *
    * Parameters:
    *  \p all_topics  - if true: request info about all topics in the cluster,
    *                   if false: only request info about locally known topics.
    *  \p only_rkt    - only request info about this topic (null for all).
    *  \p timeout_ms  - maximum response time before failing.
    *
    * Returns a Metadata object on success; throws an Exception with the
    * error description (e.g. timed_out on timeout) on failure.
    */
    Metadata metadata(bool all_topics, const Topic only_rkt, int timeout_ms = 60_000)
    {
        const rd_kafka_metadata_t* cmetadatap = null;

        rd_kafka_topic_t* topic = only_rkt ? cast(rd_kafka_topic_t*) only_rkt.rkt_ : null;

        ErrorCode error;
        mixin(IO!q{
        error = cast(ErrorCode)rd_kafka_metadata(rk_, all_topics, topic, &cmetadatap, timeout_ms);
        });
        if (error)
        {
            throw new Exception(error.err2str);
        }
        return new Metadata(cmetadatap);
    }
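    /*
    Example (a minimal sketch): fetching metadata for all topics in the
    cluster. The accessors exposed by `Metadata` are defined elsewhere in
    this package.

    ---
    // `client` is any constructed handle (Producer, KafkaConsumer, ...)
    auto md = client.metadata(true, null, 5_000);
    // inspect broker and topic information via the Metadata accessors
    ---
    */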

static if (!have_vibed)
mixin("@nogc nothrow:");

    package void setCommonConfig(GlobalConf conf)
    {
        rd_kafka_conf_set_opaque(conf.rk_conf_, cast(void*) this);

        if (conf.event_cb_)
        {
            rd_kafka_conf_set_log_cb(conf.rk_conf_, &log_cb_trampoline);
            rd_kafka_conf_set_error_cb(conf.rk_conf_, &error_cb_trampoline);
            rd_kafka_conf_set_throttle_cb(conf.rk_conf_, &throttle_cb_trampoline);
            rd_kafka_conf_set_stats_cb(conf.rk_conf_, &stats_cb_trampoline);
            event_cb_ = conf.event_cb_;
        }

        if (conf.socket_cb_)
        {
            rd_kafka_conf_set_socket_cb(conf.rk_conf_, &socket_cb_trampoline);
            socket_cb_ = conf.socket_cb_;
        }

        version (Windows)
        {
        }
        else if (conf.open_cb_)
        {
            rd_kafka_conf_set_open_cb(conf.rk_conf_, &open_cb_trampoline);
            open_cb_ = conf.open_cb_;
        }

        if (conf.rebalance_cb_)
        {
            rd_kafka_conf_set_rebalance_cb(conf.rk_conf_, &rebalance_cb_trampoline);
            rebalance_cb_ = conf.rebalance_cb_;
        }

        if (conf.offset_commit_cb_)
        {
            rd_kafka_conf_set_offset_commit_cb(conf.rk_conf_, &offset_commit_cb_trampoline);
            offset_commit_cb_ = conf.offset_commit_cb_;
        }
    }

    /**
    * Query broker for low (oldest/beginning)
    *        and high (newest/end) offsets for partition.
    *
    * Offsets are returned in \p low and \p high respectively.
    *
    * Returns ErrorCode.no_error on success or an error code on failure.
    */
    auto queryWatermarkOffsets(const(char)* topic, int partition,
        ref long low, ref long high, int timeout_ms)
    {
        ErrorCode ret;
        mixin(IO!q{
        ret = cast(ErrorCode) rd_kafka_query_watermark_offsets(rk_, topic,
            partition, &low, &high, timeout_ms);
        });
        return ret;
    }
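    /*
    Example (a minimal sketch): querying a partition's watermarks. D string
    literals are zero-terminated, so they can be passed as `const(char)*`.

    ---
    long low, high;
    if (client.queryWatermarkOffsets("mytopic", 0, low, high, 5_000)
            == ErrorCode.no_error)
    {
        // `low` is the oldest available offset, `high` the next offset
        // to be produced
    }
    ---
    */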

    package rd_kafka_t* rk_;
    /* All Producer and Consumer callbacks must reside in Handle and
     * the opaque provided to rdkafka must be a pointer to Handle; the
     * original C++ wrapper kept them in HandleImpl because ProducerImpl
     * and ConsumerImpl could not be safely cast directly to HandleImpl
     * due to the skewed diamond inheritance. */
    package EventCb event_cb_;
    package SocketCb socket_cb_;
    package OpenCb open_cb_;
    package DeliveryReportCb dr_cb_;
    package PartitionerCb partitioner_cb_;
    package PartitionerKeyPointerCb partitioner_kp_cb_;
    package RebalanceCb rebalance_cb_;
    package OffsetCommitCb offset_commit_cb_;

    /** the name of the handle */
    const(char)[] name() const
    {
        return rd_kafka_name(rk_).fromStringz;
    }

    /**
    * Polls the provided kafka handle for events.
    *
    * Events will trigger application-provided callbacks to be called.
    *
    * The \p timeout_ms argument specifies the maximum amount of time
    * (in milliseconds) that the call will block waiting for events.
    * For non-blocking calls, provide 0 as \p timeout_ms.
    * To wait indefinitely for events, provide -1.
    *
    * Events:
    *   - delivery report callbacks (if a DeliveryReportCb is configured) [producer]
    *   - event callbacks (if an EventCb is configured) [producer & consumer]
    *
    * Note:  An application should make sure to call poll() at regular
    *          intervals to serve any queued callbacks waiting to be called.
    *
    * Warning: This method MUST NOT be used with the KafkaConsumer,
    *          use its consume() instead.
    *
    * Returns the number of events served.
    */
    int poll(int timeout_ms = 10)
    {
        typeof(return) ret;
        static if (have_vibed)
        {
            import vibe.core.core: sleep;
            import core.time: msecs;
            ret = rd_kafka_poll(rk_, 0);
            if (ret == 0 && timeout_ms)
                sleep(timeout_ms.msecs);
        }
        else
        {
            ret = rd_kafka_poll(rk_, timeout_ms);
        }
        return ret;
    }
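    /*
    Example (a minimal sketch): a producer-side service loop. Polling at
    regular intervals drains queued delivery reports and other callbacks;
    `running` is an application-provided flag.

    ---
    while (running)
        producer.poll(100); // returns the number of events served
    ---
    */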

    /**
    *  Returns the current out queue length.
    *
    * The out queue contains messages and requests waiting to be sent to,
    * or acknowledged by, the broker.
    */
    int outqLen()
    {
        typeof(return) ret;
        mixin(IO!q{
        ret = rd_kafka_outq_len(rk_);
        });
        return ret;
    }

    /**
     * Convert a list of C partitions to D partitions
     */
    nothrow @nogc private static TopicPartition[] c_parts_to_partitions(
        const rd_kafka_topic_partition_list_t* c_parts)
    {
        auto partitions = (cast(TopicPartition*) malloc(c_parts.cnt * TopicPartition.sizeof))[0
            .. c_parts.cnt];
        foreach (i, ref p; partitions)
            partitions[i] = TopicPartition(&c_parts.elems[i]);
        return partitions;
    }

    nothrow @nogc static void free_partition_vector(ref TopicPartition[] v)
    {
        foreach (ref p; v)
            p.destroy;
        free(v.ptr);
        v = null;
    }

    nothrow @nogc private static rd_kafka_topic_partition_list_t* partitions_to_c_parts(const TopicPartition[] partitions)
    {
        rd_kafka_topic_partition_list_t* c_parts = rd_kafka_topic_partition_list_new(cast(int) partitions.length);

        foreach (ref tpi; partitions)
        {
            rd_kafka_topic_partition_t* rktpar = rd_kafka_topic_partition_list_add(c_parts, tpi.topic_, tpi.partition_);
            rktpar.offset = tpi.offset_;
        }

        return c_parts;
    }

    /**
     * Update the application-provided 'partitions' with info from 'c_parts'
     */
    nothrow @nogc private static void update_partitions_from_c_parts(TopicPartition[] partitions,
        const rd_kafka_topic_partition_list_t* c_parts)
    {
        foreach (i; 0 .. c_parts.cnt)
        {
            const rd_kafka_topic_partition_t* p = &c_parts.elems[i];

            /* Find the corresponding D entry; iterate by ref so the
             * updates are written back into the caller's slice. */
            foreach (ref pp; partitions)
            {
                if (!strcmp(p.topic, pp.topic_) && p.partition == pp.partition_)
                {
                    pp.offset_ = p.offset;
                    pp.err_ = cast(ErrorCode) p.err;
                }
            }
        }
    }

    nothrow @nogc private static void log_cb_trampoline(const rd_kafka_t* rk, int level,
        const(char)* fac, const(char)* buf)
    {
        if (!rk)
        {
            rd_kafka_log_print(rk, level, fac, buf);
            return;
        }

        void* opaque = rd_kafka_opaque(rk);
        Handle handle = cast(Handle)(opaque);

        if (!handle.event_cb_)
        {
            rd_kafka_log_print(rk, level, fac, buf);
            return;
        }

        auto event = Event(Event.Type.log, ErrorCode.no_error,
            cast(Event.Severity)(level), fac, buf);

        handle.event_cb_(event);
    }

    nothrow @nogc private static void error_cb_trampoline(rd_kafka_t* rk, int err,
        const(char)* reason, void* opaque)
    {
        Handle handle = cast(Handle)(opaque);

        auto event = Event(Event.Type.error, cast(ErrorCode) err,
            Event.Severity.error, null, reason);

        handle.event_cb_(event);
    }

    nothrow @nogc private static void throttle_cb_trampoline(rd_kafka_t* rk,
        const(char)* broker_name, int broker_id, int throttle_time_ms, void* opaque)
    {
        Handle handle = cast(Handle)(opaque);

        auto event = Event(Event.Type.throttle);
        event.str_ = broker_name.fromStringz;
        event.id_ = broker_id;
        event.throttle_time_ = throttle_time_ms;

        handle.event_cb_(event);
    }

    nothrow @nogc private static int stats_cb_trampoline(rd_kafka_t* rk, char* json,
        size_t json_len, void* opaque)
    {
        Handle handle = cast(Handle)(opaque);
        auto event = Event(Event.Type.stats, ErrorCode.no_error, Event.Severity.info,
            null, json);

        handle.event_cb_(event);

        return 0;
    }

    nothrow @nogc private static int socket_cb_trampoline(int domain, int type, int protocol, void* opaque)
    {
        Handle handle = cast(Handle)(opaque);

        return handle.socket_cb_(domain, type, protocol);
    }

    nothrow @nogc private static int open_cb_trampoline(const(char)* pathname, int flags,
        mode_t mode, void* opaque)
    {
        Handle handle = cast(Handle)(opaque);

        return handle.open_cb_(pathname.fromStringz, flags, cast(int)(mode));
    }

    nothrow @nogc private static void rebalance_cb_trampoline(rd_kafka_t* rk,
        rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t* c_partitions, void* opaque)
    {
        auto handle = cast(KafkaConsumer)(opaque);
        TopicPartition[] partitions = c_parts_to_partitions(c_partitions);

        handle.rebalance_cb_(handle, cast(ErrorCode) err, partitions);

        free_partition_vector(partitions);
    }

    nothrow @nogc private static void offset_commit_cb_trampoline(rd_kafka_t* rk,
        rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t* c_offsets, void* opaque)
    {
        Handle handle = cast(Handle)(opaque);
        TopicPartition[] offsets;

        if (c_offsets)
            offsets = c_parts_to_partitions(c_offsets);

        handle.offset_commit_cb_(cast(ErrorCode) err, offsets);

        free_partition_vector(offsets);
    }

    /**
    * Pause producing or consumption for the provided list of partitions.
    *
    * Success or error is returned per-partition in the \p partitions list.
    *
    * Returns ErrorCode.no_error on success or an error code on failure.
    *
    * See_also: resume()
    */
    ErrorCode pause(TopicPartition[] partitions)
    {
        rd_kafka_topic_partition_list_t* c_parts;
        rd_kafka_resp_err_t err;

        c_parts = partitions_to_c_parts(partitions);

        err = rd_kafka_pause_partitions(rk_, c_parts);

        if (!err)
            update_partitions_from_c_parts(partitions, c_parts);

        rd_kafka_topic_partition_list_destroy(c_parts);

        return cast(ErrorCode) err;
    }

    /**
    * Resume producing or consumption for the provided list of partitions.
    *
    * Success or error is returned per-partition in the \p partitions list.
    *
    * Returns ErrorCode.no_error on success or an error code on failure.
    *
    * See_also: pause()
    */
    ErrorCode resume(TopicPartition[] partitions)
    {
        rd_kafka_topic_partition_list_t* c_parts;
        rd_kafka_resp_err_t err;

        c_parts = partitions_to_c_parts(partitions);

        err = rd_kafka_resume_partitions(rk_, c_parts);

        if (!err)
            update_partitions_from_c_parts(partitions, c_parts);

        rd_kafka_topic_partition_list_destroy(c_parts);

        return cast(ErrorCode) err;
    }
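    /*
    Example (a minimal sketch): temporarily pausing the currently assigned
    partitions and resuming them later. Per-partition results are written
    back into the slice.

    ---
    TopicPartition[] parts;
    if (consumer.assignment(parts) == ErrorCode.no_error)
    {
        consumer.pause(parts);
        // ... work off a backlog ...
        consumer.resume(parts);
    }
    ---
    */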

    /**
   * Get last known low (oldest/beginning)
   *        and high (newest/end) offsets for partition.
   *
   * The low offset is updated periodically (if statistics.interval.ms is set)
   * while the high offset is updated on each fetched message set from the
   * broker.
   *
   * If there is no cached offset (either low or high, or both) then
   * OFFSET_INVALID will be returned for the respective offset.
   *
   * Offsets are returned in \p low and \p high respectively.
   *
   * Returns ErrorCode.no_error on success or an error code on failure.
   *
   * Note: Shall only be used with an active consumer instance.
   */
    ErrorCode getWatermarkOffsets(const(char)* topic, int partition, ref long low,
        ref long high)
    {
        return cast(ErrorCode) rd_kafka_get_watermark_offsets(rk_, topic,
            partition, &low, &high);
    }
}

/**
 * Queue interface
 *
 * Create a new message queue.  Message queues allow the application
 * to re-route consumed messages from multiple topic+partitions into
 * one single queue point.  This queue point, containing messages from
 * a number of topic+partitions, may then be served by a single
 * consume() method, rather than one per topic+partition combination.
 *
 * See the Consumer.start(), Consumer.consume(), and
 * Consumer.consumeCallback() methods that take a queue parameter
 * for more information.
 */
class Queue
{
nothrow @nogc:
    /**
   * Create Queue object
   */
    this(Handle handle)
    {
        queue_ = rd_kafka_queue_new(handle.rk_);
    }

    ~this()
    {
        rd_kafka_queue_destroy(queue_);
    }

private:
    rd_kafka_queue_t* queue_;
}
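/*
Example (a minimal sketch): routing two partitions of a topic into one
queue and serving both with a single consume call (legacy Consumer API;
RD_KAFKA_OFFSET_BEGINNING is assumed to be exposed by the binding).

---
auto queue = new Queue(consumer);
Consumer.start(topic, 0, RD_KAFKA_OFFSET_BEGINNING, queue);
Consumer.start(topic, 1, RD_KAFKA_OFFSET_BEGINNING, queue);
Message msg;
Consumer.consume(queue, 1000, msg);
---
*/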

/**
 * High-level KafkaConsumer (for brokers 0.9 and later)
 *
 * Note: Requires Apache Kafka >= 0.9.0 brokers
 *
 * Currently supports the \c range and \c roundrobin partition assignment
 * strategies (see \c partition.assignment.strategy)
 */
class KafkaConsumer : Handle
{
    /**
   * Creates a KafkaConsumer.
   *
   * The \p conf object must have \c group.id set to the consumer group to join.
   *
   * Use close() to shut down the consumer.
   *
   * See_also: RebalanceCb
   * See_also: CONFIGURATION.md for \c group.id, \c session.timeout.ms,
   *     \c partition.assignment.strategy, etc.
   */
    this(GlobalConf conf)
    {
        char[512] errbuf = void;
        rd_kafka_conf_t* rk_conf = null;
        size_t grlen;

        if (!conf.rk_conf_)
        {
            throw new Exception("Requires Conf::CONF_GLOBAL object");
        }

        if (rd_kafka_conf_get(conf.rk_conf_, "group.id", null,
                &grlen) != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK || grlen <= 1 /* terminating null only */ )
        {
            throw new Exception("\"group.id\" must be configured");
        }

        this.setCommonConfig(conf);

        rk_conf = rd_kafka_conf_dup(conf.rk_conf_);

        rd_kafka_t* rk;
        if (null is(rk = rd_kafka_new(rd_kafka_type_t.RD_KAFKA_CONSUMER, rk_conf,
                errbuf.ptr, errbuf.sizeof)))
        {
            throw new Exception(errbuf.ptr.fromStringz.idup);
        }

        this.rk_ = rk;

        /* Redirect handle queue to cgrp's queue to provide a single queue point */
        rd_kafka_poll_set_consumer(rk);
    }

static if (!have_vibed)
mixin("nothrow:");

    /** Returns the current partition assignment as set by
     *  assign() */
    ErrorCode assignment(ref TopicPartition[] partitions)
    {
        rd_kafka_topic_partition_list_t* c_parts;
        rd_kafka_resp_err_t err;

        mixin(IO!q{
        err = rd_kafka_assignment(rk_, &c_parts);
        });
        if (err)
            return cast(ErrorCode) err;

        partitions.length = c_parts.cnt;

        foreach (i, ref p; partitions)
            p = TopicPartition(&c_parts.elems[i]);

        rd_kafka_topic_partition_list_destroy(c_parts);

        return ErrorCode.no_error;
    }

static if (!have_vibed)
mixin("@nogc:");

  /**
   * Update the subscription set to \p topics.
   *
   * Any previous subscription will be unassigned and unsubscribed first.
   *
   * The subscription set denotes the desired topics to consume and this
   * set is provided to the partition assignor (one of the elected group
   * members) for all clients which then uses the configured
   * \c partition.assignment.strategy to assign the subscription set's
   * topics' partitions to the consumers, depending on their subscription.
   *
   * The result of such an assignment is a rebalancing which is either
   * handled automatically in librdkafka or can be overridden by the
   * application by providing a RebalanceCb.
   *
   * The rebalancing passes the assigned partition set to
   * assign() to update what partitions are actually
   * being fetched by the KafkaConsumer.
   *
   * Regex pattern matching is automatically performed for topics prefixed
   * with \c \"^\" (e.g. \c \"^myPfx[0-9]_.*\").
   */
    ErrorCode subscribe(const(char)*[] topics...)
    {
        rd_kafka_topic_partition_list_t* c_topics;
        rd_kafka_resp_err_t err;

        c_topics = rd_kafka_topic_partition_list_new(cast(int) topics.length);

        foreach (t; topics)
            rd_kafka_topic_partition_list_add(c_topics, t, RD_KAFKA_PARTITION_UA);

        mixin(IO!q{
        err = rd_kafka_subscribe(rk_, c_topics);
        });

        rd_kafka_topic_partition_list_destroy(c_topics);

        return cast(ErrorCode) err;
    }
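    /*
    Example (a minimal sketch): subscribing to one literal topic and one
    regex pattern. D string literals are zero-terminated, so they can be
    passed directly as `const(char)*`.

    ---
    auto err = consumer.subscribe("events", "^metrics\\..*");
    ---
    */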
    /** Unsubscribe from the current subscription set. */
    auto unsubscribe()
    {
        ErrorCode ret;
        mixin(IO!q{
        ret = cast(ErrorCode)(rd_kafka_unsubscribe(this.rk_));
        });
        return ret;
    }

    /**
   * Consume a message or get an error event; triggers callbacks.
   *
   * Will automatically call registered callbacks for any such queued events,
   * including RebalanceCb, EventCb, OffsetCommitCb,
   * etc.
   *
   * Note:  An application should make sure to call consume() at regular
   *          intervals, even if no messages are expected, to serve any
   *          queued callbacks waiting to be called. This is especially
   *          important when a RebalanceCb has been registered as it needs
   *          to be called and handled properly to synchronize internal
   *          consumer state.
   *
   * Note: Application MUST NOT call \p poll() on KafkaConsumer objects.
   *
   * On return, \p msg is one of:
   *  - a proper message (Message.err() is ErrorCode.no_error)
   *  - an error event (Message.err() is != ErrorCode.no_error)
   *  - a timeout due to no message or event in \p timeout_ms
   *    (Message.err() is timed_out)
   */
   /++
   Params:
        msg = message to fill. Use `msg.err` to check for errors.
        timeout_ms = time to wait if no incoming messages are in the queue.
   +/
    auto consume(ref Message msg, int timeout_ms = 10)
    {
        rd_kafka_message_t* rkmessage;

        mixin(IO!q{
        rkmessage = rd_kafka_consumer_poll(this.rk_, timeout_ms);
        });

        if (!rkmessage)
        {
            msg = Message(null, ErrorCode.timed_out);
            return;
        }

        msg = Message(rkmessage);
    }
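    /*
    Example (a minimal sketch): a typical subscribe/consume loop. Assumes
    `conf` is a GlobalConf with `group.id` set, and `running` is an
    application-provided flag.

    ---
    auto consumer = new KafkaConsumer(conf);
    consumer.subscribe("events");
    Message msg;
    while (running)
    {
        consumer.consume(msg, 100);
        if (msg.err == ErrorCode.no_error)
        {
            // process the message
        }
        else if (msg.err != ErrorCode.timed_out)
        {
            // inspect the error event
        }
    }
    consumer.close();
    ---
    */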

    /**
   *  Update the assignment set to \p partitions.
   *
   * The assignment set is the set of partitions actually being consumed
   * by the KafkaConsumer.
   */
    ErrorCode assign(const TopicPartition[] partitions)
    {
        rd_kafka_topic_partition_list_t* c_parts;
        rd_kafka_resp_err_t err;

        c_parts = partitions_to_c_parts(partitions);

        mixin(IO!q{
        err = rd_kafka_assign(rk_, c_parts);
        });

        rd_kafka_topic_partition_list_destroy(c_parts);
        return cast(ErrorCode) err;
    }

    /**
   * Stop consumption and remove the current assignment.
   */
    ErrorCode unassign()
    {
        typeof(return) ret;
        mixin(IO!q{
        ret = cast(ErrorCode) rd_kafka_assign(rk_, null);
        });
        return ret;
    }

    /**
   * Commit offsets for the current assignment.
   *
   * Note: This is the synchronous variant that blocks until offsets
   *         are committed or the commit fails (see return value).
   *
   * Note: If an OffsetCommitCb callback is registered it will
   *         be called with commit details on a future call to
   *         consume()
   *
   * Returns ErrorCode.no_error or an error code.
   */
    ErrorCode commitSync()
    {
        typeof(return) ret;
        mixin(IO!q{
        ret = cast(ErrorCode) rd_kafka_commit(rk_, null, 0 /*sync*/ );
        });
        return ret;
    }

    /**
   * Asynchronous version of commitSync()
   *
   * See_also: KafkaConsumer.commitSync()
   */
    ErrorCode commitAsync()
    {
        typeof(return) ret;
        mixin(IO!q{
        ret = cast(ErrorCode) rd_kafka_commit(rk_, null, 1 /*async*/ );
        });
        return ret;
    }

    /**
   * Commit offset for a single topic+partition based on \p message
   *
   * Note: This is the synchronous variant.
   *
   * See_also: KafkaConsumer.commitSync()
   */
    ErrorCode commitSync(ref Message message)
    {
        typeof(return) ret;
        mixin(IO!q{
        ret = cast(ErrorCode) rd_kafka_commit_message(rk_, message.rkmessage_, 0 /*sync*/ );
        });
        return ret;
    }
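    /*
    Example (a minimal sketch): at-least-once processing, committing each
    message's offset only after the application has handled it.

    ---
    consumer.consume(msg, 100);
    if (msg.err == ErrorCode.no_error)
    {
        handleRecord(msg);        // application-defined processing
        consumer.commitSync(msg); // commit this message's offset
    }
    ---
    */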

    /**
   * Commit offset for a single topic+partition based on \p message
   *
   * Note: This is the asynchronous variant.
   *
   * See_also: KafkaConsumer.commitSync()
   */
    ErrorCode commitAsync(ref Message message)
    {
        typeof(return) ret;
        mixin(IO!q{
        ret = cast(ErrorCode) rd_kafka_commit_message(rk_, message.rkmessage_, 1 /*async*/ );
        });
        return ret;
    }

    /**
   * Commit offsets for the provided list of partitions.
   *
   * Note: This is the synchronous variant.
   */
    ErrorCode commitSync(TopicPartition[] offsets)
    {
        rd_kafka_topic_partition_list_t* c_parts = partitions_to_c_parts(offsets);
        rd_kafka_resp_err_t err;
        mixin(IO!q{
        err = rd_kafka_commit(rk_, c_parts, 0);
        });
        if (!err)
            update_partitions_from_c_parts(offsets, c_parts);
        rd_kafka_topic_partition_list_destroy(c_parts);
        return cast(ErrorCode) err;
    }

    /**
   * Commit offsets for the provided list of partitions.
   *
   * Note: This is the asynchronous variant.
   */
    ErrorCode commitAsync(const TopicPartition[] offsets)
    {
        rd_kafka_topic_partition_list_t* c_parts = partitions_to_c_parts(offsets);
        rd_kafka_resp_err_t err;
        mixin(IO!q{
        err = rd_kafka_commit(rk_, c_parts, 1);
        });
        rd_kafka_topic_partition_list_destroy(c_parts);
        return cast(ErrorCode) err;
    }

    /**
   * Retrieve committed offsets for topics+partitions.
   *
   * Returns ErrorCode.no_error on success, in which case the
   *          \p offset or \p err field of each element of \p partitions is
   *          filled in with the stored offset or a partition-specific error.
   *          Otherwise returns an error code.
   */
    ErrorCode committed(TopicPartition[] partitions, int timeout_ms)
    {
        rd_kafka_topic_partition_list_t* c_parts;
        rd_kafka_resp_err_t err;

        c_parts = partitions_to_c_parts(partitions);

        mixin(IO!q{
        err = rd_kafka_committed(rk_, c_parts, timeout_ms);
        });

        if (!err)
        {
            update_partitions_from_c_parts(partitions, c_parts);
        }

        rd_kafka_topic_partition_list_destroy(c_parts);

        return cast(ErrorCode) err;
    }

    /**
   * Retrieve current positions (offsets) for topics+partitions.
   *
   * Returns ErrorCode.no_error on success, in which case the
   *          \p offset or \p err field of each element of \p partitions is
   *          filled in with the current position or a partition-specific
   *          error. Otherwise returns an error code.
   */
    ErrorCode position(TopicPartition[] partitions)
    {
        rd_kafka_topic_partition_list_t* c_parts;
        rd_kafka_resp_err_t err;

        c_parts = partitions_to_c_parts(partitions);

        mixin(IO!q{
        err = rd_kafka_position(rk_, c_parts);
        });

        if (!err)
        {
            update_partitions_from_c_parts(partitions, c_parts);
        }

        rd_kafka_topic_partition_list_destroy(c_parts);

        return cast(ErrorCode) err;
    }

    /**
   * Close and shut down the consumer.
   *
   * This call will block until the following operations are finished:
   *  - Trigger a local rebalance to void the current assignment
   *  - Stop consumption for current assignment
   *  - Commit offsets
   *  - Leave group
   *
   * The maximum blocking time is roughly limited to session.timeout.ms.
   *
   * Note: Callbacks, such as RebalanceCb and
   *         OffsetCommitCb, etc, may be called.
   *
   * Note: The consumer object must later be freed with \c delete
   */
    ErrorCode close()
    {
        rd_kafka_resp_err_t err;
        mixin(IO!q{
        err = rd_kafka_consumer_close(rk_);
        });
        if (err)
            return cast(ErrorCode) err;

        mixin(IO!q{
        while (rd_kafka_outq_len(rk_) > 0)
            rd_kafka_poll(rk_, 10);
        rd_kafka_destroy(rk_);
        });

        return cast(ErrorCode) err;
    }
}

/**
 * Simple Consumer (legacy)
 *
 * A simple non-balanced, non-group-aware, consumer.
 */
class Consumer : Handle
{
    /**
   * Creates a new Kafka consumer handle.
   *
   * \p conf is an optional object that will be used instead of the default
   * configuration.
   * The \p conf object is reusable after this call.
   *
   * Throws: Exception with a human-readable error message if the client
   * handle cannot be created.
   */
    this(GlobalConf conf)
    {
        char[512] errbuf = void;
        rd_kafka_conf_t* rk_conf = null;

        if (conf)
        {
            if (!conf.rk_conf_)
            {
                throw new Exception("Requires Conf::CONF_GLOBAL object");
            }

            this.setCommonConfig(conf);

            rk_conf = rd_kafka_conf_dup(conf.rk_conf_);
        }

        mixin(IO!q{
        rk_ = rd_kafka_new(rd_kafka_type_t.RD_KAFKA_CONSUMER,
                rk_conf, errbuf.ptr, errbuf.sizeof);
        });
        if (null is rk_)
        {
            throw new Exception(errbuf.ptr.fromStringz.idup);
        }
    }

static if (!have_vibed)
mixin("nothrow @nogc:");

    ~this()
    {
        mixin(IO!q{
        rd_kafka_destroy(rk_);
        });
    }

static:

    /**
   * Start consuming messages for topic and \p partition
   * at offset \p offset which may either be a proper offset (0..N)
   * or one of the special offsets: \p OFFSET_BEGINNING or \p OFFSET_END.
   *
   * rdkafka will attempt to keep \p queued.min.messages (config property)
   * messages in the local queue by repeatedly fetching batches of messages
   * from the broker until the threshold is reached.
   *
   * The application shall use one of the \p ...consume*() functions
   * to consume messages from the local queue, each kafka message being
   * represented as a `Message` object.
   *
   * \p ...start() must not be called multiple times for the same
   * topic and partition without stopping consumption first with
   * \p ...stop().
   *
   * Returns an ErrorCode to indicate success or failure.
   */
    ErrorCode start(Topic topic, int partition, long offset, Queue queue)
    {
        int err;
        mixin(IO!q{
        /* route messages into the provided queue when one is given */
        err = queue
            ? rd_kafka_consume_start_queue(topic.rkt_, partition, offset, queue.queue_)
            : rd_kafka_consume_start(topic.rkt_, partition, offset);
        });
        if (err == -1)
            return cast(ErrorCode)(rd_kafka_errno2err(errno));
        return ErrorCode.no_error;
    }

    /**
   * Stop consuming messages for topic and \p partition, purging
   *        all messages currently in the local queue.
   *
   * The application must stop all consumers before destroying
   * the Consumer handle.
   *
   * Returns an ErrorCode to indicate success or failure.
   */
    ErrorCode stop(Topic topic, int partition)
    {
        int err;
        mixin(IO!q{
        err = rd_kafka_consume_stop(topic.rkt_, partition);
        });
        if (err == -1)
            return cast(ErrorCode)(rd_kafka_errno2err(errno));
        return ErrorCode.no_error;
    }

    /**
   * Seek consumer for topic+partition to \p offset which is either an
   *        absolute or logical offset.
   *
   * If \p timeout_ms is not 0 the call will wait this long for the
   * seek to be performed. If the timeout is reached the internal state
   * will be unknown and this function returns `timed_out`.
   * If \p timeout_ms is 0 it will initiate the seek but return
   * immediately without any error reporting (e.g., async).
   *
   * This call triggers a fetch queue barrier flush.
   *
   * Returns an ErrorCode to indicate success or failure.
   */
    ErrorCode seek(Topic topic, int partition, long offset, int timeout_ms)
    {
        int err;
        mixin(IO!q{
        err = rd_kafka_seek(topic.rkt_, partition, offset, timeout_ms);
        });
        if (err == -1)
            return cast(ErrorCode)(rd_kafka_errno2err(errno));
        return ErrorCode.no_error;
    }

    /**
   * Consume a single message from \p topic and \p partition.
   *
   * \p timeout_ms is the maximum amount of time to wait for a message to be
   * received.
   * Consumer must have been previously started with \p ...start().
   *
   * Fills \p msg; the application must check whether it is an
   * error or a proper message via Message.err(), testing for
   * \p ErrorCode.no_error.
   *
   * The message object must be destroyed when the application is done with it.
   *
   * Errors (in Message.err()):
   *  - timed_out - \p timeout_ms was reached with no new messages fetched.
   *  - PARTITION_EOF - End of partition reached, not an error.
   */
    void consume(Topic topic, int partition, int timeout_ms, ref Message msg)
    {
        rd_kafka_message_t* rkmessage;

        mixin(IO!q{
        rkmessage = rd_kafka_consume(topic.rkt_, partition, timeout_ms);
        });
        if (!rkmessage)
        {
            msg = Message(topic, cast(ErrorCode) rd_kafka_errno2err(errno));
            return;
        }
        msg = Message(topic, rkmessage);
    }
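    /*
    Example (a minimal sketch): legacy per-partition consumption. `start`,
    `consume`, and `stop` are static and take the topic explicitly;
    RD_KAFKA_OFFSET_BEGINNING is assumed to be exposed by the binding.

    ---
    Consumer.start(topic, 0, RD_KAFKA_OFFSET_BEGINNING, null);
    Message msg;
    Consumer.consume(topic, 0, 1000, msg);
    // check msg.err before using the payload
    Consumer.stop(topic, 0);
    ---
    */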

    /**
   * Consume a single message from the specified queue.
   *
   * \p timeout_ms is the maximum amount of time to wait for a message to be
   * received.
   * Consumer must have been previously started on the queue with
   * \p ...start().
   *
   * Fills \p msg; the application must check whether it is an
   * error or a proper message via \p Message.err(), testing for
   * \p ErrorCode.no_error.
   *
   * The message object must be destroyed when the application is done with it.
   *
   * Errors (in Message.err()):
   *   - timed_out - \p timeout_ms was reached with no new messages fetched
   *
   * Note that Message.topic() may be null after certain kinds of
   * errors, so applications should check that it isn't null before
   * dereferencing it.
   */
    void consume(Queue queue, int timeout_ms, ref Message msg)
    {
        rd_kafka_message_t* rkmessage;
        mixin(IO!q{
        rkmessage = rd_kafka_consume_queue(queue.queue_, timeout_ms);
        });
        if (!rkmessage)
        {
            msg = Message(null, cast(ErrorCode) rd_kafka_errno2err(errno));
            return;
        }
        /*
       * Recover our Topic from the topic conf's opaque field, which we
       * set in Topic::create() for just this kind of situation.
       */
        void* opaque = rd_kafka_topic_opaque(rkmessage.rkt);
        Topic topic = cast(Topic)(opaque);

        msg = Message(topic, rkmessage);
    }

    /* Helper struct for `consumeCallback`.
   * Encapsulates the values we need in order to call `rd_kafka_consume_callback`
   * and keep track of the D callback function and `opaque` value.
   */
    private static struct ConsumerCallback
    {
        /* This function is the one we give to `rd_kafka_consume_callback`, with
     * the `opaque` pointer pointing to an instance of this struct, in which
     * we can find the D callback and the topic.
     */
        static nothrow @nogc void consume_cb_trampoline(rd_kafka_message_t* msg, void* opaque)
        {
            ConsumerCallback* instance = cast(ConsumerCallback*) opaque;
            Message message = Message(instance.topic, msg, false  /*don't free*/ );
            instance.cb_cls(message);
        }

        Topic topic;
        ConsumeCb cb_cls;
    }

    /**
   * Consumes messages from \p topic and \p partition, calling
   *        the provided callback for each consumed message.
   *
   * \p consumeCallback() provides higher throughput performance
   * than \p consume().
   *
   * \p timeout_ms is the maximum amount of time to wait for one or
   * more messages to arrive.
   *
   * The provided \p consume_cb instance has its \p consume_cb function
   * called for every message received.
   *
   * The \p opaque argument is passed to the \p consume_cb as \p opaque.
   *
   * Returns the number of messages processed or -1 on error.
   *
   * See_also: Consumer.consume()
   */
    int consumeCallback(Topic topic, int partition, int timeout_ms, ConsumeCb consume_cb)
    {
        auto context = ConsumerCallback(topic, consume_cb);
        int ret;
        mixin(IO!q{
        ret = rd_kafka_consume_callback(topic.rkt_, partition, timeout_ms,
            &ConsumerCallback.consume_cb_trampoline, &context);
        });
        return ret;
    }
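    /*
    Example (a minimal sketch): callback-based batch consumption. This
    assumes a `ConsumeCb` can be constructed from a delegate taking a
    `Message` by reference, matching the trampoline above.

    ---
    int served = Consumer.consumeCallback(topic, 0, 1000, (ref Message m)
    {
        // inspect m.err and process the payload
    });
    ---
    */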

    /* Helper struct for `consumeCallback` with a Queue.
   * Encapsulates the values we need in order to call `rd_kafka_consume_callback`
   * and keep track of the D callback function and `opaque` value.
   */
    private static struct ConsumerQueueCallback
    {
        /* This function is the one we give to `rd_kafka_consume_callback`, with
     * the `opaque` pointer pointing to an instance of this struct, in which
     * we can find the D callback.
     */
        static nothrow @nogc void consume_cb_trampoline(rd_kafka_message_t* msg, void* opaque)
        {
            ConsumerQueueCallback* instance = cast(ConsumerQueueCallback*) opaque;
            /*
       * Recover our Topic from the topic conf's opaque field, which we
       * set in Topic::create() for just this kind of situation.
       */
            void* topic_opaque = rd_kafka_topic_opaque(msg.rkt);
            Topic topic = cast(Topic) topic_opaque;
            Message message = Message(topic, msg, false  /*don't free*/ );
            instance.cb_cls(message);
        }

        ConsumeCb cb_cls;
    }

    /**
   * Consumes messages from \p queue, calling the provided callback for
   *        each consumed message.
   *
   * See_also: Consumer.consumeCallback()
   */
    int consumeCallback(Queue queue, int timeout_ms, ConsumeCb consume_cb)
    {
        auto context = ConsumerQueueCallback(consume_cb);
        int ret;
        mixin(IO!q{
        ret = rd_kafka_consume_callback_queue(queue.queue_, timeout_ms,
            &ConsumerQueueCallback.consume_cb_trampoline, &context);
        });
        return ret;
    }

    /**
   * Converts an offset into the logical offset from the tail of a topic.
   *
   * \p offset is the (positive) number of items from the end.
   *
   * Returns the logical offset for message \p offset from the tail; this
   *          value may be passed to Consumer.start, et al.
   * Note: The returned logical offset is specific to librdkafka.
   */
    long offsetTail(long offset)
    {
        return RD_KAFKA_OFFSET_TAIL(offset);
    }
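    /*
    Example: start consuming at the 100th-from-last message of partition 0.

    ---
    Consumer.start(topic, 0, Consumer.offsetTail(100), null);
    ---
    */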
}

/**
 * Producer
 */
class Producer : Handle
{
    package GlobalConf _conf;
    /**
   * Creates a new Kafka producer handle.
   *
   * \p conf is an optional object that will be used instead of the default
   * configuration.
   * The \p conf object is reusable after this call.
   */
    this(GlobalConf conf)
    {
        _conf = conf;
        char[512] errbuf = void;
        rd_kafka_conf_t* rk_conf = null;

        if (conf)
        {
            if (!conf.rk_conf_)
            {
                throw new Exception("Requires Conf::CONF_GLOBAL object");
            }

            this.setCommonConfig(conf);

            rk_conf = rd_kafka_conf_dup(conf.rk_conf_);

            if (conf.dr_cb_)
            {
                rd_kafka_conf_set_dr_msg_cb(rk_conf, &dr_msg_cb_trampoline);
                this.dr_cb_ = conf.dr_cb_;
            }
        }

        if (null is(rk_ = rd_kafka_new(rd_kafka_type_t.RD_KAFKA_PRODUCER,
                rk_conf, errbuf.ptr, errbuf.sizeof)))
        {
            throw new Exception(errbuf.ptr.fromStringz.idup);
        }
    }

    /++
    Returns: a new topic for this producer
    Params:
        topic = topic name
        topicConf = topic configuration; if null, the global configuration's
            `defaultTopicConf` is used.
    +/
    Topic newTopic(const(char)[] topic, TopicConf topicConf = null)
    {
        if(!topicConf)
            topicConf = _conf.defaultTopicConf;
        assert(topicConf);
        return new Topic(this, topic, topicConf);
    }

static if (!have_vibed)
mixin("nothrow @nogc:");

    ~this()
    {
        if (rk_)
        {
            mixin(IO!q{
            rd_kafka_destroy(rk_);
            });
        }
    }

    /**
   * Producer.produce() \p msgflags
   *
   * These flags are optional and mutually exclusive.
   */
   enum MsgOpt
   {
        free = 0x1, /**< rdkafka will free(3) \p payload
                                            * when it is done with it. */
        copy = 0x2, /**< the \p payload data will be copied
                                           * and the \p payload pointer will not
                                           * be used by rdkafka after the
                                           * call returns. */
        block = 0x4, /**< Block produce*() on message queue
              *   full.
              *   WARNING:
              *   If a delivery report callback
              *   is used the application MUST
              *   call rd_kafka_poll() (or equiv.)
              *   to make sure delivered messages
              *   are drained from the internal
              *   delivery report queue.
              *   Failure to do so will result
              *   in indefinitely blocking on
              *   the produce() call when the
              *   message queue is full.
              */
   }

    nothrow @nogc private static void dr_msg_cb_trampoline(rd_kafka_t* rk,
        const rd_kafka_message_t* rkmessage, void* opaque)
    {
        auto handle = cast(Handle) opaque;
        auto message = Message(null, rkmessage);
        handle.dr_cb_(message);
        message.destroy;
    }

    /**
   * Produce and send a single message to the broker.
   *
   * This is an asynchronous, non-blocking API.
   *
   * \p partition is the target partition, either:
   *   - Topic::PARTITION_UA (unassigned) for
   *     automatic partitioning using the topic's partitioner function, or
   *   - a fixed partition (0..N)
   *
   * \p msgflags is zero or more of the following flags OR:ed together:
   *    block - block \p produce*() call if
   *                   \p queue.buffering.max.messages or
   *                   \p queue.buffering.max.kbytes are exceeded.
   *                   Messages are considered in-queue from the point they
   *                   are accepted by produce() until their corresponding
   *                   delivery report callback/event returns.
   *                   It is thus a requirement to call
   *                   poll() (or equiv.) from a separate
   *                   thread when block is used.
   *                   See WARNING on \c block above.
   *    free - rdkafka will free(3) \p payload when it is done with it.
   *    copy - the \p payload data will be copied and the \p payload
   *               pointer will not be used by rdkafka after the
   *               call returns.
   *
   *  NOTE: free and copy are mutually exclusive.
   *
   *  If the function returns -1 and free was specified, then
   *  the memory associated with the payload is still the caller's
   *  responsibility.
   *
   * \p payload is the message payload of size \p len bytes.
   *
   * \p key is an optional message key, if non-null it
   * will be passed to the topic partitioner as well as be sent with the
   * message to the broker and passed on to the consumer.
   *
   * \p msg_opaque is an optional application-provided per-message opaque
   * pointer that will be provided in the delivery report callback (\p dr_cb)
   * for referencing this message.
   *
   * Returns an ErrorCode to indicate success or failure:
   *  - _QUEUE_FULL - maximum number of outstanding messages has been
   *                      reached: \c queue.buffering.max.messages
   *
   *  - MSG_SIZE_TOO_LARGE - message is larger than configured max size:
   *                            \c messages.max.bytes
   *
   *  - _UNKNOWN_PARTITION - requested \p partition is unknown in the
   *                           Kafka cluster.
   *
   *  - _UNKNOWN_TOPIC     - topic is unknown in the Kafka cluster.
   */
    ErrorCode produce(Topic topic, int partition, void[] payload,
        const(void)[] key = null,
        long timestamp = Clock.currTime(UTC()).toUnixTime!long,
        int msgflags = MsgOpt.copy,
        void* msg_opaque = null)
    {
        int err;
        mixin(IO!q{
        //with(rd_kafka_vtype_t)
        //err = rd_kafka_producev(
        //        rk_,
        //        RD_KAFKA_VTYPE_RKT,
        //        topic.rkt_,
        //        RD_KAFKA_VTYPE_PARTITION,
        //        partition,
        //        RD_KAFKA_VTYPE_MSGFLAGS,
        //        msgflags,
        //        RD_KAFKA_VTYPE_VALUE,
        //        payload.ptr,
        //        payload.length,
        //        RD_KAFKA_VTYPE_KEY,
        //        key.ptr,
        //        key.length,
        //        //RD_KAFKA_VTYPE_OPAQUE,
        //        //msg_opaque,
        //        RD_KAFKA_VTYPE_END,
        //        );
        err = rd_kafka_produce(topic.rkt_, partition, msgflags, payload.ptr,
                payload.length, key.ptr, key.length, msg_opaque);
        });
        if (err == -1)
            return cast(ErrorCode) rd_kafka_errno2err(errno);
        return ErrorCode.no_error;
    }
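    /*
    Example (a minimal sketch): producing a copied payload with a key and
    flushing before shutdown so queued messages are delivered.

    ---
    auto topic = producer.newTopic("events");
    auto err = producer.produce(topic, RD_KAFKA_PARTITION_UA,
        "hello".dup, "key1");
    producer.poll(0);       // serve any pending delivery reports
    producer.flush(10_000); // wait for outstanding requests
    ---
    */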

    /**
   * Wait until all outstanding produce requests, et al., are completed.
   *        This should typically be done prior to destroying a producer instance
   *        to make sure all queued and in-flight produce requests are completed
   *        before terminating.
   *
   * Note: This function will call poll() and thus trigger callbacks.
   *
   * Returns timed_out if \p timeout_ms was reached before all
   *          outstanding requests were completed, else ErrorCode.no_error.
   */
    ErrorCode flush(int timeout_ms = 60_000)
    {
        typeof(return) ret;
        mixin(IO!q{
        ret = cast(ErrorCode) rd_kafka_flush(rk_, timeout_ms);
        });
        return ret;
    }
}