Number of commits and offset in each partition of a Kafka topic
Regarding the offsets of a topic and its partitions, you can use kafka.tools.GetOffsetShell. For example, using this command (I have a topic named games):
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic games --time -1
I get games:0:47841, which means that for topic games and partition 0 the latest offset is 47841, the first offset that has not been used yet (one past the last available message).
You can use --time -2 to see the offset of the first available message.
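GetOffsetShell prints one line per partition in the form topic:partition:offset. As a small sketch, assuming the default --offsets 1 so that each line carries a single offset, such a line can be split with plain POSIX parameter expansion. parse_offset_line is a hypothetical helper, not part of Kafka.

```shell
#!/bin/sh
# parse_offset_line is a hypothetical helper, not a Kafka tool.
# It splits one GetOffsetShell output line of the form topic:partition:offset,
# assuming the default --offsets 1 (a single offset per line).
# Kafka topic names cannot contain ':', so splitting on ':' is safe.
parse_offset_line() {
  topic=${1%%:*}        # text before the first ':'
  rest=${1#*:}          # text after the first ':'
  partition=${rest%%:*} # text before the second ':'
  offset=${rest#*:}     # text after the second ':'
  printf 'topic=%s partition=%s offset=%s\n' "$topic" "$partition" "$offset"
}

# The output line from the example above.
parse_offset_line "games:0:47841"
```

For the example line this prints topic=games partition=0 offset=47841.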
It is not clear from your question what kind of offset you're interested in. There are actually three types of offsets:
- The offset of the first available message in a topic's partition. Use -2 (earliest) as the --time parameter for the GetOffsetShell tool.
- The latest offset of a topic's partition, one past the last available message. Use -1 (latest) as the --time parameter.
- The offset of the last message read/processed by a Kafka consumer. The high-level consumer stores this information for every consumer group in an internal Kafka topic (it used to be stored in ZooKeeper) and keeps it up to date when you call commit() or when auto-commit is enabled. With the simple consumer, your code has to manage offsets itself.
In addition to the command-line utility, the first two kinds of offsets are also available programmatically via SimpleConsumer.earliestOrLatestOffset().
If the number of messages is not too large, you can pass a large --offsets value to GetOffsetShell and then count the offsets returned by the tool. Otherwise, you can write a simple loop in Scala/Java that iterates over all available offsets, starting from the earliest.
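If the topic is not compacted and does not use transactions, every offset between the earliest and the latest one holds a message, so the per-partition count is simply latest minus earliest, with no need to list every offset. A minimal sketch under that assumption, combining the output of the two GetOffsetShell runs (--time -2 and --time -1); partition_counts is a hypothetical helper, and for compacted or transactional topics its result is only an upper bound.

```shell
#!/bin/sh
# partition_counts is a hypothetical helper, not a Kafka tool.
# Arguments: a file with the --time -2 (earliest) output and a file with
# the --time -1 (latest) output, one topic:partition:offset line each.
# Prints topic:partition:count, where count = latest - earliest.
# Assumes a non-compacted, non-transactional topic; otherwise it is an upper bound.
partition_counts() {
  awk -F: '
    { key = $1 ":" $2 }
    # First file: remember the earliest offset of every topic:partition.
    FILENAME == ARGV[1] { earliest[key] = $3; next }
    # Second file: subtract the earliest offset from the latest one.
    key in earliest { print key ":" ($3 - earliest[key]) }
  ' "$1" "$2"
}

# Usage against a running broker:
#   bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic games --time -2 > earliest.txt
#   bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic games --time -1 > latest.txt
#   partition_counts earliest.txt latest.txt
```

Subtracting the two offsets avoids transferring a long list of offsets, which is why it scales better than counting them.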
From the Kafka documentation:
Get Offset Shell
get offsets for a topic
bin/kafka-run-class.sh kafka.tools.GetOffsetShell
required argument [broker-list], [topic]
Option Description
------ -----------
--broker-list <hostname:port,...,hostname:port> REQUIRED: The list of hostname and port of the server to connect to.
--max-wait-ms <Integer: ms> The max amount of time each fetch request waits. (default: 1000)
--offsets <Integer: count> number of offsets returned (default: 1)
--partitions <partition ids> comma separated list of partition ids. If not specified, will find offsets for all partitions (default)
--time <Long: timestamp in milliseconds / -1(latest) / -2 (earliest)> timestamp; offsets will come before this timestamp, as in getOffsetsBefore
--topic <topic> REQUIRED: The topic to get offsets from.
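Besides -1 and -2, --time accepts an absolute timestamp in milliseconds. A small sketch, assuming a date command that supports +%s (GNU, BSD and BusyBox do, although it is not strictly POSIX), that turns "N seconds ago" into such a value; ms_ago is a hypothetical helper, and the commented invocation uses only the options listed above.

```shell
#!/bin/sh
# ms_ago is a hypothetical helper, not a Kafka tool.
# Prints the epoch time in milliseconds for "$1 seconds ago",
# suitable as a --time value for GetOffsetShell.
ms_ago() {
  # date +%s prints whole seconds since the epoch; shift, then scale to ms.
  echo $(( ($(date +%s) - $1) * 1000 ))
}

# Usage against a running broker: offsets before one hour ago, partitions 0 and 1 only.
#   bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 \
#     --topic games --partitions 0,1 --time "$(ms_ago 3600)"
```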