Java,如何在 apache kafka 中获取主题中的消息数
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/28579948/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Java, How to get number of messages in a topic in apache kafka
提问by Chetan
I am using apache kafka for messaging. I have implemented the producer and consumer in Java. How can we get the number of messages in a topic?
我正在使用 apache kafka 进行消息传递。我已经用 Java 实现了生产者和消费者。我们如何获取主题中的消息数量?
采纳答案by Lundahl
The only way that comes to mind for this from a consumer point of view is to actually consume the messages and count them then.
从消费者的角度来看,唯一想到的方法是实际消费消息并计算它们。
The Kafka broker exposes JMX counters for number of messages received since start-up but you cannot know how many of them have been purged already.
Kafka 代理公开 JMX 计数器以显示自启动以来收到的消息数,但您无法知道其中有多少已被清除。
In most common scenarios, messages in Kafka is best seen as an infinite stream and getting a discrete value of how many that is currently being kept on disk is not relevant. Furthermore things get more complicated when dealing with a cluster of brokers which all have a subset of the messages in a topic.
在大多数常见场景中,Kafka 中的消息最好被视为一个无限流,并且获取当前保留在磁盘上的数量的离散值是无关紧要的。此外,在处理一组代理时,事情会变得更加复杂,这些代理在一个主题中都有一个消息子集。
回答by hba
回答by ssemichev
It is not java, but may be useful
它不是java,但可能有用
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
--broker-list <broker>: <port>
--topic <topic-name> --time -1 --offsets 1
| awk -F ":" '{sum += } END {print sum}'
回答by Thomas Decaux
Use https://prestodb.io/docs/current/connector/kafka-tutorial.html
使用https://prestodb.io/docs/current/connector/kafka-tutorial.html
A super SQL engine, provided by Facebook, that connects on several data sources (Cassandra, Kafka, JMX, Redis ...).
Facebook 提供的超级 SQL 引擎,可连接多个数据源(Cassandra、Kafka、JMX、Redis ...)。
PrestoDB is running as a server with optional workers (there is a standalone mode without extra workers), then you use a small executable JAR (called presto CLI) to make queries.
PrestoDB 作为带有可选工作程序的服务器运行(有一个没有额外工作程序的独立模式),然后您使用一个小的可执行 JAR(称为 presto CLI)进行查询。
Once you have configured well the Presto server , you can use traditionnal SQL:
一旦你配置好 Presto 服务器,你就可以使用传统的 SQL:
SELECT count(*) FROM TOPIC_NAME;
回答by Rudy
I actually use this for benchmarking my POC. The item you want to use ConsumerOffsetChecker. You can run it using bash script like below.
我实际上使用它来对我的 POC 进行基准测试。您要使用 ConsumerOffsetChecker 的项目。您可以使用如下所示的 bash 脚本运行它。
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --zookeeper localhost:2181 --group testgroup
And below is the result :
As you can see on the red box, 999 is the number of message currently in the topic.
下面是结果:
正如您在红色框中看到的那样,999 是当前主题中的消息数。
Update: ConsumerOffsetChecker is deprecated since 0.10.0, you may want to start using ConsumerGroupCommand.
更新:ConsumerOffsetChecker 自 0.10.0 起已弃用,您可能希望开始使用 ConsumerGroupCommand。
回答by Eric Leschinski
Apache Kafka command to get un handled messages on all partitions of a topic:
用于获取主题所有分区上未处理消息的 Apache Kafka 命令:
kafka-run-class kafka.tools.ConsumerOffsetChecker
--topic test --zookeeper localhost:2181
--group test_group
Prints:
印刷:
Group Topic Pid Offset logSize Lag Owner
test_group test 0 11051 11053 2 none
test_group test 1 10810 10812 2 none
test_group test 2 11027 11028 1 none
Column 6 is the un-handled messages. Add them up like this:
第 6 列是未处理的消息。像这样把它们加起来:
kafka-run-class kafka.tools.ConsumerOffsetChecker
--topic test --zookeeper localhost:2181
--group test_group 2>/dev/null | awk 'NR>1 {sum += }
END {print sum}'
awk reads the rows, skips the header line and adds up the 6th column and at the end prints the sum.
awk 读取行,跳过标题行并将第 6 列相加,最后打印总和。
Prints
印刷
5
回答by AutomatedMike
To get all the messages stored for the topic you can seek the consumer to the beginning and end of the stream for each partition and sum the results
要获取为该主题存储的所有消息,您可以寻找消费者到每个分区的流的开头和结尾,并对结果求和
List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
.map(p -> new TopicPartition(topic, p.partition()))
.collect(Collectors.toList());
consumer.assign(partitions);
consumer.seekToEnd(Collections.emptySet());
Map<TopicPartition, Long> endPartitions = partitions.stream()
.collect(Collectors.toMap(Function.identity(), consumer::position));
consumer.seekToBeginning(Collections.emptySet());
System.out.println(partitions.stream().mapToLong(p -> endPartitions.get(p) - consumer.position(p)).sum());
回答by Christophe Quintard
Using the Java client of Kafka 2.11-1.0.0, you can do the following thing :
使用Kafka 2.11-1.0.0的Java客户端,可以做以下事情:
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test"));
while(true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
// after each message, query the number of messages of the topic
Set<TopicPartition> partitions = consumer.assignment();
Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
for(TopicPartition partition : offsets.keySet()) {
System.out.printf("partition %s is at %d\n", partition.topic(), offsets.get(partition));
}
}
}
Output is something like this :
输出是这样的:
offset = 10, key = null, value = un
partition test is at 13
offset = 11, key = null, value = deux
partition test is at 13
offset = 12, key = null, value = trois
partition test is at 13
回答by f01
回答by pdp
Sometimes the interest is in knowing the number of messages in each partition, for example, when testing a custom partitioner.The ensuing steps have been tested to work with Kafka 0.10.2.1-2 from Confluent 3.2. Given a Kafka topic, kt
and the following command-line:
有时感兴趣的是知道每个分区中的消息数量,例如,在测试自定义分区器时。随后的步骤已经过测试,可与 Confluent 3.2 中的 Kafka 0.10.2.1-2 一起使用。给定一个 Kafka 主题kt
和以下命令行:
$ kafka-run-class kafka.tools.GetOffsetShell \
--broker-list host01:9092,host02:9092,host02:9092 --topic kt
That prints the sample output showing the count of messages in the three partitions:
这将打印显示三个分区中消息计数的示例输出:
kt:2:6138
kt:1:6123
kt:0:6137
The number of lines could be more or less depending on the number of partitions for the topic.
行数可能更多或更少,具体取决于主题的分区数。