1 /// 2 module rdkafkad.topic; 3 import rdkafkad; 4 import rdkafkad.iodriver; 5 6 /** 7 * Topic+Partition 8 * 9 * This is a generic type to hold a single partition and various 10 * information about it. 11 * 12 * Is typically used with std::vector<TopicPartition*> to provide 13 * a list of partitions for different operations. 14 */ 15 struct TopicPartition 16 { 17 package const(char)* topic_; 18 package int partition_; 19 package long offset_; 20 package ErrorCode err_; 21 22 void toString(in void delegate(const(char)[]) sink) const 23 { 24 sink(topic); 25 import std.format; 26 sink.formattedWrite("[%s]", partition_); 27 } 28 29 /** 30 * Create topic+partition object for \p topic and \p partition. 31 * 32 * Use \c delete to deconstruct. 33 */ 34 35 nothrow: 36 37 /// ditto 38 this(const(char)[] topic, int partition) 39 { 40 this(topic.toStringz, partition); 41 } 42 43 @nogc: 44 45 this(const(char)* topic, int partition) 46 { 47 topic_ = topic; 48 partition_ = partition; 49 offset_ = Offset.invalid; 50 err_ = ErrorCode.no_error; 51 } 52 53 54 package this(const rd_kafka_topic_partition_t* c_part) 55 { 56 topic_ = c_part.topic; 57 partition_ = c_part.partition; 58 offset_ = c_part.offset; 59 err_ = cast(ErrorCode) c_part.err; 60 // FIXME: metadata 61 } 62 63 /** partition id */ 64 int partition() @property 65 { 66 return partition_; 67 } 68 /** topic name */ 69 const(char)[] topic() const @property 70 { 71 return topic_.fromStringz; 72 } 73 74 /** offset (if applicable) */ 75 long offset() @property 76 { 77 return offset_; 78 } 79 80 /** Set offset */ 81 void offset(long offset) @property 82 { 83 offset_ = offset; 84 } 85 86 /** error code (if applicable) */ 87 ErrorCode err() @property 88 { 89 return err_; 90 } 91 } 92 93 /** Special offsets */ 94 enum Offset 95 { 96 beginning = -2, /**< Consume from beginning */ 97 end = -1, /**< Consume from end */ 98 stored = -1000, /**< Use offset storage */ 99 invalid = -1001, /**< Invalid offset */ 100 } 101 102 /** 103 * Topic handle 104 * 105 */ 106 class Topic 107 { 108 /** 109 * Creates a new topic handle for topic named \p topic_str 110 * 111 * \p conf is an optional configuration for the topic that will be used 112 * instead of the default topic configuration. 113 * The \p conf object is reusable after this call. 114 * 115 * the new topic handle or null on error (see \p errstr). 116 */ 117 this(Handle base, const(char)[] topic_str, TopicConf conf) 118 { 119 rd_kafka_topic_conf_t* rkt_conf; 120 121 if (!conf) 122 rkt_conf = rd_kafka_topic_conf_new(); 123 else /* Make a copy of conf struct to allow Conf reuse. */ 124 rkt_conf = rd_kafka_topic_conf_dup(conf.rkt_conf_); 125 126 /* Set topic opaque to the topic so that we can reach our topic object 127 * from whatever callbacks get registered. 128 * The application itself will not need these opaques since their 129 * callbacks are class based. */ 130 rd_kafka_topic_conf_set_opaque(rkt_conf, cast(void*) this); 131 132 if (conf) 133 { 134 if (conf.partitioner_cb_) 135 { 136 rd_kafka_topic_conf_set_partitioner_cb(rkt_conf, &partitioner_cb_trampoline); 137 this.partitioner_cb_ = conf.partitioner_cb_; 138 } 139 else if (conf.partitioner_kp_cb_) 140 { 141 rd_kafka_topic_conf_set_partitioner_cb(rkt_conf, &partitioner_kp_cb_trampoline); 142 this.partitioner_kp_cb_ = conf.partitioner_kp_cb_; 143 } 144 } 145 auto str0 = topic_str.toStringz(); 146 mixin(IO!q{ 147 rkt_ = rd_kafka_topic_new(base.rk_, str0, rkt_conf); 148 }); 149 if (!rkt_) 150 { 151 auto msg = err2str(cast(ErrorCode)rd_kafka_errno2err(errno)); 152 rd_kafka_topic_conf_destroy(rkt_conf); 153 throw new Exception(msg); 154 } 155 } 156 157 static if(have_vibed) 158 mixin("nothrow @nogc:"); 159 160 ~this() 161 { 162 if (rkt_) 163 { 164 mixin(IO!q{ 165 rd_kafka_topic_destroy(rkt_); 166 }); 167 } 168 } 169 170 package rd_kafka_topic_t* rkt_; 171 package PartitionerCb partitioner_cb_; 172 package PartitionerKeyPointerCb partitioner_kp_cb_; 173 174 /** 175 * Unassigned partition. 176 * 177 * The unassigned partition is used by the producer API for messages 178 * that should be partitioned using the configured or default partitioner. 179 */ 180 enum int unassignedPartition = -1; 181 182 package static nothrow @nogc int partitioner_cb_trampoline( 183 const rd_kafka_topic_t* rkt, const(void)* keydata, size_t keylen, 184 int partition_cnt, void* rkt_opaque, void* msg_opaque) 185 { 186 auto topic = cast(Topic) rkt_opaque; 187 auto key = (cast(const(char)*) keydata)[0 .. keylen]; 188 return topic.partitioner_cb_(topic, key, partition_cnt, msg_opaque); 189 } 190 191 package static nothrow @nogc int partitioner_kp_cb_trampoline( 192 const rd_kafka_topic_t* rkt, const(void)* keydata, size_t keylen, 193 int partition_cnt, void* rkt_opaque, void* msg_opaque) 194 { 195 auto topic = cast(Topic) rkt_opaque; 196 return topic.partitioner_kp_cb_(topic, keydata, keylen, partition_cnt, msg_opaque); 197 } 198 199 /** the topic name */ 200 final const(char)[] name() const 201 { 202 return rd_kafka_topic_name(rkt_).fromStringz; 203 } 204 205 /** 206 * true if \p partition is available for the topic (has leader). 207 * Warning: \b MUST \b ONLY be called from within a 208 * PartitionerCb callback. 209 */ 210 final bool partitionAvailable(int partition) const 211 { 212 typeof(return) ret; 213 mixin(IO!q{ 214 ret = cast(bool) rd_kafka_topic_partition_available(rkt_, partition); 215 }); 216 return ret; 217 } 218 219 /** 220 * Store offset \p offset for topic partition \p partition. 221 * The offset will be committed (written) to the offset store according 222 * to \p auto.commit.interval.ms. 223 * 224 * Note: This API should only be used with the simple Consumer, 225 * not the high-level KafkaConsumer. 226 * Note: \c auto.commit.enable must be set to \c false when using this API. 227 * 228 * ErrorCodde.no_error on success or an error code on error. 229 */ 230 final ErrorCode offsetStore(int partition, long offset) 231 { 232 typeof(return) ret; 233 mixin(IO!q{ 234 ret = cast(ErrorCode) rd_kafka_offset_store(rkt_, partition, offset); 235 }); 236 return ret; 237 } 238 }