Python 使用 asyncio 时,如何让所有正在运行的任务在关闭事件循环之前完成
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/27796294/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
When using asyncio, how do you allow all running tasks to finish before shutting down the event loop
提问by derekdreery
I have the following code:
我有以下代码:
@asyncio.coroutine
def do_something_periodically():
while True:
asyncio.async(my_expensive_operation())
yield from asyncio.sleep(my_interval)
if shutdown_flag_is_set:
print("Shutting down")
break
I run this function until complete. The problem occurs when shutdown is set - the function completes and any pending tasks are never run.
我运行这个函数直到完成。设置关闭时会出现问题 - 函数完成并且任何挂起的任务都不会运行。
This is the error:
这是错误:
task: <Task pending coro=<report() running at script.py:33> wait_for=<Future pending cb=[Task._wakeup()]>>
How do I schedule a shutdown correctly?
如何正确安排关机?
To give some context, I'm writing a system monitor which reads from /proc/stat every 5 seconds, computes the cpu usage in that period, and then sends the result to a server. I want to keep scheduling these monitoring jobs until I receive sigterm, when I stop scheduling, wait for all current jobs to finish, and exit gracefully.
为了提供一些上下文,我正在编写一个系统监视器,它每 5 秒从 /proc/stat 读取一次,计算该期间的 CPU 使用率,然后将结果发送到服务器。我想继续调度这些监控作业,直到我收到 sigterm,当我停止调度时,等待所有当前作业完成,然后优雅地退出。
采纳答案by Martin Richard
You can retrieve unfinished tasks and run the loop again until they finished, then close the loop or exit your program.
您可以检索未完成的任务并再次运行循环直到它们完成,然后关闭循环或退出程序。
pending = asyncio.all_tasks()
loop.run_until_complete(asyncio.gather(*pending))
pending
is a list of pending tasks.asyncio.gather()
allows to wait on several tasks at once.
pending
是待处理任务的列表。asyncio.gather()
允许一次等待多个任务。
If you want to ensure all the tasks are completed inside a coroutine (maybe you have a "main" coroutine), you can do it this way, for instance:
如果你想确保所有的任务都在一个协程中完成(也许你有一个“主”协程),你可以这样做,例如:
async def do_something_periodically():
while True:
asyncio.create_task(my_expensive_operation())
await asyncio.sleep(my_interval)
if shutdown_flag_is_set:
print("Shutting down")
break
await asyncio.gather(*asyncio.all_tasks())
Also, in this case, since all the tasks are created in the same coroutine, you already have access to the tasks:
此外,在这种情况下,由于所有任务都是在同一个协程中创建的,因此您已经可以访问这些任务:
async def do_something_periodically():
tasks = []
while True:
tasks.append(asyncio.create_task(my_expensive_operation()))
await asyncio.sleep(my_interval)
if shutdown_flag_is_set:
print("Shutting down")
break
await asyncio.gather(*tasks)
回答by throws_exceptions_at_you
As of Python 3.7 the above answer uses multiple deprecated APIs(asyncio.async and Task.all_tasks,@asyncio.coroutine, yield from, etc.) and you should rather use this:
从 Python 3.7 开始,上述答案使用了多个已弃用的 API(asyncio.async 和 Task.all_tasks、@asyncio.coroutine、yield from 等),您应该使用它:
import asyncio
async def my_expensive_operation(expense):
print(await asyncio.sleep(expense, result="Expensive operation finished."))
async def do_something_periodically(expense, interval):
while True:
asyncio.create_task(my_expensive_operation(expense))
await asyncio.sleep(interval)
loop = asyncio.get_event_loop()
coro = do_something_periodically(1, 1)
try:
loop.run_until_complete(coro)
except KeyboardInterrupt:
coro.close()
tasks = asyncio.all_tasks(loop)
expensive_tasks = {task for task in tasks if task._coro.__name__ != coro.__name__}
loop.run_until_complete(asyncio.gather(*expensive_tasks))
回答by Ramil Aglyautdinov
You might also consider using asyncio.shield, although by doing this way you won't get ALLthe running tasks finished but only shielded. But it still can be useful in some scenarios.
你也可以考虑使用asyncio.shield,虽然通过这样做,你不会得到全部完成,但只有运行的任务屏蔽。但它在某些情况下仍然有用。
Besides that, as of Python 3.7 we also can use the high-level API method asynio.runhere. As Python core developer, Yury Selivanov suggests:
https://youtu.be/ReXxO_azV-w?t=636
Note:asyncio.run function has been added to asyncio in Python 3.7 on a provisional basis.
除此之外,从 Python 3.7 开始,我们还可以在此处使用高级 API 方法asynio.run。作为 Python 核心开发人员,Yury Selivanov 建议:
https
://youtu.be/ReXxO_azV-w?t =636注意:asyncio.run 函数已临时添加到 Python 3.7 中的 asyncio。
Hope that helps!
希望有帮助!
import asyncio
async def my_expensive_operation(expense):
print(await asyncio.sleep(expense, result="Expensive operation finished."))
async def do_something_periodically(expense, interval):
while True:
asyncio.create_task(my_expensive_operation(expense))
# using asyncio.shield
await asyncio.shield(asyncio.sleep(interval))
coro = do_something_periodically(1, 1)
if __name__ == "__main__":
try:
# using asyncio.run
asyncio.run(coro)
except KeyboardInterrupt:
print('Cancelled!')
回答by gilch
Use a wrapper coroutine that waits until the pending task count is 1 before returning.
使用一个包装器协程,等待挂起的任务计数为 1,然后再返回。
async def loop_job():
asyncio.create_task(do_something_periodically())
while len(asyncio.Task.all_tasks()) > 1: # Any task besides loop_job() itself?
await asyncio.sleep(0.2)
asyncio.run(loop_job())
回答by grabantot
I'm not sure if this is what you've asked for but I had a similar problem and here is the ultimate solution that I came up with.
我不确定这是否是您所要求的,但我遇到了类似的问题,这是我想出的最终解决方案。
The code is python 3 compatible and uses only public asyncio APIs (meaning no hacky _coro
and no deprecated APIs).
代码与 python 3 兼容,仅使用公共 asyncio API(意味着没有 hacky_coro
和不推荐使用的 API)。
import asyncio
async def fn():
await asyncio.sleep(1.5)
print('fn')
async def main():
print('main start')
asyncio.create_task(fn()) # run in parallel
await asyncio.sleep(0.2)
print('main end')
def async_run_and_await_all_tasks(main):
def get_pending_tasks():
tasks = asyncio.Task.all_tasks()
pending = [task for task in tasks if task != run_main_task and not task.done()]
return pending
async def run_main():
await main()
while True:
pending_tasks = get_pending_tasks()
if len(pending_tasks) == 0: return
await asyncio.gather(*pending_tasks)
loop = asyncio.new_event_loop()
run_main_coro = run_main()
run_main_task = loop.create_task(run_main_coro)
loop.run_until_complete(run_main_task)
# asyncio.run(main()) # doesn't print from fn task, because main finishes earlier
async_run_and_await_all_tasks(main)
output (as expected):
输出(如预期):
main start
main end
fn
That async_run_and_await_all_tasks function will make python to behave in a nodejs manner: exit only when there are no unfinished tasks.
该 async_run_and_await_all_tasks 函数将使 python 以 nodejs 方式运行:仅在没有未完成的任务时退出。