How to combine python asyncio with threads?
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/28492103/
Asked by fxstein
I have successfully built a RESTful microservice with Python asyncio and aiohttp that listens to a POST event to collect realtime events from various feeders.
It then builds an in-memory structure to cache the last 24h of events in a nested defaultdict/deque structure.
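For reference, the nested defaultdict/deque cache described above might look something like this (purely illustrative; the key names and maxlen value are invented):

from collections import defaultdict, deque

# feeder -> event type -> bounded deque of recent events
cache = defaultdict(lambda: defaultdict(lambda: deque(maxlen=100000)))
cache["feeder-1"]["click"].append({"ts": 1423526400, "payload": "..."})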
Now I would like to periodically checkpoint that structure to disc, preferably using pickle.
Since the memory structure can be >100MB I would like to avoid holding up my incoming event processing for the time it takes to checkpoint the structure.
I'd rather create a snapshot copy (e.g. deepcopy) of the structure and then take my time to write it to disk and repeat on a preset time interval.
I have been searching for examples on how to combine threads (and is a thread even the best solution for this?) and asyncio for that purpose but could not find something that would help me.
Any pointers to get started are much appreciated!
Accepted answer by dano
It's pretty simple to delegate a method to a thread or sub-process using BaseEventLoop.run_in_executor:
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

def cpu_bound_operation(x):
    time.sleep(x)  # This is some operation that is CPU-bound

@asyncio.coroutine
def main():
    # Run cpu_bound_operation in the ProcessPoolExecutor
    # This will make your coroutine block, but won't block
    # the event loop; other coroutines can run in meantime.
    yield from loop.run_in_executor(p, cpu_bound_operation, 5)

loop = asyncio.get_event_loop()
p = ProcessPoolExecutor(2)  # Create a ProcessPool with 2 processes
loop.run_until_complete(main())
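Applied to the checkpointing use case from the question, the same pattern might look roughly like the sketch below, written with the newer async/await syntax (the cache object, file path, and 60-second interval are made-up placeholders, and a ThreadPoolExecutor is used here for the reasons discussed next):

import asyncio
import copy
import pickle
from concurrent.futures import ThreadPoolExecutor

def write_snapshot(snapshot, path):
    # Runs in a worker thread, so the event loop stays responsive
    with open(path, "wb") as f:
        pickle.dump(snapshot, f, protocol=pickle.HIGHEST_PROTOCOL)

async def checkpoint_periodically(cache, path, interval=60):
    loop = asyncio.get_running_loop()
    executor = ThreadPoolExecutor(max_workers=1)
    while True:
        await asyncio.sleep(interval)
        snapshot = copy.deepcopy(cache)  # quick in-memory snapshot of the cache
        await loop.run_in_executor(executor, write_snapshot, snapshot, path)

# e.g. scheduled alongside the aiohttp handlers with:
# asyncio.ensure_future(checkpoint_periodically(cache, "events.pkl"))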
As for whether to use a ProcessPoolExecutor or ThreadPoolExecutor, that's kind of hard to say; pickling a large object will definitely eat some CPU cycles, which would initially make you think ProcessPoolExecutor is the way to go. However, passing your 100MB object to a Process in the pool would require pickling the instance in your main process, sending the bytes to the child process via IPC, unpickling it in the child, and then pickling it again so you can write it to disk. Given that, my guess is the pickling/unpickling overhead will be large enough that you're better off using a ThreadPoolExecutor, even though you're going to take a performance hit because of the GIL.
That said, it's very simple to test both ways and find out for sure, so you might as well do that.
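For example, a rough way to test both ways is to time one checkpoint through each executor type (a sketch only; the data dict is a stand-in for the real 100MB structure and the output file names are arbitrary):

import asyncio
import pickle
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def checkpoint(data, path):
    # The work we want off the event loop: pickle the snapshot to disk
    with open(path, "wb") as f:
        pickle.dump(data, f, protocol=pickle.HIGHEST_PROTOCOL)

async def timed(executor, data, path):
    loop = asyncio.get_running_loop()
    start = time.perf_counter()
    await loop.run_in_executor(executor, checkpoint, data, path)
    return time.perf_counter() - start

async def main():
    data = {i: list(range(1000)) for i in range(10000)}  # stand-in payload
    with ThreadPoolExecutor() as tpe:
        t = await timed(tpe, data, "snapshot_threads.pkl")
        print(f"threads:   {t:.3f}s")
    with ProcessPoolExecutor() as ppe:
        # Includes the IPC/pickling overhead dano describes above
        t = await timed(ppe, data, "snapshot_procs.pkl")
        print(f"processes: {t:.3f}s")

if __name__ == "__main__":
    asyncio.run(main())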
Answer by enigmaticPhysicist
I also used run_in_executor, but I found this function kinda gross under most circumstances, since it requires partial() for keyword args and I'm never calling it with anything other than a single executor and the default event loop. So I made a convenience wrapper around it with sensible defaults and automatic keyword argument handling.
from time import sleep
import asyncio as aio
loop = aio.get_event_loop()

class Executor:
    """In most cases, you can just use the 'execute' instance as a
    function, i.e. y = await execute(f, a, b, k=c) => run f(a, b, k=c) in
    the executor, assign result to y. The defaults can be changed, though,
    with your own instantiation of Executor, i.e. execute =
    Executor(nthreads=4)"""

    def __init__(self, loop=loop, nthreads=1):
        from concurrent.futures import ThreadPoolExecutor
        self._ex = ThreadPoolExecutor(nthreads)
        self._loop = loop

    def __call__(self, f, *args, **kw):
        from functools import partial
        return self._loop.run_in_executor(self._ex, partial(f, *args, **kw))

execute = Executor()

...

def cpu_bound_operation(t, alpha=30):
    sleep(t)
    return 20*alpha

async def main():
    y = await execute(cpu_bound_operation, 5, alpha=-2)

loop.run_until_complete(main())
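As a side note, on Python 3.9 and later the standard library's asyncio.to_thread() covers the common case this wrapper addresses: running a blocking call in a thread with keyword arguments forwarded automatically. A minimal sketch:

import asyncio
from time import sleep

def cpu_bound_operation(t, alpha=30):
    sleep(t)
    return 20 * alpha

async def main():
    # Runs in asyncio's default ThreadPoolExecutor; kwargs pass straight through
    y = await asyncio.to_thread(cpu_bound_operation, 5, alpha=-2)
    print(y)

asyncio.run(main())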