关于我
一个有思想的程序猿, 终身学习实践者, 目前在一个创业团队任 team lead, 技术栈涉及 Android,Python,Java 和 Go, 这个也是我们团队的主要技术栈.
GitHub:https://github.com/hylinux1024
微信公众号: 终身开发者(angrycode)
上一篇《一个简单的 Python 调度器》介绍了一个简单的 Python 调度器的使用, 后来我翻阅了一下它的源码 https://github.com/dbader/schedule , 惊奇的发现核心库才一个文件, 代码量短短 700 行不到. 这是绝佳的学习材料.
让我喜出望外的是这个库的作者竟然就是我最近阅读的一本书《Python Tricks》的作者! 现在就让我们看看大神的实现思路.
0x00 准备
项目地址
https://github.com/dbader/schedule
将代码 checkout 到本地
环境
PyCharm+venv+Python3
0x01 用法
这个在上一篇也介绍过了, 非常简单
- import schedule
- # 定义需要执行的方法
- def job():
- print("a simple scheduler in python.")
- # 设置调度的参数, 这里是每 2 秒执行一次
- schedule.every(2).seconds.do(job)
- if __name__ == '__main__':
- while True:
- schedule.run_pending()
- # 执行结果
- a simple scheduler in python.
- a simple scheduler in python.
- a simple scheduler in python.
- ...
这个库的文档也很详细, 可以浏览 https://schedule.readthedocs.io/ 了解库的大概用法
0x02 项目结构
- (venv) schedule Git:(master) tree -L 2
- .
- ...
├── requirements-dev.txt
├── schedule
│ └── __init__.py
├── setup.py
├── test_schedule.py
├── tox.INI
└── venv
├── bin
├── include
├── lib
├── pip-selfcheck.JSON
└── pyvenv.cfg
8 directories, 18 files
schedule 目录下就一个__init__.py 文件, 这是我们需要重点学习的地方.
setup.py 文件是发布项目的配置文件
test_schedule.py 是单元测试文件, 一开始除了看文档外, 也可以从单元测试中入手, 了解这个库的使用
requirements-dev.txt
开发环境的依赖库文件, 如果核心的库是不需要第三方的依赖的, 但是单元测试需要
venv 是我 checkout 后创建的, 原本的项目是没有的
0x03 schedule
我们知道__init__.py 是定义 Python 包必需的文件. 在这个文件中定义方法, 类都可以在使用 import 命令时导入到工程项目中, 然后使用.
schedule 源码
以下是 schedule 会用到的模块, 都是 Python 内部的模块.
- import collections
- import datetime
- import functools
- import logging
- import random
- import re
- import time
- logger = logging.getLogger('schedule')
然后定义了一个日志打印工具实例
接着是定义了该模块的 3 个异常类的结构体系, 是由 Exception 派生出来的, 分别是 ScheduleError,ScheduleValueError 和 IntervalError
- class ScheduleError(Exception):
- """Base schedule exception"""
- pass
- class ScheduleValueError(ScheduleError):
- """Base schedule value error"""
- pass
- class IntervalError(ScheduleValueError):
- """An improper interval was used"""
- pass
还定义了一个 CancelJob 的类, 用于取消调度器的继续执行
- class CancelJob(object):
- """
- Can be returned from a job to unschedule itself.
- """
- pass
例如在自定义的需要被调度方法中返回这个 CancelJob 类就可以实现一次性的任务
- # 定义需要执行的方法
- def job():
- print("a simple scheduler in python.")
- # 返回 CancelJob 可以停止调度器的后续执行
- return schedule.CancelJob
接着就是这个库的两个核心类 Scheduler 和 Job.
- class Scheduler(object):
- """
- Objects instantiated by the :class:`Scheduler <Scheduler>` are
- factories to create jobs, keep record of scheduled jobs and
- handle their execution.
- """
- class Job(object):
- """
- A periodic job as used by :class:`Scheduler`.
- :param interval: A quantity of a certain time unit
- :param scheduler: The :class:`Scheduler <Scheduler>` instance that
- this job will register itself with once it has
- been fully configured in :meth:`Job.do()`.
- Every job runs at a given fixed time interval that is defined by:
- * a :meth:`time unit <Job.second>`
- * a quantity of `time units` defined by `interval`
- A job is usually created and returned by :meth:`Scheduler.every`
- method, which also defines its `interval`.
- """
Scheduler 是调度器的实现类, 它负责调度任务 (job) 的创建和执行.
Job 则是对需要执行任务的抽象.
这两个类是这个库的核心, 后面我们还会看到详细的分析.
接下来就是默认调度器 default_scheduler 和任务列表 jobs 的创建.
- # The following methods are shortcuts for not having to
- # create a Scheduler instance:
- #: Default :class:`Scheduler <Scheduler>` object
- default_scheduler = Scheduler()
- #: Default :class:`Jobs <Job>` list
- jobs = default_scheduler.jobs # todo: should this be a copy, e.g. jobs()?
在执行 import schedule 后, 就默认创建了 default_scheduler. 而 Scheduler 的构造方法为
- def __init__(self):
- self.jobs = []
在执行初始化时, 调度器就创建了一个空的任务列表.
在文件的最后定义了一些链式调用的方法, 使用起来也是非常人性化的, 值得学习.
这里的方法都定义在模块下, 而且都是封装了 default_scheduler 实例的调用.
- def every(interval=1):
- """Calls :meth:`every <Scheduler.every>` on the
- :data:`default scheduler instance <default_scheduler>`.
- """
- return default_scheduler.every(interval)
- def run_pending():
- """Calls :meth:`run_pending <Scheduler.run_pending>` on the
- :data:`default scheduler instance <default_scheduler>`.
- """
- default_scheduler.run_pending()
- def run_all(delay_seconds=0):
- """Calls :meth:`run_all <Scheduler.run_all>` on the
- :data:`default scheduler instance <default_scheduler>`.
- """
- default_scheduler.run_all(delay_seconds=delay_seconds)
- def clear(tag=None):
- """Calls :meth:`clear <Scheduler.clear>` on the
- :data:`default scheduler instance <default_scheduler>`.
- """
- default_scheduler.clear(tag)
- def cancel_job(job):
- """Calls :meth:`cancel_job <Scheduler.cancel_job>` on the
- :data:`default scheduler instance <default_scheduler>`.
- """
- default_scheduler.cancel_job(job)
- def next_run():
- """Calls :meth:`next_run <Scheduler.next_run>` on the
- :data:`default scheduler instance <default_scheduler>`.
- """
- return default_scheduler.next_run
- def idle_seconds():
- """Calls :meth:`idle_seconds <Scheduler.idle_seconds>` on the
- :data:`default scheduler instance <default_scheduler>`.
- """
- return default_scheduler.idle_seconds
我们看下入口方法 run_pending(), 从本文一开头的 Demo 可以知道这个是启动调度器的方法. 这里它执行了 default_scheduler 中的方法.
default_scheduler.run_pending()
所以我们就把目光定位到 Scheduler 类的相应方法
- def run_pending(self):
- """
- Run all jobs that are scheduled to run.
- Please note that it is *intended behavior that run_pending()
- does not run missed jobs*. For example, if you've registered a job
- that should run every minute and you only call run_pending()
- in one hour increments then your job won't be run 60 times in
- between but only once.
- """
- runnable_jobs = (job for job in self.jobs if job.should_run)
- for job in sorted(runnable_jobs):
- self._run_job(job)
这个方法中首先从 jobs 列表将需要执行的任务过滤后放在 runnable_jobs 列表, 然后将其排序后顺序执行内部的_run_job(job)方法
- def _run_job(self, job):
- ret = job.run()
- if isinstance(ret, CancelJob) or ret is CancelJob:
- self.cancel_job(job)
在_run_job 方法中就调用了 job 类中的 run 方法, 并根据返回值判断是否需要取消任务.
这时候我们要看下 Job 类的实现逻辑.
首先我们要看下 Job 是什么时候创建的. 还是从 Demo 中的代码入手
schedule.every(2).seconds.do(job)
这里先执行了 schedule.every()方法
- def every(interval=1):
- """Calls :meth:`every <Scheduler.every>` on the
- :data:`default scheduler instance <default_scheduler>`.
- """
- return default_scheduler.every(interval)
这个方法就是 scheduler 类中的 every 方法
- def every(self, interval=1):
- """
- Schedule a new periodic job.
- :param interval: A quantity of a certain time unit
- :return: An unconfigured :class:`Job <Job>`
- """
- job = Job(interval, self)
- return job
在这里创建了一个任务 job, 并将参数 interval 和 scheduler 实例传入到构造方法中, 最后返回 job 实例用于实现链式调用.
跳转到 Job 的构造方法
- def __init__(self, interval, scheduler=None):
- self.interval = interval # pause interval * unit between runs
- self.latest = None # upper limit to the interval
- self.job_func = None # the job job_func to run
- self.unit = None # time units, e.g. 'minutes', 'hours', ...
- self.at_time = None # optional time at which this job runs
- self.last_run = None # datetime of the last run
- self.next_run = None # datetime of the next run
- self.period = None # timedelta between runs, only valid for
- self.start_day = None # Specific day of the week to start on
- self.tags = set() # unique set of tags for the job
- self.scheduler = scheduler # scheduler to register with
主要初始化了间隔时间配置, 需要执行的方法, 调度器各种时间单位等.
执行 every 方法之后又调用了 seconds 这个属性方法
- @property
- def seconds(self):
- self.unit = 'seconds'
- return self
设置了时间单位, 这个设置秒, 当然还有其它类似的属性方法 minutes,hours,days 等等.
最后就是执行了 do 方法
- def do(self, job_func, *args, **kwargs):
- """
- Specifies the job_func that should be called every time the
- job runs.
- Any additional arguments are passed on to job_func when
- the job runs.
- :param job_func: The function to be scheduled
- :return: The invoked job instance
- """
- self.job_func = functools.partial(job_func, *args, **kwargs)
- try:
- functools.update_wrapper(self.job_func, job_func)
- except AttributeError:
- # job_funcs already wrapped by functools.partial won't have
- # __name__, __module__ or __doc__ and the update_wrapper()
- # call will fail.
- pass
- self._schedule_next_run()
- self.scheduler.jobs.append(self)
- return self
在这里使用 functools 工具的中的偏函数 partial 将我们自定义的方法封装成可调用的对象
然后就调用_schedule_next_run 方法, 它主要是对时间的解析, 按照时间对 job 排序, 我觉得这个方法是本项目中的技术点, 逻辑也是稍微复杂一丢丢, 仔细阅读就可以看懂, 主要是对时间 datetime 的使用. 由于篇幅, 这里就不再贴出代码.
这里就完成了任务 job 的添加. 然后在调用 run_pending 方法中就可以让任务执行.
0x04 总结一下
schedule 库定义两个核心类 Scheduler 和 Job. 在导入包时就默认创建一个 Scheduler 对象, 并初始化任务列表.
schedule 模块提供了链式调用的接口, 在配置 schedule 参数时, 就会创建任务对象 job, 并会将 job 添加到任务列表中, 最后在执行 run_pending 方法时, 就会调用我们自定义的方法.
这个库的核心思想是使用面向对象方法, 对事物能够准确地抽象, 它总体的逻辑并不复杂, 是学习源码很不错的范例.
0x05 学习资料
- https://github.com/dbader/schedule
- https://schedule.readthedocs.io
来源: https://www.cnblogs.com/angrycode/p/11433283.html