调度 Python Script 准确地每小时运行一次
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/22715086/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Scheduling Python Script to run every hour accurately
提问by sunshinekitty
Before I ask, Cron Jobs and Task Schedulerwill be my last options, this script will be used across Windows and Linux and I'd prefer to have a coded out method of doing this than leaving this to the end user to complete.
在我问之前,Cron Jobs 和 Task Scheduler将是我最后的选择,这个脚本将在 Windows 和 Linux 上使用,我更喜欢有一个编码的方法来做到这一点,而不是让最终用户来完成。
Is there a library for Python that I can use to schedule tasks? I will need to run a function once every hour, however, over time if I run a script once every hour and use .sleep, "once every hour" will run at a different part of the hour from the previous day due to the delay inherent to executing/running the script and/or function.
是否有可用于安排任务的 Python 库?我需要每小时运行一次函数,但是,随着时间的推移,如果我每小时运行一次脚本并使用 .sleep,由于延迟,“每小时一次”将在与前一天不同的时间运行执行/运行脚本和/或函数所固有的。
What is the bestway to schedule a function to run at a specific time of day (more than once) withoutusing a Cron Job or scheduling it with Task Scheduler?
在不使用 Cron 作业或使用 Task Scheduler 进行调度的情况下,安排函数在一天中的特定时间(多次)运行的最佳方法是什么?
Or if this is not possible, I would like your input as well.
或者,如果这是不可能的,我也希望您提供意见。
AP Scheduler fit my needs exactly.
AP Scheduler 完全符合我的需求。
Version < 3.0
版本 < 3.0
import datetime
import time
from apscheduler.scheduler import Scheduler
# Start the scheduler
sched = Scheduler()
sched.daemonic = False
sched.start()
def job_function():
print("Hello World")
print(datetime.datetime.now())
time.sleep(20)
# Schedules job_function to be run once each minute
sched.add_cron_job(job_function, minute='0-59')
out:
出去:
>Hello World
>2014-03-28 09:44:00.016.492
>Hello World
>2014-03-28 09:45:00.0.14110
Version > 3.0
版本 > 3.0
(From Animesh Pandey's answer below)
(来自Animesh Pandey的回答如下)
from apscheduler.schedulers.blocking import BlockingScheduler
sched = BlockingScheduler()
@sched.scheduled_job('interval', seconds=10)
def timed_job():
print('This job is run every 10 seconds.')
@sched.scheduled_job('cron', day_of_week='mon-fri', hour=10)
def scheduled_job():
print('This job is run every weekday at 10am.')
sched.configure(options_from_ini_file)
sched.start()
采纳答案by Unknown
Maybe this can help: Advanced Python Scheduler
也许这会有所帮助:Advanced Python Scheduler
Here's a small piece of code from their documentation:
这是他们文档中的一小段代码:
from apscheduler.schedulers.blocking import BlockingScheduler
def some_job():
print "Decorated job"
scheduler = BlockingScheduler()
scheduler.add_job(some_job, 'interval', hours=1)
scheduler.start()
回答by aepsil0n
The Python standard library does provide schedand threadingfor this task. But this means your scheduler script will have be running all the time instead of leaving its execution to the OS, which may or may not be what you want.
Python 标准库确实为此任务提供了调度和线程。但这意味着您的调度程序脚本将一直运行,而不是将其执行留给操作系统,这可能是您想要的,也可能不是。
回答by ubadub
One option is to write a C/C++ wrapper that executes the python script on a regular basis. Your end-user would run the C/C++ executable, which would remain running in the background, and periodically execute the python script. This may not be the best solution, and may not work if you don't know C/C++ or want to keep this 100% python. But it does seem like the most user-friendly approach, since people are used to clicking on executables. All of this assumes that python is installed on your end user's computer.
一种选择是编写一个 C/C++ 包装器来定期执行 python 脚本。您的最终用户将运行 C/C++ 可执行文件,该文件将继续在后台运行,并定期执行 python 脚本。这可能不是最好的解决方案,如果您不了解 C/C++ 或想要保留这个 100% 的 python,则可能无法工作。但这似乎是对用户最友好的方法,因为人们习惯于单击可执行文件。所有这些都假设您的最终用户的计算机上安装了 python。
Another option is to use cron job/Task Scheduler but to put it in the installer as a script so your end user doesn't have to do it.
另一种选择是使用 cron 作业/任务计划程序,但将其作为脚本放在安装程序中,这样您的最终用户就不必这样做了。
回答by Shane Davies
To run something every 10 minutes past the hour.
一小时后每 10 分钟运行一次。
from datetime import datetime, timedelta
while 1:
print 'Run something..'
dt = datetime.now() + timedelta(hours=1)
dt = dt.replace(minute=10)
while datetime.now() < dt:
time.sleep(1)
回答by Animesh Pandey
For apscheduler
< 3.0, see Unknown's answer.
对于apscheduler
< 3.0,请参阅Unknown's answer。
For apscheduler
> 3.0
对于apscheduler
> 3.0
from apscheduler.schedulers.blocking import BlockingScheduler
sched = BlockingScheduler()
@sched.scheduled_job('interval', seconds=10)
def timed_job():
print('This job is run every 10 seconds.')
@sched.scheduled_job('cron', day_of_week='mon-fri', hour=10)
def scheduled_job():
print('This job is run every weekday at 10am.')
sched.configure(options_from_ini_file)
sched.start()
Update:
更新:
apscheduler
documentation.
apscheduler
文档。
This for apscheduler-3.3.1
on Python 3.6.2
.
这对apscheduler-3.3.1
上Python 3.6.2
。
"""
Following configurations are set for the scheduler:
- a MongoDBJobStore named “mongo”
- an SQLAlchemyJobStore named “default” (using SQLite)
- a ThreadPoolExecutor named “default”, with a worker count of 20
- a ProcessPoolExecutor named “processpool”, with a worker count of 5
- UTC as the scheduler's timezone
- coalescing turned off for new jobs by default
- a default maximum instance limit of 3 for new jobs
"""
from pytz import utc
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor
"""
Method 1:
"""
jobstores = {
'mongo': {'type': 'mongodb'},
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
'default': {'type': 'threadpool', 'max_workers': 20},
'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
"""
Method 2 (ini format):
"""
gconfig = {
'apscheduler.jobstores.mongo': {
'type': 'mongodb'
},
'apscheduler.jobstores.default': {
'type': 'sqlalchemy',
'url': 'sqlite:///jobs.sqlite'
},
'apscheduler.executors.default': {
'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
'max_workers': '20'
},
'apscheduler.executors.processpool': {
'type': 'processpool',
'max_workers': '5'
},
'apscheduler.job_defaults.coalesce': 'false',
'apscheduler.job_defaults.max_instances': '3',
'apscheduler.timezone': 'UTC',
}
sched_method1 = BlockingScheduler() # uses overrides from Method1
sched_method2 = BlockingScheduler() # uses same overrides from Method2 but in an ini format
@sched_method1.scheduled_job('interval', seconds=10)
def timed_job():
print('This job is run every 10 seconds.')
@sched_method2.scheduled_job('cron', day_of_week='mon-fri', hour=10)
def scheduled_job():
print('This job is run every weekday at 10am.')
sched_method1.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
sched_method1.start()
sched_method2.configure(gconfig=gconfig)
sched_method2.start()
回答by Arkham Angel
On the version posted by sunshinekitty called "Version < 3.0" , you may need to specify apscheduler 2.1.2 . I accidentally had version 3 on my 2.7 install, so I went:
在 sunkitty 发布的名为“Version < 3.0”的版本上,您可能需要指定 apscheduler 2.1.2 。我不小心在我的 2.7 安装中安装了版本 3,所以我去了:
pip uninstall apscheduler
pip install apscheduler==2.1.2
It worked correctly after that. Hope that helps.
之后它工作正常。希望有帮助。
回答by Abhishek Iyengar
#For scheduling task execution
import schedule
import time
def job():
print("I'm working...")
schedule.every(1).minutes.do(job)
#schedule.every().hour.do(job)
#schedule.every().day.at("10:30").do(job)
#schedule.every(5).to(10).minutes.do(job)
#schedule.every().monday.do(job)
#schedule.every().wednesday.at("13:15").do(job)
#schedule.every().minute.at(":17").do(job)
while True:
schedule.run_pending()
time.sleep(1)
回答by Hexiro
the simplest option I can suggest is using the schedulelibrary.
我可以建议的最简单的选择是使用时间表库。
In your question, you said "I will need to run a function once every hour" the code to do this is very simple:
在您的问题中,您说“我需要每小时运行一次函数”执行此操作的代码非常简单:
import schedule
def thing_you_wanna_do():
...
...
return
schedule.every().hour.do(thing_you_wanna_do)
while True:
schedule.run_pending()
you also asked how to do something at a certain time of the day some examples of how to do this are:
你还询问了如何在一天中的某个时间做某事,一些如何做到这一点的例子是:
import schedule
def thing_you_wanna_do():
...
...
return
schedule.every().day.at("10:30").do(thing_you_wanna_do)
schedule.every().monday.do(thing_you_wanna_do)
schedule.every().wednesday.at("13:15").do(thing_you_wanna_do)
# If you would like some randomness / variation you could also do something like this
schedule.every(1).to(2).hours.do(thing_you_wanna_do)
while True:
schedule.run_pending()
90% of the code used is the example code of the schedulelibrary. Happy scheduling!
90% 使用的代码是调度库的示例代码 。快乐调度!
回答by user8928128
Run the script every 15 minutes of the hour. For example, you want to receive 15 minute stock price quotes, which are updated every 15 minutes.
每 15 分钟运行一次脚本。例如,您希望接收每 15 分钟更新一次的 15 分钟股票报价。
while True:
print("Update data:", datetime.now())
sleep = 15 - datetime.now().minute % 15
if sleep == 15:
run_strategy()
time.sleep(sleep * 60)
else:
time.sleep(sleep * 60)
回答by Nicolás Seijas
Probably you got the solution already @lukik, but if you wanna remove a scheduling, you should use:
可能您已经在@lukik 找到了解决方案,但是如果您想删除日程安排,您应该使用:
job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()
or
或者
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')
if you need to use a explicit job ID
如果您需要使用明确的作业 ID
For more information, you should check: https://apscheduler.readthedocs.io/en/stable/userguide.html#removing-jobs
有关更多信息,您应该查看:https: //apscheduler.readthedocs.io/en/stable/userguide.html#removing-jobs