///
module rdkafkad.config;
import rdkafkad;
import rdkafkad.iodriver;

/**
 * Open callback.
 *
 * The open callback is responsible for opening the file specified by
 * `path`, using `flags` and `mode`.
 * The file shall be opened with `CLOEXEC` set in a race-free fashion, if
 * possible.
 *
 * It is typically not required to register an alternative open
 * implementation.
 *
 * Note: Not currently available on native Win32.
 */
alias OpenCb = int function(const(char)[] path, int flags, int mode) nothrow @nogc;

/// Exception thrown when a configuration property cannot be set or queried.
class ConfException : Exception
{
    /**
     * Result code of a configuration operation.
     */
    enum Result
    {
        UNKNOWN = -2, /// Unknown configuration property
        INVALID = -1, /// Invalid configuration value
        OK /// Configuration property was successfully set
    }

    /// Result code that caused this exception.
    Result result;

    /**
     * Params:
     *  result = result code of the failed operation
     *  msg = human readable error description
     *  file = source file of the throw site
     *  line = source line of the throw site
     *  next = next exception in the chain, if any
     */
    this(Result result, string msg, string file = __FILE__,
        uint line = cast(uint) __LINE__, Throwable next = null) pure nothrow @nogc @safe
    {
        super(msg, file, line, next);
        this.result = result;
    }
}

/**
 * Configuration interface.
 *
 * Base class of GlobalConf and TopicConf, which hold global and topic
 * configuration respectively.
 *
 * See_also: CONFIGURATION.md for the full list of supported properties.
 */
abstract class Conf
{
    /**
     * Set configuration property `name` to `value` (`conf[name] = value`).
     *
     * Implemented by GlobalConf and TopicConf, which throw ConfException
     * when the property can not be set.
     */
    void opIndexAssign(in char[] value, in char[] name);

    /**
     * Query a single configuration value (`conf[name]`).
     *
     * Implemented by GlobalConf and TopicConf, which throw ConfException
     * when the value can not be retrieved.
     */
    string opIndex(in char[] name) const;

    /// Dump configuration names and values as an associative array.
    string[string] dump();

    /*
     * Converts a librdkafka dump (a flat array of `cnt` C strings laid out
     * as name, value, name, value, ...) into an associative array and
     * releases the dump.
     */
    private static string[string] dumpImpl(const (char)** arrc, size_t cnt)
    {
        string[string] aa;
        if (arrc is null)
            return aa;
        // Release the native array on every exit path, including a failed
        // allocation while copying the strings.
        scope (exit)
            rd_kafka_conf_dump_free(arrc, cnt);
        assert(cnt % 2 == 0);
        foreach (size_t i; 0 .. cnt / 2)
            aa[arrc[i * 2].fromStringz.idup] = arrc[i * 2 + 1].fromStringz.idup;
        return aa;
    }

    /**
     * Apply configuration given as text, one `key = value` pair per line.
     *
     * Everything from the first `#` on a line to its end is dropped, then
     * the line is stripped; lines that are empty afterwards are skipped.
     * Whitespace around the key and the value is removed.
     *
     * Params:
     *  text = configuration text
     *
     * Throws: `ConvException` if a non-empty line contains no `=`;
     *  whatever `this[key] = value` throws for a rejected property.
     */
    void fromText(string text)
    {
        // Selective imports: std.algorithm and std.string both export
        // `strip`, so only the needed symbols are pulled in.
        import std.algorithm.searching : findSplit;
        import std.string : lineSplitter, strip, stripLeft, stripRight;
        import std.format : format;
        import std.conv : ConvException;

        size_t lineNo;
        foreach (rawLine; text.lineSplitter)
        {
            lineNo++;
            auto content = rawLine.findSplit("#")[0].strip;
            if (content.length == 0)
                continue;
            auto parts = content.findSplit("=");
            // parts[1] is the matched separator; empty means no '=' found.
            if (parts[1].length == 0)
                throw new ConvException(format("failed to parse configuraiton at line %s", lineNo));
            auto key = parts[0].stripRight;
            auto value = parts[2].stripLeft;
            this[key] = value;
        }
    }
}


/// Global (client level) configuration.
class GlobalConf : Conf
{
    /**
     * Creates a new global configuration.
     *
     * Params:
     *  defaultTopicConf = topic configuration installed as the default
     *      topic configuration; a fresh TopicConf when omitted.
     */
    this(TopicConf defaultTopicConf = new TopicConf)
    {
        rk_conf_ = rd_kafka_conf_new;
        this.defaultTopicConf = defaultTopicConf;
    }

    private TopicConf _defaultTopicConf;

    /// Dump configuration names and values as an associative array.
    override string[string] dump()
    {
        size_t cnt;
        auto arrc = rd_kafka_conf_dump(rk_conf_, &cnt);
        return dumpImpl(arrc, cnt);
    }

    /**
     * Set configuration property `name` to `value`.
     *
     * Throws: ConfException carrying the librdkafka result code and the
     *  error description written by librdkafka.
     */
    override void opIndexAssign(in char[] value, in char[] name)
    {
        char[512] errbuf = void;
        // Keep the message well defined even if nothing gets written.
        errbuf[0] = '\0';

        auto res = rd_kafka_conf_set(this.rk_conf_, name.toStringz(),
            value.toStringz(), errbuf.ptr, errbuf.sizeof);
        if (res != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK)
            throw new ConfException(cast(ConfException.Result) res,
                errbuf.ptr.fromStringz.idup);
    }

    /**
     * Query a single configuration value.
     *
     * Returns: the value of property `name`, without a trailing NUL.
     * Throws: ConfException if librdkafka does not report success.
     */
    override string opIndex(in char[] name) const
    {
        // Convert once; both librdkafka calls use the same C string.
        const(char)* namez = name.toStringz();
        size_t size;

        // A null destination only queries the required buffer size.
        auto res = rd_kafka_conf_get(rk_conf_, namez, null, &size);
        if (res != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK)
            throw new ConfException(cast(ConfException.Result) res, "can not get config");
        if (size == 0)
            return "";

        auto buf = new char[size];
        buf[] = '\0';
        res = rd_kafka_conf_get(rk_conf_, namez, buf.ptr, &size);
        if (res != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK)
            throw new ConfException(cast(ConfException.Result) res, "can not get config");
        // The reported size accounts for the C terminator; cut the result at
        // the first NUL so the returned string does not end in '\0'.
        return cast(string) buf.ptr.fromStringz;
    }

nothrow @nogc:

    ~this() nothrow @nogc
    {
        if (rk_conf_)
            rd_kafka_conf_destroy(rk_conf_);
    }

    package DeliveryReportCb dr_cb_;
    package EventCb event_cb_;
    package SocketCb socket_cb_;
    package OpenCb open_cb_;
    package RebalanceCb rebalance_cb_;
    package OffsetCommitCb offset_commit_cb_;
    package rd_kafka_conf_t* rk_conf_;

    /// Stores the delivery report callback.
    void drCb(DeliveryReportCb cb) @property
    {
        dr_cb_ = cb;
    }

    /// Stores the event callback.
    void eventCb(EventCb cb) @property
    {
        event_cb_ = cb;
    }

    /// Stores the socket callback.
    void socketCb(SocketCb cb) @property
    {
        socket_cb_ = cb;
    }

    /// Stores the open callback.
    void openCb(OpenCb cb) @property
    {
        open_cb_ = cb;
    }

    /// Stores the rebalance callback.
    void rebalanceCb(RebalanceCb cb) @property
    {
        rebalance_cb_ = cb;
    }

    /// Stores the offset commit callback.
    void offsetCommitCb(OffsetCommitCb cb) @property
    {
        offset_commit_cb_ = cb;
    }

    /**
     * Sets the default topic configuration to use for automatically
     * subscribed topics and for Topic construction with producer.
     *
     * A duplicate of the native topic configuration is handed to
     * librdkafka; `topic_conf` itself is kept and returned by the getter.
     *
     * Params:
     *  topic_conf = topic configuration, must not be null
     *
     * See_also: subscribe()
     */
    void defaultTopicConf(TopicConf topic_conf) @property
    {
        assert(topic_conf !is null, "default topic configuration must not be null");
        _defaultTopicConf = topic_conf;
        rd_kafka_conf_set_default_topic_conf(rk_conf_,
            rd_kafka_topic_conf_dup(topic_conf.rkt_conf_));
    }

    ///ditto
    TopicConf defaultTopicConf() @property
    {
        return _defaultTopicConf;
    }

}

/// Topic level configuration.
class TopicConf : Conf
{
    /**
     * Set configuration property `name` to `value`.
     *
     * Throws: ConfException carrying the librdkafka result code and the
     *  error description written by librdkafka.
     */
    override void opIndexAssign(in char[] value, in char[] name)
    {
        char[512] errbuf = void;
        // Keep the message well defined even if nothing gets written.
        errbuf[0] = '\0';

        auto res = rd_kafka_topic_conf_set(this.rkt_conf_, name.toStringz(),
            value.toStringz(), errbuf.ptr, errbuf.sizeof);

        if (res != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK)
            throw new ConfException(cast(ConfException.Result) res,
                errbuf.ptr.fromStringz.idup);
    }

    /**
     * Query a single configuration value.
     *
     * Returns: the value of property `name`, without a trailing NUL.
     * Throws: ConfException if librdkafka does not report success.
     */
    override string opIndex(in char[] name) const
    {
        // Convert once; both librdkafka calls use the same C string.
        const(char)* namez = name.toStringz();
        size_t size;

        // A null destination only queries the required buffer size.
        auto res = rd_kafka_topic_conf_get(rkt_conf_, namez, null, &size);
        if (res != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK)
            throw new ConfException(cast(ConfException.Result) res, "can not get config");
        if (size == 0)
            return "";

        auto buf = new char[size];
        buf[] = '\0';
        res = rd_kafka_topic_conf_get(rkt_conf_, namez, buf.ptr, &size);
        if (res != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK)
            throw new ConfException(cast(ConfException.Result) res, "can not get config");
        // The reported size accounts for the C terminator; cut the result at
        // the first NUL so the returned string does not end in '\0'.
        return cast(string) buf.ptr.fromStringz;
    }

    /// Dump configuration names and values as an associative array.
    override string[string] dump()
    {
        size_t cnt;
        auto arrc = rd_kafka_topic_conf_dump(rkt_conf_, &cnt);
        return dumpImpl(arrc, cnt);
    }

nothrow @nogc:

    ~this()
    {
        if (rkt_conf_)
            rd_kafka_topic_conf_destroy(rkt_conf_);
    }

    /**
     * Create configuration object
     */
    this()
    {
        rkt_conf_ = rd_kafka_topic_conf_new();
    }

    package PartitionerCb partitioner_cb_;
    package PartitionerKeyPointerCb partitioner_kp_cb_;
    package rd_kafka_topic_conf_t* rkt_conf_;

    /// Stores the partitioner callback.
    void partitionerCb(PartitionerCb cb) @property
    {
        partitioner_cb_ = cb;
    }

    /// Stores the key-pointer partitioner callback.
    void partitionerKpCb(PartitionerKeyPointerCb cb) @property
    {
        partitioner_kp_cb_ = cb;
    }
}