如何在 Python 中对类实例使用多处理?
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/14169550/
Warning: this content is provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me):
StackOverFlow
How to use multiprocessing with class instances in Python?
提问by David Lynch
I am trying to create a class that can run a separate process to go do some work that takes a long time, launch a bunch of these from a main module and then wait for them all to finish. I want to launch the processes once and then keep feeding them things to do rather than creating and destroying processes. For example, maybe I have 10 servers running the dd command, then I want them all to scp a file, etc.
我正在尝试创建一个类,它可以运行一个单独的进程去完成一些耗时很长的工作,从主模块启动一批这样的进程,然后等待它们全部完成。我想只启动一次这些进程,之后不断给它们分派要做的事情,而不是反复创建和销毁进程。例如,也许我有 10 台服务器在运行 dd 命令,然后我想让它们都去 scp 一个文件,等等。
My ultimate goal is to create a class for each system that keeps track of the information for the system it is tied to, like IP address, logs, runtime, etc. But that class must be able to launch a system command and then return execution back to the caller while that system command runs, to follow up on the result of the system command later.
我的最终目标是为每个系统创建一个类,用来跟踪与之关联的系统信息,例如 IP 地址、日志、运行时间等。但该类必须能够启动一个系统命令,并在该系统命令运行期间把执行权交还给调用者,以便稍后再获取该系统命令的结果。
My attempt is failing because I cannot send an instance method of a class over the pipe to the subprocess via pickle. Those are not pickleable. I therefore tried to fix it various ways but I can't figure it out. How can my code be patched to do this? What good is multiprocessing if you can't send over anything useful?
我的尝试失败了,因为我无法通过 pickle 经由管道把类的实例方法发送给子进程。实例方法是无法被 pickle 的。因此我尝试用各种方式修复,但都没能解决。我的代码要怎么改才能做到这一点?如果不能发送任何有用的东西,multiprocessing 还有什么用?
Is there any good documentation of multiprocessing being used with class instances? The only way I can get the multiprocessing module to work is on simple functions. Every attempt to use it within a class instance has failed. Maybe I should pass events instead? I don't understand how to do that yet.
有没有关于将 multiprocessing 与类实例一起使用的好文档?我唯一能让 multiprocessing 模块正常工作的场景就是简单的函数。每次尝试在类实例中使用它都失败了。也许我应该改为传递事件?我还不清楚该怎么做。
import multiprocessing
import sys
import re
class ProcessWorker(multiprocessing.Process):
"""
This class runs as a separate process to execute worker's commands in parallel
Once launched, it remains running, monitoring the task queue, until "None" is sent
"""
def __init__(self, task_q, result_q):
multiprocessing.Process.__init__(self)
self.task_q = task_q
self.result_q = result_q
return
def run(self):
"""
Overloaded function provided by multiprocessing.Process. Called upon start() signal
"""
proc_name = self.name
print '%s: Launched' % (proc_name)
while True:
next_task_list = self.task_q.get()
            if next_task_list is None:
# Poison pill means shutdown
print '%s: Exiting' % (proc_name)
self.task_q.task_done()
break
next_task = next_task_list[0]
print '%s: %s' % (proc_name, next_task)
args = next_task_list[1]
kwargs = next_task_list[2]
answer = next_task(*args, **kwargs)
self.task_q.task_done()
self.result_q.put(answer)
return
# End of ProcessWorker class
class Worker(object):
"""
Launches a child process to run commands from derived classes in separate processes,
which sit and listen for something to do
This base class is called by each derived worker
"""
def __init__(self, config, index=None):
self.config = config
self.index = index
        # Launch the ProcessWorker for anything that has an index value
if self.index is not None:
self.task_q = multiprocessing.JoinableQueue()
self.result_q = multiprocessing.Queue()
self.process_worker = ProcessWorker(self.task_q, self.result_q)
self.process_worker.start()
print "Got here"
# Process should be running and listening for functions to execute
return
def enqueue_process(target): # No self, since it is a decorator
"""
        Used to place a command target from this class object into the task_q
NOTE: Any function decorated with this must use fetch_results() to get the
target task's result value
"""
def wrapper(self, *args, **kwargs):
self.task_q.put([target, args, kwargs]) # FAIL: target is a class instance method and can't be pickled!
return wrapper
def fetch_results(self):
"""
After all processes have been spawned by multiple modules, this command
        is called on each one to retrieve the results of the call.
This blocks until the execution of the item in the queue is complete
"""
        self.task_q.join() # Wait for it to finish
return self.result_q.get() # Return the result
@enqueue_process
def run_long_command(self, command):
print "I am running number % as process "%number, self.name
# In here, I will launch a subprocess to run a long-running system command
# p = Popen(command), etc
# p.wait(), etc
return
def close(self):
self.task_q.put(None)
self.task_q.join()
if __name__ == '__main__':
config = ["some value", "something else"]
index = 7
workers = []
for i in range(5):
worker = Worker(config, index)
worker.run_long_command("ls /")
workers.append(worker)
for worker in workers:
worker.fetch_results()
# Do more work... (this would actually be done in a distributor in another class)
for worker in workers:
worker.close()
Edit: I tried to move the ProcessWorker class and the creation of the multiprocessing queues outside of the Worker class and then tried to manually pickle the worker instance. Even that doesn't work and I get an error:
编辑:我尝试把 ProcessWorker 类以及 multiprocessing 队列的创建移到 Worker 类之外,然后尝试手动 pickle 这个 worker 实例。即使这样也不行,我得到一个错误:
RuntimeError: Queue objects should only be shared between processes through inheritance
RuntimeError:队列对象只能通过继承在进程之间共享
But I am only passing references to those queues into the worker instance?? I am missing something fundamental. Here is the modified code from the main section:
但我只是把这些队列的引用传给了 worker 实例?? 我肯定漏掉了一些基础性的东西。下面是主程序部分修改后的代码:
if __name__ == '__main__':
config = ["some value", "something else"]
index = 7
workers = []
for i in range(1):
task_q = multiprocessing.JoinableQueue()
result_q = multiprocessing.Queue()
process_worker = ProcessWorker(task_q, result_q)
worker = Worker(config, index, process_worker, task_q, result_q)
something_to_look_at = pickle.dumps(worker) # FAIL: Doesn't like queues??
process_worker.start()
worker.run_long_command("ls /")
采纳答案by 9000
Instead of attempting to send the method itself (which is impractical), try sending the name of a method to execute.
不要尝试发送方法本身(这是不切实际的),而是尝试发送要执行的方法的名称。
Provided that each worker runs the same code, it's a matter of a simple getattr(self, task_name).
假设每个 worker 运行的是相同的代码,那么只需一个简单的 getattr(self, task_name) 就够了。
I'd pass tuples (task_name, task_args), where task_args is a dict to be fed directly to the task method:
我会传递元组 (task_name, task_args),其中 task_args 是一个 dict,直接传给任务方法:
next_task_name, next_task_args = self.task_q.get()
if next_task_name:
task = getattr(self, next_task_name)
answer = task(**next_task_args)
...
else:
# poison pill, shut down
break
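Applied to the question's classes, the whole idea might look roughly like this (a simplified sketch in Python 3 syntax; the queue layout and run_long_command mirror the question, everything else is illustrative):
套用到原问题的类上,整个思路大致可以写成下面这样(Python 3 语法的简化示意;队列结构和 run_long_command 沿用原问题,其余内容仅作演示):

import multiprocessing

class ProcessWorker(multiprocessing.Process):
    """Long-lived worker: pulls (method_name, kwargs) tuples off a queue and dispatches on itself."""
    def __init__(self, task_q, result_q):
        super().__init__()
        self.task_q = task_q
        self.result_q = result_q

    def run(self):
        while True:
            next_task = self.task_q.get()
            if next_task is None:                              # poison pill means shutdown
                self.task_q.task_done()
                break
            task_name, task_args = next_task
            answer = getattr(self, task_name)(**task_args)     # dispatch by name; no method crosses the pipe
            self.task_q.task_done()
            self.result_q.put(answer)

    def run_long_command(self, command):
        # stand-in for the real work, e.g. a subprocess call
        return '%s finished %r' % (self.name, command)

if __name__ == '__main__':
    task_q = multiprocessing.JoinableQueue()
    result_q = multiprocessing.Queue()
    worker = ProcessWorker(task_q, result_q)
    worker.start()
    task_q.put(('run_long_command', {'command': 'ls /'}))      # only a string and a dict get pickled
    task_q.join()                                              # wait for the task to be processed
    print(result_q.get())
    task_q.put(None)                                           # shut the worker down
    worker.join()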
回答by David Lynch
So, the problem was that I was assuming that Python was doing some sort of magic that is somehow different from the way that C++/fork() works. I somehow thought that Python only copied the class, not the whole program into a separate process. I seriously wasted days trying to get this to work because all of the talk about pickle serialization made me think that it actually sent everything over the pipe. I knew that certain things could not be sent over the pipe, but I thought my problem was that I was not packaging things up properly.
所以,问题在于我原本以为 Python 在做某种与 C++/fork() 的工作方式不太一样的魔法。我不知怎么地以为 Python 只是把类复制到一个单独的进程里,而不是整个程序。我为了让它跑起来浪费了好几天,因为所有关于 pickle 序列化的讨论让我以为它真的把所有东西都通过管道发送过去了。我知道某些东西不能通过管道发送,但我以为问题出在我没有把东西正确地打包好。
This all could have been avoided if the Python docs gave me a 10,000 ft view of what happens when this module is used. Sure, it tells me what the methods of the multiprocess module do and gives me some basic examples, but what I want to know is the "Theory of Operation" behind the scenes! Here is the kind of information I could have used. Please chime in if my answer is off. It will help me learn.
如果 Python 文档能给我一个关于使用这个模块时会发生什么的宏观(10,000 英尺)视角,这一切本可以避免。当然,文档告诉了我 multiprocessing 模块的各个方法是做什么的,也给了一些基本示例,但我想知道的是幕后的"工作原理"!下面就是我当初需要的那种信息。如果我的回答有偏差,请指出来,这会帮助我学习。
When you start a process using this module, the whole program is copied into another process. But since it is not the "__main__" process and my code was checking for that, it doesn't fire off yet another process infinitely. It just stops and sits out there waiting for something to do, like a zombie. Everything that was initialized in the parent at the time of calling multiprocess.Process() is all set up and ready to go. Once you put something in the multiprocess.Queue or shared memory, or pipe, etc. (however you are communicating), then the separate process receives it and gets to work. It can draw upon all imported modules and setup just as if it was the parent. However, once some internal state variables change in the parent or separate process, those changes are isolated. Once the process is spawned, it now becomes your job to keep them in sync if necessary, either through a queue, pipe, shared memory, etc.
当你使用这个模块启动一个进程时,整个程序都会被复制到另一个进程中。但由于那个进程不是 "__main__" 进程,而我的代码会检查这一点,所以它不会无限地再去启动新的进程。它只是停在那里等待要做的事情,就像僵尸一样。在调用 multiprocess.Process() 时父进程里已经初始化好的所有内容,都已设置完毕、随时可用。一旦你把某些东西放进 multiprocess.Queue、共享内存或管道等(取决于你用什么方式通信),单独的进程就会接收到它并开始工作。它可以使用所有已导入的模块和设置,就像父进程一样。但是,一旦父进程或子进程中的某些内部状态变量发生变化,这些变化是相互隔离的。进程一旦产生,必要时让它们保持同步就成了你的工作,可以通过队列、管道、共享内存等来做。
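A tiny sketch of that isolation (illustrative names): state that exists before start() is available in the child, but changes made after the two processes diverge stay local to whichever process made them, unless you send them back through a queue or pipe.
下面用一个很小的示意(名字仅作演示)说明这种隔离:start() 之前已经存在的状态在子进程里是可用的,但两个进程分开之后所做的修改只会留在做出修改的那个进程里,除非你通过队列或管道把它们传回去。

import multiprocessing

counter = {'value': 1}                  # set up before the child process exists

def child():
    print('child sees %d' % counter['value'])   # the child starts out with a copy of this state
    counter['value'] = 99                       # this change is isolated to the child

if __name__ == '__main__':
    p = multiprocessing.Process(target=child)
    p.start()
    p.join()
    print('parent still sees %d' % counter['value'])   # still 1 -- the child's change never came back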
I threw out the code and started over, but now I am only putting one extra function out in the ProcessWorker, an "execute" method that runs a command line. Pretty simple. I don't have to worry about launching and then closing a bunch of processes this way, which has caused me all kinds of instability and performance issues in the past in C++. When I switched to launching processes at the beginning and then passing messages to those waiting processes, my performance improved and it was very stable.
我扔掉了原来的代码重新开始,但现在我只在 ProcessWorker 里多放了一个函数,一个运行命令行的 "execute" 方法。很简单。这样我就不必担心启动然后关闭一堆进程,这在过去的 C++ 里给我造成过各种不稳定和性能问题。当我改成在一开始就启动进程,然后把消息传给那些等待中的进程之后,我的性能提升了,而且非常稳定。
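The answer does not show that "execute" method, but under the dispatch-by-name scheme above it could be as small as the sketch below (the method name matches the answer's description; the argument handling is a guess, not the original code):
这个回答没有贴出那个 "execute" 方法,但在上面按名字分发的方案下,它可以小到像下面这个示意一样(方法名取自回答中的描述;参数处理方式只是猜测,并非原始代码):

import subprocess

# Meant to live on the ProcessWorker subclass and be invoked from the queue with
# something like task_q.put(('execute', {'command': 'ls /'})).
def execute(self, command):
    """Run one shell command inside the worker and return everything needed to follow up later."""
    proc = subprocess.Popen(command, shell=True,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = proc.communicate()
    return {'command': command,
            'returncode': proc.returncode,
            'stdout': out,
            'stderr': err}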
BTW, I looked at this link to get help, which threw me off because the example made me think that methods were being transported across the queues: http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html The second example of the first section used "next_task()" that appeared (to me) to be executing a task received via the queue.
顺便说一句,我当时查看了这个链接寻求帮助,结果它把我带偏了,因为那个例子让我以为方法真的在队列之间传输:http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html 。第一节的第二个例子用到了 "next_task()",(在我看来)它似乎是在执行通过队列接收到的任务。
回答by Sawan
REF: https://stackoverflow.com/a/14179779
参考:https://stackoverflow.com/a/14179779
The answer on Jan 6 at 6:03 by David Lynch is not factually correct when he says that he was misled by http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html.
David Lynch 在 1 月 6 日 6:03 的回答中说他被 http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html 误导了,这个说法实际上并不准确。
The code and examples provided are correct and work as advertised. next_task() is executing a task received via the queue -- try and understand what the Task.__call__() method is doing.
提供的代码和示例是正确的,并且能按说明正常工作。next_task() 确实在执行通过队列接收到的任务,试着理解一下 Task.__call__() 方法在做什么。
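For context, the pattern in that PyMOTW example is roughly the following (a simplified sketch, not the original code): what travels through the queue is a picklable Task object carrying plain data, and its __call__() is what actually runs inside the worker process.
作为背景,那个 PyMOTW 示例中的模式大致如下(简化示意,并非原始代码):通过队列传输的是一个可 pickle 的、只携带普通数据的 Task 对象,真正在 worker 进程里执行的是它的 __call__()。

import multiprocessing

class Task(object):
    """A picklable unit of work: plain data plus a __call__ that does the work in the child."""
    def __init__(self, a, b):
        self.a = a
        self.b = b
    def __call__(self):
        return self.a * self.b

def worker(task_q, result_q):
    while True:
        task = task_q.get()
        if task is None:               # poison pill
            break
        result_q.put(task())           # the object crossed the queue; calling it happens here

if __name__ == '__main__':
    task_q = multiprocessing.Queue()
    result_q = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(task_q, result_q))
    p.start()
    task_q.put(Task(6, 7))
    print(result_q.get())              # -> 42
    task_q.put(None)
    p.join()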
In my case, what tripped me up was syntax errors in my implementation of run(). It seems that the sub-process will not report this and just fails silently -- leaving things stuck in weird loops! Make sure you have some kind of syntax checker running, e.g. Flymake/Pyflakes in Emacs.
在我的情况下,让我栽跟头的是我自己 run() 实现中的语法错误。子进程似乎不会报告这种错误,只是默默地失败,让程序卡在奇怪的循环里!确保你运行着某种语法检查器,例如 Emacs 中的 Flymake/Pyflakes。
Debugging via multiprocessing.log_to_stderr() helped me narrow down the problem.
通过 multiprocessing.log_to_stderr() 进行调试帮助我缩小了问题的范围。
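For reference, turning that logging on only takes a couple of lines; a minimal sketch (the DEBUG level is a typical choice, not something specified in the answer):
作为参考,打开这个日志只需要两三行;下面是最小示意(DEBUG 级别只是常见选择,回答里并未指定):

import logging
import multiprocessing

# Route multiprocessing's internal log messages to stderr so that
# child processes that die silently become much easier to spot.
logger = multiprocessing.log_to_stderr()
logger.setLevel(logging.DEBUG)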

