How to do parallel programming in Python?

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must likewise follow the CC BY-SA license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/20548628/

How to do parallel programming in Python?

python, parallel-processing

Asked by ilovecp3

For C++, we can use OpenMP to do parallel programming; however, OpenMP will not work for Python. What should I do if I want to parallelize some parts of my Python program?

The structure of the code may be considered as:

solve1(A)
solve2(B)

where solve1 and solve2 are two independent functions. How can I run this kind of code in parallel instead of in sequence, in order to reduce the running time? Hope someone can help me. Thanks very much in advance. The code is:

def solve(Q, G, n):
    i = 0
    tol = 10 ** -4

    while i < 1000:
        inneropt, partition, x = setinner(Q, G, n)
        outeropt = setouter(Q, G, n)

        if (outeropt - inneropt) / (1 + abs(outeropt) + abs(inneropt)) < tol:
            break

        node1 = partition[0]
        node2 = partition[1]

        G = updateGraph(G, node1, node2)

        if i == 999:
            print("Maximum iteration reached")
        i += 1

    print(inneropt)

where setinner and setouter are two independent functions. That's where I want to parallelize...

Accepted answer by Matt Williamson

You can use the multiprocessing module. For this case I might use a processing pool:

from multiprocessing import Pool
pool = Pool()
result1 = pool.apply_async(solve1, [A])    # evaluate "solve1(A)" asynchronously
result2 = pool.apply_async(solve2, [B])    # evaluate "solve2(B)" asynchronously
answer1 = result1.get(timeout=10)
answer2 = result2.get(timeout=10)

This will spawn processes that can do generic work for you. Since we did not pass the processes argument, it will spawn one worker process for each CPU core on your machine. Each CPU core can execute one process simultaneously.

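If you want to control the number of worker processes yourself, you can pass the count explicitly (a minimal sketch using the standard multiprocessing API):

from multiprocessing import Pool, cpu_count

print(cpu_count())        # number of CPU cores visible to multiprocessing
pool = Pool(processes=2)  # limit the pool to two worker processes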

If you want to map a list to a single function you would do this:

args = [A, B]
results = pool.map(solve1, args)
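
As a small usage note (not part of the original answer), the pool can also be used as a context manager, so the worker processes are shut down cleanly when you are done:

from multiprocessing import Pool

def double(x):  # trivial stand-in for solve1
    return 2 * x

if __name__ == '__main__':
    with Pool() as pool:  # workers are terminated when the block exits
        results = pool.map(double, [1, 2, 3])
    print(results)  # [2, 4, 6]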

Don't use threads, because the GIL locks any operations on Python objects.

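Applied to the question's solve loop, a rough sketch of this approach (my own adaptation, assuming setinner, setouter and updateGraph are picklable, module-level functions) could look like this:

from multiprocessing import Pool

def solve(Q, G, n):
    tol = 10 ** -4
    with Pool(processes=2) as pool:
        for i in range(1000):
            # run setinner and setouter concurrently in separate processes
            inner_res = pool.apply_async(setinner, [Q, G, n])
            outer_res = pool.apply_async(setouter, [Q, G, n])

            inneropt, partition, x = inner_res.get()
            outeropt = outer_res.get()

            if (outeropt - inneropt) / (1 + abs(outeropt) + abs(inneropt)) < tol:
                break

            G = updateGraph(G, partition[0], partition[1])
    print(inneropt)

Note that each iteration still ships Q, G and n to the worker processes, which can be expensive for large objects (see the charm4py answer below).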

Answer by bauman.space

CPython uses the Global Interpreter Lock (GIL), which makes parallel programming a bit more interesting than in C++.

This topic has several useful examples and descriptions of the challenge:

Python Global Interpreter Lock (GIL) workaround on multi-core systems using taskset on Linux?

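To make the challenge concrete, here is a small illustrative sketch (my addition, not taken from the linked thread): a CPU-bound function run in two threads gains little or nothing because of the GIL, while two processes can actually use two cores:

import time
from threading import Thread
from multiprocessing import Process

def burn(n):
    # CPU-bound busy work; a thread holds the GIL while running this
    total = 0
    for i in range(n):
        total += i * i
    return total

def timed(workers):
    start = time.time()
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return time.time() - start

if __name__ == '__main__':
    N = 5_000_000
    print('threads:   %.2fs' % timed([Thread(target=burn, args=(N,)) for _ in range(2)]))
    print('processes: %.2fs' % timed([Process(target=burn, args=(N,)) for _ in range(2)]))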

Answer by Robert Nishihara

This can be done very elegantly with Ray.

To parallelize your example, you'd need to define your functions with the @ray.remote decorator, and then invoke them with .remote.

import ray

ray.init()

# Define the functions.

@ray.remote
def solve1(a):
    return 1

@ray.remote
def solve2(b):
    return 2

# Start two tasks in the background.
x_id = solve1.remote(0)
y_id = solve2.remote(1)

# Block until the tasks are done and get the results.
x, y = ray.get([x_id, y_id])

There are a number of advantages of this over the multiprocessing module.

  1. The same code will run on a multicore machine as well as a cluster of machines.
  2. Processes share data efficiently through shared memory and zero-copy serialization.
  3. Error messages are propagated nicely.
  4. These function calls can be composed together, e.g.,

    @ray.remote
    def f(x):
        return x + 1
    
    x_id = f.remote(1)
    y_id = f.remote(x_id)
    z_id = f.remote(y_id)
    ray.get(z_id)  # returns 4
    
  5. In addition to invoking functions remotely, classes can be instantiated remotely as actors (a minimal actor sketch follows this list).
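
For point 5, a minimal actor sketch (my addition, not part of the original answer) might look like this:

import ray

ray.init()  # skip if Ray is already initialized

@ray.remote
class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

# Instantiate the class remotely as an actor and call its methods.
counter = Counter.remote()
ids = [counter.increment.remote() for _ in range(3)]
print(ray.get(ids))  # [1, 2, 3]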

Note that Ray is a framework I've been helping develop.

Answer by Juan Galvez

The solution, as others have said, is to use multiple processes. Which framework is more appropriate, however, depends on many factors. In addition to the ones already mentioned, there are also charm4py and mpi4py (I am the developer of charm4py).

There is a more efficient way to implement the above example than using the worker pool abstraction. The main loop sends the same parameters (including the complete graph G) over and over to the workers in each of the 1000 iterations. Since at least one worker will reside in a different process, this involves copying the arguments and sending them to the other process(es). This could be very costly depending on the size of the objects. Instead, it makes sense to have the workers store state and simply send the updated information.

For example, in charm4py this can be done like this:

class Worker(Chare):

    def __init__(self, Q, G, n):
        self.G = G
        ...

    def setinner(self, node1, node2):
        self.updateGraph(node1, node2)
        ...


def solve(Q, G, n):
    # create 2 workers, each on a different process, passing the initial state
    worker_a = Chare(Worker, onPE=0, args=[Q, G, n])
    worker_b = Chare(Worker, onPE=1, args=[Q, G, n])
    while i < 1000:
        result_a = worker_a.setinner(node1, node2, ret=True)  # execute setinner on worker A
        result_b = worker_b.setouter(node1, node2, ret=True)  # execute setouter on worker B

        inneropt, partition, x = result_a.get()  # wait for result from worker A
        outeropt = result_b.get()  # wait for result from worker B
        ...

Note that for this example we really only need one worker. The main loop could execute one of the functions, and have the worker execute the other. But my code helps to illustrate a couple of things:

  1. Worker A runs in process 0 (same as the main loop). While result_a.get() is blocked waiting on the result, worker A does the computation in the same process.
  2. Arguments are automatically passed by reference to worker A, since it is in the same process (there is no copying involved).

Answer by Anderson Green

In some cases, it's possible to automatically parallelize loops using Numba, though it only works with a small subset of Python:

from numba import njit, prange

@njit(parallel=True)
def prange_test(A):
    s = 0
    # Without "parallel=True" in the jit-decorator
    # the prange statement is equivalent to range
    for i in prange(A.shape[0]):
        s += A[i]
    return s
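
For example, calling the function above on a NumPy array (a small usage sketch, assuming NumPy and Numba are installed):

import numpy as np

A = np.arange(10.0)
print(prange_test(A))  # 45.0, summed with the parallel prange loop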

Unfortunately, it seems that Numba only works with NumPy arrays, but not with other Python objects. In theory, it might also be possible to compile Python to C++ and then automatically parallelize it using the Intel C++ compiler, though I haven't tried this yet.

Answer by vahab najari

You can use the joblib library to do parallel computation and multiprocessing.

from joblib import Parallel, delayed

You can simply create a function foo that you want to run in parallel, and implement the parallel processing based on the following piece of code:

output = Parallel(n_jobs=num_cores)(delayed(foo)(i) for i in input)

where num_cores can be obtained from the multiprocessing library as follows:

import multiprocessing

num_cores = multiprocessing.cpu_count()

If you have a function with more than one input argument, and you just want to iterate over one of the arguments with a list, you can use the partial function from the functools library as follows:

from joblib import Parallel, delayed
import multiprocessing
from functools import partial
def foo(arg1, arg2, arg3, arg4):
    '''
    body of the function
    '''
    return output
input = [11,32,44,55,23,0,100,...] # arbitrary list
num_cores = multiprocessing.cpu_count()
foo_ = partial(foo, arg2=arg2, arg3=arg3, arg4=arg4)
# arg1 is being fetched from input list
output = Parallel(n_jobs=num_cores)(delayed(foo_)(i) for i in input)

You can find a complete explanation of Python and R multiprocessing, with a couple of examples, here.
