Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must do so under the same CC BY-SA license and attribute it to the original authors (not me): StackOverflow, original question: http://stackoverflow.com/questions/29269370/

Date: 2020-08-19 04:18:19  Source: igfitidea

How to properly create and run concurrent tasks using python's asyncio module?

Tags: python, concurrency, task, python-3.4, python-asyncio

Asked by songololo

I am trying to properly understand and implement two concurrently running Task objects using Python 3's relatively new asyncio module.


In a nutshell, asyncio seems designed to handle asynchronous processes and concurrent Task execution over an event loop. It promotes the use of await (applied in async functions) as a callback-free way to wait for and use a result, without blocking the event loop. (Futures and callbacks are still a viable alternative.)


It also provides the asyncio.Task() class, a specialized subclass of Future designed to wrap coroutines, preferably created via the asyncio.ensure_future() function. The intended use of asyncio Tasks is to allow independently running tasks to run 'concurrently' with other tasks within the same event loop. My understanding is that Tasks are connected to the event loop, which then automatically keeps driving the coroutine between await statements.


I like the idea of being able to use concurrent Tasks without needing to use one of the Executor classes, but I haven't found much elaboration on implementation.


This is how I'm currently doing it:


import asyncio

print('running async test')

async def say_boo():
    i = 0
    while True:
        await asyncio.sleep(0)
        print('...boo {0}'.format(i))
        i += 1

async def say_baa():
    i = 0
    while True:
        await asyncio.sleep(0)
        print('...baa {0}'.format(i))
        i += 1

# wrap in Task object
# -> automatically attaches to event loop and executes
boo = asyncio.ensure_future(say_boo())
baa = asyncio.ensure_future(say_baa())

loop = asyncio.get_event_loop()
loop.run_forever()
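On Python 3.7 and later, the same pattern is usually spelled with asyncio.run and asyncio.create_task, which create and close the event loop automatically. A bounded sketch of that spelling (the loop is made finite so it terminates):

```python
import asyncio

async def say(word, times):
    # Each await gives the event loop a chance to switch tasks.
    for i in range(times):
        await asyncio.sleep(0)
        print('...{0} {1}'.format(word, i))

async def main():
    # create_task schedules the coroutines on the running loop.
    boo = asyncio.create_task(say('boo', 3))
    baa = asyncio.create_task(say('baa', 3))
    await boo
    await baa

asyncio.run(main())
```

This prints the '...boo' and '...baa' lines interleaved, just like the run_forever version above.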

In the case of trying to concurrently run two looping Tasks, I've noticed that unless a Task has an internal await expression, it gets stuck in its while loop, effectively blocking other tasks from running (much like a normal while loop). However, as soon as the Tasks have to (a)wait, they seem to run concurrently without an issue.


Thus, the await statements seem to provide the event loop with a foothold for switching back and forth between the tasks, giving the effect of concurrency.


Example output with internal await:


running async test
...boo 0
...baa 0
...boo 1
...baa 1
...boo 2
...baa 2

Example output without internal await:


...boo 0
...boo 1
...boo 2
...boo 3
...boo 4

Questions


Does this implementation pass for a 'proper' example of concurrent looping Tasks in asyncio?


Is it correct that the only way this works is for a Task to provide a blocking point (an await expression) in order for the event loop to juggle multiple tasks?


Accepted answer by dano

Yes, any coroutine that's running inside your event loop will block other coroutines and tasks from running, unless it


  1. Calls another coroutine using yield from or await (if using Python 3.5+), or
  2. Returns.
This is because asyncio is single-threaded; the only way for the event loop to run is for no other coroutine to be actively executing. Using yield from/await suspends the coroutine temporarily, giving the event loop a chance to work.

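To see this concretely, here is a small self-contained sketch (the coroutine names are made up for illustration) in which one coroutine never awaits and therefore runs to completion before the other gets a turn, even though the other one was scheduled first:

```python
import asyncio

async def hog(order):
    # No await inside the loop: this coroutine never gives the event
    # loop a chance to switch, so it runs to completion in one go.
    for i in range(3):
        order.append('hog {0}'.format(i))

async def polite(order):
    for i in range(3):
        await asyncio.sleep(0)  # suspension point: other tasks may run here
        order.append('polite {0}'.format(i))

async def main():
    order = []
    # polite() is scheduled first, but its very first await hands
    # control to hog(), which then monopolizes the loop until it returns.
    await asyncio.gather(polite(order), hog(order))
    return order

print(asyncio.run(main()))
```

All three 'hog' entries appear before any 'polite' entry, which is exactly the starvation behavior described above.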

Your example code is fine, but in many cases, you probably wouldn't want long-running code that isn't doing asynchronous I/O running inside the event loop to begin with. In those cases, it often makes more sense to use loop.run_in_executor to run the code in a background thread or process. ProcessPoolExecutor would be the better choice if your task is CPU-bound; ThreadPoolExecutor would be used if you need to do some I/O that isn't asyncio-friendly.

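As a sketch of the thread-based variant, using time.sleep as a stand-in for some blocking, non-asyncio-friendly I/O call (blocking_io is a hypothetical name for illustration):

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_io(name):
    # Stand-in for a blocking call (e.g. a non-async HTTP client).
    time.sleep(0.1)
    return 'done: {0}'.format(name)

async def main():
    loop = asyncio.get_running_loop()
    executor = ThreadPoolExecutor(max_workers=2)
    # Both blocking calls run in worker threads; the event loop stays
    # free to service other coroutines in the meantime.
    results = await asyncio.gather(
        loop.run_in_executor(executor, blocking_io, 'a'),
        loop.run_in_executor(executor, blocking_io, 'b'),
    )
    return results

print(asyncio.run(main()))
```

run_in_executor returns an awaitable Future, so it composes with gather/await like any other coroutine result.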

Your two loops, for example, are completely CPU-bound and don't share any state, so the best performance would come from using ProcessPoolExecutor to run each loop in parallel across CPUs:


import asyncio
from concurrent.futures import ProcessPoolExecutor

print('running async test')

def say_boo():
    i = 0
    while True:
        print('...boo {0}'.format(i))
        i += 1


def say_baa():
    i = 0
    while True:
        print('...baa {0}'.format(i))
        i += 1

if __name__ == "__main__":
    executor = ProcessPoolExecutor(2)
    loop = asyncio.get_event_loop()
    # run_in_executor already returns an asyncio.Future, so no extra
    # wrapping is needed (asyncio.create_task would raise RuntimeError
    # here, since no event loop is running yet at this point)
    boo = loop.run_in_executor(executor, say_boo)
    baa = loop.run_in_executor(executor, say_baa)

    loop.run_forever()

Answer by Jashandeep Sohi

You don't necessarily need a yield from x to give control over to the event loop.


In your example, I think the proper way would be to do a yield None or, equivalently, a simple yield, rather than a yield from asyncio.sleep(0.001):


import asyncio

@asyncio.coroutine
def say_boo():
  i = 0
  while True:
    yield None
    print("...boo {0}".format(i))
    i += 1

@asyncio.coroutine
def say_baa():
  i = 0
  while True:
    yield
    print("...baa {0}".format(i))
    i += 1

boo_task = asyncio.ensure_future(say_boo())  # asyncio.async() was renamed ensure_future; 'async' is a keyword in 3.7+
baa_task = asyncio.ensure_future(say_baa())

loop = asyncio.get_event_loop()
loop.run_forever()

Coroutines are just plain old Python generators. Internally, the asyncio event loop keeps a record of these generators and calls gen.send() on each of them, one by one, in a never-ending loop. Whenever you yield, the call to gen.send() completes and the loop can move on. (I'm simplifying; take a look around https://hg.python.org/cpython/file/3.4/Lib/asyncio/tasks.py#l265 for the actual code.)

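The driving mechanism described here can be mimicked with plain generators and a hand-rolled round-robin driver (purely illustrative, and much simpler than asyncio's actual scheduling):

```python
def say(word, times):
    # A generator-based 'coroutine': each bare yield hands control back
    # to whoever is driving the generator.
    for i in range(times):
        yield
        print('...{0} {1}'.format(word, i))

def tiny_loop(gens):
    # Round-robin driver, loosely mimicking what the asyncio event loop
    # does: resume each generator in turn until they are all exhausted.
    gens = list(gens)
    while gens:
        for gen in list(gens):
            try:
                gen.send(None)  # resume until the next yield
            except StopIteration:
                gens.remove(gen)

tiny_loop([say('boo', 2), say('baa', 2)])
```

The output interleaves '...boo' and '...baa' lines, one per scheduling round, just like the asyncio-driven versions above.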

That said, I would still go the run_in_executor route if you need to do CPU-intensive computation without sharing data.