1 /// 2 module rdkafkad.handler.simple_consumer; 3 import rdkafkad; 4 5 /** 6 * Simple Consumer (legacy) 7 * 8 * A simple non-balanced, non-group-aware, consumer. 9 */ 10 class Consumer : Handle 11 { 12 /** 13 * Creates a new Kafka consumer handle. 14 * 15 * \p conf is an optional object that will be used instead of the default 16 * configuration. 17 * The \p conf object is reusable after this call. 18 * 19 * the new handle on success or null on error in which case 20 * \p errstr is set to a human readable error message. 21 */ 22 this(GlobalConf conf) 23 { 24 char[512] errbuf = void; 25 rd_kafka_conf_t* rk_conf = null; 26 27 if (conf) 28 { 29 if (!conf.rk_conf_) 30 { 31 throw new Exception("Requires Conf::CONF_GLOBAL object"); 32 } 33 34 this.setCommonConfig(conf); 35 36 rk_conf = rd_kafka_conf_dup(conf.rk_conf_); 37 } 38 39 rk_ = rd_kafka_new(rd_kafka_type_t.RD_KAFKA_CONSUMER, 40 rk_conf, errbuf.ptr, errbuf.sizeof); 41 if (null is rk_) 42 { 43 throw new Exception(errbuf.ptr.fromStringz.idup); 44 } 45 } 46 47 nothrow @nogc: 48 49 ~this() 50 { 51 rd_kafka_destroy(rk_); 52 } 53 54 static: 55 56 /** 57 * Start consuming messages for topic and \p partition 58 * at offset \p offset which may either be a proper offset (0..N) 59 * or one of the the special offsets: \p OFFSET_BEGINNING or \p OFFSET_END. 60 * 61 * rdkafka will attempt to keep \p queued.min.messages (config property) 62 * messages in the local queue by repeatedly fetching batches of messages 63 * from the broker until the threshold is reached. 64 * 65 * The application shall use one of the \p ...consume*() functions 66 * to consume messages from the local queue, each kafka message being 67 * represented as a `Message ` object. 68 * 69 * \p ...start() must not be called multiple times for the same 70 * topic and partition without stopping consumption first with 71 * \p ...stop(). 72 * 73 * an ErrorCode to indicate success or failure. 74 */ 75 ErrorCode start(Topic topic, int partition, long offset, Queue queue) 76 { 77 int err; 78 err = rd_kafka_consume_start(topic.rkt_, partition, offset); 79 if (err == -1) 80 return cast(ErrorCode)(rd_kafka_errno2err(errno)); 81 return ErrorCode.no_error; 82 } 83 84 /** 85 * Stop consuming messages for topic and \p partition, purging 86 * all messages currently in the local queue. 87 * 88 * The application needs to be stop all consumers before destroying 89 * the Consumer handle. 90 * 91 * an ErrorCode to indicate success or failure. 92 */ 93 ErrorCode stop(Topic topic, int partition) 94 { 95 int err; 96 err = rd_kafka_consume_stop(topic.rkt_, partition); 97 if (err == -1) 98 return cast(ErrorCode)(rd_kafka_errno2err(errno)); 99 return ErrorCode.no_error; 100 } 101 102 /** 103 * Seek consumer for topic+partition to \p offset which is either an 104 * absolute or logical offset. 105 * 106 * If \p timeout_ms is not 0 the call will wait this long for the 107 * seek to be performed. If the timeout is reached the internal state 108 * will be unknown and this function returns `timed_out`. 109 * If \p timeout_ms is 0 it will initiate the seek but return 110 * immediately without any error reporting (e.g., async). 111 * 112 * This call triggers a fetch queue barrier flush. 113 * 114 * an ErrorCode to indicate success or failure. 115 */ 116 ErrorCode seek(Topic topic, int partition, long offset, int timeout_ms) 117 { 118 int err; 119 err = rd_kafka_seek(topic.rkt_, partition, offset, timeout_ms); 120 if (err == -1) 121 return cast(ErrorCode)(rd_kafka_errno2err(errno)); 122 return ErrorCode.no_error; 123 } 124 125 /** 126 * Consume a single message from \p topic and \p partition. 127 * 128 * \p timeout_ms is maximum amount of time to wait for a message to be 129 * received. 130 * Consumer must have been previously started with \p ...start(). 131 * 132 * a Message object, the application needs to check if message 133 * is an error or a proper message Message::err() and checking for 134 * \p ErrorCode.no_error. 135 * 136 * The message object must be destroyed when the application is done with it. 137 * 138 * Errors (in Message::err()): 139 * - timed_out - \p timeout_ms was reached with no new messages fetched. 140 * - PARTITION_EOF - End of partition reached, not an error. 141 */ 142 void consume(Topic topic, int partition, int timeout_ms, ref Message msg) 143 { 144 rd_kafka_message_t* rkmessage; 145 146 rkmessage = rd_kafka_consume(topic.rkt_, partition, timeout_ms); 147 if (!rkmessage) 148 msg = Message(topic, cast(ErrorCode) rd_kafka_errno2err(errno)); 149 150 msg = Message(topic, rkmessage); 151 } 152 153 /** 154 * Consume a single message from the specified queue. 155 * 156 * \p timeout_ms is maximum amount of time to wait for a message to be 157 * received. 158 * Consumer must have been previously started on the queue with 159 * \p ...start(). 160 * 161 * a Message object, the application needs to check if message 162 * is an error or a proper message \p Message.err() and checking for 163 * \p ErrorCode.no_error. 164 * 165 * The message object must be destroyed when the application is done with it. 166 * 167 * Errors (in Message::err()): 168 * - timed_out - \p timeout_ms was reached with no new messages fetched 169 * 170 * Note that Message.topic() may be nullptr after certain kinds of 171 * errors, so applications should check that it isn't null before 172 * dereferencing it. 173 */ 174 void consume(Queue queue, int timeout_ms, ref Message msg) 175 { 176 rd_kafka_message_t* rkmessage; 177 rkmessage = rd_kafka_consume_queue(queue.queue_, timeout_ms); 178 if (!rkmessage) 179 msg = Message(null, cast(ErrorCode) rd_kafka_errno2err(errno)); 180 /* 181 * Recover our Topic from the topic conf's opaque field, which we 182 * set in Topic::create() for just this kind of situation. 183 */ 184 void* opaque = rd_kafka_topic_opaque(rkmessage.rkt); 185 Topic topic = cast(Topic)(opaque); 186 187 msg = Message(topic, rkmessage); 188 } 189 190 /* Helper struct for `consume_callback'. 191 * Encapsulates the values we need in order to call `rd_kafka_consume_callback' 192 * and keep track of the C++ callback function and `opaque' value. 193 */ 194 private static struct ConsumerCallback 195 { 196 /* This function is the one we give to `rd_kafka_consume_callback', with 197 * the `opaque' pointer pointing to an instance of this struct, in which 198 * we can find the C++ callback and `cb_data'. 199 */ 200 static nothrow @nogc void consume_cb_trampoline(rd_kafka_message_t* msg, void* opaque) 201 { 202 ConsumerCallback* instance = cast(ConsumerCallback*) opaque; 203 Message message = Message(instance.topic, msg, false /*don't free*/ ); 204 instance.cb_cls(message); 205 } 206 207 Topic topic; 208 ConsumeCb cb_cls; 209 } 210 211 /** 212 * Consumes messages from \p topic and \p partition, calling 213 * the provided callback for each consumed messsage. 214 * 215 * \p consumeCallback() provides higher throughput performance 216 * than \p consume(). 217 * 218 * \p timeout_ms is the maximum amount of time to wait for one or 219 * more messages to arrive. 220 * 221 * The provided \p consume_cb instance has its \p consume_cb function 222 * called for every message received. 223 * 224 * The \p opaque argument is passed to the \p consume_cb as \p opaque. 225 * 226 * the number of messages processed or -1 on error. 227 * 228 * See_also: Consumer::consume() 229 */ 230 int consumeCallback(Topic topic, int partition, int timeout_ms, ConsumeCb consume_cb) 231 { 232 auto context = ConsumerCallback(topic, consume_cb); 233 int ret; 234 ret = rd_kafka_consume_callback(topic.rkt_, partition, timeout_ms, 235 &ConsumerCallback.consume_cb_trampoline, &context); 236 return ret; 237 } 238 239 /* Helper struct for `consume_callback' with a Queue. 240 * Encapsulates the values we need in order to call `rd_kafka_consume_callback' 241 * and keep track of the C++ callback function and `opaque' value. 242 */ 243 private static struct ConsumerQueueCallback 244 { 245 /* This function is the one we give to `rd_kafka_consume_callback', with 246 * the `opaque' pointer pointing to an instance of this struct, in which 247 * we can find the C++ callback and `cb_data'. 248 */ 249 static nothrow @nogc void consume_cb_trampoline(rd_kafka_message_t* msg, void* opaque) 250 { 251 ConsumerQueueCallback* instance = cast(ConsumerQueueCallback*) opaque; 252 /* 253 * Recover our Topic from the topic conf's opaque field, which we 254 * set in Topic::create() for just this kind of situation. 255 */ 256 void* topic_opaque = rd_kafka_topic_opaque(msg.rkt); 257 Topic topic = cast(Topic) topic_opaque; 258 Message message = Message(topic, msg, false /*don't free*/ ); 259 instance.cb_cls(message); 260 } 261 262 ConsumeCb cb_cls; 263 } 264 265 /** 266 * Consumes messages from \p queue, calling the provided callback for 267 * each consumed messsage. 268 * 269 * See_also: Consumer::consumeCallback() 270 */ 271 272 int consumeCallback(Queue queue, int timeout_ms, ConsumeCb consume_cb) 273 { 274 auto context = ConsumerQueueCallback(consume_cb); 275 int ret; 276 ret = rd_kafka_consume_callback_queue(queue.queue_, timeout_ms, 277 &ConsumerQueueCallback.consume_cb_trampoline, &context); 278 return ret; 279 } 280 281 /** 282 * Converts an offset into the logical offset from the tail of a topic. 283 * 284 * \p offset is the (positive) number of items from the end. 285 * 286 * the logical offset for message \p offset from the tail, this value 287 * may be passed to Consumer::start, et.al. 288 * Note: The returned logical offset is specific to librdkafka. 289 */ 290 long offsetTail(long offset) 291 { 292 return RD_KAFKA_OFFSET_TAIL(offset); 293 } 294 }