Writing a Custom Kafka Serializer in Java
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same license and attribute it to the original authors (not me): StackOverflow.
Original: http://stackoverflow.com/questions/23755976/
Writing Custom Kafka Serializer
Asked by Paaji
I am using my own class in a Kafka message which has a bunch of String data types.
I therefore cannot use the default serializer class or the StringSerializer that comes with the Kafka library.
I guess I need to write my own serializer and feed it to the producer properties?
Answered by Sam Berry
EDIT
In newer Kafka clients, implement Serializer rather than Encoder.
The things required for writing a custom serializer are:

- Implement Encoder with an object specified for the generic
- Supplying a VerifiableProperties constructor is required
- Override the toBytes(...) method, making sure a byte array is returned
- Inject the serializer class into ProducerConfig
Declaring a custom serializer for a producer
As you noted in your question, Kafka supplies a means to declare a specific serializer for a producer. The serializer class is set in a ProducerConfig instance, and that instance is used to construct the desired Producer class.
If you follow Kafka's Producer Example, you will construct ProducerConfig via a Properties object. When building your properties file, be sure to include:
props.put("serializer.class", "path.to.your.CustomSerializer");
With the path to the class you want Kafka to use to serialize messages before appending them to the log.
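Putting it together, a minimal properties setup for the legacy (pre-0.9) Scala producer API might look like the sketch below; the broker list and the serializer path are placeholders, not values from the original answer:

```java
// Sketch of producer configuration for the legacy Scala producer API.
// "localhost:9092" and "path.to.your.CustomSerializer" are placeholders.
Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092");
props.put("serializer.class", "path.to.your.CustomSerializer");

ProducerConfig config = new ProducerConfig(props);
Producer<String, Object> producer = new Producer<String, Object>(config);
```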
Creating a custom serializer that Kafka understands
Writing a custom serializer that Kafka can properly interpret requires implementing the Encoder[T] Scala trait that Kafka provides. Implementing traits in Java is weird, but the following method worked for serializing JSON in my project:
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;

import org.apache.log4j.Logger;

public class JsonEncoder implements Encoder<Object> {
    private static final Logger logger = Logger.getLogger(JsonEncoder.class);
    // Instantiating ObjectMapper is expensive. In real life, prefer injecting the value.
    private static final ObjectMapper objectMapper = new ObjectMapper();

    public JsonEncoder(VerifiableProperties verifiableProperties) {
        /* This constructor must be present for a successful compile. */
    }

    @Override
    public byte[] toBytes(Object object) {
        try {
            return objectMapper.writeValueAsString(object).getBytes();
        } catch (JsonProcessingException e) {
            logger.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
        }
        return "".getBytes();
    }
}
Your question makes it sound like you are using one object (let's call it CustomMessage) for all messages appended to your log. If that's the case, your serializer could look more like this:
package com.project.serializer;

import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;

public class CustomMessageEncoder implements Encoder<CustomMessage> {
    public CustomMessageEncoder(VerifiableProperties verifiableProperties) {
        /* This constructor must be present for a successful compile. */
    }

    @Override
    public byte[] toBytes(CustomMessage customMessage) {
        return customMessage.toBytes();
    }
}
Which would leave your property config to look like this:
props.put("serializer.class", "com.project.serializer.CustomMessageEncoder");
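For completeness, here is one way such a CustomMessage.toBytes() could be implemented with plain java.io; the class and its String fields are hypothetical illustrations, not part of the original question:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical message class with String fields, serialized as
// length-prefixed UTF-8 strings via DataOutputStream.
public class CustomMessage {
    private final String id;
    private final String payload;

    public CustomMessage(String id, String payload) {
        this.id = id;
        this.payload = payload;
    }

    public byte[] toBytes() {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeUTF(id);
            out.writeUTF(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Inverse of toBytes(), useful in a matching Decoder on the consumer side.
    public static CustomMessage fromBytes(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            return new CustomMessage(in.readUTF(), in.readUTF());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public String getId() { return id; }
    public String getPayload() { return payload; }
}
```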
Answered by Harvinder Singh
You need to implement both the encoder and the decoder.
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;

import org.apache.log4j.Logger;

public class JsonEncoder implements Encoder<Object> {
    private static final Logger LOGGER = Logger.getLogger(JsonEncoder.class);

    public JsonEncoder(VerifiableProperties verifiableProperties) {
        /* This constructor must be present for a successful compile. */
    }

    @Override
    public byte[] toBytes(Object object) {
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            return objectMapper.writeValueAsString(object).getBytes();
        } catch (JsonProcessingException e) {
            LOGGER.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
        }
        return "".getBytes();
    }
}
The decoder code
import java.io.IOException;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;

import kafka.serializer.Decoder;
import kafka.utils.VerifiableProperties;

import org.apache.log4j.Logger;

public class JsonDecoder implements Decoder<Object> {
    private static final Logger LOGGER = Logger.getLogger(JsonDecoder.class);

    public JsonDecoder(VerifiableProperties verifiableProperties) {
        /* This constructor must be present for a successful compile. */
    }

    @Override
    public Object fromBytes(byte[] bytes) {
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            return objectMapper.readValue(bytes, Map.class);
        } catch (IOException e) {
            LOGGER.error(String.format("Json processing failed for payload: %s", new String(bytes)), e);
        }
        return null;
    }
}
The pom entry
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.4.1.3</version>
</dependency>
Set the default encoder in the Kafka property
properties.put("serializer.class","kafka.serializer.DefaultEncoder");
The writer and reader code are as follows:
// Producer side: encode the map and wrap it in a message.
JsonEncoder encoder = new JsonEncoder(null);
byte[] bytes = encoder.toBytes(map);
KeyedMessage<String, byte[]> message = new KeyedMessage<String, byte[]>(this.topic, bytes);

// Consumer side: decode the raw message bytes back into a Map.
JsonDecoder decoder = new JsonDecoder(null);
Map map = (Map) decoder.fromBytes(it.next().message());
Answered by OneCricketeer
I therefore cannot use the default serializer class or the StringSerializer that comes with Kafka library.
Sure you can.
For example, use Jackson or Gson: convert your instance into a JSON string or (preferably) a binary byte array, then use one of the built-in Kafka serializers.
Other options
Recommended
Use the Confluent versions of Avro or Protobuf serializers along with the Schema Registry for your class.
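A minimal value-serializer configuration for that route might look like the fragment below; the registry URL is a placeholder for your own deployment:

```java
// Confluent's Avro serializer registers/looks up the schema
// in the Schema Registry at the given URL.
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");
```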
You could also just use ByteArraySerializer if you write your class to an ObjectOutputStream (however, that is not recommended because it is not supported across languages).