Python: the right way to limit the maximum number of threads running at once?

Warning: this content is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me): StackOverflow. Original question: http://stackoverflow.com/questions/19369724/

The right way to limit maximum number of threads running at once?

Tags: python, multithreading, python-multithreading

Asked by d33tah

I'd like to create a program that runs multiple light threads, but limits itself to a constant, predefined number of concurrent running tasks, like this (but with no risk of race condition):

import threading

def f(arg):
    global running
    running += 1
    print("Spawned a thread. running=%s, arg=%s" % (running, arg))
    for i in range(100000):
        pass
    running -= 1
    print("Done")

running = 0
while True:
    if running < 8:
        arg = get_task()
        threading.Thread(target=f, args=[arg]).start()

What's the safest/fastest way to implement this?

Accepted answer by cdhowie

It sounds like you want to implement the producer/consumer pattern with eight workers. Python has a Queue class for this purpose, and it is thread-safe.

Each worker should call get() on the queue to retrieve a task. This call will block if no tasks are available, causing the worker to go idle until one becomes available. Then the worker should execute the task and finally call task_done() on the queue.

You would put tasks in the queue by calling put() on the queue.

From the main thread, you can call join() on the queue to wait until all pending tasks have been completed.

This approach has the benefit that you are not creating and destroying threads, which is expensive. The worker threads will run continuously, but will be asleep when no tasks are in the queue, using zero CPU time.

(The linked documentation page has an example of this very pattern.)

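A minimal sketch of that pattern, assuming eight workers as in the question; the worker function, the stand-in work, and NUM_WORKERS are illustrative, not from the answer:

import threading
from queue import Queue

NUM_WORKERS = 8  # illustrative constant, matching the question's limit
tasks = Queue()

def worker():
    while True:
        item = tasks.get()             # blocks while the queue is empty
        try:
            print("processing", item)  # stand-in for real work
        finally:
            tasks.task_done()          # tell the queue this task is finished

for _ in range(NUM_WORKERS):
    threading.Thread(target=worker, daemon=True).start()

for item in range(100):                # stand-in producer
    tasks.put(item)

tasks.join()  # blocks until every put() item has been task_done()'d

The daemon flag matters here: since the workers loop forever on get(), marking them as daemon threads lets the process exit once join() returns.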

Answered by Kirk Strauser

I've seen that most commonly written like:

threads = [threading.Thread(target=f) for _ in range(8)]
for thread in threads:
    thread.start()
...
for thread in threads:
    thread.join()

If you want to maintain a fixed-size pool of running threads that pick up short-lived tasks as they arrive, rather than starting a new thread per task, consider a solution built around Queues, like "How to wait until only the first thread is finished in Python".

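For completeness, a self-contained sketch of that idiom; the data, the chunking, and the body of f are illustrative stand-ins:

import threading

def f(chunk):
    print("chunk sum:", sum(chunk))  # stand-in for real work

data = list(range(80))
chunks = [data[i::8] for i in range(8)]  # one interleaved slice per thread

threads = [threading.Thread(target=f, args=(chunk,)) for chunk in chunks]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()  # wait for all eight threads to finish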

Answered by abarnert

It would be much easier to implement this as a thread pool or executor, using either multiprocessing.dummy.Pool or concurrent.futures.ThreadPoolExecutor (or, if using Python 2.x, the futures backport). For example:

import concurrent.futures

def f(arg):
    print("Started a task. arg=%s" % arg)
    for i in range(100000):
        pass  # stand-in for real work
    print("Done")

with concurrent.futures.ThreadPoolExecutor(8) as executor:  # at most 8 workers
    while True:
        arg = get_task()
        executor.submit(f, arg)

Of course, if you can change the pull-model get_task to a push-model get_tasks that, e.g., yields tasks one at a time, this is even simpler:

with concurrent.futures.ThreadPoolExecutor(8) as executor:
    for arg in get_tasks():
        executor.submit(f, arg)

When you run out of tasks (e.g., get_task raises an exception, or get_tasks runs dry), this will automatically tell the executor to stop after it drains the queue, wait for it to stop, and clean up everything.

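If the tasks are independent and you want something even more compact, executor.map does the submit loop for you; a sketch with a stand-in f. Note that map() collects its whole iterable up front, so it suits finite task lists better than unbounded streams:

import concurrent.futures

def f(arg):
    return arg * 2  # stand-in for real work

with concurrent.futures.ThreadPoolExecutor(8) as executor:
    # map() submits every item and yields results in input order
    for result in executor.map(f, range(20)):
        print(result)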

Answered by Hammad Haleem

A semaphore is a variable or abstract data type that is used to control access to a common resource by multiple processes in a concurrent system, such as a multiprogramming operating system; it can help you here.

import threading

maximumNumberOfThreads = 8  # upper bound on concurrently running threads
threadLimiter = threading.BoundedSemaphore(maximumNumberOfThreads)

class MyThread(threading.Thread):

    def run(self):
        threadLimiter.acquire()      # blocks if the limit is already reached
        try:
            self.Executemycode()
        finally:
            threadLimiter.release()  # free the slot even if the code raises

    def Executemycode(self):
        print(" Hello World!")
        # <your code here>

This way you can easily limit the number of threads that will execute concurrently during the program's run. The variable maximumNumberOfThreads defines the upper limit on concurrent threads.

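A usage sketch for the class above; the count of 100 is illustrative. Note that all 100 Thread objects are still created and started; the semaphore only caps how many execute their body at the same time:

threads = [MyThread() for _ in range(100)]
for t in threads:
    t.start()  # excess threads block in acquire() until a slot frees
for t in threads:
    t.join()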

credits

Answered by Benyamin Jafari

To apply a limit on thread creation, follow this example (it really works):

import threading
import time


def some_process(thread_num):
    count = 0
    while count < 5:
        time.sleep(0.5)
        count += 1
        print("%s: %s" % (thread_num, time.ctime(time.time())))
        print('number of alive threads:{}'.format(threading.active_count()))


def create_thread():
    try:
        for i in range(1, 555):  # trying to spawn 555 threads.
            thread = threading.Thread(target=some_process, args=(i,))
            thread.start()

            # active_count() also counts the main thread; once it hits the
            # cap, block until this thread finishes before spawning more.
            if threading.active_count() == 100:  # set maximum threads.
                thread.join()

            print(threading.active_count())  # number of alive threads.

    except Exception as e:
        print("Error: unable to start thread {}".format(e))


if __name__ == '__main__':
    create_thread()

Or:

Another way is to gate thread creation with a mutex-like check on the active thread count, as in the example below:

import threading
import time


def some_process(thread_num):
    count = 0
    while count < 5:
        time.sleep(0.5)
        count += 1
        # print("%s: %s" % (thread_num, time.ctime(time.time())))
        print('number of alive threads:{}'.format(threading.active_count()))


def create_thread2(number_of_desired_threads):
    try:
        for i in range(1, 555):
            threading.Thread(target=some_process, args=(i,)).start()

            while number_of_desired_threads <= threading.active_count():
                # busy-wait until a running thread exits, so no
                # additional thread is created above the limit
                pass

            print('unlock')
            print(threading.active_count())  # number of alive threads.

    except Exception as e:
        print("Error: unable to start thread {}".format(e))


if __name__ == '__main__':
    create_thread2(100)

Answered by Paul Jacobs

I ran into this same problem and spent days (2 days to be precise) getting to the correct solution using a queue. I wasted a day going down the ThreadPoolExecutor path because there is no way to limit the number of threads that thing launches! I fed it a list of 5000 files to copy and the code went non-responsive once it got up to about 1500 concurrent file copies running all at once. The max_workers parameter on the ThreadPoolExecutor only controls how many workers are spinning up threads, not how many threads get spun up.

Ok, anyway, here is a very simple example of using a Queue for this:

import threading, time, random
from queue import Queue

jobs = Queue()

def do_stuff(q):
    while not q.empty():  # safe here only because all jobs are queued up front
        value = q.get()
        time.sleep(random.randint(1, 10))  # simulate a task taking time
        print(value)
        q.task_done()  # lets jobs.join() know this task is finished

for i in range(10):
    jobs.put(i)

for i in range(3):
    worker = threading.Thread(target=do_stuff, args=(jobs,))
    worker.start()

print("waiting for queue to complete", jobs.qsize(), "tasks")
jobs.join()
print("all done")