Kafka how to read from __consumer_offsets topic
I came across this question when trying to also consume from the __consumer_offsets topic. I managed to figure it out for different Kafka versions and thought I'd share what I'd found.
For Kafka 0.8.2.x
Note: This uses a ZooKeeper connection.
#Create consumer config
echo "exclude.internal.topics=false" > /tmp/consumer.config
#Consume all offsets
./kafka-console-consumer.sh --consumer.config /tmp/consumer.config \
--formatter "kafka.server.OffsetManager\$OffsetsMessageFormatter" \
--zookeeper localhost:2181 --topic __consumer_offsets --from-beginning
For Kafka 0.9.x.x and 0.10.x.x
#Create consumer config
echo "exclude.internal.topics=false" > /tmp/consumer.config
#Consume all offsets
./kafka-console-consumer.sh --new-consumer --consumer.config /tmp/consumer.config \
--formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" \
--bootstrap-server localhost:9092 --topic __consumer_offsets --from-beginning
For Kafka 0.11.x.x - 2.x
#Create consumer config
echo "exclude.internal.topics=false" > /tmp/consumer.config
#Consume all offsets
./kafka-console-consumer.sh --consumer.config /tmp/consumer.config \
--formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
--bootstrap-server localhost:9092 --topic __consumer_offsets --from-beginning
As of Kafka 0.11, the (Scala) source code can be found here
For those who need a Java translation: from any Consumer process, let's say you get a ConsumerRecord<byte[], byte[]> consumerRecord.
Get the key (check that it is not null first) and pass it to GroupMetadataManager.readMessageKey(ByteBuffer.wrap(consumerRecord.key())). That can return different types, so check if ( ... instanceof OffsetKey), then cast it, and you can get various values from that.
To get the Kafka record value of the offsets, you can use String.valueOf(GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(consumerRecord.value()))).
A minimal Java example translated from the Scala code...
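// Needs java.nio.ByteBuffer, plus GroupMetadataManager and OffsetKey from kafka.coordinator.group (Kafka 0.11+)
byte[] key = consumerRecord.key();
if (key != null) {
    // readMessageKey takes a ByteBuffer, as in the Scala source
    Object o = GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key));
    // Only offset commit records have an OffsetKey; group metadata records use a different key type
    if (o instanceof OffsetKey) {
        OffsetKey offsetKey = (OffsetKey) o;
        Object groupTopicPartition = offsetKey.key();
        byte[] value = consumerRecord.value();
        // A null value marks a removed offset; the Scala formatter prints it as "NULL"
        String formattedValue = value == null
            ? "NULL"
            : String.valueOf(GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value)));
        // TODO: Print, store, or compute results with the new key and value
    }
}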
Note: it's also possible to use the AdminClient APIs to describe groups rather than consume these raw messages:
- listConsumerGroupOffsets(): to find all offsets for a specific group
- describeConsumerGroups(): to find details about members of a group
Scala source code extract
def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
  Option(consumerRecord.key).map(key => GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))).foreach {
    // Only print if the message is an offset record.
    // We ignore the timestamp of the message because GroupMetadataMessage has its own timestamp.
    case offsetKey: OffsetKey =>
      val groupTopicPartition = offsetKey.key
      val value = consumerRecord.value
      val formattedValue =
        if (value == null) "NULL"
        else GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value)).toString
      output.write(groupTopicPartition.toString.getBytes(StandardCharsets.UTF_8))
      output.write("::".getBytes(StandardCharsets.UTF_8))
      output.write(formattedValue.getBytes(StandardCharsets.UTF_8))
      output.write("\n".getBytes(StandardCharsets.UTF_8))
    case _ => // no-op
  }
}
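The formatter above writes each offset record as one line of the form key::value. If you pipe the console consumer output into your own tooling instead of calling GroupMetadataManager directly, a rough Java sketch for splitting those lines might look like the one below. Note the assumptions: OffsetLineParser and OffsetLine are hypothetical names made up for this example, and the key is assumed to render as [group,topic,partition]. Check that against the output of your Kafka version. The value part is kept as raw text because its layout differs between versions.

```java
import java.util.Optional;

/** Hypothetical helper: splits one line of OffsetsMessageFormatter output into its parts. */
final class OffsetLineParser {

    /** Parsed pieces of a "key::value" line; the value is left as unparsed text. */
    static final class OffsetLine {
        final String group;
        final String topic;
        final int partition;
        final String rawValue;

        OffsetLine(String group, String topic, int partition, String rawValue) {
            this.group = group;
            this.topic = topic;
            this.partition = partition;
            this.rawValue = rawValue;
        }
    }

    /** Returns the parsed line, or empty if the line does not match the assumed layout. */
    static Optional<OffsetLine> parse(String line) {
        if (line == null || !line.startsWith("[")) {
            return Optional.empty();
        }
        // The formatter writes "::" between key and value; the assumed key ends with "]".
        int sep = line.indexOf("]::");
        if (sep < 0) {
            return Optional.empty();
        }
        String inner = line.substring(1, sep);   // assumed: group,topic,partition
        String value = line.substring(sep + 3);  // "NULL" for a null record value

        // Split from the right: topic names cannot contain commas, group ids might.
        int lastComma = inner.lastIndexOf(',');
        int topicComma = lastComma < 0 ? -1 : inner.lastIndexOf(',', lastComma - 1);
        if (topicComma < 0) {
            return Optional.empty();
        }
        try {
            int partition = Integer.parseInt(inner.substring(lastComma + 1).trim());
            String group = inner.substring(0, topicComma);
            String topic = inner.substring(topicComma + 1, lastComma);
            return Optional.of(new OffsetLine(group, topic, partition, value));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }
}
```

Calling OffsetLineParser.parse(line) on each line read from the console consumer's stdout gives you the group, topic and partition as separate fields. A group id that itself contains "]::" would confuse this sketch, so treat it as a starting point rather than a robust parser.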