Error serializing message when sending to Kafka topic
I'd say the error is obvious:
Can't convert value of class org.springframework.messaging.support.GenericMessage to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
Your value is a GenericMessage, but StringSerializer can work only with strings.
What you need is something like a JavaSerializer. Kafka does not ship one, but it is not difficult to write:
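import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;

public class JavaSerializer implements Serializer<Object> {

    @Override
    public byte[] serialize(String topic, Object data) {
        ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
        // try-with-resources flushes and closes the object stream before the bytes are read
        try (ObjectOutputStream objectStream = new ObjectOutputStream(byteStream)) {
            objectStream.writeObject(data);
        }
        catch (IOException e) {
            // Also covers NotSerializableException when data is not java.io.Serializable
            throw new IllegalStateException("Can't serialize object: " + data, e);
        }
        return byteStream.toByteArray();
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure
    }

    @Override
    public void close() {
        // no resources to release
    }

}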
Then configure it as the value.serializer property:
private void configureProducer() {
    Properties props = new Properties();
    props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
    // Point value.serializer at the custom class instead of StringSerializer
    props.put("value.serializer", JavaSerializer.class.getName());
    // The producer field must be declared as KafkaProducer<String, Object>
    producer = new KafkaProducer<String, Object>(props);
}
This will do the job.
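Whatever reads the topic has to reverse the same Java serialization. Here is a minimal sketch of that round trip without any Kafka dependency. The class JavaSerializationRoundTrip and its methods toBytes and fromBytes are hypothetical names used only for illustration. toBytes mirrors JavaSerializer.serialize(), and fromBytes is roughly what a matching Deserializer would have to do:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Kafka or Spring.
class JavaSerializationRoundTrip {

    // Same logic as JavaSerializer.serialize(), minus the Kafka interface.
    static byte[] toBytes(Object data) {
        ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
        try (ObjectOutputStream objectStream = new ObjectOutputStream(byteStream)) {
            objectStream.writeObject(data);
        } catch (IOException e) {
            throw new IllegalStateException("Can't serialize object: " + data, e);
        }
        return byteStream.toByteArray();
    }

    // The inverse step a consumer-side deserializer would need.
    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream objectStream =
                     new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return objectStream.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Can't deserialize object", e);
        }
    }

    public static void main(String[] args) {
        // Any java.io.Serializable value works; a list stands in for the message here.
        List<String> payload = new ArrayList<>(Arrays.asList("a", "b"));
        Object copy = fromBytes(toBytes(payload));
        System.out.println(payload.equals(copy)); // prints true
    }
}
```

This only works for values that implement java.io.Serializable, and the consumer needs the same classes on its classpath. Java deserialization should also be limited to topics whose producers you trust.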