java RabbitMQ 的 Spring SimpleMessageListenerContainer 因无效消息而中止
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/39264965/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Spring SimpleMessageListenerContainer for RabbitMQ is aborting on invalid message
提问by johannesv
I am using springs SimpleMessageListenerContainer to consume messages from a RabbitMQ queue. Everything works fine, but when a invalid message is sent to the queue (e.g. invalid json) the listener just aborts, is shutting down the worker and doesn't accept any further messages.
我正在使用 springs SimpleMessageListenerContainer 来使用来自 RabbitMQ 队列的消息。一切正常,但是当一个无效的消息被发送到队列(例如无效的 json)时,监听器只是中止,关闭工作器并且不接受任何进一步的消息。
Is it possible to configure it in a way that it discards the broken message and keeps listening to further messages?
是否可以将其配置为丢弃损坏的消息并继续收听更多消息?
I'm using sprint-rabbit-1.6.1.RELEASE.jar
我正在使用 sprint-rabbit-1.6.1.RELEASE.jar
My configuration looks like the following:
我的配置如下所示:
@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter,
MessageConverter messageConverter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("my.queue");
container.setMessageListener(listenerAdapter);
container.setMessageConverter(messageConverter);
return container;
}
@Bean
public MessageConverter messageConverter() {
return new Hymanson2JsonMessageConverter();
}
@Bean
MessageListenerAdapter listenerAdapter(Worker worker) {
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(worker, "processMessage");
messageListenerAdapter.setMessageConverter(new Hymanson2JsonMessageConverter());
return messageListenerAdapter;
}
The declaration of my listener method:
我的监听器方法的声明:
public void processMessage(Map<String, String> message) {
When I send a message like '"routeId":"7"}'
(broken json), then I get the Exception:
当我发送类似'"routeId":"7"}'
(broken json)的消息时,我收到异常:
2016-09-02 08:10:35.821 WARN 35841 --- [ container-29] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Failed to invoke target method 'processMessage' with argument type = [class java.lang.String], value = [{routeId}]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:408) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:298) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:777) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:700) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.accesscatch (ListenerExecutionFailedException ex) {
// Continue to process, otherwise re-throw
if (ex.getCause() instanceof NoSuchMethodException) {
throw new FatalListenerExecutionException("Invalid listener", ex);
}
}
1(SimpleMessageListenerContainer.java:95) [spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:187) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1187) [spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:681) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1165) [spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1149) [spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access00(SimpleMessageListenerContainer.java:95) [spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1312) [spring-rabbit-1.6.1.RELEASE.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101]
Caused by: java.lang.NoSuchMethodException: nz.co.qrious.transport.batchstarter.service.Worker.processMessage(java.lang.String)
at java.lang.Class.getMethod(Class.java:1786) ~[na:1.8.0_101]
at org.springframework.util.MethodInvoker.prepare(MethodInvoker.java:174) ~[spring-core-4.3.2.RELEASE.jar:4.3.2.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:386) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
... 12 common frames omitted
2016-09-02 08:10:35.828 ERROR 35841 --- [ container-29] o.s.a.r.l.SimpleMessageListenerContainer : Consumer received fatal exception during processing
org.springframework.amqp.rabbit.listener.exception.FatalListenerExecutionException: Invalid listener
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1351) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101]
Caused by: org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Failed to invoke target method 'processMessage' with argument type = [class java.lang.String], value = [{routeId}]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:408) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:298) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:777) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:700) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.accesspackage com.example;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.ClassMapper;
import org.springframework.amqp.support.converter.Hymanson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class So39264965Application {
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(So39264965Application.class, args);
RabbitTemplate template = context.getBean(RabbitTemplate.class);
template.convertAndSend("my.queue", new Foo());
context.getBean(Worker.class).latch.await(60, TimeUnit.SECONDS);
// bad json
template.setMessageConverter(new SimpleMessageConverter());
template.convertAndSend("", "my.queue", "\"routeId\":\"7\"}", m -> {
m.getMessageProperties().setContentType("application/json");
return m;
});
Thread.sleep(60000);
context.close();
}
@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter, MessageConverter messageConverter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("my.queue");
container.setMessageListener(listenerAdapter);
container.setMessageConverter(messageConverter);
return container;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter());
return rabbitTemplate;
}
@Bean
public Queue queue() {
return new Queue("my.queue");
}
@Bean
public MessageConverter messageConverter() {
Hymanson2JsonMessageConverter Hymanson2JsonMessageConverter = new Hymanson2JsonMessageConverter();
Hymanson2JsonMessageConverter.setClassMapper(new ClassMapper() {
@Override
public Class<?> toClass(MessageProperties properties) {
return Foo.class;
}
@Override
public void fromClass(Class<?> clazz, MessageProperties properties) {
}
});
return Hymanson2JsonMessageConverter;
}
@Bean
MessageListenerAdapter listenerAdapter(Worker worker) {
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(worker, "processMessage");
messageListenerAdapter.setMessageConverter(messageConverter());
return messageListenerAdapter;
}
@Bean
public Worker worker() {
return new Worker();
}
public static class Worker {
private final CountDownLatch latch = new CountDownLatch(1);
public void processMessage(Foo foo) {
System.out.println(foo);
this.latch.countDown();
}
}
public static class Foo {
private String bar = "bar";
public String getBar() {
return this.bar;
}
public void setBar(String bar) {
this.bar = bar;
}
@Override
public String toString() {
return "Foo [bar=" + this.bar + "]";
}
}
}
1(SimpleMessageListenerContainer.java:95) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:187) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1187) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:681) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1165) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1149) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access00(SimpleMessageListenerContainer.java:95) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1312) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
... 1 common frames omitted
Caused by: java.lang.NoSuchMethodException: nz.co.qrious.transport.batchstarter.service.Worker.processMessage(java.lang.String)
at java.lang.Class.getMethod(Class.java:1786) ~[na:1.8.0_101]
at org.springframework.util.MethodInvoker.prepare(MethodInvoker.java:174) ~[spring-core-4.3.2.RELEASE.jar:4.3.2.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:386) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
... 12 common frames omitted
2016-09-02 08:10:35.833 ERROR 35841 --- [ container-29] o.s.a.r.l.SimpleMessageListenerContainer : Stopping container from aborted consumer
2016-09-02 08:10:35.833 INFO 35841 --- [ container-29] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2016-09-02 08:10:35.833 INFO 35841 --- [ container-29] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.
The fatal Exception in SimpleMessageListenerContainer is thrown here:
SimpleMessageListenerContainer 中的致命异常在这里抛出:
##代码##So it seems it is supposed to shut down if the container is configured with a non-existent method. But in case of a broken message, it's trying to call the method with a wrong parameter type, which also causes a NoSuchMethodException. This means that any producer can kill my consumer with a broken message.
因此,如果容器配置了不存在的方法,它似乎应该关闭。但是如果消息损坏,它会尝试使用错误的参数类型调用方法,这也会导致 NoSuchMethodException。这意味着任何生产者都可以用损坏的消息杀死我的消费者。
Thanks for any suggestions!
感谢您的任何建议!
采纳答案by Gary Russell
Interesting; I was able to reproduce your issue; it turns out that if the message contains no __TypeID__
header (conversion hint), it simply returns the "bad" json as a String.
有趣的; 我能够重现您的问题;事实证明,如果消息不包含__TypeID__
标题(转换提示),它只会将“坏”json 作为字符串返回。
I was able to solve it by injecting a custom class mapper into the converter.
我能够通过将自定义类映射器注入转换器来解决它。
You could also have the sending system set the type header.
您还可以让发送系统设置类型标头。
Then, the message is rejected because we get a MessageConversionException
.
然后,该消息被拒绝,因为我们得到了一个MessageConversionException
.