Java Spring RabbitMQ - 在具有 @RabbitListener 配置的服务上使用手动通道确认
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/38728668/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Spring RabbitMQ - using manual channel acknowledgement on a service with @RabbitListener configuration
提问by Guru
How to acknowledge the messages manually without using auto acknowledgement.
Is there a way to use this along with the @RabbitListener
and @EnableRabbit
style of configuration.
Most of the documentation tells us to use SimpleMessageListenerContainer
along with ChannelAwareMessageListener
.
However using that we lose the flexibility that is provided with the annotations.
I have configured my service as below :
如何在不使用自动确认的情况下手动确认消息。有没有办法将它与配置@RabbitListener
和@EnableRabbit
样式一起使用。大多数文档都告诉我们SimpleMessageListenerContainer
与ChannelAwareMessageListener
. 然而,使用它我们失去了注释提供的灵活性。我已将我的服务配置如下:
@Service
public class EventReceiver {
@Autowired
private MessageSender messageSender;
@RabbitListener(queues = "${eventqueue}")
public void receiveMessage(Order order) throws Exception {
// code for processing order
}
My RabbitConfiguration is as below
我的RabbitConfiguration如下
@EnableRabbit
public class RabbitApplication implements RabbitListenerConfigurer {
public static void main(String[] args) {
SpringApplication.run(RabbitApplication.class, args);
}
@Bean
public MappingHymanson2MessageConverter Hymanson2Converter() {
MappingHymanson2MessageConverter converter = new MappingHymanson2MessageConverter();
return converter;
@Bean
public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory());
factory.setMaxConcurrentConsumers(5);
factory.setMessageConverter((MessageConverter) Hymanson2Converter());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
@Bean
public ConnectionFactory rabbitConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("localhost");
return connectionFactory;
}
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setContainerFactory(myRabbitListenerContainerFactory());
}
@Autowired
private EventReceiver receiver;
}
}
Any help will be appreciated on how to adapt manual channel acknowledgement along with the above style of configuration. If we implement the ChannelAwareMessageListener then the onMessage signature will change. Can we implement ChannelAwareMessageListener on a service ?
关于如何调整手动通道确认以及上述配置风格的任何帮助将不胜感激。如果我们实现 ChannelAwareMessageListener 那么 onMessage 签名将会改变。我们可以在服务上实现 ChannelAwareMessageListener 吗?
采纳答案by Gary Russell
Add the Channel
to the @RabbitListener
method...
添加Channel
到@RabbitListener
方法...
@RabbitListener(queues = "${eventqueue}")
public void receiveMessage(Order order, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
...
}
and use the tag in the basicAck
, basicReject
.
并使用了标签basicAck
,basicReject
。
EDIT
编辑
@SpringBootApplication
@EnableRabbit
public class So38728668Application {
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(So38728668Application.class, args);
context.getBean(RabbitTemplate.class).convertAndSend("", "so38728668", "foo");
context.getBean(Listener.class).latch.await(60, TimeUnit.SECONDS);
context.close();
}
@Bean
public Queue so38728668() {
return new Queue("so38728668");
}
@Bean
public Listener listener() {
return new Listener();
}
public static class Listener {
private final CountDownLatch latch = new CountDownLatch(1);
@RabbitListener(queues = "so38728668")
public void receive(String payload, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
throws IOException {
System.out.println(payload);
channel.basicAck(tag, false);
latch.countDown();
}
}
}
application.properties:
应用程序属性:
spring.rabbitmq.listener.acknowledge-mode=manual
回答by Guru
Thanks for gary's help. I finally solved the issue. I am documenting this for the benefit of others. This needs to be documented as part of standard documentation in Spring AMQP reference documentation page. Service class is as below.
感谢加里的帮助。我终于解决了这个问题。为了他人的利益,我正在记录这一点。这需要在 Spring AMQP 参考文档页面中作为标准文档的一部分进行记录。服务等级如下。
@Service
public class Consumer {
@RabbitListener(queues = "${eventqueue}")
public void receiveMessage(Order order, Channel channel) throws Exception {
// the above methodname can be anything but should have channel as second signature
channel.basicConsume(eventQueue, false, channel.getDefaultConsumer());
// Get the delivery tag
long deliveryTag = channel.basicGet(eventQueue, false).getEnvelope().getDeliveryTag();
try {
// code for processing order
catch(Exception) {
// handle exception
channel.basicReject(deliveryTag, true);
}
// If all logic is successful
channel.basicAck(deliveryTag, false);
}
the configuration has also been modified as below
配置也被修改如下
public class RabbitApplication implements RabbitListenerConfigurer {
private static final Logger log = LoggerFactory.getLogger(RabbitApplication .class);
public static void main(String[] args) {
SpringApplication.run(RabbitApplication.class, args);
}
@Bean
public MappingHymanson2MessageConverter Hymanson2Converter() {
MappingHymanson2MessageConverter converter = new MappingHymanson2MessageConverter();
return converter;
}
@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setMessageConverter(Hymanson2Converter());
return factory;
}
@Autowired
private Consumer consumer;
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
}
...
}
Note: no need to configure Rabbitconnectionfactory or containerfactor etc since the annotation implicity take care of all this.
注意:不需要配置 Rabbitconnectionfactory 或 containerfactor 等,因为注释隐含地处理了所有这些。
回答by Pari Ngang
Just in case you need to use #onMessage() from ChannelAwareMessageListener class. Then you can do it this way.
以防万一您需要使用 ChannelAwareMessageListener 类中的 #onMessage()。那么你可以这样做。
@Component
public class MyMessageListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) {
log.info("Message received.");
// do something with the message
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
}
And for the rabbitConfiguration
而对于rabbitConfiguration
@Configuration
public class RabbitConfig {
public static final String topicExchangeName = "exchange1";
public static final String queueName = "queue1";
public static final String routingKey = "queue1.route.#";
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
connectionFactory.setUsername("xxxx");
connectionFactory.setPassword("xxxxxxxxxx");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("vHost1");
return connectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
@Bean
Queue queue() {
return new Queue(queueName, true);
}
@Bean
TopicExchange exchange() {
return new TopicExchange(topicExchangeName);
}
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(routingKey);
}
@Bean
public SimpleMessageListenerContainer listenerContainer(MyMessageListener myRabbitMessageListener) {
SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
listenerContainer.setConnectionFactory(connectionFactory());
listenerContainer.setQueueNames(queueName);
listenerContainer.setMessageListener(myRabbitMessageListener);
listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
listenerContainer.setConcurrency("4");
listenerContainer.setPrefetchCount(20);
return listenerContainer;
}
}
}