Java Kafka: get the partition count for a topic
Notice: this page is a Chinese-English side-by-side translation of a popular Stack Overflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same license, cite the original URL and author information, and attribute it to the original authors (not me): Stack Overflow
Original question: http://stackoverflow.com/questions/35437681/
kafka get partition count for a topic
Asked by vish4071
How can I get the number of partitions for a Kafka topic from code? I have gone through many links, but none of them seem to work.
Mentioning a few:
http://grokbase.com/t/kafka/users/148132gdzk/find-topic-partition-count-through-simpleclient-api
http://grokbase.com/t/kafka/users/151cv3htga/get-replication-and-partition-count-of-a-topic
http://qnalist.com/questions/5809219/get-replication-and-partition-count-of-a-topic
These all look like similar discussions.
There are also similar questions on SO, but none of them has a working solution.
Accepted answer by Sunil Patil
With the 0.8.2 Producer API and the 0.9 Consumer API you can use something like:
// Minimal producer configuration; the producer is used here only to look up topic metadata
Properties configProperties = new Properties();
configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
org.apache.kafka.clients.producer.Producer<byte[], String> producer = new KafkaProducer<>(configProperties);
// Returns a List<PartitionInfo> with one entry per partition of the topic "test"
producer.partitionsFor("test");
Answer by peter.petrov
Go to your kafka/bin directory.
Then run this:
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic_name
You should see what you need under PartitionCount.
Topic:topic_name PartitionCount:5 ReplicationFactor:1 Configs:
Topic: topic_name Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: topic_name Partition: 1 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: topic_name Partition: 2 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: topic_name Partition: 3 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: topic_name Partition: 4 Leader: 1001 Replicas: 1001 Isr: 1001
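If the output of kafka-topics.sh has to be consumed from Java, one option is to capture the text and read the PartitionCount field from the summary line. The following is only a minimal sketch under the assumption that the output has the layout shown above; the class name PartitionCountParser and the method parsePartitionCount are hypothetical and not part of Kafka.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Kafka: extracts the PartitionCount field
// from text in the "kafka-topics.sh --describe" layout shown above.
class PartitionCountParser {
    // Matches "PartitionCount:5"; also tolerates whitespace after the colon.
    private static final Pattern PARTITION_COUNT = Pattern.compile("PartitionCount:\\s*(\\d+)");

    /** Returns the partition count found in the output, or -1 if the field is absent. */
    static int parsePartitionCount(String describeOutput) {
        Matcher m = PARTITION_COUNT.matcher(describeOutput);
        return m.find() ? Integer.parseInt(m.group(1)) : -1;
    }

    public static void main(String[] args) {
        String sample = "Topic:topic_name PartitionCount:5 ReplicationFactor:1 Configs:\n"
                + "Topic: topic_name Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001\n";
        System.out.println(parsePartitionCount(sample)); // prints 5
    }
}
```

Parsing CLI output is brittle across Kafka versions, so this is probably best kept for scripts and quick checks rather than production code.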
Answer by Marko Bonaci
Here's how I do it:
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;

/**
 * Retrieves the list of all partition IDs of the given {@code topic}.
 *
 * @param topic name of the topic to look up
 * @param seedBrokers list of known brokers of a Kafka cluster
 * @return list of partition IDs reported by the first broker that answers
 * @throws RuntimeException if none of the seed brokers could be queried
 */
public static List<Integer> getPartitionsForTopic(String topic, List<BrokerInfo> seedBrokers) {
    for (BrokerInfo seed : seedBrokers) {
        SimpleConsumer consumer = null;
        try {
            consumer = new SimpleConsumer(seed.getHost(), seed.getPort(), 20000, 128 * 1024, "partitionLookup");
            List<String> topics = Collections.singletonList(topic);
            TopicMetadataRequest req = new TopicMetadataRequest(topics);
            kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
            List<Integer> partitions = new ArrayList<>();
            // find our partition's metadata
            List<TopicMetadata> metaData = resp.topicsMetadata();
            for (TopicMetadata item : metaData) {
                for (PartitionMetadata part : item.partitionsMetadata()) {
                    partitions.add(part.partitionId());
                }
            }
            return partitions; // leave on first successful broker (every broker has this info)
        } catch (Exception e) {
            // try all available brokers, so just report error and go to next one
            // (LOG is assumed to be a logger declared in the enclosing class)
            LOG.error("Error communicating with broker [" + seed + "] to find list of partitions for [" + topic + "]. Reason: " + e);
        } finally {
            if (consumer != null)
                consumer.close();
        }
    }
    throw new RuntimeException("Could not get partitions");
}
Note that I just needed to pull out partition IDs, but you can additionally retrieve any other partition metadata, like leader, isr, replicas, ...
And BrokerInfo is just a simple POJO that has host and port fields.
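The answer does not show BrokerInfo itself, so here is one possible shape for it, offered as a sketch only. The field names host and port and the getters getHost() and getPort() follow their use in the method above; immutability, the null check, and the toString() format are assumptions made for this example.

```java
import java.util.Objects;

// Sketch of the BrokerInfo POJO described above; the original class is not shown in the answer.
final class BrokerInfo {
    private final String host;
    private final int port;

    BrokerInfo(String host, int port) {
        this.host = Objects.requireNonNull(host, "host");
        this.port = port;
    }

    public String getHost() {
        return host;
    }

    public int getPort() {
        return port;
    }

    // Used when a broker is concatenated into a log message.
    @Override
    public String toString() {
        return host + ":" + port;
    }
}
```

A seed list could then be built with something like Collections.singletonList(new BrokerInfo("localhost", 9092)), where the host and port are placeholders.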
Answer by Andy
@Sunil-patil's answer stopped short of answering the count part of the question. You have to get the size of the List:
producer.partitionsFor("test").size()
@vish4071 no point butting Sunil; you did not mention in the question that you are using ConsumerConnector.
Answer by Avinash Kumar Pandey
The following approach works for Kafka 0.10 and does not use any producer or consumer APIs. It uses some classes from the Scala API in Kafka, such as ZkConnection and ZkUtils.
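// zkConnect (the ZooKeeper connect string), zkClient and topicList (the topic names)
// are assumed to be defined elsewhere; the original snippet does not show them
ZkConnection zkConnection = new ZkConnection(zkConnect);
ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);
System.out.println(JavaConversions.mapAsJavaMap(zkUtils.getPartitionAssignmentForTopics(
        JavaConversions.asScalaBuffer(topicList))).get("bidlogs_kafka10").size());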
Answer by user1314742
I had the same issue, where I needed to get the partitions for a topic.
With the help of the answer here, I was able to get the information from ZooKeeper.
Here is my code in Scala (but it could easily be translated into Java):
import org.apache.zookeeper.ZooKeeper

def extractPartitionNumberForTopic(topicName: String, zookeeperQuorum: String): Int = {
  val zk = new ZooKeeper(zookeeperQuorum, 10000, null)
  // each partition of the topic is a child znode of this path
  val zkNodeName = s"/brokers/topics/$topicName/partitions"
  val numPartitions = zk.getChildren(zkNodeName, false).size
  zk.close()
  numPartitions
}
This approach gave me access to information about Kafka topics as well as other information about Kafka brokers.
From ZooKeeper you can check the number of partitions for a topic by browsing to /brokers/topics/MY_TOPIC_NAME/partitions
Using zookeeper-client.sh to connect to your ZooKeeper:
[zk: ZkServer:2181(CONNECTED) 5] ls /brokers/topics/MY_TOPIC_NAME/partitions
[0, 1, 2]
That shows us that there are 3 partitions for the topic MY_TOPIC_NAME.
Answer by Danny Mor
You can explore kafka.utils.ZkUtils, which has many methods aimed at helping extract metadata about the cluster. The answers here are nice, so I'm just adding this for the sake of diversity:
import kafka.utils.ZkUtils
import org.I0Itec.zkclient.ZkClient

def getTopicPartitionCount(zookeeperQuorum: String, topic: String): Int = {
  val client = new ZkClient(zookeeperQuorum)
  // count the (topic, partition) pairs that belong to the requested topic
  val partitionCount = ZkUtils.getAllPartitions(client)
    .count(topicPartitionPair => topicPartitionPair.topic == topic)
  client.close()
  partitionCount
}
Answer by MD5
The shell command below prints the number of partitions. You should be in the Kafka bin directory before executing it:
sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic **TopicName** | awk '{print $2}' | uniq -c | awk 'NR==2{print "count of partitions=" $1}'
Note that you have to change the topic name according to your needs. You can also validate the count with an if condition:
sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic **TopicName** | awk '{print $2}' | uniq -c | awk 'NR==2{if ($1=="16") print "valid partitions"}'
The above command prints "valid partitions" if the count is 16. You can change the count depending on your requirements.
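The same validation can be sketched in Java once the describe output has been captured as a string. Note that this swaps in a simpler technique than the awk pipeline: it counts the lines carrying a "Partition:" field instead of grouping on the second column. The class name PartitionRowCheck and both method names are hypothetical, and the output layout is assumed to match the sample shown earlier on this page.

```java
// Hypothetical helper, not part of Kafka: counts per-partition rows in
// "kafka-topics.sh --describe" output and compares the count to an expected value.
class PartitionRowCheck {
    /** Counts the per-partition rows, i.e. the lines that carry a "Partition:" field. */
    static int countPartitionRows(String describeOutput) {
        int rows = 0;
        for (String line : describeOutput.split("\\R")) {
            // The summary line has "PartitionCount:" but no "Partition:" field, so it is skipped.
            if (line.contains("Partition:")) {
                rows++;
            }
        }
        return rows;
    }

    /** Returns true when the number of partition rows equals the expected count. */
    static boolean hasExpectedPartitions(String describeOutput, int expected) {
        return countPartitionRows(describeOutput) == expected;
    }

    public static void main(String[] args) {
        String sample = "Topic:topic_name PartitionCount:2 ReplicationFactor:1 Configs:\n"
                + "Topic: topic_name Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001\n"
                + "Topic: topic_name Partition: 1 Leader: 1001 Replicas: 1001 Isr: 1001\n";
        System.out.println(countPartitionRows(sample));        // prints 2
        System.out.println(hasExpectedPartitions(sample, 16)); // prints false
    }
}
```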
Answer by gstackoverflow
cluster.availablePartitionsForTopic(topicName).size()
Answer by Raman Mishra
import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer

// create the Kafka producer
def getKafkaProducer: KafkaProducer[String, String] = {
  val kafkaProps: Properties = new Properties()
  kafkaProps.put("bootstrap.servers", "localhost:9092")
  kafkaProps.put("key.serializer",
    "org.apache.kafka.common.serialization.StringSerializer")
  kafkaProps.put("value.serializer",
    "org.apache.kafka.common.serialization.StringSerializer")
  new KafkaProducer[String, String](kafkaProps)
}

val kafkaProducer = getKafkaProducer
// partitionsFor returns one PartitionInfo per partition, so its size is the partition count
val noOfPartition = kafkaProducer.partitionsFor("TopicName").size
println(noOfPartition) // prints the number of partitions for the given topic