Python 多处理:处理父级中的子级错误
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/19924104/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Python Multiprocessing: Handling Child Errors in Parent
提问by drunken_monkey
I am currently playing around with multiprocessing and queues. I have written a piece of code to export data from mongoDB, map it into a relational (flat) structure, convert all values to string and insert them into mysql.
我目前正在玩多处理和队列。我写了一段代码来从 mongoDB 导出数据,将其映射到关系(平面)结构中,将所有值转换为字符串并将它们插入到 mysql 中。
Each of these steps is submitted as a process and given import/export queues, safe for the mongoDB export which is handled in the parent.
这些步骤中的每一个都作为一个进程提交并给定导入/导出队列,这对于在父级中处理的 mongoDB 导出是安全的。
As you will see below, I use queues and child processes terminate themselves when they read "None" from the queue. The problem I currently have is that, if a child process runs into an unhandled Exception, this is not recognized by the parent and the rest just Keeps running. What I want to happen is that the whole shebang quits and at best reraise the child error.
正如您将在下面看到的,我使用队列,当子进程从队列中读取“无”时,它们会自行终止。我目前遇到的问题是,如果子进程遇到未处理的异常,则父进程无法识别,其余进程将继续运行。我想要发生的是整个shebang退出并充其量重新引发子错误。
I have two questions:
我有两个问题:
- How do I detect the child error in the parent?
- How do I kill my child processes after detecting the error (best practice)? I realize that putting "None" to the queue to kill the child is pretty dirty.
- 如何检测父级中的子级错误?
- 检测到错误后如何终止我的子进程(最佳实践)?我意识到将“无”放入队列以杀死孩子是非常肮脏的。
I am using python 2.7.
我正在使用 python 2.7。
Here are the essential parts of my code:
以下是我的代码的基本部分:
# Establish communication queues
mongo_input_result_q = multiprocessing.Queue()
mapper_result_q = multiprocessing.Queue()
converter_result_q = multiprocessing.Queue()
[...]
[...]
# create child processes
# all processes generated here are subclasses of "multiprocessing.Process"
# create mapper
mappers = [mongo_relational_mapper.MongoRelationalMapper(mongo_input_result_q, mapper_result_q, columns, 1000)
for i in range(10)]
# create datatype converter, converts everything to str
converters = [datatype_converter.DatatypeConverter(mapper_result_q, converter_result_q, 'str', 1000)
for i in range(10)]
# create mysql writer
# I create a list of writers. currently only one,
# but I have the option to parallellize it further
writers = [mysql_inserter.MySqlWriter(mysql_host, mysql_user, mysql_passwd, mysql_schema, converter_result_q
, columns, 'w_'+mysql_table, 1000) for i in range(1)]
# starting mapper
for mapper in mappers:
mapper.start()
time.sleep(1)
# starting converter
for converter in converters:
converter.start()
# starting writer
for writer in writers:
writer.start()
[... initializing mongo db connection ...]
[...初始化mongo db连接...]
# put each dataset read to queue for the mapper
for row in mongo_collection.find({inc_column: {"$gte": start}}):
mongo_input_result_q.put(row)
count += 1
if count % log_counter == 0:
print 'Mongo Reader' + " " + str(count)
print "MongoReader done"
# Processes are terminated when they read "None" object from queue
# now that reading is finished, put None for each mapper in the queue so they terminate themselves
# the same for all followup processes
for mapper in mappers:
mongo_input_result_q.put(None)
for mapper in mappers:
mapper.join()
for converter in converters:
mapper_result_q.put(None)
for converter in converters:
converter.join()
for writer in writers:
converter_result_q.put(None)
for writer in writers:
writer.join()
采纳答案by KobeJohn
I don't know standard practice but what I've found is that to have reliable multiprocessing I design the methods/class/etc. specifically to work with multiprocessing. Otherwise you never really know what's going on on the other side (unless I've missed some mechanism for this).
我不知道标准做法,但我发现要获得可靠的多处理,我设计了方法/类/等。专门用于多处理。否则你永远不会真正知道另一边发生了什么(除非我错过了一些机制)。
Specifically what I do is:
具体我做的是:
- Subclass
multiprocessing.Process
or make functions that specifically support multiprocessing (wrapping functions that you don't have control over if necessary) - always provide a shared error
multiprocessing.Queue
from the main process to each worker process - enclose the entire run code in a
try: ... except Exception as e
. Then when something unexpected happens send an error package with:- the process id that died
- the exception with it's original context (check here). The original context is really important if you want to log useful information in the main process.
- of course handle expected issues as normal within the normal operation of the worker
- (similar to what you said already) assuming a long-running process, wrap the running code (inside the try/catch-all) with a loop
- define a stop token in the class or for functions.
- When the main process wants the worker(s) to stop, just send the stop token. to stop everyone, send enough for all the processes.
- the wrapping loop checks the input q for the token or whatever other input you want
- 子类化
multiprocessing.Process
或生成专门支持多处理的函数(必要时包装您无法控制的函数) - 始终
multiprocessing.Queue
从主进程向每个工作进程提供共享错误 - 将整个运行代码包含在一个
try: ... except Exception as e
. 然后当发生意外时发送一个错误包:- 死亡的进程ID
- 原始上下文的异常(检查这里)。如果您想在主进程中记录有用的信息,原始上下文非常重要。
- 当然在工人的正常操作范围内正常处理预期的问题
- (类似于你已经说过的)假设一个长时间运行的进程,用循环包装正在运行的代码(在 try/catch-all 内)
- 在类或函数中定义一个停止标记。
- 当主进程希望工人停止时,只需发送停止令牌。停止每个人,发送足够的所有进程。
- 包装循环检查输入 q 的令牌或您想要的任何其他输入
The end result is worker processes that can survive for a long time and that can let you know what's happening when something goes wrong. They will die quietly since you can handle whatever you need to do after the catch-all exception and you will also know when you need to restart a worker.
最终结果是工作进程可以存活很长时间,并且可以在出现问题时让您知道发生了什么。它们会安静地死掉,因为您可以在捕获所有异常之后处理您需要做的任何事情,并且您还将知道何时需要重新启动工作程序。
Again, I've just come to this pattern through trial and error so I don't know how standard it is. Does that help with what you are asking for?
同样,我刚刚通过反复试验得出这种模式,所以我不知道它的标准如何。这对你的要求有帮助吗?
回答by drunken_monkey
Thanks to kobejohn i have found a solution which is nice and stable.
感谢 kobejohn,我找到了一个很好且稳定的解决方案。
I have created a subclass of multiprocessing.Process which implements some functions and overwrites the
run()
method to wrap a new saferun method into a try-catch block. This Class requires a feedback_queue to initialize which is used to report info, debug, error messages back to the parent. The log methods in the class are wrappers for the globally defined log functions of the package:class EtlStepProcess(multiprocessing.Process): def __init__(self, feedback_queue): multiprocessing.Process.__init__(self) self.feedback_queue = feedback_queue def log_info(self, message): log_info(self.feedback_queue, message, self.name) def log_debug(self, message): log_debug(self.feedback_queue, message, self.name) def log_error(self, err): log_error(self.feedback_queue, err, self.name) def saferun(self): """Method to be run in sub-process; can be overridden in sub-class""" if self._target: self._target(*self._args, **self._kwargs) def run(self): try: self.saferun() except Exception as e: self.log_error(e) raise e return
I have subclassed all my other process steps from EtlStepProcess. The code to be run is implemented in the saferun() method rather than run. This ways i do not have to add a try catch block around it, since this is already done by the run() method. Example:
class MySqlWriter(EtlStepProcess): def __init__(self, mysql_host, mysql_user, mysql_passwd, mysql_schema, mysql_table, columns, commit_count, input_queue, feedback_queue): EtlStepProcess.__init__(self, feedback_queue) self.mysql_host = mysql_host self.mysql_user = mysql_user self.mysql_passwd = mysql_passwd self.mysql_schema = mysql_schema self.mysql_table = mysql_table self.columns = columns self.commit_count = commit_count self.input_queue = input_queue def saferun(self): self.log_info(self.name + " started") #create mysql connection engine = sqlalchemy.create_engine('mysql://' + self.mysql_user + ':' + self.mysql_passwd + '@' + self.mysql_host + '/' + self.mysql_schema) meta = sqlalchemy.MetaData() table = sqlalchemy.Table(self.mysql_table, meta, autoload=True, autoload_with=engine) connection = engine.connect() try: self.log_info("start MySQL insert") counter = 0 row_list = [] while True: next_row = self.input_queue.get() if isinstance(next_row, Terminator): if counter % self.commit_count != 0: connection.execute(table.insert(), row_list) # Poison pill means we should exit break row_list.append(next_row) counter += 1 if counter % self.commit_count == 0: connection.execute(table.insert(), row_list) del row_list[:] self.log_debug(self.name + ' ' + str(counter)) finally: connection.close() return
In my main file, I submit a Process that does all the work and give it a feedback_queue. This process starts all the steps and thenreads from mongoDB and puts values to the initial queue. My main process listens to the feedback queue and prints all log messages. If it receives an error log, it print the error and terminate its child, which in return also terminates all its children before dying.
if __name__ == '__main__': feedback_q = multiprocessing.Queue() p = multiprocessing.Process(target=mongo_python_export, args=(feedback_q,)) p.start() while p.is_alive(): fb = feedback_q.get() if fb["type"] == "error": p.terminate() print "ERROR in " + fb["process"] + "\n" for child in multiprocessing.active_children(): child.terminate() else: print datetime.datetime.fromtimestamp(fb["timestamp"]).strftime('%Y-%m-%d %H:%M:%S') + " " + \ fb["process"] + ": " + fb["message"] p.join()
我创建了一个 multiprocessing.Process 的子类,它实现了一些功能并覆盖了
run()
将新的安全运行方法包装到 try-catch 块中的方法。这个类需要一个feedback_queue来初始化,用于向父级报告信息、调试、错误消息。类中的日志方法是包的全局定义日志函数的包装器:class EtlStepProcess(multiprocessing.Process): def __init__(self, feedback_queue): multiprocessing.Process.__init__(self) self.feedback_queue = feedback_queue def log_info(self, message): log_info(self.feedback_queue, message, self.name) def log_debug(self, message): log_debug(self.feedback_queue, message, self.name) def log_error(self, err): log_error(self.feedback_queue, err, self.name) def saferun(self): """Method to be run in sub-process; can be overridden in sub-class""" if self._target: self._target(*self._args, **self._kwargs) def run(self): try: self.saferun() except Exception as e: self.log_error(e) raise e return
我已经从 EtlStepProcess 子类化了我所有的其他流程步骤。要运行的代码在 saferun() 方法中实现而不是运行。这样我就不必在它周围添加一个 try catch 块,因为这已经由 run() 方法完成了。例子:
class MySqlWriter(EtlStepProcess): def __init__(self, mysql_host, mysql_user, mysql_passwd, mysql_schema, mysql_table, columns, commit_count, input_queue, feedback_queue): EtlStepProcess.__init__(self, feedback_queue) self.mysql_host = mysql_host self.mysql_user = mysql_user self.mysql_passwd = mysql_passwd self.mysql_schema = mysql_schema self.mysql_table = mysql_table self.columns = columns self.commit_count = commit_count self.input_queue = input_queue def saferun(self): self.log_info(self.name + " started") #create mysql connection engine = sqlalchemy.create_engine('mysql://' + self.mysql_user + ':' + self.mysql_passwd + '@' + self.mysql_host + '/' + self.mysql_schema) meta = sqlalchemy.MetaData() table = sqlalchemy.Table(self.mysql_table, meta, autoload=True, autoload_with=engine) connection = engine.connect() try: self.log_info("start MySQL insert") counter = 0 row_list = [] while True: next_row = self.input_queue.get() if isinstance(next_row, Terminator): if counter % self.commit_count != 0: connection.execute(table.insert(), row_list) # Poison pill means we should exit break row_list.append(next_row) counter += 1 if counter % self.commit_count == 0: connection.execute(table.insert(), row_list) del row_list[:] self.log_debug(self.name + ' ' + str(counter)) finally: connection.close() return
在我的主文件中,我提交了一个完成所有工作的流程并给它一个反馈队列。此过程启动所有步骤,然后从 mongoDB 读取并将值放入初始队列。我的主进程监听反馈队列并打印所有日志消息。如果它收到错误日志,它会打印错误并终止其子进程,作为回报,它也会在死亡之前终止其所有子进程。
if __name__ == '__main__': feedback_q = multiprocessing.Queue() p = multiprocessing.Process(target=mongo_python_export, args=(feedback_q,)) p.start() while p.is_alive(): fb = feedback_q.get() if fb["type"] == "error": p.terminate() print "ERROR in " + fb["process"] + "\n" for child in multiprocessing.active_children(): child.terminate() else: print datetime.datetime.fromtimestamp(fb["timestamp"]).strftime('%Y-%m-%d %H:%M:%S') + " " + \ fb["process"] + ": " + fb["message"] p.join()
I think about making a module out of it and putting it up on github, but I have to do some cleaning up and commenting first.
我想用它制作一个模块并将其放在 github 上,但我必须先做一些清理和评论。
回答by mrkwjc
Why not to let the Process to take care of its own exceptions, like this:
为什么不让 Process 处理它自己的异常,像这样:
from __future__ import print_function
import multiprocessing as mp
import traceback
class Process(mp.Process):
def __init__(self, *args, **kwargs):
mp.Process.__init__(self, *args, **kwargs)
self._pconn, self._cconn = mp.Pipe()
self._exception = None
def run(self):
try:
mp.Process.run(self)
self._cconn.send(None)
except Exception as e:
tb = traceback.format_exc()
self._cconn.send((e, tb))
# raise e # You can still rise this exception if you need to
@property
def exception(self):
if self._pconn.poll():
self._exception = self._pconn.recv()
return self._exception
Now you have, both error and traceback at your hands:
现在,您可以同时掌握错误和回溯:
def target():
raise ValueError('Something went wrong...')
p = Process(target = target)
p.start()
p.join()
if p.exception:
error, traceback = p.exception
print(traceback)
Regards, Marek
问候, 马雷克
回答by TitanFighter
@mrkwjc 's solutionis simple, so easy to understand and implement, but there is one disadvantage of this solution. When we have few processes and we want to stop all processes if any single process has error, we need to wait until all processes are finished in order to check if p.exception
. Below is the code which fixes this problem (ie when one child has error, we terminate also another child):
@mrkwjc 的解决方案很简单,很容易理解和实现,但是这个解决方案有一个缺点。当我们有几个进程并且我们想在任何一个进程出错时停止所有进程时,我们需要等到所有进程都完成以检查是否p.exception
. 下面是解决这个问题的代码(即当一个孩子有错误时,我们也终止另一个孩子):
import multiprocessing
import traceback
from time import sleep
class Process(multiprocessing.Process):
"""
Class which returns child Exceptions to Parent.
https://stackoverflow.com/a/33599967/4992248
"""
def __init__(self, *args, **kwargs):
multiprocessing.Process.__init__(self, *args, **kwargs)
self._parent_conn, self._child_conn = multiprocessing.Pipe()
self._exception = None
def run(self):
try:
multiprocessing.Process.run(self)
self._child_conn.send(None)
except Exception as e:
tb = traceback.format_exc()
self._child_conn.send((e, tb))
# raise e # You can still rise this exception if you need to
@property
def exception(self):
if self._parent_conn.poll():
self._exception = self._parent_conn.recv()
return self._exception
class Task_1:
def do_something(self, queue):
queue.put(dict(users=2))
class Task_2:
def do_something(self, queue):
queue.put(dict(users=5))
def main():
try:
task_1 = Task_1()
task_2 = Task_2()
# Example of multiprocessing which is used:
# https://eli.thegreenplace.net/2012/01/16/python-parallelizing-cpu-bound-tasks-with-multiprocessing/
task_1_queue = multiprocessing.Queue()
task_2_queue = multiprocessing.Queue()
task_1_process = Process(
target=task_1.do_something,
kwargs=dict(queue=task_1_queue))
task_2_process = Process(
target=task_2.do_something,
kwargs=dict(queue=task_2_queue))
task_1_process.start()
task_2_process.start()
while task_1_process.is_alive() or task_2_process.is_alive():
sleep(10)
if task_1_process.exception:
error, task_1_traceback = task_1_process.exception
# Do not wait until task_2 is finished
task_2_process.terminate()
raise ChildProcessError(task_1_traceback)
if task_2_process.exception:
error, task_2_traceback = task_2_process.exception
# Do not wait until task_1 is finished
task_1_process.terminate()
raise ChildProcessError(task_2_traceback)
task_1_process.join()
task_2_process.join()
task_1_results = task_1_queue.get()
task_2_results = task_2_queue.get()
task_1_users = task_1_results['users']
task_2_users = task_2_results['users']
except Exception:
# Here usually I send email notification with error.
print('traceback:', traceback.format_exc())
if __name__ == "__main__":
main()