Python 安排一个函数在 Flask 上每小时运行一次
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/21214270/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Scheduling a function to run every hour on Flask
提问by RomaValcer
I have a Flask web hosting with no access to croncommand.
How can I execute some Python function every hour?
我有一个无法访问cron命令的 Flask 虚拟主机。如何每小时执行一些 Python 函数?
采纳答案by tuomastik
You can use BackgroundScheduler()from APSchedulerpackage (v3.5.3):
您可以BackgroundScheduler()从APScheduler包 (v3.5.3) 中使用:
import time
import atexit
from apscheduler.schedulers.background import BackgroundScheduler
def print_date_time():
print(time.strftime("%A, %d. %B %Y %I:%M:%S %p"))
scheduler = BackgroundScheduler()
scheduler.add_job(func=print_date_time, trigger="interval", seconds=3)
scheduler.start()
# Shut down the scheduler when exiting the app
atexit.register(lambda: scheduler.shutdown())
Note that two of these schedulers will be launched when Flask is in debug mode. For more information, check out thisquestion.
请注意,当 Flask 处于调试模式时,将启动其中两个调度程序。有关更多信息,请查看此问题。
回答by Sean Vieira
You could make use of APSchedulerin your Flask application and run your jobs via its interface:
您可以APScheduler在 Flask 应用程序中使用并通过其界面运行您的作业:
import atexit
# v2.x version - see https://stackoverflow.com/a/38501429/135978
# for the 3.x version
from apscheduler.scheduler import Scheduler
from flask import Flask
app = Flask(__name__)
cron = Scheduler(daemon=True)
# Explicitly kick off the background thread
cron.start()
@cron.interval_schedule(hours=1)
def job_function():
# Do your work here
# Shutdown your cron thread if the web process is stopped
atexit.register(lambda: cron.shutdown(wait=False))
if __name__ == '__main__':
app.run()
回答by Alexander Davydov
You might want to use some queue mechanism with scheduler like RQ scheduleror something more heavy like Celery (most probably an overkill).
您可能希望使用一些队列机制与调度程序(如RQ 调度程序)或更重的(如 Celery)(很可能是矫枉过正)。
回答by Mads Jensen
Another alternative might be to use Flask-APScheduler which plays nicely with Flask, e.g.:
另一种选择可能是使用 Flask-APScheduler,它可以很好地与 Flask 配合使用,例如:
- Loads scheduler configuration from Flask configuration,
- Loads job definitions from Flask configuration
- 从 Flask 配置加载调度程序配置,
- 从 Flask 配置加载作业定义
More information here:
更多信息在这里:
回答by KD Chang
You could try using APScheduler's BackgroundSchedulerto integrate interval job into your Flask app. Below is the example that uses blueprint and app factory (init.py) :
您可以尝试使用APScheduler 的 BackgroundScheduler将间隔作业集成到您的 Flask 应用程序中。下面是使用蓝图和应用程序工厂 ( init.py)的示例:
from datetime import datetime
# import BackgroundScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask
from webapp.models.main import db
from webapp.controllers.main import main_blueprint
# define the job
def hello_job():
print('Hello Job! The time is: %s' % datetime.now())
def create_app(object_name):
app = Flask(__name__)
app.config.from_object(object_name)
db.init_app(app)
app.register_blueprint(main_blueprint)
# init BackgroundScheduler job
scheduler = BackgroundScheduler()
# in your case you could change seconds to hours
scheduler.add_job(hello_job, trigger='interval', seconds=3)
scheduler.start()
try:
# To keep the main thread alive
return app
except:
# shutdown if app occurs except
scheduler.shutdown()
Hope it helps :)
希望能帮助到你 :)
Ref :
参考:
回答by ivanleoncz
I'm a little bit new with the concept of application schedulers, but what I found here for APScheduler v3.3.1, it's something a little bit different. I believe that for the newest versions, the package structure, class names, etc., have changed, so I'm putting here a fresh solution which I made recently, integrated with a basic Flask application:
我对应用程序调度程序的概念有点陌生,但是我在这里找到的APScheduler v3.3.1有点不同。我相信对于最新版本,包结构、类名等都发生了变化,所以我在这里放了一个我最近制作的新解决方案,与一个基本的 Flask 应用程序集成:
#!/usr/bin/python3
""" Demonstrating Flask, using APScheduler. """
from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask
def sensor():
""" Function for test purposes. """
print("Scheduler is alive!")
sched = BackgroundScheduler(daemon=True)
sched.add_job(sensor,'interval',minutes=60)
sched.start()
app = Flask(__name__)
@app.route("/home")
def home():
""" Function for test purposes. """
return "Welcome Home :) !"
if __name__ == "__main__":
app.run()
I'm also leaving this Gist here, if anyone have interest on updates for this example.
如果有人对此示例的更新感兴趣,我也将把这个 Gist留在这里。
Here are some references, for future readings:
以下是一些参考资料,以供将来阅读:
- APScheduler Doc: https://apscheduler.readthedocs.io/en/latest/
- daemon=True: https://docs.python.org/3.4/library/threading.html#thread-objects
- APScheduler 文档:https://apscheduler.readthedocs.io/en/latest/
- 守护进程=真:https: //docs.python.org/3.4/library/threading.html#thread-objects
回答by MortenB
A complete example using schedule and multiprocessing, with on and off control and parameter to run_job()
the return codes are simplified and interval is set to 10sec, change to every(2).hour.do()for 2hours. Schedule is quite impressive it does not drift and I've never seen it more than 100ms off when scheduling. Using multiprocessing instead of threading because it has a termination method.
使用调度和多处理的完整示例,具有开和关控制和 run_job() 参数,返回代码被简化,间隔设置为 10 秒,更改every(2).hour.do()为 2 小时。时间表令人印象深刻,它不会漂移,而且我从未见过它在安排时超过 100 毫秒。使用多处理而不是线程,因为它有一个终止方法。
#!/usr/bin/env python3
import schedule
import time
import datetime
import uuid
from flask import Flask, request
from multiprocessing import Process
app = Flask(__name__)
t = None
job_timer = None
def run_job(id):
""" sample job with parameter """
global job_timer
print("timer job id={}".format(id))
print("timer: {:.4f}sec".format(time.time() - job_timer))
job_timer = time.time()
def run_schedule():
""" infinite loop for schedule """
global job_timer
job_timer = time.time()
while 1:
schedule.run_pending()
time.sleep(1)
@app.route('/timer/<string:status>')
def mytimer(status, nsec=10):
global t, job_timer
if status=='on' and not t:
schedule.every(nsec).seconds.do(run_job, str(uuid.uuid4()))
t = Process(target=run_schedule)
t.start()
return "timer on with interval:{}sec\n".format(nsec)
elif status=='off' and t:
if t:
t.terminate()
t = None
schedule.clear()
return "timer off\n"
return "timer status not changed\n"
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
You test this by just issuing:
您只需发出以下命令即可测试:
$ curl http://127.0.0.1:5000/timer/on
timer on with interval:10sec
$ curl http://127.0.0.1:5000/timer/on
timer status not changed
$ curl http://127.0.0.1:5000/timer/off
timer off
$ curl http://127.0.0.1:5000/timer/off
timer status not changed
Every 10sec the timer is on it will issue a timer message to console:
每隔 10 秒,计时器就会向控制台发出一条计时器消息:
127.0.0.1 - - [18/Sep/2018 21:20:14] "GET /timer/on HTTP/1.1" 200 -
timer job id=b64ed165-911f-4b47-beed-0d023ead0a33
timer: 10.0117sec
timer job id=b64ed165-911f-4b47-beed-0d023ead0a33
timer: 10.0102sec
回答by Bemmu
For a simple solution, you could add a route such as
对于一个简单的解决方案,您可以添加一条路线,例如
@app.route("/cron/do_the_thing", methods=['POST'])
def do_the_thing():
logging.info("Did the thing")
return "OK", 200
Then add a unix cron jobthat POSTs to this endpoint periodically. For example to run it once a minute, in terminal type crontab -eand add this line:
然后添加一个定期发布到此端点的 unix cron 作业。例如,每分钟运行一次,在终端输入crontab -e并添加以下行:
* * * * * /opt/local/bin/curl -X POST https://YOUR_APP/cron/do_the_thing
(Note that the path to curl has to be complete, as when the job runs it won't have your PATH. You can find out the full path to curl on your system by which curl)
(请注意,curl 的路径必须完整,因为当作业运行时,它不会有您的 PATH。您可以通过 找到系统上 curl 的完整路径which curl)
I like this in that it's easy to test the job manually, it has no extra dependencies and as there isn't anything special going on it is easy to understand.
我喜欢这一点,因为它很容易手动测试作业,它没有额外的依赖项,并且因为没有任何特别的事情发生,所以很容易理解。
Security
安全
If you'd like to password protect your cron job, you can pip install Flask-BasicAuth, and then add the credentials to your app configuration:
如果您想用密码保护您的 cron 作业,您可以pip install Flask-BasicAuth,然后将凭据添加到您的应用程序配置中:
app = Flask(__name__)
app.config['BASIC_AUTH_REALM'] = 'realm'
app.config['BASIC_AUTH_USERNAME'] = 'falken'
app.config['BASIC_AUTH_PASSWORD'] = 'joshua'
To password protect the job endpoint:
要密码保护作业端点:
from flask_basicauth import BasicAuth
basic_auth = BasicAuth(app)
@app.route("/cron/do_the_thing", methods=['POST'])
@basic_auth.required
def do_the_thing():
logging.info("Did the thing a bit more securely")
return "OK", 200
Then to call it from your cron job:
然后从你的 cron 工作中调用它:
* * * * * /opt/local/bin/curl -X POST https://falken:joshua@YOUR_APP/cron/do_the_thing

