如何从Java中的kafka服务器获取主题列表

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/24902301/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-14 15:28:00  来源:igfitidea点击:

How to get topic list from kafka server in Java

javaapache-kafka

提问by Anand

I am using kafka 0.8version and very much new to it.

我正在使用kafka 0.8版本并且非常新。

I want to know the list of topics created in kafka serveralong with it's metadata. Is there any API available to find out this?

我想知道创建的主题列表及其kafka server元数据。是否有任何 API 可用于找出这一点?

Basically, I need to write a Java consumer that should auto-discover any topic in kafka server.There is API to fetch TopicMetadata, but this needs name of topic as input parameters.I need information for all topics present in server.

基本上,我需要编写一个 Java 使用者,它应该自动发现kafka server.There 中的任何主题。有 API 可以获取TopicMetadata,但这需要主题名称作为输入参数。我需要服务器中存在的所有主题的信息。

采纳答案by David Corley

A good place to start would be the sample shell scripts shipped with Kafka. In the /bin directory of the distribution there's some shell scripts you can use, one of which is ./kafka-topic-list.sh If you run that without specifying a topic, it will return all topics with their metadata. See: https://github.com/apache/kafka/blob/0.8/bin/kafka-list-topic.sh

一个很好的起点是 Kafka 附带的示例 shell 脚本。在发行版的 /bin 目录中,您可以使用一些 shell 脚本,其中之一是 ./kafka-topic-list.sh 如果您在不指定主题的情况下运行该脚本,它将返回所有主题及其元数据。参见:https: //github.com/apache/kafka/blob/0.8/bin/kafka-list-topic.sh

That shell script in turn runs: https://github.com/apache/kafka/blob/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala

该shell脚本依次运行:https: //github.com/apache/kafka/blob/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala

The above are both references to the 0.8 Kafka version, so if you're using a different version (even a point difference), be sure to use the appropriate branch/tag on github

以上都是对 0.8 Kafka 版本的引用,所以如果你使用的是不同的版本(甚至是点差),请务必使用 github 上的相应分支/标签

回答by Harvinder Singh

You can use zookeeper API to get the list of brokers as mentioned below:

您可以使用 zookeeper API 获取代理列表,如下所述:

    ZooKeeper zk = new ZooKeeper("zookeeperhost, 10000, null);
    List<String> ids = zk.getChildren("/brokers/ids", false);
    List<Map> brokerList = new ArrayList<>();
    ObjectMapper objectMapper = new ObjectMapper();

    for (String id : ids) {
        Map map = objectMapper.readValue(zk.getData("/brokers/ids/" + id, false, null), Map.class);
        brokerList.add(map);
    }

Use this broker list to get all the topic using the following link

使用此经纪人列表通过以下链接获取所有主题

https://cwiki.apache.org/confluence/display/KAFKA/Finding+Topic+and+Partition+Leader

https://cwiki.apache.org/confluence/display/KAFKA/Finding+Topic+and+Partition+Leader

回答by betaboy00

with Kafka 0.9.0

卡夫卡 0.9.0

you can list the topics in the server with the provided consumer method listTopics();

您可以使用提供的消费者方法 listTopics() 列出服务器中的主题;

eg.

例如。

Map<String, List<PartitionInfo> > topics;

Properties props = new Properties();
props.put("bootstrap.servers", "1.2.3.4:9092");
props.put("group.id", "test-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
topics = consumer.listTopics();
consumer.close();

回答by hba

If you want to pull broker or other-kafka information from Zookeeper then kafka.utils.ZkUtilsprovides a nice interface. Here is the code I have to list all zookeeper brokers (there are a ton of other methods there):

如果你想从 Zookeeper 中提取 broker 或其他 kafka 信息,那么kafka.utils.ZkUtils提供一个很好的界面。这是我必须列出所有动物园管理员经纪人的代码(那里有很多其他方法):

List<Broker> listBrokers() {

        final ZkConnection zkConnection = new ZkConnection(connectionString);
        final int sessionTimeoutMs = 10 * 1000;
        final int connectionTimeoutMs = 20 * 1000;
        final ZkClient zkClient = new ZkClient(connectionString,
                                               sessionTimeoutMs,
                                               connectionTimeoutMs,
                                               ZKStringSerializer$.MODULE$);

        final ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);

        scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllBrokersInCluster());
}

回答by fengkb

I think this is the best way:

我认为这是最好的方法:

ZkClient zkClient = new ZkClient("zkHost:zkPort");
List<String> topics = JavaConversions.asJavaList(ZkUtils.getAllTopics(zkClient));

回答by Ranga Reddy

Using Scala:

使用斯卡拉:

import java.util.{Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer

object KafkaTest {
  def main(args: Array[String]): Unit = {

    val brokers = args(0)
    val props = new Properties();
    props.put("bootstrap.servers", brokers);
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    val consumer = new KafkaConsumer[String, String](props);
    val topics = consumer.listTopics().keySet();

    println(topics)
  }
}