Java RabbitMQ: fast producer and slow consumer
Original question: http://stackoverflow.com/questions/26617649/
Notice: this content is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me): Stack Overflow
RabbitMQ: fast producer and slow consumer
Asked by tonga
I have an application that uses RabbitMQ as the message queue to send and receive messages between two components: a sender and a receiver. The sender sends messages very quickly. The receiver receives a message and then does a very time-consuming task (mainly writing a very large amount of data to a database). Since the receiver takes a very long time to finish the task before retrieving the next message in the queue, the sender will keep filling up the queue quickly. So my question is: Will this cause the message queue to overflow?
The message consumer looks like the following:
public void onMessage() throws IOException, InterruptedException {
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    String queueName = channel.queueDeclare("allDataCase", true, false, false, null).getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");
    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(queueName, true, consumer);
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" [x] Received '" + message + "'");
        JSONObject json = new JSONObject(message);
        String caseID = json.getString("caseID");
        // following takes very long time
        dao.saveToDB(caseID);
    }
}
Each message received by the consumer contains a caseID. For each caseID, it saves a large amount of data to the database, which takes a very long time. Currently only one consumer is set up for RabbitMQ, since the producer and consumer use the same queue to publish and subscribe to caseIDs. So how can I speed up the consumer throughput so that the consumer can catch up with the producer and avoid message overflow in the queue? Should I use multithreading in the consumer to speed up the consumption rate? Or should I use multiple consumers to consume the incoming messages simultaneously? Or is there an asynchronous way to let the consumer consume a message without waiting for the previous one to finish? Any suggestions are welcome.
Answered by Paul Mooney
"Will this cause the message queue to overflow?"
Yes. RabbitMQ will enter a state of "flow control" to prevent excessive memory consumption as the queue length increases. It will also start persisting messages to disk, rather than hold them in memory.
"So how can I speed up the consumer throughput so that the consumer can catch up with the producer and avoid the message overflow in the queue"
You have 2 options:
- Add more consumers. Bear in mind that your DB will now be manipulated by multiple concurrent processes if you choose this option. Ensure that the DB can withstand the extra pressure.
- Increase the QoS value of the consuming channel. This will pull more messages from the queue and buffer them on the consumer. This will increase the overall processing time; if 5 messages are buffered, the 5th message will take the processing time of messages 1...5 to complete.
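The first option can be tried out without a broker. What follows is a minimal simulation, not RabbitMQ client code: a `LinkedBlockingQueue` stands in for the RabbitMQ queue, `saveToDb` is a hypothetical placeholder for the slow `dao.saveToDB` call, and the `STOP` marker is just a convenient way to end the demo. It shows several consumers competing for messages on one queue, where each message is taken by only one of them.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Simulation only: the BlockingQueue stands in for the RabbitMQ queue.
class CompetingConsumersDemo {

    private static final String STOP = "__STOP__"; // hypothetical shutdown marker

    // Hypothetical placeholder for the slow dao.saveToDB(caseID) call.
    static void saveToDb(String caseId, Set<String> saved) {
        try {
            Thread.sleep(20); // pretend the database write is slow
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        saved.add(caseId);
    }

    // Publishes messageCount case IDs, drains them with consumerCount
    // threads, and returns the set of case IDs that were saved.
    static Set<String> run(int consumerCount, int messageCount) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Set<String> saved = ConcurrentHashMap.newKeySet();
        Thread[] consumers = new Thread[consumerCount];

        for (int i = 0; i < consumerCount; i++) {
            consumers[i] = new Thread(() -> {
                try {
                    while (true) {
                        String caseId = queue.take(); // blocks until a message arrives
                        if (caseId.equals(STOP)) {
                            return;
                        }
                        saveToDb(caseId, saved);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumers[i].start();
        }

        // Fast producer: enqueue everything at once.
        for (int i = 0; i < messageCount; i++) {
            queue.add("case-" + i);
        }
        // One stop marker per consumer, queued after all real messages.
        for (int i = 0; i < consumerCount; i++) {
            queue.add(STOP);
        }
        for (Thread consumer : consumers) {
            try {
                consumer.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return saved;
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        Set<String> saved = run(4, 40);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(saved.size() + " cases saved in " + elapsedMs + " ms");
    }
}
```

Running `main` with 1 consumer and then with 4 should show the elapsed time dropping roughly in proportion, as long as the simulated write is the bottleneck. With a real database the gain depends on how well the DB copes with concurrent writers, which is the caveat in the first bullet.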
"Should I use multithreading in the consumer part to speed up the consumption rate?"
Not unless you have a well-designed solution. Adding parallelism to an application is going to add a lot of overhead on the consumer-side. You may end up exhausting the ThreadPool or throttling memory-usage.
When dealing with AMQP, you really need to consider the business requirement for each process in order to design the optimal solution. How time-sensitive are your incoming messages? Do they need to be persisted to DB ASAP, or does it matter to your users whether or not that data is available immediately?
If the data does not need to be persisted immediately, you could modify your application so that the consumer(s) simply remove messages from the queue and save them to a cached collection, in Redis, for example. Introduce a second process which then reads and processes the cached messages sequentially. This will ensure that your queue-length does not grow sufficiently to result in flow-control, while preventing your DB from being bombarded with write requests, which are typically more expensive than read requests. Your consumer(s) now simply remove messages from the queue, to be dealt with by another process later.
Answered by Adam Gent
While it is true that adding more consumers may speed things up, the real issue will be saving to the database.
There are already many answers here that talk about adding consumers (threads and/or machines) and changing the QoS, so I'm not going to reiterate that. Instead, you should seriously consider using the Aggregator pattern to aggregate the messages into a group of messages and then batch insert the group into your database in one shot.
For each message, your current code probably opens a connection, inserts the data, and then closes that connection (or returns it to the pool). Worse, it may even be using a transaction per message.
By using the aggregator pattern you're essentially buffering the data before you flush.
Now, writing a good aggregator is tricky. You will need to decide how you want to buffer (i.e., each worker has its own buffer, or there is a central buffer like Redis). Spring Integration has an aggregator, I believe.
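To make the buffering idea concrete, here is a minimal sketch of the per-worker-buffer variant in plain Java. It is not Spring Integration's aggregator; the class name `BatchAggregator`, its parameters, and the `sink` callback (which would wrap something like a JDBC batch insert) are all assumptions made for illustration. It flushes when the buffer reaches a size limit, or when the oldest buffered item is older than a time limit at the moment the next item arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical per-worker aggregator: buffers items and hands them
// to a sink in batches instead of one at a time.
class BatchAggregator<T> {
    private final int maxBatchSize;
    private final long maxAgeMillis;
    private final Consumer<List<T>> sink; // e.g. a batch insert (assumed)
    private List<T> buffer = new ArrayList<>();
    private long oldestItemAt;

    BatchAggregator(int maxBatchSize, long maxAgeMillis, Consumer<List<T>> sink) {
        if (maxBatchSize < 1) {
            throw new IllegalArgumentException("maxBatchSize must be >= 1");
        }
        this.maxBatchSize = maxBatchSize;
        this.maxAgeMillis = maxAgeMillis;
        this.sink = sink;
    }

    // Adds one item and flushes if the batch is full or has become stale.
    // The age is only checked here, so an idle buffer is not flushed on
    // its own; a real aggregator would also need a timer for that.
    synchronized void add(T item) {
        long now = System.currentTimeMillis();
        if (buffer.isEmpty()) {
            oldestItemAt = now;
        }
        buffer.add(item);
        boolean full = buffer.size() >= maxBatchSize;
        boolean stale = now - oldestItemAt >= maxAgeMillis;
        if (full || stale) {
            flush();
        }
    }

    // Hands the buffered items to the sink as one batch, if there are any.
    synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = buffer;
        buffer = new ArrayList<>();
        sink.accept(batch);
    }
}

class BatchAggregatorDemo {
    public static void main(String[] args) {
        BatchAggregator<String> aggregator = new BatchAggregator<>(3, 5_000L,
                batch -> System.out.println("batch insert of " + batch));
        for (int i = 0; i < 7; i++) {
            aggregator.add("case-" + i);
        }
        aggregator.flush(); // write whatever is left before shutting down
    }
}
```

One thing to watch with RabbitMQ: the consumer in the question calls `basicConsume` with auto-acknowledgement turned on, so any messages sitting in a buffer like this would be lost if the consumer died before the flush. A real setup would most likely switch to manual acknowledgements and acknowledge only after the batch has been written.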
Answered by Gabriele
You have a lot of ways to increase your performance.
You can create a worker queue with more consumers; this gives you a simple load-balancing system. Don't use exchange ---> queue, use only a queue. Read this post: RabbitMQ Non-Round Robin Dispatching
When you get a message, you can use a thread pool to insert the data into your database, but in this case you have to handle failures yourself.
But I think the main problem is the database and not RabbitMQ. With good tuning, multi-threading, and a worker queue, you can have a scalable and fast solution.
Let me know
Answered by voutrin
"So how can I speed up the consumer throughput so that the consumer can catch up with the producer and avoid the message overflow in the queue?" The answer is to "use multiple consumers to consume the incoming message simultaneously": run these consumers in parallel threads and follow the shared-nothing principle. See http://www.eaipatterns.com/CompetingConsumers.html
Answered by mbera
My answer: both.
You can take advantage of having multiple receivers, and also set up each receiver to execute its task in a separate thread, so that the receiver is free to accept the next message in the queue.
Of course, this approach assumes that the result of each operation (the write to the DB, if I understood correctly) does not influence in any way the result of subsequent operations triggered by other messages.
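A minimal sketch of the hand-off, using only `java.util.concurrent` and no RabbitMQ code: the loop in `receiveAll` plays the role of the receiver, and the sleeping task is a hypothetical stand-in for the slow database write. The pool is deliberately bounded (fixed thread count, fixed-size task queue, `CallerRunsPolicy`), which is one possible way to keep the receiver from piling up unlimited work in memory; the sizes used here are arbitrary.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedHandoffDemo {

    // Fixed number of worker threads and a bounded task queue. When both
    // are full, CallerRunsPolicy makes the submitting (receiver) thread
    // run the task itself, which slows down intake instead of buffering
    // an unbounded backlog.
    static ThreadPoolExecutor newBoundedPool(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    // Simulates a receiver handing messageCount tasks to the pool and
    // returns how many of them completed.
    static int receiveAll(int messageCount, int threads, int queueCapacity) {
        ThreadPoolExecutor pool = newBoundedPool(threads, queueCapacity);
        AtomicInteger saved = new AtomicInteger();
        for (int i = 0; i < messageCount; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(10); // stand-in for the slow database write
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                saved.incrementAndGet();
            });
        }
        pool.shutdown(); // accept no new tasks, let queued ones finish
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return saved.get();
    }

    public static void main(String[] args) {
        System.out.println(receiveAll(100, 4, 16) + " messages processed");
    }
}
```

Tasks handed to a pool like this can finish in a different order than the messages arrived, which is why the independence assumption above matters.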