如何从Java中的kafka服务器获取主题列表
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/24902301/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
How to get topic list from kafka server in Java
提问by Anand
I am using kafka 0.8
version and very much new to it.
我正在使用kafka 0.8
版本并且非常新。
I want to know the list of topics created in kafka server
along with it's
metadata.
Is there any API available to find out this?
我想知道创建的主题列表及其kafka server
元数据。是否有任何 API 可用于找出这一点?
Basically, I need to write a Java consumer that should auto-discover any topic in kafka server
.There is API to fetch TopicMetadata
, but this needs name of topic as input
parameters.I need information for all topics present in server.
基本上,我需要编写一个 Java 使用者,它应该自动发现kafka server
.There 中的任何主题。有 API 可以获取TopicMetadata
,但这需要主题名称作为输入参数。我需要服务器中存在的所有主题的信息。
采纳答案by David Corley
A good place to start would be the sample shell scripts shipped with Kafka. In the /bin directory of the distribution there's some shell scripts you can use, one of which is ./kafka-topic-list.sh If you run that without specifying a topic, it will return all topics with their metadata. See: https://github.com/apache/kafka/blob/0.8/bin/kafka-list-topic.sh
一个很好的起点是 Kafka 附带的示例 shell 脚本。在发行版的 /bin 目录中,您可以使用一些 shell 脚本,其中之一是 ./kafka-topic-list.sh 如果您在不指定主题的情况下运行该脚本,它将返回所有主题及其元数据。参见:https: //github.com/apache/kafka/blob/0.8/bin/kafka-list-topic.sh
That shell script in turn runs: https://github.com/apache/kafka/blob/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala
该shell脚本依次运行:https: //github.com/apache/kafka/blob/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala
The above are both references to the 0.8 Kafka version, so if you're using a different version (even a point difference), be sure to use the appropriate branch/tag on github
以上都是对 0.8 Kafka 版本的引用,所以如果你使用的是不同的版本(甚至是点差),请务必使用 github 上的相应分支/标签
回答by Harvinder Singh
You can use zookeeper API to get the list of brokers as mentioned below:
您可以使用 zookeeper API 获取代理列表,如下所述:
ZooKeeper zk = new ZooKeeper("zookeeperhost, 10000, null);
List<String> ids = zk.getChildren("/brokers/ids", false);
List<Map> brokerList = new ArrayList<>();
ObjectMapper objectMapper = new ObjectMapper();
for (String id : ids) {
Map map = objectMapper.readValue(zk.getData("/brokers/ids/" + id, false, null), Map.class);
brokerList.add(map);
}
Use this broker list to get all the topic using the following link
使用此经纪人列表通过以下链接获取所有主题
https://cwiki.apache.org/confluence/display/KAFKA/Finding+Topic+and+Partition+Leader
https://cwiki.apache.org/confluence/display/KAFKA/Finding+Topic+and+Partition+Leader
回答by betaboy00
with Kafka 0.9.0
卡夫卡 0.9.0
you can list the topics in the server with the provided consumer method listTopics();
您可以使用提供的消费者方法 listTopics() 列出服务器中的主题;
eg.
例如。
Map<String, List<PartitionInfo> > topics;
Properties props = new Properties();
props.put("bootstrap.servers", "1.2.3.4:9092");
props.put("group.id", "test-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
topics = consumer.listTopics();
consumer.close();
回答by hba
If you want to pull broker or other-kafka information from Zookeeper then kafka.utils.ZkUtils
provides a nice interface. Here is the code I have to list all zookeeper brokers (there are a ton of other methods there):
如果你想从 Zookeeper 中提取 broker 或其他 kafka 信息,那么kafka.utils.ZkUtils
提供一个很好的界面。这是我必须列出所有动物园管理员经纪人的代码(那里有很多其他方法):
List<Broker> listBrokers() {
final ZkConnection zkConnection = new ZkConnection(connectionString);
final int sessionTimeoutMs = 10 * 1000;
final int connectionTimeoutMs = 20 * 1000;
final ZkClient zkClient = new ZkClient(connectionString,
sessionTimeoutMs,
connectionTimeoutMs,
ZKStringSerializer$.MODULE$);
final ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);
scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllBrokersInCluster());
}
回答by fengkb
I think this is the best way:
我认为这是最好的方法:
ZkClient zkClient = new ZkClient("zkHost:zkPort");
List<String> topics = JavaConversions.asJavaList(ZkUtils.getAllTopics(zkClient));
回答by Ranga Reddy
Using Scala:
使用斯卡拉:
import java.util.{Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
object KafkaTest {
def main(args: Array[String]): Unit = {
val brokers = args(0)
val props = new Properties();
props.put("bootstrap.servers", brokers);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
val consumer = new KafkaConsumer[String, String](props);
val topics = consumer.listTopics().keySet();
println(topics)
}
}