///
module rdkafkad.metadata;
import rdkafkad;

/**
 * Metadata: Broker information
 *
 * Read-only view over a single `rd_kafka_metadata_broker_t`. Only the pointer
 * is stored; this struct neither copies nor releases the pointed-to data.
 */
struct BrokerMetadata
{
    private const(rd_kafka_metadata_broker_t)* broker_metadata_;

const @property nothrow @nogc:

    /** Broker id */
    int id()
    {
        return broker_metadata_.id;
    }

    /**
     * Broker hostname
     *
     * Returns: the `host` C string of the underlying struct, converted with
     * `fromStringz`.
     */
    const(char)[] host()
    {
        return broker_metadata_.host.fromStringz;
    }

    /** Broker listening port */
    int port()
    {
        return broker_metadata_.port;
    }
}

/**
 * Metadata: Partition information
 *
 * Read-only view over a single `rd_kafka_metadata_partition_t`. Only the
 * pointer is stored; this struct neither copies nor releases the data.
 */
struct PartitionMetadata
{
    private const(rd_kafka_metadata_partition_t)* partition_metadata_;

const @property nothrow @nogc:

    /** Partition id */
    int id()
    {
        return partition_metadata_.id;
    }

    /** Partition error reported by broker */
    ErrorCode err()
    {
        return cast(ErrorCode) partition_metadata_.err;
    }

    /** Leader broker (id) for partition */
    int leader()
    {
        return partition_metadata_.leader;
    }

    /**
     * Replica brokers
     *
     * Returns: a range with `length` that walks the `replicas` array of the
     * underlying struct, `replica_cnt` elements in total.
     */
    auto replicas()
    {
        static struct Replicas
        {
        nothrow @nogc:
            private const(rd_kafka_metadata_partition_t)* partition_metadata_;
            private size_t _i;

            /// True once every replica entry has been consumed.
            auto empty() @property
            {
                return _i >= partition_metadata_.replica_cnt;
            }

            /// Current replica entry. Must not be called on an empty range.
            auto front() @property
            {
                // The backing store is a raw C array: guard the index in
                // non-release builds instead of silently reading past the end.
                assert(!empty, "front called on an empty Replicas range");
                return partition_metadata_.replicas[_i];
            }

            /// Advances to the next entry. Must not be called on an empty range.
            void popFront()
            {
                // Keeps `_i <= replica_cnt`, so `length` cannot wrap around.
                assert(!empty, "popFront called on an empty Replicas range");
                _i++;
            }

            /// Number of entries not yet consumed.
            auto length() @property
            {
                return partition_metadata_.replica_cnt - _i;
            }
        }

        return Replicas(partition_metadata_);
    }

    /**
     * In-Sync-Replica brokers
     *
     * Warning: The broker may return a cached/outdated list of ISRs.
     *
     * Returns: a range with `length` that walks the `isrs` array of the
     * underlying struct, `isr_cnt` elements in total.
     */
    auto isrs()
    {
        static struct Isrs
        {
        nothrow @nogc:
            private const(rd_kafka_metadata_partition_t)* partition_metadata_;
            private size_t _i;

            /// True once every ISR entry has been consumed.
            auto empty() @property
            {
                return _i >= partition_metadata_.isr_cnt;
            }

            /// Current ISR entry. Must not be called on an empty range.
            auto front() @property
            {
                // Raw C array access: check the bound in non-release builds.
                assert(!empty, "front called on an empty Isrs range");
                return partition_metadata_.isrs[_i];
            }

            /// Advances to the next entry. Must not be called on an empty range.
            void popFront()
            {
                // Keeps `_i <= isr_cnt`, so `length` cannot wrap around.
                assert(!empty, "popFront called on an empty Isrs range");
                _i++;
            }

            /// Number of entries not yet consumed.
            auto length() @property
            {
                return partition_metadata_.isr_cnt - _i;
            }
        }

        return Isrs(partition_metadata_);
    }
}

/**
 * Metadata: Topic information
 *
 * Read-only view over a single `rd_kafka_metadata_topic_t`. Only the pointer
 * is stored; this struct neither copies nor releases the pointed-to data.
 */
struct TopicMetadata
{
    private const(rd_kafka_metadata_topic_t)* topic_metadata_;

const @property nothrow @nogc:

    /**
     * Topic name
     *
     * Returns: the `topic` C string of the underlying struct, converted with
     * `fromStringz`.
     */
    const(char)[] topic()
    {
        return topic_metadata_.topic.fromStringz;
    }

    /**
     * Partition list
     *
     * Returns: a range with `length` yielding one `PartitionMetadata` per
     * element of the `partitions` array, `partition_cnt` elements in total.
     */
    auto partitions()
    {
        static struct Partitions
        {
        nothrow @nogc:
            private const(rd_kafka_metadata_topic_t)* topic_metadata_;
            private size_t _i;

            /// True once every partition entry has been consumed.
            auto empty() @property
            {
                return _i >= topic_metadata_.partition_cnt;
            }

            /// View of the current partition. Must not be called on an empty range.
            auto front() @property
            {
                // Raw C array access: check the bound in non-release builds.
                assert(!empty, "front called on an empty Partitions range");
                return PartitionMetadata(&topic_metadata_.partitions[_i]);
            }

            /// Advances to the next entry. Must not be called on an empty range.
            void popFront()
            {
                // Keeps `_i <= partition_cnt`, so `length` cannot wrap around.
                assert(!empty, "popFront called on an empty Partitions range");
                _i++;
            }

            /// Number of entries not yet consumed.
            auto length() @property
            {
                return topic_metadata_.partition_cnt - _i;
            }
        }

        return Partitions(topic_metadata_);
    }

    /** Topic error reported by broker */
    ErrorCode err()
    {
        return cast(ErrorCode)(topic_metadata_.err);
    }
}

/**
 * Metadata container
 *
 * Wraps a `rd_kafka_metadata_t` pointer and passes it to
 * `rd_kafka_metadata_destroy` from the destructor. The broker/topic views
 * handed out by this class point into that same structure.
 * NOTE(review): those views presumably must not be used after this object has
 * been destroyed - confirm against the librdkafka ownership rules.
 */
final class Metadata
{
    private const(rd_kafka_metadata_t)* metadata_;

nothrow @nogc:

    /**
     * Wraps `metadata`; the destructor later hands the same pointer to
     * `rd_kafka_metadata_destroy`.
     */
    this(const(rd_kafka_metadata_t)* metadata)
    {
        metadata_ = metadata;
    }

    ~this()
    {
        rd_kafka_metadata_destroy(metadata_);
    }

const @property:

    /**
     * Broker list
     *
     * Returns: a range with `length` yielding one `BrokerMetadata` per
     * element of the `brokers` array, `broker_cnt` elements in total.
     */
    auto brokers()
    {
        static struct Brokers
        {
        nothrow @nogc:
            private const(rd_kafka_metadata_t)* metadata_;
            private size_t _i;

            /// True once every broker entry has been consumed.
            auto empty() @property
            {
                return _i >= metadata_.broker_cnt;
            }

            /// View of the current broker. Must not be called on an empty range.
            auto front() @property
            {
                // Raw C array access: check the bound in non-release builds.
                assert(!empty, "front called on an empty Brokers range");
                return BrokerMetadata(&metadata_.brokers[_i]);
            }

            /// Advances to the next entry. Must not be called on an empty range.
            void popFront()
            {
                // Keeps `_i <= broker_cnt`, so `length` cannot wrap around.
                assert(!empty, "popFront called on an empty Brokers range");
                _i++;
            }

            /// Number of entries not yet consumed.
            auto length() @property
            {
                return metadata_.broker_cnt - _i;
            }
        }

        assert(metadata_);
        return Brokers(metadata_);
    }

    /**
     * Topic list
     *
     * Returns: a range with `length` yielding one `TopicMetadata` per element
     * of the `topics` array, `topic_cnt` elements in total.
     */
    auto topics()
    {
        static struct Topics
        {
        nothrow @nogc:
            private const(rd_kafka_metadata_t)* metadata_;
            private size_t _i;

            /// True once every topic entry has been consumed.
            auto empty() @property
            {
                return _i >= metadata_.topic_cnt;
            }

            /// View of the current topic. Must not be called on an empty range.
            auto front() @property
            {
                // Raw C array access: check the bound in non-release builds.
                assert(!empty, "front called on an empty Topics range");
                return TopicMetadata(&metadata_.topics[_i]);
            }

            /// Advances to the next entry. Must not be called on an empty range.
            void popFront()
            {
                // Keeps `_i <= topic_cnt`, so `length` cannot wrap around.
                assert(!empty, "popFront called on an empty Topics range");
                _i++;
            }

            /// Number of entries not yet consumed.
            auto length() @property
            {
                return metadata_.topic_cnt - _i;
            }
        }

        assert(metadata_);
        return Topics(metadata_);
    }

    /** Broker (id) originating this metadata */
    int origBrokerId()
    {
        // Same null guard as brokers()/topics().
        assert(metadata_);
        return metadata_.orig_broker_id;
    }

    /**
     * Broker (name) originating this metadata
     *
     * Returns: the `orig_broker_name` C string of the underlying struct,
     * converted with `fromStringz`.
     */
    const(char)[] origBrokerName()
    {
        // Same null guard as brokers()/topics().
        assert(metadata_);
        return metadata_.orig_broker_name.fromStringz;
    }
}