1 /// 2 module rdkafkad.topic; 3 import rdkafkad; 4 5 /** 6 * Topic+Partition 7 * 8 * This is a generic type to hold a single partition and various 9 * information about it. 10 * 11 * Is typically used with std::vector<TopicPartition*> to provide 12 * a list of partitions for different operations. 13 */ 14 struct TopicPartition 15 { 16 package const(char)* topic_; 17 package int partition_; 18 package long offset_; 19 package ErrorCode err_; 20 21 void toString(in void delegate(const(char)[]) sink) const 22 { 23 sink(topic); 24 import std.format; 25 sink.formattedWrite("[%s]", partition_); 26 } 27 28 /** 29 * Create topic+partition object for \p topic and \p partition. 30 * 31 * Use \c delete to deconstruct. 32 */ 33 34 nothrow: 35 36 /// ditto 37 this(const(char)[] topic, int partition) 38 { 39 this(topic.toStringz, partition); 40 } 41 42 @nogc: 43 44 this(const(char)* topic, int partition) 45 { 46 topic_ = topic; 47 partition_ = partition; 48 offset_ = Offset.invalid; 49 err_ = ErrorCode.no_error; 50 } 51 52 53 package this(const rd_kafka_topic_partition_t* c_part) 54 { 55 topic_ = c_part.topic; 56 partition_ = c_part.partition; 57 offset_ = c_part.offset; 58 err_ = cast(ErrorCode) c_part.err; 59 // FIXME: metadata 60 } 61 62 /** partition id */ 63 int partition() @property 64 { 65 return partition_; 66 } 67 /** topic name */ 68 const(char)[] topic() const @property 69 { 70 return topic_.fromStringz; 71 } 72 73 /** offset (if applicable) */ 74 long offset() @property 75 { 76 return offset_; 77 } 78 79 /** Set offset */ 80 void offset(long offset) @property 81 { 82 offset_ = offset; 83 } 84 85 /** error code (if applicable) */ 86 ErrorCode err() @property 87 { 88 return err_; 89 } 90 } 91 92 /** Special offsets */ 93 enum Offset 94 { 95 beginning = -2, /**< Consume from beginning */ 96 end = -1, /**< Consume from end */ 97 stored = -1000, /**< Use offset storage */ 98 invalid = -1001, /**< Invalid offset */ 99 } 100 101 /** 102 * Topic handle 103 * 104 */ 105 class Topic 106 { 107 /** 108 * Creates a new topic handle for topic named \p topic_str 109 * 110 * \p conf is an optional configuration for the topic that will be used 111 * instead of the default topic configuration. 112 * The \p conf object is reusable after this call. 113 * 114 * the new topic handle or null on error (see \p errstr). 115 */ 116 this(Handle base, const(char)[] topic_str, TopicConf conf) 117 { 118 rd_kafka_topic_conf_t* rkt_conf; 119 120 if (!conf) 121 rkt_conf = rd_kafka_topic_conf_new(); 122 else /* Make a copy of conf struct to allow Conf reuse. */ 123 rkt_conf = rd_kafka_topic_conf_dup(conf.rkt_conf_); 124 125 /* Set topic opaque to the topic so that we can reach our topic object 126 * from whatever callbacks get registered. 127 * The application itself will not need these opaques since their 128 * callbacks are class based. */ 129 rd_kafka_topic_conf_set_opaque(rkt_conf, cast(void*) this); 130 131 if (conf) 132 { 133 if (conf.partitioner_cb_) 134 { 135 rd_kafka_topic_conf_set_partitioner_cb(rkt_conf, &partitioner_cb_trampoline); 136 this.partitioner_cb_ = conf.partitioner_cb_; 137 } 138 else if (conf.partitioner_kp_cb_) 139 { 140 rd_kafka_topic_conf_set_partitioner_cb(rkt_conf, &partitioner_kp_cb_trampoline); 141 this.partitioner_kp_cb_ = conf.partitioner_kp_cb_; 142 } 143 } 144 auto str0 = topic_str.toStringz(); 145 rkt_ = rd_kafka_topic_new(base.rk_, str0, rkt_conf); 146 if (!rkt_) 147 { 148 auto msg = err2str(cast(ErrorCode)rd_kafka_errno2err(errno)); 149 rd_kafka_topic_conf_destroy(rkt_conf); 150 throw new Exception(msg); 151 } 152 } 153 154 nothrow @nogc: 155 156 ~this() 157 { 158 if (rkt_) 159 { 160 rd_kafka_topic_destroy(rkt_); 161 } 162 } 163 164 package rd_kafka_topic_t* rkt_; 165 package PartitionerCb partitioner_cb_; 166 package PartitionerKeyPointerCb partitioner_kp_cb_; 167 168 /** 169 * Unassigned partition. 170 * 171 * The unassigned partition is used by the producer API for messages 172 * that should be partitioned using the configured or default partitioner. 173 */ 174 enum int unassignedPartition = -1; 175 176 package static nothrow @nogc int partitioner_cb_trampoline( 177 const rd_kafka_topic_t* rkt, const(void)* keydata, size_t keylen, 178 int partition_cnt, void* rkt_opaque, void* msg_opaque) 179 { 180 auto topic = cast(Topic) rkt_opaque; 181 auto key = (cast(const(char)*) keydata)[0 .. keylen]; 182 return topic.partitioner_cb_(topic, key, partition_cnt, msg_opaque); 183 } 184 185 package static nothrow @nogc int partitioner_kp_cb_trampoline( 186 const rd_kafka_topic_t* rkt, const(void)* keydata, size_t keylen, 187 int partition_cnt, void* rkt_opaque, void* msg_opaque) 188 { 189 auto topic = cast(Topic) rkt_opaque; 190 return topic.partitioner_kp_cb_(topic, keydata, keylen, partition_cnt, msg_opaque); 191 } 192 193 /** the topic name */ 194 final const(char)[] name() const 195 { 196 return rd_kafka_topic_name(rkt_).fromStringz; 197 } 198 199 /** 200 * true if \p partition is available for the topic (has leader). 201 * Warning: \b MUST \b ONLY be called from within a 202 * PartitionerCb callback. 203 */ 204 final bool partitionAvailable(int partition) const 205 { 206 typeof(return) ret; 207 ret = cast(bool) rd_kafka_topic_partition_available(rkt_, partition); 208 return ret; 209 } 210 211 /** 212 * Store offset \p offset for topic partition \p partition. 213 * The offset will be committed (written) to the offset store according 214 * to \p auto.commit.interval.ms. 215 * 216 * Note: This API should only be used with the simple Consumer, 217 * not the high-level KafkaConsumer. 218 * Note: \c auto.commit.enable must be set to \c false when using this API. 219 * 220 * ErrorCodde.no_error on success or an error code on error. 221 */ 222 final ErrorCode offsetStore(int partition, long offset) 223 { 224 typeof(return) ret; 225 ret = cast(ErrorCode) rd_kafka_offset_store(rkt_, partition, offset); 226 return ret; 227 } 228 }