/// Read-only views over the `rd_kafka_metadata_*` structures: brokers, topics and partitions.
module rdkafkad.metadata;
import rdkafkad;
import rdkafkad.iodriver;

/**
 * Metadata: Broker information
 *
 * Wraps a pointer to a single `rd_kafka_metadata_broker_t` and exposes its fields.
 */
struct BrokerMetadata
{
    private const(rd_kafka_metadata_broker_t)* broker_metadata_;

const @property nothrow @nogc:

    /** Broker id */
    int id() const
    {
        return broker_metadata_.id;
    }

    /** Broker hostname, as a slice over the wrapped structure's C string */
    const(char)[] host() const
    {
        return fromStringz(broker_metadata_.host);
    }

    /** Broker listening port */
    int port() const
    {
        return broker_metadata_.port;
    }
}

/**
 * Metadata: Partition information
 *
 * Wraps a pointer to a single `rd_kafka_metadata_partition_t`.
 */
struct PartitionMetadata
{
    private const(rd_kafka_metadata_partition_t)* partition_metadata_;

const @property nothrow @nogc:

    /** Partition id */
    int id()
    {
        return partition_metadata_.id;
    }

    /** Partition error reported by broker */
    ErrorCode err()
    {
        return cast(ErrorCode) partition_metadata_.err;
    }

    /** Leader broker (id) for partition */
    int leader()
    {
        return partition_metadata_.leader;
    }

    /**
     * Replica brokers
     *
     * Returns: a range (with `length`) over the entries of `replicas`.
     */
    auto replicas()
    {
        static struct Replicas
        {
        nothrow @nogc:
            private const(rd_kafka_metadata_partition_t)* part_;
            private size_t cursor_;

            // True once the cursor has reached `replica_cnt`.
            auto empty() @property
            {
                return cursor_ >= part_.replica_cnt;
            }

            // Entry of `replicas` at the current cursor position.
            auto front() @property
            {
                return part_.replicas[cursor_];
            }

            // Advances the cursor by one entry.
            auto popFront()
            {
                ++cursor_;
            }

            // Number of entries from the cursor up to `replica_cnt`.
            auto length() @property
            {
                return part_.replica_cnt - cursor_;
            }
        }

        return Replicas(partition_metadata_);
    }

    /**
     * In-Sync-Replica brokers
     *
     * Warning: The broker may return a cached/outdated list of ISRs.
     *
     * Returns: a range (with `length`) over the entries of `isrs`.
     */
    auto isrs()
    {
        static struct Isrs
        {
        nothrow @nogc:
            private const(rd_kafka_metadata_partition_t)* part_;
            private size_t cursor_;

            // True once the cursor has reached `isr_cnt`.
            auto empty() @property
            {
                return cursor_ >= part_.isr_cnt;
            }

            // Entry of `isrs` at the current cursor position.
            auto front() @property
            {
                return part_.isrs[cursor_];
            }

            // Advances the cursor by one entry.
            auto popFront()
            {
                ++cursor_;
            }

            // Number of entries from the cursor up to `isr_cnt`.
            auto length() @property
            {
                return part_.isr_cnt - cursor_;
            }
        }

        return Isrs(partition_metadata_);
    }
}

/**
 * Metadata: Topic information
 *
 * Wraps a pointer to a single `rd_kafka_metadata_topic_t`.
 */
struct TopicMetadata
{
    private const(rd_kafka_metadata_topic_t)* topic_metadata_;

const @property nothrow @nogc:

    /** Topic name, as a slice over the wrapped structure's C string */
    const(char)[] topic()
    {
        return fromStringz(topic_metadata_.topic);
    }

    /**
     * Partition list
     *
     * Returns: a range (with `length`) yielding one `PartitionMetadata` per
     * entry of `partitions`.
     */
    auto partitions()
    {
        static struct Partitions
        {
        nothrow @nogc:
            private const(rd_kafka_metadata_topic_t)* topic_;
            private size_t cursor_;

            // True once the cursor has reached `partition_cnt`.
            auto empty() @property
            {
                return cursor_ >= topic_.partition_cnt;
            }

            // Wraps the address of the current `partitions` entry.
            auto front() @property
            {
                auto entry = &topic_.partitions[cursor_];
                return PartitionMetadata(entry);
            }

            // Advances the cursor by one entry.
            auto popFront()
            {
                ++cursor_;
            }

            // Number of entries from the cursor up to `partition_cnt`.
            auto length() @property
            {
                return topic_.partition_cnt - cursor_;
            }
        }

        return Partitions(topic_metadata_);
    }

    /** Topic error reported by broker */
    ErrorCode err()
    {
        return cast(ErrorCode)(topic_metadata_.err);
    }
}

/**
 * Metadata container
 *
 * Holds a `rd_kafka_metadata_t` pointer and passes it to
 * `rd_kafka_metadata_destroy` in the destructor.
 * NOTE(review): the ranges and wrappers handed out by this class keep raw
 * pointers into that structure; presumably they must not be used after this
 * object has been destroyed - confirm against callers.
 */
final class Metadata
{
    private const(rd_kafka_metadata_t)* metadata_;

nothrow @nogc:

    /** Stores the given metadata pointer. */
    this(const(rd_kafka_metadata_t)* metadata)
    {
        this.metadata_ = metadata;
    }

    /** Hands the stored pointer to `rd_kafka_metadata_destroy`. */
    ~this()
    {
        rd_kafka_metadata_destroy(metadata_);
    }

const @property:

    /**
     * Broker list
     *
     * Returns: a range (with `length`) yielding one `BrokerMetadata` per
     * entry of `brokers`.
     */
    auto brokers()
    {
        assert(metadata_);

        static struct Brokers
        {
        nothrow @nogc:
            private const(rd_kafka_metadata_t)* meta_;
            private size_t cursor_;

            // True once the cursor has reached `broker_cnt`.
            auto empty() @property
            {
                return cursor_ >= meta_.broker_cnt;
            }

            // Wraps the address of the current `brokers` entry.
            auto front() @property
            {
                auto entry = &meta_.brokers[cursor_];
                return BrokerMetadata(entry);
            }

            // Advances the cursor by one entry.
            auto popFront()
            {
                ++cursor_;
            }

            // Number of entries from the cursor up to `broker_cnt`.
            auto length() @property
            {
                return meta_.broker_cnt - cursor_;
            }
        }

        return Brokers(metadata_);
    }

    /**
     * Topic list
     *
     * Returns: a range (with `length`) yielding one `TopicMetadata` per
     * entry of `topics`.
     */
    auto topics()
    {
        assert(metadata_);

        static struct Topics
        {
        nothrow @nogc:
            private const(rd_kafka_metadata_t)* meta_;
            private size_t cursor_;

            // True once the cursor has reached `topic_cnt`.
            auto empty() @property
            {
                return cursor_ >= meta_.topic_cnt;
            }

            // Wraps the address of the current `topics` entry.
            auto front() @property
            {
                auto entry = &meta_.topics[cursor_];
                return TopicMetadata(entry);
            }

            // Advances the cursor by one entry.
            auto popFront()
            {
                ++cursor_;
            }

            // Number of entries from the cursor up to `topic_cnt`.
            auto length() @property
            {
                return meta_.topic_cnt - cursor_;
            }
        }

        return Topics(metadata_);
    }

    /** Broker (id) originating this metadata */
    int origBrokerId()
    {
        return metadata_.orig_broker_id;
    }

    /** Broker (name) originating this metadata */
    const(char)[] origBrokerName()
    {
        return fromStringz(metadata_.orig_broker_name);
    }
}