Python 具有多处理功能的 Celery 并行分布式任务

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/23916413/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-19 03:40:20  来源:igfitidea点击:

Celery parallel distributed task with multiprocessing

pythondjangomultithreadingmultiprocessingcelery

提问by Prometheus

I have a CPU intensive Celery task. I would like to use all the processing power (cores) across lots of EC2 instances to get this job done faster (a celery parallel distributed task with multiprocessing - I think).

我有一个 CPU 密集型 Celery 任务。我想使用跨多个 EC2 实例的所有处理能力(核心)来更快地完成这项工作(具有多处理功能的 celery 并行分布式任务 -我认为

The terms, threading, multiprocessing, distributed computing, distributed parallel processingare all terms I'm trying to understand better.

术语、线程多处理分布式计算分布式并行处理都是我试图更好地理解的术语。

Example task:

示例任务:

  @app.task
  for item in list_of_millions_of_ids:
      id = item # do some long complicated equation here very CPU heavy!!!!!!! 
      database.objects(newid=id).save()

Using the code above (with an example if possible)how one would ago about distributed this task using Celery by allowing this one task to be split up utilising all the computing CPU power across all available machine in the cloud?

使用上面的代码(如果可能的话,有一个例子)如何通过允许利用云中所有可用机器上的所有计算 CPU 能力来拆分这个任务来使用 Celery 分发这个任务?

采纳答案by dano

Your goals are:

你的目标是:

  1. Distribute your work to many machines (distributed computing/distributed parallel processing)
  2. Distribute the work on a given machine across all CPUs (multiprocessing/threading)
  1. 将您的工作分配到多台机器上(分布式计算/分布式并行处理)
  2. 跨所有 CPU 分配给定机器上的工作(多处理/线程)

Celery can do both of these for you fairly easily. The first thing to understand is that each celery worker is configured by defaultto run as many tasks as there are CPU cores available on a system:

Celery 可以很容易地为您完成这两项工作。首先要了解的是,每个 celery worker默认配置为运行与系统上可用的 CPU 内核一样多的任务:

Concurrency is the number of prefork worker process used to process your tasks concurrently, when all of these are busy doing work new tasks will have to wait for one of the tasks to finish before it can be processed.

The default concurrency number is the number of CPU's on that machine (including cores), you can specify a custom number using -c option. There is no recommended value, as the optimal number depends on a number of factors, but if your tasks are mostly I/O-bound then you can try to increase it, experimentation has shown that adding more than twice the number of CPU's is rarely effective, and likely to degrade performance instead.

并发是用于并发处理您的任务的 prefork 工作进程的数量,当所有这些都忙于工作时,新任务必须等待其中一个任务完成才能被处理。

默认并发数是该机器上的 CPU 数(包括核心数),您可以使用 -c 选项指定自定义数。没有推荐值,因为最佳数量取决于许多因素,但是如果您的任务主要受 I/O 限制,那么您可以尝试增加它,实验表明,添加两倍以上的 CPU 数量很少有效,并且可能会降低性能。

This means each individual task doesn't need to worry about using multiprocessing/threading to make use of multiple CPUs/cores. Instead, celery will run enough tasks concurrently to use each available CPU.

这意味着每个单独的任务不需要担心使用多处理/线程来利用多个 CPU/内核。相反,celery 将同时运行足够多的任务来使用每个可用的 CPU。

With that out of the way, the next step is to create a task that handles processing some subset of your list_of_millions_of_ids. You have a couple of options here - one is to have each task handle a single ID, so you run N tasks, where N == len(list_of_millions_of_ids). This will guarantee that work is evenly distributed amongst all your tasks, since there will never be a case where one worker finishes early and is just waiting around; if it needs work, it can pull an id off the queue. You can do this (as mentioned by John Doe) using the a celery group.

解决这个问题后,下一步是创建一个任务来处理list_of_millions_of_ids. 您在这里有几个选择 - 一个是让每个任务处理一个 ID,因此您运行 N 个任务,其中N == len(list_of_millions_of_ids). 这将保证工作在您的所有任务中平均分配,因为永远不会出现一个工人提前完成并只是等待的情况;如果它需要工作,它可以从队列中取出一个 id。您可以使用 a celery 执行此操作(如 John Doe 所述)group

tasks.py:

任务.py:

@app.task
def process_id(item):
    id = item #long complicated equation here
    database.objects(newid=id).save()

And to execute the tasks:

并执行任务:

from celery import group
from tasks import process_id

jobs = group(process_id.s(item) for item in list_of_millions_of_ids)
result = jobs.apply_async()

Another option is to break the list into smaller pieces, and distribute the pieces to your workers. This approach runs the risk of wasting some cycles, because you may end up with some workers waiting around while others are still doing work. However, the celery documentation notesthat this concern is often unfounded:

另一种选择是将列表分成更小的部分,然后将这些部分分发给您的工作人员。这种方法冒着浪费一些周期的风险,因为您最终可能会导致一些工人等待而其他人仍在工作。但是,celery 文档指出这种担忧通常是没有根据的:

Some may worry that chunking your tasks results in a degradation of parallelism, but this is rarely true for a busy cluster and in practice since you are avoiding the overhead of messaging it may considerably increase performance.

有些人可能会担心分块你的任务会导致并行性下降,但对于繁忙的集群来说这很少是真的,而且在实践中,因为你避免了消息传递的开销,它可能会大大提高性能。

So, you may find that chunking the list and distributing the chunks to each task performs better, because of the reduced messaging overhead. You can probably also lighten the load on the database a bit this way, by calculating each id, storing it in a list, and then adding the whole list into the DB once you're done, rather than doing it one id at a time. The chunking approach would look something like this

因此,您可能会发现对列表进行分块并将块分配给每个任务的效果更好,因为消息传递开销减少了。您也可以通过这种方式减轻数据库的负载,通过计算每个 id,将其存储在一个列表中,然后在完成后将整个列表添加到数据库中,而不是一次执行一个 id . 分块方法看起来像这样

tasks.py:

任务.py:

@app.task
def process_ids(items):
    for item in items:
        id = item #long complicated equation here
        database.objects(newid=id).save() # Still adding one id at a time, but you don't have to.

And to start the tasks:

并开始任务:

from tasks import process_ids

jobs = process_ids.chunks(list_of_millions_of_ids, 30) # break the list into 30 chunks. Experiment with what number works best here.
jobs.apply_async()

You can experiment a bit with what chunking size gives you the best result. You want to find a sweet spot where you're cutting down messaging overhead while also keeping the size small enough that you don't end up with workers finishing their chunk much faster than another worker, and then just waiting around with nothing to do.

您可以尝试一些分块大小给您最好的结果。您想找到一个最佳点,在那里您可以减少消息传递开销,同时保持大小足够小,这样您最终不会让工作人员比其他工作人员更快地完成他们的块,然后就无所事事地等待。

回答by Torsten Engelbrecht

Adding more celery workers will certainly speed up executing the task. You might have another bottleneck though: the database. Make sure it can handle the simultaneous inserts/updates.

添加更多 celery worker 肯定会加快执行任务的速度。不过,您可能还有另一个瓶颈:数据库。确保它可以处理同时插入/更新。

Regarding your question: You are adding celery workers by assigning another process on your EC2 instances as celeryd. Depending on how many workers you need you might want to add even more instances.

关于您的问题:您通过将 EC2 实例上的另一个进程分配为celeryd. 根据您需要多少工人,您可能想要添加更多实例。

回答by LetMeSOThat4U

Why not use groupcelery task for this?

为什么不group为此使用芹菜任务?

http://celery.readthedocs.org/en/latest/userguide/canvas.html#groups

http://celery.readthedocs.org/en/latest/userguide/canvas.html#groups

Basically, you should divide idsinto chunks (or ranges) and give them to a bunch of tasks in group.

基本上,你应该ids分成块(或范围)并将它们交给group.

For smth more sophisticated, like aggregating results of particular celery tasks, I have successfully used chordtask for similar purpose:

对于更复杂的chord事情,比如聚合特定 celery 任务的结果,我已经成功地将task 用于类似目的:

http://celery.readthedocs.org/en/latest/userguide/canvas.html#chords

http://celery.readthedocs.org/en/latest/userguide/canvas.html#chords

Increase settings.CELERYD_CONCURRENCYto a number that is reasonable and you can afford, then those celery workers will keep executing your tasks in a group or a chord until done.

增加settings.CELERYD_CONCURRENCY的数量是合理的,你能负担得起,那么这些工人芹菜将继续执行群组或弦在您的任务,直至完成。

Note: due to a bug in kombuthere were trouble with reusing workers for high number of tasks in the past, I don't know if it's fixed now. Maybe it is, but if not, reduce CELERYD_MAX_TASKS_PER_CHILD.

注意:由于kombu过去在大量任务中重用工人时存在一个错误,我不知道现在是否已修复。也许是,但如果不是,请减少 CELERYD_MAX_TASKS_PER_CHILD。

Example based on simplified and modified code I run:

基于我运行的简化和修改代码的示例:

@app.task
def do_matches():
    match_data = ...
    result = chord(single_batch_processor.s(m) for m in match_data)(summarize.s())

summarizegets results of all single_batch_processortasks. Every task runs on any Celery worker, kombucoordinates that.

summarize获取所有single_batch_processor任务的结果。每个任务都在任何 Celery worker 上运行,并进行kombu协调。

Now I get it: single_batch_processorand summarizeALSO have to be celery tasks, not regular functions - otherwise of course it will not be parallelized (I'm not even sure chord constructor will accept it if it's not a celery task).

现在我明白了:single_batch_processor而且summarize还必须是 celery 任务,而不是常规函数 - 否则当然不会并行化(如果它不是 celery 任务,我什至不确定和弦构造函数会接受它)。

回答by tk.

In the world of distribution there is only one thing you should remember above all :

在分销领域,您首先应该记住的只有一件事:

Premature optimization is the root of all evil. By D. Knuth

过早的优化是万恶之源。通过 D. Knuth

I know it sounds evident but before distributing double check you are using the best algorithm (if it exists...). Having said that, optimizing distribution is a balancing act between 3 things:

我知道这听起来很明显,但在分发双重检查之前,您使用的是最好的算法(如果存在的话......)。话虽如此,优化分配是三件事之间的平衡:

  1. Writing/Reading data from a persistent medium,
  2. Moving data from medium A to medium B,
  3. Processing data,
  1. 从持久性介质写入/读取数据,
  2. 将数据从介质 A 移动到介质 B,
  3. 处理数据,

Computers are made so the closer you get to your processing unit (3) the faster and more efficient (1) and (2) will be. The order in a classic cluster will be : network hard drive, local hard drive, RAM, inside processing unit territory... Nowadays processors are becoming sophisticated enough to be considered as an ensemble of independent hardware processing units commonly called cores, these cores process data (3) through threads (2). Imagine your core is so fast that when you send data with one thread you are using 50% of the computer power, if the core has 2 threads you will then use 100%. Two threads per core is called hyper threading, and your OS will see 2 CPUs per hyper threaded core.

计算机是这样制造的,您离处理单元 (3) 越近,(1) 和 (2) 就越快、越高效。经典集群中的顺序将是:网络硬盘驱动器、本地硬盘驱动器、RAM、内部处理单元区域……如今处理器变得足够复杂,可以被视为通常称为核心的独立硬件处理单元的集合,这些核心处理数据 (3) 通过线程 (2)。想象一下,您的内核如此之快,以至于当您使用一个线程发送数据时,您将使用 50% 的计算机功率,如果内核有 2 个线程,您将使用 100%。每个内核两个线程称为超线程,您的操作系统将看到每个超线程内核有 2 个 CPU。

Managing threads in a processor is commonly called multi-threading. Managing CPUs from the OS is commonly called multi-processing. Managing concurrent tasks in a cluster is commonly called parallel programming. Managing dependent tasks in a cluster is commonly called distributed programming.

在处理器中管理线程通常称为多线程。从操作系统管理 CPU 通常称为多处理。在集群中管理并发任务通常称为并行编程。管理集群中的依赖任务通常称为分布式编程。

So where is your bottleneck ?

那么你的瓶颈在哪里?

  • In (1): Try to persist and stream from the upper level (the one closer to your processing unit, for example if network hard drive is slow first save in local hard drive)
  • In (2): This is the most common one, try to avoid communication packets not needed for the distribution or compress "on the fly" packets (for example if the HD is slow, save only a "batch computed" message and keep the intermediary results in RAM).
  • In (3): You are done! You are using all the processing power at your disposal.
  • 在(1)中:尝试从上层持久化和流式传输(靠近您的处理单元的那个,例如如果网络硬盘驱动器很慢,则首先保存在本地硬盘驱动器中)
  • 在(2)中:这是最常见的一种,尽量避免分发不需要的通信数据包或压缩“动态”数据包(例如,如果 HD 很慢,则只保存“批量计算”消息并保持RAM 中的中间结果)。
  • 在(3)中:你完成了!您正在使用可支配的所有处理能力。

What about Celery ?

芹菜呢?

Celery is a messaging framework for distributed programming, that will use a broker module for communication (2) and a backend module for persistence (1), this means that you will be able by changing the configuration to avoid most bottlenecks (if possible) on your network and only on your network. First profile your code to achieve the best performance in a single computer. Then use celery in your cluster with the default configuration and set CELERY_RESULT_PERSISTENT=True:

Celery 是分布式编程的消息传递框架,它将使用代理模块进行通信 (2) 和后端模块进行持久化 (1),这意味着您将能够通过更改配置来避免大多数瓶颈(如果可能)您的网络,并且仅在您的网络上。首先分析您的代码以在单台计算机上实现最佳性能。然后在集群中使用默认配置的 celery 并设置CELERY_RESULT_PERSISTENT=True

from celery import Celery

app = Celery('tasks', 
             broker='amqp://guest@localhost//',
             backend='redis://localhost')

@app.task
def process_id(all_the_data_parameters_needed_to_process_in_this_computer):
    #code that does stuff
    return result

During execution open your favorite monitoring tools, I use the default for rabbitMQ and flower for celery and top for cpus, your results will be saved in your backend. An example of network bottleneck is tasks queue growing so much that they delay execution, you can proceed to change modules or celery configuration, if not your bottleneck is somewhere else.

在执行过程中打开你最喜欢的监控工具,我使用默认的rabbitMQ和芹菜的花,cpus的top,你的结果将保存在你的后端。网络瓶颈的一个例子是任务队列增长太多以至于它们延迟执行,你可以继续更改模块或芹菜配置,如果不是你的瓶颈在其他地方。