1 ///
2 module rdkafkad.config;
3 import rdkafkad;
4 import rdkafkad.iodriver;
5 
6 /**
7  * \b Portability: OpenCb callback class
8  *
9  */
10 /**
11    * Open callback
12    * The open callback is responsible for opening the file specified by
13    * \p pathname, using \p flags and \p mode.
14    * The file shall be opened with \c CLOEXEC set in a racefree fashion, if
15    * possible.
16    *
17    * It is typically not required to register an alternative open implementation
18    *
19    * Note: Not currently available on native Win32
20    */
21 alias OpenCb = int function(const(char)[] path, int flags, int mode) nothrow @nogc;
22 
23 ///
24 class ConfException : Exception
25 {
26     /**
27     * Conf::Set() result code
28     */
29     enum Result
30     {
31         UNKNOWN = -2, /**< Unknown configuration property */
32         INVALID = -1, /**< Invalid configuration value */
33         OK /**< Configuration property was succesfully set */
34     }
35     ///
36     Result result;
37     ///
38     this(Result result, string msg, string file = __FILE__,
39         uint line = cast(uint) __LINE__, Throwable next = null) pure nothrow @nogc @safe
40     {
41         super(msg, file, line, next);
42         this.result = result;
43     }
44 }
45 
46 /**
47  * Configuration interface
48  *
49  * Holds either global or topic configuration that are passed to
50  * Consumer::create(), Producer::create(),
51  * create(), etc.
52  *
53  * See_also: CONFIGURATION.md for the full list of supported properties.
54  */
55 abstract class Conf
56 {
57     /**
58      * Set configuration property \p name to value \p value.
59      * OK on success, else writes a human readable error
60      *          description to \p errstr on error.
61      */
62     void opIndexAssign(in char[] value, in char[] name);
63 
64     /** Query single configuration value
65    *  OK if the property was set previously set and
66    *           returns the value in \p value. */
67     string opIndex(in char[] name) const;
68 
69     ///
70     string[string] dump();
71 
72     private static string[string] dumpImpl(const (char)** arrc, size_t cnt)
73     {
74         assert(cnt % 2 == 0);
75         string[string] aa;
76         foreach (size_t i; 0 .. cnt / 2)
77             aa[arrc[i * 2].fromStringz.idup] = arrc[i * 2 + 1].fromStringz.idup;
78         rd_kafka_conf_dump_free(arrc, cnt);
79         return aa;
80     }
81 
82     ///
83     void fromText(string text)
84     {
85         import std.algorithm;
86         import std.string;
87         import std.format;
88         import std.conv: ConvException;
89         size_t i;
90         foreach(line; text.lineSplitter)
91         {
92             i++;
93             line = line.findSplit("#")[0].strip;
94             if (line.empty)
95                 continue;
96             auto t = line.findSplit("=");
97             if (t[1].empty)
98                 throw new ConvException(format("failed to parse configuraiton at line %s", i));
99             auto key = t[0].stripRight;
100             auto value = t[2].stripLeft;
101             this[key] = value;
102         }
103     }
104 }
105 
106 
107 ///
108 class GlobalConf : Conf
109 {
110     ///
111     this(TopicConf defaultTopicConf = new TopicConf)
112     {
113         rk_conf_ = rd_kafka_conf_new;
114         this.defaultTopicConf = defaultTopicConf;
115     }
116 
117     private TopicConf _defaultTopicConf;
118 
119     /** Dump configuration names and values to list containing
120    *         name,value tuples */
121     override string[string] dump()
122     {
123         size_t cnt;
124         auto arrc = rd_kafka_conf_dump(rk_conf_, &cnt);
125         return dumpImpl(arrc, cnt);
126     }
127 
128     /**
129      * Set configuration property \p name to value \p value.
130      * OK on success, else writes a human readable error
131      *          description to \p errstr on error.
132      */
133     override void opIndexAssign(in char[] value, in char[] name)
134     {
135         char[512] errbuf = void;
136 
137         auto res = rd_kafka_conf_set(this.rk_conf_, name.toStringz(),
138             value.toStringz(), errbuf.ptr, errbuf.sizeof);
139         if (res != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK)
140             throw new ConfException(cast(ConfException.Result) res,
141                 errbuf.ptr.fromStringz.idup);
142     }
143 
144     /** Query single configuration value
145     *  OK if the property was set previously set and
146     *           returns the value in \p value. */
147     override string opIndex(in char[] name) const
148     {
149         size_t size;
150 
151         rd_kafka_conf_res_t res = rd_kafka_conf_res_t.RD_KAFKA_CONF_OK;
152         res = rd_kafka_conf_get(rk_conf_, name.toStringz(), null, &size);
153         if (res != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK)
154             throw new ConfException(cast(ConfException.Result) res, "can not get config");
155 
156         char[] value = new char[size];
157         res = rd_kafka_conf_get(rk_conf_, name.toStringz(), value.ptr, &size);
158         if (res != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK)
159             throw new ConfException(cast(ConfException.Result) res, "can not get config");
160         return cast(string) value;
161     }
162 
163 nothrow @nogc:
164 
165      ~this() nothrow @nogc
166     {
167         if (rk_conf_)
168             rd_kafka_conf_destroy(rk_conf_);
169     }
170 
171     package DeliveryReportCb dr_cb_;
172     package EventCb event_cb_;
173     package SocketCb socket_cb_;
174     package OpenCb open_cb_;
175     package RebalanceCb rebalance_cb_;
176     package OffsetCommitCb offset_commit_cb_;
177     package rd_kafka_conf_t* rk_conf_;
178 
179     /++
180     +/
181     void drCb(DeliveryReportCb cb) @property
182     {
183         dr_cb_ = cb;
184     }
185     /++
186     +/
187     void eventCb(EventCb cb) @property
188     {
189         event_cb_ = cb;
190     }
191     /++
192     +/
193     void socketCb(SocketCb cb) @property
194     {
195         socket_cb_ = cb;
196     }
197     /++
198     +/
199     void openCb(OpenCb cb) @property
200     {
201         open_cb_ = cb;
202     }
203     /++
204     +/
205     void rebalanceCb(RebalanceCb cb) @property
206     {
207         rebalance_cb_ = cb;
208     }
209     /++
210     +/
211     void offsetCommitCb(OffsetCommitCb cb) @property
212     {
213         offset_commit_cb_ = cb;
214     }
215 
216     /** Use with \p name = \c \"default_topic_conf\"
217      *
218      * Sets the default topic configuration to use for for automatically
219      * subscribed topics and for Topic construction with producer.
220      *
221      * See_also: subscribe()
222      */
223     void defaultTopicConf(TopicConf topic_conf) @property
224     {
225         _defaultTopicConf = topic_conf;
226         rd_kafka_conf_set_default_topic_conf(rk_conf_,
227             rd_kafka_topic_conf_dup(topic_conf.rkt_conf_));
228     }
229 
230     ///ditto
231     TopicConf defaultTopicConf() @property
232     {
233         return _defaultTopicConf;
234     }
235 
236 }
237 
238 class TopicConf : Conf
239 {
240     /**
241      * Set configuration property \p name to value \p value.
242      * OK on success, else writes a human readable error
243      *          description to \p errstr on error.
244      */
245     override void opIndexAssign(in char[] value, in char[] name)
246     {
247         rd_kafka_conf_res_t res;
248         char[512] errbuf = void;
249 
250         res = rd_kafka_topic_conf_set(this.rkt_conf_, name.toStringz(),
251             value.toStringz(), errbuf.ptr, errbuf.sizeof);
252 
253         if (res != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK)
254             throw new ConfException(cast(ConfException.Result) res,
255                 errbuf.ptr.fromStringz.idup);
256     }
257 
258     /** Query single configuration value
259    *  OK if the property was set previously set and
260    *           returns the value in \p value. */
261     override string opIndex(in char[] name) const
262     {
263         size_t size;
264         rd_kafka_conf_res_t res = rd_kafka_conf_res_t.RD_KAFKA_CONF_OK;
265         res = rd_kafka_topic_conf_get(rkt_conf_, name.toStringz(), null, &size);
266         if (res != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK)
267             throw new ConfException(cast(ConfException.Result) res, "can not get config");
268         char[] value = new char[size];
269         res = rd_kafka_topic_conf_get(rkt_conf_, name.toStringz(), value.ptr, &size);
270         if (res != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK)
271             throw new ConfException(cast(ConfException.Result) res, "can not get config");
272         return cast(string) value;
273     }
274 
275     /** Dump configuration names and values to list containing
276      *         name,value tuples */
277     override string[string] dump()
278     {
279         size_t cnt;
280         auto arrc = rd_kafka_topic_conf_dump(rkt_conf_, &cnt);
281         return dumpImpl(arrc, cnt);
282     }
283 
284 nothrow @nogc:
285 
286      ~this()
287     {
288         if (rkt_conf_)
289             rd_kafka_topic_conf_destroy(rkt_conf_);
290     }
291 
292     /**
293     * Create configuration object
294     */
295     this()
296     {
297         rkt_conf_ = rd_kafka_topic_conf_new();
298     }
299 
300     package PartitionerCb partitioner_cb_;
301     package PartitionerKeyPointerCb partitioner_kp_cb_;
302     package rd_kafka_topic_conf_t* rkt_conf_;
303 
304     /++
305     +/
306     void partitionerCb(PartitionerCb cb) @property
307     {
308         partitioner_cb_ = cb;
309     }
310     /++
311     +/
312     void partitionerKpCb(PartitionerKeyPointerCb cb) @property
313     {
314         partitioner_kp_cb_ = cb;
315     }
316 }