1 ///
2 module rdkafkad.topic;
3 import rdkafkad;
4 
/**
 * Topic+Partition
 *
 * This is a generic type to hold a single partition and various
 * information about it: topic name, partition id, offset and error code.
 *
 * Typically used in a list (e.g. an array) of TopicPartition values to
 * describe the partitions an operation applies to.
 */
struct TopicPartition
{
    /// NUL-terminated topic name. The pointer is stored as-is; the string is not copied.
    package const(char)* topic_;
    /// Partition id.
    package int partition_;
    /// Offset associated with the partition (Offset.invalid when constructed by topic/partition).
    package long offset_;
    /// Error code associated with the partition.
    package ErrorCode err_;

    /**
     * Writes a textual representation to \p sink in the form `topic[partition]`.
     *
     * Params:
     *   sink = output delegate receiving the formatted pieces.
     */
    void toString(in void delegate(const(char)[]) sink) const
    {
        sink(topic);
        import std.format;
        sink.formattedWrite("[%s]", partition_);
    }

// Everything below this label is nothrow and @nogc.
nothrow @nogc:

    /**
     * Create a topic+partition object for \p topic and \p partition.
     *
     * The offset is initialised to Offset.invalid and the error code to
     * ErrorCode.no_error.
     *
     * Params:
     *   topic     = NUL-terminated topic name. Only the pointer is stored,
     *               so it must stay valid for as long as topic() is used.
     *   partition = partition id.
     */
    this(const(char)* topic, int partition)
    {
        topic_ = topic;
        partition_ = partition;
        offset_ = Offset.invalid;
        err_ = ErrorCode.no_error;
    }

    /**
     * Create a topic+partition object from the C-level partition descriptor.
     *
     * Copies the topic pointer, partition id, offset and error code from
     * \p c_part. The topic string itself is not copied.
     */
    package this(const rd_kafka_topic_partition_t* c_part)
    {
        topic_ = c_part.topic;
        partition_ = c_part.partition;
        offset_ = c_part.offset;
        err_ = cast(ErrorCode) c_part.err;
        // FIXME: metadata
    }

    /** partition id */
    int partition() const @property
    {
        return partition_;
    }

    /** topic name (a slice over the stored C string, without the terminating NUL) */
    const(char)[] topic() const @property
    {
        return topic_.fromStringz;
    }

    /** offset (if applicable) */
    long offset() const @property
    {
        return offset_;
    }

    /** Set offset */
    void offset(long offset) @property
    {
        offset_ = offset;
    }

    /** error code (if applicable) */
    ErrorCode err() const @property
    {
        return err_;
    }
}
81 
/// Special offsets
enum Offset
{
    /// Consume from beginning
    beginning = -2,

    /// Consume from end
    end = -1,

    /// Use offset storage
    stored = -1000,

    /// Invalid offset
    invalid = -1001,
}
90 
/**
 * Topic handle
 *
 * Wraps an `rd_kafka_topic_t*` and the partitioner callbacks registered
 * for it.
 */
class Topic
{
    /**
     * Creates a new topic handle for the topic named \p topic_str.
     *
     * \p conf is an optional configuration for the topic that will be used
     * instead of the default topic configuration. When given, it is
     * duplicated before use, so the \p conf object is reusable after this
     * call. When null, a fresh default topic configuration is created.
     *
     * Params:
     *   base      = handle whose `rk_` the topic is created on.
     *   topic_str = topic name.
     *   conf      = optional topic configuration (may be null).
     *
     * Throws: Exception carrying the error string if the underlying topic
     *         handle could not be created.
     */
    this(Handle base, const(char)[] topic_str, TopicConf conf = null)
    {
        rd_kafka_topic_conf_t* rkt_conf;

        if (!conf)
            rkt_conf = rd_kafka_topic_conf_new();
        else /* Make a copy of conf struct to allow Conf reuse. */
            rkt_conf = rd_kafka_topic_conf_dup(conf.rkt_conf_);

        /* Set topic opaque to the topic so that we can reach our topic object
         * from whatever callbacks get registered.
         * The application itself will not need these opaques since their
         * callbacks are class based. */
        rd_kafka_topic_conf_set_opaque(rkt_conf, cast(void*) this);

        if (conf)
        {
            // Only one partitioner can be installed; the slice-based
            // callback takes precedence over the key-pointer one.
            if (conf.partitioner_cb_)
            {
                rd_kafka_topic_conf_set_partitioner_cb(rkt_conf, &partitioner_cb_trampoline);
                this.partitioner_cb_ = conf.partitioner_cb_;
            }
            else if (conf.partitioner_kp_cb_)
            {
                rd_kafka_topic_conf_set_partitioner_cb(rkt_conf, &partitioner_kp_cb_trampoline);
                this.partitioner_kp_cb_ = conf.partitioner_kp_cb_;
            }
        }
        rkt_ = rd_kafka_topic_new(base.rk_, topic_str.toStringz(), rkt_conf);
        if (!rkt_)
        {
            // Capture the error string first: later calls may change errno.
            auto msg = err2str(cast(ErrorCode)rd_kafka_errno2err(errno));
            // NOTE(review): rd_kafka_topic_new is documented to take ownership
            // of the conf object; confirm whether that also holds on failure,
            // in which case this destroy would be a double free.
            rd_kafka_topic_conf_destroy(rkt_conf);
            throw new Exception(msg);
        }
    }

// Everything below this label is nothrow and @nogc.
nothrow @nogc:

    /// Destroys the underlying topic handle, if one was created.
    ~this()
    {
        if (rkt_)
        {
            rd_kafka_topic_destroy(rkt_);
            // Clear the handle so it is never left dangling after destruction.
            rkt_ = null;
        }
    }

    /// Underlying C topic handle (null if construction failed).
    package rd_kafka_topic_t* rkt_;
    /// Partitioner callback receiving the key as a slice (may be null).
    package PartitionerCb partitioner_cb_;
    /// Partitioner callback receiving the key as pointer + length (may be null).
    package PartitionerKeyPointerCb partitioner_kp_cb_;

    /**
     * Unassigned partition.
     *
     * The unassigned partition is used by the producer API for messages
     * that should be partitioned using the configured or default partitioner.
     */
    enum int unassignedPartition = -1;

    /**
     * C-level partitioner entry point for `partitioner_cb_`.
     *
     * Recovers the Topic object from \p rkt_opaque, turns the key
     * pointer/length pair into a slice and forwards to `partitioner_cb_`.
     *
     * Returns: whatever the user callback returns.
     */
    package static nothrow @nogc int partitioner_cb_trampoline(
        const rd_kafka_topic_t* rkt, const(void)* keydata, size_t keylen,
        int partition_cnt, void* rkt_opaque, void* msg_opaque)
    {
        auto topic = cast(Topic) rkt_opaque;
        auto key = (cast(const(char)*) keydata)[0 .. keylen];
        return topic.partitioner_cb_(topic, key, partition_cnt, msg_opaque);
    }

    /**
     * C-level partitioner entry point for `partitioner_kp_cb_`.
     *
     * Recovers the Topic object from \p rkt_opaque and forwards the raw
     * key pointer and length to `partitioner_kp_cb_`.
     *
     * Returns: whatever the user callback returns.
     */
    package static nothrow @nogc int partitioner_kp_cb_trampoline(
        const rd_kafka_topic_t* rkt, const(void)* keydata, size_t keylen,
        int partition_cnt, void* rkt_opaque, void* msg_opaque)
    {
        auto topic = cast(Topic) rkt_opaque;
        return topic.partitioner_kp_cb_(topic, keydata, keylen, partition_cnt, msg_opaque);
    }

    /** the topic name */
    final const(char)[] name() const
    {
        return rd_kafka_topic_name(rkt_).fromStringz;
    }

    /**
     * true if \p partition is available for the topic (has leader).
     * Warning: \b MUST \b ONLY be called from within a
     *          PartitionerCb callback.
     */
    final bool partitionAvailable(int partition) const
    {
        return cast(bool) rd_kafka_topic_partition_available(rkt_, partition);
    }

    /**
     * Store offset \p offset for topic partition \p partition.
     * The offset will be committed (written) to the offset store according
     * to \p auto.commit.interval.ms.
     *
     * Note: This API should only be used with the simple Consumer,
     *         not the high-level KafkaConsumer.
     * Note: \c auto.commit.enable must be set to \c false when using this API.
     *
     * Returns: ErrorCode.no_error on success or an error code on error.
     */
    final ErrorCode offsetStore(int partition, long offset)
    {
        return cast(ErrorCode) rd_kafka_offset_store(rkt_, partition, offset);
    }
}