1 ///
2 module rdkafkad.metadata;
3 import rdkafkad;
4 
/**
 * Metadata: Broker information.
 *
 * Thin read-only view over a single `rd_kafka_metadata_broker_t`.
 * The struct only stores the pointer; it does not own the memory it points to.
 */
struct BrokerMetadata
{
    private const(rd_kafka_metadata_broker_t)* broker_metadata_;

    /** Numeric id of the broker. */
    @property int id() const nothrow @nogc
    {
        return broker_metadata_.id;
    }

    /**
     * Host name of the broker.
     *
     * Returns: the result of `fromStringz` applied to the C `host` field.
     */
    @property const(char)[] host() const nothrow @nogc
    {
        return fromStringz(broker_metadata_.host);
    }

    /** Port the broker is listening on. */
    @property int port() const nothrow @nogc
    {
        return broker_metadata_.port;
    }
}
32 
/**
 * Metadata: Partition information.
 *
 * Thin read-only view over a single `rd_kafka_metadata_partition_t`.
 * The struct only stores the pointer; it does not own the memory it points to.
 */
struct PartitionMetadata
{
    private const (rd_kafka_metadata_partition_t)* partition_metadata_;

const @property nothrow @nogc:

    /** Partition id */
    int id()
    {
        return partition_metadata_.id;
    }

    /** Partition error reported by broker */
    ErrorCode err()
    {
        return cast(ErrorCode) partition_metadata_.err;
    }

    /** Leader broker (id) for partition */
    int leader()
    {
        return partition_metadata_.leader;
    }

    /**
     * Replica brokers.
     *
     * Returns: a forward range with `length` over the `replica_cnt` broker ids
     * stored in the C `replicas` array. The range keeps a raw pointer to the
     * partition metadata and must not outlive it.
     */
    auto replicas()
    {
        static struct Replicas
        {
        nothrow @nogc:
            private const(rd_kafka_metadata_partition_t)* partition_metadata_;
            private size_t _i;

            /// True once every replica id has been consumed.
            bool empty() const @property
            {
                return _i >= partition_metadata_.replica_cnt;
            }

            /// Current replica broker id; the range must not be empty.
            auto front() const @property
            {
                // Pointer indexing is not bounds-checked, so enforce the range contract here.
                assert(!empty, "front called on an exhausted Replicas range");
                return partition_metadata_.replicas[_i];
            }

            /// Advances to the next replica id; the range must not be empty.
            void popFront()
            {
                assert(!empty, "popFront called on an exhausted Replicas range");
                ++_i;
            }

            /// Number of replica ids not yet consumed (never wraps around).
            size_t length() const @property
            {
                return empty ? 0 : partition_metadata_.replica_cnt - _i;
            }

            /// Independent copy of the current iteration state.
            auto save() @property
            {
                return this;
            }
        }

        return Replicas(partition_metadata_);
    }

    /**
     * In-Sync-Replica brokers.
     *
     * Warning: The broker may return a cached/outdated list of ISRs.
     *
     * Returns: a forward range with `length` over the `isr_cnt` broker ids
     * stored in the C `isrs` array. The range keeps a raw pointer to the
     * partition metadata and must not outlive it.
     */
    auto isrs()
    {
        static struct Isrs
        {
        nothrow @nogc:
            private const(rd_kafka_metadata_partition_t)* partition_metadata_;
            private size_t _i;

            /// True once every in-sync replica id has been consumed.
            bool empty() const @property
            {
                return _i >= partition_metadata_.isr_cnt;
            }

            /// Current in-sync replica broker id; the range must not be empty.
            auto front() const @property
            {
                // Pointer indexing is not bounds-checked, so enforce the range contract here.
                assert(!empty, "front called on an exhausted Isrs range");
                return partition_metadata_.isrs[_i];
            }

            /// Advances to the next in-sync replica id; the range must not be empty.
            void popFront()
            {
                assert(!empty, "popFront called on an exhausted Isrs range");
                ++_i;
            }

            /// Number of in-sync replica ids not yet consumed (never wraps around).
            size_t length() const @property
            {
                return empty ? 0 : partition_metadata_.isr_cnt - _i;
            }

            /// Independent copy of the current iteration state.
            auto save() @property
            {
                return this;
            }
        }

        return Isrs(partition_metadata_);
    }
}
126 
/**
 * Metadata: Topic information.
 *
 * Thin read-only view over a single `rd_kafka_metadata_topic_t`.
 * The struct only stores the pointer; it does not own the memory it points to.
 */
struct TopicMetadata
{
    private const(rd_kafka_metadata_topic_t)* topic_metadata_;

const @property nothrow @nogc:

    /**
     * Topic name.
     *
     * Returns: the result of `fromStringz` applied to the C `topic` field.
     */
    const(char)[] topic()
    {
        return topic_metadata_.topic.fromStringz;
    }

    /**
     * Partition list.
     *
     * Returns: a forward range with `length` yielding one `PartitionMetadata`
     * per entry of the C `partitions` array (`partition_cnt` entries). The
     * range keeps a raw pointer to the topic metadata and must not outlive it.
     */
    auto partitions()
    {
        static struct Partitions
        {
        nothrow @nogc:
            private const(rd_kafka_metadata_topic_t)* topic_metadata_;
            private size_t _i;

            /// True once every partition has been consumed.
            bool empty() const @property
            {
                return _i >= topic_metadata_.partition_cnt;
            }

            /// View over the current partition; the range must not be empty.
            auto front() const @property
            {
                // Pointer indexing is not bounds-checked, so enforce the range contract here.
                assert(!empty, "front called on an exhausted Partitions range");
                return PartitionMetadata(&topic_metadata_.partitions[_i]);
            }

            /// Advances to the next partition; the range must not be empty.
            void popFront()
            {
                assert(!empty, "popFront called on an exhausted Partitions range");
                ++_i;
            }

            /// Number of partitions not yet consumed (never wraps around).
            size_t length() const @property
            {
                return empty ? 0 : topic_metadata_.partition_cnt - _i;
            }

            /// Independent copy of the current iteration state.
            auto save() @property
            {
                return this;
            }
        }

        return Partitions(topic_metadata_);
    }

    /** Topic error reported by broker */
    ErrorCode err()
    {
        return cast(ErrorCode)(topic_metadata_.err);
    }
}
180 
/**
 * Metadata container.
 *
 * Wraps a `rd_kafka_metadata_t` pointer and passes it to
 * `rd_kafka_metadata_destroy` in the destructor, so the object takes
 * ownership of the pointer handed to the constructor.
 *
 * Every range and view obtained from this object (`brokers`, `topics` and the
 * `BrokerMetadata` / `TopicMetadata` values they yield) stores a raw pointer
 * into the same structure. Keep the `Metadata` object alive for as long as
 * any of them is in use.
 */
final class Metadata
{
    private const(rd_kafka_metadata_t)* metadata_;

nothrow @nogc:

    /**
     * Wraps an existing metadata structure.
     *
     * Params:
     *     metadata = pointer that this object will pass to
     *                `rd_kafka_metadata_destroy` on destruction.
     */
    this(const(rd_kafka_metadata_t)* metadata)
    {
        metadata_ = metadata;
    }

    /// Hands the wrapped pointer to `rd_kafka_metadata_destroy`.
    ~this()
    {
        rd_kafka_metadata_destroy(metadata_);
    }

const @property:

    /**
     * Broker list.
     *
     * Returns: a forward range with `length` yielding one `BrokerMetadata`
     * per entry of the C `brokers` array (`broker_cnt` entries).
     */
    auto brokers() 
    {
        static struct Brokers
        {
        nothrow @nogc:
            private const(rd_kafka_metadata_t)* metadata_;
            private size_t _i;

            /// True once every broker has been consumed.
            bool empty() const @property
            {
                return _i >= metadata_.broker_cnt;
            }

            /// View over the current broker; the range must not be empty.
            auto front() const @property
            {
                // Pointer indexing is not bounds-checked, so enforce the range contract here.
                assert(!empty, "front called on an exhausted Brokers range");
                return BrokerMetadata(&metadata_.brokers[_i]);
            }

            /// Advances to the next broker; the range must not be empty.
            void popFront()
            {
                assert(!empty, "popFront called on an exhausted Brokers range");
                ++_i;
            }

            /// Number of brokers not yet consumed (never wraps around).
            size_t length() const @property
            {
                return empty ? 0 : metadata_.broker_cnt - _i;
            }

            /// Independent copy of the current iteration state.
            auto save() @property
            {
                return this;
            }
        }
        assert(metadata_);
        return Brokers(metadata_);
    }

    /**
     * Topic list.
     *
     * Returns: a forward range with `length` yielding one `TopicMetadata`
     * per entry of the C `topics` array (`topic_cnt` entries).
     */
    auto topics()
    {
        static struct Topics
        {
        nothrow @nogc:
            private const(rd_kafka_metadata_t)* metadata_;
            private size_t _i;

            /// True once every topic has been consumed.
            bool empty() const @property
            {
                return _i >= metadata_.topic_cnt;
            }

            /// View over the current topic; the range must not be empty.
            auto front() const @property
            {
                // Pointer indexing is not bounds-checked, so enforce the range contract here.
                assert(!empty, "front called on an exhausted Topics range");
                return TopicMetadata(&metadata_.topics[_i]);
            }

            /// Advances to the next topic; the range must not be empty.
            void popFront()
            {
                assert(!empty, "popFront called on an exhausted Topics range");
                ++_i;
            }

            /// Number of topics not yet consumed (never wraps around).
            size_t length() const @property
            {
                return empty ? 0 : metadata_.topic_cnt - _i;
            }

            /// Independent copy of the current iteration state.
            auto save() @property
            {
                return this;
            }
        }
        assert(metadata_);
        return Topics(metadata_);
    }

    /** Broker (id) originating this metadata */
    int origBrokerId()
    {
        // Same precondition as brokers()/topics(): a wrapped pointer must be present.
        assert(metadata_);
        return metadata_.orig_broker_id;
    }

    /**
     * Broker (name) originating this metadata.
     *
     * Returns: the result of `fromStringz` applied to the C
     * `orig_broker_name` field.
     */
    const(char)[] origBrokerName()
    {
        // Same precondition as brokers()/topics(): a wrapped pointer must be present.
        assert(metadata_);
        return metadata_.orig_broker_name.fromStringz;
    }
}