how to view kafka headers
You can use the excellent kafkacat tool.
Sample command:
kafkacat -b kafka-broker:9092 -t my_topic_name -C \
-f '\nKey (%K bytes): %k
Value (%S bytes): %s
Timestamp: %T
Partition: %p
Offset: %o
Headers: %h\n'
Sample output:
Key (-1 bytes):
Value (13 bytes): {foo:"bar 5"}
Timestamp: 1548350164096
Partition: 0
Offset: 34
Headers: __connect.errors.topic=test_topic_json,__connect.errors.partition=0,__connect.errors.offset=94,__connect.errors.connector.name=file_sink_03,__connect.errors.task.id=0,__connect.errors.stage=VALU
E_CONVERTER,__connect.errors.class.name=org.apache.kafka.connect.json.JsonConverter,__connect.errors.exception.class.name=org.apache.kafka.connect.errors.DataException,__connect.errors.exception.message=Co
nverting byte[] to Kafka Connect data failed due to serialization error: ,__connect.errors.exception.stacktrace=org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed
due to serialization error:
The kafkacat header option is only available in recent builds of kafkacat
; you may want to build from master branch yourself if your current version doesn't include it.
You can also run kafkacat from Docker:
docker run --rm edenhill/kafkacat:1.5.0 \
-b kafka-broker:9092 \
-t my_topic_name -C \
-f '\nKey (%K bytes): %k
Value (%S bytes): %s
Timestamp: %T
Partition: %p
Offset: %o
Headers: %h\n'
If you use Docker bear in mind the network implications of how to reach the Kafka broker.
From kafka-console-consumer.sh
script:
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
src: https://github.com/apache/kafka/blob/2.1.1/bin/kafka-console-consumer.sh
In kafka.tools.ConsoleConsumer
the header is provided to the Formatter, but none of the existing Formatters makes use of it:
formatter.writeTo(new ConsumerRecord(msg.topic, msg.partition, msg.offset, msg.timestamp,
msg.timestampType, 0, 0, 0, msg.key, msg.value, msg.headers),
output)
src: https://github.com/apache/kafka/blob/2.1.1/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
At the bottom of the above link you can see existing Formatters.
If you want to print headers you need to implement your own kafka.common.MessageFormatter
and in particular its write method:
def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit
and then run your console consumer with --formatter providing your own formatter (it should also be present on the classpath).
Another, simpler and faster way, would be to implement your own mini-program using KafkaConsumer and check headers in debug.
Starting with kafka-2.7.0 you can enable printing headers in console-consumer by providing property print.headers=true
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic quickstart-events --property print.key=true --property print.headers=true --property print.timestamp=true
You can also use kafkactl for this. E.g. with output as yaml:
kafkactl consume my-topic --print-headers -o yaml
Sample output:
partition: 1
offset: 22
headers:
key1: value1
key2: value2
value: my-value
Disclaimer: I am contributor to this project