Kafka Connect Offsets. Get/Set?
As of 0.10.0.0, Connect doesn't provide an API for managing offsets. It's something we want to improve in the future, but not there yet. The ConsumerGroupCommand
would be the right tool to manage offsets for Sink connectors. Note that source connector offsets are stored in a special offsets topic for Connect (they aren't like normal Kafka offsets since they are defined by the source system, see offset.storage.topic
in the worker configuration docs) and since sink Connectors uses the new consumer, they won't store their offsets in Zookeeper -- all modern clients use native Kafka-based offset storage. The ConsumerGroupCommand
can work with these offsets, you just need to pass the --new-consumer
option).
You can't set offsets, but you can use kafka-consumer-groups.sh
tool to "scroll" the feed forward.
The consumer group of your connector has a name of connect-*CONNECTOR NAME*
, but you can double check:
unset JMX_PORT; ./bin/kafka-consumer-groups.sh --bootstrap-server *KAFKA HOSTS* --list
To view current offset:
unset JMX_PORT; ./bin/kafka-consumer-groups.sh --bootstrap-server *KAFKA HOSTS* --group connect-*CONNECTOR NAME* --describe
To move the offset forward:
unset JMX_PORT; ./bin/kafka-console-consumer.sh --bootstrap-server *KAFKA HOSTS* --topic *TOPIC* --max-messages 10000 --consumer-property group.id=connect-*CONNECTOR NAME* > /dev/null
I suppose you can move the offset backward as well by deleting the consumer group first, using --delete
flag.
Don't forget to pause and resume your connector via Kafka Connect REST API.
In my case(testing reading files into producer and consume in console, all in local only), I just saw this in producer output:
offset.storage.file.filename=/tmp/connect.offsets
So I wanted to open it but it is binary, with some hardly recognizable characters.
I deleted it(rename it also works), and then I can write into the same file and get the file content from consumer again. You have to restart the console producer to take effect because it attempts to read the offset file, if not there, create a new one, so that the offset is reset.
If you want to reset it without deletion, you can use:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group <group-name> --reset-offsets --to-earliest --topic <topic_name>
You can check all group names by:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
and check details of each group:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group <group_name> --describe
In production environment, this offset is managed by zookeeper, so more steps (and caution) is needed. You can refer to this page:
https://metabroadcast.com/blog/resetting-kafka-offsets https://community.hortonworks.com/articles/81357/manually-resetting-offset-for-a-kafka-topic.html
Steps:
kafka-topics --list --zookeeper localhost:2181
kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic vital_signs --time -1 // -1 for largest, -2 for smallest
set /consumers/{yourConsumerGroup}/offsets/{yourFancyTopic}/{partitionId} {newOffset}