1 ///
2 module rdkafkad.topic;
3 import rdkafkad;
4 
5 /**
6  * Topic+Partition
7  *
8  * This is a generic type to hold a single partition and various
9  * information about it.
10  *
 * Is typically used with a TopicPartition[] array to provide
 * a list of partitions for different operations.
13  */
struct TopicPartition
{
    package const(char)* topic_;   // NUL-terminated topic name (pointer only, not copied)
    package int partition_;        // partition id
    package long offset_;          // Offset.invalid until explicitly set
    package ErrorCode err_;        // error state as reported by librdkafka

    /// Formats as "topicname[partition]".
    void toString(in void delegate(const(char)[]) sink) const
    {
        sink(topic);
        import std.format;
        sink.formattedWrite("[%s]", partition_);
    }

   /**
   * Create topic+partition object for \p topic and \p partition.
   *
   * Note: only the pointer to the topic name is stored, the string is not
   * copied — NOTE(review): caller must keep the name alive for the lifetime
   * of this object when constructing from a raw C pointer.
   */

    nothrow:

    /// ditto
    this(const(char)[] topic, int partition)
    {
        this(topic.toStringz, partition);
    }

    @nogc:

    /// ditto
    this(const(char)* topic, int partition)
    {
        topic_ = topic;
        partition_ = partition;
        offset_ = Offset.invalid;
        err_ = ErrorCode.no_error;
    }

    /// Copy the scalar fields out of the librdkafka C struct.
    package this(const rd_kafka_topic_partition_t* c_part)
    {
        topic_ = c_part.topic;
        partition_ = c_part.partition;
        offset_ = c_part.offset;
        err_ = cast(ErrorCode) c_part.err;
        // FIXME: metadata
    }

    /** partition id */
    int partition() const @property
    {
        return partition_;
    }

    /** topic name */
    const(char)[] topic() const @property
    {
        return topic_.fromStringz;
    }

    /** offset (if applicable) */
    long offset() const @property
    {
        return offset_;
    }

    /** Set offset */
    void offset(long offset) @property
    {
        offset_ = offset;
    }

    /** error code (if applicable) */
    ErrorCode err() const @property
    {
        return err_;
    }
}
91 
/** Special offsets */
enum Offset
{
    // NOTE(review): values presumably mirror librdkafka's RD_KAFKA_OFFSET_*
    // constants — keep in sync with the C header; do not renumber.
    beginning = -2, /**< Consume from beginning */
    end = -1, /**< Consume from end */
    stored = -1000, /**< Use offset storage */
    invalid = -1001, /**< Invalid offset */
}
100 
101 /**
102  * Topic handle
103  *
104  */
class Topic
{
    /**
   * Creates a new topic handle for topic named \p topic_str.
   *
   * \p conf is an optional configuration for the topic that will be used
   * instead of the default topic configuration.
   * The \p conf object is reusable after this call (it is duplicated here,
   * the original is never consumed).
   *
   * Throws: Exception carrying the librdkafka error string if the
   * underlying topic handle could not be created.
   */
    this(Handle base, const(char)[] topic_str, TopicConf conf)
    {
        rd_kafka_topic_conf_t* rkt_conf;

        if (!conf)
            rkt_conf = rd_kafka_topic_conf_new();
        else /* Make a copy of conf struct to allow Conf reuse. */
            rkt_conf = rd_kafka_topic_conf_dup(conf.rkt_conf_);

        /* Set topic opaque to the topic so that we can reach our topic object
     * from whatever callbacks get registered.
     * The application itself will not need these opaques since their
     * callbacks are class based. */
        rd_kafka_topic_conf_set_opaque(rkt_conf, cast(void*) this);

        if (conf)
        {
            // Register at most one C trampoline, preferring the slice-based
            // partitioner when both variants are set on the conf.
            if (conf.partitioner_cb_)
            {
                rd_kafka_topic_conf_set_partitioner_cb(rkt_conf, &partitioner_cb_trampoline);
                this.partitioner_cb_ = conf.partitioner_cb_;
            }
            else if (conf.partitioner_kp_cb_)
            {
                rd_kafka_topic_conf_set_partitioner_cb(rkt_conf, &partitioner_kp_cb_trampoline);
                this.partitioner_kp_cb_ = conf.partitioner_kp_cb_;
            }
        }
        auto str0 = topic_str.toStringz();
        // NOTE(review): assumes rd_kafka_topic_new takes ownership of rkt_conf
        // on success (librdkafka contract) — hence rkt_conf is only destroyed
        // here on the failure path, before throwing.
        rkt_ = rd_kafka_topic_new(base.rk_, str0, rkt_conf);
        if (!rkt_)
        {
            auto msg = err2str(cast(ErrorCode)rd_kafka_errno2err(errno));
            rd_kafka_topic_conf_destroy(rkt_conf);
            throw new Exception(msg);
        }
    }

nothrow @nogc:

    /// Releases the underlying librdkafka topic handle, if one was created.
     ~this()
    {
        if (rkt_)
        {
            rd_kafka_topic_destroy(rkt_);
        }
    }

    package rd_kafka_topic_t* rkt_; // underlying C topic handle
    package PartitionerCb partitioner_cb_; // D partitioner (slice key), set in ctor
    package PartitionerKeyPointerCb partitioner_kp_cb_; // D partitioner (pointer+length key), set in ctor

    /**
   * Unassigned partition.
   *
   * The unassigned partition is used by the producer API for messages
   * that should be partitioned using the configured or default partitioner.
   */
    enum int unassignedPartition = -1;

    /// C-level trampoline: recovers the Topic object from \p rkt_opaque (set
    /// via rd_kafka_topic_conf_set_opaque in the constructor) and forwards
    /// the key as a char slice to the registered slice-based partitioner.
    package static nothrow @nogc int partitioner_cb_trampoline(
        const rd_kafka_topic_t* rkt, const(void)* keydata, size_t keylen,
        int partition_cnt, void* rkt_opaque, void* msg_opaque)
    {
        auto topic = cast(Topic) rkt_opaque;
        auto key = (cast(const(char)*) keydata)[0 .. keylen];
        return topic.partitioner_cb_(topic, key, partition_cnt, msg_opaque);
    }

    /// C-level trampoline for the pointer+length partitioner variant; forwards
    /// keydata/keylen unchanged to the registered callback.
    package static nothrow @nogc int partitioner_kp_cb_trampoline(
        const rd_kafka_topic_t* rkt, const(void)* keydata, size_t keylen,
        int partition_cnt, void* rkt_opaque, void* msg_opaque)
    {
        auto topic = cast(Topic) rkt_opaque;
        return topic.partitioner_kp_cb_(topic, keydata, keylen, partition_cnt, msg_opaque);
    }

    /** the topic name */
    final const(char)[] name() const
    {
        return rd_kafka_topic_name(rkt_).fromStringz;
    }

    /**
   * true if \p partition is available for the topic (has leader).
   * Warning: \b MUST \b ONLY be called from within a
   *          PartitionerCb callback.
   */
    final bool partitionAvailable(int partition) const
    {
        typeof(return) ret;
        ret = cast(bool) rd_kafka_topic_partition_available(rkt_, partition);
        return ret;
    }

    /**
   * Store offset \p offset for topic partition \p partition.
   * The offset will be committed (written) to the offset store according
   * to \p auto.commit.interval.ms.
   *
   * Note: This API should only be used with the simple Consumer,
   *         not the high-level KafkaConsumer.
   * Note: \c auto.commit.enable must be set to \c false when using this API.
   *
   * Returns: ErrorCode.no_error on success or an error code on error.
   */
    final ErrorCode offsetStore(int partition, long offset)
    {
        typeof(return) ret;
        ret = cast(ErrorCode) rd_kafka_offset_store(rkt_, partition, offset);
        return ret;
    }
}