KafkaConsumer

High-level KafkaConsumer (for brokers 0.9 and later)

Note: Requires Apache Kafka >= 0.9.0 brokers

Currently supports the range and roundrobin partition assignment strategies (see the partition.assignment.strategy configuration property).
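As a rough illustration of the property mentioned above, a consumer configuration that selects the roundrobin strategy might contain the entries below. This is only a sketch: the group name is a made-up placeholder, and how the properties are passed to GlobalConf depends on the binding's configuration API, which is not shown here.

```
# Hypothetical consumer settings (librdkafka property names)
# "example-group" is a placeholder, not a value from this documentation
group.id=example-group
partition.assignment.strategy=roundrobin
```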

Constructors

this
this(GlobalConf conf)

Creates a KafkaConsumer.

Members

Functions

assign
ErrorCode assign(TopicPartition[] partitions)

Update the assignment set to partitions.

assignment
ErrorCode assignment(TopicPartition[] partitions)

Returns the current partition assignment as set by assign().

close
ErrorCode close()

Close and shut down the consumer.

commitAsync
ErrorCode commitAsync()

Asynchronous version of commitSync().

commitAsync
ErrorCode commitAsync(Message message)

Commit the offset for a single topic+partition based on message.

commitAsync
ErrorCode commitAsync(TopicPartition[] offsets)

Commit offsets for the provided list of partitions.

commitSync
ErrorCode commitSync()

Commit offsets for the current assignment.

commitSync
ErrorCode commitSync(Message message)

Commit the offset for a single topic+partition based on message.

commitSync
ErrorCode commitSync(TopicPartition[] offsets)

Commit offsets for the provided list of partitions.

committed
ErrorCode committed(TopicPartition[] partitions, int timeout_ms)

Retrieve committed offsets for topics+partitions.

consume
auto consume(Message msg, int timeout_ms)

position
ErrorCode position(TopicPartition[] partitions)

Retrieve current positions (offsets) for topics+partitions.

subscribe
ErrorCode subscribe(const(char)*[] topics)

Update the subscription set to topics.

unassign
ErrorCode unassign()

Stop consumption and remove the current assignment.

unsubscribe
auto unsubscribe()

Unsubscribe from the current subscription set.

Inherited Members

From Handle

memberid
auto memberid()

Returns the client's broker-assigned group member ID.

metadata
Metadata metadata(bool all_topics, Topic only_rkt, int timeout_ms)

Request metadata from the broker.

setCommonConfig
void setCommonConfig(GlobalConf conf)

Undocumented in source. Be warned that the author may not have intended to support it.

queryWatermarkOffsets
auto queryWatermarkOffsets(const(char)* topic, int partition, long low, long high, int timeout_ms)

Query the broker for the low (oldest/beginning) and high (newest/end) offsets of a partition.

rk_
rd_kafka_t* rk_;

Undocumented in source.

event_cb_
EventCb event_cb_;

Undocumented in source.

socket_cb_
SocketCb socket_cb_;

Undocumented in source.

open_cb_
OpenCb open_cb_;

Undocumented in source.

dr_cb_
DeliveryReportCb dr_cb_;

Undocumented in source.

partitioner_cb_
PartitionerCb partitioner_cb_;

Undocumented in source.

partitioner_kp_cb_
PartitionerKeyPointerCb partitioner_kp_cb_;

Undocumented in source.

rebalance_cb_
RebalanceCb rebalance_cb_;

Undocumented in source.

offset_commit_cb_
OffsetCommitCb offset_commit_cb_;

Undocumented in source.

name
const(char)[] name()

Returns the name of the handle.

poll
int poll(int timeout_ms)

Polls the provided Kafka handle for events.

outqLen
int outqLen()

Returns the current out queue length.

free_partition_vector
void free_partition_vector(TopicPartition[] v)

Undocumented in source. Be warned that the author may not have intended to support it.

pause
ErrorCode pause(TopicPartition[] partitions)

Pause producing or consuming for the provided list of partitions.

resume
ErrorCode resume(TopicPartition[] partitions)

Resume producing or consuming for the provided list of partitions.

getWatermarkOffsets
ErrorCode getWatermarkOffsets(const(char)* topic, int partition, long low, long high)

Get the last known low (oldest/beginning) and high (newest/end) offsets for a partition.

Meta