///
module rdkafkad.handler.kafka_consumer;
import rdkafkad;

/**
 * High-level KafkaConsumer (for brokers 0.9 and later)
 *
 * Note: Requires Apache Kafka >= 0.9.0 brokers
 *
 * Currently supports the \c range and \c roundrobin partition assignment
 * strategies (see \c partition.assignment.strategy)
 */
class KafkaConsumer : Handle
{
    /**
     * Creates a KafkaConsumer.
     *
     * The \p conf object must have \c group.id set to the consumer group to join.
     *
     * Use close() to shut down the consumer.
     *
     * See_also: RebalanceCb
     * See_also: CONFIGURATION.md for \c group.id, \c session.timeout.ms,
     *   \c partition.assignment.strategy, etc.
     */
    this(GlobalConf conf)
    {
        char[512] errbuf = void;
        rd_kafka_conf_t* rk_conf = null;
        size_t grlen;

        if (!conf.rk_conf_)
        {
            throw new Exception("Requires Conf::CONF_GLOBAL object");
        }

        if (rd_kafka_conf_get(conf.rk_conf_, "group.id", null,
                &grlen) != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK || grlen <= 1 /* terminating null only */ )
        {
            throw new Exception("\"group.id\" must be configured");
        }

        this.setCommonConfig(conf);

        rk_conf = rd_kafka_conf_dup(conf.rk_conf_);

        rd_kafka_t* rk;
        if (null is (rk = rd_kafka_new(rd_kafka_type_t.RD_KAFKA_CONSUMER, rk_conf,
                errbuf.ptr, errbuf.sizeof)))
        {
            throw new Exception(errbuf.ptr.fromStringz.idup);
        }

        this.rk_ = rk;

        /* Redirect the handle queue to the cgrp's queue to provide a single queue point */
        rd_kafka_poll_set_consumer(rk);
    }

nothrow:

    /** Returns the current partition assignment as set by assign() */
    ErrorCode assignment(ref TopicPartition[] partitions)
    {
        rd_kafka_topic_partition_list_t* c_parts;
        rd_kafka_resp_err_t err;

        err = rd_kafka_assignment(rk_, &c_parts);
        if (err)
            return cast(ErrorCode) err;

        partitions.length = c_parts.cnt;

        foreach (i, ref p; partitions)
            p = TopicPartition(&c_parts.elems[i]);

        rd_kafka_topic_partition_list_destroy(c_parts);

        return ErrorCode.no_error;
    }

@nogc:
    /**
     * Update the subscription set to \p topics.
     *
     * Any previous subscription will be unassigned and unsubscribed first.
     *
     * The subscription set denotes the desired topics to consume, and this
     * set is provided to the partition assignor (one of the elected group
     * members), which uses the configured \c partition.assignment.strategy
     * to assign the subscription set's topics' partitions to the consumers,
     * depending on their subscription.
     *
     * The result of such an assignment is a rebalancing which is either
     * handled automatically in librdkafka or can be overridden by the
     * application by providing a RebalanceCb.
     *
     * The rebalancing passes the assigned partition set to
     * assign() to update what partitions are actually
     * being fetched by the KafkaConsumer.
     *
     * Regex pattern matching is automatically performed for topics prefixed
     * with \c \"^\" (e.g. \c \"^myPfx[0-9]_.*\").
     */
    ErrorCode subscribe(const(char)*[] topics...)
    {
        rd_kafka_topic_partition_list_t* c_topics;
        rd_kafka_resp_err_t err;

        c_topics = rd_kafka_topic_partition_list_new(cast(int) topics.length);

        foreach (t; topics)
            rd_kafka_topic_partition_list_add(c_topics, t, RD_KAFKA_PARTITION_UA);

        err = rd_kafka_subscribe(rk_, c_topics);

        rd_kafka_topic_partition_list_destroy(c_topics);

        return cast(ErrorCode) err;
    }

    /** Unsubscribe from the current subscription set. */
    auto unsubscribe()
    {
        ErrorCode ret;
        ret = cast(ErrorCode)(rd_kafka_unsubscribe(this.rk_));
        return ret;
    }

    /**
     * Consume a message or get an error event; triggers callbacks.
     *
     * Will automatically call registered callbacks for any such queued events,
     * including RebalanceCb, EventCb, OffsetCommitCb, etc.
     *
     * Note: An application should make sure to call consume() at regular
     *   intervals, even if no messages are expected, to serve any
     *   queued callbacks waiting to be called. This is especially
     *   important when a RebalanceCb has been registered as it needs
     *   to be called and handled properly to synchronize internal
     *   consumer state.
     *
     * Note: An application MUST NOT call \p poll() on KafkaConsumer objects.
     *
     * The filled \p msg is one of:
     *  - a proper message (Message::err() is ErrorCode.no_error)
     *  - an error event (Message::err() is != ErrorCode.no_error)
     *  - a timeout due to no message or event in \p timeout_ms
     *    (Message::err() is ErrorCode.timed_out)
     *
     * Params:
     *   msg = message to fill. Use `msg.err` to check for errors.
     *   timeout_ms = time to wait if there are no incoming messages in the queue.
     */
    auto consume(ref Message msg, int timeout_ms = 10)
    {
        rd_kafka_message_t* rkmessage;

        rkmessage = rd_kafka_consumer_poll(this.rk_, timeout_ms);

        if (!rkmessage)
        {
            msg = Message(null, ErrorCode.timed_out);
            return;
        }

        msg = Message(rkmessage);
    }

    /**
     * Update the assignment set to \p partitions.
     *
     * The assignment set is the set of partitions actually being consumed
     * by the KafkaConsumer.
     */
    ErrorCode assign(const TopicPartition[] partitions)
    {
        rd_kafka_topic_partition_list_t* c_parts;
        rd_kafka_resp_err_t err;

        c_parts = partitions_to_c_parts(partitions);

        err = rd_kafka_assign(rk_, c_parts);

        rd_kafka_topic_partition_list_destroy(c_parts);
        return cast(ErrorCode) err;
    }

    /**
     * Stop consumption and remove the current assignment.
     */
    ErrorCode unassign()
    {
        typeof(return) ret;
        ret = cast(ErrorCode) rd_kafka_assign(rk_, null);
        return ret;
    }
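    /+
     Illustrative sketch (not compiled): a typical subscribe-then-poll loop built on
     the methods above. `consumer` is a constructed KafkaConsumer; the `running` flag
     and `process()` helper are hypothetical application-side names.

        consumer.subscribe("mytopic");        // string literals convert to const(char)*
        Message msg;
        while (running)
        {
            consumer.consume(msg, 100);       // also serves queued callbacks (rebalance, commit, ...)
            if (msg.err == ErrorCode.timed_out)
                continue;                     // no message or event within the timeout
            if (msg.err != ErrorCode.no_error)
            {
                // error event: inspect msg.err and decide whether to keep polling
                continue;
            }
            process(msg);                     // application-specific handling
        }
    +/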
    /**
     * Commit offsets for the current assignment.
     *
     * Note: This is the synchronous variant that blocks until offsets
     *   are committed or the commit fails (see return value).
     *
     * Note: If an OffsetCommitCb callback is registered it will
     *   be called with commit details on a future call to
     *   consume().
     *
     * Returns: ErrorCode.no_error or an error code.
     */
    ErrorCode commitSync()
    {
        typeof(return) ret;
        ret = cast(ErrorCode) rd_kafka_commit(rk_, null, 0 /*sync*/ );
        return ret;
    }

    /**
     * Asynchronous version of commitSync()
     *
     * See_also: KafkaConsumer::commitSync()
     */
    ErrorCode commitAsync()
    {
        typeof(return) ret;
        ret = cast(ErrorCode) rd_kafka_commit(rk_, null, 1 /*async*/ );
        return ret;
    }

    /**
     * Commit offset for a single topic+partition based on \p message
     *
     * Note: This is the synchronous variant.
     *
     * See_also: KafkaConsumer::commitSync()
     */
    ErrorCode commitSync(ref Message message)
    {
        typeof(return) ret;
        ret = cast(ErrorCode) rd_kafka_commit_message(rk_, message.rkmessage_, 0 /*sync*/ );
        return ret;
    }

    /**
     * Commit offset for a single topic+partition based on \p message
     *
     * Note: This is the asynchronous variant.
     *
     * See_also: KafkaConsumer::commitSync()
     */
    ErrorCode commitAsync(ref Message message)
    {
        typeof(return) ret;
        ret = cast(ErrorCode) rd_kafka_commit_message(rk_, message.rkmessage_, 1 /*async*/ );
        return ret;
    }

    /**
     * Commit offsets for the provided list of partitions.
     *
     * Note: This is the synchronous variant.
     */
    ErrorCode commitSync(TopicPartition[] offsets)
    {
        rd_kafka_topic_partition_list_t* c_parts = partitions_to_c_parts(offsets);
        rd_kafka_resp_err_t err;
        err = rd_kafka_commit(rk_, c_parts, 0);
        if (!err)
            update_partitions_from_c_parts(offsets, c_parts);
        rd_kafka_topic_partition_list_destroy(c_parts);
        return cast(ErrorCode) err;
    }

    /**
     * Commit offsets for the provided list of partitions.
     *
     * Note: This is the asynchronous variant.
     */
    ErrorCode commitAsync(const TopicPartition[] offsets)
    {
        rd_kafka_topic_partition_list_t* c_parts = partitions_to_c_parts(offsets);
        rd_kafka_resp_err_t err;
        err = rd_kafka_commit(rk_, c_parts, 1);
        rd_kafka_topic_partition_list_destroy(c_parts);
        return cast(ErrorCode) err;
    }

    /**
     * Retrieve committed offsets for topics+partitions.
     *
     * Returns: ErrorCode.no_error on success, in which case the
     *   \p offset or \p err field of each \p partitions' element is filled
     *   in with the stored offset or a partition-specific error.
     *   Otherwise returns an error code.
     */
    ErrorCode committed(TopicPartition[] partitions, int timeout_ms)
    {
        rd_kafka_topic_partition_list_t* c_parts;
        rd_kafka_resp_err_t err;

        c_parts = partitions_to_c_parts(partitions);

        err = rd_kafka_committed(rk_, c_parts, timeout_ms);

        if (!err)
        {
            update_partitions_from_c_parts(partitions, c_parts);
        }

        rd_kafka_topic_partition_list_destroy(c_parts);

        return cast(ErrorCode) err;
    }
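    /+
     Illustrative sketch (not compiled): committing consumed offsets with the overloads
     above. `consumer` and `msg` are assumed to come from a consume loop like the one
     sketched near consume().

        // Per-message commit: blocks until the offset is committed or the commit fails.
        auto err = consumer.commitSync(msg);

        // Fire-and-forget variant; a registered OffsetCommitCb is invoked on a later consume().
        consumer.commitAsync(msg);

        // Or commit all offsets of the current assignment in one call.
        consumer.commitSync();
    +/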
    /**
     * Retrieve current positions (offsets) for topics+partitions.
     *
     * Returns: ErrorCode.no_error on success, in which case the
     *   \p offset or \p err field of each \p partitions' element is filled
     *   in with the current position or a partition-specific error.
     *   Otherwise returns an error code.
     */
    ErrorCode position(TopicPartition[] partitions)
    {
        rd_kafka_topic_partition_list_t* c_parts;
        rd_kafka_resp_err_t err;

        c_parts = partitions_to_c_parts(partitions);

        err = rd_kafka_position(rk_, c_parts);

        if (!err)
        {
            update_partitions_from_c_parts(partitions, c_parts);
        }

        rd_kafka_topic_partition_list_destroy(c_parts);

        return cast(ErrorCode) err;
    }

    /**
     * Close and shut down the consumer.
     *
     * This call will block until the following operations are finished:
     *  - Trigger a local rebalance to void the current assignment
     *  - Stop consumption for the current assignment
     *  - Commit offsets
     *  - Leave the group
     *
     * The maximum blocking time is roughly limited to session.timeout.ms.
     *
     * Note: Callbacks, such as RebalanceCb and OffsetCommitCb, may be called.
     *
     * Note: The consumer object must later be freed with \c delete
     */
    ErrorCode close()
    {
        rd_kafka_resp_err_t err;
        err = rd_kafka_consumer_close(rk_);
        if (err)
            return cast(ErrorCode) err;

        while (rd_kafka_outq_len(rk_) > 0)
            rd_kafka_poll(rk_, 10);
        rd_kafka_destroy(rk_);

        return cast(ErrorCode) err;
    }
}
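
/+
 End-to-end sketch (not compiled) of the consumer lifecycle wrapped by this module.
 How options are set on GlobalConf is an assumption here (an index-style setter is
 shown); check the rdkafkad Conf API for the exact form. The broker address, group
 name, and topic are placeholders.

    auto conf = new GlobalConf;
    conf["metadata.broker.list"] = "localhost:9092"; // assumed setter syntax
    conf["group.id"] = "example_group";              // required, see the constructor above

    auto consumer = new KafkaConsumer(conf);
    consumer.subscribe("mytopic");

    // ... consume/commit loop as sketched near consume() and commitSync() ...

    consumer.close(); // blocks up to ~session.timeout.ms: rebalance, stop, commit, leave group
+/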