java ActiveMQ:如何在使用临时队列时处理代理故障转移
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/6432672/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
ActiveMQ: How to handle broker failovers while using temporary queues
提问by anubhava
On my JMS applications we use temporary queues on Producers to be able to receive replies back from Consumer applications.
在我的 JMS 应用程序中,我们在生产者上使用临时队列来接收消费者应用程序的回复。
I am facing exactly same issue on my end as mentioned in this thread: http://activemq.2283324.n4.nabble.com/jira-Created-AMQ-3336-Temporary-Destination-errors-on-H-A-failover-in-broker-network-with-Failover-tt-td3551034.html#a3612738
正如此线程中提到的,我最终面临完全相同的问题:http: //activemq.2283324.n4.nabble.com/jira-Created-AMQ-3336-Temporary-Destination-errors-on-HA-failover-in -broker-network-with-Failover-tt-td3551034.html#a3612738
Whenever I restarted an arbitrary broker in my network, I was getting many errors like this in my Consumer application log while trying to send reply to a temporary queue:
每当我重新启动网络中的任意代理时,我都会在我的消费者应用程序日志中收到许多类似这样的错误,同时尝试将回复发送到临时队列:
javax.jms.InvalidDestinationException:
Cannot publish to a deleted Destination: temp-queue://ID:...
Then I saw Gary's response there suggesting to use
然后我看到加里的回应建议使用
jms.watchTopicAdvisories=false
as a url param on the client brokerURL
. I promptly changed my client broker URLs with this additional parameter. However now I am seeing errors like this when I restart my brokers in network for this failover testing:
作为客户端上的 url 参数brokerURL
。我立即使用此附加参数更改了我的客户端代理 URL。但是,现在当我在网络中重新启动代理以进行故障转移测试时,我看到了这样的错误:
javax.jms.JMSException:
The destination temp-queue:
//ID:client.host-65070-1308610734958-2:1:1 does not exist.
I am using ActiveMQ 5.5 version. And my client broker URL looks like this:
我使用的是 ActiveMQ 5.5 版本。我的客户端代理 URL 如下所示:
failover:(tcp://amq-host1:61616,tcp://amq-host2.tred.aol.com:61616,tcp://amq-host3:61616,tcp://amq-host4:61616)?jms.useAsyncSend=true&timeout=5000&jms.watchTopicAdvisories=false
Additionally here is my activemq config XML for one of the 4 brokers: amq1.xml
此外,这里是我的 activemq 配置 XML,用于 4 个代理之一: amq1.xml
Can someone here please look into this problem and suggest me what mistake I am making in this setup.
这里有人可以看看这个问题,并建议我在这个设置中犯了什么错误。
Update:
更新:
To clarify further on how I am doing request-response in my code:
进一步阐明我如何在我的代码中进行请求响应:
- I already use a per producer destination (i.e. temporary queue) and set this in reply-to header of every message.
- I am already sending a per message unique correlation identifier in JMSCorrelationID header.
- As far as I know even Camel and Spring are also using temporary queue for request-response mechanism. Only difference is that Spring JMS implementation creates and destroys temporary queue for every message whereas I create temporary queue for the lifetime of the producer. This temporary queue is destroyed when client (producer) app shutsdown or by the AMQ broker when it realizes there are no active producer attached with this temporary queue.
- I am already setting a message expiry on each message on Producer side so that message is not held up in a queue for too long (60 sec).
- 我已经使用了每个生产者的目的地(即临时队列)并将其设置在每条消息的回复标头中。
- 我已经在 JMSCorrelationID 标头中发送了每条消息的唯一相关标识符。
- 据我所知,即使 Camel 和 Spring 也在使用临时队列来实现请求-响应机制。唯一的区别是 Spring JMS 实现为每条消息创建和销毁临时队列,而我为生产者的生命周期创建临时队列。当客户端(生产者)应用程序关闭时,或者当 AMQ 代理意识到没有与此临时队列相关联的活动生产者时,此临时队列将被销毁。
- 我已经在 Producer 端的每条消息上设置了消息到期时间,以便消息不会在队列中停留太长时间(60 秒)。
采纳答案by gtully
There is a broker attribute, org.apache.activemq.broker.BrokerService#cacheTempDestinations that should help in the failover: case. Set that to true in xml configuration, and a temp destination will not be removed immediately when a client disconnects. A fast failover: reconnect will be able to producer and/or consume from the temp queue again.
有一个代理属性 org.apache.activemq.broker.BrokerService#cacheTempDestinations 应该有助于故障转移:案例。在 xml 配置中将其设置为 true,当客户端断开连接时,不会立即删除临时目标。快速故障转移:重新连接将能够再次从临时队列中生产和/或消费。
There is a timer task based on timeBeforePurgeTempDestinations (default 5 seconds) that handles cache removal.
有一个基于 timeBeforePurgeTempDestinations(默认 5 秒)的计时器任务来处理缓存移除。
One caveat though, I don't see any tests in activemq-core that make use of that attribute so I can't give you any guarantee on this one.
但需要注意的是,我在 activemq-core 中没有看到任何使用该属性的测试,因此我无法对此做出任何保证。
回答by Jakub Korab
Temporary queues are created on the broker to which the requestor (producer) in your request-reply scenario connects. They are created from a javax.jms.Session
, so on that session disconnecting, either because of client disconnect or broker failure/failover, those queues are permanently gone. None of the other brokers will understand what is meant when one of your consumers attempts to reply to those queues; hence your exception.
临时队列是在请求-回复场景中请求者(生产者)连接到的代理上创建的。它们是从 a 创建的javax.jms.Session
,因此在该会话断开连接时,无论是由于客户端断开连接还是代理故障/故障转移,这些队列都将永久消失。当您的消费者之一试图回复这些队列时,其他经纪人都不会理解这意味着什么;因此你的例外。
This requires an architectural shift in mindset assuming that you want to deal with failover and persist all your messages. Here is a general way that you could attack the problem:
假设您想要处理故障转移并保留所有消息,这需要在思维方式上进行架构转变。这是您可以解决问题的一般方法:
- Your reply-to headers should refer to a queue specific to the requestor process: e.g.
queue:response.<client id>
. The client id might be a standard name if you have a limited number of clients, or a UUID if you have a large number of these. - The outbound message should set a correlation identifier (simply a sting that lets you associate a request with a response - requestors after all might make more than one request at the same time). This is set in the
JMSCorrelationID
header, and ought to be copied from the request to the response message. - The requestor needs to set up a listener on that queue that will return the message body to the requesting thread based on that correllation id. There is some multithreading code that needs to be written for this, as you'll need to manually manage something like a map of correlation ids to originating threads (via Futures perhaps).
- 您的回复标头应引用特定于请求者进程的队列:例如
queue:response.<client id>
. 如果客户端数量有限,则客户端 ID 可能是标准名称;如果客户端数量有限,则可能是 UUID。 - 出站消息应该设置一个相关标识符(只是一个让您将请求与响应相关联的字符串 - 毕竟请求者可能同时发出多个请求)。这是在
JMSCorrelationID
标头中设置的,应该从请求复制到响应消息。 - 请求者需要在该队列上设置一个侦听器,该侦听器将根据该关联 ID 将消息正文返回给请求线程。需要为此编写一些多线程代码,因为您需要手动管理类似关联 id 到原始线程的映射(可能通过 Futures)之类的东西。
This is a similar approach to that taken by Apache Camelfor request-response over messaging.
这是一种类似于Apache Camel用于通过消息传递请求-响应的方法。
One thing to be mindful of is that the queue will not go away when the client does, so you should set a time to live on the response message such that it gets deleted from the broker if it has not been consumed, otherwise you will get a backlog of unconsumed messages. You will also need to set up a dead letter queue strategy to automatically discard expired messages.
需要注意的一件事是队列不会在客户端消失时消失,因此您应该设置响应消息的生存时间,以便如果它没有被消耗,它就会从代理中删除,否则你会得到未使用的消息的积压。您还需要设置死信队列策略以自动丢弃过期消息。