Is there a multithreaded map() function?

Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me). Original: http://stackoverflow.com/questions/2562757/

Tags: python, multithreading

Asked by Sandro

I have a function that is side-effect free. I would like to run it for every element in an array and return an array with all of the results.

Does Python have something to generate all of the values?

Answered by samtregar

Try the Pool.map function from multiprocessing:

http://docs.python.org/library/multiprocessing.html#using-a-pool-of-workers

It's not multithreaded per se, but that's actually good, since multithreading is severely crippled in Python by the GIL.

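For illustration, a minimal sketch of what Pool.map usage looks like (the square function, pool size, and inputs here are placeholders, not part of the original answer):

from multiprocessing import Pool

def square(x):
    # Any side-effect-free function works here.
    return x * x

if __name__ == '__main__':
    # The __main__ guard matters on platforms that spawn worker
    # processes (e.g. Windows).
    with Pool(processes=4) as pool:
        results = pool.map(square, [1, 2, 3, 4, 5])
    print(results)  # [1, 4, 9, 16, 25]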

Answered by Haipeng Li

Try concurrent.futures.ThreadPoolExecutor.map in the Python standard library (new in version 3.2).

Similar to map(func, *iterables), except:

  • the iterables are collected immediately rather than lazily;
  • func is executed asynchronously and several calls to func may be made concurrently.

A simple example (modified from the ThreadPoolExecutor example in the docs):

import concurrent.futures
import urllib.request

URLS = [
    'http://www.foxnews.com/',
    'http://www.cnn.com/',
    'http://europe.wsj.com/',
    'http://www.bbc.co.uk/',
]

# Retrieve a single page and return its contents.
def load_url(url, timeout):
    try:
        # The urlopen call itself can raise (bad host, HTTP error),
        # so it belongs inside the try block too.
        with urllib.request.urlopen(url, timeout=timeout) as conn:
            return conn.read()
    except Exception:
        # You may need a better error handler; here a failed
        # fetch simply yields an empty result.
        return b''

# A with statement ensures threads are cleaned up promptly.
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
    # executor.map returns results in input order, like the built-in map.
    results = list(executor.map(lambda url: load_url(url, 60), URLS))

print('Done.')

Answered by BrainCore

You can use the multiprocessing Python package (http://docs.python.org/library/multiprocessing.html). The cloud Python package, available from PiCloud (http://www.picloud.com), offers a multiprocessing map() function as well, which can offload your map to the cloud.

Answered by Maximilian

Python now has the concurrent.futures module, which is the simplest way of getting map to work with either multiple threads or multiple processes.

https://docs.python.org/3/library/concurrent.futures.html

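As a sketch of how interchangeable the two executors are (the work function and worker counts below are illustrative, not from the original answer):

import concurrent.futures

def work(x):
    return x * x

items = list(range(10))

# Threads suit I/O-bound work; the GIL limits CPU-bound speedup.
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as ex:
    thread_results = list(ex.map(work, items))

# Processes sidestep the GIL for CPU-bound work. The __main__ guard
# is needed on platforms that spawn worker processes.
if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as ex:
        process_results = list(ex.map(work, items))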

Answered by speedplane

Below is my map_parallel function. It works just like map, except that it runs each element in parallel in a separate thread (but see the note below). This answer builds upon another SO answer.

import logging
import sys
import threading
import traceback
from queue import Queue, Empty

def map_parallel(f, iterable, max_parallel=10):
    """Just like map(f, iterable), but each call runs in a separate thread."""
    # Put all of the items in the queue, keeping track of their order.
    total_items = 0
    queue = Queue()
    for i, arg in enumerate(iterable):
        queue.put((i, arg))
        total_items += 1
    # No point in creating more thread objects than necessary.
    max_parallel = min(max_parallel, total_items)

    # The worker thread: pull items until the queue is empty,
    # stopping early if any worker has recorded an error.
    res = {}
    errors = {}
    class Worker(threading.Thread):
        def run(self):
            while not errors:
                try:
                    num, arg = queue.get(block=False)
                except Empty:
                    break
                try:
                    res[num] = f(arg)
                except Exception:
                    errors[num] = sys.exc_info()

    # Create, start, and wait for the threads.
    threads = [Worker() for _ in range(max_parallel)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    if errors:
        if len(errors) > 1:
            logging.warning("map_parallel multiple errors: %d:\n%s" % (
                len(errors), errors))
        # Just raise the first one.
        item_i = min(errors.keys())
        exc_type, exc_value, exc_tb = errors[item_i]
        # Log the original traceback.
        logging.info("map_parallel exception on item %s/%s:\n%s" % (
            item_i, total_items, "\n".join(traceback.format_tb(exc_tb))))
        raise exc_value
    return [res[i] for i in range(total_items)]
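
A quick usage sketch (the squaring function is illustrative):

# Squares computed across up to 10 threads, results in input order.
print(map_parallel(lambda x: x * x, range(8)))
# -> [0, 1, 4, 9, 16, 25, 36, 49]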

NOTE: One thing to be careful of is exceptions. Like the normal map, the above function raises an exception if one of its sub-threads raises one, and it stops iteration. However, due to the parallel nature, there is no guarantee that the earliest element will raise the first exception.

Answered by Adam Nelson

Maybe try the Unladen Swallow Python 3 implementation? That might be a major project, and not guaranteed to be stable, but if you're inclined it could work. Then list or set comprehensions seem like the proper functional structure to use.

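For reference, a minimal sketch of the comprehension equivalents of map that this answer alludes to (sequential, no parallelism; the function and inputs are illustrative):

def f(x):
    return x * x

items = [1, 2, 3]
results = [f(x) for x in items]   # list comprehension -> [1, 4, 9]
unique = {f(x) for x in items}    # set comprehension  -> {1, 4, 9}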