1 ///
2 module rdkafkad.handler.simple_consumer;
3 import rdkafkad;
4 
5 /**
6  * Simple Consumer (legacy)
7  *
8  * A simple non-balanced, non-group-aware, consumer.
9  */
class Consumer : Handle
{
    /**
   * Creates a new Kafka consumer handle.
   *
   * \p conf is an optional object that will be used instead of the default
   * configuration.
   * The \p conf object is reusable after this call.
   *
   * the new handle on success or null on error in which case
   * \p errstr is set to a human readable error message.
   *
   * Throws: Exception if \p conf is not a global configuration object or
   * if the underlying rd_kafka_new() call fails.
   */
    this(GlobalConf conf)
    {
        char[512] errbuf = void;
        rd_kafka_conf_t* rk_conf = null;

        if (conf)
        {
            if (!conf.rk_conf_)
            {
                throw new Exception("Requires Conf::CONF_GLOBAL object");
            }

            this.setCommonConfig(conf);

            // rd_kafka_new() takes ownership of the conf object, so hand it a
            // duplicate to keep the caller's conf reusable.
            rk_conf = rd_kafka_conf_dup(conf.rk_conf_);
        }

        rk_ = rd_kafka_new(rd_kafka_type_t.RD_KAFKA_CONSUMER,
                rk_conf, errbuf.ptr, errbuf.sizeof);
        if (null is rk_)
        {
            throw new Exception(errbuf.ptr.fromStringz.idup);
        }
    }

nothrow @nogc:

     ~this()
    {
        rd_kafka_destroy(rk_);
    }

static:

    /**
   * Start consuming messages for topic and \p partition
   * at offset \p offset which may either be a proper offset (0..N)
   * or one of the special offsets: \p OFFSET_BEGINNING or \p OFFSET_END.
   *
   * rdkafka will attempt to keep \p queued.min.messages (config property)
   * messages in the local queue by repeatedly fetching batches of messages
   * from the broker until the threshold is reached.
   *
   * The application shall use one of the \p ...consume*() functions
   * to consume messages from the local queue, each kafka message being
   * represented as a `Message ` object.
   *
   * \p ...start() must not be called multiple times for the same
   * topic and partition without stopping consumption first with
   * \p ...stop().
   *
   * an ErrorCode to indicate success or failure.
   */
    ErrorCode start(Topic topic, int partition, long offset, Queue queue)
    {
        int err;
        err = rd_kafka_consume_start(topic.rkt_, partition, offset);
        if (err == -1)
            return cast(ErrorCode)(rd_kafka_errno2err(errno));
        return ErrorCode.no_error;
    }

    /**
   * Stop consuming messages for topic and \p partition, purging
   *        all messages currently in the local queue.
   *
   * The application needs to stop all consumers before destroying
   * the Consumer handle.
   *
   * an ErrorCode to indicate success or failure.
   */
    ErrorCode stop(Topic topic, int partition)
    {
        int err;
        err = rd_kafka_consume_stop(topic.rkt_, partition);
        if (err == -1)
            return cast(ErrorCode)(rd_kafka_errno2err(errno));
        return ErrorCode.no_error;
    }

    /**
   * Seek consumer for topic+partition to \p offset which is either an
   *        absolute or logical offset.
   *
   * If \p timeout_ms is not 0 the call will wait this long for the
   * seek to be performed. If the timeout is reached the internal state
   * will be unknown and this function returns `timed_out`.
   * If \p timeout_ms is 0 it will initiate the seek but return
   * immediately without any error reporting (e.g., async).
   *
   * This call triggers a fetch queue barrier flush.
   *
   * an ErrorCode to indicate success or failure.
   */
    ErrorCode seek(Topic topic, int partition, long offset, int timeout_ms)
    {
        int err;
        err = rd_kafka_seek(topic.rkt_, partition, offset, timeout_ms);
        if (err == -1)
            return cast(ErrorCode)(rd_kafka_errno2err(errno));
        return ErrorCode.no_error;
    }

    /**
   * Consume a single message from \p topic and \p partition.
   *
   * \p timeout_ms is maximum amount of time to wait for a message to be
   * received.
   * Consumer must have been previously started with \p ...start().
   *
   * a Message object, the application needs to check if message
   * is an error or a proper message Message::err() and checking for
   * \p ErrorCode.no_error.
   *
   * The message object must be destroyed when the application is done with it.
   *
   * Errors (in Message::err()):
   *  - timed_out - \p timeout_ms was reached with no new messages fetched.
   *  - PARTITION_EOF - End of partition reached, not an error.
   */
    void consume(Topic topic, int partition, int timeout_ms, ref Message msg)
    {
        rd_kafka_message_t* rkmessage;

        rkmessage = rd_kafka_consume(topic.rkt_, partition, timeout_ms);
        if (!rkmessage)
        {
            // No message fetched (e.g. timeout): report the errno-derived
            // error and bail out — do NOT fall through and overwrite the
            // error with a null-message wrapper.
            msg = Message(topic, cast(ErrorCode) rd_kafka_errno2err(errno));
            return;
        }

        msg = Message(topic, rkmessage);
    }

    /**
   * Consume a single message from the specified queue.
   *
   * \p timeout_ms is maximum amount of time to wait for a message to be
   * received.
   * Consumer must have been previously started on the queue with
   * \p ...start().
   *
   * a Message object, the application needs to check if message
   * is an error or a proper message \p Message.err() and checking for
   * \p ErrorCode.no_error.
   *
   * The message object must be destroyed when the application is done with it.
   *
   * Errors (in Message::err()):
   *   - timed_out - \p timeout_ms was reached with no new messages fetched
   *
   * Note that Message.topic() may be null after certain kinds of
   * errors, so applications should check that it isn't null before
   * dereferencing it.
   */
    void consume(Queue queue, int timeout_ms, ref Message msg)
    {
        rd_kafka_message_t* rkmessage;
        rkmessage = rd_kafka_consume_queue(queue.queue_, timeout_ms);
        if (!rkmessage)
        {
            // No message fetched: report the error and return early.
            // Without this early return, the code below would dereference
            // the null rkmessage (rkmessage.rkt) and crash.
            msg = Message(null, cast(ErrorCode) rd_kafka_errno2err(errno));
            return;
        }
        /*
       * Recover our Topic from the topic conf's opaque field, which we
       * set in Topic::create() for just this kind of situation.
       */
        void* opaque = rd_kafka_topic_opaque(rkmessage.rkt);
        Topic topic = cast(Topic)(opaque);

        msg = Message(topic, rkmessage);
    }

    /* Helper struct for `consume_callback'.
   * Encapsulates the values we need in order to call `rd_kafka_consume_callback'
   * and keep track of the callback function and `opaque' value.
   */
    private static struct ConsumerCallback
    {
        /* This function is the one we give to `rd_kafka_consume_callback', with
     * the `opaque' pointer pointing to an instance of this struct, in which
     * we can find the callback and `cb_data'.
     */
        static nothrow @nogc void consume_cb_trampoline(rd_kafka_message_t* msg, void* opaque)
        {
            ConsumerCallback* instance = cast(ConsumerCallback*) opaque;
            // Message does not own `msg' (false = don't free); librdkafka
            // reclaims it after the callback returns.
            Message message = Message(instance.topic, msg, false  /*don't free*/ );
            instance.cb_cls(message);
        }

        Topic topic;
        ConsumeCb cb_cls;
    }

    /**
   * Consumes messages from \p topic and \p partition, calling
   *        the provided callback for each consumed message.
   *
   * \p consumeCallback() provides higher throughput performance
   * than \p consume().
   *
   * \p timeout_ms is the maximum amount of time to wait for one or
   * more messages to arrive.
   *
   * The provided \p consume_cb instance has its \p consume_cb function
   * called for every message received.
   *
   * The \p opaque argument is passed to the \p consume_cb as \p opaque.
   *
   * the number of messages processed or -1 on error.
   *
   * See_also: Consumer::consume()
   */
    int consumeCallback(Topic topic, int partition, int timeout_ms, ConsumeCb consume_cb)
    {
        // The context lives on the stack only for the duration of this
        // synchronous call; the trampoline runs before we return.
        auto context = ConsumerCallback(topic, consume_cb);
        int ret;
        ret = rd_kafka_consume_callback(topic.rkt_, partition, timeout_ms,
            &ConsumerCallback.consume_cb_trampoline, &context);
        return ret;
    }

    /* Helper struct for `consume_callback' with a Queue.
   * Encapsulates the values we need in order to call `rd_kafka_consume_callback'
   * and keep track of the callback function and `opaque' value.
   */
    private static struct ConsumerQueueCallback
    {
        /* This function is the one we give to `rd_kafka_consume_callback', with
     * the `opaque' pointer pointing to an instance of this struct, in which
     * we can find the callback and `cb_data'.
     */
        static nothrow @nogc void consume_cb_trampoline(rd_kafka_message_t* msg, void* opaque)
        {
            ConsumerQueueCallback* instance = cast(ConsumerQueueCallback*) opaque;
            /*
       * Recover our Topic from the topic conf's opaque field, which we
       * set in Topic::create() for just this kind of situation.
       */
            void* topic_opaque = rd_kafka_topic_opaque(msg.rkt);
            Topic topic = cast(Topic) topic_opaque;
            Message message = Message(topic, msg, false  /*don't free*/ );
            instance.cb_cls(message);
        }

        ConsumeCb cb_cls;
    }

    /**
   * Consumes messages from \p queue, calling the provided callback for
   *        each consumed message.
   *
   * See_also: Consumer::consumeCallback()
   */

    int consumeCallback(Queue queue, int timeout_ms, ConsumeCb consume_cb)
    {
        auto context = ConsumerQueueCallback(consume_cb);
        int ret;
        ret = rd_kafka_consume_callback_queue(queue.queue_, timeout_ms,
            &ConsumerQueueCallback.consume_cb_trampoline, &context);
        return ret;
    }

    /**
   * Converts an offset into the logical offset from the tail of a topic.
   *
   * \p offset is the (positive) number of items from the end.
   *
   * the logical offset for message \p offset from the tail, this value
   *          may be passed to Consumer::start, et.al.
   * Note: The returned logical offset is specific to librdkafka.
   */
    long offsetTail(long offset)
    {
        return RD_KAFKA_OFFSET_TAIL(offset);
    }
}