java Spring Kafka:ApplicationContext 中不同对象的多个侦听器
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/43132733/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Spring Kafka: Multiple Listeners for different objects within an ApplicationContext
提问by yfl
Can I please check with the community what is the best way to listen to multiple topics, with each topic containing a message of a different class?
我能否请教社区,听多个主题的最佳方式是什么,每个主题包含不同类别的消息?
I've been playing around with Spring Kafka for the past couple of days. My thought process so far:
过去几天我一直在玩 Spring Kafka。到目前为止我的思考过程:
Because you need to pass your deserializer into DefaultKafkaConsumerFactory when initializing a KafkaListenerContainerFactory. This seems to indicate that if I need multiple containers each deserializing a message of a different type, I will not be able to use the @EnableKafka and @KafkaListener annotations.
This leads me to think that the only way to do so would be to instantiate multiple KafkaMessageListenerContainers.
And given that KafkaMessageListenerContainers is single threaded and I need to listen to multiple topics at the same time, I really should be using multiple ConcurrentKafkaMessageListenerContainers.
因为在初始化 KafkaListenerContainerFactory 时,您需要将反序列化器传递给 DefaultKafkaConsumerFactory。这似乎表明,如果我需要多个容器,每个容器反序列化不同类型的消息,我将无法使用 @EnableKafka 和 @KafkaListener 注释。
这让我认为唯一的方法是实例化多个 KafkaMessageListenerContainers。
考虑到 KafkaMessageListenerContainers 是单线程的,我需要同时收听多个主题,我真的应该使用多个 ConcurrentKafkaMessageListenerContainers。
Would I be on the right track here? Is there a better way to do this?
我会在正确的轨道上吗?有一个更好的方法吗?
Thanks!
谢谢!
回答by Gordon Ma
Here is a very simple example.
这是一个非常简单的例子。
// -----------------------------------------------
// Sender
// -----------------------------------------------
@Configuration
public class SenderConfig {
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
......
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, Class1> producerFactory1() {
return new DefaultKafkaProducerFactory<String, Class1>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Class1> kafkaTemplate1() {
return new KafkaTemplate<>(producerFactory1());
}
@Bean
public Sender1 sender1() {
return new Sender1();
}
//-------- send the second class --------
@Bean
public ProducerFactory<String, Class2> producerFactory2() {
return new DefaultKafkaProducerFactory<String, Class2>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Class2> kafkaTemplate2() {
return new KafkaTemplate<>(producerFactory2());
}
@Bean
public Sender2 sender2() {
return new Sender2();
}
}
public class Sender1 {
@Autowired
private KafkaTemplate<String, Class1> kafkaTemplate1;
public void send(String topic, Class1 c1) {
kafkaTemplate1.send(topic, c1);
}
}
public class Sender2 {
@Autowired
private KafkaTemplate<String, Class2> kafkaTemplate2;
public void send(String topic, Class2 c2) {
kafkaTemplate2.send(topic, c2);
}
}
// -----------------------------------------------
// Receiver
// -----------------------------------------------
@Configuration
@EnableKafka
public class ReceiverConfig {
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
......
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return props;
}
@Bean
public ConsumerFactory<String, Class1> consumerFactory1() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
new JsonDeserializer<>(Class1.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Class1> kafkaListenerContainerFactory1() {
ConcurrentKafkaListenerContainerFactory<String, Class1> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory1());
return factory;
}
@Bean
public Receiver1 receiver1() {
return new Receiver1();
}
//-------- add the second listener
@Bean
public ConsumerFactory<String, Class2> consumerFactory2() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
new JsonDeserializer<>(Class2.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Class2> kafkaListenerContainerFactory2() {
ConcurrentKafkaListenerContainerFactory<String, Class2> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory2());
return factory;
}
@Bean
public Receiver2 receiver2() {
return new Receiver2();
}
}
public class Receiver1 {
@KafkaListener(id="listener1", topics = "topic1", containerFactory = "kafkaListenerContainerFactory1")
public void receive(Class1 c1) {
LOGGER.info("Received c1");
}
}
public class Receiver2 {
@KafkaListener(id="listener2", topics = "topic2", containerFactory = "kafkaListenerContainerFactory2")
public void receive(Class2 c2) {
LOGGER.info("Received c2");
}
}
回答by Gary Russell
You can use the annotations, you would just need to use a different listener container factory for each.
您可以使用注释,您只需要为每个注释使用不同的侦听器容器工厂。
The framework will create a listener container for each annotation.
该框架将为每个注释创建一个侦听器容器。
You can also listen to multiple topics on a single-threaded container but they would be processed, er, on a single thread.
您还可以在单线程容器上收听多个主题,但它们将在单个线程上进行处理,呃。
Take a look at the code from my SpringOne Platform talk last year- you might want to look at app6, which shows how to use a MessageConverter
instead of a deserializer, which might help simplify your configuration.
看看我去年 SpringOne 平台演讲中的代码- 您可能想看看 app6,它展示了如何使用 aMessageConverter
而不是反序列化器,这可能有助于简化您的配置。
回答by caovn
I would like to use the code below to apply your sence
我想用下面的代码来应用你的意思
@Configuration
@EnableKafka
public class ConsumerConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${kafka.group-id}")
private String groupId;
/**
* Configuration of Consumer properties.
*
* @return
*/
//@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return props;
}
//@Bean
public ConsumerFactory<String, ClassA> consumerFactory1() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
new ClassA());
}
/**
* Kafka Listener Container Factory.
* @return
*/
@Bean("kafkaListenerContainerFactory1")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, ClassA>> kafkaListenerContainerFactory1() {
ConcurrentKafkaListenerContainerFactory<String, ClassA> factory;
factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory1());
return factory;
}
//@Bean
public ConsumerFactory<String, ClassB> consumerFactory2() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
new ClassB());
}
/**
* Kafka Listener Container Factory.
* @return
*/
@Bean("kafkaListenerContainerFactory2")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, ClassB>> kafkaListenerContainerFactory2() {
ConcurrentKafkaListenerContainerFactory<String, ClassB> factory;
factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory2());
return factory;
}
@Bean
public ReceiverClass receiver() {
return new ReceiverClass();
}
class ReceiverClass {
@KafkaListener(topics = "topic1", group = "group-id-test",
containerFactory = "kafkaListenerContainerFactory1")
public void receiveTopic1(ClassA a) {
System.out.println("ReceiverClass.receive() ClassA : " + a);
}
@KafkaListener(topics = "topic2", group = "group-id-test",
containerFactory = "kafkaListenerContainerFactory2")
public void receiveTopic2(ClassB b) {
System.out.println("ReceiverClass.receive() Classb : " + b);
}
}
class ClassB implements Deserializer {
@Override
public void configure(Map configs, boolean isKey) {
// TODO Auto-generated method stub
}
@Override
public Object deserialize(String topic, byte[] data) {
// TODO Auto-generated method stub
return null;
}
@Override
public void close() {
// TODO Auto-generated method stub
}
}
class ClassA implements Deserializer {
@Override
public void configure(Map configs, boolean isKey) {
// TODO Auto-generated method stub
}
@Override
public Object deserialize(String topic, byte[] data) {
// TODO Auto-generated method stub
return null;
}
@Override
public void close() {
// TODO Auto-generated method stub
}
}
}