1 ///
2 module rdkafkad.config;
3 import rdkafkad;
4 
5 /**
6  * \b Portability: OpenCb callback class
7  *
8  */
9 /**
10    * Open callback
11    * The open callback is responsible for opening the file specified by
12    * \p pathname, using \p flags and \p mode.
13    * The file shall be opened with \c CLOEXEC set in a racefree fashion, if
14    * possible.
15    *
16    * It is typically not required to register an alternative open implementation
17    *
18    * Note: Not currently available on native Win32
19    */
20 alias OpenCb = int function(const(char)[] path, int flags, int mode) nothrow @nogc;
21 
22 ///
23 class ConfException : Exception
24 {
25     /**
26     * Conf::Set() result code
27     */
28     enum Result
29     {
30         UNKNOWN = -2, /**< Unknown configuration property */
31         INVALID = -1, /**< Invalid configuration value */
32         OK /**< Configuration property was succesfully set */
33     }
34     ///
35     Result result;
36     ///
37     this(Result result, string msg, string file = __FILE__,
38         uint line = cast(uint) __LINE__, Throwable next = null) pure nothrow @nogc @safe
39     {
40         super(msg, file, line, next);
41         this.result = result;
42     }
43 }
44 
45 /**
46  * Configuration interface
47  *
48  * Holds either global or topic configuration that are passed to
49  * Consumer::create(), Producer::create(),
50  * create(), etc.
51  *
52  * See_also: CONFIGURATION.md for the full list of supported properties.
53  */
54 interface Conf
55 {
56     /**
57      * Set configuration property \p name to value \p value.
58      * OK on success, else writes a human readable error
59      *          description to \p errstr on error.
60      */
61     void opIndexAssign(in char[] value, in char[] name);
62 
63     /** Query single configuration value
64    *  OK if the property was set previously set and
65    *           returns the value in \p value. */
66     string opIndex(in char[] name) const;
67 
68     string[] dump();
69 }
70 
71 class GlobalConf : Conf
72 {
73     ///
74     this(TopicConf defaultTopicConf = new TopicConf)
75     {
76         this.defaultTopicConf = defaultTopicConf;
77     }
78 
79     private TopicConf _defaultTopicConf;
80 
81     /** Dump configuration names and values to list containing
82    *         name,value tuples */
83     string[] dump()
84     {
85         size_t cnt;
86         auto arrc = rd_kafka_conf_dump(rk_conf_, &cnt);
87         auto arr = new string[cnt];
88         foreach (size_t i; 0 .. cnt)
89             arr[i] = arrc[i].fromStringz.idup;
90         rd_kafka_conf_dump_free(arrc, cnt);
91         return arr;
92     }
93 
94     /**
95      * Set configuration property \p name to value \p value.
96      * OK on success, else writes a human readable error
97      *          description to \p errstr on error.
98      */
99     void opIndexAssign(in char[] value, in char[] name)
100     {
101         char[512] errbuf = void;
102 
103         auto res = rd_kafka_conf_set(this.rk_conf_, name.toStringz(),
104             value.toStringz(), errbuf.ptr, errbuf.sizeof);
105         if (res != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK)
106             throw new ConfException(cast(ConfException.Result) res,
107                 errbuf.ptr.fromStringz.idup);
108     }
109 
110     /** Query single configuration value
111     *  OK if the property was set previously set and
112     *           returns the value in \p value. */
113     string opIndex(in char[] name) const
114     {
115         size_t size;
116 
117         rd_kafka_conf_res_t res = rd_kafka_conf_res_t.RD_KAFKA_CONF_OK;
118         res = rd_kafka_conf_get(rk_conf_, name.toStringz(), null, &size);
119         if (res != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK)
120             throw new ConfException(cast(ConfException.Result) res, "can not get config");
121 
122         char[] value = new char[size];
123         res = rd_kafka_conf_get(rk_conf_, name.toStringz(), value.ptr, &size);
124         if (res != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK)
125             throw new ConfException(cast(ConfException.Result) res, "can not get config");
126         return cast(string) value;
127     }
128 
129 nothrow @nogc:
130 
131      ~this() nothrow @nogc
132     {
133         if (rk_conf_)
134             rd_kafka_conf_destroy(rk_conf_);
135     }
136 
137     /**
138     * Create configuration object
139     */
140     this()
141     {
142         rk_conf_ = rd_kafka_conf_new();
143     }
144 
145     package DeliveryReportCb dr_cb_;
146     package EventCb event_cb_;
147     package SocketCb socket_cb_;
148     package OpenCb open_cb_;
149     package RebalanceCb rebalance_cb_;
150     package OffsetCommitCb offset_commit_cb_;
151     package rd_kafka_conf_t* rk_conf_;
152 
153     /++
154     +/
155     void drCb(DeliveryReportCb cb) @property
156     {
157         dr_cb_ = cb;
158     }
159     /++
160     +/
161     void eventCb(EventCb cb) @property
162     {
163         event_cb_ = cb;
164     }
165     /++
166     +/
167     void socketCb(SocketCb cb) @property
168     {
169         socket_cb_ = cb;
170     }
171     /++
172     +/
173     void openCb(OpenCb cb) @property
174     {
175         open_cb_ = cb;
176     }
177     /++
178     +/
179     void rebalanceCb(RebalanceCb cb) @property
180     {
181         rebalance_cb_ = cb;
182     }
183     /++
184     +/
185     void offsetCommitCb(OffsetCommitCb cb) @property
186     {
187         offset_commit_cb_ = cb;
188     }
189 
190     /** Use with \p name = \c \"default_topic_conf\"
191      *
192      * Sets the default topic configuration to use for for automatically
193      * subscribed topics and for Topic construction with producer.
194      *
195      * See_also: subscribe()
196      */
197     void defaultTopicConf(TopicConf topic_conf) @property
198     {
199         _defaultTopicConf = topic_conf;
200         rd_kafka_conf_set_default_topic_conf(rk_conf_,
201             rd_kafka_topic_conf_dup(topic_conf.rkt_conf_));
202     }
203 
204     ///ditto
205     TopicConf defaultTopicConf() @property
206     {
207         return _defaultTopicConf;
208     }
209 
210 }
211 
212 class TopicConf : Conf
213 {
214     /**
215      * Set configuration property \p name to value \p value.
216      * OK on success, else writes a human readable error
217      *          description to \p errstr on error.
218      */
219     void opIndexAssign(in char[] value, in char[] name)
220     {
221         rd_kafka_conf_res_t res;
222         char[512] errbuf = void;
223 
224         res = rd_kafka_topic_conf_set(this.rkt_conf_, name.toStringz(),
225             value.toStringz(), errbuf.ptr, errbuf.sizeof);
226 
227         if (res != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK)
228             throw new ConfException(cast(ConfException.Result) res,
229                 errbuf.ptr.fromStringz.idup);
230     }
231 
232     /** Query single configuration value
233    *  OK if the property was set previously set and
234    *           returns the value in \p value. */
235     string opIndex(in char[] name) const
236     {
237         size_t size;
238         rd_kafka_conf_res_t res = rd_kafka_conf_res_t.RD_KAFKA_CONF_OK;
239         res = rd_kafka_topic_conf_get(rkt_conf_, name.toStringz(), null, &size);
240         if (res != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK)
241             throw new ConfException(cast(ConfException.Result) res, "can not get config");
242         char[] value = new char[size];
243         res = rd_kafka_topic_conf_get(rkt_conf_, name.toStringz(), value.ptr, &size);
244         if (res != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK)
245             throw new ConfException(cast(ConfException.Result) res, "can not get config");
246         return cast(string) value;
247     }
248 
249     /** Dump configuration names and values to list containing
250      *         name,value tuples */
251     string[] dump()
252     {
253         size_t cnt;
254         auto arrc = rd_kafka_topic_conf_dump(rkt_conf_, &cnt);
255         auto arr = new string[cnt];
256         foreach (size_t i; 0 .. cnt)
257             arr[i] = arrc[i].fromStringz.idup;
258         rd_kafka_conf_dump_free(arrc, cnt);
259         return arr;
260     }
261 
262 nothrow @nogc:
263 
264      ~this()
265     {
266         if (rkt_conf_)
267             rd_kafka_topic_conf_destroy(rkt_conf_);
268     }
269 
270     /**
271     * Create configuration object
272     */
273     this()
274     {
275         rkt_conf_ = rd_kafka_topic_conf_new();
276     }
277 
278     package PartitionerCb partitioner_cb_;
279     package PartitionerKeyPointerCb partitioner_kp_cb_;
280     package rd_kafka_topic_conf_t* rkt_conf_;
281 
282     /++
283     +/
284     void partitionerCb(PartitionerCb cb) @property
285     {
286         partitioner_cb_ = cb;
287     }
288     /++
289     +/
290     void partitionerKpCb(PartitionerKeyPointerCb cb) @property
291     {
292         partitioner_kp_cb_ = cb;
293     }
294 }