1 ///
2 module rdkafkad.config;
3 import rdkafkad;
4 
5 /**
6  * \b Portability: OpenCb callback class
7  *
8  */
9 /**
10    * Open callback
11    * The open callback is responsible for opening the file specified by
12    * \p pathname, using \p flags and \p mode.
13    * The file shall be opened with \c CLOEXEC set in a racefree fashion, if
14    * possible.
15    *
16    * It is typically not required to register an alternative open implementation
17    *
18    * Note: Not currently available on native Win32
19    */
20 alias OpenCb = int function(const(char)[] path, int flags, int mode) nothrow @nogc;
21 
22 ///
23 class ConfException : Exception
24 {
25     /**
26     * Conf::Set() result code
27     */
28     enum Result
29     {
30         UNKNOWN = -2, /**< Unknown configuration property */
31         INVALID = -1, /**< Invalid configuration value */
32         OK /**< Configuration property was succesfully set */
33     }
34     ///
35     Result result;
36     ///
37     this(Result result, string msg, string file = __FILE__,
38         uint line = cast(uint) __LINE__, Throwable next = null) pure nothrow @nogc @safe
39     {
40         super(msg, file, line, next);
41         this.result = result;
42     }
43 }
44 
45 /**
46  * Configuration interface
47  *
48  * Holds either global or topic configuration that are passed to
49  * Consumer::create(), Producer::create(),
50  * create(), etc.
51  *
52  * See_also: CONFIGURATION.md for the full list of supported properties.
53  */
54 abstract class Conf
55 {
56     /**
57      * Set configuration property \p name to value \p value.
58      * OK on success, else writes a human readable error
59      *          description to \p errstr on error.
60      */
61     void opIndexAssign(in char[] value, in char[] name);
62 
63     /** Query single configuration value
64    *  OK if the property was set previously set and
65    *           returns the value in \p value. */
66     string opIndex(in char[] name) const;
67 
68     ///
69     string[string] dump();
70 
71     private static string[string] dumpImpl(const (char)** arrc, size_t cnt)
72     {
73         assert(cnt % 2 == 0);
74         string[string] aa;
75         foreach (size_t i; 0 .. cnt / 2)
76             aa[arrc[i * 2].fromStringz.idup] = arrc[i * 2 + 1].fromStringz.idup;
77         rd_kafka_conf_dump_free(arrc, cnt);
78         return aa;
79     }
80 
81     ///
82     void fromText(string text)
83     {
84         import std.algorithm;
85         import std.string;
86         import std.format;
87         import std.conv: ConvException;
88         size_t i;
89         foreach(line; text.lineSplitter)
90         {
91             i++;
92             line = line.findSplit("#")[0].strip;
93             if (line.empty)
94                 continue;
95             auto t = line.findSplit("=");
96             if (t[1].empty)
97                 throw new ConvException(format("failed to parse configuraiton at line %s", i));
98             auto key = t[0].stripRight;
99             auto value = t[2].stripLeft;
100             this[key] = value;
101         }
102     }
103 }
104 
105 
106 ///
107 class GlobalConf : Conf
108 {
109     ///
110     this(TopicConf defaultTopicConf = new TopicConf)
111     {
112         rk_conf_ = rd_kafka_conf_new;
113         this.defaultTopicConf = defaultTopicConf;
114     }
115 
116     private TopicConf _defaultTopicConf;
117 
118     /** Dump configuration names and values to list containing
119    *         name,value tuples */
120     override string[string] dump()
121     {
122         size_t cnt;
123         auto arrc = rd_kafka_conf_dump(rk_conf_, &cnt);
124         return dumpImpl(arrc, cnt);
125     }
126 
127     /**
128      * Set configuration property \p name to value \p value.
129      * OK on success, else writes a human readable error
130      *          description to \p errstr on error.
131      */
132     override void opIndexAssign(in char[] value, in char[] name)
133     {
134         char[512] errbuf = void;
135 
136         auto res = rd_kafka_conf_set(this.rk_conf_, name.toStringz(),
137             value.toStringz(), errbuf.ptr, errbuf.sizeof);
138         if (res != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK)
139             throw new ConfException(cast(ConfException.Result) res,
140                 errbuf.ptr.fromStringz.idup);
141     }
142 
143     /** Query single configuration value
144     *  OK if the property was set previously set and
145     *           returns the value in \p value. */
146     override string opIndex(in char[] name) const
147     {
148         size_t size;
149 
150         rd_kafka_conf_res_t res = rd_kafka_conf_res_t.RD_KAFKA_CONF_OK;
151         res = rd_kafka_conf_get(rk_conf_, name.toStringz(), null, &size);
152         if (res != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK)
153             throw new ConfException(cast(ConfException.Result) res, "can not get config");
154 
155         char[] value = new char[size];
156         res = rd_kafka_conf_get(rk_conf_, name.toStringz(), value.ptr, &size);
157         if (res != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK)
158             throw new ConfException(cast(ConfException.Result) res, "can not get config");
159         return cast(string) value;
160     }
161 
162 nothrow @nogc:
163 
164      ~this() nothrow @nogc
165     {
166         if (rk_conf_)
167             rd_kafka_conf_destroy(rk_conf_);
168     }
169 
170     package DeliveryReportCb dr_cb_;
171     package EventCb event_cb_;
172     package SocketCb socket_cb_;
173     package OpenCb open_cb_;
174     package RebalanceCb rebalance_cb_;
175     package OffsetCommitCb offset_commit_cb_;
176     package rd_kafka_conf_t* rk_conf_;
177 
178     /++
179     +/
180     void drCb(DeliveryReportCb cb) @property
181     {
182         dr_cb_ = cb;
183     }
184     /++
185     +/
186     void eventCb(EventCb cb) @property
187     {
188         event_cb_ = cb;
189     }
190     /++
191     +/
192     void socketCb(SocketCb cb) @property
193     {
194         socket_cb_ = cb;
195     }
196     /++
197     +/
198     void openCb(OpenCb cb) @property
199     {
200         open_cb_ = cb;
201     }
202     /++
203     +/
204     void rebalanceCb(RebalanceCb cb) @property
205     {
206         rebalance_cb_ = cb;
207     }
208     /++
209     +/
210     void offsetCommitCb(OffsetCommitCb cb) @property
211     {
212         offset_commit_cb_ = cb;
213     }
214 
215     /** Use with \p name = \c \"default_topic_conf\"
216      *
217      * Sets the default topic configuration to use for for automatically
218      * subscribed topics and for Topic construction with producer.
219      *
220      * See_also: subscribe()
221      */
222     void defaultTopicConf(TopicConf topic_conf) @property
223     {
224         _defaultTopicConf = topic_conf;
225         rd_kafka_conf_set_default_topic_conf(rk_conf_,
226             rd_kafka_topic_conf_dup(topic_conf.rkt_conf_));
227     }
228 
229     ///ditto
230     TopicConf defaultTopicConf() @property
231     {
232         return _defaultTopicConf;
233     }
234 
235 }
236 
237 class TopicConf : Conf
238 {
239     /**
240      * Set configuration property \p name to value \p value.
241      * OK on success, else writes a human readable error
242      *          description to \p errstr on error.
243      */
244     override void opIndexAssign(in char[] value, in char[] name)
245     {
246         rd_kafka_conf_res_t res;
247         char[512] errbuf = void;
248 
249         res = rd_kafka_topic_conf_set(this.rkt_conf_, name.toStringz(),
250             value.toStringz(), errbuf.ptr, errbuf.sizeof);
251 
252         if (res != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK)
253             throw new ConfException(cast(ConfException.Result) res,
254                 errbuf.ptr.fromStringz.idup);
255     }
256 
257     /** Query single configuration value
258    *  OK if the property was set previously set and
259    *           returns the value in \p value. */
260     override string opIndex(in char[] name) const
261     {
262         size_t size;
263         rd_kafka_conf_res_t res = rd_kafka_conf_res_t.RD_KAFKA_CONF_OK;
264         res = rd_kafka_topic_conf_get(rkt_conf_, name.toStringz(), null, &size);
265         if (res != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK)
266             throw new ConfException(cast(ConfException.Result) res, "can not get config");
267         char[] value = new char[size];
268         res = rd_kafka_topic_conf_get(rkt_conf_, name.toStringz(), value.ptr, &size);
269         if (res != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK)
270             throw new ConfException(cast(ConfException.Result) res, "can not get config");
271         return cast(string) value;
272     }
273 
274     /** Dump configuration names and values to list containing
275      *         name,value tuples */
276     override string[string] dump()
277     {
278         size_t cnt;
279         auto arrc = rd_kafka_topic_conf_dump(rkt_conf_, &cnt);
280         return dumpImpl(arrc, cnt);
281     }
282 
283 nothrow @nogc:
284 
285      ~this()
286     {
287         if (rkt_conf_)
288             rd_kafka_topic_conf_destroy(rkt_conf_);
289     }
290 
291     /**
292     * Create configuration object
293     */
294     this()
295     {
296         rkt_conf_ = rd_kafka_topic_conf_new();
297     }
298 
299     package PartitionerCb partitioner_cb_;
300     package PartitionerKeyPointerCb partitioner_kp_cb_;
301     package rd_kafka_topic_conf_t* rkt_conf_;
302 
303     /++
304     +/
305     void partitionerCb(PartitionerCb cb) @property
306     {
307         partitioner_cb_ = cb;
308     }
309     /++
310     +/
311     void partitionerKpCb(PartitionerKeyPointerCb cb) @property
312     {
313         partitioner_kp_cb_ = cb;
314     }
315 }