Disclaimer: this page is a translated copy of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you reuse or share it, you must do so under the same license and attribute it to the original authors (not me): StackOverflow
Original question: http://stackoverflow.com/questions/35691676/
How do I set the size of messages in Kafka?
Asked by Jane Wayne
I'm currently using Kafka 0.9.0.1. According to some sources I've found, the way to set the sizes of messages is to modify the following key values in server.properties.
- message.max.bytes
- replica.fetch.max.bytes
- fetch.message.max.bytes
My server.properties file actually has these settings.
message.max.bytes=10485760
replica.fetch.max.bytes=20971520
fetch.message.max.bytes=10485760
Other settings that may be relevant are below.
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
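As an aside not covered in the original question: if only one topic needs to carry large messages, the broker-wide message.max.bytes limit can also be overridden per topic via the topic-level max.message.bytes config. A hedged sketch (the ZooKeeper address and topic name are illustrative):

```shell
# Per-topic override of the broker's message limit; the topic-level
# property is max.message.bytes (note the different name from the
# broker-level message.max.bytes).
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
    --entity-type topics --entity-name test \
    --add-config max.message.bytes=10485760
```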
However, when I attempt to send messages with payloads of 4 to 6 MB in size, the consumer never gets any messages. The producer seems to send the messages without any exceptions being thrown. If I do send smaller payloads (like < 1 MB) then the consumer does actually receive the messages.
Any idea on what I'm doing wrong in terms of configuration settings?
Here is the example code to send a message.
Producer<String, byte[]> producer = new KafkaProducer<>(getProducerProps());
File dir = new File("/path/to/dir");
for (String s : dir.list()) {
    File f = new File(dir, s);
    byte[] data = Files.readAllBytes(f.toPath());
    Payload payload = new Payload(data); // a simple POJO to store the payload
    String key = String.valueOf(System.currentTimeMillis());
    byte[] val = KryoUtil.toBytes(payload); // custom util that uses Kryo to serialize to byte[]
    producer.send(new ProducerRecord<>("test", key, val));
}
producer.close();
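One observation about this code (an aside, not from the original post): in the Java client, producer.send() is asynchronous and returns a Future&lt;RecordMetadata&gt;. An error such as RecordTooLargeException is reported through that future or a registered Callback, not thrown from send() itself, so the loop above can complete without any exception even when every record was rejected. Calling .get() on the returned future surfaces the error. The failure mode can be sketched with a plain CompletableFuture (class and message names here are illustrative):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class AsyncSendErrorDemo {
    // Simulates the asynchronous failure mode of producer.send(): the call
    // itself returns normally, and the error only becomes visible when the
    // returned future is inspected.
    static String trySend() {
        CompletableFuture<String> ack = CompletableFuture.supplyAsync(() -> {
            // Simulated broker-side rejection (analogous to RecordTooLargeException).
            throw new IllegalStateException("record too large");
        });
        // No exception has reached the caller yet; it only appears on get().
        try {
            return "acked: " + ack.get();
        } catch (ExecutionException e) {
            return "send failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(trySend());
    }
}
```

Checking the producer logs, or blocking on send(...).get() for a test run, is a quick way to confirm whether records are actually being accepted by the broker.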
Here is the example code to receive a message.
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(getConsumerProps());
consumer.subscribe(Arrays.asList("test"));
while (true) {
    ConsumerRecords<String, byte[]> records = consumer.poll(100);
    for (ConsumerRecord<String, byte[]> record : records) {
        long offset = record.offset();
        String key = record.key();
        byte[] val = record.value();
        Payload payload = (Payload) KryoUtil.toObject(val, Payload.class); // custom util that uses Kryo to deserialize back to an object
        System.out.println(String.format("offset=%d, key=%s", offset, key));
    }
}
Here are the methods to populate the properties files for the producer and consumer.
public static Properties getProducerProps() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "qc1:9092,qc2:9092,qc3:9092,qc4:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("compression.type", "snappy");
    props.put("max.request.size", 10485760); // need this
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    return props;
}

public static Properties getConsumerProps() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "qc1:9092,qc2:9092,qc3:9092,qc4:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("max.partition.fetch.bytes", 10485760); // need this too
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    return props;
}
Answered by Nautilus
Jane,
Don't use fetch.message.max.bytes: first, because it is a consumer property and does not belong in the server.properties file, and second, because it applies only to the old version of the consumer. Instead, use max.partition.fetch.bytes in the properties you pass when instantiating the Consumer.
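To make this concrete, here is a minimal sketch of consumer properties for large messages (the broker address is a placeholder, and the 10 MB figure merely echoes the question's setup):

```java
import java.util.Properties;

public class LargeMessageConsumerProps {
    // max.partition.fetch.bytes is a client-side (new consumer) setting,
    // passed to the KafkaConsumer constructor -- not a server.properties entry.
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "qc1:9092"); // placeholder broker list
        props.put("group.id", "test");
        // Must be at least as large as the biggest message the broker will
        // accept (message.max.bytes), here 10 MB.
        props.put("max.partition.fetch.bytes", 10485760);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().get("max.partition.fetch.bytes"));
    }
}
```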
Answered by really-okay-coder
The consumer's fetch.max.bytes option (available in newer clients) is also possible.
Answered by Thiago Falcao
You need to increase the limit on the server side (as already described) and on the client side.
Example in Python using kafka-python Producer:
producer = KafkaProducer(bootstrap_servers=brokers, max_request_size=1048576)
Increase max_request_size to the desired value; the default is 1048576.