Java RabbitMQ 示例:多线程、通道和队列
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/18531072/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
RabbitMQ by Example: Multiple Threads, Channels and Queues
提问by
I just read RabbitMQ's Java API docs, and found it very informative and straight-forward. The example for how to set up a simple Channel
for publishing/consuming is very easy to follow and understand. But it's a very simple/basic example, and it left me with an important question: How can I set up 1+ Channels
to publish/consume to and from multiple queues?
我刚刚阅读了RabbitMQ 的 Java API 文档,发现它非常有用且直接。如何设置简单Channel
的发布/消费示例非常容易理解和理解。但这是一个非常简单/基本的例子,它给我留下了一个重要的问题:如何设置 1+Channels
以在多个队列之间发布/消费?
Let's say I have a RabbitMQ server with 3 queues on it: logging
, security_events
and customer_orders
. So we'd either need a single Channel
to have the ability to publish/consume to all 3 queues, or more likely, have 3 separate Channels
, each dedicated to a single queue.
假设我有一个 RabbitMQ 服务器,上面有 3 个队列:logging
,security_events
和customer_orders
. 所以我们要么需要一个Channel
能够发布/消费到所有 3 个队列的能力,或者更有可能有 3 个单独的Channels
,每个专用于一个队列。
On top of this, RabbitMQ's best practices dictate that we set up 1 Channel
per consumer thread. For this example, let's say security_events
is fine with only 1 consumer thread, but logging
and customer_order
both need 5 threads to handle the volume. So, if I understand correctly, does that mean we need:
最重要的是,RabbitMQ 的最佳实践要求我们Channel
为每个消费者线程设置 1个。对于这个例子,让我们说security_events
是罚款,只有1消费者线程,但logging
并customer_order
都需要5个线程来处理卷。所以,如果我理解正确,这是否意味着我们需要:
- 1
Channel
and 1 consumer thread for publishing/consuming to and fromsecurity_events
; and - 5
Channels
and 5 consumer threads for publishing/consuming to and fromlogging
; and - 5
Channels
and 5 consumer threads for publishing/consuming to and fromcustomer_orders
?
- 1
Channel
和 1 消费者线程,用于发布/消费到和来自security_events
;和 - 5 个
Channels
和 5 个消费者线程,用于发布/消费往返logging
;和 - 5 个
Channels
和 5 个消费者线程,用于发布/消费来往customer_orders
?
If my understanding is misguided here, please begin by correcting me. Either way, could some battle-weary RabbitMQ veteran help me "connect the dots" with a decent code example for setting up publishers/consumers that meet my requirements here?Thanks in advance!
如果我的理解在这里被误导,请首先纠正我。无论哪种方式,一些厌倦了战斗的 RabbitMQ 老手是否可以帮助我用一个体面的代码示例“连接点”来设置满足我的要求的发布者/消费者?提前致谢!
回答by Dileep
How can I set up 1+ Channels to publish/consume to and from multiple queues?
如何设置 1+ 个频道以在多个队列之间发布/消费?
You can implement using threads and channels. All you need is a way to categorize things, ie all the queue items from the login, all the queue elements from security_events etc. The catagorization can be achived using a routingKey.
ie: Every time when you add an item to the queue u specify the routing key. It will be appended as a property element. By this you can get the values from a particular event say logging.
您可以使用线程和通道来实现。您所需要的只是一种对事物进行分类的方法,即来自登录的所有队列项、来自 security_events 的所有队列元素等。可以使用 routingKey 来实现分类。
即:每次将项目添加到队列时,您都指定路由键。它将作为属性元素附加。通过这个,你可以从一个特定的事件中获取值,比如logging。
The following Code sample explain how you make it done in client side.
下面的代码示例解释了如何在客户端完成它。
Eg:
例如:
The routing key is used identify the type of the channel and retrive the types.
路由键用于识别通道的类型并检索类型。
For example if you need to get all the channels about the type Login then you must specify the routing key as login or some other keyword to identify that.
例如,如果您需要获取有关类型 Login 的所有频道,那么您必须将路由键指定为 login 或其他一些关键字来标识它。
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
string routingKey="login";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
You can Look herefor more details about the Categorization ..
您可以在此处查看有关分类的更多详细信息..
Threads Part
螺纹零件
Once the publishing part is over you can run the thread part..
发布部分结束后,您可以运行线程部分..
In this part you can get the Published data on the basis of category. ie; routing Key which in your case is logging, security_events and customer_orders etc.
在这部分中,您可以根据类别获取已发布的数据。IE; 路由密钥,在您的情况下是日志记录、security_events 和 customer_orders 等。
look in the Example to know how retrieve the data in threads.
查看示例以了解如何在线程中检索数据。
Eg :
例如:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//**The threads part is as follows**
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
// This part will biend the queue with the severity (login for eg:)
for(String severity : argv){
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
}
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String routingKey = envelope.getRoutingKey();
String contentType = properties.contentType;
long deliveryTag = envelope.getDeliveryTag();
// (process the message components here ...)
channel.basicAck(deliveryTag, false);
}
});
Now a thread that process the Data in the Queue of the type login(routing key) is created. By this way you can create multiple threads. Each serving different purpose.
现在创建了一个线程来处理类型为 login(routing key) 的队列中的数据。通过这种方式,您可以创建多个线程。每个服务于不同的目的。
look herefor more details about the threads part..
看这里有关螺纹部分的详细信息..
回答by Renat Gilmanov
I think you have several issues with initial understanding. Frankly, I'm a bit surprised to see the following: both need 5 threads to handle the volume
. How did you identify you need that exact number? Do you have any guarantees 5 threads will be enough?
我认为您对初步理解有几个问题。坦率地说,看到以下内容我有点惊讶:both need 5 threads to handle the volume
. 你是如何确定你需要那个确切的数字的?你有任何保证 5 个线程就足够了吗?
RabbitMQ is tuned and time tested, so it is all about proper design and efficient message processing.
RabbitMQ 已经过调优和时间测试,因此一切都与正确的设计和高效的消息处理有关。
Let's try to review the problem and find a proper solution. BTW, message queue itself will not provide any guarantees you have really good solution. You have to understand what you are doing and also do some additional testing.
让我们尝试查看问题并找到合适的解决方案。顺便说一句,消息队列本身不会提供任何保证您有真正好的解决方案。您必须了解自己在做什么,并进行一些额外的测试。
As you definitely know there are many layouts possible:
您肯定知道有许多布局可能:
I will use layout B
as the simplest way to illustrate 1
producer N
consumers problem. Since you are so worried about the throughput. BTW, as you might expect RabbitMQ behaves quite well (source). Pay attention to prefetchCount
, I'll address it later:
我将使用布局B
作为最简单的方式来说明1
生产者N
消费者问题。由于您非常担心吞吐量。顺便说一句,正如您可能期望的那样,RabbitMQ 表现得很好(来源)。注意prefetchCount
,我后面会讲到:
So it is likely message processing logic is a right place to make sure you'll have enough throughput. Naturally you can span a new thread every time you need to process a message, but eventually such approach will kill your system. Basically, more threads you have bigger latency you'll get (you can check Amdahl's lawif you want).
因此,消息处理逻辑很可能是确保您拥有足够吞吐量的正确位置。当然,每次需要处理消息时,您都可以跨一个新线程,但最终这种方法会杀死您的系统。基本上,您将获得更多线程,从而获得更大的延迟(如果需要,您可以查看Amdahl 定律)。
(see Amdahl's law illustrated)
(见阿姆达尔定律图示)
Tip #1: Be careful with threads, use ThreadPools (details)
提示 #1:小心线程,使用 ThreadPools(详细信息)
A thread pool can be described as a collection of Runnable objects (work queue) and a connections of running threads. These threads are constantly running and are checking the work query for new work. If there is new work to be done they execute this Runnable. The Thread class itself provides a method, e.g. execute(Runnable r) to add a new Runnable object to the work queue.
线程池可以描述为 Runnable 对象(工作队列)的集合和运行线程的连接。这些线程不断运行并检查新工作的工作查询。如果有新的工作要做,他们会执行这个 Runnable。Thread 类本身提供了一个方法,例如 execute(Runnable r) 将一个新的 Runnable 对象添加到工作队列中。
public class Main {
private static final int NTHREDS = 10;
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(NTHREDS);
for (int i = 0; i < 500; i++) {
Runnable worker = new MyRunnable(10000000L + i);
executor.execute(worker);
}
// This will make the executor accept no new threads
// and finish all existing threads in the queue
executor.shutdown();
// Wait until all threads are finish
executor.awaitTermination();
System.out.println("Finished all threads");
}
}
Tip #2: Be careful with message processing overhead
提示#2:小心消息处理开销
I would say this is obvious optimization technique. It is likely you'll send small and easy to process messages. The whole approach is about smaller messages to be continuously set and processed. Big messages eventually will play a bad joke, so it is better to avoid that.
我会说这是明显的优化技术。您很可能会发送小而易于处理的消息。整个方法是关于连续设置和处理较小的消息。大消息最终会开玩笑,所以最好避免这种情况。
So it is better to send tiny pieces of information, but what about processing? There is an overhead every time you submit a job. Batch processing can be very helpful in case of high incoming message rate.
所以最好发送一小段信息,但是处理呢?每次提交作业都会产生开销。在高传入消息率的情况下,批处理非常有用。
For example, let's say we have simple message processing logic and we do not want to have thread specific overheads every time message is being processed. In order to optimize that very simple CompositeRunnable can be introduced
:
例如,假设我们有简单的消息处理逻辑,并且我们不希望每次处理消息时都有线程特定的开销。为了优化这个非常简单CompositeRunnable can be introduced
:
class CompositeRunnable implements Runnable {
protected Queue<Runnable> queue = new LinkedList<>();
public void add(Runnable a) {
queue.add(a);
}
@Override
public void run() {
for(Runnable r: queue) {
r.run();
}
}
}
Or do the same in a slightly different way, by collecting messages to be processed:
或者以稍微不同的方式做同样的事情,通过收集要处理的消息:
class CompositeMessageWorker<T> implements Runnable {
protected Queue<T> queue = new LinkedList<>();
public void add(T message) {
queue.add(message);
}
@Override
public void run() {
for(T message: queue) {
// process a message
}
}
}
In such a way you can process messages more effectively.
通过这种方式,您可以更有效地处理消息。
Tip #3: Optimize message processing
技巧 #3:优化消息处理
Despite the fact you know can process messages in parallel (Tip #1
) and reduce processing overhead (Tip #2
) you have to do everything fast. Redundant processing steps, heavy loops and so on might affect performance a lot. Please see interesting case-study:
尽管您知道可以并行处理消息 ( Tip #1
) 并减少处理开销 ( Tip #2
),但您必须快速完成所有工作。冗余的处理步骤、繁重的循环等可能会极大地影响性能。请参阅有趣的案例研究:
Improving Message Queue Throughput tenfold by choosing the right XML Parser
Tip #4: Connection and Channel Management
技巧 #4:连接和通道管理
- Starting a new channel on an existing connection involves one network round trip - starting a new connection takes several.
- Each connection uses a file descriptor on the server. Channels don't.
- Publishing a large message on one channel will block a connection while it goes out. Other than that, the multiplexing is fairly transparent.
- Connections which are publishing can get blocked if the server is overloaded - it's a good idea to separate publishing and consuming connections
- Be prepared to handle message bursts
- 在现有连接上启动新通道涉及一次网络往返 - 启动新连接需要多次。
- 每个连接都使用服务器上的文件描述符。频道没有。
- 在一个通道上发布一条大消息会在连接出去时阻塞连接。除此之外,多路复用是相当透明的。
- 如果服务器过载,正在发布的连接可能会被阻止 - 将发布和使用连接分开是个好主意
- 准备好处理消息突发
(source)
(来源)
Please note, all tips are perfectly work together. Feel free to let me know if you need additional details.
请注意,所有提示都可以完美地协同工作。如果您需要其他详细信息,请随时告诉我。
Complete consumer example (source)
完整的消费者示例(来源)
Please note the following:
请注意以下事项:
- channel.basicQos(prefetch)- As you saw earlier
prefetchCount
might be very useful:This command allows a consumer to choose a prefetch window that specifies the amount of unacknowledged messages it is prepared to receive. By setting the prefetch count to a non-zero value, the broker will not deliver any messages to the consumer that would breach that limit. To move the window forwards, the consumer has to acknowledge the receipt of a message (or a group of messages).
- ExecutorService threadExecutor- you can specify properly configured executor service.
- channel.basicQos(prefetch)- 正如你之前看到的
prefetchCount
可能非常有用:该命令允许消费者选择一个预取窗口,指定它准备接收的未确认消息的数量。通过将预取计数设置为非零值,代理将不会向消费者传递任何会违反该限制的消息。要向前移动窗口,消费者必须确认收到一条消息(或一组消息)。
- ExecutorService threadExecutor- 您可以指定正确配置的执行程序服务。
Example:
例子:
static class Worker extends DefaultConsumer {
String name;
Channel channel;
String queue;
int processed;
ExecutorService executorService;
public Worker(int prefetch, ExecutorService threadExecutor,
, Channel c, String q) throws Exception {
super(c);
channel = c;
queue = q;
channel.basicQos(prefetch);
channel.basicConsume(queue, false, this);
executorService = threadExecutor;
}
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
Runnable task = new VariableLengthTask(this,
envelope.getDeliveryTag(),
channel);
executorService.submit(task);
}
}
You can also check the following:
您还可以检查以下内容: