java Spring Asynchronous MessageListener用例中发生业务异常时如何要求RabbitMQ重试
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/36979840/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
How to ask RabbitMQ to retry when business Exception occurs in Spring Asynchronous MessageListener use case
提问by Santosh
I have a Spring AMQP message listener running.
我有一个 Spring AMQP 消息侦听器正在运行。
public class ConsumerService implements MessageListener {
@Autowired
RabbitTemplate rabbitTemplate;
@Override
public void onMessage(Message message) {
try {
testService.process(message); //This process method can throw Business Exception
} catch (BusinessException e) {
//Here we can just log the exception. How the retry attempt is made?
} catch (Exception e) {
//Here we can just log the exception. How the retry attempt is made?
}
}
}
As you can see, there could be exception coming out during process. I want to retry because of a particular error in Catch block. I cannot through exception in onMessage. How to tell RabbitMQ to there is an exception and retry?
如您所见,在处理过程中可能会出现异常。由于 Catch 块中的特定错误,我想重试。我无法通过 onMessage 中的异常。如何告诉 RabbitMQ 出现异常并重试?
回答by Nazaret K.
Since onMessage()
doesn't allow to throw checked exceptions you can wrap the exception in a RuntimeException
and re-throw it.
由于onMessage()
不允许抛出已检查的异常,您可以将异常包装在 a 中RuntimeException
并重新抛出它。
try {
testService.process(message);
} catch (BusinessException e) {
throw new RuntimeException(e);
}
Note however that this may result in the message to be re-delivered indefinitely. Here is how this works:
但是请注意,这可能会导致消息无限期地重新发送。这是它的工作原理:
RabbitMQ supports rejecting a message and asking the broker to requeue it. This is shown here. But RabbitMQ doesn't natively have a mechanism for retry policy, e.g. setting max retries, delay, etc.
RabbitMQ 支持拒绝消息并要求代理重新排队。这在此处显示。但是RabbitMQ本身没有重试策略的机制,例如设置最大重试,延迟等。
When using Spring AMQP, "requeue on reject" is the default option. Spring's SimpleMessageListenerContainer
will by default do this when there is an unhandled exception. So in your case you just need to re-throw the caught exception. Note however that if you cannot process a message and you always throw the exception this will be re-delivered indefinitely and will result in an infinite loop.
使用 Spring AMQP 时,“requeue on reject”是默认选项。SimpleMessageListenerContainer
默认情况下,当出现未处理的异常时,Spring将执行此操作。所以在你的情况下,你只需要重新抛出捕获的异常。但是请注意,如果您无法处理消息并且总是抛出异常,这将无限期地重新传递并导致无限循环。
You can override this behaviour per message by throwing a AmqpRejectAndDontRequeueException
exception, in which case the message will not be requeued.
您可以通过抛出AmqpRejectAndDontRequeueException
异常来覆盖每条消息的此行为,在这种情况下,消息将不会重新排队。
You can also switch off the "requeue on reject" behavior of SimpleMessageListenerContainer
entirely by setting
您还可以SimpleMessageListenerContainer
通过设置完全关闭“拒绝时重新排队”行为
container.setDefaultRequeueRejected(false)
When a message is rejected and not requeued it will either be lost or transferred to a DLQ, if one is set in RabbitMQ.
当一条消息被拒绝并且没有重新排队时,它要么丢失要么转移到 DLQ,如果在 RabbitMQ 中设置了一个。
If you need a retry policy with max attempts, delay, etc the easiest is to setup a spring "stateless" RetryOperationsInterceptor
which will do all retries within the thread (using Thread.sleep()
) without rejecting the message on each retry (so without going back to RabbitMQ for each retry). When retries are exhausted, by default a warning will be logged and the message will be consumed. If you want to send to a DLQ you will need either a RepublishMessageRecoverer
or a custom MessageRecoverer
that rejects the message without requeuing (in that latter case you should also setupa RabbitMQ DLQ on the queue). Example with default message recoverer:
如果您需要具有最大尝试次数、延迟等的重试策略,最简单的方法是设置一个 spring“无状态” RetryOperationsInterceptor
,它将在线程内执行所有重试(使用Thread.sleep()
),而不会在每次重试时拒绝消息(因此无需为每个重试返回 RabbitMQ重试)。当重试用尽时,默认情况下将记录警告并消耗消息。如果您想发送到 DLQ,您将需要一个RepublishMessageRecoverer
或自定义MessageRecoverer
拒绝消息而不重新排队(在后一种情况下,您还应该在队列上设置一个 RabbitMQ DLQ)。默认消息恢复器的示例:
container.setAdviceChain(new Advice[] {
org.springframework.amqp.rabbit.config.RetryInterceptorBuilder
.stateless()
.maxAttempts(5)
.backOffOptions(1000, 2, 5000)
.build()
});
This obviously has the drawback that you will occupy the Thread for the entire duration of the retries. You also have the option to use a "stateful" RetryOperationsInterceptor
, which will send the message back to RabbitMQ for each retry, but the delay will still be implemented with Thread.sleep()
within the application, plus setting up a stateful interceptor is a bit more complicated.
这显然有一个缺点,即您将在整个重试期间占用线程。您还可以选择使用 "stateful" RetryOperationsInterceptor
,它会在每次重试时将消息发送回 RabbitMQ,但延迟仍将Thread.sleep()
在应用程序内实现,另外设置有状态拦截器有点复杂。
Therefore, if you want retries with delays without occupying a Thread
you will need a much more involved custom solution using TTL on RabbitMQ queues. If you don't want exponential backoff (so delay doesn't increase on each retry) it's a bit simpler. To implement such a solution you basically create another queue on rabbitMQ with arguments: "x-message-ttl": <delay time in milliseconds>
and "x-dead-letter-exchange":"<name of the original queue>"
. Then on the main queue you set "x-dead-letter-exchange":"<name of the queue with the TTL>"
. So now when you reject and don't requeue a message RabbitMQ will redirect it to the second queue. When TTL expires it will be redirected to the original queue and thus redelivered to the application. So now you need a retry interceptor that rejects the message to RabbitMQ after each failure and also keeps track of the retry count. To avoid the need to keep state in the application (because if your application is clustered you need to replicate state) you can calculate the retry count from the x-death
header that RabbitMQ sets. See more info about this header here. So at that point implementing a custom interceptor is easier than customising the Spring stateful interceptor with this behaviour.
因此,如果您想在不占用时间的情况下延迟重试,Thread
您将需要在 RabbitMQ 队列上使用 TTL 的更复杂的自定义解决方案。如果您不想要指数退避(因此每次重试时延迟都不会增加),那就更简单了。要实现这样的解决方案,您基本上在rabbitMQ 上创建另一个队列,参数为:"x-message-ttl": <delay time in milliseconds>
和"x-dead-letter-exchange":"<name of the original queue>"
。然后在你设置的主队列上"x-dead-letter-exchange":"<name of the queue with the TTL>"
. 所以现在当您拒绝并且不重新排队消息时,RabbitMQ 会将其重定向到第二个队列。当 TTL 到期时,它将被重定向到原始队列,从而重新传递给应用程序。所以现在你需要一个重试拦截器,它在每次失败后拒绝向 RabbitMQ 发送消息,并跟踪重试计数。为了避免需要在应用程序中保持状态(因为如果您的应用程序是集群的,您需要复制状态),您可以根据x-death
RabbitMQ 设置的标头计算重试计数。在此处查看有关此标题的更多信息。因此,此时实现自定义拦截器比使用此行为自定义 Spring 有状态拦截器更容易。
Also check the section about retries in the Spring AMQP reference.