Source: translated from a StackOverflow question (http://stackoverflow.com/questions/20886565/). The content is licensed under CC BY-SA 4.0; if you reuse or share it, attribute the original StackOverflow authors.

Using multiprocessing.Process with a maximum number of simultaneous processes

Tags: python, multithreading, multiprocessing

Asked by Brett

I have the Python code:

from multiprocessing import Process

def f(name):
    print 'hello', name

if __name__ == '__main__':
    for i in range(0, MAX_PROCESSES):
        p = Process(target=f, args=(i,))
        p.start()

which runs well. However, MAX_PROCESSES is variable and can be any value between 1 and 512. Since I'm only running this code on a machine with 8 cores, I need to find out if it is possible to limit the number of processes allowed to run at the same time. I've looked into multiprocessing.Queue, but it doesn't look like what I need - or perhaps I'm interpreting the docs incorrectly.

Is there a way to limit the number of simultaneous multiprocessing.Process instances running?

Accepted answer by treddy

It might be most sensible to use multiprocessing.Pool, which produces a pool of worker processes based on the maximum number of cores available on your system and then feeds tasks in as cores become available.

The example from the standard docs (http://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers) shows that you can also manually set the number of worker processes:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes
    result = pool.apply_async(f, [10])    # evaluate "f(10)" asynchronously
    print result.get(timeout=1)           # prints "100" unless your computer is *very* slow
    print pool.map(f, range(10))          # prints "[0, 1, 4,..., 81]"

And it's also handy to know that there is the multiprocessing.cpu_count() method to count the number of cores on a given system, if needed in your code.

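For example, sizing the pool from the detected core count might look like this (a minimal sketch, not from the original answer; Pool(processes=...) and cpu_count() are standard multiprocessing API):

from multiprocessing import Pool, cpu_count

def f(x):
    return x*x

if __name__ == '__main__':
    n_workers = cpu_count()            # number of cores the OS reports
    pool = Pool(processes=n_workers)   # one worker process per core
    print(pool.map(f, range(16)))      # at most n_workers tasks run at once
    pool.close()
    pool.join()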

Edit: Here's some draft code that seems to work for your specific case:

import multiprocessing

def f(name):
    print 'hello', name

if __name__ == '__main__':
    pool = multiprocessing.Pool()  # use all available cores; otherwise specify the number you want as an argument
    for i in xrange(0, 512):
        pool.apply_async(f, args=(i,))
    pool.close()
    pool.join()
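
If you also need the workers' return values, or want exceptions raised inside f to surface instead of being silently dropped, one option (my sketch, not part of the original answer) is to keep the AsyncResult objects returned by apply_async and call .get() on them once the pool has finished:

import multiprocessing

def f(name):
    return 'hello %s' % name

if __name__ == '__main__':
    pool = multiprocessing.Pool()
    results = [pool.apply_async(f, args=(i,)) for i in range(512)]
    pool.close()
    pool.join()
    for r in results:
        print(r.get())   # returns f's result, or re-raises its exception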

Answer by Baedsch

More generally, this could also look like this:

import multiprocessing

# example task and parameter list; the original answer assumes that f and
# params are already defined elsewhere
def f(i, param):
    print 'hello', i, param

params = ['a', 'b', 'c', 'd', 'e']

def chunks(l, n):
    for i in range(0, len(l), n):
        yield l[i:i + n]

numberOfThreads = 4


if __name__ == '__main__':
    jobs = []
    for i, param in enumerate(params):
        p = multiprocessing.Process(target=f, args=(i, param))
        jobs.append(p)
    # start and join the processes one chunk of numberOfThreads at a time
    for chunk in chunks(jobs, numberOfThreads):
        for j in chunk:
            j.start()
        for j in chunk:
            j.join()

Of course, this approach is quite crude (it waits for every process in a chunk to finish before moving on to the next chunk). Still, it works well when the function calls have roughly equal run times.
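
For illustration (a small usage sketch, not part of the original answer), the chunks helper simply splits a list into groups of at most n items:

items = list(range(10))
print(list(chunks(items, 4)))   # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]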

Answer by makiko_fly

I think Semaphore is what you are looking for; it will block the main process once the count has gone down to 0. Sample code:

from multiprocessing import Process, Semaphore

def f(name, sema):
    print 'hello', name
    sema.release()

if __name__ == '__main__':
    concurrency = 20
    total_task_num = 1000
    sema = Semaphore(concurrency)
    all_processes = []
    for i in range(total_task_num):
        # once `concurrency` processes are running, 
        # the following code will block main process
        sema.acquire()
        p = Process(target=f, args=(i, sema))
        all_processes.append(p)
        p.start()

    # inside main process, wait for all processes to finish
    for p in all_processes:
        p.join()
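
One caveat (my note, not part of the original answer): if f raises an exception before it reaches sema.release(), the main process will eventually block forever in sema.acquire(). Releasing the semaphore in a finally block guards against that, for example:

def f(name, sema):
    try:
        print('hello %s' % name)   # the actual work goes here
    finally:
        sema.release()             # free the slot even if the work raised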

Another way, which might make the code look more structured but will consume too many resources if total_task_num is very large, is as follows:

from multiprocessing import Process, Semaphore

def f(name, sema):
    sema.acquire()
    print 'hello', name
    sema.release()

if __name__ == '__main__':
    concurrency = 20
    total_task_num = 1000
    sema = Semaphore(concurrency)
    all_processes = []
    for i in range(total_task_num):
        p = Process(target=f, args=(i, sema))
        all_processes.append(p)
        p.start()

    # inside main process, wait for all processes to finish
    for p in all_processes:
        p.join()

The above code will create total_task_num processes, but only concurrency of them will be running at any given time; the other processes are blocked.
