Original URL: http://stackoverflow.com/questions/34794260/
Warning: this content is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me): StackOverflow.
When does the Apache Kafka client throw a "Batch Expired" exception?
Asked by James Thomas
Using the Apache Kafka Java client (0.9), I'm trying to send a long series of records to the broker using the Kafka Producer class.
The asynchronous send method returns immediately for a while, then starts blocking on each call for a short period. After around thirty seconds, the client starts throwing exceptions (TimeoutException) with the message "Batch expired".
What circumstances cause this exception to be thrown?
Accepted answer by James Thomas
This exception indicates you are queueing records at a faster rate than they can be sent.
When you call the send method, the ProducerRecord will be stored in an internal buffer for sending to the broker. The method returns immediately once the ProducerRecord has been buffered, regardless of whether it has been sent.
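The contract described above — send returns as soon as the record is buffered, and any failure only surfaces later through the returned future — can be sketched without a live broker. The class and method names below are hypothetical stand-ins; the real client returns a Future&lt;RecordMetadata&gt; and the batch-expiry error is an org.apache.kafka.common.errors.TimeoutException, not the java.util.concurrent one used here:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

public class BatchExpirySketch {
    // Stand-in for KafkaProducer.send(): returns a future immediately;
    // the I/O thread completes it exceptionally later if the batch expires.
    static CompletableFuture<String> simulatedSend() {
        CompletableFuture<String> future = new CompletableFuture<>();
        // In the real client this happens after request.timeout.ms elapses:
        future.completeExceptionally(new TimeoutException("Batch Expired"));
        return future;
    }

    public static void main(String[] args) {
        CompletableFuture<String> f = simulatedSend(); // returns at once
        try {
            f.get(); // the failure only shows up when the future is inspected
        } catch (ExecutionException e) {
            // The cause carries the "Batch Expired" message.
            System.out.println(e.getCause().getMessage());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The same failure reaches a Callback passed to send, if one is registered, instead of being thrown from the send call itself.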
Records are grouped into batches for sending to the broker, to reduce the transport overhead per message and increase throughput.
Once a record is added to a batch, there is a time limit for sending that batch, to ensure it is sent within a specified duration. This is controlled by the producer configuration parameter request.timeout.ms, which defaults to thirty seconds.
If the batch has been queued longer than the timeout limit, the exception will be thrown. Records in that batch will be removed from the send queue.
Increasing the timeout limit, using the configuration parameter, will allow the client to queue batches for longer before expiring.
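As a sketch, raising that limit is a single producer property. The key name request.timeout.ms is the real setting; the broker address, serializers, and 60-second value below are example assumptions:

```java
import java.util.Properties;

public class ProducerTimeoutConfig {
    // Assemble producer settings; request.timeout.ms bounds how long a
    // buffered batch may wait before it is expired with "Batch Expired".
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // example address
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("request.timeout.ms", "60000"); // up from the 30 s default
        return props;
    }

    public static void main(String[] args) {
        // new KafkaProducer<>(producerProps()) would pick these settings up.
        System.out.println(producerProps().getProperty("request.timeout.ms"));
    }
}
```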
Answer by S. Sar
The parameter that controls the delay before a batch is sent to the broker is linger.ms. Its default value is 0 (no delay).
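A hedged sketch of the two batching knobs side by side (the key names are real producer settings; the values are illustrative): linger.ms adds an artificial delay so more records can join a batch, while batch.size caps a batch in bytes, and a batch is dispatched when either limit is reached.

```java
import java.util.Properties;

public class BatchingConfig {
    static Properties batchingProps() {
        Properties props = new Properties();
        // Wait up to 5 ms for more records to join a batch
        // (default is 0: send as soon as a sender thread is available).
        props.put("linger.ms", "5");
        // Maximum batch size in bytes (default 16384).
        props.put("batch.size", "32768");
        return props;
    }

    public static void main(String[] args) {
        Properties p = batchingProps();
        System.out.println(p.getProperty("linger.ms") + " "
            + p.getProperty("batch.size"));
    }
}
```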
Answer by Roberto
I got this exception in a completely different context.
I have set up a mini cluster of a ZooKeeper VM, a broker VM, and a producer/consumer VM. I opened all necessary ports on the server (9092) and on ZooKeeper (2181), and then tried to publish a message from the consumer/publisher VM to the broker. I got the exception mentioned by the OP, but since I had only published a single message so far (or at least tried to), the solution couldn't be to increase the timeout or batch size. So I searched on and found this mailing list describing a similar problem I had when trying to consume messages from within the consumer/producer VM (ClosedChannelException): http://grokbase.com/t/kafka/users/152jsjekrm/having-trouble-with-the-simplest-remote-kafka-config The last post in this mailing list actually describes how to solve the problem.
Long story short: if you face both the ClosedChannelException and the Batch Expired exception, you likely have to change this line in the broker configuration file and restart the broker:
advertised.host.name=<broker public IP address>
If it isn't set, it falls back to the host.name property (which probably isn't set either) and then to the canonical host name obtained from the InetAddress Java class, which of course isn't correct either and thus confuses remote nodes.
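A hedged broker-config fragment illustrating the fallback chain described above (the IP is a placeholder; in a stock Kafka distribution this file is config/server.properties, and on brokers newer than the 0.9/0.10 era, advertised.listeners replaces the deprecated advertised.host.name):

```properties
# Preferred: the address the broker tells clients to connect to.
# Placeholder public IP shown here.
advertised.host.name=203.0.113.10

# Fallbacks if advertised.host.name is unset:
# 1. host.name (if set)
# 2. InetAddress.getLocalHost().getCanonicalHostName()
#host.name=
```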
Answer by sse
When you create the consumer, set ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true.
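As a sketch using the underlying string key (the constant ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG resolves to "enable.auto.commit"; the broker address, group id, and commit interval below are example assumptions):

```java
import java.util.Properties;

public class ConsumerAutoCommitConfig {
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // example address
        props.put("group.id", "example-group");           // example group
        // Same key as ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG:
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        return props;
    }

    public static void main(String[] args) {
        // new KafkaConsumer<>(consumerProps()) would pick these settings up.
        System.out.println(consumerProps().getProperty("enable.auto.commit"));
    }
}
```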
Answer by Binita Bharati
I am using Kafka Java client version 0.11.0.0. I also started seeing the same pattern: failing to produce large messages consistently. A few of the messages would succeed and some others would fail (although the successful and failed messages were the same size). In my case, each message was around 60 KB, far higher than Kafka's default batch.size of 16 kB, and my linger.ms was set to the default of 0. This error is thrown because the producer client times out before it can receive a successful response from the server. Basically, in my code, this call was timing out: kafkaProd.send(pr).get(). To fix this, I had to increase the producer client's request.timeout.ms from its default to 60000.
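A hedged sketch of the fix described in this answer (batch.size and request.timeout.ms are real producer settings; the values follow the answer's numbers, the broker address is an example, and the actual send call is shown only in a comment since it needs a live broker):

```java
import java.util.Properties;

public class LargeMessageProducerConfig {
    static Properties largeMessageProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // example address
        // Each message is ~60 KB, so raise batch.size above one record
        // (the default of 16384 bytes is smaller than a single message here).
        props.put("batch.size", String.valueOf(64 * 1024));
        // Give the broker longer to acknowledge before the batch expires.
        props.put("request.timeout.ms", "60000"); // up from the 30 s default
        return props;
    }

    public static void main(String[] args) {
        // With these props, the answer's blocking call
        // kafkaProd.send(pr).get() stopped timing out in their setup.
        System.out.println(largeMessageProps().getProperty("batch.size"));
    }
}
```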
Answer by Rory G
Had a similar issue with Kafka running under Docker Compose. My docker-compose.yml was set with
KAFKA_ADVERTISED_HOST_NAME: kafka
ports:
- 9092:9092
But when I tried to send a message with Camel from outside Docker
to("kafka:test?brokers=localhost:9092")
I got a TimeoutException. I solved it by adding
127.0.0.1 kafka
to the Windows\System32\drivers\etc\hosts file and then changing my Camel URL to
to("kafka:test?brokers=kafka:9092")
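Putting the pieces together, a hedged docker-compose fragment for this kind of setup (the image name and the ZooKeeper service are assumptions; the key point is that the broker advertises the hostname kafka, so external clients must be able to resolve that name, e.g. via the hosts-file entry above):

```yaml
kafka:
  image: wurstmeister/kafka        # example image
  ports:
    - "9092:9092"
  environment:
    # Clients are told to connect back to "kafka:9092",
    # so that name must resolve wherever the client runs.
    KAFKA_ADVERTISED_HOST_NAME: kafka
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
```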