1 /// 2 module rdkafkad.handler.producer; 3 import rdkafkad; 4 5 import std.datetime: Clock, UTC; 6 7 /** 8 * Producer 9 */ 10 class Producer : Handle 11 { 12 package GlobalConf _conf; 13 /** 14 * Creates a new Kafka producer handle. 15 * 16 * \p conf is an optional object that will be used instead of the default 17 * configuration. 18 * The \p conf object is reusable after this call. 19 */ 20 this(GlobalConf conf) 21 { 22 _conf = conf; 23 char[512] errbuf = void; 24 rd_kafka_conf_t* rk_conf = null; 25 26 if (conf) 27 { 28 if (!conf.rk_conf_) 29 { 30 throw new Exception("Requires Conf::CONF_GLOBAL object"); 31 } 32 33 this.setCommonConfig(conf); 34 35 rk_conf = rd_kafka_conf_dup(conf.rk_conf_); 36 37 if (conf.dr_cb_) 38 { 39 rd_kafka_conf_set_dr_msg_cb(rk_conf, &dr_msg_cb_trampoline); 40 this.dr_cb_ = conf.dr_cb_; 41 } 42 } 43 44 if (null is(rk_ = rd_kafka_new(rd_kafka_type_t.RD_KAFKA_PRODUCER, 45 rk_conf, errbuf.ptr, errbuf.sizeof))) 46 { 47 throw new Exception(errbuf.ptr.fromStringz.idup); 48 } 49 } 50 51 /++ 52 Returns: new topic for this producer 53 Params: 54 topic = topic name 55 topicConf = TopicConf, if null `defaultTopicConf` should be setted to the global configuration. 56 +/ 57 Topic newTopic(const(char)[] topic, TopicConf topicConf = null) 58 { 59 if(!topicConf) 60 topicConf = _conf.defaultTopicConf; 61 assert(topicConf); 62 return new Topic(this, topic, topicConf); 63 } 64 65 nothrow @nogc: 66 67 ~this() 68 { 69 if (rk_) 70 { 71 rd_kafka_destroy(rk_); 72 } 73 } 74 75 /** 76 * Producer::produce() \p msgflags 77 * 78 * These flags are optional and mutually exclusive. 79 */ 80 enum MsgOpt 81 { 82 free = 0x1, /**< rdkafka will free(3) \p payload 83 * when it is done with it. */ 84 copy = 0x2, /**< the \p payload data will be copied 85 * and the \p payload pointer will not 86 * be used by rdkafka after the 87 * call returns. */ 88 block = 0x4, /**< Block produce*() on message queue 89 * full. 90 * WARNING: 91 * If a delivery report callback 92 * is used the application MUST 93 * call rd_kafka_poll() (or equiv.) 94 * to make sure delivered messages 95 * are drained from the internal 96 * delivery report queue. 97 * Failure to do so will result 98 * in indefinately blocking on 99 * the produce() call when the 100 * message queue is full. 101 */ 102 } 103 104 nothrow @nogc private static void dr_msg_cb_trampoline(rd_kafka_t* rk, 105 const rd_kafka_message_t* rkmessage, void* opaque) 106 { 107 auto handle = cast(Handle) opaque; 108 auto message = Message(null, rkmessage); 109 handle.dr_cb_(message); 110 message.destroy; 111 } 112 113 /** 114 * Produce and send a single message to broker. 115 * 116 * This is an asynch non-blocking API. 117 * 118 * \p partition is the target partition, either: 119 * - Topic::PARTITION_UA (unassigned) for 120 * automatic partitioning using the topic's partitioner function, or 121 * - a fixed partition (0..N) 122 * 123 * \p msgflags is zero or more of the following flags OR:ed together: 124 * block - block \p produce*() call if 125 * \p queue.buffering.max.messages or 126 * \p queue.buffering.max.kbytes are exceeded. 127 * Messages are considered in-queue from the point they 128 * are accepted by produce() until their corresponding 129 * delivery report callback/event returns. 130 * It is thus a requirement to call 131 * poll() (or equiv.) from a separate 132 * thread when block is used. 133 * See WARNING on \c block above. 134 * free - rdkafka will free(3) \p payload when it is done with it. 135 * copy - the \p payload data will be copied and the \p payload 136 * pointer will not be used by rdkafka after the 137 * call returns. 138 * 139 * NOTE: free and copy are mutually exclusive. 140 * 141 * If the function returns -1 and free was specified, then 142 * the memory associated with the payload is still the caller's 143 * responsibility. 144 * 145 * \p payload is the message payload of size \p len bytes. 146 * 147 * \p key is an optional message key, if non-null it 148 * will be passed to the topic partitioner as well as be sent with the 149 * message to the broker and passed on to the consumer. 150 * 151 * \p msg_opaque is an optional application-provided per-message opaque 152 * pointer that will provided in the delivery report callback (\p dr_cb) for 153 * referencing this message. 154 * 155 * an ErrorCode to indicate success or failure: 156 * - _QUEUE_FULL - maximum number of outstanding messages has been 157 * reached: \c queue.buffering.max.message 158 * 159 * - MSG_SIZE_TOO_LARGE - message is larger than configured max size: 160 * \c messages.max.bytes 161 * 162 * - _UNKNOWN_PARTITION - requested \p partition is unknown in the 163 * Kafka cluster. 164 * 165 * - _UNKNOWN_TOPIC - topic is unknown in the Kafka cluster. 166 */ 167 ErrorCode produce(Topic topic, int partition, void[] payload, 168 const(void)[] key = null, 169 long timestamp = Clock.currTime(UTC()).toUnixTime!long, 170 int msgflags = MsgOpt.copy, 171 void* msg_opaque = null) 172 { 173 int err; 174 //with(rd_kafka_vtype_t) 175 //err = rd_kafka_producev( 176 // rk_, 177 // RD_KAFKA_VTYPE_RKT, 178 // topic.rkt_, 179 // RD_KAFKA_VTYPE_PARTITION, 180 // partition, 181 // RD_KAFKA_VTYPE_MSGFLAGS, 182 // msgflags, 183 // RD_KAFKA_VTYPE_VALUE, 184 // payload.ptr, 185 // payload.length, 186 // RD_KAFKA_VTYPE_KEY, 187 // key.ptr, 188 // key.length, 189 // //RD_KAFKA_VTYPE_OPAQUE, 190 // //msg_opaque, 191 // RD_KAFKA_VTYPE_END, 192 // ); 193 err = rd_kafka_produce(topic.rkt_, partition, msgflags, payload.ptr, 194 payload.length, key.ptr, key.length, msg_opaque); 195 if (err == -1) 196 return cast(ErrorCode) rd_kafka_errno2err(errno); 197 return ErrorCode.no_error; 198 } 199 200 /** 201 * Wait until all outstanding produce requests, et.al, are completed. 202 * This should typically be done prior to destroying a producer instance 203 * to make sure all queued and in-flight produce requests are completed 204 * before terminating. 205 * 206 * Note: This function will call poll() and thus trigger callbacks. 207 * 208 * timed_out if \p timeout_ms was reached before all 209 * outstanding requests were completed, else ErrorCode.no_error 210 */ 211 ErrorCode flush(int timeout_ms = 60_000) 212 { 213 typeof(return) ret; 214 ret = cast(ErrorCode) rd_kafka_flush(rk_, timeout_ms); 215 return ret; 216 } 217 }