Java CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member

Warning: this is a translated copy of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must follow the same CC BY-SA license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/45560255/

Date: 2020-08-12 02:04:54  Source: igfitidea

CommitFailedException Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member

Tags: java, apache-kafka

Asked by Simon Su

I was using Kafka 0.10.2 and ran into a CommitFailedException like this:

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.


I have set max.poll.interval.ms to Integer.MAX_VALUE, so can anyone tell me why this still happens even though I have set that value?

Another question: as the message suggests, I set session.timeout.ms to 60000 and it still happens. I tried to reproduce it with a simple program:

    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.log4j.Logger;

    public static void main(String[] args) throws InterruptedException {
        Logger logger = Logger.getLogger(KafkaConsumer10.class);
        logger.info("XX");
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-broker:9098");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("max.poll.interval.ms", "300000");
        props.put("session.timeout.ms", "10000");
        props.put("max.poll.records", "2");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("t1"));
        while (true) {
            Thread.sleep(11000);   // sleep longer than session.timeout.ms before polling
            ConsumerRecords<String, String> records = consumer.poll(100);
            Thread.sleep(11000);   // and again after polling
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
        }
    }

When I set session.timeout.ms to 10000, I tried to sleep more than 10000 ms in my poll loop, but it seems to work and no exception is thrown. So I am confused about this. If the heartbeat is triggered by consumer.poll and consumer.commit, it seems the heartbeat should have exceeded the session timeout in my code. Why is no CommitFailedException thrown?
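One possible explanation (my assumption, not stated in the thread): since KIP-62, introduced in Kafka 0.10.1, heartbeats are sent from a background thread, so sleeping inside the poll loop does not stop heartbeating. Only the time between successive poll() calls matters, and that is bounded by max.poll.interval.ms, not session.timeout.ms. A broker-free sketch of that budget (class and method names are hypothetical):

```java
public class PollLoopBudget {
    // Returns true if the time spent between two poll() calls stays
    // within max.poll.interval.ms, i.e. the consumer is not kicked
    // out of the group for lack of poll-loop progress.
    static boolean withinPollInterval(long timeBetweenPollsMs, long maxPollIntervalMs) {
        return timeBetweenPollsMs <= maxPollIntervalMs;
    }

    public static void main(String[] args) {
        // The loop above sleeps ~22s per iteration (11s + 11s), while
        // max.poll.interval.ms is 300000 (5 minutes): no violation.
        System.out.println(withinPollInterval(22_000, 300_000));  // true
        // session.timeout.ms (10s) is not violated either, because
        // the background heartbeat thread keeps running during sleeps.
        System.out.println(withinPollInterval(400_000, 300_000)); // false
    }
}
```

Under this assumption the sample program never trips either timeout, which would explain why no exception appears.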

Answered by Rahul Teke

The session.timeout.ms set on the consumer should be less than the group.max.session.timeout.ms set on the Kafka broker.

This resolved the issue for me.


Credit goes to the GitHub issue "Commit Failures".

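A broker-free sketch of this check (the helper name is hypothetical, and the broker defaults quoted in the comments are assumptions worth verifying against your broker's server.properties):

```java
import java.util.Properties;

public class SessionTimeoutCheck {
    // Returns true if the consumer's session.timeout.ms would be
    // accepted by a broker with the given group.max.session.timeout.ms.
    static boolean isAccepted(Properties consumerProps, long brokerGroupMaxSessionTimeoutMs) {
        long sessionTimeoutMs = Long.parseLong(consumerProps.getProperty("session.timeout.ms"));
        return sessionTimeoutMs <= brokerGroupMaxSessionTimeoutMs;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("session.timeout.ms", "60000");

        // Older brokers are said to default group.max.session.timeout.ms
        // to 30000 ms, so a 60000 ms session timeout would be rejected:
        System.out.println(isAccepted(props, 30_000));   // false
        // A broker allowing 300000 ms accepts the same consumer:
        System.out.println(isAccepted(props, 300_000));  // true
    }
}
```

If the join is rejected for this reason, lowering session.timeout.ms (or raising the broker-side ceiling) resolves it, which matches the answer above.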

Answered by Abhimanyu

Hi, for this you need to handle the rebalance condition in your code: process the in-flight messages and commit their offsets before the rebalance completes.

For example:

    import java.util.Collection;

    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.common.TopicPartition;

    private class HandleRebalance implements ConsumerRebalanceListener {
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // Implement what you want to do once rebalancing is done.
        }

        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // Commit the offsets of the records processed so far, e.g.:
            // consumer.commitSync(currentOffsets);
        }
    }

and use this syntax to subscribe to the topic:

kafkaConsumer.subscribe(topicNameList, new HandleRebalance());

The advantages of doing this:

  1. Messages will not be reprocessed when a rebalance takes place.

  2. No CommitFailedException is thrown during the rebalance.
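To make "commit it before rebalancing" concrete, here is a broker-free sketch of the per-partition bookkeeping that an onPartitionsRevoked callback would hand to consumer.commitSync(...). Class and method names are hypothetical; a real implementation would key the map by TopicPartition and wrap the offsets in OffsetAndMetadata:

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetTracker {
    // Next offset to commit per partition. Kafka commits point at the
    // next record to read, hence the +1 in markProcessed.
    private final Map<Integer, Long> nextOffsetToCommit = new HashMap<>();

    // Record that the message at `offset` on `partition` was processed.
    void markProcessed(int partition, long offset) {
        nextOffsetToCommit.put(partition, offset + 1);
    }

    // Snapshot to pass to commitSync inside onPartitionsRevoked.
    Map<Integer, Long> snapshot() {
        return new HashMap<>(nextOffsetToCommit);
    }

    public static void main(String[] args) {
        OffsetTracker tracker = new OffsetTracker();
        tracker.markProcessed(0, 41);
        tracker.markProcessed(0, 42);
        tracker.markProcessed(1, 7);
        System.out.println(tracker.snapshot()); // {0=43, 1=8}
    }
}
```

Committing this snapshot when partitions are revoked is what prevents the new assignee from re-reading the records this consumer already processed.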