Java: Understanding consumer group ID

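Disclaimer: this page reproduces a popular StackOverflow question and its answers under the CC BY-SA 4.0 license. If you use it, you must also follow the CC BY-SA license, credit the original URL and authors, and attribute it to the original authors (not me): StackOverflow. Original URL: http://stackoverflow.com/questions/41376647/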

Date: 2020-08-11 23:20:04. Source: igfitidea

Tags: java, apache-kafka, kafka-consumer-api

Question by Ankit

I did a fresh installation of Apache Kafka 0.10.1.0.

I was able to send and receive messages from the command prompt.

While using the Producer/Consumer Java example, I don't know what to set for the group.id parameter in the consumer example.

Please let me know how to fix this issue.

Below is the consumer example I used:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-topic");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        try {
            consumer.subscribe(Arrays.asList("my-topic"));
            // poll() is called only once here, waiting at most 100 ms for records
            ConsumerRecords<String, String> records = consumer.poll(100);
            System.err.println("records size=>" + records.count());
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}

After running the console consumer command below, I can see the messages posted by the producer on the console, but I cannot see the messages from the Java program:

bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic my-topic --from-beginning

Answer by Raz Omessi

Consumers label themselves with a consumer group name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines.

If all the consumer instances have the same consumer group, then the records will effectively be load balanced over the consumer instances.

If all the consumer instances have different consumer groups, then each record will be broadcast to all the consumer processes.

The group.id is a string that uniquely identifies the group of consumer processes to which this consumer belongs.

(Kafka intro)
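The two cases in the quote (one shared group versus one group per consumer) can be illustrated with a small plain-Java simulation. It is a toy model, not Kafka code: the class and consumer names are hypothetical, and it assumes a simplified assignment in which partition p is owned by member p % groupSize within each group (Kafka's real assignment strategies differ in detail).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of the consumer-group delivery rule; no broker or Kafka client is involved.
class GroupDeliverySim {
    /**
     * groups: group.id -> names of the consumers in that group (names assumed unique).
     * recordPartitions[i]: the partition that record i was written to.
     * Returns consumer name -> indices of the records that consumer received.
     */
    static Map<String, List<Integer>> deliver(Map<String, List<String>> groups,
                                              int[] recordPartitions) {
        Map<String, List<Integer>> received = new LinkedHashMap<>();
        for (List<String> members : groups.values()) {
            for (String member : members) {
                received.put(member, new ArrayList<>());
            }
        }
        for (int i = 0; i < recordPartitions.length; i++) {
            int partition = recordPartitions[i];
            // Every subscribing group receives the record exactly once...
            for (List<String> members : groups.values()) {
                // ...delivered to the one member that owns the partition
                // (simplified assumption: partition p belongs to member p % groupSize).
                String owner = members.get(partition % members.size());
                received.get(owner).add(i);
            }
        }
        return received;
    }
}
```

With consumers c1 and c2 both in one group, four records on partitions 0, 1, 0, 1 are split between them (records 0 and 2 to c1, records 1 and 3 to c2); with each consumer in its own group, both receive all four records.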

Answer by guest

In the code you provided, you poll for data only once, waiting at most 100 ms. You should poll in a loop or wait longer (even then, a single poll returns only one batch of data). As for group.id: when you run the consumer from the console, it gets a randomly generated group.id.
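Polling in a loop, as suggested above, can be sketched as a small helper. To keep the sketch self-contained it does not depend on kafka-clients: the poll function is a stand-in for KafkaConsumer.poll, the class name is hypothetical, and the stop rule (give up after a few consecutive empty polls) is an assumption suited to a quick test. A long-running consumer would usually keep looping until it is shut down.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical helper: polls repeatedly instead of once.
// `poll` stands in for KafkaConsumer.poll; this class has no Kafka dependency.
class PollLoop {
    /**
     * Calls poll(timeout) repeatedly, collecting every record returned, and
     * stops after maxEmptyPolls consecutive polls return nothing.
     */
    static <R> List<R> pollUntilIdle(Function<Duration, ? extends Iterable<R>> poll,
                                     Duration timeout, int maxEmptyPolls) {
        List<R> collected = new ArrayList<>();
        int emptyPolls = 0;
        while (emptyPolls < maxEmptyPolls) {
            int before = collected.size();
            for (R record : poll.apply(timeout)) {
                collected.add(record);
            }
            // Count consecutive empty polls; any poll that returns data resets the counter.
            emptyPolls = (collected.size() == before) ? emptyPolls + 1 : 0;
        }
        return collected;
    }
}
```

Because ConsumerRecords is Iterable, a KafkaConsumer from kafka-clients 2.0 or later could be passed as consumer::poll; the 0.10.x client from the question only has poll(long), so it would need a lambda such as t -> consumer.poll(t.toMillis()).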

Answer by jazz

Since no offset was provided, the Java client waits for new messages but does not show existing ones; this is expected (a group with no committed offset falls back to auto.offset.reset, which defaults to latest). If you intend to read all the messages already in the topic, you can use this piece of code:

if (READ_FROM_BEGINNING) {
    // Consume all the messages in the topic from the beginning.
    // This doesn't work reliably unless consumer.poll(..) is called first:
    // partitions are assigned while polling, so assignment() can still be
    // empty if the group join has not finished yet.
    consumer.poll(10);
    consumer.seekToBeginning(consumer.assignment());
    // Or, to read from a predefined offset instead of the beginning:
    //consumer.seek(consumer.assignment().iterator().next(), READ_FROM_OFFSET);
}
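Another way to read existing messages, without seeking, is through configuration: auto.offset.reset is consulted only when the group has no committed offset, so combining auto.offset.reset=earliest with a group.id that has not committed anything yet should make the first poll start at the earliest retained offset. The sketch below only builds the Properties, reusing the settings from the question; the class and method names are hypothetical.

```java
import java.util.Properties;

class ConsumerConfigSketch {
    // Hypothetical helper: the question's consumer settings plus auto.offset.reset,
    // which applies only when the group has no committed offset for a partition.
    static Properties consumerProps(String groupId, boolean readFromBeginning) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // "earliest": start at the oldest retained record; "latest" (the default): only new records.
        props.put("auto.offset.reset", readFromBeginning ? "earliest" : "latest");
        return props;
    }
}
```

Once the group has committed offsets (enable.auto.commit is true here), later runs resume from those offsets and auto.offset.reset no longer applies. This is roughly why the console consumer's --from-beginning works: it runs under a freshly generated group id.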

Answer by Harpreet Varma

Give group.id any value you like; the exact value doesn't matter.

props.put("group.id", "Any Random Value");
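If the value really is arbitrary, one option is to generate it. There is a trade-off, though: committed offsets are stored per group.id, so a new value on every run means no committed offsets (auto.offset.reset then decides where reading starts), and consumers only share work when they use the same value. A minimal sketch; the helper and prefix names are hypothetical.

```java
import java.util.UUID;

class GroupIds {
    // Hypothetical helper: returns the prefix plus a new random UUID on every call,
    // so each run joins a brand-new consumer group with no committed offsets.
    static String freshGroupId(String prefix) {
        return prefix + "-" + UUID.randomUUID();
    }
}
```

For example, props.put("group.id", GroupIds.freshGroupId("my-topic-reader")) would give every run its own group.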

Answer by Beginner

Here are some test results on how the number of partitions and the consumer property group.id interact:

Properties props = new Properties();
// set all other properties as required
props.put("group.id", "ConsumerGroup1");
props.put("max.poll.records", "1");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

group.id controls how the produced data is load-balanced: consumers with the same group.id split the topic's partitions among themselves, while consumers with different group.id values each get their own copy of the data.

If partitions = 1 and there are 2 consumers, only one of the two active consumers gets data.

If partitions = 2 and there are 2 consumers, the two active consumers get data evenly (one partition each).

If partitions = 3 and there are 2 consumers, both active consumers get data: one gets data from 2 partitions and the other from 1 partition.

If partitions = 3 and there are 3 consumers, the three active consumers get data evenly (one partition each).
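The four cases above follow a simple counting rule when one topic's partitions are divided among the members of a single group: every consumer gets partitions / consumers partitions, and the first partitions % consumers consumers get one extra. The plain-Java sketch below reproduces those counts. It is a simplified model of range-style assignment (similar to how Kafka's RangeAssignor treats a single topic), not Kafka's own code, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of range-style assignment for one topic within one consumer group.
class RangeAssignmentSim {
    /** Returns, for each consumer index, the partition numbers that consumer owns. */
    static List<List<Integer>> assign(int partitions, int consumers) {
        List<List<Integer>> result = new ArrayList<>();
        int base = partitions / consumers;   // every consumer gets at least this many
        int extra = partitions % consumers;  // the first `extra` consumers get one more
        int next = 0;
        for (int c = 0; c < consumers; c++) {
            int count = base + (c < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }
}
```

assign(1, 2) returns [[0], []]: the empty list is the idle consumer from the first test, while assign(3, 2) returns [[0, 1], [2]].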

Answer by MANISH PARGANIHA

The consumer group id is the name of the consumer group; it can be defined in Kafka's consumer.properties file.

Set a consumer group id for consumers of "my-topic", and it should work, as below:

# consumer group id
group.id=my-topic-consumer-group
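A Java KafkaConsumer does not read consumer.properties by itself; it only sees the Properties passed to its constructor. To keep group.id in such a file, one option is to load it with java.util.Properties, which understands the same key=value and # comment syntax. A minimal sketch; the class name is hypothetical.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.Properties;

class ConsumerPropertiesLoader {
    // Reads key=value lines (the format of Kafka's consumer.properties, where
    // lines starting with '#' are comments) into a Properties object.
    static Properties load(Reader source) {
        Properties props = new Properties();
        try {
            props.load(source);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }
}
```

A Reader opened on the file (for example via java.nio.file.Files.newBufferedReader) could be passed in, and the resulting Properties, together with the other required settings such as bootstrap.servers and the deserializers, handed to new KafkaConsumer<>(props).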