1 ///
2 module rdkafkad.topic;
3 import rdkafkad;
4
5 /**
6 * Topic+Partition
7 *
8 * This is a generic type to hold a single partition and various
9 * information about it.
10 *
* It is typically used in a list (e.g. an array of TopicPartition) to
* provide a set of partitions for different operations.
13 */
struct TopicPartition
{
    /// NUL-terminated topic name. The pointer is stored as given; this
    /// struct never copies or frees the string it points to.
    package const(char)* topic_;
    /// Partition id.
    package int partition_;
    /// Offset associated with the partition.
    package long offset_;
    /// Error code associated with the partition.
    package ErrorCode err_;

    /**
    * Writes a textual form of this object to `sink`: the topic name
    * immediately followed by the partition id in square brackets.
    *
    * Params:
    *   sink = output delegate receiving the text pieces.
    */
    void toString(in void delegate(const(char)[]) sink) const
    {
        import std.format;

        sink(topic);
        sink.formattedWrite("[%s]", partition_);
    }

nothrow:

    /**
    * Create a topic+partition object for `topic` and `partition`.
    *
    * The name is converted with `toStringz` and forwarded to the
    * C-string constructor, so the offset starts as `Offset.invalid`
    * and the error code as `ErrorCode.no_error`.
    *
    * Params:
    *   topic     = topic name.
    *   partition = partition id.
    */
    this(const(char)[] topic, int partition)
    {
        this(topic.toStringz, partition);
    }

@nogc:

    /**
    * Create a topic+partition object from a C string.
    *
    * The `topic` pointer is stored as-is (no copy is made here). The offset
    * is initialised to `Offset.invalid` and the error code to
    * `ErrorCode.no_error`.
    *
    * Params:
    *   topic     = NUL-terminated topic name.
    *   partition = partition id.
    */
    this(const(char)* topic, int partition)
    {
        topic_ = topic;
        partition_ = partition;
        offset_ = Offset.invalid;
        err_ = ErrorCode.no_error;
    }

    /**
    * Create a topic+partition object from a C partition descriptor.
    *
    * Copies the topic pointer, partition id, offset and error code out of
    * `c_part`; the topic string itself is not duplicated.
    */
    package this(const rd_kafka_topic_partition_t* c_part)
    {
        topic_ = c_part.topic;
        partition_ = c_part.partition;
        offset_ = c_part.offset;
        err_ = cast(ErrorCode) c_part.err;
        // FIXME: metadata
    }

    // The getters below are `const` (like `topic`) so that they can also be
    // used on const instances, e.g. from within `toString`.

    /** partition id */
    int partition() const @property
    {
        return partition_;
    }

    /** topic name */
    const(char)[] topic() const @property
    {
        return topic_.fromStringz;
    }

    /** offset (if applicable) */
    long offset() const @property
    {
        return offset_;
    }

    /** Set offset */
    void offset(long offset) @property
    {
        offset_ = offset;
    }

    /** error code (if applicable) */
    ErrorCode err() const @property
    {
        return err_;
    }
}
91
92 /** Special offsets */
enum Offset : int
{
    /// Consume from beginning
    beginning = -2,

    /// Consume from end
    end = -1,

    /// Use offset storage
    stored = -1000,

    /// Invalid offset
    invalid = -1001,
}
100
101 /**
102 * Topic handle
103 *
104 */
class Topic
{
    /// Underlying librdkafka topic handle; released by the destructor.
    package rd_kafka_topic_t* rkt_;
    /// Partitioner callback taken from the configuration, if one was set.
    package PartitionerCb partitioner_cb_;
    /// Key-pointer partitioner callback taken from the configuration, if one was set.
    package PartitionerKeyPointerCb partitioner_kp_cb_;

    /**
    * Unassigned partition.
    *
    * The unassigned partition is used by the producer API for messages
    * that should be partitioned using the configured or default partitioner.
    */
    enum int unassignedPartition = -1;

    /**
    * Creates a new topic handle for the topic named `topic_str`.
    *
    * `conf` is an optional configuration for the topic that will be used
    * instead of the default topic configuration. It is duplicated before
    * use, so the `conf` object is reusable after this call.
    *
    * Params:
    *   base      = client handle the topic is created on.
    *   topic_str = name of the topic.
    *   conf      = optional topic configuration, may be null.
    *
    * Throws: `Exception` with the error text obtained from `errno` when the
    * underlying topic handle cannot be created.
    */
    this(Handle base, const(char)[] topic_str, TopicConf conf)
    {
        // Always work on a private configuration object: either a fresh
        // default one or a duplicate of the caller's, which allows Conf reuse.
        rd_kafka_topic_conf_t* nativeConf = conf
            ? rd_kafka_topic_conf_dup(conf.rkt_conf_)
            : rd_kafka_topic_conf_new();

        /* Set topic opaque to the topic so that we can reach our topic object
         * from whatever callbacks get registered.
         * The application itself will not need these opaques since their
         * callbacks are class based. */
        rd_kafka_topic_conf_set_opaque(nativeConf, cast(void*) this);

        if (conf)
        {
            // At most one partitioner is installed; the plain callback
            // takes precedence over the key-pointer variant.
            if (conf.partitioner_cb_)
            {
                this.partitioner_cb_ = conf.partitioner_cb_;
                rd_kafka_topic_conf_set_partitioner_cb(nativeConf, &partitioner_cb_trampoline);
            }
            else if (conf.partitioner_kp_cb_)
            {
                this.partitioner_kp_cb_ = conf.partitioner_kp_cb_;
                rd_kafka_topic_conf_set_partitioner_cb(nativeConf, &partitioner_kp_cb_trampoline);
            }
        }

        rkt_ = rd_kafka_topic_new(base.rk_, topic_str.toStringz(), nativeConf);
        if (!rkt_)
        {
            // Read the error text before any further call can touch errno.
            auto reason = err2str(cast(ErrorCode) rd_kafka_errno2err(errno));
            rd_kafka_topic_conf_destroy(nativeConf);
            throw new Exception(reason);
        }
    }

nothrow @nogc:

    /// Releases the underlying topic handle, if one was created.
    ~this()
    {
        if (rkt_)
            rd_kafka_topic_destroy(rkt_);
    }

    /**
    * C-side partitioner entry point for `partitioner_cb_`.
    *
    * Recovers the `Topic` from `rkt_opaque`, views the key bytes as a
    * character slice of length `keylen` and forwards to the callback.
    */
    package static nothrow @nogc int partitioner_cb_trampoline(
        const rd_kafka_topic_t* rkt, const(void)* keydata, size_t keylen,
        int partition_cnt, void* rkt_opaque, void* msg_opaque)
    {
        auto self = cast(Topic) rkt_opaque;
        auto keySlice = (cast(const(char)*) keydata)[0 .. keylen];
        return self.partitioner_cb_(self, keySlice, partition_cnt, msg_opaque);
    }

    /**
    * C-side partitioner entry point for `partitioner_kp_cb_`.
    *
    * Recovers the `Topic` from `rkt_opaque` and forwards the raw key
    * pointer and length to the callback unchanged.
    */
    package static nothrow @nogc int partitioner_kp_cb_trampoline(
        const rd_kafka_topic_t* rkt, const(void)* keydata, size_t keylen,
        int partition_cnt, void* rkt_opaque, void* msg_opaque)
    {
        auto self = cast(Topic) rkt_opaque;
        return self.partitioner_kp_cb_(self, keydata, keylen, partition_cnt, msg_opaque);
    }

    /** the topic name */
    final const(char)[] name() const
    {
        return fromStringz(rd_kafka_topic_name(rkt_));
    }

    /**
    * true if `partition` is available for the topic (has leader).
    * Warning: MUST ONLY be called from within a
    * PartitionerCb callback.
    */
    final bool partitionAvailable(int partition) const
    {
        return cast(bool) rd_kafka_topic_partition_available(rkt_, partition);
    }

    /**
    * Store offset `offset` for topic partition `partition`.
    * The offset will be committed (written) to the offset store according
    * to `auto.commit.interval.ms`.
    *
    * Note: This API should only be used with the simple Consumer,
    * not the high-level KafkaConsumer.
    * Note: `auto.commit.enable` must be set to `false` when using this API.
    *
    * Returns: ErrorCode.no_error on success or an error code on error.
    */
    final ErrorCode offsetStore(int partition, long offset)
    {
        return cast(ErrorCode) rd_kafka_offset_store(rkt_, partition, offset);
    }
}