
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must follow the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/15314189/


Python multiprocessing pool hangs at join?

Tags: python, multiprocessing

Asked by clemej

I'm trying to run some python code on several files in parallel. The construct is basically:


import multiprocessing
import sys

def process_file(filename, foo, bar, baz=biz):
    # do stuff that may fail and cause exception
    pass

if __name__ == '__main__':
    # setup code setting parameters foo, bar, and biz

    psize = multiprocessing.cpu_count()*2
    pool = multiprocessing.Pool(processes=psize)

    map(lambda x: pool.apply_async(process_file, (x, foo, bar), dict(baz=biz)), sys.argv[1:])
    pool.close()
    pool.join()

I've previously used pool.map to do something similar and it worked great, but I can't seem to use that here because pool.map doesn't (appear to) allow me to pass in extra arguments (and using lambda to do it won't work because lambda can't be marshalled).


So now I'm trying to get things working using apply_async() directly. My issue is that the code seems to hang and never exit. A few of the files fail with an exception, but I don't see why that would cause join() to fail or hang. Interestingly, if none of the files fail with an exception, it does exit cleanly.


What am I missing?


Edit: When the function (and thus a worker) fails, I see this exception:


Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 376, in _handle_results
    task = get()
TypeError: ('__init__() takes at least 3 arguments (1 given)', <class 'subprocess.CalledProcessError'>, ())

If I see even one of these, the parent process hangs forever, never reaping the children and exiting.


Accepted answer by clemej

Sorry to answer my own question, but I've found at least a workaround, so in case anyone else has a similar issue I want to post it here. I'll accept any better answers out there.


I believe the root of the issue is http://bugs.python.org/issue9400. This tells me two things:


  • I'm not crazy, what I'm trying to do really is supposed to work
  • At least in python2, it is very difficult if not impossible to pickle 'exceptions' back to the parent process. Simple ones work, but many others don't.

In my case, my worker function was launching a subprocess that was segfaulting. This raised a CalledProcessError exception, which is not picklable. For some reason, this makes the pool object in the parent go out to lunch and never return from the call to join().

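To make the failure mode concrete, here is a minimal, hypothetical sketch (not from the original answer) of an exception that pickles but won't unpickle. Unpickling an exception re-calls its constructor with the saved self.args tuple, so any class whose __init__ signature doesn't match its args blows up the same way CalledProcessError does above:

import pickle

class PickyError(Exception):
    # Hypothetical stand-in for CalledProcessError's problem.
    def __init__(self, returncode, cmd):
        # Exception.__init__ is given ONE argument, so self.args is a
        # 1-tuple; unpickling calls PickyError(*self.args) and fails
        # because __init__ requires two arguments.
        Exception.__init__(self, '%r failed with code %d' % (cmd, returncode))
        self.returncode = returncode
        self.cmd = cmd

data = pickle.dumps(PickyError(1, 'some_command'))  # pickling works fine
pickle.loads(data)  # TypeError: __init__() takes exactly 3 arguments (2 given)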

In my particular case, I don't care what the exception was. At most I want to log it and keep going. To do this, I simply wrap my top worker function in a try/except clause. If the worker throws any exception, it is caught before trying to return to the parent process, logged, and then the worker process exits normally since it's no longer trying to send the exception through. See below:


import traceback

def process_file_wrapped(filename, foo, bar, baz=biz):
    try:
        process_file(filename, foo, bar, baz=baz)
    except:
        # catch everything here so no exception object ever has to be
        # pickled back to the parent; just log it and let the worker
        # exit normally
        print('%s: %s' % (filename, traceback.format_exc()))

Then, I have my initial map call submit process_file_wrapped() instead of the original function. Now my code works as intended.

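For reference, the submission loop is unchanged apart from the target function; under the same assumptions as the question's snippet (foo, bar, and biz already set up), it looks like:

map(lambda x: pool.apply_async(process_file_wrapped, (x, foo, bar), dict(baz=biz)), sys.argv[1:])
pool.close()
pool.join()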

Answer by nneonneo

You can actually use a functools.partial instance instead of a lambda in cases where the object needs to be pickled. partial objects are picklable since Python 2.7 (and in Python 3).


pool.map(functools.partial(process_file, foo=foo, bar=bar, baz=biz), sys.argv[1:])
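Note that partial has to bind every argument except the one pool.map supplies: foo, bar, and baz are bound by keyword, so each element of sys.argv[1:] lands in the first positional slot (filename). Binding them positionally would shift the filenames into the wrong parameter.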

Answer by Reut Sharabani

For what it's worth, I had a similar bug (not the same one) when pool.map hung. My use case allowed me to use pool.terminate to solve it (make sure yours does as well before changing stuff).


I used pool.map before calling terminate, so I know everything finished. From the docs:


A parallel equivalent of the map() built-in function (it supports only one iterable argument though). It blocks until the result is ready.


If that's your use case this may be a way to patch it.

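A minimal sketch of that map-then-terminate pattern, with stand-in names (process_file and filenames are placeholders, not from the answer):

import multiprocessing

def process_file(filename):
    pass  # per-file work goes here

if __name__ == '__main__':
    filenames = ['a.txt', 'b.txt']
    pool = multiprocessing.Pool()
    pool.map(process_file, filenames)  # blocks until every task is done
    pool.terminate()  # tear the workers down instead of close()
    pool.join()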