///
module rdkafkad.topic;
import rdkafkad;
import rdkafkad.iodriver;

/**
 * Topic+Partition
 *
 * This is a generic type to hold a single partition and various
 * information about it.
 *
 * It is typically used as a TopicPartition[] slice to provide
 * a list of partitions for different operations.
 */
struct TopicPartition
{
    package const(char)* topic_;
    package int partition_;
    package long offset_;
    package ErrorCode err_;

    void toString(in void delegate(const(char)[]) sink) const
    {
        sink(topic);
        import std.format;
        sink.formattedWrite("[%s]", partition_);
    }

   /**
   * Create a topic+partition object for \p topic and \p partition.
   */

    nothrow:

    /// ditto
    this(const(char)[] topic, int partition)
    {
        this(topic.toStringz, partition);
    }

    @nogc:

    this(const(char)* topic, int partition)
    {
        topic_ = topic;
        partition_ = partition;
        offset_ = Offset.invalid;
        err_ = ErrorCode.no_error;
    }


    package this(const rd_kafka_topic_partition_t* c_part)
    {
        topic_ = c_part.topic;
        partition_ = c_part.partition;
        offset_ = c_part.offset;
        err_ = cast(ErrorCode) c_part.err;
        // FIXME: metadata
    }

    /** partition id */
    int partition() @property
    {
        return partition_;
    }
    /** topic name */
    const(char)[] topic() const @property
    {
        return topic_.fromStringz;
    }

    /** offset (if applicable) */
    long offset() @property
    {
        return offset_;
    }

    /** Set offset */
    void offset(long offset) @property
    {
        offset_ = offset;
    }

    /** error code (if applicable) */
    ErrorCode err() @property
    {
        return err_;
    }
}

/** Special offsets */
enum Offset
{
    beginning = -2, /**< Consume from beginning */
    end = -1, /**< Consume from end */
    stored = -1000, /**< Use offset storage */
    invalid = -1001, /**< Invalid offset */
}
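/+
   A minimal usage sketch for TopicPartition together with the special Offset
   values, assuming a consumer-style API that accepts a TopicPartition[] list
   (the topic name and the final call are illustrative, not part of this
   module):

       TopicPartition[] parts;
       foreach (p; 0 .. 3)
       {
           auto tp = TopicPartition("mytopic", p); // partitions 0, 1 and 2
           tp.offset = Offset.beginning;           // consume from the beginning
           parts ~= tp;
       }
       // parts can now be handed to an assign/seek/commit-style call.
+/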

/**
 * Topic handle
 *
 */
class Topic
{
    /**
   * Creates a new topic handle for topic named \p topic_str.
   *
   * \p conf is an optional configuration for the topic that will be used
   * instead of the default topic configuration.
   * The \p conf object is reusable after this call.
   *
   * Throws: an Exception if the topic handle cannot be created.
   */
    this(Handle base, const(char)[] topic_str, TopicConf conf)
    {
        rd_kafka_topic_conf_t* rkt_conf;

        if (!conf)
            rkt_conf = rd_kafka_topic_conf_new();
        else /* Make a copy of conf struct to allow Conf reuse. */
            rkt_conf = rd_kafka_topic_conf_dup(conf.rkt_conf_);

        /* Set topic opaque to the topic so that we can reach our topic object
         * from whatever callbacks get registered.
         * The application itself will not need these opaques since their
         * callbacks are class based. */
        rd_kafka_topic_conf_set_opaque(rkt_conf, cast(void*) this);

        if (conf)
        {
            if (conf.partitioner_cb_)
            {
                rd_kafka_topic_conf_set_partitioner_cb(rkt_conf, &partitioner_cb_trampoline);
                this.partitioner_cb_ = conf.partitioner_cb_;
            }
            else if (conf.partitioner_kp_cb_)
            {
                rd_kafka_topic_conf_set_partitioner_cb(rkt_conf, &partitioner_kp_cb_trampoline);
                this.partitioner_kp_cb_ = conf.partitioner_kp_cb_;
            }
        }
        auto str0 = topic_str.toStringz();
        mixin(IO!q{
        rkt_ = rd_kafka_topic_new(base.rk_, str0, rkt_conf);
        });
        if (!rkt_)
        {
            auto msg = err2str(cast(ErrorCode)rd_kafka_errno2err(errno));
            rd_kafka_topic_conf_destroy(rkt_conf);
            throw new Exception(msg);
        }
    }

    static if (have_vibed)
        mixin("nothrow @nogc:");

    ~this()
    {
        if (rkt_)
        {
            mixin(IO!q{
            rd_kafka_topic_destroy(rkt_);
            });
        }
    }

    package rd_kafka_topic_t* rkt_;
    package PartitionerCb partitioner_cb_;
    package PartitionerKeyPointerCb partitioner_kp_cb_;

    /**
   * Unassigned partition.
   *
   * The unassigned partition is used by the producer API for messages
   * that should be partitioned using the configured or default partitioner.
   */
    enum int unassignedPartition = -1;
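    /+
       Sketch of how unassignedPartition is typically used when producing,
       assuming a Producer handle with a produce() call that takes the topic,
       a partition, a payload and a key (that call lives outside this module
       and its exact signature is only illustrative here):

           // Let the configured (or default) partitioner pick the partition.
           producer.produce(topic, Topic.unassignedPartition, payload, key);
    +/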

    package static nothrow @nogc int partitioner_cb_trampoline(
        const rd_kafka_topic_t* rkt, const(void)* keydata, size_t keylen,
        int partition_cnt, void* rkt_opaque, void* msg_opaque)
    {
        auto topic = cast(Topic) rkt_opaque;
        auto key = (cast(const(char)*) keydata)[0 .. keylen];
        return topic.partitioner_cb_(topic, key, partition_cnt, msg_opaque);
    }

    package static nothrow @nogc int partitioner_kp_cb_trampoline(
        const rd_kafka_topic_t* rkt, const(void)* keydata, size_t keylen,
        int partition_cnt, void* rkt_opaque, void* msg_opaque)
    {
        auto topic = cast(Topic) rkt_opaque;
        return topic.partitioner_kp_cb_(topic, keydata, keylen, partition_cnt, msg_opaque);
    }

    /** the topic name */
    final const(char)[] name() const
    {
        return rd_kafka_topic_name(rkt_).fromStringz;
    }

    /**
   * true if \p partition is available for the topic (has leader).
   * Warning: \b MUST \b ONLY be called from within a
   *          PartitionerCb callback.
   */
    final bool partitionAvailable(int partition) const
    {
        typeof(return) ret;
        mixin(IO!q{
        ret = cast(bool) rd_kafka_topic_partition_available(rkt_, partition);
        });
        return ret;
    }
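    /+
       Sketch of a custom partitioner that honours partition availability.
       The callback shape is inferred from partitioner_cb_trampoline above
       (Topic, key slice, partition count, message opaque); the actual
       PartitionerCb type is declared with the topic configuration, so this
       is only an illustration:

           nothrow @nogc int myPartitioner(Topic topic, const(char)[] key,
               int partition_cnt, void* msg_opaque)
           {
               // Simple additive hash of the key bytes.
               uint h;
               foreach (c; key)
                   h = h * 31 + c;
               auto p = cast(int) (h % partition_cnt);
               // partitionAvailable may only be called from inside a
               // partitioner callback; fall back to partition 0 when the
               // chosen partition currently has no leader.
               return topic.partitionAvailable(p) ? p : 0;
           }
    +/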

    /**
   * Store offset \p offset for topic partition \p partition.
   * The offset will be committed (written) to the offset store according
   * to \c auto.commit.interval.ms.
   *
   * Note: This API should only be used with the simple Consumer,
   *       not the high-level KafkaConsumer.
   * Note: \c auto.commit.enable must be set to \c false when using this API.
   *
   * ErrorCode.no_error on success or an error code on error.
   */
    final ErrorCode offsetStore(int partition, long offset)
    {
        typeof(return) ret;
        mixin(IO!q{
        ret = cast(ErrorCode) rd_kafka_offset_store(rkt_, partition, offset);
        });
        return ret;
    }
}
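/+
   A minimal end-to-end sketch, assuming a Handle (for example a consumer
   created elsewhere in rdkafkad) is already available; only Topic and
   offsetStore come from this module, the rest is illustrative:

       // null conf: the default topic configuration is used (see constructor).
       auto topic = new Topic(consumer, "mytopic", null);

       // With auto.commit.enable=false on the simple consumer, record the
       // offset to be committed for partition 0.
       auto err = topic.offsetStore(0, 42);
       assert(err == ErrorCode.no_error);
+/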