java - How do I set the size of messages in Kafka?

Disclaimer: this page mirrors a popular StackOverflow question under the CC BY-SA 4.0 license. If you use or share it, you must follow the same license and attribute it to the original authors (not me). Original StackOverflow question: http://stackoverflow.com/questions/35691676/

Date: 2020-11-03 00:27:54  Source: igfitidea

How do I set the size of messages in Kafka?

java  apache-kafka  kafka-consumer-api  kafka-producer-api

Asked by Jane Wayne

I'm currently using Kafka 0.9.0.1. According to some sources I've found, the way to set the sizes of messages is to modify the following key values in server.properties.


  • message.max.bytes
  • replica.fetch.max.bytes
  • fetch.message.max.bytes

My server.properties file actually has these settings.

message.max.bytes=10485760
replica.fetch.max.bytes=20971520
fetch.message.max.bytes=10485760

Other settings that may be relevant are below.


socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

However, when I attempt to send messages with payloads of 4 to 6 MB in size, the consumer never gets any messages. The producer seems to send the messages without any exceptions being thrown. If I do send smaller payloads (like < 1 MB) then the consumer does actually receive the messages.


Any idea on what I'm doing wrong in terms of configuration settings?


Here is the example code to send a message.


Producer<String, byte[]> producer = new KafkaProducer<>(getProducerProps());
File dir = new File("/path/to/dir");
for(String s : dir.list()) {
  File f = new File(dir, s);
  byte[] data = Files.readAllBytes(f.toPath());
  Payload payload = new Payload(data); //a simple pojo to store payload
  String key = String.valueOf(System.currentTimeMillis());
  byte[] val = KryoUtil.toBytes(payload); //custom util to use kryo to get bytes[]
  producer.send(new ProducerRecord<>("test", key, val));
}
producer.close();

Here is the example code to receive a message.


KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(getConsumerProps());
consumer.subscribe(Arrays.asList("test"));
while(true) {
  ConsumerRecords<String, byte[]> records = consumer.poll(100); //poll returns ConsumerRecords (plural)
  for(ConsumerRecord<String, byte[]> record : records) {
    long offset = record.offset();
    String key = record.key();
    byte[] val = record.value();
    Payload payload = (Payload)KryoUtil.toObject(val, Payload.class); //custom util to use kryo to deserialize back to object
    System.out.println(String.format("offset=%d, key=%s", offset, key));
  }
}

Here are the methods to populate the properties files for the producer and consumer.


public static Properties getProducerProps() {
  Properties props = new Properties();
  props.put("bootstrap.servers", "qc1:9092,qc2:9092,qc3:9092,qc4:9092");
  props.put("acks", "all");
  props.put("retries", 0);
  props.put("batch.size", 16384);
  props.put("linger.ms", 1);
  props.put("buffer.memory", 33554432);
  props.put("compression.type", "snappy");
  props.put("max.request.size", 10485760); //need this
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
  return props;
}

public static Properties getConsumerProps() {
  Properties props = new Properties();
  props.put("bootstrap.servers", "qc1:9092,qc2:9092,qc3:9092,qc4:9092");
  props.put("group.id", "test");
  props.put("enable.auto.commit", "true");
  props.put("auto.commit.interval.ms", "1000");
  props.put("session.timeout.ms", "30000");
  props.put("max.partition.fetch.bytes", 10485760); //need this too
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
  return props;
}

Answered by Nautilus

Jane, don't use fetch.message.max.bytes: first, because it is a consumer property and does not belong in the server.properties file, and second, because it is for the old version of the consumer. Instead, use max.partition.fetch.bytes in the properties you pass when you instantiate the Consumer.

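As a minimal sketch of where the setting goes (broker address and sizes are placeholders taken from the question, not required values), the property is placed in the consumer's Properties, not in server.properties:

```java
import java.util.Properties;

public class ConsumerConfigSketch {
    // max.partition.fetch.bytes is a consumer-side setting: it is passed in the
    // Properties used to construct the KafkaConsumer, not in server.properties.
    static Properties getConsumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "qc1:9092"); // placeholder broker from the question
        props.put("group.id", "test");
        // must be at least as large as the biggest message you expect (10 MiB here)
        props.put("max.partition.fetch.bytes", "10485760");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(getConsumerProps().getProperty("max.partition.fetch.bytes"));
    }
}
```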

Answered by really-okay-coder

The fetch.max.bytes consumer option is also possible.

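This presumably refers to the consumer's fetch.max.bytes setting (added in clients newer than the 0.9 consumer in the question), which caps the total data returned per fetch request, alongside the per-partition cap. A sketch using plain Properties, with illustrative values:

```java
import java.util.Properties;

public class FetchMaxBytesSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // fetch.max.bytes limits the total bytes the broker returns per fetch
        // request; 52428800 (50 MiB) is the documented default.
        props.put("fetch.max.bytes", "52428800");
        // the per-partition cap still applies in addition to the request-wide cap
        props.put("max.partition.fetch.bytes", "10485760");
        System.out.println(props.getProperty("fetch.max.bytes"));
    }
}
```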

Answered by Thiago Falcao

You need to increase the size limit on the server side (as already described) and on the client side.


Example in Python using kafka-python Producer:


producer = KafkaProducer(bootstrap_servers=brokers, max_request_size=1048576)

Increase max_request_size to the desired value; the default is 1048576 (1 MiB).
