Writing a Custom Kafka Serializer in Java

Disclaimer: this page is based on a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you reuse it, you must follow the same license and attribute the original authors (not this site). Original question: http://stackoverflow.com/questions/23755976/

Writing Custom Kafka Serializer

Tags: java, scala, serialization, apache-kafka

Asked by Paaji

I am using my own class in a Kafka message which has a bunch of String data types.

I therefore cannot use the default serializer class or the StringSerializer that comes with the Kafka library.

I guess I need to write my own serializer and feed it to the producer properties?

Answered by Sam Berry

EDIT

In newer Kafka clients, implement Serializer rather than Encoder.

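For those newer clients, a minimal sketch of a Jackson-based Serializer implementation (the class name, error handling, and use of Jackson here are illustrative, not from the original answer) could look like this:

import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class JsonSerializer<T> implements Serializer<T> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No configuration needed for this sketch.
    }

    @Override
    public byte[] serialize(String topic, T data) {
        try {
            return data == null ? null : objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Failed to serialize " + data.getClass().getName(), e);
        }
    }

    @Override
    public void close() {
        // Nothing to close.
    }
}

Such a class is registered through the key.serializer / value.serializer producer properties rather than serializer.class. The rest of this answer covers the older Encoder-based API.
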
The things required for writing a custom serializer against the older Encoder API are:

  1. Implement Encoder with your object specified for the generic type
    • Supplying a VerifiableProperties constructor is required
  2. Override the toBytes(...) method, making sure a byte array is returned
  3. Inject the serializer class into ProducerConfig

Declaring a custom serializer for a producer

As you noted in your question, Kafka supplies a means to declare a specific serializer for a producer. The serializer class is set in a ProducerConfig instance, and that instance is used to construct the desired Producer class.

If you follow Kafka's Producer Example, you will construct ProducerConfig via a Properties object. When building your properties, be sure to include:

props.put("serializer.class", "path.to.your.CustomSerializer");

Here path.to.your.CustomSerializer is the fully qualified name of the class you want Kafka to use to serialize messages before appending them to the log.

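For context, the surrounding wiring with the old (0.8-era) producer API (kafka.javaapi.producer.Producer) roughly looks like the sketch below; the broker list, topic name, and CustomMessage type are placeholders, not part of the original answer:

Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092");
props.put("serializer.class", "path.to.your.CustomSerializer");

ProducerConfig config = new ProducerConfig(props);
Producer<String, CustomMessage> producer = new Producer<String, CustomMessage>(config);
// message is an instance of your CustomMessage class
producer.send(new KeyedMessage<String, CustomMessage>("my-topic", message));
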
Creating a custom serializer that Kafka understands

Writing a custom serializer that Kafka can properly interpret requires implementing the Encoder[T] Scala trait that Kafka provides. Implementing Scala traits in Java is a little awkward, but the following approach worked for serializing JSON in my project:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;
import org.apache.log4j.Logger; // log4j is assumed for the logging here

public class JsonEncoder implements Encoder<Object> {
    private static final Logger logger = Logger.getLogger(JsonEncoder.class);
    // Instantiating ObjectMapper is expensive. In real life, prefer injecting the instance.
    private static final ObjectMapper objectMapper = new ObjectMapper();

    public JsonEncoder(VerifiableProperties verifiableProperties) {
        /* This constructor must be present for successful compile. */
    }

    @Override
    public byte[] toBytes(Object object) {
        try {
            return objectMapper.writeValueAsString(object).getBytes();
        } catch (JsonProcessingException e) {
            logger.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
        }
        return "".getBytes();
    }
}

Your question makes it sound like you are using one object (let's call it CustomMessage) for all messages appended to your log. If that's the case, your serializer could look more like this:

package com.project.serializer;

import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;

public class CustomMessageEncoder implements Encoder<CustomMessage> {
    public CustomMessageEncoder(VerifiableProperties verifiableProperties) {
        /* This constructor must be present for successful compile. */
    }

    @Override
    public byte[] toBytes(CustomMessage customMessage) {
        return customMessage.toBytes();
    }
}

This would leave your property config looking like this:

props.put("serializer.class", "path.to.your.CustomSerializer");

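This assumes CustomMessage knows how to turn itself into bytes. A minimal sketch of such a class (the fields and the delimited format are purely illustrative) could be:

public class CustomMessage {
    private final String id;
    private final String payload;

    public CustomMessage(String id, String payload) {
        this.id = id;
        this.payload = payload;
    }

    // Encode the fields in whatever format your consumers expect;
    // a simple delimited string is used here only as an example.
    public byte[] toBytes() {
        return (id + "|" + payload).getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }
}
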
Answered by Harvinder Singh

You need to implement both an encoder and a decoder.

public class JsonEncoder implements Encoder<Object> {
    private static final Logger LOGGER = Logger.getLogger(JsonEncoder.class);

    public JsonEncoder(VerifiableProperties verifiableProperties) {
        /* This constructor must be present for successful compile. */
    }

    @Override
    public byte[] toBytes(Object object) {
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            return objectMapper.writeValueAsString(object).getBytes();
        } catch (JsonProcessingException e) {
            LOGGER.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
        }
        return "".getBytes();
    }
}

The decoder code

public class JsonDecoder implements Decoder<Object> {
    private static final Logger LOGGER = Logger.getLogger(JsonDecoder.class);

    public JsonDecoder(VerifiableProperties verifiableProperties) {
        /* This constructor must be present for successful compile. */
    }

    @Override
    public Object fromBytes(byte[] bytes) {
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            return objectMapper.readValue(bytes, Map.class);
        } catch (IOException e) {
            // Log the payload itself rather than the array reference.
            LOGGER.error(String.format("Json processing failed for object: %s", new String(bytes)), e);
        }
        return null;
    }
}

The pom entry

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.4.1.3</version>
</dependency>

Set the default encoder in the Kafka properties (kafka.serializer.DefaultEncoder simply passes the already-serialized byte[] through)

properties.put("serializer.class","kafka.serializer.DefaultEncoder");

The writer and reader code is as follows

// Producer side: serialize the payload manually and wrap the raw bytes.
byte[] bytes = encoder.toBytes(map);
KeyedMessage<String, byte[]> message = new KeyedMessage<String, byte[]>(this.topic, bytes);

// Consumer side: decode the raw message bytes back into a Map.
JsonDecoder decoder = new JsonDecoder(null);
Map map = (Map) decoder.fromBytes(it.next().message());
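
The iterator it in the reader snippet comes from the old high-level consumer API (kafka.javaapi.consumer.ConsumerConnector); a rough sketch of obtaining it, with the ZooKeeper address, group id, and topic as placeholders (not from the original answer):

Properties consumerProps = new Properties();
consumerProps.put("zookeeper.connect", "localhost:2181");
consumerProps.put("group.id", "my-group");

ConsumerConnector connector =
        kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps));
Map<String, List<KafkaStream<byte[], byte[]>>> streams =
        connector.createMessageStreams(Collections.singletonMap(this.topic, 1));
ConsumerIterator<byte[], byte[]> it = streams.get(this.topic).get(0).iterator();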

Answered by OneCricketeer

I therefore cannot use the default serializer class or the StringSerializer that comes with the Kafka library.

Sure you can.

For example, use Jackson or Gson; convert your instance into a JSON string or (preferably) a binary byte array, then use one of the built-in Kafka serializers.

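A minimal sketch of that approach with the modern producer API, Jackson, and the built-in StringSerializer (broker address, topic, and the payload type are placeholders, not from the original answer):

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class JsonStringProducerExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        ObjectMapper mapper = new ObjectMapper();
        // Any POJO works here; a Map keeps the sketch self-contained.
        Map<String, String> payload = Collections.singletonMap("field", "value");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", mapper.writeValueAsString(payload)));
        }
    }
}
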
Other options

Recommended

Use the Confluent versions of Avro or Protobuf serializers along with the Schema Registry for your class.
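
As a rough illustration (the serializer class name and Schema Registry URL below follow the standard Confluent setup and are assumptions, not part of the original answer):

props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");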

You could also just use ByteArraySerializer if you write your class to an ObjectOutputStream (however, that is not recommended because it is not cross-language compatible).
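
A minimal sketch of that (not recommended) Java-native approach, assuming your class implements Serializable:

static byte[] toBytes(java.io.Serializable message) throws java.io.IOException {
    java.io.ByteArrayOutputStream bos = new java.io.ByteArrayOutputStream();
    try (java.io.ObjectOutputStream oos = new java.io.ObjectOutputStream(bos)) {
        oos.writeObject(message);
    }
    return bos.toByteArray();
}

The resulting byte[] can be sent with ByteArraySerializer, but only JVM consumers will be able to read it back.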