How do I delete/clean Kafka queued messages without deleting Topic
To delete all messages in a specific topic, you can run kafka-delete-records.sh
For example, I have a topic called test
, which has 4 partitions
.
Create a Json
file , for example j.json
:
{
"partitions": [
{
"topic": "test",
"partition": 0,
"offset": -1
}, {
"topic": "test",
"partition": 1,
"offset": -1
}, {
"topic": "test",
"partition": 2,
"offset": -1
}, {
"topic": "test",
"partition": 3,
"offset": -1
}
],
"version": 1
}
now delete all messages by this command :
/opt/kafka/confluent-4.1.1/bin/kafdelete-records --bootstrap-server 192.168.XX.XX:9092 --offset-json-file j.json
After executing the command, this message will be displayed
Records delete operation completed:
partition: test-0 low_watermark: 7
partition: test-1 low_watermark: 7
partition: test-2 low_watermark: 7
partition: test-3 low_watermark: 7
In 0.11 or higher you can run the bin/kafka-delete-records.sh
command to mark messages for deletion.
https://github.com/apache/kafka/blob/trunk/bin/kafka-delete-records.sh
For example, publish 100 messages
seq 100 | ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytest
then delete 90 of those 100 messages with the new kafka-delete-records.sh
command line tool
./bin/kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file ./offsetfile.json
where offsetfile.json
contains
{"partitions": [{"topic": "mytest", "partition": 0, "offset": 90}], "version":1 }
and then consume the messages from the beginning to verify that 90 of the 100 messages are indeed marked as deleted.
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytest --from-beginning
91
92
93
94
95
96
97
98
99
100