How to Multi-thread an Operation Within a Loop in Python

Warning: this page is a translation of a popular Stack Overflow question and its answers, provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me). Original: http://stackoverflow.com/questions/15143837/


Tags: python, multithreading, python-multithreading

Asked by doremi

Say I have a very large list and I'm performing an operation like so:

for item in items:
    try:
        api.my_operation(item)
    except:
        print 'error with item'

My issue is twofold:

  • There are a lot of items
  • api.my_operation takes forever to return

I'd like to use multi-threading to spin up a bunch of api.my_operations at once so I can process maybe 5 or 10 or even 100 items at once.

If my_operation() raises an exception (because maybe I already processed that item), that's OK. It won't break anything. The loop can continue to the next item.

Note: this is for Python 2.7.3

Accepted answer by abarnert

First, in Python, if your code is CPU-bound, multithreading won't help, because only one thread can hold the Global Interpreter Lock, and therefore run Python code, at a time. So, you need to use processes, not threads.

This is not true if your operation "takes forever to return" because it's IO-bound—that is, waiting on the network or disk copies or the like. I'll come back to that later.



Next, the way to process 5 or 10 or 100 items at once is to create a pool of 5 or 10 or 100 workers, and put the items into a queue that the workers service. Fortunately, the stdlib multiprocessing and concurrent.futures libraries both wrap up most of the details for you.

The former is more powerful and flexible for traditional programming; the latter is simpler if you need to compose waiting on futures; for trivial cases, it really doesn't matter which you choose. (In this case, the most obvious implementation takes 3 lines with futures and 4 lines with multiprocessing.)

If you're using 2.6-2.7 or 3.0-3.1, futures isn't built in, but you can install it from PyPI (pip install futures).



Finally, it's usually a lot simpler to parallelize things if you can turn the entire loop iteration into a function call (something you could, e.g., pass to map), so let's do that first:

def try_my_operation(item):
    try:
        api.my_operation(item)
    except:
        print('error with item')


Putting it all together:

import concurrent.futures

executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(try_my_operation, item) for item in items]
concurrent.futures.wait(futures)
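
The answer counts roughly 4 lines for the multiprocessing version; that version isn't shown in this text, but a minimal sketch of it (my reconstruction, assuming the same try_my_operation and items as above) would be:

import multiprocessing

# On Windows, or any start method that re-imports the module, this needs to
# run under an "if __name__ == '__main__':" guard, and try_my_operation must
# be picklable (defined at module top level).
pool = multiprocessing.Pool(10)
pool.map(try_my_operation, items)
pool.close()
pool.join()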


If you have lots of relatively small jobs, the overhead of multiprocessing might swamp the gains. The way to solve that is to batch up the work into larger jobs. For example (using grouper from the itertools recipes, which you can copy and paste into your code, or get from the more-itertools project on PyPI):

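For reference, a sketch of that grouper recipe, adapted from the itertools documentation (on Python 3 use itertools.zip_longest instead of izip_longest):

from itertools import izip_longest

def grouper(n, iterable, fillvalue=None):
    # Collect data into fixed-length chunks; the last chunk is padded with
    # fillvalue (None here) if the items don't divide evenly.
    args = [iter(iterable)] * n
    return izip_longest(fillvalue=fillvalue, *args)

Note that the padding means the last group can contain None entries; the bare except in try_multiple_operations below will silently swallow whatever api.my_operation does with them.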

def try_multiple_operations(items):
    for item in items:
        try:
            api.my_operation(item)
        except:
            print('error with item')

executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(try_multiple_operations, group) 
           for group in grouper(5, items)]
concurrent.futures.wait(futures)


Finally, what if your code is IO bound? Then threads are just as good as processes, and with less overhead (and fewer limitations, but those limitations usually won't affect you in cases like this). Sometimes that "less overhead" is enough to mean you don't need batching with threads, but you do with processes, which is a nice win.

So, how do you use threads instead of processes? Just change ProcessPoolExecutor to ThreadPoolExecutor.

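For example, a minimal sketch of the threaded version, assuming the same try_my_operation and items as above (with threads the per-task overhead is low enough that you often don't need the batching shown earlier):

import concurrent.futures

executor = concurrent.futures.ThreadPoolExecutor(10)
futures = [executor.submit(try_my_operation, item) for item in items]
concurrent.futures.wait(futures)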

If you're not sure whether your code is CPU-bound or IO-bound, just try it both ways.



Can I do this for multiple functions in my Python script? For example, if I had another for loop elsewhere in the code that I wanted to parallelize, is it possible to do two multi-threaded functions in the same script?

Yes. In fact, there are two different ways to do it.

First, you can share the same (thread or process) executor and use it from multiple places with no problem. The whole point of tasks and futures is that they're self-contained; you don't care where they run, just that you queue them up and eventually get the answer back.

Alternatively, you can have two executors in the same program with no problem. This has a performance cost—if you're using both executors at the same time, you'll end up trying to run (for example) 16 busy threads on 8 cores, which means there's going to be some context switching. But sometimes it's worth doing because, say, the two executors are rarely busy at the same time, and it makes your code a lot simpler. Or maybe one executor is running very large tasks that can take a while to complete, and the other is running very small tasks that need to complete as quickly as possible, because responsiveness is more important than throughput for part of your program.

If you don't know which is appropriate for your program, usually it's the first.

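As a sketch of that first approach, here is a single shared executor used from two different loops (my_other_operation and other_items are hypothetical stand-ins for the second loop, not names from the original question):

import concurrent.futures

executor = concurrent.futures.ThreadPoolExecutor(10)

# First loop: queue up the original work.
futures = [executor.submit(try_my_operation, item) for item in items]

# Elsewhere in the script: reuse the same executor for a different function.
more_futures = [executor.submit(my_other_operation, thing) for thing in other_items]

# Wait for everything queued on this executor to finish.
concurrent.futures.wait(futures + more_futures)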

Answer by Ryan Haining

You can split the processing into a specified number of threads using an approach like this:

import threading                                                                

def process(items, start, end):                                                 
    for item in items[start:end]:                                               
        try:                                                                    
            api.my_operation(item)                                              
        except Exception:                                                       
            print('error with item')                                            


def split_processing(items, num_splits=4):                                      
    split_size = len(items) // num_splits                                       
    threads = []                                                                
    for i in range(num_splits):                                                 
        # determine the indices of the list this thread will handle             
        start = i * split_size                                                  
        # special case on the last chunk to account for uneven splits           
        end = None if i+1 == num_splits else (i+1) * split_size                 
        # create the thread                                                     
        threads.append(                                                         
            threading.Thread(target=process, args=(items, start, end)))         
        threads[-1].start() # start the thread we just created                  

    # wait for all threads to finish                                            
    for t in threads:                                                           
        t.join()                                                                



split_processing(items)

Answer by woozyking

Edit 2018-02-06: revision based on this comment

Edit: forgot to mention that this works on Python 2.7.x

There's multiprocessing.pool, and the following sample illustrates how to use one of its pool implementations:

from multiprocessing.pool import ThreadPool as Pool
# from multiprocessing import Pool

pool_size = 5  # your "parallelness"

# define worker function before a Pool is instantiated
def worker(item):
    try:
        api.my_operation(item)
    except:
        print('error with item')

pool = Pool(pool_size)

for item in items:
    pool.apply_async(worker, (item,))

pool.close()
pool.join()

Now if you indeed identify that your process is CPU bound as @abarnert mentioned, change ThreadPool to the process pool implementation (commented under ThreadPool import). You can find more details here: http://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers

Answer by Vinoj John Hosan

import numpy as np
import threading


def threaded_process(items_chunk):
    """ Your main process which runs in thread for each chunk"""
    for item in items_chunk:                                               
        try:                                                                    
            api.my_operation(item)                                              
        except Exception:                                                       
            print('error with item')  

n_threads = 20
# Splitting the items into chunks equal to number of threads
# (input_image_list is the answerer's name for the list of items to process)
array_chunk = np.array_split(input_image_list, n_threads)
thread_list = []
for thr in range(n_threads):
    # args must be a tuple, hence the trailing comma after the chunk
    thread = threading.Thread(target=threaded_process, args=(array_chunk[thr],))
    thread_list.append(thread)
    thread_list[thr].start()

for thread in thread_list:
    thread.join()