Java: How can I send large messages with Kafka (over 15 MB)?

Disclaimer: This page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same license, attribute it to the original authors (not me), and link to the original: http://stackoverflow.com/questions/21020347/

Date: 2020-08-13 06:01:06  Source: igfitidea

How can I send large messages with Kafka (over 15MB)?

java, apache-kafka

Asked by Sonson123

I send String messages to Kafka v0.8 with the Java producer API. If the message size is about 15 MB, I get a MessageSizeTooLargeException. I have tried to set message.max.bytes to 40 MB, but I still get the exception. Small messages work without problems.


(The exception appears in the producer; I don't have a consumer in this application.)


What can I do to get rid of this exception?


My example producer config


private ProducerConfig kafkaConfig() {
    Properties props = new Properties();
    props.put("metadata.broker.list", BROKERS);
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    props.put("message.max.bytes", "" + 1024 * 1024 * 40);
    return new ProducerConfig(props);
}

Error-Log:


4709 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to send requests for topics datasift with correlation ids in [213,224]

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)

Accepted answer by laughing_man

You need to adjust three (or four) properties:


  • Consumer side: fetch.message.max.bytes - this determines the largest size of a message that can be fetched by the consumer.
  • Broker side: replica.fetch.max.bytes - this allows the replicas in the brokers to send messages within the cluster and makes sure the messages are replicated correctly. If this is too small, the message will never be replicated, and therefore the consumer will never see it, because the message will never be committed (fully replicated).
  • Broker side: message.max.bytes - this is the largest size of message the broker can receive from a producer.
  • Broker side (per topic): max.message.bytes - this is the largest size of message the broker will allow to be appended to the topic. This size is validated pre-compression. (Defaults to the broker's message.max.bytes.)
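As a sketch of where those properties live (the 15 MB value and the topic name are illustrative, and the exact topic-tool invocation depends on your Kafka version):

```properties
# Broker side: $KAFKA_HOME/config/server.properties
message.max.bytes=15728640
replica.fetch.max.bytes=15728640

# Consumer side (old high-level consumer properties)
fetch.message.max.bytes=15728640

# Per topic, via the topic tool, e.g.:
#   kafka-topics.sh --alter --topic my-topic --config max.message.bytes=15728640
```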

I found out the hard way about number 2 - you don't get ANY exceptions, messages, or warnings from Kafka, so be sure to consider this when you are sending large messages.


Answer by user2720864

One key thing to remember is that the message.max.bytes attribute must be in sync with the consumer's fetch.message.max.bytes property. The fetch size must be at least as large as the maximum message size; otherwise producers could send messages larger than the consumer can consume/fetch. It might be worth taking a look at that.
Which version of Kafka are you using? Also provide some more details of the trace you are getting. Is there something like ... payload size of xxxx larger than 1000000 coming up in the log?


Answer by user2550587

You need to override the following properties:


Broker configs ($KAFKA_HOME/config/server.properties)


  • replica.fetch.max.bytes
  • message.max.bytes

Consumer configs ($KAFKA_HOME/config/consumer.properties)
This step didn't work for me; I added the property to the consumer app instead and it worked fine.


  • fetch.message.max.bytes

Restart the server.


Look at this documentation for more info: http://kafka.apache.org/08/configuration.html


Answer by Ravi

The idea is that a message of the same size can be sent from the Kafka producer to the Kafka broker and then received by the Kafka consumer, i.e.


Kafka producer --> Kafka Broker --> Kafka Consumer


Suppose the requirement is to send 15 MB messages; then the producer, the broker, and the consumer, all three, need to be in sync.


Kafka producer sends 15 MB --> Kafka broker allows/stores 15 MB --> Kafka consumer receives 15 MB


The setting therefore should be:


a) on Broker:


message.max.bytes=15728640 
replica.fetch.max.bytes=15728640

b) on Consumer:


fetch.message.max.bytes=15728640

Answer by Sascha Vetter

Minor changes are required for Kafka 0.10 and the new consumer compared to laughing_man's answer:

laughing_man 的回答相比,Kafka 0.10新消费者所需的小改动:

  • Broker: No changes; you still need to increase the properties message.max.bytes and replica.fetch.max.bytes. message.max.bytes has to be equal to or smaller (*) than replica.fetch.max.bytes.
  • Producer: Increase max.request.size to send the larger message.
  • Consumer: Increase max.partition.fetch.bytes to receive larger messages.

(*) Read the comments to learn more about message.max.bytes <= replica.fetch.max.bytes

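A minimal sketch of those client-side settings with the Kafka 0.10+ Java clients, using their string config keys (the 15 MB value and the broker address are placeholders):

```java
import java.util.Properties;

public class LargeMessageConfig {
    static final int MAX_BYTES = 15728640; // 15 MB, illustrative

    // Producer side: raise max.request.size so large sends aren't rejected client-side.
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("max.request.size", String.valueOf(MAX_BYTES));
        return props;
    }

    // Consumer side: raise max.partition.fetch.bytes so large records can be fetched.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("max.partition.fetch.bytes", String.valueOf(MAX_BYTES));
        return props;
    }
}
```

These Properties objects would then be passed to KafkaProducer and KafkaConsumer constructors as usual.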

Answer by Bhanu Hoysala

The answer from @laughing_man is quite accurate. Still, I want to pass on a recommendation that I learned from Kafka expert Stephane Maarek on Quora.


Kafka isn't meant to handle large messages.


Your API should use cloud storage (e.g. AWS S3) and just push a reference to the S3 object to Kafka or any message broker. You must find somewhere to persist your data; maybe it's a network drive, maybe it's whatever, but it shouldn't be the message broker.

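A sketch of the claim-check pattern this describes - the payload goes to object storage and only a small reference message is published to Kafka. The bucket name and key scheme are made up, and the storage upload is stubbed out where a real SDK call would go:

```java
import java.util.Arrays;

public class ClaimCheck {
    // Stand-in for an upload to S3 or similar; returns the object's location.
    static String uploadToStorage(byte[] payload) {
        String key = "payloads/" + Integer.toHexString(Arrays.hashCode(payload));
        // real code would call the storage SDK here
        return "s3://my-bucket/" + key; // hypothetical bucket
    }

    // Build the small reference message that is actually sent to Kafka.
    static String referenceMessage(byte[] payload) {
        String location = uploadToStorage(payload);
        return "{\"payloadLocation\":\"" + location + "\",\"sizeBytes\":" + payload.length + "}";
    }
}
```

The consumer then reads the reference and fetches the payload from storage itself, so Kafka only ever carries a few hundred bytes per message.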

Now, if you don't want to go with the above solution:


The maximum message size in Apache Kafka is 1 MB (the setting in your brokers is called message.max.bytes). If you really needed it badly, you could increase that size and make sure to increase the network buffers for your producers and consumers.


And if you really care about splitting your message, make sure each message split has the exact same key so that it gets pushed to the same partition, and your message content should report a “part id” so that your consumer can fully reconstruct the message.

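A sketch of that splitting scheme: every chunk carries the same key (so all parts land in the same partition, in order) plus a "partId/totalParts" marker so the consumer can reassemble the message. The triple layout and marker format here are made up for illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MessageSplitter {
    // Split a large payload into chunks; each entry is {key, "partId/totalParts", chunk}.
    static List<Object[]> split(String key, byte[] payload, int chunkSize) {
        int totalParts = (payload.length + chunkSize - 1) / chunkSize;
        List<Object[]> parts = new ArrayList<>();
        for (int i = 0; i < totalParts; i++) {
            int from = i * chunkSize;
            int to = Math.min(from + chunkSize, payload.length);
            byte[] chunk = Arrays.copyOfRange(payload, from, to);
            // same key for every part -> same partition, so arrival order is preserved
            parts.add(new Object[] { key, i + "/" + totalParts, chunk });
        }
        return parts;
    }
}
```

Each entry would then become one producer record; the consumer buffers parts by key until it has seen all totalParts of them.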

If your messages are text-based, you can also explore compression (gzip, snappy, or lz4), which may reduce the data size, but not magically.


Again, you have to use an external system to store that data and just push an external reference to Kafka. That is a very common and widely accepted architecture, and the one you should go with.


Keep in mind that Kafka works best when messages are huge in number, not in size.


Source: https://www.quora.com/How-do-I-send-Large-messages-80-MB-in-Kafka
