Python 每 n 秒运行一次特定代码

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/3393612/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-18 10:48:57  来源:igfitidea点击:

Run certain code every n seconds

pythonmultithreading

提问by John Howard

Is there a way to, for example, print Hello World!every n seconds? For example, the program would go through whatever code I had, then once it had been 5 seconds (with time.sleep()) it would execute that code. I would be using this to update a file though, not print Hello World.

例如,有没有办法Hello World!每 n 秒打印一次?例如,该程序将遍历我拥有的任何代码,然后一旦过了 5 秒(使用time.sleep()),它就会执行该代码。我将使用它来更新文件,而不是打印 Hello World。

For example:

例如:

startrepeat("print('Hello World')", .01) # Repeats print('Hello World') ever .01 seconds

for i in range(5):
    print(i)

>> Hello World!
>> 0
>> 1
>> 2
>> Hello World!
>> 3
>> Hello World!
>> 4

采纳答案by Alex Martelli

import threading

def printit():
  threading.Timer(5.0, printit).start()
  print "Hello, World!"

printit()

# continue with the rest of your code

https://docs.python.org/3/library/threading.html#timer-objects

https://docs.python.org/3/library/threading.html#timer-objects

回答by avacariu

def update():
    import time
    while True:
        print 'Hello World!'
        time.sleep(5)

That'll run as a function. The while True:makes it run forever. You can always take it out of the function if you need.

这将作为一个函数运行。这while True:使它永远运行。如果需要,您可以随时将其从函数中取出。

回答by Kit

You can start a separate thread whose sole duty is to count for 5 seconds, update the file, repeat. You wouldn't want this separate thread to interfere with your main thread.

您可以启动一个单独的线程,其唯一职责是计数 5 秒,更新文件,重复。您不希望这个单独的线程干扰您的主线程。

回答by MestreLion

My humble take on the subject, a generalization of Alex Martelli's answer, with start() and stop() control:

我对这个主题的谦虚看法,是 Alex Martelli 答案的概括,带有 start() 和 stop() 控制:

from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.interval   = interval
        self.function   = function
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False

Usage:

用法:

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

Features:

特征:

  • Standard library only, no external dependencies
  • start()and stop()are safe to call multiple times even if the timer has already started/stopped
  • function to be called can have positional and named arguments
  • You can change intervalanytime, it will be effective after next run. Same for args, kwargsand even function!
  • 仅标准库,无外部依赖
  • start()stop()即使计时器已经开始/停止,也可以安全地多次调用
  • 要调用的函数可以有位置和命名参数
  • 可以interval随时更改,下次运行后生效。同为argskwargs甚至function

回答by Yan King Yin

Save yourself a schizophrenic episode and use the Advanced Python scheduler: http://pythonhosted.org/APScheduler

保存自己的精神分裂症发作并使用高级 Python 调度程序:http: //pythonhosted.org/APScheduler

The code is so simple:

代码就这么简单:

from apscheduler.scheduler import Scheduler

sched = Scheduler()
sched.start()

def some_job():
    print "Every 10 seconds"

sched.add_interval_job(some_job, seconds = 10)

....
sched.shutdown()

回答by jfs

Here's a version that doesn't create a new thread every nseconds:

这是一个不会每秒钟创建一个新线程的版本n

from threading import Event, Thread

def call_repeatedly(interval, func, *args):
    stopped = Event()
    def loop():
        while not stopped.wait(interval): # the first call is in `interval` secs
            func(*args)
    Thread(target=loop).start()    
    return stopped.set

The event is used to stop the repetitions:

该事件用于停止重复:

cancel_future_calls = call_repeatedly(5, print, "Hello, World")
# do something else here...
cancel_future_calls() # stop future calls

See Improve current implementation of a setInterval python

请参阅改进 setInterval python 的当前实现

回答by Six

Here is a simple example compatible with APScheduler 3.00+:

这是一个与APScheduler 3.00+兼容的简单示例:

# note that there are many other schedulers available
from apscheduler.schedulers.background import BackgroundScheduler

sched = BackgroundScheduler()

def some_job():
    print('Every 10 seconds')

# seconds can be replaced with minutes, hours, or days
sched.add_job(some_job, 'interval', seconds=10)
sched.start()

...

sched.shutdown()

Alternatively, you can use the following. Unlike many of the alternatives, this timer will execute the desired code every nseconds exactly (irrespective of the time it takes for the code to execute). So this is a great option if you cannot afford any drift.

或者,您可以使用以下内容。与许多替代方案不同,此计时器将精确地每n秒执行一次所需的代码(与执行代码所需的时间无关)。因此,如果您负担不起任何漂移,这是一个不错的选择。

import time
from threading import Event, Thread

class RepeatedTimer:

    """Repeat `function` every `interval` seconds."""

    def __init__(self, interval, function, *args, **kwargs):
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.start = time.time()
        self.event = Event()
        self.thread = Thread(target=self._target)
        self.thread.start()

    def _target(self):
        while not self.event.wait(self._time):
            self.function(*self.args, **self.kwargs)

    @property
    def _time(self):
        return self.interval - ((time.time() - self.start) % self.interval)

    def stop(self):
        self.event.set()
        self.thread.join()


# start timer
timer = RepeatedTimer(10, print, 'Hello world')

# stop timer
timer.stop()