Creating Kafka topic in sarama
EDIT : Below was an old answer which still works, but at that point the sarama admin apis were under development. Since then ClusterAdmin apis have come a long way and today should be treated as a preferred way to solve this problem. Refer to the other 2 answers below if you are looking to solve this in 2020+.
It is possible to use sarama for managing Topics in Kafka. I am writing a terraform provider for managing Kafka topics and use sarama to do heavy lifting in the backend.
You need to use the sarama.Broker apis to do this. For example
// Set broker configuration
broker := sarama.NewBroker("localhost:9092")
// Additional configurations. Check sarama doc for more info
config := sarama.NewConfig()
config.Version = sarama.V1_0_0_0
// Open broker connection with configs defined above
broker.Open(config)
// check if the connection was OK
connected, err := broker.Connected()
if err != nil {
log.Print(err.Error())
}
log.Print(connected)
// Setup the Topic details in CreateTopicRequest struct
topic := "blah25s"
topicDetail := &sarama.TopicDetail{}
topicDetail.NumPartitions = int32(1)
topicDetail.ReplicationFactor = int16(1)
topicDetail.ConfigEntries = make(map[string]*string)
topicDetails := make(map[string]*sarama.TopicDetail)
topicDetails[topic] = topicDetail
request := sarama.CreateTopicsRequest{
Timeout: time.Second * 15,
TopicDetails: topicDetails,
}
// Send request to Broker
response, err := broker.CreateTopics(&request)
// handle errors if any
if err != nil {
log.Printf("%#v", &err)
}
t := response.TopicErrors
for key, val := range t {
log.Printf("Key is %s", key)
log.Printf("Value is %#v", val.Err.Error())
log.Printf("Value3 is %#v", val.ErrMsg)
}
log.Printf("the response is %#v", response)
// close connection to broker
broker.Close()
You can have a look at a working code at github. Remember to start kafka broker and import all golang dependency before running the code.
Indeed, in newer versions of Sarama you can use ClusterAdmin to create topics. Below you can find the sample code:
package main
import (
"github.com/Shopify/sarama" // Sarama 1.22.0
"log"
)
func main() {
brokerAddrs := []string{"localhost:9092"}
config := sarama.NewConfig()
config.Version = sarama.V2_1_0_0
admin, err := sarama.NewClusterAdmin(brokerAddrs, config)
if err != nil {
log.Fatal("Error while creating cluster admin: ", err.Error())
}
defer func() { _ = admin.Close() }()
err = admin.CreateTopic("topic.test.1", &sarama.TopicDetail{
NumPartitions: 1,
ReplicationFactor: 1,
}, false)
if err != nil {
log.Fatal("Error while creating topic: ", err.Error())
}
}
It is better to directly use : https://github.com/Shopify/sarama/blob/master/admin.go for this instead of directly connecting to a broker.
This handles lot of cases like:
- You can add multiple broker addresses for a cluster config.
- Identification of which broker acts as the controller is done automatically.