Disclaimer: this page is a translated copy of a popular StackOverflow question and answer, provided under the CC BY-SA 4.0 license. If you reuse or share it, you must do so under the same license and attribute the original authors (not me). Original: http://stackoverflow.com/questions/35658171/

Kafka CommitFailedException consumer exception

java · apache-kafka · kafka-consumer-api

Asked by Hugo Carmona

After creating multiple consumers (using the Kafka 0.9 Java API) and starting each thread, I'm getting the following exception:

Consumer has failed with exception: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
class com.messagehub.consumer.Consumer is shutting down.
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:546)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:487)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:681)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:654)
at org.apache.kafka.clients.consumer.internals.RequestFuture.onSuccess(RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:350)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:288)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:303)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:197)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:187)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:157)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:352)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:936)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:905)

and then the consumers start consuming messages normally. I would like to know what is causing this exception in order to fix it.

Answered by ajkret

Also try tweaking the following parameters (a configuration sketch follows the list):

  • heartbeat.interval.ms – this tells Kafka how many milliseconds to wait before the consumer is considered "dead"
  • max.partition.fetch.bytes – this caps how much message data the consumer receives from each partition when polling
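
As a rough sketch, here is how these settings might be applied when creating a consumer. The property keys are real Kafka consumer configs, but the broker address, group id, topic, and the values themselves are placeholders to tune for your workload:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
props.put("group.id", "my-group");                // placeholder group id
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("heartbeat.interval.ms", "3000");       // example value, in milliseconds
props.put("max.partition.fetch.bytes", "65536");  // example value: less data per poll

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic")); // placeholder topic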

I noticed that rebalancing occurs if the consumer does not commit to Kafka before the heartbeat times out. If the commit happens only after the messages are processed, then the time needed to process them determines how these parameters must be set. So, decreasing the number of messages per poll and increasing the heartbeat time will help to avoid rebalancing.

Also consider using more partitions, so there will be more threads processing your data, even with fewer messages per poll; a sketch of that setup follows.

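A rough sketch of that pattern: each thread runs its own KafkaConsumer (the client is not thread-safe, so instances must not be shared), all with the same group.id so Kafka splits the partitions among them. The thread count and topic are placeholders, and props is the configuration from the sketch above:

import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

ExecutorService pool = Executors.newFixedThreadPool(4); // ideally <= number of partitions
for (int i = 0; i < 4; i++) {
    pool.submit(() -> {
        KafkaConsumer<String, String> c = new KafkaConsumer<>(props); // same group.id as above
        c.subscribe(Collections.singletonList("my-topic"));           // placeholder topic
        while (true) {
            for (ConsumerRecord<String, String> record : c.poll(1000)) {
                // process each record here
            }
            c.commitSync(); // commit after the batch is processed
        }
    });
}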

I wrote this small application to run some tests. Hope it helps.

https://github.com/ajkret/kafka-sample

UPDATE

Kafka 0.10.x now offers a new parameter to control the number of messages received: max.poll.records – the maximum number of records returned in a single call to poll().

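On 0.10.x, capping the batch size could then be as simple as one more entry in the consumer Properties from the earlier sketch (the value is an arbitrary example to tune for your processing speed):

// Return at most 100 records from each call to poll() (example value)
props.put("max.poll.records", "100");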

UPDATE

Kafka offers a way to pause the queue. While the queue is paused, you can process the messages in a separate thread, allowing you to keep calling KafkaConsumer.poll() to send heartbeats. Then call KafkaConsumer.resume() after the processing is done. This way you mitigate the problems of rebalances caused by not sending heartbeats. Here is an outline of what you can do:

// 'workers' is assumed to be an ExecutorService, e.g. Executors.newFixedThreadPool(...)
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Integer.MAX_VALUE);
    consumer.commitSync();

    // Pause all assigned partitions: poll() keeps sending heartbeats
    // but returns no records while paused.
    // (In 0.9, pause/resume take TopicPartition... varargs rather than a Collection.)
    consumer.pause(consumer.assignment());
    for (ConsumerRecord<String, String> record : records) {

        // Hand the record off to a worker thread
        Future<Boolean> future = workers.submit(() -> {
            // Process the record here
            return true;
        });

        // While the worker runs, keep polling so heartbeats are still sent
        while (true) {
            try {
                if (future.get(1, TimeUnit.SECONDS) != null) {
                    break;
                }
            } catch (java.util.concurrent.TimeoutException e) {
                consumer.poll(0); // returns nothing while paused, but heartbeats flow
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        }
    }

    consumer.resume(consumer.assignment());
}