///
module rdkafkad.handlers;
import rdkafkad;

/**
 * Base handle, super class for specific clients.
 */
class Handle
{

    /**
    * Returns the client's broker-assigned group member id.
    *
    * Note: This currently requires the high-level KafkaConsumer.
    *
    * Returns: the last assigned member id, or an empty string if not currently
    *          a group member.
    */
    string memberid() const nothrow
    {
        char* str = rd_kafka_memberid(rk_);
        string memberid = str.fromStringz.idup;
        if (str)
            rd_kafka_mem_free(cast(rd_kafka_s*) rk_, str);
        return memberid;
    }

    /**
    * Request metadata from the broker.
    *
    * Parameters:
    *  \p all_topics  - if true: request info about all topics in the cluster,
    *                   if false: only request info about locally known topics.
    *  \p only_rkt    - only request info about this topic (null for all).
    *  \p timeout_ms  - maximum response time before failing.
    *
    * Returns: a new Metadata object on success.
    *
    * Throws: Exception with the error description if the request times out
    * or fails with another error code.
    */
    Metadata metadata(bool all_topics, const Topic only_rkt, int timeout_ms = 60_000)
    {
        const rd_kafka_metadata_t* cmetadatap = null;

        rd_kafka_topic_t* topic = only_rkt ? cast(rd_kafka_topic_t*) only_rkt.rkt_ : null;

        if (auto error = cast(ErrorCode)rd_kafka_metadata(rk_, all_topics, topic, &cmetadatap, timeout_ms))
        {
            throw new Exception(error.err2str);
        }
        return new Metadata(cmetadatap);
    }
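
    /+ Usage sketch (not from the original source): assuming an already
       constructed client handle named `client` (a Producer or Consumer),
       cluster metadata for all topics could be requested roughly like this:

        import std.stdio : writeln;

        try
        {
            auto md = client.metadata(true, null, 5_000); // all topics, 5 s timeout
            writeln("metadata request succeeded");
        }
        catch (Exception e)
        {
            writeln("metadata request failed: ", e.msg);
        }
    +/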

nothrow @nogc:

    package void setCommonConfig(GlobalConf conf)
    {
        rd_kafka_conf_set_opaque(conf.rk_conf_, cast(void*) this);

        if (conf.event_cb_)
        {
            rd_kafka_conf_set_log_cb(conf.rk_conf_, &log_cb_trampoline);
            rd_kafka_conf_set_error_cb(conf.rk_conf_, &error_cb_trampoline);
            rd_kafka_conf_set_throttle_cb(conf.rk_conf_, &throttle_cb_trampoline);
            rd_kafka_conf_set_stats_cb(conf.rk_conf_, &stats_cb_trampoline);
            event_cb_ = conf.event_cb_;
        }

        if (conf.socket_cb_)
        {
            rd_kafka_conf_set_socket_cb(conf.rk_conf_, &socket_cb_trampoline);
            socket_cb_ = conf.socket_cb_;
        }

        version (Windows)
        {
        }
        else if (conf.open_cb_)
        {
            rd_kafka_conf_set_open_cb(conf.rk_conf_, &open_cb_trampoline);
            open_cb_ = conf.open_cb_;
        }

        if (conf.rebalance_cb_)
        {
            rd_kafka_conf_set_rebalance_cb(conf.rk_conf_, &rebalance_cb_trampoline);
            rebalance_cb_ = conf.rebalance_cb_;
        }

        if (conf.offset_commit_cb_)
        {
            rd_kafka_conf_set_offset_commit_cb(conf.rk_conf_, &offset_commit_cb_trampoline);
            offset_commit_cb_ = conf.offset_commit_cb_;
        }
    }

    /**
    * Query the broker for the low (oldest/beginning)
    *        and high (newest/end) offsets for a partition.
    *
    * Offsets are returned in \p low and \p high respectively.
    *
    * Returns: ErrorCode.no_error on success or an error code on failure.
    */
    ErrorCode queryWatermarkOffsets(const(char)* topic, int partition,
        ref long low, ref long high, int timeout_ms)
    {
        return cast(ErrorCode) rd_kafka_query_watermark_offsets(rk_, topic,
            partition, &low, &high, timeout_ms);
    }

    package rd_kafka_t* rk_;
    /* All Producer and Consumer callbacks must reside in Handle and
     * the opaque provided to rdkafka must be a pointer to the Handle
     * instance, since the Producer and Consumer classes cannot be safely
     * cast directly to one another. */
    package EventCb event_cb_;
    package SocketCb socket_cb_;
    package OpenCb open_cb_;
    package DeliveryReportCb dr_cb_;
    package PartitionerCb partitioner_cb_;
    package PartitionerKeyPointerCb partitioner_kp_cb_;
    package RebalanceCb rebalance_cb_;
    package OffsetCommitCb offset_commit_cb_;

    /** the name of the handle */
    const(char)[] name() const
    {
        return rd_kafka_name(rk_).fromStringz;
    }

    /**
    * Polls the provided kafka handle for events.
    *
    * Events will trigger application-provided callbacks to be called.
    *
    * The \p timeout_ms argument specifies the maximum amount of time
    * (in milliseconds) that the call will block waiting for events.
    * For non-blocking calls, provide 0 as \p timeout_ms.
    * To wait indefinitely for events, provide -1.
    *
    * Events:
    *   - delivery report callbacks (if a DeliveryReportCb is configured) [producer]
    *   - event callbacks (if an EventCb is configured) [producer & consumer]
    *
    * Note: An application should make sure to call poll() at regular
    *          intervals to serve any queued callbacks waiting to be called.
    *
    * Warning: This method MUST NOT be used with the KafkaConsumer,
    *          use its consume() instead.
    *
    * Returns: the number of events served.
    */
    int poll(int timeout_ms = 10)
    {
        return rd_kafka_poll(rk_, timeout_ms);
    }
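
    /+ Usage sketch (not from the original source): a producer application
       typically calls poll() periodically from its main loop so delivery
       report and event callbacks get served (`conf` and `running` are
       assumed application objects):

        auto producer = new Producer(conf);
        while (running)
        {
            // ... produce messages ...
            producer.poll(100); // serve queued callbacks, wait up to 100 ms
        }
    +/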

    /**
    *  Returns the current out queue length.
    *
    * The out queue contains messages and requests waiting to be sent to,
    * or acknowledged by, the broker.
    */
    int outqLen()
    {
        return rd_kafka_outq_len(rk_);
    }

    /**
     * Convert a list of C partitions to D partitions
     */
    private static TopicPartition[] c_parts_to_partitions(
        const rd_kafka_topic_partition_list_t* c_parts)
    {
        auto partitions = (cast(TopicPartition*) malloc(c_parts.cnt * TopicPartition.sizeof))[0
            .. c_parts.cnt];
        foreach (i, ref p; partitions)
            partitions[i] = TopicPartition(&c_parts.elems[i]);
        return partitions;
    }

    static void free_partition_vector(ref TopicPartition[] v)
    {
        foreach (ref p; v)
            p.destroy;
        free(v.ptr);
        v = null;
    }

    private static rd_kafka_topic_partition_list_t* partitions_to_c_parts(const TopicPartition[] partitions)
    {
        rd_kafka_topic_partition_list_t* c_parts = rd_kafka_topic_partition_list_new(cast(int) partitions.length);

        foreach (ref tpi; partitions)
        {
            rd_kafka_topic_partition_t* rktpar = rd_kafka_topic_partition_list_add(c_parts, tpi.topic_, tpi.partition_);
            rktpar.offset = tpi.offset_;
        }

        return c_parts;
    }

    /**
     * Update the application-provided 'partitions' with info from 'c_parts'
     */
    private static void update_partitions_from_c_parts(TopicPartition[] partitions,
        const rd_kafka_topic_partition_list_t* c_parts)
    {
        foreach (i; 0 .. c_parts.cnt)
        {
            const rd_kafka_topic_partition_t* p = &c_parts.elems[i];

            /* Find the corresponding D entry; iterate by ref so the
             * updates are written back into the caller's slice. */
            foreach (ref pp; partitions)
            {
                if (!strcmp(p.topic, pp.topic_) && p.partition == pp.partition_)
                {
                    pp.offset_ = p.offset;
                    pp.err_ = cast(ErrorCode) p.err;
                }
            }
        }
    }

    private static void log_cb_trampoline(const rd_kafka_t* rk, int level,
        const(char)* fac, const(char)* buf)
    {
        if (!rk)
        {
            rd_kafka_log_print(rk, level, fac, buf);
            return;
        }

        void* opaque = rd_kafka_opaque(rk);
        Handle handle = cast(Handle)(opaque);

        if (!handle.event_cb_)
        {
            rd_kafka_log_print(rk, level, fac, buf);
            return;
        }

        auto event = Event(Event.Type.log, ErrorCode.no_error,
            cast(Event.Severity)(level), fac, buf);

        handle.event_cb_(event);
    }

    private static void error_cb_trampoline(rd_kafka_t* rk, int err,
        const(char)* reason, void* opaque)
    {
        Handle handle = cast(Handle)(opaque);

        auto event = Event(Event.Type.error, cast(ErrorCode) err,
            Event.Severity.error, null, reason);

        handle.event_cb_(event);
    }

    private static void throttle_cb_trampoline(rd_kafka_t* rk,
        const(char)* broker_name, int broker_id, int throttle_time_ms, void* opaque)
    {
        Handle handle = cast(Handle)(opaque);

        auto event = Event(Event.Type.throttle);
        event.str_ = broker_name.fromStringz;
        event.id_ = broker_id;
        event.throttle_time_ = throttle_time_ms;

        handle.event_cb_(event);
    }

    private static int stats_cb_trampoline(rd_kafka_t* rk, char* json,
        size_t json_len, void* opaque)
    {
        Handle handle = cast(Handle)(opaque);
        auto event = Event(Event.Type.stats, ErrorCode.no_error, Event.Severity.info,
            null, json);

        handle.event_cb_(event);

        return 0;
    }

    private static int socket_cb_trampoline(int domain, int type, int protocol, void* opaque)
    {
        Handle handle = cast(Handle)(opaque);

        return handle.socket_cb_(domain, type, protocol);
    }

    private static int open_cb_trampoline(const(char)* pathname, int flags,
        mode_t mode, void* opaque)
    {
        Handle handle = cast(Handle)(opaque);

        return handle.open_cb_(pathname.fromStringz, flags, cast(int)(mode));
    }

    private static void rebalance_cb_trampoline(rd_kafka_t* rk,
        rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t* c_partitions, void* opaque)
    {
        auto handle = cast(KafkaConsumer)(opaque);
        TopicPartition[] partitions = c_parts_to_partitions(c_partitions);

        handle.rebalance_cb_(handle, cast(ErrorCode) err, partitions);

        free_partition_vector(partitions);
    }

    private static void offset_commit_cb_trampoline(rd_kafka_t* rk,
        rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t* c_offsets, void* opaque)
    {
        Handle handle = cast(Handle)(opaque);
        TopicPartition[] offsets;

        if (c_offsets)
            offsets = c_parts_to_partitions(c_offsets);

        handle.offset_commit_cb_(cast(ErrorCode) err, offsets);

        free_partition_vector(offsets);
    }

    /**
    * Pause producing or consumption for the provided list of partitions.
    *
    * Success or error is returned per-partition in the \p partitions list.
    *
    * Returns: ErrorCode.no_error on success, or an error code on failure.
    *
    * See_also: resume()
    */
    ErrorCode pause(TopicPartition[] partitions)
    {
        rd_kafka_topic_partition_list_t* c_parts;
        rd_kafka_resp_err_t err;

        c_parts = partitions_to_c_parts(partitions);

        err = rd_kafka_pause_partitions(rk_, c_parts);

        if (!err)
            update_partitions_from_c_parts(partitions, c_parts);

        rd_kafka_topic_partition_list_destroy(c_parts);

        return cast(ErrorCode) err;
    }

    /**
    * Resume producing or consumption for the provided list of partitions.
    *
    * Success or error is returned per-partition in the \p partitions list.
    *
    * Returns: ErrorCode.no_error on success, or an error code on failure.
    *
    * See_also: pause()
    */
    ErrorCode resume(TopicPartition[] partitions)
    {
        rd_kafka_topic_partition_list_t* c_parts;
        rd_kafka_resp_err_t err;

        c_parts = partitions_to_c_parts(partitions);

        err = rd_kafka_resume_partitions(rk_, c_parts);

        if (!err)
            update_partitions_from_c_parts(partitions, c_parts);

        rd_kafka_topic_partition_list_destroy(c_parts);

        return cast(ErrorCode) err;
    }
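
    /+ Usage sketch (not from the original source): with a KafkaConsumer named
       `consumer`, the current assignment can be paused and later resumed;
       per-partition results are written back into the slice:

        TopicPartition[] parts;
        if (consumer.assignment(parts) == ErrorCode.no_error)
        {
            consumer.pause(parts);  // stop fetching from these partitions
            // ... do some work ...
            consumer.resume(parts); // continue fetching
        }
    +/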

    /**
   * Get the last known low (oldest/beginning)
   *        and high (newest/end) offsets for a partition.
   *
   * The low offset is updated periodically (if statistics.interval.ms is set)
   * while the high offset is updated on each fetched message set from the
   * broker.
   *
   * If there is no cached offset (either low or high, or both) then
   * OFFSET_INVALID will be returned for the respective offset.
   *
   * Offsets are returned in \p low and \p high respectively.
   *
   * Returns: ErrorCode.no_error on success or an error code on failure.
   *
   * Note: Shall only be used with an active consumer instance.
   */
    ErrorCode getWatermarkOffsets(const(char)* topic, int partition, ref long low,
        ref long high)
    {
        return cast(ErrorCode) rd_kafka_get_watermark_offsets(rk_, topic,
            partition, &low, &high);
    }
}

/**
 * Queue interface
 *
 * Create a new message queue.  Message queues allow the application
 * to re-route consumed messages from multiple topic+partitions into
 * one single queue point.  This queue point, containing messages from
 * a number of topic+partitions, may then be served by a single
 * consume() method, rather than one per topic+partition combination.
 *
 * See the Consumer::start(), Consumer::consume(), and
 * Consumer::consumeCallback() methods that take a queue as the first
 * parameter for more information.
 */
class Queue
{
nothrow @nogc:
    /**
   * Create Queue object
   */
    this(Handle handle)
    {
        queue_ = rd_kafka_queue_new(handle.rk_);
    }

    ~this()
    {
        rd_kafka_queue_destroy(queue_);
    }

private:
    rd_kafka_queue_t* queue_;
}

/**
 * High-level KafkaConsumer (for brokers 0.9 and later)
 *
 * Note: Requires Apache Kafka >= 0.9.0 brokers
 *
 * Currently supports the \c range and \c roundrobin partition assignment
 * strategies (see \c partition.assignment.strategy)
 */
class KafkaConsumer : Handle
{
    /**
   * Creates a KafkaConsumer.
   *
   * The \p conf object must have \c group.id set to the consumer group to join.
   *
   * Use close() to shut down the consumer.
   *
   * See_also: RebalanceCb
   * See_also: CONFIGURATION.md for \c group.id, \c session.timeout.ms,
   *     \c partition.assignment.strategy, etc.
   */
    this(GlobalConf conf)
    {
        char[512] errbuf = void;
        rd_kafka_conf_t* rk_conf = null;
        size_t grlen;

        if (!conf.rk_conf_)
        {
            throw new Exception("Requires Conf::CONF_GLOBAL object");
        }

        if (rd_kafka_conf_get(conf.rk_conf_, "group.id", null,
                &grlen) != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK || grlen <= 1 /* terminating null only */ )
        {
            throw new Exception("\"group.id\" must be configured");
        }

        this.setCommonConfig(conf);

        rk_conf = rd_kafka_conf_dup(conf.rk_conf_);

        rd_kafka_t* rk;
        if (null is(rk = rd_kafka_new(rd_kafka_type_t.RD_KAFKA_CONSUMER, rk_conf,
                errbuf.ptr, errbuf.sizeof)))
        {
            throw new Exception(errbuf.ptr.fromStringz.idup);
        }

        this.rk_ = rk;

        /* Redirect the handle queue to the cgrp's queue to provide a single queue point */
        rd_kafka_poll_set_consumer(rk);
    }
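
    /+ Usage sketch (not from the original source): given a GlobalConf `conf`
       that already has "group.id" (and the broker list) configured, a
       consumer is created here and shut down with close() when done:

        auto consumer = new KafkaConsumer(conf);
        scope (exit) consumer.close();
    +/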

    /** Returns the current partition assignment as set by
     *  assign() */
    ErrorCode assignment(ref TopicPartition[] partitions) nothrow
    {
        rd_kafka_topic_partition_list_t* c_parts;
        rd_kafka_resp_err_t err;

        if (0 != (err = rd_kafka_assignment(rk_, &c_parts)))
            return cast(ErrorCode) err;

        partitions.length = c_parts.cnt;

        foreach (i, ref p; partitions)
            p = TopicPartition(&c_parts.elems[i]);

        rd_kafka_topic_partition_list_destroy(c_parts);

        return ErrorCode.no_error;
    }

nothrow @nogc:

  /**
   * Update the subscription set to \p topics.
   *
   * Any previous subscription will be unassigned and unsubscribed first.
   *
   * The subscription set denotes the desired topics to consume and this
   * set is provided to the partition assignor (one of the elected group
   * members) for all clients which then uses the configured
   * \c partition.assignment.strategy to assign the subscription set's
   * topics' partitions to the consumers, depending on their subscription.
   *
   * The result of such an assignment is a rebalancing which is either
   * handled automatically in librdkafka or can be overridden by the application
   * by providing a RebalanceCb.
   *
   * The rebalancing passes the assigned partition set to
   * assign() to update what partitions are actually
   * being fetched by the KafkaConsumer.
   *
   * Regex pattern matching is automatically performed for topics prefixed
   * with \c \"^\" (e.g. \c \"^myPfx[0-9]_.*\").
   */
    ErrorCode subscribe(const(char)*[] topics...)
    {
        rd_kafka_topic_partition_list_t* c_topics;
        rd_kafka_resp_err_t err;

        c_topics = rd_kafka_topic_partition_list_new(cast(int) topics.length);

        foreach (t; topics)
            rd_kafka_topic_partition_list_add(c_topics, t, RD_KAFKA_PARTITION_UA);

        err = rd_kafka_subscribe(rk_, c_topics);

        rd_kafka_topic_partition_list_destroy(c_topics);

        return cast(ErrorCode) err;
    }
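
    /+ Usage sketch (not from the original source): subscribe an assumed
       `consumer` to one or more topics (string literals convert to
       const(char)* here); a regex subscription uses the "^" prefix:

        auto err = consumer.subscribe("events", "^log_.*");
        assert(err == ErrorCode.no_error);
    +/
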
    /** Unsubscribe from the current subscription set. */
    nothrow @nogc ErrorCode unsubscribe()
    {
        return cast(ErrorCode)(rd_kafka_unsubscribe(this.rk_));
    }

    /**
   * Consume a message or get an error event; triggers callbacks.
   *
   * Will automatically call registered callbacks for any such queued events,
   * including RebalanceCb, EventCb, OffsetCommitCb,
   * etc.
   *
   * Note: An application should make sure to call consume() at regular
   *          intervals, even if no messages are expected, to serve any
   *          queued callbacks waiting to be called. This is especially
   *          important when a RebalanceCb has been registered as it needs
   *          to be called and handled properly to synchronize internal
   *          consumer state.
   *
   * Note: An application MUST NOT call \p poll() on KafkaConsumer objects.
   *
   * The filled-in message is one of:
   *  - a proper message (Message::err() is ErrorCode.no_error)
   *  - an error event (Message::err() is != ErrorCode.no_error)
   *  - a timeout due to no message or event within \p timeout_ms
   *    (Message::err() is timed_out)
   */
   /++
   Params:
        msg = message to fill. Use `msg.err` to check for errors.
        timeout_ms = time to wait if no incoming messages are in the queue.
   +/
    nothrow @nogc void consume(ref Message msg, int timeout_ms = 10)
    {
        rd_kafka_message_t* rkmessage;

        rkmessage = rd_kafka_consumer_poll(this.rk_, timeout_ms);

        if (!rkmessage)
        {
            msg = Message(null, ErrorCode.timed_out);
            return;
        }

        msg = Message(rkmessage);
    }
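
    /+ Usage sketch (not from the original source): a typical consume loop
       checks `msg.err` on every iteration; timeouts are reported as
       ErrorCode.timed_out and are not fatal (`consumer` and `running` are
       assumed application objects):

        Message msg;
        while (running)
        {
            consumer.consume(msg, 100);
            if (msg.err == ErrorCode.no_error)
            {
                // process the message payload/key here
            }
            else if (msg.err != ErrorCode.timed_out)
            {
                // handle the error event
            }
        }
    +/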

    /**
   *  Update the assignment set to \p partitions.
   *
   * The assignment set is the set of partitions actually being consumed
   * by the KafkaConsumer.
   */
    ErrorCode assign(const TopicPartition[] partitions)
    {
        rd_kafka_topic_partition_list_t* c_parts;
        rd_kafka_resp_err_t err;

        c_parts = partitions_to_c_parts(partitions);

        err = rd_kafka_assign(rk_, c_parts);

        rd_kafka_topic_partition_list_destroy(c_parts);
        return cast(ErrorCode) err;
    }

    /**
   * Stop consumption and remove the current assignment.
   */
    ErrorCode unassign()
    {
        return cast(ErrorCode) rd_kafka_assign(rk_, null);
    }

    /**
   * Commit offsets for the current assignment.
   *
   * Note: This is the synchronous variant that blocks until offsets
   *         are committed or the commit fails (see return value).
   *
   * Note: If an OffsetCommitCb callback is registered it will
   *         be called with commit details on a future call to
   *         consume()
   *
   * Returns: ErrorCode.no_error or an error code.
   */
    ErrorCode commitSync()
    {
        return cast(ErrorCode) rd_kafka_commit(rk_, null, 0 /*sync*/ );
    }

    /**
   * Asynchronous version of commitSync()
   *
   * See_also: KafkaConsumer::commitSync()
   */
    ErrorCode commitAsync()
    {
        return cast(ErrorCode) rd_kafka_commit(rk_, null, 1 /*async*/ );
    }

    /**
   * Commit offset for a single topic+partition based on \p message
   *
   * Note: This is the synchronous variant.
   *
   * See_also: KafkaConsumer::commitSync()
   */
    ErrorCode commitSync(ref Message message)
    {
        return cast(ErrorCode) rd_kafka_commit_message(rk_, message.rkmessage_, 0 /*sync*/ );
    }

    /**
   * Commit offset for a single topic+partition based on \p message
   *
   * Note: This is the asynchronous variant.
   *
   * See_also: KafkaConsumer::commitSync()
   */
    ErrorCode commitAsync(ref Message message)
    {
        return cast(ErrorCode) rd_kafka_commit_message(rk_, message.rkmessage_, 1 /*async*/ );
    }

    /**
   * Commit offsets for the provided list of partitions.
   *
   * Note: This is the synchronous variant.
   */
    ErrorCode commitSync(TopicPartition[] offsets)
    {
        rd_kafka_topic_partition_list_t* c_parts = partitions_to_c_parts(offsets);
        rd_kafka_resp_err_t err = rd_kafka_commit(rk_, c_parts, 0);
        if (!err)
            update_partitions_from_c_parts(offsets, c_parts);
        rd_kafka_topic_partition_list_destroy(c_parts);
        return cast(ErrorCode) err;
    }
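
    /+ Usage sketch (not from the original source): after processing a
       consumed message, its offset can be committed synchronously or
       asynchronously (`consumer` and `msg` as in the consume loop above):

        if (msg.err == ErrorCode.no_error)
        {
            // ... process ...
            auto err = consumer.commitSync(msg); // or consumer.commitAsync(msg)
        }
    +/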

    /**
   * Commit offsets for the provided list of partitions.
   *
   * Note: This is the asynchronous variant.
   */
    ErrorCode commitAsync(const TopicPartition[] offsets)
    {
        rd_kafka_topic_partition_list_t* c_parts = partitions_to_c_parts(offsets);
        rd_kafka_resp_err_t err = rd_kafka_commit(rk_, c_parts, 1);
        rd_kafka_topic_partition_list_destroy(c_parts);
        return cast(ErrorCode) err;
    }

    /**
   * Retrieve committed offsets for topics+partitions.
   *
   * Returns: ErrorCode.no_error on success, in which case the
   *          \p offset or \p err field of each \p partitions' element is filled
   *          in with the stored offset or a partition-specific error.
   *          Else returns an error code.
   */
    ErrorCode committed(TopicPartition[] partitions, int timeout_ms)
    {
        rd_kafka_topic_partition_list_t* c_parts;
        rd_kafka_resp_err_t err;

        c_parts = partitions_to_c_parts(partitions);

        err = rd_kafka_committed(rk_, c_parts, timeout_ms);

        if (!err)
        {
            update_partitions_from_c_parts(partitions, c_parts);
        }

        rd_kafka_topic_partition_list_destroy(c_parts);

        return cast(ErrorCode) err;
    }

    /**
   * Retrieve current positions (offsets) for topics+partitions.
   *
   * Returns: ErrorCode.no_error on success, in which case the
   *          \p offset or \p err field of each \p partitions' element is filled
   *          in with the current position or a partition-specific error.
   *          Else returns an error code.
   */
    ErrorCode position(TopicPartition[] partitions)
    {
        rd_kafka_topic_partition_list_t* c_parts;
        rd_kafka_resp_err_t err;

        c_parts = partitions_to_c_parts(partitions);

        err = rd_kafka_position(rk_, c_parts);

        if (!err)
        {
            update_partitions_from_c_parts(partitions, c_parts);
        }

        rd_kafka_topic_partition_list_destroy(c_parts);

        return cast(ErrorCode) err;
    }

    /**
   * Close and shut down the consumer.
   *
   * This call will block until the following operations are finished:
   *  - Trigger a local rebalance to void the current assignment
   *  - Stop consumption for the current assignment
   *  - Commit offsets
   *  - Leave the group
   *
   * The maximum blocking time is roughly limited to session.timeout.ms.
   *
   * Note: Callbacks, such as RebalanceCb and
   *         OffsetCommitCb, etc, may be called.
   *
   * Note: The consumer object must later be freed with \c delete
   */
    ErrorCode close()
    {
        rd_kafka_resp_err_t err;
        err = rd_kafka_consumer_close(rk_);
        if (err)
            return cast(ErrorCode) err;

        while (rd_kafka_outq_len(rk_) > 0)
            rd_kafka_poll(rk_, 10);
        rd_kafka_destroy(rk_);

        return cast(ErrorCode) err;
    }
}

/**
 * Simple Consumer (legacy)
 *
 * A simple non-balanced, non-group-aware, consumer.
 */
class Consumer : Handle
{
    /**
   * Creates a new Kafka consumer handle.
   *
   * \p conf is an optional object that will be used instead of the default
   * configuration.
   * The \p conf object is reusable after this call.
   *
   * Throws: Exception with a human-readable error message if the handle
   * cannot be created.
   */
    this(GlobalConf conf)
    {
        char[512] errbuf = void;
        rd_kafka_conf_t* rk_conf = null;

        if (conf)
        {
            if (!conf.rk_conf_)
            {
                throw new Exception("Requires Conf::CONF_GLOBAL object");
            }

            this.setCommonConfig(conf);

            rk_conf = rd_kafka_conf_dup(conf.rk_conf_);
        }

        if (null is(rk_ = rd_kafka_new(rd_kafka_type_t.RD_KAFKA_CONSUMER,
                rk_conf, errbuf.ptr, errbuf.sizeof)))
        {
            throw new Exception(errbuf.ptr.fromStringz.idup);
        }
    }

nothrow @nogc:

    ~this()
    {
        rd_kafka_destroy(rk_);
    }

static:

    /**
   * Start consuming messages for topic and \p partition
   * at offset \p offset which may either be a proper offset (0..N)
   * or one of the special offsets: \p OFFSET_BEGINNING or \p OFFSET_END.
   *
   * rdkafka will attempt to keep \p queued.min.messages (config property)
   * messages in the local queue by repeatedly fetching batches of messages
   * from the broker until the threshold is reached.
   *
   * The application shall use one of the \p ...consume*() functions
   * to consume messages from the local queue, each kafka message being
   * represented as a `Message` object.
   *
   * \p ...start() must not be called multiple times for the same
   * topic and partition without stopping consumption first with
   * \p ...stop().
   *
   * Returns: an ErrorCode to indicate success or failure.
   */
    ErrorCode start(Topic topic, int partition, long offset, Queue queue)
    {
        if (rd_kafka_consume_start(topic.rkt_, partition, offset) == -1)
            return cast(ErrorCode)(rd_kafka_errno2err(errno));
        return ErrorCode.no_error;
    }

    /**
   * Stop consuming messages for topic and \p partition, purging
   *        all messages currently in the local queue.
   *
   * The application must stop all consumers before destroying
   * the Consumer handle.
   *
   * Returns: an ErrorCode to indicate success or failure.
   */
    ErrorCode stop(Topic topic, int partition)
    {
        if (rd_kafka_consume_stop(topic.rkt_, partition) == -1)
            return cast(ErrorCode)(rd_kafka_errno2err(errno));
        return ErrorCode.no_error;
    }

    /**
   * Seek consumer for topic+partition to \p offset which is either an
   *        absolute or logical offset.
   *
   * If \p timeout_ms is not 0 the call will wait this long for the
   * seek to be performed. If the timeout is reached the internal state
   * will be unknown and this function returns `timed_out`.
   * If \p timeout_ms is 0 it will initiate the seek but return
   * immediately without any error reporting (e.g., async).
   *
   * This call triggers a fetch queue barrier flush.
   *
   * Returns: an ErrorCode to indicate success or failure.
   */
    ErrorCode seek(Topic topic, int partition, long offset, int timeout_ms)
    {
        if (rd_kafka_seek(topic.rkt_, partition, offset, timeout_ms) == -1)
            return cast(ErrorCode)(rd_kafka_errno2err(errno));
        return ErrorCode.no_error;
    }

    /**
   * Consume a single message from \p topic and \p partition.
   *
   * \p timeout_ms is the maximum amount of time to wait for a message to be
   * received.
   * The consumer must have been previously started with \p ...start().
   *
   * Fills in a Message object; the application needs to check whether the
   * message is an error or a proper message via Message::err(), comparing
   * against \p ErrorCode.no_error.
   *
   * The message object must be destroyed when the application is done with it.
   *
   * Errors (in Message::err()):
   *  - timed_out - \p timeout_ms was reached with no new messages fetched.
   *  - PARTITION_EOF - End of partition reached, not an error.
   */
    void consume(Topic topic, int partition, int timeout_ms, ref Message msg)
    {
        rd_kafka_message_t* rkmessage;

        rkmessage = rd_kafka_consume(topic.rkt_, partition, timeout_ms);
        if (!rkmessage)
        {
            msg = Message(topic, cast(ErrorCode) rd_kafka_errno2err(errno));
            return;
        }

        msg = Message(topic, rkmessage);
    }
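
    /+ Usage sketch (not from the original source): start a partition of an
       assumed Topic `topic` at the beginning, poll it with the legacy
       consumer, then stop it. RD_KAFKA_OFFSET_BEGINNING is librdkafka's
       logical "beginning" offset constant from the C binding:

        Consumer.start(topic, 0, RD_KAFKA_OFFSET_BEGINNING, null);
        Message msg;
        Consumer.consume(topic, 0, 1000, msg);
        if (msg.err == ErrorCode.no_error)
        {
            // process the message
        }
        Consumer.stop(topic, 0);
    +/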

    /**
   * Consume a single message from the specified queue.
   *
   * \p timeout_ms is the maximum amount of time to wait for a message to be
   * received.
   * The consumer must have been previously started on the queue with
   * \p ...start().
   *
   * Fills in a Message object; the application needs to check whether the
   * message is an error or a proper message via \p Message.err(), comparing
   * against \p ErrorCode.no_error.
   *
   * The message object must be destroyed when the application is done with it.
   *
   * Errors (in Message::err()):
   *   - timed_out - \p timeout_ms was reached with no new messages fetched
   *
   * Note that Message.topic() may be null after certain kinds of
   * errors, so applications should check that it isn't null before
   * dereferencing it.
   */
    void consume(Queue queue, int timeout_ms, ref Message msg)
    {
        rd_kafka_message_t* rkmessage;
        rkmessage = rd_kafka_consume_queue(queue.queue_, timeout_ms);
        if (!rkmessage)
        {
            msg = Message(null, cast(ErrorCode) rd_kafka_errno2err(errno));
            return;
        }
        /*
       * Recover our Topic from the topic conf's opaque field, which we
       * set in Topic::create() for just this kind of situation.
       */
        void* opaque = rd_kafka_topic_opaque(rkmessage.rkt);
        Topic topic = cast(Topic)(opaque);

        msg = Message(topic, rkmessage);
    }

    /* Helper struct for `consume_callback'.
   * Encapsulates the values we need in order to call `rd_kafka_consume_callback'
   * and keep track of the D callback function and `opaque' value.
   */
    private static struct ConsumerCallback
    {
        /* This function is the one we give to `rd_kafka_consume_callback', with
     * the `opaque' pointer pointing to an instance of this struct, in which
     * we can find the D callback and `cb_data'.
     */
        static nothrow @nogc void consume_cb_trampoline(rd_kafka_message_t* msg, void* opaque)
        {
            ConsumerCallback* instance = cast(ConsumerCallback*) opaque;
            Message message = Message(instance.topic, msg, false  /*don't free*/ );
            instance.cb_cls(message);
        }

        Topic topic;
        ConsumeCb cb_cls;
    }

    /**
   * Consumes messages from \p topic and \p partition, calling
   *        the provided callback for each consumed message.
   *
   * \p consumeCallback() provides higher throughput performance
   * than \p consume().
   *
   * \p timeout_ms is the maximum amount of time to wait for one or
   * more messages to arrive.
   *
   * The provided \p consume_cb is called for every message received.
   *
   * Returns: the number of messages processed or -1 on error.
   *
   * See_also: Consumer::consume()
   */
    int consumeCallback(Topic topic, int partition, int timeout_ms, ConsumeCb consume_cb)
    {
        auto context = ConsumerCallback(topic, consume_cb);
        return rd_kafka_consume_callback(topic.rkt_, partition, timeout_ms,
            &ConsumerCallback.consume_cb_trampoline, &context);
    }
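
    /+ Usage sketch (not from the original source): assuming ConsumeCb is
       callable with a Message (as the trampoline above suggests), a batch
       of messages can be processed through a callback instead of polling:

        int processed = Consumer.consumeCallback(topic, 0, 1000,
            (ref Message message)
            {
                if (message.err == ErrorCode.no_error)
                {
                    // handle the message
                }
            });
    +/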

    /* Helper struct for `consume_callback' with a Queue.
   * Encapsulates the values we need in order to call `rd_kafka_consume_callback'
   * and keep track of the D callback function and `opaque' value.
   */
    private static struct ConsumerQueueCallback
    {
        /* This function is the one we give to `rd_kafka_consume_callback', with
     * the `opaque' pointer pointing to an instance of this struct, in which
     * we can find the D callback and `cb_data'.
     */
        static nothrow @nogc void consume_cb_trampoline(rd_kafka_message_t* msg, void* opaque)
        {
            ConsumerQueueCallback* instance = cast(ConsumerQueueCallback*) opaque;
            /*
       * Recover our Topic from the topic conf's opaque field, which we
       * set in Topic::create() for just this kind of situation.
       */
            void* topic_opaque = rd_kafka_topic_opaque(msg.rkt);
            Topic topic = cast(Topic) topic_opaque;
            Message message = Message(topic, msg, false  /*don't free*/ );
            instance.cb_cls(message);
        }

        ConsumeCb cb_cls;
    }

    /**
   * Consumes messages from \p queue, calling the provided callback for
   *        each consumed message.
   *
   * See_also: Consumer::consumeCallback()
   */
    int consumeCallback(Queue queue, int timeout_ms, ConsumeCb consume_cb)
    {
        auto context = ConsumerQueueCallback(consume_cb);
        return rd_kafka_consume_callback_queue(queue.queue_, timeout_ms,
            &ConsumerQueueCallback.consume_cb_trampoline, &context);
    }

    /**
   * Converts an offset into the logical offset from the tail of a topic.
   *
   * \p offset is the (positive) number of items from the end.
   *
   * Returns: the logical offset for message \p offset from the tail; this value
   *          may be passed to Consumer::start, et al.
   * Note: The returned logical offset is specific to librdkafka.
   */
    long offsetTail(long offset)
    {
        return RD_KAFKA_OFFSET_TAIL(offset);
    }
}

/**
 * Producer
 */
class Producer : Handle
{
    package GlobalConf _conf;
    /**
   * Creates a new Kafka producer handle.
   *
   * \p conf is an optional object that will be used instead of the default
   * configuration.
   * The \p conf object is reusable after this call.
   */
    this(GlobalConf conf)
    {
        _conf = conf;
        char[512] errbuf = void;
        rd_kafka_conf_t* rk_conf = null;

        if (conf)
        {
            if (!conf.rk_conf_)
            {
                throw new Exception("Requires Conf::CONF_GLOBAL object");
            }

            this.setCommonConfig(conf);

            rk_conf = rd_kafka_conf_dup(conf.rk_conf_);

            if (conf.dr_cb_)
            {
                rd_kafka_conf_set_dr_msg_cb(rk_conf, &dr_msg_cb_trampoline);
                this.dr_cb_ = conf.dr_cb_;
            }
        }

        if (null is(rk_ = rd_kafka_new(rd_kafka_type_t.RD_KAFKA_PRODUCER,
                rk_conf, errbuf.ptr, errbuf.sizeof)))
        {
            throw new Exception(errbuf.ptr.fromStringz.idup);
        }
    }

    /++
    Returns: a new topic for this producer
    Params:
        topic = topic name
        topicConf = TopicConf; if null, `defaultTopicConf` must be set on the global configuration.
    +/
    Topic newTopic(const(char)[] topic, TopicConf topicConf = null)
    {
        if (!topicConf)
            topicConf = _conf.defaultTopicConf;
        assert(topicConf);
        return new Topic(this, topic, topicConf);
    }
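
    /+ Usage sketch (not from the original source): given a GlobalConf `conf`
       with the broker list and a default TopicConf configured, a producer
       and a topic handle are created like this:

        auto producer = new Producer(conf);
        auto topic = producer.newTopic("logs");
    +/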

nothrow @nogc:

    ~this()
    {
        if (rk_)
        {
            rd_kafka_destroy(rk_);
        }
    }

    /**
   * Producer::produce() \p msgflags
   *
   * These flags are optional; \c free and \c copy are mutually exclusive.
   */
    enum MsgOpt
    {
        free = 0x1, /**< rdkafka will free(3) \p payload
                     * when it is done with it. */
        copy = 0x2, /**< the \p payload data will be copied
                     * and the \p payload pointer will not
                     * be used by rdkafka after the
                     * call returns. */
        block = 0x4, /**< Block produce*() on message queue
                      *   full.
                      *   WARNING:
                      *   If a delivery report callback
                      *   is used the application MUST
                      *   call rd_kafka_poll() (or equiv.)
                      *   to make sure delivered messages
                      *   are drained from the internal
                      *   delivery report queue.
                      *   Failure to do so will result
                      *   in indefinitely blocking on
                      *   the produce() call when the
                      *   message queue is full.
                      */
    }

    private static void dr_msg_cb_trampoline(rd_kafka_t* rk,
        const rd_kafka_message_t* rkmessage, void* opaque)
    {
        auto handle = cast(Handle) opaque;
        auto message = Message(null, rkmessage);
        handle.dr_cb_(message);
        message.destroy;
    }

    /**
   * Produce and send a single message to the broker.
   *
   * This is an asynchronous non-blocking API.
   *
   * \p partition is the target partition, either:
   *   - Topic::PARTITION_UA (unassigned) for
   *     automatic partitioning using the topic's partitioner function, or
   *   - a fixed partition (0..N)
   *
   * \p msgflags is zero or more of the following flags OR'ed together:
   *    block - block \p produce*() call if
   *                   \p queue.buffering.max.messages or
   *                   \p queue.buffering.max.kbytes are exceeded.
   *                   Messages are considered in-queue from the point they
   *                   are accepted by produce() until their corresponding
   *                   delivery report callback/event returns.
   *                   It is thus a requirement to call
   *                   poll() (or equiv.) from a separate
   *                   thread when block is used.
   *                   See WARNING on \c block above.
   *    free - rdkafka will free(3) \p payload when it is done with it.
   *    copy - the \p payload data will be copied and the \p payload
   *               pointer will not be used by rdkafka after the
   *               call returns.
   *
   *  NOTE: free and copy are mutually exclusive.
   *
   *  If the function returns -1 and free was specified, then
   *  the memory associated with the payload is still the caller's
   *  responsibility.
   *
   * \p payload is the message payload of size \p len bytes.
   *
   * \p key is an optional message key; if non-null it
   * will be passed to the topic partitioner as well as be sent with the
   * message to the broker and passed on to the consumer.
   *
   * \p msg_opaque is an optional application-provided per-message opaque
   * pointer that will be provided in the delivery report callback (\p dr_cb) for
   * referencing this message.
   *
   * Returns: an ErrorCode to indicate success or failure:
   *  - _QUEUE_FULL - maximum number of outstanding messages has been
   *                      reached: \c queue.buffering.max.messages
   *
   *  - MSG_SIZE_TOO_LARGE - message is larger than configured max size:
   *                            \c messages.max.bytes
   *
   *  - _UNKNOWN_PARTITION - requested \p partition is unknown in the
   *                           Kafka cluster.
   *
   *  - _UNKNOWN_TOPIC     - topic is unknown in the Kafka cluster.
   */
    ErrorCode produce(Topic topic, int partition, void[] payload,
        const(void)[] key = null, int msgflags = MsgOpt.copy, void* msg_opaque = null)
    {
        if (rd_kafka_produce(topic.rkt_, partition, msgflags, payload.ptr,
                payload.length, key.ptr, key.length, msg_opaque) == -1)
            return cast(ErrorCode) rd_kafka_errno2err(errno);
        return ErrorCode.no_error;
    }
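
    /+ Usage sketch (not from the original source): produce a message with a
       copied payload, serve delivery reports, and flush before shutdown.
       `producer` and `topic` are the objects created above; the unassigned
       partition constant comes from the C binding:

        auto payload = "hello, kafka".dup;  // mutable buffer converts to void[]
        auto err = producer.produce(topic, RD_KAFKA_PARTITION_UA, payload);
        if (err == ErrorCode.no_error)
            producer.poll(0);      // serve delivery report callbacks
        producer.flush(10_000);    // wait for outstanding messages
    +/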

    /**
   * Wait until all outstanding produce requests, et al., are completed.
   *        This should typically be done prior to destroying a producer instance
   *        to make sure all queued and in-flight produce requests are completed
   *        before terminating.
   *
   * Note: This function will call poll() and thus trigger callbacks.
   *
   * Returns: timed_out if \p timeout_ms was reached before all
   *          outstanding requests were completed, else ErrorCode.no_error
   */
    ErrorCode flush(int timeout_ms = 60_000)
    {
        return cast(ErrorCode) rd_kafka_flush(rk_, timeout_ms);
    }
}