java RabbitMQ 的 Spring SimpleMessageListenerContainer 因无效消息而中止

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/39264965/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-11-03 04:14:03  来源:igfitidea点击:

Spring SimpleMessageListenerContainer for RabbitMQ is aborting on invalid message

javaspringrabbitmqspring-rabbit

提问by johannesv

I am using springs SimpleMessageListenerContainer to consume messages from a RabbitMQ queue. Everything works fine, but when a invalid message is sent to the queue (e.g. invalid json) the listener just aborts, is shutting down the worker and doesn't accept any further messages.

我正在使用 springs SimpleMessageListenerContainer 来使用来自 RabbitMQ 队列的消息。一切正常,但是当一个无效的消息被发送到队列(例如无效的 json)时,监听器只是中止,关闭工作器并且不接受任何进一步的消息。

Is it possible to configure it in a way that it discards the broken message and keeps listening to further messages?

是否可以将其配置为丢弃损坏的消息并继续收听更多消息?

I'm using sprint-rabbit-1.6.1.RELEASE.jar

我正在使用 sprint-rabbit-1.6.1.RELEASE.jar

My configuration looks like the following:

我的配置如下所示:

@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                                MessageListenerAdapter listenerAdapter,
                                                MessageConverter messageConverter) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames("my.queue");
    container.setMessageListener(listenerAdapter);
    container.setMessageConverter(messageConverter);
    return container;
}

@Bean
public MessageConverter messageConverter() {
    return new Hymanson2JsonMessageConverter();
}

@Bean
MessageListenerAdapter listenerAdapter(Worker worker) {
    MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(worker, "processMessage");
    messageListenerAdapter.setMessageConverter(new Hymanson2JsonMessageConverter());
    return messageListenerAdapter;
}

The declaration of my listener method:

我的监听器方法的声明:

   public void processMessage(Map<String, String> message) {

When I send a message like '"routeId":"7"}'(broken json), then I get the Exception:

当我发送类似'"routeId":"7"}'(broken json)的消息时,我收到异常:

2016-09-02 08:10:35.821  WARN 35841 --- [   container-29] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Failed to invoke target method 'processMessage' with argument type = [class java.lang.String], value = [{routeId}]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:408) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:298) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:777) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:700) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access
catch (ListenerExecutionFailedException ex) {
                    // Continue to process, otherwise re-throw
                    if (ex.getCause() instanceof NoSuchMethodException) {
                        throw new FatalListenerExecutionException("Invalid listener", ex);
                    }
                }
1(SimpleMessageListenerContainer.java:95) [spring-rabbit-1.6.1.RELEASE.jar:na] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:187) ~[spring-rabbit-1.6.1.RELEASE.jar:na] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1187) [spring-rabbit-1.6.1.RELEASE.jar:na] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:681) ~[spring-rabbit-1.6.1.RELEASE.jar:na] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1165) [spring-rabbit-1.6.1.RELEASE.jar:na] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1149) [spring-rabbit-1.6.1.RELEASE.jar:na] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access00(SimpleMessageListenerContainer.java:95) [spring-rabbit-1.6.1.RELEASE.jar:na] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1312) [spring-rabbit-1.6.1.RELEASE.jar:na] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101] Caused by: java.lang.NoSuchMethodException: nz.co.qrious.transport.batchstarter.service.Worker.processMessage(java.lang.String) at java.lang.Class.getMethod(Class.java:1786) ~[na:1.8.0_101] at org.springframework.util.MethodInvoker.prepare(MethodInvoker.java:174) ~[spring-core-4.3.2.RELEASE.jar:4.3.2.RELEASE] at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:386) ~[spring-rabbit-1.6.1.RELEASE.jar:na] ... 12 common frames omitted 2016-09-02 08:10:35.828 ERROR 35841 --- [ container-29] o.s.a.r.l.SimpleMessageListenerContainer : Consumer received fatal exception during processing org.springframework.amqp.rabbit.listener.exception.FatalListenerExecutionException: Invalid listener at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1351) ~[spring-rabbit-1.6.1.RELEASE.jar:na] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101] Caused by: org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Failed to invoke target method 'processMessage' with argument type = [class java.lang.String], value = [{routeId}] at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:408) ~[spring-rabbit-1.6.1.RELEASE.jar:na] at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:298) ~[spring-rabbit-1.6.1.RELEASE.jar:na] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:777) ~[spring-rabbit-1.6.1.RELEASE.jar:na] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:700) ~[spring-rabbit-1.6.1.RELEASE.jar:na] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access
package com.example;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.ClassMapper;
import org.springframework.amqp.support.converter.Hymanson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class So39264965Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So39264965Application.class, args);
        RabbitTemplate template = context.getBean(RabbitTemplate.class);
        template.convertAndSend("my.queue", new Foo());
        context.getBean(Worker.class).latch.await(60, TimeUnit.SECONDS);

        // bad json
        template.setMessageConverter(new SimpleMessageConverter());
        template.convertAndSend("", "my.queue", "\"routeId\":\"7\"}", m -> {
            m.getMessageProperties().setContentType("application/json");
            return m;
        });


        Thread.sleep(60000);
        context.close();
    }

    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
            MessageListenerAdapter listenerAdapter, MessageConverter messageConverter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("my.queue");
        container.setMessageListener(listenerAdapter);
        container.setMessageConverter(messageConverter);
        return container;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter());
        return rabbitTemplate;
    }

    @Bean
    public Queue queue() {
        return new Queue("my.queue");
    }

    @Bean
    public MessageConverter messageConverter() {
        Hymanson2JsonMessageConverter Hymanson2JsonMessageConverter = new Hymanson2JsonMessageConverter();
        Hymanson2JsonMessageConverter.setClassMapper(new ClassMapper() {

            @Override
            public Class<?> toClass(MessageProperties properties) {
                return Foo.class;
            }

            @Override
            public void fromClass(Class<?> clazz, MessageProperties properties) {

            }

        });
        return Hymanson2JsonMessageConverter;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Worker worker) {
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(worker, "processMessage");
        messageListenerAdapter.setMessageConverter(messageConverter());
        return messageListenerAdapter;
    }

    @Bean
    public Worker worker() {
        return new Worker();
    }

    public static class Worker {

        private final CountDownLatch latch = new CountDownLatch(1);

        public void processMessage(Foo foo) {
            System.out.println(foo);
            this.latch.countDown();
        }

    }

    public static class Foo {

        private String bar = "bar";

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + this.bar + "]";
        }

    }

}
1(SimpleMessageListenerContainer.java:95) ~[spring-rabbit-1.6.1.RELEASE.jar:na] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:187) ~[spring-rabbit-1.6.1.RELEASE.jar:na] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1187) ~[spring-rabbit-1.6.1.RELEASE.jar:na] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:681) ~[spring-rabbit-1.6.1.RELEASE.jar:na] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1165) ~[spring-rabbit-1.6.1.RELEASE.jar:na] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1149) ~[spring-rabbit-1.6.1.RELEASE.jar:na] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access00(SimpleMessageListenerContainer.java:95) ~[spring-rabbit-1.6.1.RELEASE.jar:na] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1312) ~[spring-rabbit-1.6.1.RELEASE.jar:na] ... 1 common frames omitted Caused by: java.lang.NoSuchMethodException: nz.co.qrious.transport.batchstarter.service.Worker.processMessage(java.lang.String) at java.lang.Class.getMethod(Class.java:1786) ~[na:1.8.0_101] at org.springframework.util.MethodInvoker.prepare(MethodInvoker.java:174) ~[spring-core-4.3.2.RELEASE.jar:4.3.2.RELEASE] at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:386) ~[spring-rabbit-1.6.1.RELEASE.jar:na] ... 12 common frames omitted 2016-09-02 08:10:35.833 ERROR 35841 --- [ container-29] o.s.a.r.l.SimpleMessageListenerContainer : Stopping container from aborted consumer 2016-09-02 08:10:35.833 INFO 35841 --- [ container-29] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish. 2016-09-02 08:10:35.833 INFO 35841 --- [ container-29] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.

The fatal Exception in SimpleMessageListenerContainer is thrown here:

SimpleMessageListenerContainer 中的致命异常在这里抛出:

##代码##

So it seems it is supposed to shut down if the container is configured with a non-existent method. But in case of a broken message, it's trying to call the method with a wrong parameter type, which also causes a NoSuchMethodException. This means that any producer can kill my consumer with a broken message.

因此,如果容器配置了不存在的方法,它似乎应该关闭。但是如果消息损坏,它会尝试使用错误的参数类型调用方法,这也会导致 NoSuchMethodException。这意味着任何生产者都可以用损坏的消息杀死我的消费者。

Thanks for any suggestions!

感谢您的任何建议!

采纳答案by Gary Russell

Interesting; I was able to reproduce your issue; it turns out that if the message contains no __TypeID__header (conversion hint), it simply returns the "bad" json as a String.

有趣的; 我能够重现您的问题;事实证明,如果消息不包含__TypeID__标题(转换提示),它只会将“坏”json 作为字符串返回。

I was able to solve it by injecting a custom class mapper into the converter.

我能够通过将自定义类映射器注入转换器来解决它。

You could also have the sending system set the type header.

您还可以让发送系统设置类型标头。

Then, the message is rejected because we get a MessageConversionException.

然后,该消息被拒绝,因为我们得到了一个MessageConversionException.

##代码##