Python:并行运行子进程

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/16450788/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-18 22:39:10  来源:igfitidea点击:

Python: running subprocess in parallel

pythonsubprocess

提问by imagineerThat

I have the following code that writes the md5sums to a logfile

我有以下代码将 md5sums 写入日志文件

for file in files_output:
    p=subprocess.Popen(['md5sum',file],stdout=logfile)
p.wait()
  1. Will these be written in parallel? i.e. if md5sum takes a long time for one of the files, will another one be started before waiting for a previous one to complete?

  2. If the answer to the above is yes, can I assume the order of the md5sums written to logfile may differ based upon how long md5sum takes for each file? (some files can be huge, some small)

  1. 这些会被并行写入吗?即如果 md5sum 需要很长时间来处理其中一个文件,是否会在等待前一个文件完成之前启动另一个文件?

  2. 如果上述答案是肯定的,我是否可以假设写入日志文件的 md5sum 的顺序可能会因每个文件的 md5sum 需要多长时间而有所不同?(有些文件可能很大,有些文件很小)

采纳答案by Alfe

All sub processes are run in parallel. (To avoid this one has to wait explicitly for their completion.) They even can write into the log file at the same time, thus garbling the output. To avoid this you should let each process write into a different logfile and collect all outputs when all processes are finished.

所有子进程并行运行。(为了避免这种情况,必须显式等待它们的完成。)它们甚至可以同时写入日志文件,从而导致输出乱码。为避免这种情况,您应该让每个进程写入不同的日志文件,并在所有进程完成后收集所有输出。

q = Queue.Queue()
result = {}  # used to store the results
for fileName in fileNames:
  q.put(fileName)

def worker():
  while True:
    fileName = q.get()
    if fileName is None:  # EOF?
      return
    subprocess_stuff_using(fileName)
    wait_for_finishing_subprocess()
    checksum = collect_md5_result_for(fileName)
    result[fileName] = checksum  # store it

threads = [ threading.Thread(target=worker) for _i in range(20) ]
for thread in threads:
  thread.start()
  q.put(None)  # one EOF marker for each thread

After this the results should be stored in result.

在此之后,结果应存储在result.

回答by dkz

  1. Yes, these md5sum processes will be started in parallel.
  2. Yes, the order of md5sums writes will be unpredictable. And generally it is considered a bad practice to share a single resource like file from many processes this way.
  1. 是的,这些 md5sum 进程将并行启动。
  2. 是的,md5sums 写入的顺序是不可预测的。通常,以这种方式共享来自多个进程的单个资源(如文件)被认为是一种不好的做法。

Also your way of making p.wait()after the forloop will wait just for the last of md5sum processes to finish and the rest of them might still be running.

此外,您p.wait()for循环之后的制作方式将只等待最后一个 md5sum 进程完成,而其余进程可能仍在运行。

But you can modify this code slightly to still have benefits of parallel processing and predictability of synchronized output if you collect the md5sum output into temporary files and collect it back into one file once all processes are done.

但是,如果您将 md5sum 输出收集到临时文件中并在所有进程完成后将其收集回一个文件,则您可以稍微修改此代码以仍然具有并行处理和同步输出的可预测性的好处。

import subprocess
import os

processes = []
for file in files_output:
    f = os.tmpfile()
    p = subprocess.Popen(['md5sum',file],stdout=f)
    processes.append((p, f))

for p, f in processes:
    p.wait()
    f.seek(0)
    logfile.write(f.read())
    f.close()

回答by jfs

A simple way to collect output from parallel md5sum subprocesses is to use a thread pool and write to the file from the main process:

从并行 md5sum 子进程收集输出的一种简单方法是使用线程池并从主进程写入文件:

from multiprocessing.dummy import Pool # use threads
from subprocess import check_output

def md5sum(filename):
    try:
        return check_output(["md5sum", filename]), None
    except Exception as e:
        return None, e

if __name__ == "__main__":
    p = Pool(number_of_processes) # specify number of concurrent processes
    with open("md5sums.txt", "wb") as logfile:
        for output, error in p.imap(md5sum, filenames): # provide filenames
            if error is None:
               logfile.write(output)
  • the output from md5sumis small so you can store it in memory
  • imappreserves order
  • number_of_processesmay be different from number of files or CPU cores (larger values doesn't mean faster: it depends on relative performance of IO (disks) and CPU)
  • 的输出md5sum很小,因此您可以将其存储在内存中
  • imap保持秩序
  • number_of_processes可能与文件数或 CPU 内核数不同(较大的值并不意味着更快:它取决于 IO(磁盘)和 CPU 的相对性能)

You can try to pass several files at once to the md5sum subprocesses.

您可以尝试一次将多个文件传递给 md5sum 子进程。

You don't need external subprocess in this case; you can calculate md5 in Python:

在这种情况下,您不需要外部子流程;您可以在 Python 中计算 md5

import hashlib
from functools import partial

def md5sum(filename, chunksize=2**15, bufsize=-1):
    m = hashlib.md5()
    with open(filename, 'rb', bufsize) as f:
        for chunk in iter(partial(f.read, chunksize), b''):
            m.update(chunk)
    return m.hexdigest()

To use multiple processes instead of threads (to allow the pure Python md5sum()to run in parallel utilizing multiple CPUs) just drop .dummyfrom the import in the above code.

要使用多个进程而不是线程(以允许纯 Pythonmd5sum()使用多个 CPU 并行运行),只需.dummy从上述代码中的导入中删除即可。