1 ///
2 module rdkafkad.handler.kafka_consumer;
3 import rdkafkad;
4 
5 /**
6  * High-level KafkaConsumer (for brokers 0.9 and later)
7  *
8  * Note: Requires Apache Kafka >= 0.9.0 brokers
9  *
10  * Currently supports the \c range and \c roundrobin partition assignment
11  * strategies (see \c partition.assignment.strategy)
12  */
class KafkaConsumer : Handle
{
    /**
     * Creates a KafkaConsumer.
     *
     * The `conf` object must have `group.id` set to the consumer group to join.
     *
     * Use close() to shut down the consumer.
     *
     * Params:
     *     conf = global configuration; its underlying `rk_conf_` is duplicated,
     *            so the caller's object is not consumed by this constructor.
     *
     * Throws: Exception if `conf` carries no global configuration, if
     *     `group.id` is missing or empty, or if `rd_kafka_new` fails (the
     *     message is the error string reported by librdkafka).
     *
     * See_Also: RebalanceCb
     * See_Also: CONFIGURATION.md for `group.id`, `session.timeout.ms`,
     *     `partition.assignment.strategy`, etc.
     */
    this(GlobalConf conf)
    {
        // Filled in by rd_kafka_new() only when it fails, hence no initialisation.
        char[512] errbuf = void;
        size_t grlen;

        if (!conf.rk_conf_)
        {
            throw new Exception("Requires Conf::CONF_GLOBAL object");
        }

        // With a null destination buffer only the required size is reported;
        // a size of 1 means the value is just the terminating null, i.e. empty.
        if (rd_kafka_conf_get(conf.rk_conf_, "group.id", null,
                &grlen) != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK || grlen <= 1)
        {
            throw new Exception("\"group.id\" must be configured");
        }

        this.setCommonConfig(conf);

        // Work on a private copy so the caller's configuration stays usable.
        rd_kafka_conf_t* rk_conf = rd_kafka_conf_dup(conf.rk_conf_);

        rd_kafka_t* rk = rd_kafka_new(rd_kafka_type_t.RD_KAFKA_CONSUMER, rk_conf,
                errbuf.ptr, errbuf.sizeof);
        if (rk is null)
        {
            // rd_kafka_new() takes ownership of the configuration only on
            // success; on failure the duplicate is still ours and must be freed.
            rd_kafka_conf_destroy(rk_conf);
            throw new Exception(errbuf.ptr.fromStringz.idup);
        }

        this.rk_ = rk;

        /* Redirect handle queue to cgrp's queue to provide a single queue point */
        rd_kafka_poll_set_consumer(rk);
    }

nothrow:

    /**
     * Returns the current partition assignment as set by assign().
     *
     * Params:
     *     partitions = resized to the number of assigned partitions and filled
     *                  with one TopicPartition per entry. Left untouched when
     *                  an error is returned.
     *
     * Returns: ErrorCode.no_error on success, otherwise the error code
     *     reported by `rd_kafka_assignment`.
     */
    ErrorCode assignment(ref TopicPartition[] partitions)
    {
        rd_kafka_topic_partition_list_t* c_parts;

        auto err = rd_kafka_assignment(rk_, &c_parts);
        if (err)
            return cast(ErrorCode) err;

        // The list handed back by librdkafka is released on every exit path.
        scope (exit)
            rd_kafka_topic_partition_list_destroy(c_parts);

        partitions.length = c_parts.cnt;

        foreach (i, ref p; partitions)
            p = TopicPartition(&c_parts.elems[i]);

        return ErrorCode.no_error;
    }

@nogc:

    /**
     * Update the subscription set to `topics`.
     *
     * Any previous subscription will be unassigned and unsubscribed first.
     *
     * The subscription set denotes the desired topics to consume and this
     * set is provided to the partition assignor (one of the elected group
     * members) for all clients which then uses the configured
     * `partition.assignment.strategy` to assign the subscription set's
     * topics' partitions to the consumers, depending on their subscription.
     *
     * The result of such an assignment is a rebalancing which is either
     * handled automatically in librdkafka or can be overridden by the
     * application by providing a RebalanceCb.
     *
     * The rebalancing passes the assigned partition set to
     * assign() to update what partitions are actually
     * being fetched by the KafkaConsumer.
     *
     * Regex pattern matching is automatically performed for topics prefixed
     * with `"^"` (e.g. `"^myPfx[0-9]_.*"`).
     *
     * Params:
     *     topics = null-terminated C strings naming the topics (or patterns).
     *
     * Returns: the result of `rd_kafka_subscribe` as an ErrorCode.
     */
    ErrorCode subscribe(const(char)*[] topics...)
    {
        auto c_topics = rd_kafka_topic_partition_list_new(cast(int) topics.length);
        scope (exit)
            rd_kafka_topic_partition_list_destroy(c_topics);

        // Every topic is added with the "unassigned" partition marker.
        foreach (t; topics)
            rd_kafka_topic_partition_list_add(c_topics, t, RD_KAFKA_PARTITION_UA);

        return cast(ErrorCode) rd_kafka_subscribe(rk_, c_topics);
    }

    /**
     * Unsubscribe from the current subscription set.
     *
     * Returns: the result of `rd_kafka_unsubscribe` as an ErrorCode.
     */
    auto unsubscribe()
    {
        return cast(ErrorCode) rd_kafka_unsubscribe(this.rk_);
    }

    /**
     * Consume message or get error event, triggers callbacks.
     *
     * Will automatically call registered callbacks for any such queued events,
     * including RebalanceCb, EventCb, OffsetCommitCb, etc.
     *
     * Note: An application should make sure to call consume() at regular
     *       intervals, even if no messages are expected, to serve any
     *       queued callbacks waiting to be called. This is especially
     *       important when a RebalanceCb has been registered as it needs
     *       to be called and handled properly to synchronize internal
     *       consumer state.
     *
     * Note: Application MUST NOT call `poll()` on KafkaConsumer objects.
     *
     * After the call `msg` is one of:
     *  - proper message (`msg.err` is ErrorCode.no_error)
     *  - error event (`msg.err` is != ErrorCode.no_error)
     *  - timeout due to no message or event in `timeout_ms`
     *    (`msg.err` is ErrorCode.timed_out)
     *
     * Params:
     *     msg = message to fill. Use `msg.err` to check errors.
     *     timeout_ms = time to wait if there are no incoming messages in the queue.
     */
    auto consume(ref Message msg, int timeout_ms = 10)
    {
        rd_kafka_message_t* rkmessage = rd_kafka_consumer_poll(this.rk_, timeout_ms);

        // A null result means nothing arrived within the timeout.
        if (!rkmessage)
        {
            msg = Message(null, ErrorCode.timed_out);
            return;
        }

        msg = Message(rkmessage);
    }

    /**
     * Update the assignment set to `partitions`.
     *
     * The assignment set is the set of partitions actually being consumed
     * by the KafkaConsumer.
     *
     * Returns: the result of `rd_kafka_assign` as an ErrorCode.
     */
    ErrorCode assign(const TopicPartition[] partitions)
    {
        auto c_parts = partitions_to_c_parts(partitions);
        scope (exit)
            rd_kafka_topic_partition_list_destroy(c_parts);

        return cast(ErrorCode) rd_kafka_assign(rk_, c_parts);
    }

    /**
     * Stop consumption and remove the current assignment.
     *
     * Returns: the result of `rd_kafka_assign` (called with a null list)
     *     as an ErrorCode.
     */
    ErrorCode unassign()
    {
        return cast(ErrorCode) rd_kafka_assign(rk_, null);
    }

    /**
     * Commit offsets for the current assignment.
     *
     * Note: This is the synchronous variant that blocks until offsets
     *       are committed or the commit fails (see return value).
     *
     * Note: If an OffsetCommitCb callback is registered it will
     *       be called with commit details on a future call to
     *       consume().
     *
     * Returns: ErrorCode.no_error or error code.
     */
    ErrorCode commitSync()
    {
        return cast(ErrorCode) rd_kafka_commit(rk_, null, 0 /*sync*/ );
    }

    /**
     * Asynchronous version of commitSync().
     *
     * Returns: the result of `rd_kafka_commit` as an ErrorCode.
     *
     * See_Also: KafkaConsumer.commitSync()
     */
    ErrorCode commitAsync()
    {
        return cast(ErrorCode) rd_kafka_commit(rk_, null, 1 /*async*/ );
    }

    /**
     * Commit offset for a single topic+partition based on `message`.
     *
     * Note: This is the synchronous variant.
     *
     * Returns: the result of `rd_kafka_commit_message` as an ErrorCode.
     *
     * See_Also: KafkaConsumer.commitSync()
     */
    ErrorCode commitSync(ref Message message)
    {
        return cast(ErrorCode) rd_kafka_commit_message(rk_, message.rkmessage_, 0 /*sync*/ );
    }

    /**
     * Commit offset for a single topic+partition based on `message`.
     *
     * Note: This is the asynchronous variant.
     *
     * Returns: the result of `rd_kafka_commit_message` as an ErrorCode.
     *
     * See_Also: KafkaConsumer.commitSync()
     */
    ErrorCode commitAsync(ref Message message)
    {
        return cast(ErrorCode) rd_kafka_commit_message(rk_, message.rkmessage_, 1 /*async*/ );
    }

    /**
     * Commit offsets for the provided list of partitions.
     *
     * Note: This is the synchronous variant.
     *
     * Params:
     *     offsets = partitions (with offsets) to commit. On success the
     *               elements are refreshed from the C list via
     *               `update_partitions_from_c_parts`.
     *
     * Returns: the result of `rd_kafka_commit` as an ErrorCode.
     */
    ErrorCode commitSync(TopicPartition[] offsets)
    {
        auto c_parts = partitions_to_c_parts(offsets);
        scope (exit)
            rd_kafka_topic_partition_list_destroy(c_parts);

        auto err = rd_kafka_commit(rk_, c_parts, 0 /*sync*/ );
        if (!err)
            update_partitions_from_c_parts(offsets, c_parts);

        return cast(ErrorCode) err;
    }

    /**
     * Commit offsets for the provided list of partitions.
     *
     * Note: This is the asynchronous variant.
     *
     * Returns: the result of `rd_kafka_commit` as an ErrorCode.
     */
    ErrorCode commitAsync(const TopicPartition[] offsets)
    {
        auto c_parts = partitions_to_c_parts(offsets);
        scope (exit)
            rd_kafka_topic_partition_list_destroy(c_parts);

        return cast(ErrorCode) rd_kafka_commit(rk_, c_parts, 1 /*async*/ );
    }

    /**
     * Retrieve committed offsets for topics+partitions.
     *
     * Params:
     *     partitions = topics+partitions to query; updated in place on success.
     *     timeout_ms = timeout passed through to `rd_kafka_committed`.
     *
     * Returns: ErrorCode.no_error on success in which case the
     *     `offset` or `err` field of each `partitions` element is filled
     *     in with the stored offset, or a partition specific error.
     *     Else returns an error code.
     */
    ErrorCode committed(TopicPartition[] partitions, int timeout_ms)
    {
        auto c_parts = partitions_to_c_parts(partitions);
        scope (exit)
            rd_kafka_topic_partition_list_destroy(c_parts);

        auto err = rd_kafka_committed(rk_, c_parts, timeout_ms);
        if (!err)
            update_partitions_from_c_parts(partitions, c_parts);

        return cast(ErrorCode) err;
    }

    /**
     * Retrieve current positions (offsets) for topics+partitions.
     *
     * Params:
     *     partitions = topics+partitions to query; updated in place on success.
     *
     * Returns: ErrorCode.no_error on success in which case the
     *     `offset` or `err` field of each `partitions` element is filled
     *     in with the stored offset, or a partition specific error.
     *     Else returns an error code.
     */
    ErrorCode position(TopicPartition[] partitions)
    {
        auto c_parts = partitions_to_c_parts(partitions);
        scope (exit)
            rd_kafka_topic_partition_list_destroy(c_parts);

        auto err = rd_kafka_position(rk_, c_parts);
        if (!err)
            update_partitions_from_c_parts(partitions, c_parts);

        return cast(ErrorCode) err;
    }

    /**
     * Close and shut down the consumer.
     *
     * This call will block until the following operations are finished:
     *  - Trigger a local rebalance to void the current assignment
     *  - Stop consumption for current assignment
     *  - Commit offsets
     *  - Leave group
     *
     * The maximum blocking time is roughly limited to session.timeout.ms.
     *
     * Note: Callbacks, such as RebalanceCb and
     *       OffsetCommitCb, etc, may be called.
     *
     * Note: On success the underlying librdkafka handle is destroyed by this
     *       call, so no other method may be invoked on this object afterwards.
     *       If `rd_kafka_consumer_close` fails the handle is left intact.
     *
     * Returns: ErrorCode.no_error on success, otherwise the error code
     *     reported by `rd_kafka_consumer_close`.
     */
    ErrorCode close()
    {
        auto err = rd_kafka_consumer_close(rk_);
        if (err)
            return cast(ErrorCode) err;

        // Serve the queue until nothing is outstanding before tearing down.
        while (rd_kafka_outq_len(rk_) > 0)
            rd_kafka_poll(rk_, 10);
        rd_kafka_destroy(rk_);
        // NOTE(review): rk_ still points at the destroyed handle here. Handle's
        // own teardown is not visible from this file - confirm it does not
        // destroy rk_ a second time.

        return cast(ErrorCode) err;
    }
}