Java: How to create a custom serializer in Kafka?
Original source: http://stackoverflow.com/questions/40154086/
Warning: this content is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me):
StackOverFlow
How to create a custom serializer in Kafka?
Asked by Vimal Dhaduk
Only a few serializers are available out of the box, for example:
org.apache.kafka.common.serialization.StringSerializer
How can we create our own custom serializer?
Answered by Luciano Afranllie
Here is an example of using your own serializer/deserializer for the Kafka message value. The same approach works for the Kafka message key.
We want to send a serialized version of MyMessage as the Kafka value and deserialize it back into a MyMessage object on the consumer side.
Serializing MyMessage on the producer side.
Create a serializer class that implements org.apache.kafka.common.serialization.Serializer.
The serialize() method does the work: it receives your object and returns its serialized form as a byte array.
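import java.io.IOException;
import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class MyValueSerializer implements Serializer<MyMessage>
{
    private boolean isKey;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey)
    {
        // Kafka reports whether this instance serializes keys or values.
        this.isKey = isKey;
    }

    @Override
    public byte[] serialize(String topic, MyMessage message)
    {
        // A null value is passed through unchanged.
        if (message == null) {
            return null;
        }
        try {
            // Placeholder: serialize your MyMessage object into bytes here.
            // The elided code must produce the byte[] named bytes and may throw IOException.
            return bytes;
        } catch (IOException | RuntimeException e) {
            throw new SerializationException("Error serializing value", e);
        }
    }

    @Override
    public void close()
    {
        // Nothing to release.
    }
}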
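final IntegerSerializer keySerializer = new IntegerSerializer();
final MyValueSerializer myValueSerializer = new MyValueSerializer();
// The serializer instances are passed directly to the constructor; props holds the other producer settings.
final KafkaProducer<Integer, MyMessage> producer = new KafkaProducer<>(props, keySerializer, myValueSerializer);
int messageNo = 1;
int kafkaKey = messageNo;
MyMessage kafkaValue = new MyMessage();
// Parameterized instead of the raw ProducerRecord type.
ProducerRecord<Integer, MyMessage> producerRecord = new ProducerRecord<>(topic, kafkaKey, kafkaValue);
// DemoCallBack, logTag, startTime and strValue are not shown in this answer.
producer.send(producerRecord, new DemoCallBack(logTag, startTime, messageNo, strValue));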
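The placeholder inside serialize() can be filled in many ways. As a minimal sketch, assuming MyMessage has just an int id and a String text (both hypothetical fields, since the answer never shows the class), the encoding step could use java.nio.ByteBuffer. The names MyMessageEncoder and encode are illustrative and not part of Kafka.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical message type; the original answer does not define MyMessage.
class MyMessage {
    final int id;
    final String text;

    MyMessage(int id, String text) {
        this.id = id;
        this.text = text;
    }
}

// Hypothetical helper holding the logic that would sit inside serialize().
class MyMessageEncoder {
    // Layout: 4-byte big-endian id, 4-byte big-endian text length, UTF-8 text bytes.
    static byte[] encode(MyMessage message) {
        byte[] textBytes = message.text.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + Integer.BYTES + textBytes.length);
        buffer.putInt(message.id);
        buffer.putInt(textBytes.length);
        buffer.put(textBytes);
        return buffer.array();
    }
}
```

Inside serialize(), the try block would then reduce to something like return MyMessageEncoder.encode(message). The explicit length prefix is a design choice that lets a reader know how many text bytes to expect without scanning for a terminator.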
Deserializing MyMessage on the consumer side.
Create a deserializer class that implements org.apache.kafka.common.serialization.Deserializer.
The deserialize() method does the work: it receives the serialized value as a byte array and returns your object.
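import java.io.IOException;
import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class MyValueDeserializer implements Deserializer<MyMessage>
{
    private boolean isKey;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey)
    {
        // Kafka reports whether this instance deserializes keys or values.
        this.isKey = isKey;
    }

    @Override
    public MyMessage deserialize(String topic, byte[] value)
    {
        // A null value is passed through unchanged.
        if (value == null) {
            return null;
        }
        try {
            // Placeholder: deserialize value into your MyMessage object here.
            // The elided code fills in message from value and may throw IOException.
            MyMessage message = new MyMessage();
            return message;
        } catch (IOException | RuntimeException e) {
            throw new SerializationException("Error deserializing value", e);
        }
    }

    @Override
    public void close()
    {
        // Nothing to release.
    }
}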
Then use it like this:
final IntegerDeserializer keyDeserializer = new IntegerDeserializer();
final MyValueDeserializer myValueDeserializer = new MyValueDeserializer();
final KafkaConsumer<Integer, MyMessage> consumer = new KafkaConsumer<>(props, keyDeserializer, myValueDeserializer);
// poll(long) is deprecated since Kafka 2.0; java.time.Duration is the replacement argument.
ConsumerRecords<Integer, MyMessage> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<Integer, MyMessage> record : records) {
    int kafkaKey = record.key();
    MyMessage kafkaValue = record.value();
    ...
}
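The placeholder inside deserialize() depends entirely on how the bytes were written. As a minimal sketch, assuming a hypothetical MyMessage with an int id and a String text, and assuming the producer wrote a 4-byte id, a 4-byte text length and then the UTF-8 text bytes, the decoding step could look like this. MyMessageDecoder and decode are illustrative names, not Kafka APIs.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical message type; the original answer does not define MyMessage.
class MyMessage {
    final int id;
    final String text;

    MyMessage(int id, String text) {
        this.id = id;
        this.text = text;
    }
}

// Hypothetical helper holding the logic that would sit inside deserialize().
class MyMessageDecoder {
    // Expected layout: 4-byte big-endian id, 4-byte big-endian text length, UTF-8 text bytes.
    static MyMessage decode(byte[] data) {
        ByteBuffer buffer = ByteBuffer.wrap(data);
        int id = buffer.getInt();
        int length = buffer.getInt();
        byte[] textBytes = new byte[length];
        buffer.get(textBytes); // throws BufferUnderflowException if the payload is truncated
        return new MyMessage(id, new String(textBytes, StandardCharsets.UTF_8));
    }
}
```

A truncated or corrupted payload surfaces here as a RuntimeException (for example BufferUnderflowException), which the catch block in the deserializer above would wrap in a SerializationException.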
Answered by Vladimir
No words, only code
An object that you send to Kafka
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class TestDto {
    private String name;
    private String version;
}
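Create a Serializer, which will be used by the Producer
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serializer;

@Slf4j
public class KafkaValueSerializer implements Serializer<TestDto> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, TestDto data) {
        try {
            // Jackson writes the DTO as JSON bytes.
            return objectMapper.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            log.error("Unable to serialize object {}", data, e);
            return null;
        }
    }

    @Override
    public void close() {
    }
}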
Of course, don't forget the Deserializer for the Consumer
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Deserializer;

@Slf4j
public class KafkaValueDeserializer implements Deserializer<TestDto> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public TestDto deserialize(String topic, byte[] data) {
        try {
            // The payload is expected to be UTF-8 encoded JSON.
            return objectMapper.readValue(new String(data, "UTF-8"), TestDto.class);
        } catch (Exception e) {
            log.error("Unable to deserialize message {}", data, e);
            return null;
        }
    }

    @Override
    public void close() {
    }
}
Finally, add the serializer/deserializer to application.yml
spring:
  kafka:
    bootstrap-servers: 192.168.192.168:9092
    producer:
      value-serializer: com.package.service.kafka.KafkaValueSerializer
    consumer:
      group-id: groupId
      value-deserializer: com.package.service.kafka.KafkaValueDeserializer
That's all. No extra configuration file or dancing with a tambourine is needed :)
Send
// Typically injected by Spring, e.g. as an @Autowired field.
KafkaTemplate<String, TestDto> kafkaTemplate;
TestDto testDto = new TestDto("test name", "test-version");
kafkaTemplate.send(topic, testDto);
Listen
@KafkaListener(topics = "${ktp-agent.kafka.request-topic}", groupId = "${spring.kafka.consumer.group-id}")
public void listen(TestDto message) {
    log.info("Received message '{}' from Kafka.", message.toString());
}
Answered by amethystic
You must create your own serializer that implements the Serializer interface (org.apache.kafka.common.serialization.Serializer) and then set the producer option key.serializer / value.serializer to it.
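Setting the key.serializer / value.serializer producer options could be sketched as follows. The broker address and the class name com.example.MyValueSerializer are assumptions made for illustration; only the property keys and IntegerSerializer come from Kafka itself.

```java
import java.util.Properties;

// Hypothetical holder class for the producer configuration.
class ProducerConfigSketch {
    static Properties producerProperties() {
        Properties props = new Properties();
        // Assumed broker address; replace with your own cluster.
        props.put("bootstrap.servers", "localhost:9092");
        // Built-in serializer shipped with the Kafka client library.
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        // Hypothetical fully qualified name of your custom serializer class.
        props.put("value.serializer", "com.example.MyValueSerializer");
        return props;
    }
}
```

These properties would then be passed to the KafkaProducer constructor. When serializers are named in the configuration like this, Kafka instantiates them reflectively, so the custom class needs a public no-argument constructor.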