///
module rdkafkad.config;
import rdkafkad;

/**
 * Open callback.
 *
 * The open callback is responsible for opening the file specified by
 * `path`, using `flags` and `mode`.
 * The file shall be opened with `CLOEXEC` set in a race-free fashion, if
 * possible.
 *
 * It is typically not required to register an alternative open implementation.
 *
 * Note: Not currently available on native Win32.
 */
alias OpenCb = int function(const(char)[] path, int flags, int mode) nothrow @nogc;

/// Exception thrown when a configuration property cannot be set or queried.
class ConfException : Exception
{
    /**
     * Result code of a configuration set/get operation.
     */
    enum Result
    {
        UNKNOWN = -2, /**< Unknown configuration property */
        INVALID = -1, /**< Invalid configuration value */
        OK /**< Configuration property was successfully set */
    }

    /// Result code that caused this exception.
    Result result;

    /**
     * Params:
     *   result = result code describing the failure
     *   msg    = human readable error description
     *   file   = source file of the throw site
     *   line   = source line of the throw site
     *   next   = chained throwable, if any
     */
    this(Result result, string msg, string file = __FILE__,
        uint line = cast(uint) __LINE__, Throwable next = null) pure nothrow @nogc @safe
    {
        super(msg, file, line, next);
        this.result = result;
    }
}

/**
 * Configuration interface
 *
 * Holds either global or topic configuration that are passed to
 * Consumer::create(), Producer::create(),
 * create(), etc.
 *
 * See_also: CONFIGURATION.md for the full list of supported properties.
 */
abstract class Conf
{
    /**
     * Set configuration property `name` to value `value`.
     *
     * Throws: ConfException carrying the result code and a human readable
     * error description on failure.
     */
    void opIndexAssign(in char[] value, in char[] name);

    /**
     * Query a single configuration value.
     *
     * Returns: the current value of property `name`.
     * Throws: ConfException if the value cannot be retrieved.
     */
    string opIndex(in char[] name) const;

    /// Dump all configuration names and values as an associative array.
    string[string] dump();

    /*
     * Converts the flat name/value C array produced by a librdkafka dump
     * call into an associative array and releases the C array.
     * `cnt` is the total number of array entries (names plus values), so it
     * must be even.
     */
    private static string[string] dumpImpl(const (char)** arrc, size_t cnt)
    {
        assert(cnt % 2 == 0);
        string[string] aa;
        foreach (size_t i; 0 .. cnt / 2)
            aa[arrc[i * 2].fromStringz.idup] = arrc[i * 2 + 1].fromStringz.idup;
        rd_kafka_conf_dump_free(arrc, cnt);
        return aa;
    }

    /*
     * Turns a buffer filled by rd_kafka_conf_get / rd_kafka_topic_conf_get
     * into a D string.
     *
     * `size` is the length reported by librdkafka. Per the librdkafka
     * documentation this is the full length of the value including the
     * terminating NUL, which must not become part of the D string.
     * The terminator is only dropped when it is actually present, and `size`
     * is clamped to the buffer so a value that grew between the two calls
     * cannot cause an out-of-bounds slice.
     */
    private static string valueFromBuffer(char[] buf, size_t size) pure nothrow @nogc @trusted
    {
        if (size > buf.length)
            size = buf.length;
        auto text = buf[0 .. size];
        if (text.length && text[$ - 1] == '\0')
            text = text[0 .. $ - 1];
        // The buffer is freshly allocated by the caller and never shared,
        // so viewing it as immutable is safe.
        return cast(string) text;
    }

    /**
     * Load configuration from text.
     *
     * Each non-empty line has the form `key = value`; everything after a
     * `#` is treated as a comment and ignored. Every pair is applied through
     * `opIndexAssign`.
     *
     * Throws: ConvException if a non-empty line contains no `=`;
     * whatever `opIndexAssign` throws for a rejected property.
     */
    void fromText(string text)
    {
        import std.algorithm;
        import std.string;
        import std.format;
        import std.conv: ConvException;
        size_t i; // 1-based line number, used for error reporting
        foreach(line; text.lineSplitter)
        {
            i++;
            // Drop the trailing comment, then surrounding whitespace.
            line = line.findSplit("#")[0].strip;
            if (line.empty)
                continue;
            auto t = line.findSplit("=");
            // t[1] is the matched separator; empty means no '=' on this line.
            if (t[1].empty)
                throw new ConvException(format("failed to parse configuraiton at line %s", i));
            auto key = t[0].stripRight;
            auto value = t[2].stripLeft;
            this[key] = value;
        }
    }
}


/// Global (client level) configuration.
class GlobalConf : Conf
{
    /**
     * Create a global configuration object.
     *
     * Params:
     *   defaultTopicConf = topic configuration installed as the default
     *                      topic configuration; must not be null
     */
    this(TopicConf defaultTopicConf = new TopicConf)
    {
        rk_conf_ = rd_kafka_conf_new;
        this.defaultTopicConf = defaultTopicConf;
    }

    private TopicConf _defaultTopicConf;

    /** Dump configuration names and values as an associative array. */
    override string[string] dump()
    {
        size_t cnt;
        auto arrc = rd_kafka_conf_dump(rk_conf_, &cnt);
        return dumpImpl(arrc, cnt);
    }

    /**
     * Set configuration property `name` to value `value`.
     *
     * Throws: ConfException carrying the result code and the error
     * description reported by librdkafka on failure.
     */
    override void opIndexAssign(in char[] value, in char[] name)
    {
        // Only read after a failed call, when it holds the error text.
        char[512] errbuf = void;

        auto res = rd_kafka_conf_set(this.rk_conf_, name.toStringz(),
            value.toStringz(), errbuf.ptr, errbuf.sizeof);
        if (res != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK)
            throw new ConfException(cast(ConfException.Result) res,
                errbuf.ptr.fromStringz.idup);
    }

    /**
     * Query a single configuration value.
     *
     * Returns: the current value of property `name`, without any trailing
     * NUL terminator.
     * Throws: ConfException if the value cannot be retrieved.
     */
    override string opIndex(in char[] name) const
    {
        size_t size;
        auto cname = name.toStringz();

        // First call with a null destination only queries the required size.
        auto res = rd_kafka_conf_get(rk_conf_, cname, null, &size);
        if (res != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK)
            throw new ConfException(cast(ConfException.Result) res, "can not get config");

        char[] value = new char[size];
        res = rd_kafka_conf_get(rk_conf_, cname, value.ptr, &size);
        if (res != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK)
            throw new ConfException(cast(ConfException.Result) res, "can not get config");
        return valueFromBuffer(value, size);
    }

nothrow @nogc:

    /// Releases the underlying librdkafka configuration handle, if any.
    ~this() nothrow @nogc
    {
        if (rk_conf_)
            rd_kafka_conf_destroy(rk_conf_);
    }

    package DeliveryReportCb dr_cb_;
    package EventCb event_cb_;
    package SocketCb socket_cb_;
    package OpenCb open_cb_;
    package RebalanceCb rebalance_cb_;
    package OffsetCommitCb offset_commit_cb_;
    package rd_kafka_conf_t* rk_conf_;

    /// Set the delivery report callback.
    void drCb(DeliveryReportCb cb) @property
    {
        dr_cb_ = cb;
    }

    /// Set the event callback.
    void eventCb(EventCb cb) @property
    {
        event_cb_ = cb;
    }

    /// Set the socket callback.
    void socketCb(SocketCb cb) @property
    {
        socket_cb_ = cb;
    }

    /// Set the open callback.
    void openCb(OpenCb cb) @property
    {
        open_cb_ = cb;
    }

    /// Set the rebalance callback.
    void rebalanceCb(RebalanceCb cb) @property
    {
        rebalance_cb_ = cb;
    }

    /// Set the offset commit callback.
    void offsetCommitCb(OffsetCommitCb cb) @property
    {
        offset_commit_cb_ = cb;
    }

    /**
     * Sets the default topic configuration to use for automatically
     * subscribed topics and for Topic construction with producer.
     *
     * A duplicate of the native topic configuration is handed to
     * librdkafka; the passed object itself is kept for the getter.
     * `topic_conf` must not be null.
     *
     * See_also: subscribe()
     */
    void defaultTopicConf(TopicConf topic_conf) @property
    {
        _defaultTopicConf = topic_conf;
        rd_kafka_conf_set_default_topic_conf(rk_conf_,
            rd_kafka_topic_conf_dup(topic_conf.rkt_conf_));
    }

    ///ditto
    TopicConf defaultTopicConf() @property
    {
        return _defaultTopicConf;
    }

}

/// Topic level configuration.
class TopicConf : Conf
{
    /**
     * Set configuration property `name` to value `value`.
     *
     * Throws: ConfException carrying the result code and the error
     * description reported by librdkafka on failure.
     */
    override void opIndexAssign(in char[] value, in char[] name)
    {
        // Only read after a failed call, when it holds the error text.
        char[512] errbuf = void;

        auto res = rd_kafka_topic_conf_set(this.rkt_conf_, name.toStringz(),
            value.toStringz(), errbuf.ptr, errbuf.sizeof);

        if (res != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK)
            throw new ConfException(cast(ConfException.Result) res,
                errbuf.ptr.fromStringz.idup);
    }

    /**
     * Query a single configuration value.
     *
     * Returns: the current value of property `name`, without any trailing
     * NUL terminator.
     * Throws: ConfException if the value cannot be retrieved.
     */
    override string opIndex(in char[] name) const
    {
        size_t size;
        auto cname = name.toStringz();

        // First call with a null destination only queries the required size.
        auto res = rd_kafka_topic_conf_get(rkt_conf_, cname, null, &size);
        if (res != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK)
            throw new ConfException(cast(ConfException.Result) res, "can not get config");

        char[] value = new char[size];
        res = rd_kafka_topic_conf_get(rkt_conf_, cname, value.ptr, &size);
        if (res != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK)
            throw new ConfException(cast(ConfException.Result) res, "can not get config");
        return valueFromBuffer(value, size);
    }

    /** Dump configuration names and values as an associative array. */
    override string[string] dump()
    {
        size_t cnt;
        auto arrc = rd_kafka_topic_conf_dump(rkt_conf_, &cnt);
        return dumpImpl(arrc, cnt);
    }

nothrow @nogc:

    /// Releases the underlying librdkafka topic configuration handle, if any.
    ~this()
    {
        if (rkt_conf_)
            rd_kafka_topic_conf_destroy(rkt_conf_);
    }

    /**
     * Create configuration object
     */
    this()
    {
        rkt_conf_ = rd_kafka_topic_conf_new();
    }

    package PartitionerCb partitioner_cb_;
    package PartitionerKeyPointerCb partitioner_kp_cb_;
    package rd_kafka_topic_conf_t* rkt_conf_;

    /// Set the partitioner callback.
    void partitionerCb(PartitionerCb cb) @property
    {
        partitioner_cb_ = cb;
    }

    /// Set the key-pointer partitioner callback.
    void partitionerKpCb(PartitionerKeyPointerCb cb) @property
    {
        partitioner_kp_cb_ = cb;
    }
}