Python and RabbitMQ - Best way to listen to consume events from multiple channels?

Note: this content is taken from a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. If you use or share it, you must likewise follow the CC BY-SA license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/28550140/

Tags: python, rabbitmq, pika

Asked by blindsnowmobile

I have two separate RabbitMQ instances. I'm trying to find the best way to listen to events from both.

For example, I can consume events on one with the following:

import pika

credentials = pika.PlainCredentials(user, password)
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host="host1", credentials=credentials))
channel = connection.channel()
result = channel.queue_declare(exclusive=True)
channel.queue_bind(result.method.queue,
                   exchange="my-exchange",
                   routing_key='*.*.*.*.*')
channel.basic_consume(callback_func, result.method.queue, no_ack=True)
channel.start_consuming()

I have a second host, "host2", that I'd like to listen to as well. I thought about creating two separate threads to do this, but from what I've read, pika isn't thread safe. Is there a better way? Or would creating two separate threads, each listening to a different Rabbit instance (host1 and host2), be sufficient?

Accepted answer by Unit03

The answer to "what is the best way" depends heavily on your usage pattern of queues and what you mean by "best". Since I can't comment on questions yet, I'll just try to suggest some possible solutions.

In each example I'm going to assume the exchange is already declared.

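The wildcard routing key used throughout suggests a topic exchange, so - purely as a hedged sketch, not something from the original answer - that one-off declaration might look like the line below (older pika releases spell the keyword argument type instead of exchange_type):

# Assumed one-off setup: a topic exchange named "my-exchange".
# Newer pika versions use exchange_type=; very old ones used type=.
channel.exchange_declare(exchange="my-exchange", exchange_type="topic")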

Threads

You can consume messages from two queues on separate hosts in a single process using pika.

You are right - as its own FAQ states, pika is not thread safe, but it can be used in a multi-threaded manner by creating a connection to the RabbitMQ host per thread. Making this example run in threads using the threading module looks as follows:

import pika
import threading


class ConsumerThread(threading.Thread):
    def __init__(self, host, *args, **kwargs):
        super(ConsumerThread, self).__init__(*args, **kwargs)

        self._host = host

    # Not necessarily a method.
    def callback_func(self, channel, method, properties, body):
        print("{} received '{}'".format(self.name, body))

    def run(self):
        credentials = pika.PlainCredentials("guest", "guest")

        # Each thread opens its own connection; pika objects must never be
        # shared between threads.
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=self._host,
                                      credentials=credentials))

        channel = connection.channel()

        # Server-named, exclusive queue bound to "my-exchange".
        result = channel.queue_declare(exclusive=True)

        channel.queue_bind(result.method.queue,
                           exchange="my-exchange",
                           routing_key="*.*.*.*.*")

        # pika 0.x argument order: callback first, then queue name;
        # pika 1.x reversed this and renamed no_ack to auto_ack.
        channel.basic_consume(self.callback_func,
                              result.method.queue,
                              no_ack=True)

        channel.start_consuming()


if __name__ == "__main__":
    threads = [ConsumerThread("host1"), ConsumerThread("host2")]
    for thread in threads:
        thread.start()

I've declared callback_func as a method purely to use ConsumerThread.name while printing the message body. It might as well be a function outside the ConsumerThread class.

Processes

Alternatively, you can always just run one process with consumer code per queue you want to consume events from.

import pika
import sys


def callback_func(channel, method, properties, body):
    print(body)


if __name__ == "__main__":
    credentials = pika.PlainCredentials("guest", "guest")

    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host=sys.argv[1],
                                  credentials=credentials))

    channel = connection.channel()

    result = channel.queue_declare(exclusive=True)

    channel.queue_bind(result.method.queue,
                       exchange="my-exchange",
                       routing_key="*.*.*.*.*")

    channel.basic_consume(callback_func, result.method.queue, no_ack=True)

    channel.start_consuming()

and then run it with:

$ python single_consume.py host1
$ python single_consume.py host2  # e.g. on another console

If the work you're doing on messages from the queues is CPU-heavy, and as long as the number of cores in your CPU is >= the number of consumers, it is generally better to use this approach - unless your queues are empty most of the time and the consumers won't utilize that CPU time*.

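If you would rather launch both consumers from a single parent script instead of two consoles, a rough sketch using the standard multiprocessing module could look like the following (my addition, not part of the original answer; it reuses the same consumer logic as single_consume.py above, and the consume() helper is hypothetical):

import multiprocessing

import pika


def callback_func(channel, method, properties, body):
    print(body)


def consume(host):
    # Same consumer as single_consume.py, parameterized by host.
    credentials = pika.PlainCredentials("guest", "guest")

    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host=host, credentials=credentials))

    channel = connection.channel()

    result = channel.queue_declare(exclusive=True)

    channel.queue_bind(result.method.queue,
                       exchange="my-exchange",
                       routing_key="*.*.*.*.*")

    channel.basic_consume(callback_func, result.method.queue, no_ack=True)

    channel.start_consuming()


if __name__ == "__main__":
    # One OS process per host, so each consumer gets its own interpreter
    # (and, scheduler permitting, its own core).
    workers = [multiprocessing.Process(target=consume, args=(host,))
               for host in ("host1", "host2")]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()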

Async

Another alternative is to involve some asynchronous framework (for example Twisted) and run the whole thing in a single thread.

You can no longer use BlockingConnection in asynchronous code; fortunately, pika has an adapter for Twisted:

from pika.adapters.twisted_connection import TwistedProtocolConnection
from pika.connection import ConnectionParameters
from twisted.internet import protocol, reactor, task
from twisted.python import log


class Consumer(object):
    def on_connected(self, connection):
        d = connection.channel()
        d.addCallback(self.got_channel)
        d.addCallback(self.queue_declared)
        d.addCallback(self.queue_bound)
        d.addCallback(self.handle_deliveries)
        d.addErrback(log.err)

    def got_channel(self, channel):
        self.channel = channel

        return self.channel.queue_declare(exclusive=True)

    def queue_declared(self, queue):
        self._queue_name = queue.method.queue

        self.channel.queue_bind(queue=self._queue_name,
                                exchange="my-exchange",
                                routing_key="*.*.*.*.*")

    def queue_bound(self, ignored):
        return self.channel.basic_consume(queue=self._queue_name)

    def handle_deliveries(self, queue_and_consumer_tag):
        queue, consumer_tag = queue_and_consumer_tag
        # LoopingCall waits for the Deferred returned by consume_from_queue
        # before scheduling the next call, so an interval of 0 is fine here.
        self.looping_call = task.LoopingCall(self.consume_from_queue, queue)

        return self.looping_call.start(0)

    def consume_from_queue(self, queue):
        # queue.get() fires with (channel, method, properties, body).
        d = queue.get()

        return d.addCallback(lambda result: self.handle_payload(*result))

    def handle_payload(self, channel, method, properties, body):
        print(body)


if __name__ == "__main__":
    consumer1 = Consumer()
    consumer2 = Consumer()

    parameters = ConnectionParameters()
    cc = protocol.ClientCreator(reactor,
                                TwistedProtocolConnection,
                                parameters)
    d1 = cc.connectTCP("host1", 5672)
    d1.addCallback(lambda protocol: protocol.ready)
    d1.addCallback(consumer1.on_connected)
    d1.addErrback(log.err)

    d2 = cc.connectTCP("host2", 5672)
    d2.addCallback(lambda protocol: protocol.ready)
    d2.addCallback(consumer2.on_connected)
    d2.addErrback(log.err)

    reactor.run()

This approach becomes even more attractive the more queues you consume from and the less CPU-bound the work performed by your consumers is*.

Python 3

Since you've mentioned pika, I've restricted myself to Python 2.x-based solutions, because pika is not yet ported.

But in case you would want to move to >=3.3, one possible option is to use asyncio with one of the AMQP libraries (AMQP being the protocol you speak with RabbitMQ), e.g. asynqp or aioamqp.

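As a rough, untested sketch of that route (my addition, not part of the original answer: it assumes the aioamqp package, the same exchange and credentials as above, and Python >= 3.7 for asyncio.run; check aioamqp's documentation for the exact signatures of the version you install):

import asyncio

import aioamqp


async def handle_message(channel, body, envelope, properties):
    # aioamqp delivers messages to a coroutine callback.
    print(body)


async def consume(host):
    # One connection per host, both driven by the same event loop.
    transport, protocol = await aioamqp.connect(host=host,
                                                login="guest",
                                                password="guest")
    channel = await protocol.channel()

    result = await channel.queue_declare(exclusive=True)
    queue_name = result["queue"]

    await channel.queue_bind(queue_name, "my-exchange", "*.*.*.*.*")
    await channel.basic_consume(handle_message,
                                queue_name=queue_name,
                                no_ack=True)


async def main():
    await asyncio.gather(consume("host1"), consume("host2"))
    # Keep the loop alive; deliveries keep arriving via handle_message.
    await asyncio.Future()


asyncio.run(main())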

* - please note that these are very shallow tips - in most cases the choice is not that obvious; what will be best for you depends on queue "saturation" (messages/time), on what work you do upon receiving these messages, on what environment you run your consumers in, etc.; there's no way to be sure other than to benchmark all the implementations.

Answer by NasimBM

Below is an example of how I use one rabbitmq instance to listen to 2 queues at the same time:

import pika
import threading

threads = []


def client_info(channel):
    channel.queue_declare(queue='proxy-python')
    print(' [*] Waiting for client messages. To exit press CTRL+C')

    def callback(ch, method, properties, body):
        print(" Received %s" % body)

    channel.basic_consume(callback, queue='proxy-python', no_ack=True)
    channel.start_consuming()


def scenario_info(channel):
    channel.queue_declare(queue='savi-virnet-python')
    print(' [*] Waiting for scenario messages. To exit press CTRL+C')

    def callback(ch, method, properties, body):
        print(" Received %s" % body)

    channel.basic_consume(callback, queue='savi-virnet-python', no_ack=True)
    channel.start_consuming()


def manager():
    # Give each consumer thread its own connection and channel.
    connection1 = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel1 = connection1.channel()
    connection2 = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel2 = connection2.channel()

    t1 = threading.Thread(target=client_info, args=(channel1,))
    t1.daemon = True
    threads.append(t1)
    t1.start()

    t2 = threading.Thread(target=scenario_info, args=(channel2,))
    t2.daemon = True
    threads.append(t2)
    t2.start()

    for t in threads:
        t.join()


manager()