Python 如何在 RabbitMQ 中创建延迟队列?
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/17014584/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
How to create a delayed queue in RabbitMQ?
提问by eandersson
What is the easiest way to create a delay (or parking) queue with Python, Pika and RabbitMQ? I have seen an similar questions, but none for Python.
使用 Python、Pika 和 RabbitMQ 创建延迟(或停车)队列的最简单方法是什么?我见过类似的问题,但没有针对 Python。
I find this an useful idea when designing applications, as it allows us to throttle messages that needs to be re-queued again.
我发现在设计应用程序时这是一个有用的想法,因为它允许我们限制需要再次重新排队的消息。
There are always the possibility that you will receive more messages than you can handle, maybe the HTTP server is slow, or the database is under too much stress.
总是有可能您会收到比您可以处理的更多的消息,可能是 HTTP 服务器速度很慢,或者数据库压力太大。
I also found it very useful when something went wrong in scenarios where there is a zero tolerance to losing messages, and while re-queuing messages that could not be handled may solve that. It can also cause problems where the message will be queued over and over again. Potentially causing performance issues, and log spam.
我还发现,在对丢失消息零容忍的情况下出现问题时,它非常有用,而重新排队无法处理的消息可能会解决这个问题。它还可能导致消息反复排队的问题。可能会导致性能问题,并记录垃圾邮件。
采纳答案by eandersson
I found this extremely useful when developing my applications. As it gives you an alternative to simply re-queuing your messages. This can easily reduce the complexity of your code, and is one of many powerful hidden features in RabbitMQ.
我发现这在开发我的应用程序时非常有用。因为它为您提供了一种简单地重新排队消息的替代方法。这可以轻松降低代码的复杂性,并且是 RabbitMQ 中众多强大的隐藏功能之一。
Steps
脚步
First we need to set up two basic channels, one for the main queue, and one for the delay queue. In my example at the end, I include a couple of additional flags that are not required, but makes the code more reliable; such as confirm delivery, delivery_modeand durable. You can find more information on these in the RabbitMQ manual.
首先我们需要设置两个基本通道,一个用于主队列,一个用于延迟队列。在我最后的示例中,我包含了一些不需要的附加标志,但使代码更可靠;比如confirm delivery,delivery_mode和durable。您可以在 RabbitMQ手册中找到有关这些的更多信息。
After we have set up the channels we add a binding to the main channel that we can use to send messages from the delay channel to our main queue.
在我们设置了通道之后,我们向主通道添加了一个绑定,我们可以使用它来将消息从延迟通道发送到我们的主队列。
channel.queue_bind(exchange='amq.direct',
queue='hello')
Next we need to configure our delay channel to forward messages to the main queue once they have expired.
接下来,我们需要配置我们的延迟通道,以便在消息过期后将消息转发到主队列。
delay_channel.queue_declare(queue='hello_delay', durable=True, arguments={
'x-message-ttl' : 5000,
'x-dead-letter-exchange' : 'amq.direct',
'x-dead-letter-routing-key' : 'hello'
})
x-message-ttl(Message - Time To Live)
This is normally used to automatically remove old messages in the queue after a specific duration, but by adding two optional arguments we can change this behaviour, and instead have this parameter determine in milliseconds how long messages will stay in the delay queue.
This variable allows us to transfer the message to a different queue once they have expired, instead of the default behaviour of removing it completely.
This variable determines which Exchange used to transfer the message from hello_delay to hello queue.
x-message-ttl (消息 - 生存时间)
这通常用于在特定持续时间后自动删除队列中的旧消息,但通过添加两个可选参数,我们可以更改此行为,而是让此参数以毫秒为单位确定消息将在延迟队列中停留的时间。
这个变量允许我们在消息过期后将消息传输到不同的队列,而不是完全删除它的默认行为。
此变量确定用于将消息从 hello_delay 传输到 hello 队列的 Exchange。
Publishing to the delay queue
发布到延迟队列
When we are done setting up all the basic Pika parameters you simply send a message to the delay queue using basic publish.
当我们完成所有基本 Pika 参数的设置后,您只需使用基本发布将消息发送到延迟队列。
delay_channel.basic_publish(exchange='',
routing_key='hello_delay',
body="test",
properties=pika.BasicProperties(delivery_mode=2))
Once you have executed the script you should see the following queues created in your RabbitMQ management module.

执行脚本后,您应该会看到在 RabbitMQ 管理模块中创建的以下队列。

Example.
例子。
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
# Create normal 'Hello World' type channel.
channel = connection.channel()
channel.confirm_delivery()
channel.queue_declare(queue='hello', durable=True)
# We need to bind this channel to an exchange, that will be used to transfer
# messages from our delay queue.
channel.queue_bind(exchange='amq.direct',
queue='hello')
# Create our delay channel.
delay_channel = connection.channel()
delay_channel.confirm_delivery()
# This is where we declare the delay, and routing for our delay channel.
delay_channel.queue_declare(queue='hello_delay', durable=True, arguments={
'x-message-ttl' : 5000, # Delay until the message is transferred in milliseconds.
'x-dead-letter-exchange' : 'amq.direct', # Exchange used to transfer the message from A to B.
'x-dead-letter-routing-key' : 'hello' # Name of the queue we want the message transferred to.
})
delay_channel.basic_publish(exchange='',
routing_key='hello_delay',
body="test",
properties=pika.BasicProperties(delivery_mode=2))
print " [x] Sent"
回答by Ryan Walls
FYI, how to do this in Spring 3.2.x.
仅供参考,如何在 Spring 3.2.x 中执行此操作。
<rabbit:queue name="delayQueue" durable="true" queue-arguments="delayQueueArguments"/>
<rabbit:queue-arguments id="delayQueueArguments">
<entry key="x-message-ttl">
<value type="java.lang.Long">10000</value>
</entry>
<entry key="x-dead-letter-exchange" value="finalDestinationTopic"/>
<entry key="x-dead-letter-routing-key" value="finalDestinationQueue"/>
</rabbit:queue-arguments>
<rabbit:fanout-exchange name="finalDestinationTopic">
<rabbit:bindings>
<rabbit:binding queue="finalDestinationQueue"/>
</rabbit:bindings>
</rabbit:fanout-exchange>
回答by flycee
You can use RabbitMQ official plugin: x-delayed-message.
您可以使用 RabbitMQ 官方插件:x-delayed-message。
Firstly, download and copy the ez fileinto Your_rabbitmq_root_path/plugins
首先,下载ez文件并复制到Your_rabbitmq_root_path/plugins
Secondly, enable the plugin (do not need to restart the server):
其次,启用插件(不需要重启服务器):
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Finally, publish your message with "x-delay" headers like:
最后,使用“x-delay”标题发布您的消息,例如:
headers.put("x-delay", 5000);
Notice:
注意:
It does not ensure your message's safety, cause if your message expires just during your rabbitmq-server's downtime, unfortunately the message is lost. So be carefulwhen you use this scheme.
它不能确保您的消息的安全性,因为如果您的消息在您的 rabbitmq-server 停机期间过期,那么该消息将丢失。所以当你使用这个方案时要小心。
Enjoy it and more info in rabbitmq-delayed-message-exchange
在rabbitmq-delayed-message-exchange中享受它和更多信息
回答by walv
NodeJS implementation.
Node.js 实现。
Everything is pretty clear from the code. Hope it will save somebody's time.
从代码中一切都非常清楚。希望它会节省某人的时间。
var ch = channel;
ch.assertExchange("my_intermediate_exchange", 'fanout', {durable: false});
ch.assertExchange("my_final_delayed_exchange", 'fanout', {durable: false});
// setup intermediate queue which will never be listened.
// all messages are TTLed so when they are "dead", they come to another exchange
ch.assertQueue("my_intermediate_queue", {
deadLetterExchange: "my_final_delayed_exchange",
messageTtl: 5000, // 5sec
}, function (err, q) {
ch.bindQueue(q.queue, "my_intermediate_exchange", '');
});
ch.assertQueue("my_final_delayed_queue", {}, function (err, q) {
ch.bindQueue(q.queue, "my_final_delayed_exchange", '');
ch.consume(q.queue, function (msg) {
console.log("delayed - [x] %s", msg.content.toString());
}, {noAck: true});
});
回答by Kushwanth Mandadapu
Message in Rabbit queue can be delayed in 2 ways - using QUEUE TTL - using Message TTL If all messages in queue are to be delayed for fixed time use queue TTL. If each message has to be delayed by varied time use Message TTL. I have explained it using python3 and pika module. pika BasicProperties argument 'expiration' in milliseconds has to be set to delay message in delay queue. After setting expiration time, publish message to a delayed_queue ("not actual queue where consumers are waiting to consume") , once message in delayed_queue expires, message will be routed to a actual queue using exchange 'amq.direct'
Rabbit 队列中的消息可以通过两种方式延迟 - 使用 QUEUE TTL - 使用消息 TTL 如果队列中的所有消息都延迟固定时间,请使用队列 TTL。如果每条消息必须延迟不同的时间,请使用消息 TTL。我已经使用 python3 和 pika 模块进行了解释。必须将 pika BasicProperties 参数“expiration”(以毫秒为单位)设置为延迟队列中的延迟消息。设置过期时间后,将消息发布到delayed_queue(“不是消费者正在等待消费的实际队列”),一旦delayed_queue中的消息过期,消息将使用exchange 'amq.direct'路由到实际队列
def delay_publish(self, messages, queue, headers=None, expiration=0):
"""
Connect to RabbitMQ and publish messages to the queue
Args:
queue (string): queue name
messages (list or single item): messages to publish to rabbit queue
expiration(int): TTL in milliseconds for message
"""
delay_queue = "".join([queue, "_delay"])
logging.info('Publishing To Queue: {queue}'.format(queue=delay_queue))
logging.info('Connecting to RabbitMQ: {host}'.format(
host=self.rabbit_host))
credentials = pika.PlainCredentials(
RABBIT_MQ_USER, RABBIT_MQ_PASS)
parameters = pika.ConnectionParameters(
rabbit_host, RABBIT_MQ_PORT,
RABBIT_MQ_VHOST, credentials, heartbeat_interval=0)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue=queue, durable=True)
channel.queue_bind(exchange='amq.direct',
queue=queue)
delay_channel = connection.channel()
delay_channel.queue_declare(queue=delay_queue, durable=True,
arguments={
'x-dead-letter-exchange': 'amq.direct',
'x-dead-letter-routing-key': queue
})
properties = pika.BasicProperties(
delivery_mode=2, headers=headers, expiration=str(expiration))
if type(messages) not in (list, tuple):
messages = [messages]
try:
for message in messages:
try:
json_data = json.dumps(message)
except Exception as err:
logging.error(
'Error Jsonify Payload: {err}, {payload}'.format(
err=err, payload=repr(message)), exc_info=True
)
if (type(message) is dict) and ('data' in message):
message['data'] = {}
message['error'] = 'Payload Invalid For JSON'
json_data = json.dumps(message)
else:
raise
try:
delay_channel.basic_publish(
exchange='', routing_key=delay_queue,
body=json_data, properties=properties)
except Exception as err:
logging.error(
'Error Publishing Data: {err}, {payload}'.format(
err=err, payload=json_data), exc_info=True
)
raise
except Exception:
raise
finally:
logging.info(
'Done Publishing. Closing Connection to {queue}'.format(
queue=delay_queue
)
)
connection.close()
回答by henrylilei
Depends on your scenario and needs, I would recommend the following approaches,
取决于您的场景和需求,我会推荐以下方法,
- Using the official plugin, https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/, but it will have a capacity issue if the total count of delayed messages exceeds certain number (https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/issues/72), it will not have the high availability option and it will suffer lose of data when it runs out of delayed time during a MQ restart.
- Implement a set of cascading delayed queues just like NServiceBus did (https://docs.particular.net/transports/rabbitmq/delayed-delivery).
- 使用官方插件https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/,但如果延迟消息的总数超过一定数量,则会出现容量问题(https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/issues/72),它没有高可用性选项,并且在 MQ 重启期间延迟时间用完时会丢失数据。
- 像 NServiceBus 一样实现一组级联延迟队列(https://docs.particular.net/transports/rabbitmq/delayed-delivery)。

