将自定义 Java 对象发送到 Kafka 主题

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/41141924/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-11-03 05:42:34  来源:igfitidea点击:

Send Custom Java Objects to Kafka Topic

javaserializationapache-kafka

提问by Vinod Jayachandran

I have my custom Java Object and wish to leverage JVM's in built serialization to send it to a Kafka topic, but serialization fails with below error

我有我的自定义 Java 对象并希望利用内置序列化中的 JVM 将其发送到 Kafka 主题,但序列化失败并出现以下错误

org.apache.kafka.common.errors.SerializationException: Can't convert value of class com.spring.kafka.Payload to class org.apache.kafka.common.serialization.ByteArraySerializer specified in value.serializer

org.apache.kafka.common.errors.SerializationException:无法将类 com.spring.kafka.Payload 的值转换为 value.serializer 中指定的类 org.apache.kafka.common.serialization.ByteArraySerializer

Payload.java

有效载荷.java

public class Payload implements Serializable {

    private static final long serialVersionUID = 123L;

    private String name="vinod";

    private int anInt = 5;

    private Double aDouble = new Double("5.0");

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAnInt() {
        return anInt;
    }

    public void setAnInt(int anInt) {
        this.anInt = anInt;
    }

    public Double getaDouble() {
        return aDouble;
    }

    public void setaDouble(Double aDouble) {
        this.aDouble = aDouble;
    }

}

During my creation of producer, I have the following properties set

在我创建生产者的过程中,我设置了以下属性

<entry key="key.serializer"
                       value="org.apache.kafka.common.serialization.ByteArraySerializer" />
                <entry key="value.serializer"
                       value="org.apache.kafka.common.serialization.ByteArraySerializer" />

My send invoke is as below

我的发送调用如下

kafkaProducer.send(new ProducerRecord<String, Payload>("test", new Payload()));

What is correct way to send a custom java object through a producer to a kafka topic ?

通过生产者将自定义 java 对象发送到 kafka 主题的正确方法是什么?

回答by Vinod Jayachandran

We have 2 Options as listed below

我们有 2 个选项,如下所列

1) If we intend to send custom java objects to producer, We need to create a serializer which implements org.apache.kafka.common.serialization.Serializerand pass that Serializer class during creation of your producer

1)如果我们打算将自定义java对象发送给生产者,我们需要创建一个实现org.apache.kafka.common.serialization.Serializer的序列化程序,并在您的生产者创建过程中传递该序列化程序类

Code Reference below

下面的代码参考

public class PayloadSerializer implements org.apache.kafka.common.serialization.Serializer {

    public void configure(Map map, boolean b) {

    }

    public byte[] serialize(String s, Object o) {

       try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(baos);
            oos.writeObject(o);
            oos.close();
            byte[] b = baos.toByteArray();
            return b;
        } catch (IOException e) {
            return new byte[0];
        }
    }

    public void close() {

    }
}

And set the value serializer accordingly

并相应地设置值序列化器

<entry key="value.serializer"
                       value="com.spring.kafka.PayloadSerializer" />

2) No need to create custom serializer class. Use the existing ByteArraySerializer, but during send follow the process

2) 无需创建自定义序列化程序类。使用现有的 ByteArraySerializer,但在发送过程中遵循流程

Java Object -> String (Preferrably JSON represenation instead of toString)->byteArray

Java 对象 -> 字符串(最好用 JSON 表示而不是 toString)->byteArray

回答by Abhishek Kumar

Since you are using ByteArraySerializer,you need to instantiate a byte[] producer.

由于您正在使用ByteArraySerializer,您需要实例化一个 byte[] 生产者。

Producer<byte[],byte[]> producer = new KafkaProducer<>(props);

and then while producing pass the byte[] after serializing or some other method,for instance,

然后在序列化或其他方法后生成传递字节 [] 时,例如,

producer.send(new ProducerRecord<byte[],byte[]>("test", new Payload().toString().getBytes()));

If you are passing just a Payload Object to the producer then it will be better to have key serializer and value serializer as whatever you intend to pass and while reading you need to read from that data.

如果您仅将有效负载对象传递给生产者,那么最好将键序列化器和值序列化器作为您打算传递的任何内容,并且在读取时需要从该数据中读取。

It is good practice to use Serializable and ByteArraySerializer/ByteArrayDeserializer.

使用 Serializable 和 ByteArraySerializer/ByteArrayDeserializer 是一种很好的做法。