Python: how does ThreadPoolExecutor().map differ from ThreadPoolExecutor().submit?

Note: the content on this page is taken from a popular StackOverflow question and its answers, and is provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/20838162/

How does ThreadPoolExecutor().map differ from ThreadPoolExecutor().submit?

python, multithreading, python-3.x, python-multithreading, concurrent.futures

Asked by Patrick Collins

I was just very confused by some code that I wrote. I was surprised to discover that:

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(f, iterable))

and

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    results = list(map(lambda x: executor.submit(f, x), iterable))

produce different results. The first one produces a list of whatever type f returns; the second produces a list of concurrent.futures.Future objects that then need to be evaluated with their result() method in order to get the value that f returned.

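To make that difference concrete, here is a minimal, self-contained sketch (square and numbers are illustrative stand-ins for the f and iterable above, not from the original question):

import concurrent.futures

def square(x):                 # stand-in for f
    return x * x

numbers = [1, 2, 3]            # stand-in for iterable

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    # map yields the return values of square directly, in input order
    from_map = list(executor.map(square, numbers))               # [1, 4, 9]

    # submit returns Future objects; .result() blocks until each value is ready
    futures = [executor.submit(square, n) for n in numbers]
    from_submit = [fut.result() for fut in futures]              # [1, 4, 9]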

My main concern is that this means that executor.map can't take advantage of concurrent.futures.as_completed, which seems like an extremely convenient way to evaluate the results of some long-running calls to a database that I'm making as they become available.

I'm not at all clear on how concurrent.futures.ThreadPoolExecutor objects work -- naively, I would prefer the (somewhat more verbose):

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    result_futures = list(map(lambda x: executor.submit(f, x), iterable))
    results = [fut.result() for fut in concurrent.futures.as_completed(result_futures)]

over the more concise executor.map in order to take advantage of a possible gain in performance. Am I wrong to do so?

Accepted answer by Kritzefitz

The problem is that you transform the result of ThreadPoolExecutor.map to a list. If you don't do this and instead iterate over the resulting generator directly, the results are still yielded in the original order, but the loop continues before all results are ready. You can test this with this example:

import time
import concurrent.futures

e = concurrent.futures.ThreadPoolExecutor(4)
s = range(10)
for i in e.map(time.sleep, s):
    print(i)
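
To see what that buys you, here is a small timing variation on the example above (not part of the original answer; it only adds time.monotonic() bookkeeping): iterating the generator directly lets the loop body run as soon as each result is ready, while wrapping the same call in list() blocks until every result exists.

import time
import concurrent.futures

e = concurrent.futures.ThreadPoolExecutor(4)

start = time.monotonic()
for _ in e.map(time.sleep, range(10)):
    # the first results arrive while the longer sleeps are still running
    print('result after %.1f s' % (time.monotonic() - start))

start = time.monotonic()
results = list(e.map(time.sleep, range(10)))   # blocks until all ten sleeps are done
print('list() returned after %.1f s' % (time.monotonic() - start))

e.shutdown()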

The reason for the order being kept may be that it's sometimes important to get results in the same order you give them to map. And the results are probably not wrapped in Future objects because, in some situations, it may simply take too long to do another map over the list to get all the results if you need them. And after all, in most cases it's very likely that the next value is ready before the loop has processed the previous value. This is demonstrated in this example:

import concurrent.futures

executor = concurrent.futures.ThreadPoolExecutor()  # Or ProcessPoolExecutor
data = some_huge_list()                      # placeholder: a large list of inputs
results = executor.map(crunch_number, data)  # crunch_number: the expensive per-item work
finals = []

for value in results:
    finals.append(do_some_stuff(value))      # do_some_stuff: further processing of each result

In this example it may well be that do_some_stuff takes longer than crunch_number, and if that is really the case, it's really not a big loss of performance while you still keep the easy usage of map.

Also, since the worker threads (or processes) start processing at the beginning of the list and work their way towards the end of the list you submitted, the results should finish in roughly the order they are yielded by the iterator. That means that in most cases executor.map is just fine, but in some cases, for example if it doesn't matter in which order you process the values and the function you passed to map takes very different amounts of time to run, concurrent.futures.as_completed may be faster.

Answered by edW

Below is an example of submit vs. map. Both accept the jobs immediately (submitted|mapped - start) and take the same time to complete, 11 seconds (last result time - start). However, submit (used here with as_completed) gives results as soon as any thread in the ThreadPoolExecutor (max_workers=2) completes a job. map gives results in the order they are submitted.

import time
import concurrent.futures

def worker(i):
    time.sleep(i)
    return i, time.time()

e = concurrent.futures.ThreadPoolExecutor(2)
arrIn = list(range(1, 7))[::-1]
print(arrIn)

f = []
print('start submit', time.time())
for i in arrIn:
    f.append(e.submit(worker, i))
print('submitted', time.time())
for r in concurrent.futures.as_completed(f):
    print(r.result(), time.time())
print()

print('start map', time.time())
f = e.map(worker, arrIn)
print('mapped', time.time())
for r in f:
    print(r, time.time())

Output:

[6, 5, 4, 3, 2, 1]
start submit 1543473934.47
submitted 1543473934.47
(5, 1543473939.473743) 1543473939.47
(6, 1543473940.471591) 1543473940.47
(3, 1543473943.473639) 1543473943.47
(4, 1543473943.474192) 1543473943.47
(1, 1543473944.474617) 1543473944.47
(2, 1543473945.477609) 1543473945.48

start map 1543473945.48
mapped 1543473945.48
(6, 1543473951.483908) 1543473951.48
(5, 1543473950.484109) 1543473951.48
(4, 1543473954.48858) 1543473954.49
(3, 1543473954.488384) 1543473954.49
(2, 1543473956.493789) 1543473956.49
(1, 1543473955.493888) 1543473956.49

Answered by Brad Solomon

In addition to the explanation in the answers here, it can be helpful to go right to the source. It reaffirms the statement from another answer here that:

  • .map() gives results in the order they are submitted, while
  • iterating over a list of Future objects with concurrent.futures.as_completed() won't guarantee this ordering, because this is the nature of as_completed()


.map() is defined in the base class, concurrent.futures._base.Executor:

class Executor(object):
    def submit(self, fn, *args, **kwargs):
        raise NotImplementedError()

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        if timeout is not None:
            end_time = timeout + time.monotonic()

        fs = [self.submit(fn, *args) for args in zip(*iterables)]  # <!!!!!!!!

        def result_iterator():
            try:
                # reverse to keep finishing order
                fs.reverse()  # <!!!!!!!!
                while fs:
                    # Careful not to keep a reference to the popped future
                    if timeout is None:
                        yield fs.pop().result()  # <!!!!!!!!
                    else:
                        yield fs.pop().result(end_time - time.monotonic())
            finally:
                for future in fs:
                    future.cancel()
        return result_iterator()

As you mention, there is also .submit(), which is left to be defined in the child classes, namely ProcessPoolExecutor and ThreadPoolExecutor, and returns a _base.Future instance that you need to call .result() on to actually do anything.

The important lines from .map() boil down to:

fs = [self.submit(fn, *args) for args in zip(*iterables)]
fs.reverse()
while fs:
    yield fs.pop().result()

The .reverse() plus .pop() is a means to get the first-submitted result (from iterables) to be yielded first, the second-submitted result to be yielded second, and so on. The elements of the resulting iterator are not Futures; they're the actual results themselves.

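One detail of the source above that the answer does not spell out: because of the finally clause, abandoning map's result iterator early cancels any futures that have not started running yet. Here is a small sketch of that behaviour (an illustration added here, not part of the original answer):

import time
import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as ex:
    gen = ex.map(time.sleep, [1, 1, 1, 1])
    next(gen)    # take only the first result
    gen.close()  # closing the generator runs the finally block, which cancels
                 # the still-pending futures; the one already running finishes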

Answered by wynemo

If you use concurrent.futures.as_completed, you can handle the exception for each function call individually.

import concurrent.futures
iterable = [1,2,3,4,6,7,8,9,10]

def f(x):
    if x == 2:
        raise Exception('x')
    return x

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    result_futures = list(map(lambda x: executor.submit(f, x), iterable))
    for future in concurrent.futures.as_completed(result_futures):
        try:
            print('result is', future.result())
        except Exception as e:
            print('e is', e, type(e))
# result is 3
# result is 1
# result is 4
# e is x <class 'Exception'>
# result is 6
# result is 7
# result is 8
# result is 9
# result is 10

With executor.map, if one of the calls raises an exception, the exception is re-raised when you iterate over the results, so the iteration stops and the remaining results are lost. You need to handle the exception in the worker function instead.

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    for each in executor.map(f, iterable):
        print(each)
# prints 1, then the exception raised by f(2) propagates here and the iteration stops
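
A minimal sketch of that approach (safe_f below is an illustrative wrapper added here, not part of the original answer): catch the exception inside the worker and return it instead, so the iteration over executor.map is never interrupted.

def safe_f(x):
    # turn exceptions into ordinary return values so map keeps going
    try:
        return f(x)
    except Exception as e:
        return e

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    for each in executor.map(safe_f, iterable):
        if isinstance(each, Exception):
            print('e is', each, type(each))
        else:
            print('result is', each)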