Python multiprocessing: How can I RELIABLY redirect stdout from a child process?
Note: this is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me): StackOverflow.
Original question: http://stackoverflow.com/questions/7714868/
Asked by Tom
NB. I have seen "Log output of multiprocessing.Process" - unfortunately, it doesn't answer this question.
I am creating a child process (on Windows) via multiprocessing. I want all of the child process's stdout and stderr output to be redirected to a log file, rather than appearing at the console. The only suggestion I have seen is for the child process to set sys.stdout to a file. However, this does not effectively redirect all stdout output, due to the behaviour of stdout redirection on Windows.
To illustrate the problem, build a Windows DLL with the following code
#include <iostream>

extern "C"
{
    __declspec(dllexport) void writeToStdOut()
    {
        std::cout << "Writing to STDOUT from test DLL" << std::endl;
    }
}
Then create and run a python script like the following, which imports this DLL and calls the function:
from ctypes import *
import sys
print
print "Writing to STDOUT from python, before redirect"
print
sys.stdout = open("stdout_redirect_log.txt", "w")
print "Writing to STDOUT from python, after redirect"
testdll = CDLL("Release/stdout_test.dll")
testdll.writeToStdOut()
In order to see the same behaviour as me, it is probably necessary for the DLL to be built against a different C runtime than the one Python uses. In my case, Python is built with Visual Studio 2010, but my DLL is built with VS 2005.
The behaviour I see is that the console shows:
> stdout_test.py
Writing to STDOUT from python, before redirect
Writing to STDOUT from test DLL
While the file stdout_redirect_log.txt ends up containing:
Writing to STDOUT from python, after redirect
In other words, setting sys.stdout failed to redirect the stdout output generated by the DLL. This is unsurprising given the nature of the underlying APIs for stdout redirection in Windows. I have encountered this problem at the native/C++ level before and never found a way to reliably redirect stdout from within a process. It has to be done externally.
This is actually the very reason I am launching a child process - it's so that I can connect externally to its pipes and thus guarantee that I am intercepting all of its output. I can definitely do this by launching the process manually with pywin32, but I would very much like to be able to use the facilities of multiprocessing, in particular the ability to communicate with the child process via a multiprocessing Pipe object, in order to get progress updates. The question is whether there is any way to both use multiprocessing for its IPC facilities and to reliably redirect all of the child's stdout and stderr output to a file.
UPDATE: Looking at the source code for multiprocessing.Process, it has a static member, _Popen, which looks like it can be used to override the class used to create the process. If it's set to None (the default), it uses multiprocessing.forking._Popen, but it looks like by saying
multiprocessing.Process._Popen = MyPopenClass
I could override the process creation. However, although I could derive this from multiprocessing.forking._Popen, it looks like I would have to copy a bunch of internal stuff into my implementation, which sounds flaky and not very future-proof. If that's the only choice I think I'd probably plump for doing the whole thing manually with pywin32 instead.
Accepted answer by Luke
The solution you suggest is a good one: create your processes manually such that you have explicit access to their stdout/stderr file handles. You can then create a socket to communicate with the sub-process and use multiprocessing.connection over that socket (multiprocessing.Pipe creates the same type of connection object, so this should give you all the same IPC functionality).
Here's a two-file example.
master.py:
import multiprocessing.connection
import subprocess
import socket
import sys, os

## Listen for connection from remote process (and find free port number)
port = 10000
while True:
    try:
        l = multiprocessing.connection.Listener(('localhost', int(port)), authkey="secret")
        break
    except socket.error as ex:
        if ex.errno != 98:
            raise
        port += 1  ## if errno==98, then port is not available.

proc = subprocess.Popen((sys.executable, "subproc.py", str(port)), stdout=subprocess.PIPE, stderr=subprocess.PIPE)

## open connection for remote process
conn = l.accept()
conn.send([1, "asd", None])

print(proc.stdout.readline())
subproc.py:
import multiprocessing.connection
import subprocess
import sys, os, time

port = int(sys.argv[1])
conn = multiprocessing.connection.Client(('localhost', port), authkey="secret")

while True:
    try:
        obj = conn.recv()
        print("received: %s\n" % str(obj))
        sys.stdout.flush()
    except EOFError:  ## connection closed
        break
You may also want to see the first answer to this question to get non-blocking reads from the subprocess.
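If the goal is a log file rather than reading lines by hand, one option (not part of the original answer; a minimal sketch that assumes the proc object from master.py above and an arbitrary log file name) is to drain the child's pipes in background threads, so that everything the child writes - including output from the DLL - ends up in the log:

import threading

def drain(pipe, logfile):
    ## Copy everything the child writes on this pipe into the log file.
    for chunk in iter(pipe.readline, b''):
        logfile.write(chunk)
        logfile.flush()

log = open("child_output.log", "wb")
## One thread per stream so a full pipe buffer cannot block the child.
threads = [threading.Thread(target=drain, args=(proc.stdout, log)),
           threading.Thread(target=drain, args=(proc.stderr, log))]
for t in threads:
    t.daemon = True
    t.start()

## ... exchange messages with the child over `conn` as before ...

proc.wait()
for t in threads:
    t.join()
log.close()

Because the parent owns the pipes created by subprocess.PIPE, this catches output from any C runtime in the child, which is the property the question is after.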
Answered by ubershmekel
I don't think you have a better option than redirecting a subprocess to a file as you mentioned in your comment.
The way console stdin/out/err works in Windows is that each process, when it is born, has its std handles defined. You can change them with SetStdHandle. When you modify Python's sys.stdout you only modify where Python prints out stuff, not where other DLLs are printing stuff. Part of the CRT in your DLL is using GetStdHandle to find out where to print to. If you want, you can do whatever piping you want with the Windows API in your DLL or in your Python script with pywin32. Though I do think it'll be simpler with subprocess.
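For reference, a minimal sketch of that lower-level redirection (my addition, not part of the answer): duplicating the log file's descriptor over OS-level fd 1 with os.dup2, which is closer to what CRT code in a DLL actually writes to. Whether this catches the DLL's output still depends on which C runtime the DLL was built against, which is exactly the unreliability the question describes:

import os
import sys

log = open("stdout_redirect_log.txt", "w")

sys.stdout.flush()            # flush anything Python has already buffered
saved_fd = os.dup(1)          # keep a copy of the original stdout descriptor
os.dup2(log.fileno(), 1)      # OS-level fd 1 now points at the log file

## ... call into the DLL here; writes that go through fd 1 should land in the log ...

os.dup2(saved_fd, 1)          # restore the original stdout
os.close(saved_fd)
log.close()

A DLL linked against a different CRT may still hold a stale handle, which is why this answer points at SetStdHandle/GetStdHandle, and why owning the child's pipes from a parent process remains the more reliable route.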
Answered by KobeJohn
I assume I'm off base and missing something, but for what it's worth, here is what came to mind when I read your question.
If you can intercept all of the stdout and stderr (I got that impression from your question), then why not add or wrap that capture functionality around each of your processes? Then send what is captured through a queue to a consumer that can do whatever you want with all of the outputs?
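In that spirit, here is a minimal sketch (my illustration, not the answerer's code; all names are made up) of wrapping each worker so that whatever it prints at the Python level is pushed onto a shared queue, with a single consumer writing everything to one log file. Note that this only captures sys.stdout/sys.stderr writes, not output from native DLLs:

import multiprocessing
import sys

class QueueWriter(object):
    """File-like object that forwards every write to a multiprocessing queue."""
    def __init__(self, queue):
        self.queue = queue
    def write(self, text):
        if text:
            self.queue.put(text)
    def flush(self):
        pass

def wrapped(queue, target, *args):
    # Runs in the child: capture Python-level stdout/stderr, then do the real work.
    sys.stdout = sys.stderr = QueueWriter(queue)
    try:
        target(*args)
    finally:
        queue.put(None)                  # sentinel: this worker is done

def worker():
    print("hello from a worker")

if __name__ == '__main__':
    q = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=wrapped, args=(q, worker)) for _ in range(2)]
    for p in procs:
        p.start()
    finished = 0
    with open("workers.log", "w") as log:    # the consumer: one writer owns the log file
        while finished < len(procs):
            text = q.get()
            if text is None:
                finished += 1
            else:
                log.write(text)
    for p in procs:
        p.join()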
Answered by justengel
In my situation I changed sys.stdout.write to write to a PySide QTextEdit. I couldn't read from sys.stdout and I didn't know how to change sys.stdout to be readable. I created two Pipes: one for stdout and the other for stderr. In the separate process I redirect sys.stdout and sys.stderr to the child connection of the multiprocessing pipe. On the main process I created two threads to read the stdout and stderr parent pipes and redirect the pipe data to sys.stdout and sys.stderr.
import sys
import contextlib
import threading
import multiprocessing as mp
import multiprocessing.queues
from queue import Empty
import time
class PipeProcess(mp.Process):
    """Process to pipe the output of the sub process and redirect it to this sys.stdout and sys.stderr.

    Note:
        The use_queue = True argument will pass data between processes using Queues instead of Pipes. Queues will
        give you the full output and read all of the data from the Queue. A pipe is more efficient, but may not
        redirect all of the output back to the main process.
    """
    def __init__(self, group=None, target=None, name=None, args=tuple(), kwargs={}, *_, daemon=None,
                 use_pipe=None, use_queue=None):
        self.read_out_th = None
        self.read_err_th = None
        self.pipe_target = target
        self.pipe_alive = mp.Event()

        if use_pipe or (use_pipe is None and not use_queue):  # Default
            self.parent_stdout, self.child_stdout = mp.Pipe(False)
            self.parent_stderr, self.child_stderr = mp.Pipe(False)
        else:
            self.parent_stdout = self.child_stdout = mp.Queue()
            self.parent_stderr = self.child_stderr = mp.Queue()

        args = (self.child_stdout, self.child_stderr, target) + tuple(args)
        target = self.run_pipe_out_target

        super(PipeProcess, self).__init__(group=group, target=target, name=name, args=args, kwargs=kwargs,
                                          daemon=daemon)

    def start(self):
        """Start the multiprocess and reading thread."""
        self.pipe_alive.set()
        super(PipeProcess, self).start()

        self.read_out_th = threading.Thread(target=self.read_pipe_out,
                                            args=(self.pipe_alive, self.parent_stdout, sys.stdout))
        self.read_err_th = threading.Thread(target=self.read_pipe_out,
                                            args=(self.pipe_alive, self.parent_stderr, sys.stderr))
        self.read_out_th.daemon = True
        self.read_err_th.daemon = True
        self.read_out_th.start()
        self.read_err_th.start()

    @classmethod
    def run_pipe_out_target(cls, pipe_stdout, pipe_stderr, pipe_target, *args, **kwargs):
        """The real multiprocessing target to redirect stdout and stderr to a pipe or queue."""
        sys.stdout.write = cls.redirect_write(pipe_stdout)  # , sys.__stdout__)  # Is redirected in main process
        sys.stderr.write = cls.redirect_write(pipe_stderr)  # , sys.__stderr__)  # Is redirected in main process

        pipe_target(*args, **kwargs)

    @staticmethod
    def redirect_write(child, out=None):
        """Create a function to write out a pipe and write out an additional out."""
        if isinstance(child, mp.queues.Queue):
            send = child.put
        else:
            send = child.send_bytes  # No need to pickle with child_conn.send(data)

        def write(data, *args):
            try:
                if isinstance(data, str):
                    data = data.encode('utf-8')
                send(data)
                if out is not None:
                    out.write(data)
            except:
                pass
        return write

    @classmethod
    def read_pipe_out(cls, pipe_alive, pipe_out, out):
        if isinstance(pipe_out, mp.queues.Queue):
            # Queue has better functionality to get all of the data
            def recv():
                return pipe_out.get(timeout=0.5)

            def is_alive():
                return pipe_alive.is_set() or pipe_out.qsize() > 0
        else:
            # Pipe is more efficient
            recv = pipe_out.recv_bytes  # No need to unpickle with data = pipe_out.recv()
            is_alive = pipe_alive.is_set

        # Loop through reading and redirecting data
        while is_alive():
            try:
                data = recv()
                if isinstance(data, bytes):
                    data = data.decode('utf-8')
                out.write(data)
            except EOFError:
                break
            except Empty:
                pass
            except:
                pass

    def join(self, *args):
        # Wait for process to finish (unless a timeout was given)
        super(PipeProcess, self).join(*args)

        # Trigger to stop the threads
        self.pipe_alive.clear()

        # Pipe must close to prevent blocking and waiting on recv forever
        if not isinstance(self.parent_stdout, mp.queues.Queue):
            with contextlib.suppress(Exception):
                self.parent_stdout.close()
            with contextlib.suppress(Exception):
                self.parent_stderr.close()

        # Close the pipes and threads
        with contextlib.suppress(Exception):
            self.read_out_th.join()
        with contextlib.suppress(Exception):
            self.read_err_th.join()


def run_long_print():
    for i in range(1000):
        print(i)
        print(i, file=sys.stderr)

    print('finished')


if __name__ == '__main__':
    # Example test write (My case was a QTextEdit)
    out = open('stdout.log', 'w')
    err = open('stderr.log', 'w')

    # Overwrite the write function and not the actual stdout object to prove this works
    sys.stdout.write = out.write
    sys.stderr.write = err.write

    # Create a process that uses pipes to read multiprocess output back into sys.stdout.write
    proc = PipeProcess(target=run_long_print, use_queue=True)  # If use_pipe=True Pipe may not write out all values
    # proc.daemon = True  # If daemon and use_queue Not all output may be redirected to stdout

    proc.start()

    # time.sleep(5)  # Not needed unless use_pipe or daemon and all of stdout/stderr is desired

    # Close the process
    proc.join()  # For some odd reason this blocks forever when use_queue=False

    # Close the output files for this test
    out.close()
    err.close()