Java, How to get number of messages in a topic in apache kafka

Disclaimer: this page is a translation of a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same CC BY-SA license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/28579948/

Date: 2020-08-11 06:27:54 | Source: igfitidea

Tags: java, messages, apache-kafka

Asked by Chetan

I am using apache kafka for messaging. I have implemented the producer and consumer in Java. How can we get the number of messages in a topic?

Accepted answer by Lundahl

From a consumer's point of view, the only way that comes to mind is to actually consume the messages and count them.

The Kafka broker exposes JMX counters for the number of messages received since start-up, but you cannot know how many of them have already been purged.

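As a rough illustration of reading that broker-side counter from Java, here is a minimal JMX client sketch. It assumes the broker was started with JMX enabled on port 9999 and that the per-topic counter is exposed under the usual BrokerTopicMetrics MBean name; both are assumptions about your setup, not something stated in the answer.

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class MessagesInSinceStartup {
    public static void main(String[] args) throws Exception {
        // Assumption: the broker was started with JMX enabled, e.g. JMX_PORT=9999.
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection mbeans = connector.getMBeanServerConnection();
            // Assumption: per-topic counter under the BrokerTopicMetrics naming scheme.
            ObjectName counter = new ObjectName(
                    "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=test");
            Number count = (Number) mbeans.getAttribute(counter, "Count");
            // This counts messages received since broker start-up only;
            // it says nothing about how many have since been purged.
            System.out.println("Messages in since start-up: " + count.longValue());
        }
    }
}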

In most common scenarios, messages in Kafka are best seen as an infinite stream, and getting a discrete value of how many are currently kept on disk is not relevant. Furthermore, things get more complicated when dealing with a cluster of brokers that each hold only a subset of the messages in a topic.

Answer by hba

I haven't tried this myself, but it seems to make sense.

You can also use kafka.tools.ConsumerOffsetChecker (source).

Answer by ssemichev

It is not Java, but it may be useful:

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list <broker>:<port> \
  --topic <topic-name> --time -1 --offsets 1 \
  | awk -F ":" '{sum += $3} END {print sum}'
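
For the same number from Java, the latest offset of each partition can be summed with the consumer API. Below is a minimal sketch, assuming a topic named test and a broker reachable at localhost:9092 (both placeholders).

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SumEndOffsets {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            String topic = "test";                           // placeholder topic name
            List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                    .map(p -> new TopicPartition(topic, p.partition()))
                    .collect(Collectors.toList());
            // endOffsets returns the latest offset of each partition, which is
            // what GetOffsetShell prints with --time -1.
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
            long sum = endOffsets.values().stream().mapToLong(Long::longValue).sum();
            System.out.println("Sum of latest offsets: " + sum);
        }
    }
}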

Answer by Thomas Decaux

Use https://prestodb.io/docs/current/connector/kafka-tutorial.html

A super SQL engine, provided by Facebook, that connects to several data sources (Cassandra, Kafka, JMX, Redis, ...).

PrestoDB runs as a server with optional workers (there is a standalone mode without extra workers); you then use a small executable JAR (the Presto CLI) to run queries.

Once you have configured the Presto server properly, you can use traditional SQL:

SELECT count(*) FROM TOPIC_NAME;
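
If the count(*) query needs to be issued from Java rather than from the Presto CLI, the Presto JDBC driver can be used. A hedged sketch follows, assuming the driver is on the classpath, the Kafka connector is configured as a catalog named kafka with schema default, and the topic is exposed as a table named topic_name (all of these names depend on your Presto configuration).

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;

public class CountViaPresto {
    public static void main(String[] args) throws Exception {
        // Assumptions: Presto coordinator at localhost:8080, Kafka connector
        // configured as catalog "kafka" with schema "default", topic exposed
        // as table "topic_name".
        String url = "jdbc:presto://localhost:8080/kafka/default";
        Properties props = new Properties();
        props.setProperty("user", "presto");   // Presto requires a user name; the value is arbitrary here

        try (Connection conn = DriverManager.getConnection(url, props);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT count(*) FROM topic_name")) {
            if (rs.next()) {
                System.out.println("Messages in topic: " + rs.getLong(1));
            }
        }
    }
}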

Answer by Rudy

I actually use this for benchmarking my POC. The tool you want to use is ConsumerOffsetChecker. You can run it with a bash command like the one below.

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker  --topic test --zookeeper localhost:2181 --group testgroup

And the result, shown in a screenshot in the original answer: the red box highlights 999, which is the number of messages currently in the topic.

Update: ConsumerOffsetChecker has been deprecated since 0.10.0; you may want to start using ConsumerGroupCommand.

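With a recent Java client, similar per-group numbers can also be obtained programmatically through the AdminClient rather than the ConsumerGroupCommand CLI. A minimal sketch follows, assuming Kafka clients 2.5 or newer (required for listOffsets), a group named testgroup, and a broker at localhost:9092 (all placeholders).

import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ConsumerGroupLag {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address

        try (AdminClient admin = AdminClient.create(props)) {
            // Committed offsets of the consumer group (placeholder group name).
            Map<TopicPartition, OffsetAndMetadata> committed = admin
                    .listConsumerGroupOffsets("testgroup")
                    .partitionsToOffsetAndMetadata()
                    .get();

            // Latest (log end) offsets of the same partitions.
            Map<TopicPartition, OffsetSpec> request = committed.keySet().stream()
                    .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
            Map<TopicPartition, ListOffsetsResultInfo> latest =
                    admin.listOffsets(request).all().get();

            // Sum of (log end offset - committed offset) over all partitions.
            long lag = committed.entrySet().stream()
                    .filter(e -> e.getValue() != null)
                    .mapToLong(e -> latest.get(e.getKey()).offset() - e.getValue().offset())
                    .sum();
            System.out.println("Total un-consumed messages (lag): " + lag);
        }
    }
}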

Answer by Eric Leschinski

Apache Kafka command to get the number of unhandled messages on all partitions of a topic:

kafka-run-class kafka.tools.ConsumerOffsetChecker \
    --topic test --zookeeper localhost:2181 \
    --group test_group

Prints:

Group      Topic        Pid Offset          logSize         Lag             Owner
test_group test         0   11051           11053           2               none
test_group test         1   10810           10812           2               none
test_group test         2   11027           11028           1               none

Column 6 (Lag) is the number of un-handled messages. Add them up like this:

kafka-run-class kafka.tools.ConsumerOffsetChecker \
    --topic test --zookeeper localhost:2181 \
    --group test_group 2>/dev/null \
    | awk 'NR>1 {sum += $6} END {print sum}'

awk reads the rows, skips the header line, adds up the 6th column, and prints the sum at the end.

Prints

5

Answer by AutomatedMike

To get the total number of messages stored for the topic, you can seek the consumer to the beginning and the end of the stream for each partition and sum the results:

// Assign every partition of the topic to the consumer
List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
        .map(p -> new TopicPartition(topic, p.partition()))
        .collect(Collectors.toList());
consumer.assign(partitions);

// Seek to the end and remember the end offset of each partition
consumer.seekToEnd(Collections.emptySet());
Map<TopicPartition, Long> endPartitions = partitions.stream()
        .collect(Collectors.toMap(Function.identity(), consumer::position));

// Seek back to the beginning and sum the per-partition differences
consumer.seekToBeginning(Collections.emptySet());
System.out.println(partitions.stream()
        .mapToLong(p -> endPartitions.get(p) - consumer.position(p))
        .sum());

Answer by Christophe Quintard

Using the Java client of Kafka 2.11-1.0.0, you can do the following:

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("test"));
    while(true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());

            // after each message, query the number of messages of the topic
            Set<TopicPartition> partitions = consumer.assignment();
            Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
            for(TopicPartition partition : offsets.keySet()) {
                System.out.printf("partition %s is at %d\n", partition.topic(), offsets.get(partition));
            }
        }
    }

The output is something like this:

offset = 10, key = null, value = un
partition test is at 13
offset = 11, key = null, value = deux
partition test is at 13
offset = 12, key = null, value = trois
partition test is at 13

Answer by f01

In most recent versions of Kafka Manager, there is a column titled Summed Recent Offsets.

Answer by pdp

Sometimes the interest is in knowing the number of messages in each partition, for example, when testing a custom partitioner. The following steps have been tested to work with Kafka 0.10.2.1-2 from Confluent 3.2. Given a Kafka topic kt and the following command line:

$ kafka-run-class kafka.tools.GetOffsetShell \
  --broker-list host01:9092,host02:9092,host02:9092 --topic kt

That prints the sample output showing the count of messages in the three partitions:

kt:2:6138
kt:1:6123
kt:0:6137

The number of lines could be more or less depending on the number of partitions for the topic.
