Python read from subprocess stdout and stderr separately while preserving order

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/31833897/


Tags: python, subprocess, stdout, stderr

Asked by Luke Sapan

I have a Python subprocess that I'm trying to read output and error streams from. Currently I have it working, but I'm only able to read from stderr after I've finished reading from stdout. Here's what it looks like:

process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout_iterator = iter(process.stdout.readline, b"")
stderr_iterator = iter(process.stderr.readline, b"")

for line in stdout_iterator:
    # Do stuff with line
    print line

for line in stderr_iterator:
    # Do stuff with line
    print line

As you can see, the stderr for loop can't start until the stdout loop completes. How can I modify this to be able to read from both, in the order the lines come in?

To clarify: I still need to be able to tell whether a line came from stdout or stderr, because they will be treated differently in my code.

Accepted answer by Dev Aggarwal

Here's a solution based on selectors that preserves order and streams variable-length chunks (even single characters).

The trick is to use read1() instead of read().

import selectors
import subprocess
import sys

p = subprocess.Popen(
    ["python", "random_out.py"], stdout=subprocess.PIPE, stderr=subprocess.PIPE
)

# Watch both pipes for readable data.
sel = selectors.DefaultSelector()
sel.register(p.stdout, selectors.EVENT_READ)
sel.register(p.stderr, selectors.EVENT_READ)

while True:
    for key, _ in sel.select():
        # read1() returns whatever is available right now instead of
        # blocking until EOF like read() would. Calling it without an
        # argument requires Python 3.7+; on older versions pass a size,
        # e.g. read1(1024).
        data = key.fileobj.read1().decode()
        if not data:
            exit()
        if key.fileobj is p.stdout:
            print(data, end="")
        else:
            print(data, end="", file=sys.stderr)
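
Note that selectors (like select) can only watch pipes on POSIX systems; on Windows, select supports only sockets, so this approach won't work there.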

If you want a test program, use this.

import sys
from time import sleep


for i in range(10):
    print(f" x{i} ", file=sys.stderr, end="")
    sleep(0.1)
    print(f" y{i} ", end="")
    sleep(0.1)
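
To try it, save the test program as random_out.py (the filename the Popen call above expects) and run the reader script; the x markers (stderr) and y markers (stdout) should come out interleaved in the order they were written.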

Answer by Patrick Maupin

I wrote something to do this a long time ago. I haven't yet ported it to Python 3, but it shouldn't be too difficult (patches accepted!)

If you run it standalone, you will see a lot of different options. In any case, it allows you to distinguish stdout from stderr.

Answer by jfs

The code in your question may deadlock if the child process produces enough output on stderr (~100KB on my Linux machine).

There is a communicate() method that allows you to read from both stdout and stderr separately:

from subprocess import Popen, PIPE

process = Popen(command, stdout=PIPE, stderr=PIPE)
output, err = process.communicate()

If you need to read the streams while the child process is still running, then the portable solution is to use threads (not tested):

from subprocess import Popen, PIPE
from threading import Thread
from Queue import Queue # Python 2

def reader(pipe, queue):
    try:
        with pipe:
            for line in iter(pipe.readline, b''):
                queue.put((pipe, line))
    finally:
        queue.put(None)

process = Popen(command, stdout=PIPE, stderr=PIPE, bufsize=1)
q = Queue()
Thread(target=reader, args=[process.stdout, q]).start()
Thread(target=reader, args=[process.stderr, q]).start()
for _ in range(2):
    for source, line in iter(q.get, None):
        print "%s: %s" % (source, line),

Answer by Jörg Schulz

The order in which a process writes data to different pipes is lost once the write has happened.

There is no way you can tell if stdout has been written before stderr.

You can try to read data simultaneously from multiple file descriptors in a non-blocking way as soon as data is available, but this only reduces the probability that the order is incorrect.

This program should demonstrate this:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import select
import subprocess

testapps={
    'slow': '''
import os
import time
os.write(1, 'aaa')
time.sleep(0.01)
os.write(2, 'bbb')
time.sleep(0.01)
os.write(1, 'ccc')
''',
    'fast': '''
import os
os.write(1, 'aaa')
os.write(2, 'bbb')
os.write(1, 'ccc')
''',
    'fast2': '''
import os
os.write(1, 'aaa')
os.write(2, 'bbbbbbbbbbbbbbb')
os.write(1, 'ccc')
'''
}

def readfds(fds, maxread):
    while True:
        fdsin, _, _ = select.select(fds,[],[])
        for fd in fdsin:
            s = os.read(fd, maxread)
            if len(s) == 0:
                fds.remove(fd)
                continue
            yield fd, s
        if fds == []:
            break

def readfromapp(app, rounds=10, maxread=1024):
    f=open('testapp.py', 'w')
    f.write(testapps[app])
    f.close()

    results={}
    for i in range(0, rounds):
        p = subprocess.Popen(['python', 'testapp.py'], stdout=subprocess.PIPE
                                                     , stderr=subprocess.PIPE)
        data=''
        for (fd, s) in readfds([p.stdout.fileno(), p.stderr.fileno()], maxread):
            data = data + s
        results[data] = results[data] + 1 if data in results else 1

    print 'running %i rounds %s with maxread=%i' % (rounds, app, maxread)
    results = sorted(results.items(), key=lambda (k,v): k, reverse=False)
    for data, count in results:
        print '%03i x %s' % (count, data)


print
print "=> if output is produced slowly this should work as whished"
print "   and should return: aaabbbccc"
readfromapp('slow',  rounds=100, maxread=1024)

print
print "=> now mostly aaacccbbb is returnd, not as it should be"
readfromapp('fast',  rounds=100, maxread=1024)

print
print "=> you could try to read data one by one, and return"
print "   e.g. a whole line only when LF is read"
print "   (b's should be finished before c's)"
readfromapp('fast',  rounds=100, maxread=1)

print
print "=> but even this won't work ..."
readfromapp('fast2', rounds=100, maxread=1)

and outputs something like this:

=> if output is produced slowly this should work as wished
   and should return: aaabbbccc
running 100 rounds slow with maxread=1024
100 x aaabbbccc

=> now mostly aaacccbbb is returned, not as it should be
running 100 rounds fast with maxread=1024
006 x aaabbbccc
094 x aaacccbbb

=> you could try to read data one by one, and return
   e.g. a whole line only when LF is read
   (b's should be finished before c's)
running 100 rounds fast with maxread=1
003 x aaabbbccc
003 x aababcbcc
094 x abababccc

=> but even this won't work ...
running 100 rounds fast2 with maxread=1
003 x aaabbbbbbbbbbbbbbbccc
001 x aaacbcbcbbbbbbbbbbbbb
008 x aababcbcbcbbbbbbbbbbb
088 x abababcbcbcbbbbbbbbbb
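
If the exact order matters more than the distinction between the streams, the one reliable alternative is to merge them at the source, at the cost of no longer being able to tell stdout from stderr. A minimal sketch (assuming command is defined):

import subprocess

# The child writes stderr into the same pipe as stdout, so the original
# write order is preserved; the two streams can no longer be told apart.
p = subprocess.Popen(command, stdout=subprocess.PIPE,
                     stderr=subprocess.STDOUT)
for line in p.stdout:
    print(line.decode(), end="")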

Answer by simomo

According to Python's docs:

Popen.stdout If the stdout argument was PIPE, this attribute is a file object that provides output from the child process. Otherwise, it is None.

Popen.stderr If the stderr argument was PIPE, this attribute is a file object that provides error output from the child process. Otherwise, it is None.

The sample below can do what you want:

test.py

print "I'm stdout"

raise Exception("I'm Error")

printer.py

import subprocess

p = subprocess.Popen(['python', 'test.py'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)

print "Normal"
std_lines = p.stdout.readlines()
for line in std_lines:
    print line.rstrip()

print "Error"
stderr_lines = p.stderr.readlines()
for line in stderr_lines:
    print line.rstrip()

Output:

Normal
I'm stdout

Error
Traceback (most recent call last):
  File "test.py", line 3, in <module>
    raise Exception("I'm Error")
Exception: I'm Error
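
Note that this sample drains stdout completely before touching stderr, so it does not interleave the two streams, and (as jfs's answer above points out) it can deadlock if the child fills the stderr pipe buffer while stdout is still being read.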

Answer by Marten Jacobs

I know this question is very old, but this answer may help others who stumble upon this page while researching a solution for a similar situation, so I'm posting it anyway.

I've built a simple Python snippet that will merge any number of pipes into a single one. Of course, as stated above, the order cannot be guaranteed, but this is as close as I think you can get in Python.

It spawns a thread for each of the pipes, reads them line by line and puts them into a Queue (which is FIFO). The main thread loops through the queue, yielding each line.

import threading, queue
def merge_pipes(**named_pipes):
    r'''
    Merges multiple pipes from subprocess.Popen (maybe other sources as well).
    The keyword argument keys will be used in the output to identify the source
    of the line.

    Example:
    p = subprocess.Popen(['some', 'call'],
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
    outputs = {'out': log.info, 'err': log.warn}
    for name, line in merge_pipes(out=p.stdout, err=p.stderr):
        outputs[name](line)

    This will output stdout to the info logger, and stderr to the warning logger
    '''

    # Constants. Could also be placed outside of the method. I just put them here
    # so the method is fully self-contained
    PIPE_OPENED=1
    PIPE_OUTPUT=2
    PIPE_CLOSED=3

    # Create a queue where the pipes will be read into
    output = queue.Queue()

    # This method is the run body for the threads that are instantiated below
    # This could be easily rewritten to be outside of the merge_pipes method,
    # but to make it fully self-contained I put it here
    def pipe_reader(name, pipe):
        r"""
        reads a single pipe into the queue
        """
        output.put( ( PIPE_OPENED, name, ) )
        try:
            for line in iter(pipe.readline,''):
                output.put( ( PIPE_OUTPUT, name, line.rstrip(), ) )
        finally:
            output.put( ( PIPE_CLOSED, name, ) )

    # Start a reader for each pipe
    for name, pipe in named_pipes.items():
        t=threading.Thread(target=pipe_reader, args=(name, pipe, ))
        t.daemon = True
        t.start()

    # Use a counter to determine how many pipes are left open.
    # If all are closed, we can return
    pipe_count = 0

    # Read the queue in order, blocking if there's no data
    for data in iter(output.get,''):
        code=data[0]
        if code == PIPE_OPENED:
            pipe_count += 1
        elif code == PIPE_CLOSED:
            pipe_count -= 1
        elif code == PIPE_OUTPUT:
            yield data[1:]
        if pipe_count == 0:
            return
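
A minimal usage sketch, assuming a child command that writes to both streams (['python', 'testapp.py'] is just a placeholder); universal_newlines=True gives text-mode pipes, so readline's EOF value matches the '' sentinel in pipe_reader above:

import subprocess

p = subprocess.Popen(['python', 'testapp.py'],
                     stdout=subprocess.PIPE,
                     stderr=subprocess.PIPE,
                     universal_newlines=True)
for name, line in merge_pipes(out=p.stdout, err=p.stderr):
    print('%s: %s' % (name, line))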

Answer by waszil

This works for me (on Windows): https://github.com/waszil/subpiper

from subpiper import subpiper

def my_stdout_callback(line: str):
    print(f'STDOUT: {line}')

def my_stderr_callback(line: str):
    print(f'STDERR: {line}')

my_additional_path_list = [r'c:\important_location']

retcode = subpiper(cmd='echo magic',
                   stdout_callback=my_stdout_callback,
                   stderr_callback=my_stderr_callback,
                   add_path_list=my_additional_path_list)

Answer by Deepak Yadav

This works for Python 3 (3.6):

import selectors
import subprocess
import sys

# cmd is the command to run, e.g. ['python', 'test.py']
p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                     stderr=subprocess.PIPE, universal_newlines=True)
# Read both stdout and stderr simultaneously
sel = selectors.DefaultSelector()
sel.register(p.stdout, selectors.EVENT_READ)
sel.register(p.stderr, selectors.EVENT_READ)
ok = True
while ok:
    for key, _ in sel.select():
        # readline() blocks until a whole line is available, so this
        # variant preserves order only at line granularity.
        line = key.fileobj.readline()
        if not line:
            ok = False
            break
        if key.fileobj is p.stdout:
            print(line, end="")
        else:
            print(line, end="", file=sys.stderr)