Python 如何使用 asyncio 定期执行函数?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/37512182/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-19 19:29:10  来源:igfitidea点击:

How can I periodically execute a function with asyncio?

pythonpython-3.xtornadopython-3.5python-asyncio

提问by 2Cubed

I'm migrating from tornadoto asyncio, and I can't find the asyncioequivalent of tornado's PeriodicCallback. (A PeriodicCallbacktakes two arguments: the function to run and the number of milliseconds between calls.)

我正在从 迁移tornadoasyncio,但找不到's的asyncio等效项。(A有两个参数:要运行的函数和调用之间的毫秒数。)tornadoPeriodicCallbackPeriodicCallback

  • Is there such an equivalent in asyncio?
  • If not, what would be the cleanest way to implement this without running the risk of getting a RecursionErrorafter a while?
  • 在 中有这样的等价物asyncio吗?
  • 如果没有,那么在不冒一段时间RecursionError后获得风险的情况下,最干净的实现方法是什么?

采纳答案by A. Jesse Jiryu Davis

For Python versions below 3.5:

对于低于 3.5 的 Python 版本:

import asyncio

@asyncio.coroutine
def periodic():
    while True:
        print('periodic')
        yield from asyncio.sleep(1)

def stop():
    task.cancel()

loop = asyncio.get_event_loop()
loop.call_later(5, stop)
task = loop.create_task(periodic())

try:
    loop.run_until_complete(task)
except asyncio.CancelledError:
    pass

For Python 3.5 and above:

对于 Python 3.5 及更高版本:

import asyncio

async def periodic():
    while True:
        print('periodic')
        await asyncio.sleep(1)

def stop():
    task.cancel()

loop = asyncio.get_event_loop()
loop.call_later(5, stop)
task = loop.create_task(periodic())

try:
    loop.run_until_complete(task)
except asyncio.CancelledError:
    pass

回答by Martijn Pieters

There is no built-in support for periodic calls, no.

没有对定期调用的内置支持,没有。

Just create your own scheduler loop that sleeps and executes any tasks scheduled:

只需创建您自己的调度程序循环,它会休眠并执行任何调度的任务:

import math, time

async def scheduler():
    while True:
        # sleep until the next whole second
        now = time.time()
        await asyncio.sleep(math.ceil(now) - now)

        # execute any scheduled tasks
        await for task in scheduled_tasks(time.time()):
            await task()

The scheduled_tasks()iterator should produce tasks that are ready to be run at the given time. Note that producing the schedule and kicking off all the tasks could in theory take longer than 1 second; the idea here is that the scheduler yields all tasks that should have started since the last check.

scheduled_tasks()迭代器应该产生对于准备在给定的时间运行的任务。请注意,理论上,生成时间表并启动所有任务可能需要超过 1 秒的时间;这里的想法是调度程序产生自上次检查以来应该开始的所有任务。

回答by Mikhail Gerasimov

When you feel that something should happen "in background" of your asyncio program, asyncio.Taskmight be good way to do it. You can read this postto see how to work with tasks.

当您觉得应该在 asyncio 程序的“后台”发生某些事情时,这asyncio.Task可能是一个很好的方法。您可以阅读这篇文章以了解如何处理任务。

Here's possible implementation of class that executes some function periodically:

这是定期执行某些功能的类的可能实现:

import asyncio
from contextlib import suppress


class Periodic:
    def __init__(self, func, time):
        self.func = func
        self.time = time
        self.is_started = False
        self._task = None

    async def start(self):
        if not self.is_started:
            self.is_started = True
            # Start task to call func periodically:
            self._task = asyncio.ensure_future(self._run())

    async def stop(self):
        if self.is_started:
            self.is_started = False
            # Stop task and await it stopped:
            self._task.cancel()
            with suppress(asyncio.CancelledError):
                await self._task

    async def _run(self):
        while True:
            await asyncio.sleep(self.time)
            self.func()

Let's test it:

让我们测试一下:

async def main():
    p = Periodic(lambda: print('test'), 1)
    try:
        print('Start')
        await p.start()
        await asyncio.sleep(3.1)

        print('Stop')
        await p.stop()
        await asyncio.sleep(3.1)

        print('Start')
        await p.start()
        await asyncio.sleep(3.1)
    finally:
        await p.stop()  # we should stop task finally


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Output:

输出:

Start
test
test
test

Stop

Start
test
test
test

[Finished in 9.5s]

As you see on startwe just start task that calls some functions and sleeps some time in endless loop. On stopwe just cancel that task. Note, that task should be stopped at the moment program finished.

正如您在上面看到的,start我们只是开始调用一些函数并在无限循环中休眠一段时间的任务。在stop我们刚刚取消该任务。请注意,该任务应在程序完成时停止。

One more important thing that your callback shouldn't take much time to be executed (or it'll freeze your event loop). If you're planning to call some long-running func, you possibly would need to run it in executor.

更重要的一件事是您的回调不应花费太多时间来执行(否则它会冻结您的事件循环)。如果您打算调用一些 long-running func,则可能需要在 executor 中运行它

回答by Fernando José Esteves de Souza

Alternative version with decorator for python 3.7

python 3.7 带有装饰器的替代版本

import asyncio
import time


def periodic(period):
    def scheduler(fcn):

        async def wrapper(*args, **kwargs):

            while True:
                asyncio.create_task(fcn(*args, **kwargs))
                await asyncio.sleep(period)

        return wrapper

    return scheduler


@periodic(2)
async def do_something(*args, **kwargs):
    await asyncio.sleep(5)  # Do some heavy calculation
    print(time.time())


if __name__ == '__main__':
    asyncio.run(do_something('Maluzinha do papai!', secret=42))

回答by Fred Ross

A variant that may be helpful: if you want your recurring call to happen every n seconds instead of n seconds between the end of the last execution and the beginning of the next, and you don't want calls to overlap in time, the following is simpler:

一个可能有用的变体:如果您希望重复调用每 n 秒发生一次,而不是在上次执行结束和下一次开始之间的 n 秒,并且您不希望调用在时间上重叠,请执行以下操作更简单:

async def repeat(interval, func, *args, **kwargs):
    """Run func every interval seconds.

    If func has not finished before *interval*, will run again
    immediately when the previous iteration finished.

    *args and **kwargs are passed as the arguments to func.
    """
    while True:
        await asyncio.gather(
            func(*args, **kwargs),
            asyncio.sleep(interval),
        )

And an example of using it to run a couple tasks in the background:

以及使用它在后台运行几个任务的示例:

async def f():
    await asyncio.sleep(1)
    print('Hello')


async def g():
    await asyncio.sleep(0.5)
    print('Goodbye')


async def main():
    t1 = asyncio.ensure_future(repeat(3, f))
    t2 = asyncio.ensure_future(repeat(2, g))
    await t1
    await t2

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

回答by Wojciech Migda

Based on @A. Jesse Jiryu Davis response (with @Torkel Bj?rnson-Langen and @ReWrite comments) this is an improvement which avoids drift.

基于@A。Jesse Jiryu Davis 回应(@Torkel Bj?rnson-Langen 和@ReWrite 评论)这是一项避免漂移的改进。

import time
import asyncio

@asyncio.coroutine
def periodic(period):
    def g_tick():
        t = time.time()
        count = 0
        while True:
            count += 1
            yield max(t + count * period - time.time(), 0)
    g = g_tick()

    while True:
        print('periodic', time.time())
        yield from asyncio.sleep(next(g))

loop = asyncio.get_event_loop()
task = loop.create_task(periodic(1))
loop.call_later(5, task.cancel)

try:
    loop.run_until_complete(task)
except asyncio.CancelledError:
    pass