///
module rdkafkad.topic;
import rdkafkad;

/**
 * Topic+Partition
 *
 * This is a generic type to hold a single partition and various
 * information about it.
 *
 * It is typically used with a TopicPartition[] array to provide
 * a list of partitions for different operations.
 */
struct TopicPartition
{
    package const(char)* topic_;
    package int partition_;
    package long offset_;
    package ErrorCode err_;

    void toString(in void delegate(const(char)[]) sink) const
    {
        sink(topic);
        import std.format;
        sink.formattedWrite("[%s]", partition_);
    }

    nothrow @nogc:

    /**
     * Create a topic+partition object for \p topic and \p partition.
     */
    this(const(char)* topic, int partition)
    {
        topic_ = topic;
        partition_ = partition;
        offset_ = Offset.invalid;
        err_ = ErrorCode.no_error;
    }

    package this(const rd_kafka_topic_partition_t* c_part)
    {
        topic_ = c_part.topic;
        partition_ = c_part.partition;
        offset_ = c_part.offset;
        err_ = cast(ErrorCode) c_part.err;
        // FIXME: metadata
    }

    /** partition id */
    int partition() @property
    {
        return partition_;
    }

    /** topic name */
    const(char)[] topic() const @property
    {
        return topic_.fromStringz;
    }

    /** offset (if applicable) */
    long offset() @property
    {
        return offset_;
    }

    /** Set offset */
    void offset(long offset) @property
    {
        offset_ = offset;
    }

    /** error code (if applicable) */
    ErrorCode err() @property
    {
        return err_;
    }
}

/** Special offsets */
enum Offset
{
    beginning = -2, /**< Consume from beginning */
    end = -1, /**< Consume from end */
    stored = -1000, /**< Use offset storage */
    invalid = -1001, /**< Invalid offset */
}
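/*
 * A minimal usage sketch of TopicPartition and the special Offset values.
 * It only exercises types defined in this module, so it runs as a plain
 * unittest; no broker connection is involved.
 */
unittest
{
    // A D string literal implicitly converts to const(char)*,
    // so it can be passed straight to the constructor.
    auto tp = TopicPartition("logs", 0);
    assert(tp.partition == 0);
    assert(tp.offset == Offset.invalid); // offset starts out invalid
    assert(tp.err == ErrorCode.no_error);

    // Rewind the partition to the earliest available message.
    tp.offset = Offset.beginning;
    assert(tp.offset == Offset.beginning);
}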
/**
 * Topic handle
 */
class Topic
{
    /**
     * Creates a new topic handle for the topic named \p topic_str.
     *
     * \p conf is an optional configuration for the topic that will be used
     * instead of the default topic configuration.
     * The \p conf object is reusable after this call.
     *
     * Throws an Exception if the topic handle cannot be created.
     */
    this(Handle base, const(char)[] topic_str, TopicConf conf)
    {
        rd_kafka_topic_conf_t* rkt_conf;

        if (!conf)
            rkt_conf = rd_kafka_topic_conf_new();
        else /* Make a copy of the conf struct to allow Conf reuse. */
            rkt_conf = rd_kafka_topic_conf_dup(conf.rkt_conf_);

        /* Set the topic opaque to this object so that we can reach our
         * topic object from whatever callbacks get registered.
         * The application itself will not need these opaques since its
         * callbacks are class based. */
        rd_kafka_topic_conf_set_opaque(rkt_conf, cast(void*) this);

        if (conf)
        {
            if (conf.partitioner_cb_)
            {
                rd_kafka_topic_conf_set_partitioner_cb(rkt_conf, &partitioner_cb_trampoline);
                this.partitioner_cb_ = conf.partitioner_cb_;
            }
            else if (conf.partitioner_kp_cb_)
            {
                rd_kafka_topic_conf_set_partitioner_cb(rkt_conf, &partitioner_kp_cb_trampoline);
                this.partitioner_kp_cb_ = conf.partitioner_kp_cb_;
            }
        }
        rkt_ = rd_kafka_topic_new(base.rk_, topic_str.toStringz(), rkt_conf);
        if (!rkt_)
        {
            auto msg = err2str(cast(ErrorCode) rd_kafka_errno2err(errno));
            rd_kafka_topic_conf_destroy(rkt_conf);
            throw new Exception(msg);
        }
    }

    nothrow @nogc:

    ~this()
    {
        if (rkt_)
            rd_kafka_topic_destroy(rkt_);
    }

    package rd_kafka_topic_t* rkt_;
    package PartitionerCb partitioner_cb_;
    package PartitionerKeyPointerCb partitioner_kp_cb_;

    /**
     * Unassigned partition.
     *
     * The unassigned partition is used by the producer API for messages
     * that should be partitioned using the configured or default partitioner.
     */
    enum int unassignedPartition = -1;

    package static nothrow @nogc int partitioner_cb_trampoline(
        const rd_kafka_topic_t* rkt, const(void)* keydata, size_t keylen,
        int partition_cnt, void* rkt_opaque, void* msg_opaque)
    {
        auto topic = cast(Topic) rkt_opaque;
        auto key = (cast(const(char)*) keydata)[0 .. keylen];
        return topic.partitioner_cb_(topic, key, partition_cnt, msg_opaque);
    }

    package static nothrow @nogc int partitioner_kp_cb_trampoline(
        const rd_kafka_topic_t* rkt, const(void)* keydata, size_t keylen,
        int partition_cnt, void* rkt_opaque, void* msg_opaque)
    {
        auto topic = cast(Topic) rkt_opaque;
        return topic.partitioner_kp_cb_(topic, keydata, keylen, partition_cnt, msg_opaque);
    }

    /** the topic name */
    final const(char)[] name() const
    {
        return rd_kafka_topic_name(rkt_).fromStringz;
    }

    /**
     * true if \p partition is available for the topic (has a leader).
     * Warning: \b MUST \b ONLY be called from within a
     * PartitionerCb callback.
     */
    final bool partitionAvailable(int partition) const
    {
        return cast(bool) rd_kafka_topic_partition_available(rkt_, partition);
    }

    /**
     * Store offset \p offset for topic partition \p partition.
     * The offset will be committed (written) to the offset store according
     * to \c auto.commit.interval.ms.
     *
     * Note: This API should only be used with the simple Consumer,
     * not the high-level KafkaConsumer.
     * Note: \c auto.commit.enable must be set to \c false when using this API.
     *
     * Returns ErrorCode.no_error on success or an error code on error.
     */
    final ErrorCode offsetStore(int partition, long offset)
    {
        return cast(ErrorCode) rd_kafka_offset_store(rkt_, partition, offset);
    }
}
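/*
 * An end-to-end usage sketch for Topic, compiled out via version (none)
 * because it is illustrative only: `GlobalConf` and `Producer` are assumed
 * here to be the rdkafkad configuration and producer-handle types, and the
 * partitioner delegate signature is inferred from partitioner_cb_trampoline
 * above; adjust the names to the actual API if they differ.
 */
version (none) unittest
{
    auto tconf = new TopicConf;

    // Hypothetical partitioner: route each message by the first byte of
    // its key, falling back to the unassigned partition when the chosen
    // partition currently has no leader. Note that partitionAvailable
    // may only be called from inside a partitioner callback like this one.
    tconf.partitioner_cb_ = (Topic t, const(char)[] key, int cnt, void* opaque)
    {
        int p = key.length ? key[0] % cnt : 0;
        return t.partitionAvailable(p) ? p : Topic.unassignedPartition;
    };

    auto producer = new Producer(new GlobalConf); // hypothetical handle type
    auto topic = new Topic(producer, "logs", tconf);
    assert(topic.name == "logs");

    // With the simple consumer API (and auto.commit.enable=false), offsets
    // could then be stored explicitly for later commit, e.g.:
    // topic.offsetStore(0, 42);
}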