Is there a code sample for multiple producers in spring kafka?
You can create several Producer
instances (KafkaTemplate
) via the same ProducerFactory
.
If you need different Kafka configurations, you’ll need different ProducerFactory
instances.
If you still want to keep your configuration in application.yaml
as usual, and keep the Java configuration to a minimum, you can extend KafkaProperties.Producer and bind each subclass to its own property prefix.
/**
 * Producer settings bound from the {@code spring.kafka.producer-1} prefix, together with the
 * {@link ProducerFactory} and {@link KafkaTemplate} beans built from them.
 *
 * <p>Both beans are published under explicit, unique bean names. Without an explicit name
 * Spring derives the bean name from the method name, so every configuration class that
 * follows this pattern would register "producerFactory" and "kafkaTemplate" and the
 * definitions would clash. {@code @Qualifier} only attaches qualifier metadata; it does not
 * rename the bean. Inject either bean with {@code @Qualifier("producer-1")}.
 */
@Configuration
@ConfigurationProperties(prefix = "spring.kafka.producer-1")
@RequiredArgsConstructor
class FirstProducer extends KafkaProperties.Producer {

    /** Shared Kafka properties; provide the baseline that this producer's settings override. */
    private final KafkaProperties common;

    /**
     * Builds the producer factory for this producer.
     *
     * @return a factory configured with the common producer properties, overridden by the
     *     properties bound to this class
     */
    @Qualifier("producer-1")
    @Bean(name = "producer-1-factory")
    public ProducerFactory<?, ?> producerFactory() {
        final var conf = new HashMap<>(this.common.buildProducerProperties());
        // Applied second so that producer-specific values win over the shared ones.
        conf.putAll(this.buildProperties());
        return new DefaultKafkaProducerFactory<>(conf);
    }

    /**
     * Builds the template that sends through {@link #producerFactory()}.
     *
     * @return a template backed by this producer's factory
     */
    @Qualifier("producer-1")
    @Bean(name = "producer-1-template")
    public KafkaTemplate<?, ?> kafkaTemplate() {
        return new KafkaTemplate<>(this.producerFactory());
    }
}
/**
 * Producer settings bound from the {@code spring.kafka.producer-2} prefix, together with the
 * {@link ProducerFactory} and {@link KafkaTemplate} beans built from them.
 *
 * <p>Both beans are published under explicit, unique bean names. Without an explicit name
 * Spring derives the bean name from the method name, so every configuration class that
 * follows this pattern would register "producerFactory" and "kafkaTemplate" and the
 * definitions would clash. {@code @Qualifier} only attaches qualifier metadata; it does not
 * rename the bean. Inject either bean with {@code @Qualifier("producer-2")}.
 */
@Configuration
@ConfigurationProperties(prefix = "spring.kafka.producer-2")
@RequiredArgsConstructor
class SecondProducer extends KafkaProperties.Producer {

    /** Shared Kafka properties; provide the baseline that this producer's settings override. */
    private final KafkaProperties common;

    /**
     * Builds the producer factory for this producer.
     *
     * @return a factory configured with the common producer properties, overridden by the
     *     properties bound to this class
     */
    @Qualifier("producer-2")
    @Bean(name = "producer-2-factory")
    public ProducerFactory<?, ?> producerFactory() {
        final var conf = new HashMap<>(this.common.buildProducerProperties());
        // Applied second so that producer-specific values win over the shared ones.
        conf.putAll(this.buildProperties());
        return new DefaultKafkaProducerFactory<>(conf);
    }

    /**
     * Builds the template that sends through {@link #producerFactory()}.
     *
     * @return a template backed by this producer's factory
     */
    @Qualifier("producer-2")
    @Bean(name = "producer-2-template")
    public KafkaTemplate<?, ?> kafkaTemplate() {
        return new KafkaTemplate<>(this.producerFactory());
    }
}
You will have to create two different ProducerFactory beans.
Below is an example:
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
/**
 * Declares two independent String/String Kafka producers, each with its own
 * {@link ProducerFactory} and {@link KafkaTemplate}, pointing at different bootstrap servers.
 * The templates are registered under the bean names "confluent" and "cloudera".
 */
@Configuration
public class KafkaProducerConfig {

    /**
     * Assembles the producer properties shared by both factories: the given bootstrap
     * servers plus {@link StringSerializer} for keys and values.
     *
     * @param bootstrapServers value for {@link ProducerConfig#BOOTSTRAP_SERVERS_CONFIG}
     * @return a new mutable property map
     */
    private static HashMap<String, Object> stringProducerProps(String bootstrapServers) {
        final HashMap<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    /**
     * Producer factory for the first cluster.
     *
     * @return a factory targeting {@code localhost:9092}
     */
    @Bean
    public ProducerFactory<String, String> confluentProducerFactory() {
        return new DefaultKafkaProducerFactory<>(stringProducerProps("localhost:9092"));
    }

    /**
     * Producer factory for the second cluster.
     *
     * @return a factory targeting {@code localhost:9094}
     */
    @Bean
    public ProducerFactory<String, String> cloudraProducerFactory() {
        return new DefaultKafkaProducerFactory<>(stringProducerProps("localhost:9094"));
    }

    /**
     * Template bound to {@link #confluentProducerFactory()}.
     *
     * @return the template registered as bean "confluent"
     */
    @Bean(name = "confluent")
    public KafkaTemplate<String, String> confluentKafkaTemplate() {
        final ProducerFactory<String, String> factory = confluentProducerFactory();
        return new KafkaTemplate<>(factory);
    }

    /**
     * Template bound to {@link #cloudraProducerFactory()}.
     *
     * @return the template registered as bean "cloudera"
     */
    @Bean(name = "cloudera")
    public KafkaTemplate<String, String> clouderaKafkaTemplate() {
        final ProducerFactory<String, String> factory = cloudraProducerFactory();
        return new KafkaTemplate<>(factory);
    }
}
/**
 * Sends one sample message through each of the two qualified Kafka templates.
 *
 * <p>NOTE(review): the fields are populated by Spring field injection, so this class must
 * itself be a Spring-managed bean; its registration is not shown here.
 */
public class ProducerExample {

    /** Template registered under the "cloudera" bean name. */
    @Autowired
    @Qualifier("cloudera")
    private KafkaTemplate<String, String> clouderaKafkaTemplate;

    /** Template registered under the "confluent" bean name. */
    @Autowired
    @Qualifier("confluent")
    private KafkaTemplate<String, String> confluentKafkaTemplate;

    /**
     * Publishes a fixed greeting to the "TestConfluent" topic via the confluent template and
     * to the "TestCloudEra" topic via the cloudera template. The results returned by
     * {@code send} are not inspected.
     */
    public void send() {
        confluentKafkaTemplate.send("TestConfluent", "hey there..confluent");
        clouderaKafkaTemplate.send("TestCloudEra","hey there.. cloudera");
    }
}