1 /// 2 module rdkafkad.handler; 3 import rdkafkad; 4 5 /** 6 * Base handle, super class for specific clients. 7 */ 8 class Handle 9 { 10 11 /** 12 * Returns the client's broker-assigned group member id 13 * 14 * Note: This currently requires the high-level KafkaConsumer 15 * 16 * Last assigned member id, or empty string if not currently 17 * a group member. 18 */ 19 auto memberid() const 20 { 21 char* str; 22 str = rd_kafka_memberid(rk_); 23 string memberid = str.fromStringz.idup; 24 if (str) 25 rd_kafka_mem_free(cast(rd_kafka_s*) rk_, str); 26 return memberid; 27 } 28 29 /** 30 * Request Metadata from broker. 31 * 32 * Parameters: 33 * \p all_topics - if non-zero: request info about all topics in cluster, 34 * if zero: only request info about locally known topics. 35 * \p only_rkt - only request info about this topic 36 * \p metadatap - pointer to hold metadata result. 37 * The \p *metadatap pointer must be released with \c delete. 38 * \p timeout_ms - maximum response time before failing. 39 * 40 * ErrorCode.no_error on success (in which case \p *metadatap 41 * will be set), else timed_out on timeout or 42 * other error code on error. 43 */ 44 Metadata metadata(bool all_topics, const Topic only_rkt, int timeout_ms = 60_000) 45 { 46 const rd_kafka_metadata_t* cmetadatap = null; 47 48 rd_kafka_topic_t* topic = only_rkt ? cast(rd_kafka_topic_t*) only_rkt.rkt_ : null; 49 50 ErrorCode error; 51 error = cast(ErrorCode)rd_kafka_metadata(rk_, all_topics, topic, &cmetadatap, timeout_ms); 52 if (error) 53 { 54 throw new Exception(error.err2str); 55 } 56 return new Metadata(cmetadatap); 57 } 58 59 @nogc nothrow: 60 61 package void setCommonConfig(GlobalConf conf) 62 { 63 rd_kafka_conf_set_opaque(conf.rk_conf_, cast(void*) this); 64 65 if (conf.event_cb_) 66 { 67 rd_kafka_conf_set_log_cb(conf.rk_conf_, &log_cb_trampoline); 68 rd_kafka_conf_set_error_cb(conf.rk_conf_, &error_cb_trampoline); 69 rd_kafka_conf_set_throttle_cb(conf.rk_conf_, &throttle_cb_trampoline); 70 rd_kafka_conf_set_stats_cb(conf.rk_conf_, &stats_cb_trampoline); 71 event_cb_ = conf.event_cb_; 72 } 73 74 if (conf.socket_cb_) 75 { 76 rd_kafka_conf_set_socket_cb(conf.rk_conf_, &socket_cb_trampoline); 77 socket_cb_ = conf.socket_cb_; 78 } 79 80 version (Windows) 81 { 82 } 83 else if (conf.open_cb_) 84 { 85 rd_kafka_conf_set_open_cb(conf.rk_conf_, &open_cb_trampoline); 86 open_cb_ = conf.open_cb_; 87 } 88 89 if (conf.rebalance_cb_) 90 { 91 rd_kafka_conf_set_rebalance_cb(conf.rk_conf_, &rebalance_cb_trampoline); 92 rebalance_cb_ = conf.rebalance_cb_; 93 } 94 95 if (conf.offset_commit_cb_) 96 { 97 rd_kafka_conf_set_offset_commit_cb(conf.rk_conf_, &offset_commit_cb_trampoline); 98 offset_commit_cb_ = conf.offset_commit_cb_; 99 } 100 } 101 102 /** 103 * Query broker for low (oldest/beginning) 104 * and high (newest/end) offsets for partition. 105 * 106 * Offsets are returned in \p *low and \p *high respectively. 107 * 108 * ErrorCode.no_error on success or an error code on failure. 109 */ 110 auto queryWatermarkOffsets(const(char)* topic, int partition, 111 ref long low, ref long high, int timeout_ms) 112 { 113 ErrorCode ret; 114 ret = cast(ErrorCode) rd_kafka_query_watermark_offsets(rk_, topic, 115 partition, &low, &high, timeout_ms); 116 return ret; 117 } 118 119 package(rdkafkad) rd_kafka_t* rk_; 120 /* All Producer and Consumer callbacks must reside in HandleImpl and 121 * the opaque provided to rdkafka must be a pointer to HandleImpl, since 122 * ProducerImpl and ConsumerImpl classes cannot be safely directly cast to 123 * HandleImpl due to the skewed diamond inheritance. */ 124 package(rdkafkad) EventCb event_cb_; 125 package(rdkafkad) SocketCb socket_cb_; 126 package(rdkafkad) OpenCb open_cb_; 127 package(rdkafkad) DeliveryReportCb dr_cb_; 128 package(rdkafkad) PartitionerCb partitioner_cb_; 129 package(rdkafkad) PartitionerKeyPointerCb partitioner_kp_cb_; 130 package(rdkafkad) RebalanceCb rebalance_cb_; 131 package(rdkafkad) OffsetCommitCb offset_commit_cb_; 132 133 /** the name of the handle */ 134 const(char)[] name() const 135 { 136 return rd_kafka_name(rk_).fromStringz; 137 } 138 139 /** 140 * Polls the provided kafka handle for events. 141 * 142 * Events will trigger application provided callbacks to be called. 143 * 144 * The \p timeout_ms argument specifies the maximum amount of time 145 * (in milliseconds) that the call will block waiting for events. 146 * For non-blocking calls, provide 0 as \p timeout_ms. 147 * To wait indefinately for events, provide -1. 148 * 149 * Events: 150 * - delivery report callbacks (if an DeliveryCb is configured) [producer] 151 * - event callbacks (if an EventCb is configured) [producer & consumer] 152 * 153 * Note: An application should make sure to call poll() at regular 154 * intervals to serve any queued callbacks waiting to be called. 155 * 156 * Warning: This method MUST NOT be used with the KafkaConsumer, 157 * use its consume() instead. 158 * 159 * the number of events served. 160 */ 161 int poll(int timeout_ms = 10) 162 { 163 typeof(return) ret; 164 ret = rd_kafka_poll(rk_, timeout_ms); 165 return ret; 166 } 167 168 /** 169 * Returns the current out queue length 170 * 171 * The out queue contains messages and requests waiting to be sent to, 172 * or acknowledged by, the broker. 173 */ 174 int outqLen() 175 { 176 typeof(return) ret; 177 ret = rd_kafka_outq_len(rk_); 178 return ret; 179 } 180 181 /** 182 * Convert a list of C partitions to C++ partitions 183 */ 184 nothrow @nogc package static TopicPartition[] c_parts_to_partitions( 185 const rd_kafka_topic_partition_list_t* c_parts) 186 { 187 auto partitions = (cast(TopicPartition*) malloc(c_parts.cnt * TopicPartition.sizeof))[0 188 .. c_parts.cnt]; 189 foreach (i, ref p; partitions) 190 partitions[i] = TopicPartition(&c_parts.elems[i]); 191 return partitions; 192 } 193 194 nothrow @nogc static void free_partition_vector(ref TopicPartition[] v) 195 { 196 foreach (ref p; v) 197 p.destroy; 198 free(v.ptr); 199 v = null; 200 } 201 202 nothrow @nogc package static rd_kafka_topic_partition_list_t* partitions_to_c_parts(const TopicPartition[] partitions) 203 { 204 rd_kafka_topic_partition_list_t* c_parts = rd_kafka_topic_partition_list_new(cast(int) partitions.length); 205 206 foreach (ref tpi; partitions) 207 { 208 rd_kafka_topic_partition_t* rktpar = rd_kafka_topic_partition_list_add(c_parts, tpi.topic_, tpi.partition_); 209 rktpar.offset = tpi.offset_; 210 } 211 212 return c_parts; 213 } 214 215 /** 216 * @brief Update the application provided 'partitions' with info from 'c_parts' 217 */ 218 nothrow @nogc package static void update_partitions_from_c_parts(TopicPartition[] partitions, 219 const rd_kafka_topic_partition_list_t* c_parts) 220 { 221 foreach (i; 0 .. c_parts.cnt) 222 { 223 const rd_kafka_topic_partition_t* p = &c_parts.elems[i]; 224 225 /* Find corresponding C++ entry */ 226 foreach (pp; partitions) 227 { 228 if (!strcmp(p.topic, pp.topic_) && p.partition == pp.partition_) 229 { 230 pp.offset_ = p.offset; 231 pp.err_ = cast(ErrorCode) p.err; 232 } 233 } 234 } 235 } 236 237 nothrow @nogc package static void log_cb_trampoline(const rd_kafka_t* rk, int level, 238 const(char)* fac, const(char)* buf) 239 { 240 if (!rk) 241 { 242 rd_kafka_log_print(rk, level, fac, buf); 243 return; 244 } 245 246 void* opaque = rd_kafka_opaque(rk); 247 Handle handle = cast(Handle)(opaque); 248 249 if (!handle.event_cb_) 250 { 251 rd_kafka_log_print(rk, level, fac, buf); 252 return; 253 } 254 255 auto event = Event(Event.Type.log, ErrorCode.no_error, 256 cast(Event.Severity)(level), fac, buf); 257 258 handle.event_cb_(event); 259 } 260 261 nothrow @nogc package static void error_cb_trampoline(rd_kafka_t* rk, int err, 262 const(char)* reason, void* opaque) 263 { 264 Handle handle = cast(Handle)(opaque); 265 266 auto event = Event(Event.Type.error, cast(ErrorCode) err, 267 Event.Severity.error, null, reason); 268 269 handle.event_cb_(event); 270 } 271 272 nothrow @nogc package static void throttle_cb_trampoline(rd_kafka_t* rk, 273 const(char)* broker_name, int broker_id, int throttle_time_ms, void* opaque) 274 { 275 Handle handle = cast(Handle)(opaque); 276 277 auto event = Event(Event.Type.throttle); 278 event.str_ = broker_name.fromStringz; 279 event.id_ = broker_id; 280 event.throttle_time_ = throttle_time_ms; 281 282 handle.event_cb_(event); 283 } 284 285 nothrow @nogc package static int stats_cb_trampoline(rd_kafka_t* rk, char* json, 286 size_t json_len, void* opaque) 287 { 288 Handle handle = cast(Handle)(opaque); 289 auto event = Event(Event.Type.stats, ErrorCode.no_error, Event.Severity.info, 290 null, json); 291 292 handle.event_cb_(event); 293 294 return 0; 295 } 296 297 nothrow @nogc package static int socket_cb_trampoline(int domain, int type, int protocol, void* opaque) 298 { 299 Handle handle = cast(Handle)(opaque); 300 301 return handle.socket_cb_(domain, type, protocol); 302 } 303 304 nothrow @nogc package static int open_cb_trampoline(const(char)* pathname, int flags, 305 mode_t mode, void* opaque) 306 { 307 Handle handle = cast(Handle)(opaque); 308 309 return handle.open_cb_(pathname.fromStringz, flags, cast(int)(mode)); 310 } 311 312 nothrow @nogc package static void rebalance_cb_trampoline(rd_kafka_t* rk, 313 rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t* c_partitions, void* opaque) 314 { 315 auto handle = cast(KafkaConsumer)(opaque); 316 TopicPartition[] partitions = c_parts_to_partitions(c_partitions); 317 318 handle.rebalance_cb_(handle, cast(ErrorCode) err, partitions); 319 320 free_partition_vector(partitions); 321 } 322 323 nothrow @nogc package static void offset_commit_cb_trampoline(rd_kafka_t* rk, 324 rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t* c_offsets, void* opaque) 325 { 326 Handle handle = cast(Handle)(opaque); 327 TopicPartition[] offsets; 328 329 if (c_offsets) 330 offsets = c_parts_to_partitions(c_offsets); 331 332 handle.offset_commit_cb_(cast(ErrorCode) err, offsets); 333 334 free_partition_vector(offsets); 335 } 336 337 /** 338 * Pause producing or consumption for the provided list of partitions. 339 * 340 * Success or error is returned per-partition in the \p partitions list. 341 * 342 * ErrorCode::no_error 343 * 344 * See_also: resume() 345 */ 346 ErrorCode pause(TopicPartition[] partitions) 347 { 348 rd_kafka_topic_partition_list_t* c_parts; 349 rd_kafka_resp_err_t err; 350 351 c_parts = partitions_to_c_parts(partitions); 352 353 err = rd_kafka_pause_partitions(rk_, c_parts); 354 355 if (!err) 356 update_partitions_from_c_parts(partitions, c_parts); 357 358 rd_kafka_topic_partition_list_destroy(c_parts); 359 360 return cast(ErrorCode) err; 361 362 } 363 364 /** 365 * Resume producing or consumption for the provided list of partitions. 366 * 367 * Success or error is returned per-partition in the \p partitions list. 368 * 369 * ErrorCode::no_error 370 * 371 * See_also: pause() 372 */ 373 ErrorCode resume(TopicPartition[] partitions) 374 { 375 rd_kafka_topic_partition_list_t* c_parts; 376 rd_kafka_resp_err_t err; 377 378 c_parts = partitions_to_c_parts(partitions); 379 380 err = rd_kafka_resume_partitions(rk_, c_parts); 381 382 if (!err) 383 update_partitions_from_c_parts(partitions, c_parts); 384 385 rd_kafka_topic_partition_list_destroy(c_parts); 386 387 return cast(ErrorCode) err; 388 } 389 390 /** 391 * Get last known low (oldest/beginning) 392 * and high (newest/end) offsets for partition. 393 * 394 * The low offset is updated periodically (if statistics.interval.ms is set) 395 * while the high offset is updated on each fetched message set from the 396 * broker. 397 * 398 * If there is no cached offset (either low or high, or both) then 399 * OFFSET_INVALID will be returned for the respective offset. 400 * 401 * Offsets are returned in \p *low and \p *high respectively. 402 * 403 * ErrorCode.no_error on success or an error code on failure. 404 * 405 * Note: Shall only be used with an active consumer instance. 406 */ 407 ErrorCode getWatermarkOffsets(const(char)* topic, int partition, ref long low, 408 ref long high) 409 { 410 return cast(ErrorCode) rd_kafka_get_watermark_offsets(rk_, topic, 411 partition, &low, &high); 412 } 413 } 414 415 /** 416 * Queue interface 417 * 418 * Create a new message queue. Message queues allows the application 419 * to re-route consumed messages from multiple topic+partitions into 420 * one single queue point. This queue point, containing messages from 421 * a number of topic+partitions, may then be served by a single 422 * consume() method, rather than one per topic+partition combination. 423 * 424 * See the Consumer::start(), Consumer::consume(), and 425 * Consumer::consumeCallback() methods that take a queue as the first 426 * parameter for more information. 427 */ 428 class Queue 429 { 430 nothrow @nogc: 431 /** 432 * Create Queue object 433 */ 434 this(Handle handle) 435 { 436 queue_ = rd_kafka_queue_new(handle.rk_); 437 } 438 439 ~this() 440 { 441 rd_kafka_queue_destroy(queue_); 442 } 443 444 package: 445 rd_kafka_queue_t* queue_; 446 }