1 ///
2 module rdkafkad.metadata;
3 import rdkafkad;
4 import rdkafkad.iodriver;
5 
6 /**
7  * Metadata: Broker information
8  */
9 struct BrokerMetadata
10 {
11     private const(rd_kafka_metadata_broker_t)* broker_metadata_;
12 
13 const @property nothrow @nogc:
14 
15     /** Broker id */
16     int id() const
17     {
18         return broker_metadata_.id;
19     }
20 
21     /** Broker hostname */
22     const(char)[] host() const
23     {
24         return broker_metadata_.host.fromStringz;
25     }
26 
27     /** Broker listening port */
28     int port() const
29     {
30         return broker_metadata_.port;
31     }
32 }
33 
34 /**
35  * Metadata: Partition information
36  */
37 struct PartitionMetadata
38 {
39     private const (rd_kafka_metadata_partition_t)* partition_metadata_;
40 
41 const @property nothrow @nogc:
42 
43     /** Partition id */
44     int id()
45     {
46         return partition_metadata_.id;
47     }
48 
49     /** Partition error reported by broker */
50     ErrorCode err()
51     {
52         return cast(ErrorCode) partition_metadata_.err;
53     }
54 
55     /** Leader broker (id) for partition */
56     int leader()
57     {
58         return partition_metadata_.leader;
59     }
60 
61     /** Replica brokers */
62     auto replicas()
63     {
64         static struct Replicas
65         {
66         nothrow @nogc:
67             private const(rd_kafka_metadata_partition_t)* partition_metadata_;
68             private size_t _i;
69             auto empty() @property
70             {
71                 return _i >= partition_metadata_.replica_cnt;
72             }
73 
74             auto front() @property
75             {
76                 return partition_metadata_.replicas[_i];
77             }
78 
79             auto popFront()
80             {
81                 _i++;
82             }
83 
84             auto length() @property
85             {
86                 return partition_metadata_.replica_cnt - _i;
87             }
88         }
89 
90         return Replicas(partition_metadata_);
91     }
92 
93     /** In-Sync-Replica brokers
94    *  Warning: The broker may return a cached/outdated list of ISRs.
95    */
96     auto isrs()
97     {
98         static struct Isrs
99         {
100         nothrow @nogc:
101             private const(rd_kafka_metadata_partition_t)* partition_metadata_;
102             private size_t _i;
103             auto empty() @property
104             {
105                 return _i >= partition_metadata_.isr_cnt;
106             }
107 
108             auto front() @property
109             {
110                 return partition_metadata_.isrs[_i];
111             }
112 
113             auto popFront()
114             {
115                 _i++;
116             }
117 
118             auto length() @property
119             {
120                 return partition_metadata_.isr_cnt - _i;
121             }
122         }
123 
124         return Isrs(partition_metadata_);
125     }
126 }
127 
128 /**
129  * Metadata: Topic information
130  */
131 struct TopicMetadata
132 {
133     private const(rd_kafka_metadata_topic_t)* topic_metadata_;
134 
135 const @property nothrow @nogc:
136 
137     /** Topic name */
138     const(char)[] topic()
139     {
140         return topic_metadata_.topic.fromStringz;
141     }
142 
143     /** Partition list */
144     auto partitions()
145     {
146         static struct Partitions
147         {
148         nothrow @nogc:
149             private const(rd_kafka_metadata_topic_t)* topic_metadata_;
150             private size_t _i;
151             auto empty() @property
152             {
153                 return _i >= topic_metadata_.partition_cnt;
154             }
155 
156             auto front() @property
157             {
158                 return PartitionMetadata(&topic_metadata_.partitions[_i]);
159             }
160 
161             auto popFront()
162             {
163                 _i++;
164             }
165 
166             auto length() @property
167             {
168                 return topic_metadata_.partition_cnt - _i;
169             }
170         }
171 
172         return Partitions(topic_metadata_);
173     }
174 
175     /** Topic error reported by broker */
176     ErrorCode err()
177     {
178         return cast(ErrorCode)(topic_metadata_.err);
179     }
180 }
181 
182 /**
183  * Metadata container
184  */
185 final class Metadata
186 {
187     private const(rd_kafka_metadata_t)* metadata_;
188 
189 nothrow @nogc:
190 
191     this(const(rd_kafka_metadata_t)* metadata)
192     {
193         metadata_ = metadata;
194     }
195 
196     ~this()
197     {
198         rd_kafka_metadata_destroy(metadata_);
199     }
200 
201 const @property:
202 
203     /** Broker list */
204     auto brokers() 
205     {
206         static struct Brokers
207         {
208         nothrow @nogc:
209             private const(rd_kafka_metadata_t)* metadata_;
210             private size_t _i;
211             auto empty() @property
212             {
213                 return _i >= metadata_.broker_cnt;
214             }
215 
216             auto front() @property
217             {
218                 return BrokerMetadata(&metadata_.brokers[_i]);
219             }
220 
221             auto popFront()
222             {
223                 _i++;
224             }
225 
226             auto length() @property
227             {
228                 return metadata_.broker_cnt - _i;
229             }
230         }
231         assert(metadata_);
232         return Brokers(metadata_);
233     }
234 
235     /** Topic list */
236     auto topics()
237     {
238         static struct Topics
239         {
240         nothrow @nogc:
241             private const(rd_kafka_metadata_t)* metadata_;
242             private size_t _i;
243             auto empty() @property
244             {
245                 return _i >= metadata_.topic_cnt;
246             }
247 
248             auto front() @property
249             {
250                 return TopicMetadata(&metadata_.topics[_i]);
251             }
252 
253             auto popFront()
254             {
255                 _i++;
256             }
257 
258             auto length() @property
259             {
260                 return metadata_.topic_cnt - _i;
261             }
262         }
263         assert(metadata_);
264         return Topics(metadata_);
265     }
266 
267     /** Broker (id) originating this metadata */
268     int origBrokerId()
269     {
270         return metadata_.orig_broker_id;
271     }
272 
273     /** Broker (name) originating this metadata */
274     const(char)[] origBrokerName()
275     {
276         return metadata_.orig_broker_name.fromStringz;
277     }
278 }