1 ///
2 module rdkafkad.handler.producer;
3 import rdkafkad;
4 
5 import std.datetime: Clock, UTC;
6 
7 /**
8  * Producer
9  */
class Producer : Handle
{
    /// Global configuration this producer was created from; may be null when
    /// the producer was constructed with the librdkafka defaults.
    package GlobalConf _conf;

    /**
   * Creates a new Kafka producer handle.
   *
   * \p conf is an optional object that will be used instead of the default
   * configuration.
   * The \p conf object is reusable after this call (it is duplicated before
   * being handed to librdkafka).
   *
   * Throws: Exception if \p conf is not a global configuration object, or if
   * the underlying rd_kafka_new() call fails.
   */
    this(GlobalConf conf)
    {
        _conf = conf;
        char[512] errbuf = void;
        rd_kafka_conf_t* rk_conf = null;

        if (conf)
        {
            if (!conf.rk_conf_)
            {
                throw new Exception("Requires Conf::CONF_GLOBAL object");
            }

            this.setCommonConfig(conf);

            // Duplicate so the caller's conf object stays reusable.
            rk_conf = rd_kafka_conf_dup(conf.rk_conf_);

            if (conf.dr_cb_)
            {
                rd_kafka_conf_set_dr_msg_cb(rk_conf, &dr_msg_cb_trampoline);
                this.dr_cb_ = conf.dr_cb_;
            }
        }

        // A null rk_conf makes librdkafka fall back to its built-in defaults.
        if (null is(rk_ = rd_kafka_new(rd_kafka_type_t.RD_KAFKA_PRODUCER,
                rk_conf, errbuf.ptr, errbuf.sizeof)))
        {
            throw new Exception(errbuf.ptr.fromStringz.idup);
        }
    }

    /++
    Returns: new topic for this producer
    Params:
        topic = topic name
        topicConf = TopicConf; if null, the `defaultTopicConf` of the global
            configuration this producer was constructed with is used instead.
    Throws: Exception if no \p topicConf is given and no default topic
        configuration is available (e.g. the producer was constructed with a
        null GlobalConf).
    +/
    Topic newTopic(const(char)[] topic, TopicConf topicConf = null)
    {
        if (!topicConf)
        {
            // The constructor allows a null GlobalConf, so _conf must be
            // checked before use. An exception (rather than the previous
            // assert) also protects release builds from a null dereference.
            if (_conf is null || _conf.defaultTopicConf is null)
                throw new Exception(
                    "newTopic requires a TopicConf: none was passed and no " ~
                    "defaultTopicConf is available on this producer");
            topicConf = _conf.defaultTopicConf;
        }
        return new Topic(this, topic, topicConf);
    }

nothrow @nogc:

    /// Releases the underlying librdkafka producer handle, if any.
     ~this()
    {
        if (rk_)
        {
            rd_kafka_destroy(rk_);
        }
    }

    /**
   * Producer::produce() \p msgflags
   *
   * These flags are optional and mutually exclusive.
   */
   enum MsgOpt
   {
        free = 0x1, /**< rdkafka will free(3) \p payload
                                            * when it is done with it. */
        copy = 0x2, /**< the \p payload data will be copied
                                           * and the \p payload pointer will not
                                           * be used by rdkafka after the
                                           * call returns. */
        block = 0x4, /**< Block produce*() on message queue
              *   full.
              *   WARNING:
              *   If a delivery report callback
              *   is used the application MUST
              *   call rd_kafka_poll() (or equiv.)
              *   to make sure delivered messages
              *   are drained from the internal
              *   delivery report queue.
              *   Failure to do so will result
              *   in indefinitely blocking on
              *   the produce() call when the
              *   message queue is full.
              */
   }

    /// C callback bridging librdkafka's delivery report into the D-level
    /// dr_cb_ delegate. \p opaque is assumed to be the Handle registered via
    /// setCommonConfig — NOTE(review): that registration is outside this
    /// module's view; confirm it always sets the opaque pointer.
    nothrow @nogc private static void dr_msg_cb_trampoline(rd_kafka_t* rk,
        const rd_kafka_message_t* rkmessage, void* opaque)
    {
        auto handle = cast(Handle) opaque;
        auto message = Message(null, rkmessage);
        handle.dr_cb_(message);
        // Message does not own rkmessage here; destroy only tears down the
        // D-side wrapper.
        message.destroy;
    }

    /**
   * Produce and send a single message to broker.
   *
   * This is an asynch non-blocking API.
   *
   * \p partition is the target partition, either:
   *   - Topic::PARTITION_UA (unassigned) for
   *     automatic partitioning using the topic's partitioner function, or
   *   - a fixed partition (0..N)
   *
   * \p msgflags is zero or more of the following flags OR:ed together:
   *    block - block \p produce*() call if
   *                   \p queue.buffering.max.messages or
   *                   \p queue.buffering.max.kbytes are exceeded.
   *                   Messages are considered in-queue from the point they
   *                   are accepted by produce() until their corresponding
   *                   delivery report callback/event returns.
   *                   It is thus a requirement to call 
   *                   poll() (or equiv.) from a separate
   *                   thread when block is used.
   *                   See WARNING on \c block above.
   *    free - rdkafka will free(3) \p payload when it is done with it.
   *    copy - the \p payload data will be copied and the \p payload
   *               pointer will not be used by rdkafka after the
   *               call returns.
   *
   *  NOTE: free and copy are mutually exclusive.
   *
   *  If the function returns -1 and free was specified, then
   *  the memory associated with the payload is still the caller's
   *  responsibility.
   *
   * \p payload is the message payload of size \p len bytes.
   *
   * \p key is an optional message key, if non-null it
   * will be passed to the topic partitioner as well as be sent with the
   * message to the broker and passed on to the consumer.
   *
   * \p timestamp is CURRENTLY UNUSED: rd_kafka_produce() carries no
   * per-message timestamp, and the rd_kafka_producev() code path that could
   * forward it is commented out below. The parameter is kept for interface
   * compatibility; its default still performs a clock read at the call site.
   *
   * \p msg_opaque is an optional application-provided per-message opaque
   * pointer that will provided in the delivery report callback (\p dr_cb) for
   * referencing this message.
   *
   * an ErrorCode to indicate success or failure:
   *  - _QUEUE_FULL - maximum number of outstanding messages has been
   *                      reached: \c queue.buffering.max.message
   *
   *  - MSG_SIZE_TOO_LARGE - message is larger than configured max size:
   *                            \c messages.max.bytes
   *
   *  - _UNKNOWN_PARTITION - requested \p partition is unknown in the
   *                           Kafka cluster.
   *
   *  - _UNKNOWN_TOPIC     - topic is unknown in the Kafka cluster.
   */
    ErrorCode produce(Topic topic, int partition, void[] payload,
        const(void)[] key = null,
        long timestamp = Clock.currTime(UTC()).toUnixTime!long,
        int msgflags = MsgOpt.copy,
        void* msg_opaque = null)
    {
        int err;
        //with(rd_kafka_vtype_t)
        //err = rd_kafka_producev(
        //        rk_,
        //        RD_KAFKA_VTYPE_RKT,
        //        topic.rkt_,
        //        RD_KAFKA_VTYPE_PARTITION,
        //        partition,
        //        RD_KAFKA_VTYPE_MSGFLAGS,
        //        msgflags,
        //        RD_KAFKA_VTYPE_VALUE,
        //        payload.ptr,
        //        payload.length,
        //        RD_KAFKA_VTYPE_KEY,
        //        key.ptr,
        //        key.length,
        //        //RD_KAFKA_VTYPE_OPAQUE,
        //        //msg_opaque,
        //        RD_KAFKA_VTYPE_END,
        //        );
        err = rd_kafka_produce(topic.rkt_, partition, msgflags, payload.ptr,
                payload.length, key.ptr, key.length, msg_opaque);
        // rd_kafka_produce() reports failure via -1 + errno (legacy API).
        if (err == -1)
            return cast(ErrorCode) rd_kafka_errno2err(errno);
        return ErrorCode.no_error;
    }

    /**
   * Wait until all outstanding produce requests, et.al, are completed.
   *        This should typically be done prior to destroying a producer instance
   *        to make sure all queued and in-flight produce requests are completed
   *        before terminating.
   *
   * Note: This function will call poll() and thus trigger callbacks.
   *
   * Returns: timed_out if \p timeout_ms was reached before all
   *          outstanding requests were completed, else ErrorCode.no_error
   */
    ErrorCode flush(int timeout_ms = 60_000)
    {
        typeof(return) ret;
        ret = cast(ErrorCode) rd_kafka_flush(rk_, timeout_ms);
        return ret;
    }
}