1 ///
2 module rdkafkad.handler;
3 import rdkafkad;
4 
5 /**
 * Base handle; superclass for the specific clients.
7  */
8 class Handle
9 {
10 
11     /**
    * Get the client's broker-assigned group member id.
    *
    * Note: This currently requires the high-level KafkaConsumer.
    *
    * Returns: the last assigned member id, or an empty string if the
    * client is not currently a group member.
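    *
    * Example (a minimal sketch; `consumer` is an assumed, already-subscribed
    * KafkaConsumer instance, not part of this module):
    * ---
    * string id = consumer.memberid;
    * ---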
18     */
19     auto memberid() const
20     {
        char* str = rd_kafka_memberid(rk_);
23         string memberid = str.fromStringz.idup;
24         if (str)
25             rd_kafka_mem_free(cast(rd_kafka_s*) rk_, str);
26         return memberid;
27     }
28 
29     /**
30     * Request Metadata from broker.
31     *
32     * Parameters:
    *  \p all_topics  - if true: request info about all topics in the cluster,
    *                   if false: only request info about locally known topics.
    *  \p only_rkt    - if non-null, only request info about this topic.
    *  \p timeout_ms  - maximum response time before failing.
    *
    * Returns: a new Metadata object on success.
    *
    * Throws: Exception with the error description (e.g. timed_out on
    * timeout) if the metadata request fails.
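    *
    * Example (a minimal sketch; `handle` stands for any connected client):
    * ---
    * auto md = handle.metadata(true, null, 5_000); // all topics, 5 s timeout
    * ---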
43     */
44     Metadata metadata(bool all_topics, const Topic only_rkt, int timeout_ms = 60_000)
45     {
46         const rd_kafka_metadata_t* cmetadatap = null;
47 
48         rd_kafka_topic_t* topic = only_rkt ? cast(rd_kafka_topic_t*) only_rkt.rkt_ : null;
49 
        auto error = cast(ErrorCode) rd_kafka_metadata(rk_, all_topics,
            topic, &cmetadatap, timeout_ms);
52         if (error)
53         {
54             throw new Exception(error.err2str);
55         }
56         return new Metadata(cmetadatap);
57     }
58 
59 @nogc nothrow:
60 
61     package void setCommonConfig(GlobalConf conf)
62     {
63         rd_kafka_conf_set_opaque(conf.rk_conf_, cast(void*) this);
64 
65         if (conf.event_cb_)
66         {
67             rd_kafka_conf_set_log_cb(conf.rk_conf_, &log_cb_trampoline);
68             rd_kafka_conf_set_error_cb(conf.rk_conf_, &error_cb_trampoline);
69             rd_kafka_conf_set_throttle_cb(conf.rk_conf_, &throttle_cb_trampoline);
70             rd_kafka_conf_set_stats_cb(conf.rk_conf_, &stats_cb_trampoline);
71             event_cb_ = conf.event_cb_;
72         }
73 
74         if (conf.socket_cb_)
75         {
76             rd_kafka_conf_set_socket_cb(conf.rk_conf_, &socket_cb_trampoline);
77             socket_cb_ = conf.socket_cb_;
78         }
79 
        version (Windows)
        {
            // librdkafka does not support a custom open callback on Windows.
        }
83         else if (conf.open_cb_)
84         {
85             rd_kafka_conf_set_open_cb(conf.rk_conf_, &open_cb_trampoline);
86             open_cb_ = conf.open_cb_;
87         }
88 
89         if (conf.rebalance_cb_)
90         {
91             rd_kafka_conf_set_rebalance_cb(conf.rk_conf_, &rebalance_cb_trampoline);
92             rebalance_cb_ = conf.rebalance_cb_;
93         }
94 
95         if (conf.offset_commit_cb_)
96         {
97             rd_kafka_conf_set_offset_commit_cb(conf.rk_conf_, &offset_commit_cb_trampoline);
98             offset_commit_cb_ = conf.offset_commit_cb_;
99         }
100     }
101 
102     /**
    * Query the broker for the low (oldest/beginning)
    * and high (newest/end) offsets for a partition.
    *
    * Offsets are returned in `low` and `high` respectively.
    *
    * Returns: ErrorCode.no_error on success or an error code on failure.
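    *
    * Example (a minimal sketch; `handle` stands for any live client handle):
    * ---
    * long low, high;
    * auto err = handle.queryWatermarkOffsets("mytopic", 0, low, high, 5_000);
    * ---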
109     */
110     auto queryWatermarkOffsets(const(char)* topic, int partition,
111         ref long low, ref long high, int timeout_ms)
112     {
        return cast(ErrorCode) rd_kafka_query_watermark_offsets(rk_, topic,
            partition, &low, &high, timeout_ms);
117     }
118 
119     package(rdkafkad) rd_kafka_t* rk_;
    /* All Producer and Consumer callbacks must reside in Handle and the
     * opaque provided to rdkafka must be a pointer to the Handle instance,
     * since the concrete Producer and Consumer classes cannot be safely
     * recovered from the opaque void* in the C callback trampolines. */
124     package(rdkafkad) EventCb event_cb_;
125     package(rdkafkad) SocketCb socket_cb_;
126     package(rdkafkad) OpenCb open_cb_;
127     package(rdkafkad) DeliveryReportCb dr_cb_;
128     package(rdkafkad) PartitionerCb partitioner_cb_;
129     package(rdkafkad) PartitionerKeyPointerCb partitioner_kp_cb_;
130     package(rdkafkad) RebalanceCb rebalance_cb_;
131     package(rdkafkad) OffsetCommitCb offset_commit_cb_;
132 
133     /** the name of the handle */
134     const(char)[] name() const
135     {
136         return rd_kafka_name(rk_).fromStringz;
137     }
138 
139     /**
140     * Polls the provided kafka handle for events.
141     *
142     * Events will trigger application provided callbacks to be called.
143     *
144     * The \p timeout_ms argument specifies the maximum amount of time
145     * (in milliseconds) that the call will block waiting for events.
146     * For non-blocking calls, provide 0 as \p timeout_ms.
    * To wait indefinitely for events, provide -1.
148     *
149     * Events:
    *   - delivery report callbacks (if a DeliveryReportCb is configured) [producer]
151     *   - event callbacks (if an EventCb is configured) [producer & consumer]
152     *
153     * Note:  An application should make sure to call poll() at regular
154     *          intervals to serve any queued callbacks waiting to be called.
155     *
156     * Warning: This method MUST NOT be used with the KafkaConsumer,
157     *          use its consume() instead.
158     *
    * Returns: the number of events served.
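    *
    * Example (a minimal sketch; `producer` and the `running` flag are
    * assumed application state):
    * ---
    * while (running)
    *     producer.poll(100); // serve queued callbacks every 100 ms
    * ---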
160     */
161     int poll(int timeout_ms = 10)
162     {
        return rd_kafka_poll(rk_, timeout_ms);
166     }
167 
168     /**
    * Returns: the current out queue length.
170     *
171     * The out queue contains messages and requests waiting to be sent to,
172     * or acknowledged by, the broker.
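    *
    * Example (a minimal sketch; draining the queue before shutting down an
    * assumed `producer`):
    * ---
    * while (producer.outqLen > 0)
    *     producer.poll(100);
    * ---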
173     */
174     int outqLen()
175     {
        return rd_kafka_outq_len(rk_);
179     }
180 
181     /**
     * Convert a list of C partitions to D partitions
183      */
184     nothrow @nogc package static TopicPartition[] c_parts_to_partitions(
185         const rd_kafka_topic_partition_list_t* c_parts)
186     {
        auto partitions = (cast(TopicPartition*)
            malloc(c_parts.cnt * TopicPartition.sizeof))[0 .. c_parts.cnt];
        foreach (i, ref p; partitions)
            p = TopicPartition(&c_parts.elems[i]);
191         return partitions;
192     }
193 
194     nothrow @nogc static void free_partition_vector(ref TopicPartition[] v)
195     {
196         foreach (ref p; v)
197             p.destroy;
198         free(v.ptr);
199         v = null;
200     }
201 
202     nothrow @nogc package static rd_kafka_topic_partition_list_t* partitions_to_c_parts(const TopicPartition[] partitions)
203     {
204         rd_kafka_topic_partition_list_t* c_parts = rd_kafka_topic_partition_list_new(cast(int) partitions.length);
205 
206         foreach (ref tpi; partitions)
207         {
208             rd_kafka_topic_partition_t* rktpar = rd_kafka_topic_partition_list_add(c_parts, tpi.topic_, tpi.partition_);
209             rktpar.offset = tpi.offset_;
210         }
211 
212         return c_parts;
213     }
214 
215     /**
     * Update the application-provided 'partitions' with info from 'c_parts'
217      */
218     nothrow @nogc package static void update_partitions_from_c_parts(TopicPartition[] partitions,
219         const rd_kafka_topic_partition_list_t* c_parts)
220     {
221         foreach (i; 0 .. c_parts.cnt)
222         {
223             const rd_kafka_topic_partition_t* p = &c_parts.elems[i];
224 
                /* Find the corresponding D entry */
226             foreach (pp; partitions)
227             {
228                 if (!strcmp(p.topic, pp.topic_) && p.partition == pp.partition_)
229                 {
230                     pp.offset_ = p.offset;
231                     pp.err_ = cast(ErrorCode) p.err;
232                 }
233             }
234         }
235     }
236 
237     nothrow @nogc package static void log_cb_trampoline(const rd_kafka_t* rk, int level,
238         const(char)* fac, const(char)* buf)
239     {
240         if (!rk)
241         {
242             rd_kafka_log_print(rk, level, fac, buf);
243             return;
244         }
245 
246         void* opaque = rd_kafka_opaque(rk);
247         Handle handle = cast(Handle)(opaque);
248 
249         if (!handle.event_cb_)
250         {
251             rd_kafka_log_print(rk, level, fac, buf);
252             return;
253         }
254 
255         auto event = Event(Event.Type.log, ErrorCode.no_error,
256             cast(Event.Severity)(level), fac, buf);
257 
258         handle.event_cb_(event);
259     }
260 
261     nothrow @nogc package static void error_cb_trampoline(rd_kafka_t* rk, int err,
262         const(char)* reason, void* opaque)
263     {
264         Handle handle = cast(Handle)(opaque);
265 
266         auto event = Event(Event.Type.error, cast(ErrorCode) err,
267             Event.Severity.error, null, reason);
268 
269         handle.event_cb_(event);
270     }
271 
272     nothrow @nogc package static void throttle_cb_trampoline(rd_kafka_t* rk,
273         const(char)* broker_name, int broker_id, int throttle_time_ms, void* opaque)
274     {
275         Handle handle = cast(Handle)(opaque);
276 
277         auto event = Event(Event.Type.throttle);
278         event.str_ = broker_name.fromStringz;
279         event.id_ = broker_id;
280         event.throttle_time_ = throttle_time_ms;
281 
282         handle.event_cb_(event);
283     }
284 
285     nothrow @nogc package static int stats_cb_trampoline(rd_kafka_t* rk, char* json,
286         size_t json_len, void* opaque)
287     {
288         Handle handle = cast(Handle)(opaque);
289         auto event = Event(Event.Type.stats, ErrorCode.no_error, Event.Severity.info,
290             null, json);
291 
292         handle.event_cb_(event);
293 
294         return 0;
295     }
296 
297     nothrow @nogc package static int socket_cb_trampoline(int domain, int type, int protocol, void* opaque)
298     {
299         Handle handle = cast(Handle)(opaque);
300 
301         return handle.socket_cb_(domain, type, protocol);
302     }
303 
304     nothrow @nogc package static int open_cb_trampoline(const(char)* pathname, int flags,
305         mode_t mode, void* opaque)
306     {
307         Handle handle = cast(Handle)(opaque);
308 
309         return handle.open_cb_(pathname.fromStringz, flags, cast(int)(mode));
310     }
311 
312     nothrow @nogc package static void rebalance_cb_trampoline(rd_kafka_t* rk,
313         rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t* c_partitions, void* opaque)
314     {
315         auto handle = cast(KafkaConsumer)(opaque);
316         TopicPartition[] partitions = c_parts_to_partitions(c_partitions);
317 
318         handle.rebalance_cb_(handle, cast(ErrorCode) err, partitions);
319 
320         free_partition_vector(partitions);
321     }
322 
323     nothrow @nogc package static void offset_commit_cb_trampoline(rd_kafka_t* rk,
324         rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t* c_offsets, void* opaque)
325     {
326         Handle handle = cast(Handle)(opaque);
327         TopicPartition[] offsets;
328 
329         if (c_offsets)
330             offsets = c_parts_to_partitions(c_offsets);
331 
332         handle.offset_commit_cb_(cast(ErrorCode) err, offsets);
333 
334         free_partition_vector(offsets);
335     }
336 
337     /**
338     * Pause producing or consumption for the provided list of partitions.
339     *
340     * Success or error is returned per-partition in the \p partitions list.
341     *
    * Returns: ErrorCode.no_error on success, or an error code on failure.
343     *
344     * See_also: resume()
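    *
    * Example (a minimal sketch; `partitions` is assumed to hold the
    * consumer's current assignment):
    * ---
    * auto err = handle.pause(partitions);
    * // ... later, pick the flow back up:
    * err = handle.resume(partitions);
    * ---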
345     */
346     ErrorCode pause(TopicPartition[] partitions)
347     {
348         rd_kafka_topic_partition_list_t* c_parts;
349         rd_kafka_resp_err_t err;
350 
351         c_parts = partitions_to_c_parts(partitions);
352 
353         err = rd_kafka_pause_partitions(rk_, c_parts);
354 
355         if (!err)
356             update_partitions_from_c_parts(partitions, c_parts);
357 
358         rd_kafka_topic_partition_list_destroy(c_parts);
359 
        return cast(ErrorCode) err;
    }
363 
364     /**
365     * Resume producing or consumption for the provided list of partitions.
366     *
367     * Success or error is returned per-partition in the \p partitions list.
368     *
    * Returns: ErrorCode.no_error on success, or an error code on failure.
370     *
371     * See_also: pause()
372     */
373     ErrorCode resume(TopicPartition[] partitions)
374     {
375         rd_kafka_topic_partition_list_t* c_parts;
376         rd_kafka_resp_err_t err;
377 
378         c_parts = partitions_to_c_parts(partitions);
379 
380         err = rd_kafka_resume_partitions(rk_, c_parts);
381 
382         if (!err)
383             update_partitions_from_c_parts(partitions, c_parts);
384 
385         rd_kafka_topic_partition_list_destroy(c_parts);
386 
387         return cast(ErrorCode) err;
388     }
389 
390     /**
    * Get the last known low (oldest/beginning)
    * and high (newest/end) offsets for a partition.
    *
    * The low offset is updated periodically (if statistics.interval.ms is set)
    * while the high offset is updated on each fetched message set from the
    * broker.
    *
    * If there is no cached offset (either low or high, or both) then
    * OFFSET_INVALID will be returned for the respective offset.
    *
    * Offsets are returned in `low` and `high` respectively.
    *
    * Returns: ErrorCode.no_error on success or an error code on failure.
    *
    * Note: Shall only be used with an active consumer instance.
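    *
    * Example (a minimal sketch; `consumer` is an assumed active consumer;
    * unlike queryWatermarkOffsets, this reads cached values and does not
    * contact the broker):
    * ---
    * long low, high;
    * auto err = consumer.getWatermarkOffsets("mytopic", 0, low, high);
    * ---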
406    */
407     ErrorCode getWatermarkOffsets(const(char)* topic, int partition, ref long low,
408         ref long high)
409     {
410         return cast(ErrorCode) rd_kafka_get_watermark_offsets(rk_, topic,
411             partition, &low, &high);
412     }
413 }
414 
415 /**
416  * Queue interface
417  *
 * Create a new message queue. Message queues allow the application
419  * to re-route consumed messages from multiple topic+partitions into
420  * one single queue point.  This queue point, containing messages from
421  * a number of topic+partitions, may then be served by a single
422  * consume() method, rather than one per topic+partition combination.
423  *
 * See the Consumer.start(), Consumer.consume(), and
 * Consumer.consumeCallback() methods that take a queue as the first
 * parameter for more information.
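 *
 * Example (a minimal sketch; `consumer` is an assumed existing handle):
 * ---
 * auto queue = new Queue(consumer);
 * ---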
427  */
428 class Queue
429 {
430 nothrow @nogc:
431     /**
432    * Create Queue object
433    */
434     this(Handle handle)
435     {
436         queue_ = rd_kafka_queue_new(handle.rk_);
437     }
438 
439     ~this()
440     {
441         rd_kafka_queue_destroy(queue_);
442     }
443 
444 package:
445     rd_kafka_queue_t* queue_;
446 }