kafka get partition count for a topic
In the 0.82 Producer API and 0.9 Consumer api you can use something like
Properties configProperties = new Properties();
configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer");
configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
org.apache.kafka.clients.producer.Producer producer = new KafkaProducer(configProperties);
producer.partitionsFor("test")
Go to your kafka/bin
directory.
Then run this:
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic_name
You should see what you need under PartitionCount
.
Topic:topic_name PartitionCount:5 ReplicationFactor:1 Configs:
Topic: topic_name Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: topic_name Partition: 1 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: topic_name Partition: 2 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: topic_name Partition: 3 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: topic_name Partition: 4 Leader: 1001 Replicas: 1001 Isr: 1001
When using a version where zookeeper is no longer a dependency of Kafka
kafka-topics --describe --bootstrap-server localhost:9092 --topic topic_name
In java code we can use AdminClient
to get sum partions of one topic.
Properties props = new Properties();
props.put("bootstrap.servers", "host:9092");
AdminClient client = AdminClient.create(props);
DescribeTopicsResult result = client.describeTopics(Arrays.asList("TEST"));
Map<String, KafkaFuture<TopicDescription>> values = result.values();
KafkaFuture<TopicDescription> topicDescription = values.get("TEST");
int partitions = topicDescription.get().partitions().size();
System.out.println(partitions);
Here's how I do it:
/**
* Retrieves list of all partitions IDs of the given {@code topic}.
*
* @param topic
* @param seedBrokers List of known brokers of a Kafka cluster
* @return list of partitions or empty list if none found
*/
public static List<Integer> getPartitionsForTopic(String topic, List<BrokerInfo> seedBrokers) {
for (BrokerInfo seed : seedBrokers) {
SimpleConsumer consumer = null;
try {
consumer = new SimpleConsumer(seed.getHost(), seed.getPort(), 20000, 128 * 1024, "partitionLookup");
List<String> topics = Collections.singletonList(topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);
kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
List<Integer> partitions = new ArrayList<>();
// find our partition's metadata
List<TopicMetadata> metaData = resp.topicsMetadata();
for (TopicMetadata item : metaData) {
for (PartitionMetadata part : item.partitionsMetadata()) {
partitions.add(part.partitionId());
}
}
return partitions; // leave on first successful broker (every broker has this info)
} catch (Exception e) {
// try all available brokers, so just report error and go to next one
LOG.error("Error communicating with broker [" + seed + "] to find list of partitions for [" + topic + "]. Reason: " + e);
} finally {
if (consumer != null)
consumer.close();
}
}
throw new RuntimeError("Could not get partitions");
}
Note that I just needed to pull out partition IDs, but you can additionally retrieve any other partition metadata, like leader
, isr
, replicas
, ...
And BrokerInfo
is just a simple POJO that has host
and port
fields.