///
module rdkafkad.config;
import rdkafkad;

/**
 * Open callback (portability hook).
 *
 * The open callback is responsible for opening the file specified by
 * `path`, using `flags` and `mode`.
 * The file shall be opened with `CLOEXEC` set in a race-free fashion, if
 * possible.
 *
 * It is typically not required to register an alternative open implementation.
 *
 * Note: Not currently available on native Win32
 */
alias OpenCb = int function(const(char)[] path, int flags, int mode) nothrow @nogc;

/// Thrown by the configuration classes when a property cannot be set or read.
class ConfException : Exception
{
    /**
     * Result code of the failed configuration call.
     */
    enum Result
    {
        UNKNOWN = -2, /**< Unknown configuration property */
        INVALID = -1, /**< Invalid configuration value */
        OK /**< Configuration property was successfully set */
    }
    /// Result code the exception was constructed with.
    Result result;
    ///
    this(Result result, string msg, string file = __FILE__,
        uint line = cast(uint) __LINE__, Throwable next = null) pure nothrow @nogc @safe
    {
        super(msg, file, line, next);
        this.result = result;
    }
}

/*
 * Returns the part of `buf` that precedes the first NUL character, or all of
 * `buf` when it contains none. The configuration getters fill `buf` with a
 * C string, and the size they report counts the terminator, which must not
 * leak into the returned D string.
 */
private string valueBeforeNul(char[] buf) pure nothrow @nogc
{
    size_t len = 0;
    while (len < buf.length && buf[len] != '\0')
        ++len;
    // The buffer is freshly allocated by the caller and not shared,
    // so viewing it as immutable is fine.
    return cast(string) buf[0 .. len];
}

/**
 * Configuration interface
 *
 * Holds either global or topic configuration that are passed to
 * Consumer::create(), Producer::create(),
 * create(), etc.
 *
 * See_also: CONFIGURATION.md for the full list of supported properties.
 */
interface Conf
{
    /**
     * Set configuration property `name` to value `value`.
     *
     * The implementations in this module throw `ConfException` when the
     * property can not be set.
     */
    void opIndexAssign(in char[] value, in char[] name);

    /**
     * Query a single configuration value.
     *
     * Returns: the current value of property `name`.
     * The implementations in this module throw `ConfException` when the
     * value can not be retrieved.
     */
    string opIndex(in char[] name) const;

    /// Dump configuration names and values.
    string[] dump();
}

/// Global configuration, backed by a `rd_kafka_conf_t` handle.
class GlobalConf : Conf
{
    /**
     * Create a configuration object and install `defaultTopicConf` as its
     * default topic configuration.
     */
    this(TopicConf defaultTopicConf = new TopicConf)
    {
        // The handle has to exist before the property setter below hands it
        // to rd_kafka_conf_set_default_topic_conf.
        rk_conf_ = rd_kafka_conf_new();
        this.defaultTopicConf = defaultTopicConf;
    }

    private TopicConf _defaultTopicConf;

    /** Dump configuration names and values to list containing
     * name,value tuples */
    string[] dump()
    {
        size_t cnt;
        auto arrc = rd_kafka_conf_dump(rk_conf_, &cnt);
        auto arr = new string[cnt];
        foreach (size_t i; 0 .. cnt)
            arr[i] = arrc[i].fromStringz.idup;
        // Every entry was copied above, so the C array can be released.
        rd_kafka_conf_dump_free(arrc, cnt);
        return arr;
    }

    /**
     * Set configuration property `name` to value `value`.
     *
     * Throws: `ConfException` carrying the result code and the error text
     * produced by `rd_kafka_conf_set` when the call does not succeed.
     */
    void opIndexAssign(in char[] value, in char[] name)
    {
        // Only read on failure, after rd_kafka_conf_set has been given the
        // buffer to write its error description into.
        char[512] errbuf = void;

        auto res = rd_kafka_conf_set(this.rk_conf_, name.toStringz(),
            value.toStringz(), errbuf.ptr, errbuf.sizeof);
        if (res != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK)
            throw new ConfException(cast(ConfException.Result) res,
                errbuf.ptr.fromStringz.idup);
    }

    /**
     * Query a single configuration value.
     *
     * Returns: the value of property `name`, without a trailing NUL.
     * Throws: `ConfException` when `rd_kafka_conf_get` does not succeed.
     */
    string opIndex(in char[] name) const
    {
        size_t size;

        // First call: no destination buffer, only the required size is queried.
        rd_kafka_conf_res_t res = rd_kafka_conf_get(rk_conf_, name.toStringz(), null, &size);
        if (res != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK)
            throw new ConfException(cast(ConfException.Result) res, "can not get config");

        // Second call: fetch the value into a buffer of that size.
        char[] value = new char[size];
        res = rd_kafka_conf_get(rk_conf_, name.toStringz(), value.ptr, &size);
        if (res != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK)
            throw new ConfException(cast(ConfException.Result) res, "can not get config");
        return valueBeforeNul(value);
    }

nothrow @nogc:

    /// Destroys the underlying handle, if one was created.
    ~this() nothrow @nogc
    {
        if (rk_conf_)
            rd_kafka_conf_destroy(rk_conf_);
    }

    /**
     * Create configuration object
     */
    this()
    {
        rk_conf_ = rd_kafka_conf_new();
    }

    package DeliveryReportCb dr_cb_;
    package EventCb event_cb_;
    package SocketCb socket_cb_;
    package OpenCb open_cb_;
    package RebalanceCb rebalance_cb_;
    package OffsetCommitCb offset_commit_cb_;
    package rd_kafka_conf_t* rk_conf_;

    /// Stores the delivery report callback.
    void drCb(DeliveryReportCb cb) @property
    {
        dr_cb_ = cb;
    }
    /// Stores the event callback.
    void eventCb(EventCb cb) @property
    {
        event_cb_ = cb;
    }
    /// Stores the socket callback.
    void socketCb(SocketCb cb) @property
    {
        socket_cb_ = cb;
    }
    /// Stores the open callback.
    void openCb(OpenCb cb) @property
    {
        open_cb_ = cb;
    }
    /// Stores the rebalance callback.
    void rebalanceCb(RebalanceCb cb) @property
    {
        rebalance_cb_ = cb;
    }
    /// Stores the offset commit callback.
    void offsetCommitCb(OffsetCommitCb cb) @property
    {
        offset_commit_cb_ = cb;
    }

    /**
     * Sets the default topic configuration to use for automatically
     * subscribed topics and for Topic construction with producer.
     *
     * A duplicate of the topic configuration handle is passed to
     * librdkafka; `topic_conf` itself is only remembered for the getter.
     *
     * See_also: subscribe()
     */
    void defaultTopicConf(TopicConf topic_conf) @property
    {
        _defaultTopicConf = topic_conf;
        rd_kafka_conf_set_default_topic_conf(rk_conf_,
            rd_kafka_topic_conf_dup(topic_conf.rkt_conf_));
    }

    ///ditto
    TopicConf defaultTopicConf() @property
    {
        return _defaultTopicConf;
    }

}

/// Topic configuration, backed by a `rd_kafka_topic_conf_t` handle.
class TopicConf : Conf
{
    /**
     * Set configuration property `name` to value `value`.
     *
     * Throws: `ConfException` carrying the result code and the error text
     * produced by `rd_kafka_topic_conf_set` when the call does not succeed.
     */
    void opIndexAssign(in char[] value, in char[] name)
    {
        // Only read on failure, after rd_kafka_topic_conf_set has been given
        // the buffer to write its error description into.
        char[512] errbuf = void;

        rd_kafka_conf_res_t res = rd_kafka_topic_conf_set(this.rkt_conf_, name.toStringz(),
            value.toStringz(), errbuf.ptr, errbuf.sizeof);

        if (res != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK)
            throw new ConfException(cast(ConfException.Result) res,
                errbuf.ptr.fromStringz.idup);
    }

    /**
     * Query a single configuration value.
     *
     * Returns: the value of property `name`, without a trailing NUL.
     * Throws: `ConfException` when `rd_kafka_topic_conf_get` does not succeed.
     */
    string opIndex(in char[] name) const
    {
        size_t size;

        // First call: no destination buffer, only the required size is queried.
        rd_kafka_conf_res_t res = rd_kafka_topic_conf_get(rkt_conf_, name.toStringz(), null, &size);
        if (res != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK)
            throw new ConfException(cast(ConfException.Result) res, "can not get config");

        // Second call: fetch the value into a buffer of that size.
        char[] value = new char[size];
        res = rd_kafka_topic_conf_get(rkt_conf_, name.toStringz(), value.ptr, &size);
        if (res != rd_kafka_conf_res_t.RD_KAFKA_CONF_OK)
            throw new ConfException(cast(ConfException.Result) res, "can not get config");
        return valueBeforeNul(value);
    }

    /** Dump configuration names and values to list containing
     * name,value tuples */
    string[] dump()
    {
        size_t cnt;
        auto arrc = rd_kafka_topic_conf_dump(rkt_conf_, &cnt);
        auto arr = new string[cnt];
        foreach (size_t i; 0 .. cnt)
            arr[i] = arrc[i].fromStringz.idup;
        // Every entry was copied above, so the C array can be released.
        rd_kafka_conf_dump_free(arrc, cnt);
        return arr;
    }

nothrow @nogc:

    /// Destroys the underlying handle, if one was created.
    ~this()
    {
        if (rkt_conf_)
            rd_kafka_topic_conf_destroy(rkt_conf_);
    }

    /**
     * Create configuration object
     */
    this()
    {
        rkt_conf_ = rd_kafka_topic_conf_new();
    }

    package PartitionerCb partitioner_cb_;
    package PartitionerKeyPointerCb partitioner_kp_cb_;
    package rd_kafka_topic_conf_t* rkt_conf_;

    /// Stores the partitioner callback.
    void partitionerCb(PartitionerCb cb) @property
    {
        partitioner_cb_ = cb;
    }
    /// Stores the key-pointer partitioner callback.
    void partitionerKpCb(PartitionerKeyPointerCb cb) @property
    {
        partitioner_kp_cb_ = cb;
    }
}