spring kafka No type information in headers and no default type provided
See the documentation.
Specifically...
JsonDeserializer.VALUE_DEFAULT_TYPE
: Fallback type for deserialization of values if no header information is present.
It's spring.json.value.default.type
You can also set spring.json.use.type.headers
(default true) to prevent even looking for headers.
The deserializer automatically trusts the package of the default type so it's not necessary to add it there.
EDIT
However, also see Spring Messaging Message Conversion.
Use a BytesDeserializer
and BytesJsonMessageConverter
and the framework will pass the method parameter type as the target for conversion.
"Answering" my own question largely to consolidate the info in the comments to and from @GaryRussell, but basically, he provided the best answer. In short I did the following:
- Set consumer deserializer to a StringDeserializer
- Add a messageConverter bean as a StringJsonMessageConverter
- In the KafkaListener annotated methods, Just use the expected type for the Payload
- If using a ConsumerRecord in the KafaListener annotated method, do NOT expect it to be of the Payload type. It will now be String (since the message converter, not the deserializer is doing this).
One other thing: By default, simply adding the messageConverter also adds it to the automatically configured kafkaTemplate when using the spring boot autoconfigure. This doesn't seem to be an issue when calling kafkaTemplate.send(K9000Consts.STREAM_TOPIC_IN, "1", greeting)
, though I think it may be if using send(Message).
Below is a working config, in that I get the messages as expected with minimal configuration
application.yml:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: foo
auto-offset-reset: latest
key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
properties:
spring.json.trusted.packages: com.teramedica.kafakaex001web.model
spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
streams:
application-id: kafka9000-v0.1
properties: # properties not explicitly handled by KafkaProperties.streams
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
spring.json.trusted.packages: com.teramedica.kafakaex001web.model
KafkaConfig:
@Bean RecordMessageConverter messageConverter() { return new StringJsonMessageConverter(); }
...
@Bean
public KStream<String, Greeting> kStream(StreamsBuilder kStreamBuilder) {
KStream<String, Greeting> stream = kStreamBuilder.stream(K9000Consts.STREAM_TOPIC_IN);
stream.peek((k, greeting) -> {
logger.info("-------------- STREAM_IN_TOPIC peek: Got a greeting in the stream: {}", greeting.getContent());
})
.map((k, v) -> new KeyValue<>(k, new GreetingResponse(v)))
.to(K9000Consts.STREAM_TOPIC_OUT, Produced.with(stringSerde, new JsonSerde<>(GreetingResponse.class)));
return stream;
}
@KafkaListener(topics = K9000Consts.STREAM_TOPIC_OUT, groupId="oofda", errorHandler = "myTopicErrorHandler")
public void listenForGreetingResponse(GreetingResponse gr) throws Exception {
// logger.info("STREAM_OUT_TOPIC Listener : {}" + cr.toString());
logger.info("STREAM_OUT_TOPIC Listener : GreetingResponse is {}" + gr);
}
@KafkaListener(topics = K9000Consts.STREAM_TOPIC_IN, groupId = "bar")
public void listenForGreetingResponses(@Payload Greeting gr,
ConsumerRecord<String, String> record, // <--- NOTE: String, NOT Greeting
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) throws Exception {
//logger.info("STREAM_IN_TOPIC Listener: ConsumerRecord: {}" + cr.toString());
logger.info("STREAM_IN_TOPIC Listener: Greeting: {}", gr.getContent());
logger.info("STREAM_IN_TOPIC Listener: From Headers: topic: {}, partition: {}, key: {}", topic, partition,
key);
logger.info("STREAM_IN_TOPIC Listener:: From Record: topic: {}, parition: {}, key: {}",
record.topic(), record.partition(), record.key());
logger.info("STREAM_IN_TOPIC Listener:: record value: {}, class: {}", record.value(), record.value().getClass() );
}
@Bean
public KafkaListenerErrorHandler myTopicErrorHandler() {
return (m, e) -> {
logger.error("Got an error {}", e.getMessage());
return "some info about the failure";
};
}
And output for a message is:
13 Mar 2019 09:56:57,884 DEBUG [KafkaMessageController [] http-nio-8080-exec-1] Sending a Kafka Message
13 Mar 2019 09:56:57,913 INFO [KafkaConfig [] kafka9000-v0.1-b0589cc5-2fab-4b72-81f7-b0d5488c7478-StreamThread-1] -------------- STREAM_IN_TOPIC peek: Got a greeting in the stream: Hello, World!
13 Mar 2019 09:56:57,919 INFO [Metadata [] kafka-producer-network-thread | kafka9000-v0.1-b0589cc5-2fab-4b72-81f7-b0d5488c7478-StreamThread-1-producer] Cluster ID: 8nREAmTCS0SZT-NzWsCacQ
13 Mar 2019 09:56:57,919 INFO [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] STREAM_IN_TOPIC Listener: Greeting: Hello, World!
13 Mar 2019 09:56:57,920 INFO [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] STREAM_IN_TOPIC Listener: Record: ConsumerRecord(topic = STREAM_TOPIC_IN_SSS, partition = 0, offset = 23, CreateTime = 1552489017878, serialized key size = 1, serialized value size = 34, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 116, 101, 114, 97, 109, 101, 100, 105, 99, 97, 46, 107, 97, 102, 97, 107, 97, 101, 120, 48, 48, 49, 119, 101, 98, 46, 109, 111, 100, 101, 108, 46, 71, 114, 101, 101, 116, 105, 110, 103])], isReadOnly = false), key = 1, value = {"id":1,"content":"Hello, World!"})
13 Mar 2019 09:56:57,920 INFO [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] STREAM_IN_TOPIC Listener: From Headers: topic: STREAM_TOPIC_IN_SSS, partition: 0, key: 1
13 Mar 2019 09:56:57,920 INFO [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] STREAM_IN_TOPIC Listener:: From Record: topic: STREAM_TOPIC_IN_SSS, parition: 0, key: 1
13 Mar 2019 09:56:57,921 INFO [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] STREAM_IN_TOPIC Listener:: record value: {"id":1,"content":"Hello, World!"}, class: class java.lang.String
13 Mar 2019 09:56:58,030 INFO [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] STREAM_OUT_TOPIC Listener : GreetingResponse id: 1000, response: Hello, World!, yourself
So I was also facing the same problem.
I fixed it this way
You have to set the following property to the class you are trying to deserialze to
spring.json.value.default.type=com.something.model.TransactionEventPayload
I set the properties for the KafkaListener as this:
@KafkaListener(topics = "topic", groupId = "myGroupId", properties = {"spring.json.value.default.type=com.something.model.TransactionEventPayload"})
public void consumeTransactionEvent(@Payload TransactionEventPayload payload,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp) {
This exception is thrown by org.springframework.kafka.support.serializer.JsonDeserializer
, which requires type information to be included in a special type header, or provided to @KafkaListener
via the spring.json.value.default.type configuration property
.
That is how I solved this issue in SpringBoot 2.5.3:
- Add the
ByteArrayJsonMessageConverter
to the Context:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.converter.ByteArrayJsonMessageConverter;
import org.springframework.kafka.support.converter.JsonMessageConverter;
@Configuration
public class JsonMessageConverterConfig {
@Bean
public JsonMessageConverter jsonMessageConverter() {
return new ByteArrayJsonMessageConverter();
}
}
- Setup the
app.kafka.producer.value-serializer
and theapp.kafka.consumer.value-deserializer
:
app.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
app.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
- Now you can disable serialization of the
TypeId header
:
spring.kafka.producer.properties.spring.json.add.type.headers=false