Java Kafka: get the partition count for a topic

Note: this page is a translation of a popular Stack Overflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same license, cite the original URL, and attribute it to the original authors (not me). Original Stack Overflow question: http://stackoverflow.com/questions/35437681/

Date: 2020-08-11 16:40:01  Source: igfitidea

kafka get partition count for a topic

java apache-kafka

Asked by vish4071

How can I get the number of partitions for a Kafka topic from code? I have researched many links, but none seem to work.

Here are a few:

http://grokbase.com/t/kafka/users/148132gdzk/find-topic-partition-count-through-simpleclient-api

http://grokbase.com/t/kafka/users/151cv3htga/get-replication-and-partition-count-of-a-topic

http://qnalist.com/questions/5809219/get-replication-and-partition-count-of-a-topic

These all look like similar discussions.

There are also similar questions on SO, but none of them has a working solution.

Accepted answer by Sunil Patil

In the 0.8.2 Producer API and the 0.9 Consumer API you can use something like:

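import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties configProperties = new Properties();
configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

// partitionsFor returns one PartitionInfo entry per partition of the topic
Producer<byte[], String> producer = new KafkaProducer<>(configProperties);
producer.partitionsFor("test");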

Answer by peter.petrov

Go to your kafka/bin directory.

Then run this:

./kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic_name

You should see what you need under PartitionCount.

Topic:topic_name        PartitionCount:5        ReplicationFactor:1     Configs:
        Topic: topic_name       Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: topic_name       Partition: 1    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: topic_name       Partition: 2    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: topic_name       Partition: 3    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: topic_name       Partition: 4    Leader: 1001    Replicas: 1001  Isr: 1001
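
If the count is needed inside a Java program and only the CLI output is available, that output can be parsed. The following is a minimal sketch, assuming the output format shown above (a header line containing PartitionCount:<n>, followed by one "Partition: <id>" line per partition). The class and method names are hypothetical and not part of Kafka, and the exact output format may differ between Kafka versions.

```java
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Kafka. Parses text produced by
// `kafka-topics.sh --describe`, assuming the format shown above.
class DescribeOutputParser {

    // Matches "PartitionCount:5" and also "PartitionCount: 5".
    private static final Pattern PARTITION_COUNT =
            Pattern.compile("PartitionCount:\\s*(\\d+)");

    /** Returns the value of the PartitionCount field, or empty if it is absent. */
    static OptionalInt parsePartitionCount(String describeOutput) {
        Matcher m = PARTITION_COUNT.matcher(describeOutput);
        return m.find()
                ? OptionalInt.of(Integer.parseInt(m.group(1)))
                : OptionalInt.empty();
    }

    /** Counts the per-partition detail lines, as a cross-check of the header. */
    static long countPartitionLines(String describeOutput) {
        long count = 0;
        for (String line : describeOutput.split("\\R")) {
            if (line.contains("Partition: ")) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // Shortened sample in the same layout as the output above.
        String sample = String.join("\n",
                "Topic:topic_name        PartitionCount:2        ReplicationFactor:1     Configs:",
                "        Topic: topic_name       Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001",
                "        Topic: topic_name       Partition: 1    Leader: 1001    Replicas: 1001  Isr: 1001");
        System.out.println(parsePartitionCount(sample));
        System.out.println(countPartitionLines(sample));
    }
}
```

Reading the header field is the direct route; counting the detail lines is only a sanity check that the two agree.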

Answer by Marko Bonaci

Here's how I do it:

  /**
   * Retrieves list of all partitions IDs of the given {@code topic}.
   * 
   * @param topic
   * @param seedBrokers List of known brokers of a Kafka cluster
   * @return list of partitions or empty list if none found
   */
  public static List<Integer> getPartitionsForTopic(String topic, List<BrokerInfo> seedBrokers) {
    for (BrokerInfo seed : seedBrokers) {
      SimpleConsumer consumer = null;
      try {
        consumer = new SimpleConsumer(seed.getHost(), seed.getPort(), 20000, 128 * 1024, "partitionLookup");
        List<String> topics = Collections.singletonList(topic);
        TopicMetadataRequest req = new TopicMetadataRequest(topics);
        kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

        List<Integer> partitions = new ArrayList<>();
        // find our partition's metadata
        List<TopicMetadata> metaData = resp.topicsMetadata();
        for (TopicMetadata item : metaData) {
          for (PartitionMetadata part : item.partitionsMetadata()) {
            partitions.add(part.partitionId());
          }
        }
        return partitions;  // leave on first successful broker (every broker has this info)
      } catch (Exception e) {
        // try all available brokers, so just report error and go to next one
        LOG.error("Error communicating with broker [" + seed + "] to find list of partitions for [" + topic + "]. Reason: " + e);
      } finally {
        if (consumer != null)
          consumer.close();
      }
    }
    throw new RuntimeException("Could not get partitions");
  }

Note that I only needed to pull out the partition IDs, but you can also retrieve any other partition metadata, such as leader, isr, replicas, ...
BrokerInfo is just a simple POJO that has host and port fields.
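
The answer does not show BrokerInfo itself. A minimal sketch of such a class, assuming only what is stated above (a host field and a port field, plus the getHost() and getPort() accessors that the method calls), might look like this. The constructor and toString() are assumptions added for illustration.

```java
// Hypothetical sketch of the BrokerInfo POJO described above. Only the
// host and port fields and their getters are implied by the answer.
final class BrokerInfo {
    private final String host;
    private final int port;

    // Assumed constructor: the answer does not say how instances are built.
    BrokerInfo(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public String getHost() {
        return host;
    }

    public int getPort() {
        return port;
    }

    // Assumed addition: keeps the "[" + seed + "]" part of the error log readable.
    @Override
    public String toString() {
        return host + ":" + port;
    }
}
```

With this class, a seed list could be built as Arrays.asList(new BrokerInfo("localhost", 9092)) and passed to getPartitionsForTopic.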

Answer by Andy

@Sunil-patil's answer stopped short of the count part of the question. You have to get the size of the returned List:

producer.partitionsFor("test").size()

@vish4071 there is no point in faulting Sunil; you did not mention in the question that you are using ConsumerConnector.

Answer by Avinash Kumar Pandey

The following approach works for Kafka 0.10 and does not use any producer or consumer APIs. It uses some classes from Kafka's Scala API, such as ZkConnection and ZkUtils.

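    // Assumes these are defined elsewhere: zkConnect (the ZooKeeper connect string),
    // zkClient (a ZkClient instance) and topicList (a java.util.List<String> of topic names).
    ZkConnection zkConnection = new ZkConnection(zkConnect);
    ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);
    // The assignment map is keyed by topic; the size of a topic's entry is its partition count.
    System.out.println(JavaConversions.mapAsJavaMap(zkUtils.getPartitionAssignmentForTopics(
         JavaConversions.asScalaBuffer(topicList))).get("bidlogs_kafka10").size());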

Answer by user1314742

I had the same issue: I needed to get the partitions for a topic.

With the help of the answer here, I was able to get the information from ZooKeeper.

Here is my code in Scala (it could easily be translated into Java):

import org.apache.zookeeper.ZooKeeper

def extractPartitionNumberForTopic(topicName: String, zookeeperQuorum: String): Int = {
  val zk = new ZooKeeper(zookeeperQuorum, 10000, null)
  val zkNodeName = s"/brokers/topics/$topicName/partitions"
  val numPartitions = zk.getChildren(zkNodeName, false).size
  zk.close()
  numPartitions
}

This approach gave me access to information about Kafka topics as well as other information about Kafka brokers.

In ZooKeeper, you can check the number of partitions for a topic by browsing to /brokers/topics/MY_TOPIC_NAME/partitions

Using zookeeper-client.sh to connect to your ZooKeeper:

[zk: ZkServer:2181(CONNECTED) 5] ls /brokers/topics/MY_TOPIC_NAME/partitions
[0, 1, 2]

This shows that there are 3 partitions for the topic MY_TOPIC_NAME.
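
Since each child of the partitions znode is named after one partition ID, the count is simply the number of children. A minimal Java sketch of that idea, assuming the child names have already been fetched (for example with ZooKeeper.getChildren, as in the Scala code above); the class and method names are hypothetical and not part of Kafka or ZooKeeper.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Kafka or ZooKeeper.
class TopicZnodes {

    /** Builds the znode path under which a topic's partitions are listed. */
    static String partitionsPath(String topic) {
        return "/brokers/topics/" + topic + "/partitions";
    }

    /** Each child znode name is a partition ID, so the count is the list size. */
    static int partitionCount(List<String> children) {
        return children.size();
    }

    public static void main(String[] args) {
        // Child names as listed by "ls" in the session above.
        List<String> children = Arrays.asList("0", "1", "2");
        System.out.println(partitionsPath("MY_TOPIC_NAME") + " has "
                + partitionCount(children) + " partitions");
    }
}
```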

Answer by Danny Mor

You can explore kafka.utils.ZkUtils, which has many methods that help extract metadata about the cluster. The answers here are nice, so I'm just adding this for the sake of diversity:

import kafka.utils.ZkUtils
import org.I0Itec.zkclient.ZkClient

def getTopicPartitionCount(zookeeperQuorum: String, topic: String): Int = {
  val client = new ZkClient(zookeeperQuorum)
  val partitionCount = ZkUtils.getAllPartitions(client)
    .count(topicPartitionPair => topicPartitionPair.topic == topic)

  client.close
  partitionCount
}

Answer by MD5

The shell command below prints the number of partitions. You should be in the Kafka bin directory before executing it:

sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic TopicName | awk '{print $2}' | uniq -c | awk 'NR==2{print "count of partitions=" $1}'

Note that you have to change the topic name to match your own. You can also validate the count with an if condition:

sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic TopicName | awk '{print $2}' | uniq -c | awk 'NR==2{if ($1=="16") print "valid partitions"}'

The above command prints "valid partitions" if the count is 16. You can change the expected count depending on your requirements.

Answer by gstackoverflow

cluster.availablePartitionsForTopic(topicName).size()

Answer by Raman Mishra

import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer

// create the Kafka producer
def getKafkaProducer: KafkaProducer[String, String] = {
  val kafkaProps: Properties = new Properties()
  kafkaProps.put("bootstrap.servers", "localhost:9092")
  kafkaProps.put("key.serializer",
    "org.apache.kafka.common.serialization.StringSerializer")
  kafkaProps.put("value.serializer",
    "org.apache.kafka.common.serialization.StringSerializer")

  new KafkaProducer[String, String](kafkaProps)
}
val kafkaProducer = getKafkaProducer
// partitionsFor returns one PartitionInfo per partition, so its size is the partition count
val noOfPartition = kafkaProducer.partitionsFor("TopicName").size
println(noOfPartition) // prints the number of partitions for the given topic