Simple Kafka Consumer Example not working

Note: this page is an English rendering of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you reuse or share it, you must attribute it to the original authors (not the translator): http://stackoverflow.com/questions/31745666/

Tags: java, apache-kafka, kafka-consumer-api

Asked by David

I have a simple class to consume messages from a Kafka server. Most of the code is copied from the comments in org.apache.kafka.clients.consumer.KafkaConsumer.java.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class Demo {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.168.144.10:29092");
        props.put("group.id", "test");
        props.put("session.timeout.ms", "1000");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "10000");
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
        consumer.subscribe("voltdbexportAUDIT", "voltdbexportTEST");
        boolean isRunning = true;
        while (isRunning) {
            Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
            process(records);
        }
        consumer.close();
    }

    private static Map<TopicPartition, Long> process(Map<String, ConsumerRecords<byte[], byte[]>> records) {
        Map<TopicPartition, Long> processedOffsets = new HashMap<>();
        for (Map.Entry<String, ConsumerRecords<byte[], byte[]>> recordMetadata : records.entrySet()) {
            List<ConsumerRecord<byte[], byte[]>> recordsPerTopic = recordMetadata.getValue().records();
            for (int i = 0; i < recordsPerTopic.size(); i++) {
                ConsumerRecord<byte[], byte[]> record = recordsPerTopic.get(i);
                // process record
                try {
                    processedOffsets.put(record.topicAndPartition(), record.offset());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        return processedOffsets;
    }
}

I am using 'org.apache.kafka:kafka-clients:0.8.2.0'. It throws this exception:

Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "key.deserializer" which has no default value.
    at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48)
    at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:194)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:430)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:413)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:400)
    at kafka.integration.Demo.main(Demo.java:26)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

How should I configure key.deserializer?

Answered by Chris Gerken

You need to set the properties:


props.put("serializer.class","my.own.serializer.StringSupport");
props.put("key.serializer.class","my.own.serializer.LongSupport");

in your main method so that you pass them to the producer's constructor. Of course, you'd have to specify the right encoders. The serializer class converts the message into a byte array, and the key.serializer class turns the key object into a byte array. Generally you'd also want them to be able to reverse the process.
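
To make "reverse the process" concrete, here is a minimal sketch of what such a pair could look like, assuming the org.apache.kafka.common.serialization interfaces from kafka-clients; the class name LongSupport mirrors the placeholder in the answer and handles a Long key:

import java.nio.ByteBuffer;
import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical LongSupport: turns Long keys into 8 bytes and back.
public class LongSupport implements Serializer<Long>, Deserializer<Long> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration needed for this sketch
    }

    @Override
    public byte[] serialize(String topic, Long data) {
        if (data == null) {
            return null;
        }
        return ByteBuffer.allocate(8).putLong(data).array();
    }

    @Override
    public Long deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        return ByteBuffer.wrap(data).getLong();
    }

    @Override
    public void close() {
        // nothing to release
    }
}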

Answered by Slavek Tecl

This works out of the box, without implementing your own serializers:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");  
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("partition.assignment.strategy", "range");

Answered by Vimal Dhaduk

You are dealing with byte arrays for the key and value parameters, so a byte-array serializer and deserializer are required.

You can add these to your properties.

For deserialization:

props.put("key.deserializer","org.apache.kafka.common.serialization.ByteArrayDeserializer");  

For serialization:

props.put("value.serializer","org.apache.kafka.common.serialization.ByteArraySerializer");

Answered by Bart Theeten

Make sure you pass the string value of the deserialization class, rather than the class object (which was my mistake).


props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
          Serdes.String().deserializer().getClass().getName());

When you forget the .getName() you will get the same exception, which in that case is misleading.
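
A short sketch of the mistake versus the fix, assuming Serdes from org.apache.kafka.common.serialization is available:

// Mistake: passing a Deserializer instance instead of its class name;
// per the answer above, this fails with the same misleading exception.
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
          Serdes.String().deserializer());

// Fix: pass the fully qualified class name as a String.
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
          Serdes.String().deserializer().getClass().getName());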

Answered by Giorgos Myrianthous

For keys, use one of the following:

String Key


properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

JSON Key


properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

Avro Key


properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);

ByteArray Key


properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);


Likewise, use one of the following for your value deserializer:


String Value


properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

JSON Value


properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

Avro Value


properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);

ByteArray Value


properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);


Note that for Avro deserialisers, you will need the following dependencies:


<dependency> 
    <groupId>io.confluent</groupId> 
    <artifactId>kafka-avro-serializer</artifactId> 
    <version>${confluent.version}</version> 
</dependency> 

<dependency> 
    <groupId>org.apache.avro</groupId> 
    <artifactId>avro</artifactId> 
    <version>${avro.version}</version> 
</dependency>
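
Beyond the dependencies, the Avro deserializer also needs to know where to fetch schemas from. A minimal configuration sketch, assuming Confluent's KafkaAvroDeserializer and a placeholder Schema Registry address:

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
          io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
// Required by KafkaAvroDeserializer: the Schema Registry to fetch schemas from.
props.put("schema.registry.url", "http://localhost:8081");
// Values arrive as Avro records (GenericRecord unless specific.avro.reader is set).
KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(props);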