node.js RabbitMQ / AMQP:单个队列,同一消息的多个消费者?
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/10620976/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
RabbitMQ / AMQP: single queue, multiple consumers for same message?
提问by mikemaccana
I am just starting to use RabbitMQ and AMQP in general.
我刚刚开始使用 RabbitMQ 和 AMQP。
- I have a queue of messages
- I have multiple consumers, which I would like to do different things with the same message.
- 我有一个消息队列
- 我有多个消费者,我想用相同的消息做不同的事情。
Most of the RabbitMQ documentation seems to be focused on round-robin, ie where a single message is consumed by a single consumer, with the load being spread between each consumer. This is indeed the behavior I witness.
大多数 RabbitMQ 文档似乎都侧重于轮询,即单个消息由单个消费者消费,负载分布在每个消费者之间。这确实是我亲眼目睹的行为。
An example: the producer has a single queue, and send messages every 2 sec:
一个例子:生产者有一个队列,每 2 秒发送一次消息:
var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;
connection.on('ready', function () {
var sendMessage = function(connection, queue_name, payload) {
var encoded_payload = JSON.stringify(payload);
connection.publish(queue_name, encoded_payload);
}
setInterval( function() {
var test_message = 'TEST '+count
sendMessage(connection, "my_queue_name", test_message)
count += 1;
}, 2000)
})
And here's a consumer:
这是一个消费者:
var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
connection.on('ready', function () {
connection.queue("my_queue_name", function(queue){
queue.bind('#');
queue.subscribe(function (message) {
var encoded_payload = unescape(message.data)
var payload = JSON.parse(encoded_payload)
console.log('Recieved a message:')
console.log(payload)
})
})
})
If I start the consumer twice, I can see that each consumer is consuming alternate messages in round-robin behavior. Eg, I'll see messages 1, 3, 5 in one terminal, 2, 4, 6 in the other.
如果我启动消费者两次,我可以看到每个消费者都在循环行为中消费替代消息。例如,我会在一个终端中看到消息 1、3、5,在另一个终端中看到消息 2、4、6。
My question is:
我的问题是:
Can I have each consumer receive the same messages? Ie, both consumers get message 1, 2, 3, 4, 5, 6? What is this called in AMQP/RabbitMQ speak? How is it normally configured?
Is this commonly done? Should I just have the exchange route the message into two separate queues, with a single consumer, instead?
我可以让每个消费者收到相同的消息吗?即,两个消费者都收到消息 1、2、3、4、5、6?这在 AMQP/RabbitMQ 中叫什么?一般是怎么配置的?
这是常见的做法吗?我应该让交换将消息路由到两个单独的队列中,而不是一个消费者吗?
采纳答案by mikemaccana
Can I have each consumer receive the same messages? Ie, both consumers get message 1, 2, 3, 4, 5, 6? What is this called in AMQP/RabbitMQ speak? How is it normally configured?
我可以让每个消费者收到相同的消息吗?即,两个消费者都收到消息 1、2、3、4、5、6?这在 AMQP/RabbitMQ 中叫什么?一般是怎么配置的?
No, not if the consumers are on the same queue. From RabbitMQ's AMQP Conceptsguide:
不,如果消费者在同一个队列中,则不会。来自 RabbitMQ 的AMQP 概念指南:
it is important to understand that, in AMQP 0-9-1, messages are load balanced between consumers.
重要的是要了解,在 AMQP 0-9-1 中,消息在消费者之间进行负载平衡。
This seems to imply that round-robin behavior within a queue is a given, and not configurable. Ie, separate queues are required in order to have the same message ID be handled by multiple consumers.
这似乎意味着队列中的循环行为是给定的,不可配置。即,为了让多个消费者处理相同的消息 ID,需要单独的队列。
Is this commonly done? Should I just have the exchange route the message into two separate queues, with a single consumer, instead?
这是常见的做法吗?我应该让交换将消息路由到两个单独的队列中,而不是一个消费者吗?
No it's not, single queue/multiple consumers with each each consumer handling the same message ID isn't possible. Having the exchange route the message onto into two separate queues is indeed better.
不,不是,单个队列/多个消费者每个消费者都处理相同的消息 ID 是不可能的。让交换将消息路由到两个单独的队列中确实更好。
As I don't require too complex routing, a fanout exchangewill handle this nicely. I didn't focus too much on Exchanges earlier as node-amqp has the concept of a 'default exchange' allowing you to publish messages to a connection directly, however most AMQP messages are published to a specific exchange.
由于我不需要太复杂的路由,扇出交换可以很好地处理这个问题。我之前并没有过多关注交换,因为 node-amqp 具有“默认交换”的概念,允许您直接将消息发布到连接,但是大多数 AMQP 消息发布到特定的交换。
Here's my fanout exchange, both sending and receiving:
这是我的扇出交换,包括发送和接收:
var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;
connection.on('ready', function () {
connection.exchange("my_exchange", options={type:'fanout'}, function(exchange) {
var sendMessage = function(exchange, payload) {
console.log('about to publish')
var encoded_payload = JSON.stringify(payload);
exchange.publish('', encoded_payload, {})
}
// Recieve messages
connection.queue("my_queue_name", function(queue){
console.log('Created queue')
queue.bind(exchange, '');
queue.subscribe(function (message) {
console.log('subscribed to queue')
var encoded_payload = unescape(message.data)
var payload = JSON.parse(encoded_payload)
console.log('Recieved a message:')
console.log(payload)
})
})
setInterval( function() {
var test_message = 'TEST '+count
sendMessage(exchange, test_message)
count += 1;
}, 2000)
})
})
回答by driushkin
Just read the rabbitmq tutorial. You publish message to exchange, not to queue; it is then routed to appropriate queues. In your case, you should bind separate queue for each consumer. That way, they can consume messages completely independently.
只需阅读rabbitmq 教程。您发布消息以进行交换,而不是排队;然后将其路由到适当的队列。在您的情况下,您应该为每个消费者绑定单独的队列。这样,他们就可以完全独立地消费消息。
回答by z900collector
The last couple of answers are almost correct - I have tons of apps that generate messages that need to end up with different consumers so the process is very simple.
最后几个答案几乎是正确的 - 我有大量的应用程序可以生成需要最终发送给不同消费者的消息,所以这个过程非常简单。
If you want multiple consumers to the same message, do the following procedure.
如果您希望多个消费者接收同一条消息,请执行以下过程。
Create multiple queues, one for each app that is to receive the message, in each queue properties, "bind" a routing tag with the amq.direct exchange. Change you publishing app to send to amq.direct and use the routing-tag (not a queue). AMQP will then copy the message into each queue with the same binding. Works like a charm :)
创建多个队列,为每个要接收消息的应用程序创建一个队列,在每个队列属性中,将路由标记与 amq.direct 交换“绑定”。更改您的发布应用程序以发送到 amq.direct 并使用路由标记(而不是队列)。然后,AMQP 会将消息复制到具有相同绑定的每个队列中。奇迹般有效 :)
Example: Lets say I have a JSON string I generate, I publish it to the "amq.direct" exchange using the routing tag "new-sales-order", I have a queue for my order_printer app that prints order, I have a queue for my billing system that will send a copy of the order and invoice the client and I have a web archive system where I archive orders for historic/compliance reasons and I have a client web interface where orders are tracked as other info comes in about an order.
示例:假设我有一个生成的 JSON 字符串,我使用路由标签“new-sales-order”将其发布到“amq.direct”交换,我有一个打印订单的 order_printer 应用程序队列,我有一个我的计费系统的队列,它将向客户发送订单副本并开具发票,我有一个网络存档系统,我出于历史/合规原因存档订单,我有一个客户端网络界面,在其他信息出现时跟踪订单订单。
So my queues are: order_printer, order_billing, order_archive and order_tracking All have the binding tag "new-sales-order" bound to them, all 4 will get the JSON data.
所以我的队列是:order_printer、order_billing、order_archive 和 order_tracking 都绑定了绑定标签“new-sales-order”,这 4 个都将获得 JSON 数据。
This is an ideal way to send data without the publishing app knowing or caring about the receiving apps.
这是在发布应用程序不知道或不关心接收应用程序的情况下发送数据的理想方式。
回答by robthewolf
Yes each consumer can receive the same messages. have a look at http://www.rabbitmq.com/tutorials/tutorial-three-python.htmlhttp://www.rabbitmq.com/tutorials/tutorial-four-python.html http://www.rabbitmq.com/tutorials/tutorial-five-python.html
是的,每个消费者都可以收到相同的消息。看看 http://www.rabbitmq.com/tutorials/tutorial-three-python.htmlhttp://www.rabbitmq.com/tutorials/tutorial-four-python.html http://www.rabbitmq。 com/tutorials/tutorial-five-python.html
for different ways to route messages. I know they are for python and java but its good to understand the principles, decide what you are doing and then find how to do it in JS. Its sounds like you want to do a simple fanout (tutorial 3), which sends the messages to all queues connected to the exchange.
用于路由消息的不同方式。我知道它们是针对 python 和 java 的,但是很好地理解原理,决定你在做什么,然后在 JS 中找到如何做。听起来您想做一个简单的扇出(教程 3),它将消息发送到连接到交换的所有队列。
The difference with what you are doing and what you want to do is basically that you are going to set up and exchange or type fanout. Fanout excahnges send all messages to all connected queues. Each queue will have a consumer that will have access to all the messages separately.
你正在做什么和你想要做什么的区别基本上是你要设置和交换或输入扇出。扇出交换将所有消息发送到所有连接的队列。每个队列都有一个消费者,可以分别访问所有消息。
Yes this is commonly done, it is one of the features of AMPQ.
是的,这很常见,它是 AMPQ 的功能之一。
回答by Peter Ritchie
The send pattern is a one-to-one relationship. If you want to "send" to more than one receiver you should be using the pub/sub pattern. See http://www.rabbitmq.com/tutorials/tutorial-three-python.htmlfor more details.
发送模式是一对一的关系。如果您想“发送”给多个接收者,您应该使用发布/订阅模式。有关更多详细信息,请参阅http://www.rabbitmq.com/tutorials/tutorial-three-python.html。
回答by durai
RabbitMQ / AMQP: single queue, multiple consumers for same message and page refresh.
RabbitMQ / AMQP:单个队列,多个消费者用于相同的消息和页面刷新。
rabbit.on('ready', function () { });
sockjs_chat.on('connection', function (conn) {
conn.on('data', function (message) {
try {
var obj = JSON.parse(message.replace(/\r/g, '').replace(/\n/g, ''));
if (obj.header == "register") {
// Connect to RabbitMQ
try {
conn.exchange = rabbit.exchange(exchange, { type: 'topic',
autoDelete: false,
durable: false,
exclusive: false,
confirm: true
});
conn.q = rabbit.queue('my-queue-'+obj.agentID, {
durable: false,
autoDelete: false,
exclusive: false
}, function () {
conn.channel = 'my-queue-'+obj.agentID;
conn.q.bind(conn.exchange, conn.channel);
conn.q.subscribe(function (message) {
console.log("[MSG] ---> " + JSON.stringify(message));
conn.write(JSON.stringify(message) + "\n");
}).addCallback(function(ok) {
ctag[conn.channel] = ok.consumerTag; });
});
} catch (err) {
console.log("Could not create connection to RabbitMQ. \nStack trace -->" + err.stack);
}
} else if (obj.header == "typing") {
var reply = {
type: 'chatMsg',
msg: utils.escp(obj.msga),
visitorNick: obj.channel,
customField1: '',
time: utils.getDateTime(),
channel: obj.channel
};
conn.exchange.publish('my-queue-'+obj.agentID, reply);
}
} catch (err) {
console.log("ERROR ----> " + err.stack);
}
});
// When the visitor closes or reloads a page we need to unbind from RabbitMQ?
conn.on('close', function () {
try {
// Close the socket
conn.close();
// Close RabbitMQ
conn.q.unsubscribe(ctag[conn.channel]);
} catch (er) {
console.log(":::::::: EXCEPTION SOCKJS (ON-CLOSE) ::::::::>>>>>>> " + er.stack);
}
});
});
回答by Anshuman Banerjee
As I assess your case is:
我评估你的情况是:
I have a queue of messages (your source for receiving messages, lets name it q111)
I have multiple consumers, which I would like to do different things with the same message.
我有一个消息队列(您接收消息的来源,让它命名为 q111)
我有多个消费者,我想用相同的消息做不同的事情。
Your problem here is while 3 messages are received by this queue, message 1 is consumed by a consumer A, other consumers B and C consumes message 2 and 3. Where as you are in need of a setup where rabbitmq passes on the same copies of all these three messages(1,2,3) to all three connected consumers (A,B,C) simultaneously.
你的问题是这个队列收到 3 条消息,消息 1 被消费者 A 消费,其他消费者 B 和 C 消费消息 2 和 3。因为你需要一个设置,rabbitmq 传递相同的副本所有这三个消息(1,2,3)同时发送给所有三个连接的消费者(A,B,C)。
While many configurations can be made to achieve this, a simple way is to use the following two step concept:
虽然可以进行许多配置来实现这一点,但一种简单的方法是使用以下两步概念:
- Use a dynamic rabbitmq-shovel to pickup messages from the desired queue(q111) and publish to a fanout exchange (exchange exclusively created and dedicated for this purpose).
- Now re-configure your consumers A,B & C (who were listening to queue(q111)) to listen from this Fanout exchange directly using a exclusive & anonymous queue for each consumer.
- 使用动态 rabbitmq-shovel 从所需队列(q111)中提取消息并发布到扇出交换(专门为此目的创建和专用的交换)。
- 现在重新配置您的消费者 A、B 和 C(他们正在侦听队列(q111))以直接使用每个消费者的独占和匿名队列从这个扇出交换中侦听。
Note: While using this concept don't consume directly from the source queue(q111), as messages already consumed wont be shovelled to your Fanout exchange.
注意:使用这个概念时不要直接从源队列(q111)消费,因为已经消费的消息不会被铲到你的扇出交换。
If you think this does not satisfies your exact requirement... feel free to post your suggestions :-)
如果您认为这不满足您的确切要求...请随时发布您的建议:-)
回答by Skylos
To get the behavior you want, simply have each consumer consume from its own queue. You'll have to use a non-direct exchange type (topic, header, fanout) in order to get the message to all of the queues at once.
要获得您想要的行为,只需让每个消费者从自己的队列中消费即可。您必须使用非直接交换类型(主题、标题、扇出)才能一次将消息发送到所有队列。
回答by brettjonesdev
If you happen to be using the amqpliblibrary as I am, they have a handy exampleof an implementation of the Publish/Subscribe RabbitMQ tutorialwhich you might find handy.
如果您碰巧像我一样使用amqplib库,他们有一个方便的示例,说明您可能会觉得很方便的发布/订阅 RabbitMQ 教程的实现。
回答by Alejandro Serret
I think you should check sending your messages using the fan-outexchanger. That way you willl receiving the same message for differents consumers, under the table RabbitMQ is creating differents queues for each one of this new consumers/subscribers.
我认为您应该检查使用扇出交换器发送消息。这样你就会为不同的消费者接收相同的消息,在 RabbitMQ 表下为每个新的消费者/订阅者创建不同的队列。
This is the link for see the tutorial example in javascript https://www.rabbitmq.com/tutorials/tutorial-one-javascript.html
这是在 javascript https://www.rabbitmq.com/tutorials/tutorial-one-javascript.html 中查看教程示例的链接

