一, APScheduler 简介:
Python 的一个定时任务框架, 满足用户定时执行或者周期性执行任务的需求, 提供了基于日期 date, 固定的时间间隔 interval, 以及类似于 Linux 上的定时任务 crontab 类型的定时任务. 并且该框架不仅可以添加, 删除定时任务, 还可以将任务存储到数据库中, 实现任务的持久化.
Python 的第三方库, 用来提供 Python 的后台程序. 包含四个组件, 分别是:
triggers: 任务触发器组件, 提供任务触发方式
job stores: 任务商店组件, 提供任务保存方式
executors: 任务调度组件, 提供任务调度方式
schedulers: 任务调度组件, 提供任务工作方式
二, APScheduler 安装
1) 利用 pip 安装 (推荐)
# pip install apscheduler
2 基于源码: https://pypi.python.org/pypi/APScheduler/
# python setup.py install
三, 基本概念
1, APScheduler 有四种组件及相关说明
1) triggers(触发器): 触发器包含调度逻辑, 每一个作业有它自己的触发器, 用于决定接下来哪一个作业会运行, 除了他们自己初始化配置外, 触发器完全是无状态的.
2 )job stores(作业存储): 用来存储被调度的作业, 默认的作业存储器是简单地把作业任务保存在内存中, 其他作业存储器可以将任务保存到各种数据库中, 支持 MongoDB,Redis,SQLAlchemy 存储方式. 当对作业任务进行持久化存储的时候, 作业的数据将被序列化, 重新读取作业时在反序列化.
3)executors(执行器): 执行器用来执行定时任务, 只是将需要执行的任务放在新的线程或者线程池中运行. 当作业任务完成时, 执行器就会通知调度器. 对于执行器, 默认情况下选择 ThreadPoolExecutor 就可以了, 但是如果涉及到一下特殊任务如比较消耗 CPU 的任务则可以选择 ProcessPoolExecutor, 当然根据实际需求可以同时使用两种执行器
4)schedulers(调度器): 调度器是将其他部分联系在一起, 一般在应用程序中只有一个调度器, 应用开发者不会直接操作触发器, 任务存储以及执行器. 相反调度器提供了处理的接口. 通过调度器完成任务的存储以及执行器的配置操作, 如可以添加, 移除, 修改任务作业.
APScheduler 提供了多种调度器, 可以根据具体需求来选择合适的调度器, 常用的调度器有:
BlockingScheduler: 适合于只在进程中运行单个任务的情况, 通常在调度器是你唯一要运行的东西时使用
BackgroundScheduler: 适合于要求任务在程序后台运行的情况, 当希望调度器在应用后台执行时使用.
AsyncIOScheduler: 适合于使用 asyncio 框架的情况
GeventScheduler: 适合于使用 gevent 框架的情况
TornadoScheduler: 适合于使用 Tornado 框架的应用
TwistedScheduler: 适合使用 Twisted 框架的应用
QtScheduler: 适合使用 QT 的情况
2, 配置调度器
APScheduler 提供了许多不同的方式来配置调度器, 你可以使用一个配置字典或者作为参数关键字的方式传入. 你也可以先创建调度器. 在配置和添加作业, 这样可以在不同的环境中得到更大的灵活性.
3, 简单的实例
- from apscheduler.schedulers.blocking import BlockingScheduler
- import time
- # 实例化一个调度器
- scheduler = BlockingScheduler()
- def job1():
- print "%s: 执行任务" % time.asctime()
- # 添加任务并设置触发方式为 3s 一次
- scheduler.add_job(job1, 'interval', seconds=3)
- # 开始运行调度器
- scheduler.start()
四, 各组件功能
1,trigger 组件
trigger 提供任务的触发方式, 共三种方式:
date: 只在某个时间点执行一次 run_date(datetime|str)
- scheduler.add_job(my_job, 'date', run_date=date(2017, 9, 8), args=[])
- scheduler.add_job(my_job, 'date', run_date=datetime(2017, 9, 8, 21, 30, 5), args=[])
- scheduler.add_job(my_job, 'date', run_date='2019-6-12 21:30:05', args=[])
- # The 'date' trigger and datetime.now() as run_date are implicit
- sched.add_job(my_job, args=[[])
interval: 每隔一段时间执行一次 weeks=0 | days=0 | hours=0 | minutes=0 | seconds=0,
- start_date=None, end_date=None, timezone=None
- scheduler.add_job(my_job, 'interval', hours=2)
- scheduler.add_job(my_job, 'interval', hours=2, start_date='2017-9-8 21:30:00',
- end_date='2019-06-12 21:30:00)
- @scheduler.scheduled_job('interval', id='my_job_id', hours=2)
- def my_job():
- print("Hello World")
cron: 使用 Linux 下 crontab 的方式 (year=None, month=None, day=None, week=None, day_of_week=None, hour=None, minute=None, second=None, start_date=None, end_date=None, timezone=None)
- sched.add_job(my_job, 'cron', hour=3, minute=30)
- sched.add_job(my_job, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2017-10-
- 30')
- @sched.scheduled_job('cron', id='my_job_id', day='last sun')
- def some_decorated_task():
- print("I am printed at 00:00:00 on the last Sunday of every month!")
2,scheduler 组件
scheduler 组件提供执行的方式, 在不同的运行环境中选择合适的方式
BlockingScheduler: 进程中只运行调度器时的方式
- from apscheduler.schedulers.blocking import BlockingScheduler
- import time
- scheduler = BlockingScheduler()
- def job1():
- print "%s: 执行任务" % time.asctime()
- scheduler.add_job(job1, 'interval', seconds=3)
- scheduler.start()
BackgroundScheduler: 不想使用任何框架时的方式
- from apscheduler.schedulers.background import BackgroundScheduler
- import time
- scheduler = BackgroundScheduler()
- def job1():
- print "%s: 执行任务" % time.asctime()
- scheduler.add_job(job1, 'interval', seconds=3)
- scheduler.start()
- while True:
- pass
AsyncIOScheduler: asyncio module 的方式 ( Python3)
- from apscheduler.schedulers.asyncio import AsyncIOScheduler
- try:
- import asyncio
- except ImportError:
- import trollius as asyncio
- ...
- ...
- # while True pass
- try:
- asyncio.get_event_loop().run_forever()
- except (KeyboardInterrupt, SystemExit):
- pass
GeventScheduler: gevent 方式
- from apscheduler.schedulers.gevent import GeventScheduler
- ...
- ...
- g = scheduler.start()
- # while True:pass
- try:
- g.join()
- except (KeyboardInterrupt, SystemExit):
- pass
TornadoScheduler: Tornado 方式
- from tornado.ioloop import IOLoop
- from apscheduler.schedulers.tornado import TornadoScheduler
- ...
- ...
- # while True:pass
- try:
- IOLoop.instance().start()
- except (KeyboardInterrupt, SystemExit):
- pass
TwistedScheduler: Twisted 方式
- from twisted.internet import reactor
- from apscheduler.schedulers.twisted import TwistedScheduler
- ...
- ...
- # while True:pass
- try:
- reactor.run()
- except (KeyboardInterrupt, SystemExit):
- pass
QtScheduler: Qt 方式
3,executors 组件
executors 组件提供任务的调度方式
- base
- debug
- gevent
- pool(max_workers=10)
- twisted
4,jobstore 组件
jobstore 提供任务的各种持久化方式
- base
- memory
- MongoDB
- scheduler.add_jobstore('mongodb', collection='example_jobs')
- Redis
- scheduler.add_jobstore('redis', jobs_key='example.jobs', run_times_key='example.run_times')
- RethinkDB
- scheduler.add_jobstore('rethinkdb', database='apscheduler_example')
- sqlalchemy
- scheduler.add_jobstore('sqlalchemy', url=url)
- zookeeper
- scheduler.add_jobstore('zookeeper', path='/example_jobs')
五, 任务操作
1, 添加任务 add_job(如上)
如果使用了任务的存储, 开启时最好添加 replace_existing=True, 否则每次开启时都会创建任务的副本, 开启后任务不会马上启动, 可修改 triger 参数
2, 删除任务 remove_job
- # 根据任务实例删除
- job = scheduler.add_job(myfunc, 'interval', minutes=2)
- job.remove()
- # 根据任务 id 删除
- scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
- scheduler.remove_job('my_job_id')
3, 任务的暂停 pause_job 和继续 resume_job
- job = scheduler.add_job(myfunc, 'interval', minutes=2)
- # 根据任务实例
- job.pause()
- job.resume()
- # 根据任务 id 暂停
- scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
- scheduler.pause_job('my_job_id')
4, 任务的修饰 modify 和重设 reschedule_job
修饰: job.modify(max_instances=6, name='Alternate name')
重设: scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')
5, 调度器操作
开启: scheduler.start()
关闭: scheduler.shotdown(wait=True | False)
暂停: scheduler.pause()
继续: scheduler.resume()
监听: http://apscheduler.readthedocs.io/en/v3.3.0/modules/events.html#module-apscheduler.events
- def my_listener(event):
- if event.exception:
- print('The job crashed :(')
- else:
- print('The job worked :)')
- scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
官方实例
- from pytz import utc
- from apscheduler.schedulers.background import BackgroundScheduler
- from apscheduler.jobstores.MongoDB import MongoDBJobStore
- from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
- from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
- jobstores = {
- 'mongo': MongoDBJobStore(),
- 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
- } executors = {
- 'default': ThreadPoolExecutor(20),
- 'processpool': ProcessPoolExecutor(5)
- }
- job_defaults = {
- 'coalesce': False,
- 'max_instances': 3
- }
- scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors,
- job_defaults=job_default
来源: http://www.bubuko.com/infodetail-3090634.html