Original source: http://stackoverflow.com/questions/31745666/
Warning: this content is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me):
StackOverFlow
Simple Kafka Consumer Example not working
Asked by David
I have a simple class that consumes messages from a Kafka server. Most of the code is copied from the comments in org.apache.kafka.clients.consumer.KafkaConsumer.java.
public class Demo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.168.144.10:29092");
        props.put("group.id", "test");
        props.put("session.timeout.ms", "1000");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "10000");
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
        consumer.subscribe("voltdbexportAUDIT", "voltdbexportTEST");
        boolean isRunning = true;
        while (isRunning) {
            Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
            process(records);
        }
        consumer.close();
    }

    private static Map<TopicPartition, Long> process(Map<String, ConsumerRecords<byte[], byte[]>> records) {
        Map<TopicPartition, Long> processedOffsets = new HashMap<>();
        for (Map.Entry<String, ConsumerRecords<byte[], byte[]>> recordMetadata : records.entrySet()) {
            List<ConsumerRecord<byte[], byte[]>> recordsPerTopic = recordMetadata.getValue().records();
            for (int i = 0; i < recordsPerTopic.size(); i++) {
                ConsumerRecord<byte[], byte[]> record = recordsPerTopic.get(i);
                // process record
                try {
                    processedOffsets.put(record.topicAndPartition(), record.offset());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        return processedOffsets;
    }
}
I am using 'org.apache.kafka:kafka-clients:0.8.2.0'. It throws the following exception:
Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "key.deserializer" which has no default value.
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48)
at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:194)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:430)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:413)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:400)
at kafka.integration.Demo.main(Demo.java:26)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
How should I configure key.deserializer?
Answered by Chris Gerken
You need to set the properties:
props.put("serializer.class","my.own.serializer.StringSupport");
props.put("key.serializer.class","my.own.serializer.LongSupport");
in your main method so that you pass them to the producer's constructor. Of course, you'd have to specify the right encoders. The serializer class converts the message into a byte array, and the key.serializer class turns the key object into a byte array. Generally you'd also want them to be able to reverse the process.
Answered by Slavek Tecl
This works out of the box, without implementing your own serializers:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("partition.assignment.strategy", "range");
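To make the failure mode concrete, here is a minimal sketch that uses only java.util.Properties, so it runs without Kafka on the classpath. The class name ConsumerPropsCheck and both helpers are hypothetical, and the list of required keys is an assumption based on the exception in the question, not taken from the Kafka source. It reports which deserializer settings are absent before the properties would be handed to the KafkaConsumer constructor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

class ConsumerPropsCheck {
    // Assumed required keys, inferred from the ConfigException in the question.
    static final String[] REQUIRED = {"key.deserializer", "value.deserializer"};

    // Builds the same kind of configuration as the answer above, for String keys and values.
    static Properties stringConsumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    // Returns the assumed-required keys that are not present in props.
    static List<String> missingRequired(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            if (!props.containsKey(key)) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Properties incomplete = new Properties();
        incomplete.put("group.id", "test");
        System.out.println("missing: " + missingRequired(incomplete));
        System.out.println("missing: " + missingRequired(stringConsumerProps("localhost:9092", "test")));
    }
}
```

A check like this only catches absent keys; it does not verify that the named classes exist or match the consumer's generic types.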
Answered by Vimal Dhaduk
You are using byte arrays for both the key and the value, so both need the byte array deserializer.
You can add these properties:
For the key:
props.put("key.deserializer","org.apache.kafka.common.serialization.ByteArrayDeserializer");
For the value:
props.put("value.deserializer","org.apache.kafka.common.serialization.ByteArrayDeserializer");
Answered by Bart Theeten
Make sure you pass the string value (the class name) of the deserializer class, rather than the class object (which was my mistake).
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
Serdes.String().deserializer().getClass().getName());
If you forget the .getName() call, you get the same exception, which in that case is misleading.
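A related pitfall in plain java.util.Properties, independent of Kafka: getProperty only returns values that are Strings, so a Class object stored with put is invisible to it. Whether this is what triggers the misleading exception in a given client version is an assumption, not something this answer confirms. The sketch uses StringBuilder as a stand-in for a deserializer class so that it runs without Kafka on the classpath; the class name ClassVsName and the helper lookup are hypothetical.

```java
import java.util.Properties;

class ClassVsName {
    // Stores the value under "key.deserializer" and reads it back with getProperty.
    static String lookup(Object value) {
        Properties props = new Properties();
        props.put("key.deserializer", value);
        // getProperty returns null when the stored value is not a String.
        return props.getProperty("key.deserializer");
    }

    public static void main(String[] args) {
        // Class object: the entry exists, but getProperty does not see it.
        System.out.println(lookup(StringBuilder.class));            // prints "null"
        // Class name as a String: visible as expected.
        System.out.println(lookup(StringBuilder.class.getName()));  // prints "java.lang.StringBuilder"
    }
}
```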
Answered by Giorgos Myrianthous
For keys, use one of the following:
String Key
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
JSON Key
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
Avro Key
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
ByteArray Key
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
Likewise, use one of the following for your value deserializer:
String Value
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
JSON Value
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
Avro Value
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
ByteArray Value
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
Note that for Avro deserializers, you will need the following dependencies:
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
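The choices above can be collected in a small lookup that returns the fully qualified class name as a string, which also avoids a compile-time dependency on the deserializer classes. This is only a sketch: the class DeserializerNames and the enum Format are hypothetical, the class names are the ones quoted in this thread, and JSON is left out because the answer does not say which library its JsonDeserializer comes from.

```java
import java.util.Properties;

class DeserializerNames {
    // Hypothetical mapping from payload format to deserializer class name.
    enum Format {
        STRING("org.apache.kafka.common.serialization.StringDeserializer"),
        BYTE_ARRAY("org.apache.kafka.common.serialization.ByteArrayDeserializer"),
        AVRO("io.confluent.kafka.serializers.KafkaAvroDeserializer");

        final String className;

        Format(String className) {
            this.className = className;
        }
    }

    // Copies base and sets both deserializer entries as class-name strings.
    static Properties withDeserializers(Properties base, Format key, Format value) {
        Properties props = new Properties();
        props.putAll(base);
        props.put("key.deserializer", key.className);
        props.put("value.deserializer", value.className);
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("bootstrap.servers", "localhost:9092");
        base.put("group.id", "test");
        Properties props = withDeserializers(base, Format.STRING, Format.BYTE_ARRAY);
        System.out.println(props.getProperty("key.deserializer"));
        System.out.println(props.getProperty("value.deserializer"));
    }
}
```

The AVRO entry only resolves at runtime if the Confluent dependency shown above is on the classpath.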