在日常工作中,我们常常会用到需要周期性执行的任务,一种方式是采用 Linux 系统自带的 crond[1] 结合命令行实现。另外一种方式是直接使用 Python。接下里整理的是常见的 Python 定时任务的实现方式。
利用 while True: + sleep() 实现定时任务
位于 time 模块中的 sleep(secs) 函数,可以实现令当前执行的线程暂停 secs 秒后再继续执行。所谓暂停,即令当前线程进入阻塞状态,当达到 sleep() 函数规定的时间后,再由阻塞状态转为就绪状态,等待 CPU 调度。
基于这样的特性我们可以通过 while 死循环+sleep() 的方式实现简单的定时任务。
代码示例:
Executor 执行器
Executor 在 scheduler 中初始化,另外也可通过 scheduler 的 add_executor 动态添加 Executor。每个 executor 都会绑定一个 alias,这个作为唯一标识绑定到 Job,在实际执行时会根据 Job 绑定的 executor 找到实际的执行器对象,然后根据执行器对象执行 Job。Executor 的种类会根据不同的调度来选择,如果选择 AsyncIO 作为调度的库,那么选择 AsyncIOExecutor,如果选择 tornado 作为调度的库,选择 TornadoExecutor,如果选择启动进程作为调度,选择 ThreadPoolExecutor 或者 ProcessPoolExecutor 都可以。Executor 的选择需要根据实际的 scheduler 来选择不同的执行器。目前 APScheduler 支持的 Executor:
-
executors.asyncio:同步 io,阻塞 -
executors.gevent:io 多路复用,非阻塞 -
executors.pool: 线程 ThreadPoolExecutor 和进程 ProcessPoolExecutor -
executors.twisted:基于事件驱动
Jobstore 作业存储
Jobstore 在 scheduler 中初始化,另外也可通过 scheduler 的 add_jobstore 动态添加 Jobstore。每个 jobstore 都会绑定一个 alias,scheduler 在 Add Job 时,根据指定的 jobstore 在 scheduler 中找到相应的 jobstore,并将 job 添加到 jobstore 中。作业存储器决定任务的保存方式, 默认存储在内存中(MemoryJobStore),重启后就没有了。APScheduler 支持的任务存储器有:
-
jobstores.memory:内存 -
jobstores.mongodb:存储在 mongodb -
jobstores.redis:存储在 redis -
jobstores.rethinkdb:存储在 rethinkdb -
jobstores.sqlalchemy:支持 sqlalchemy 的数据库如 mysql,sqlite 等 -
jobstores.zookeeper:zookeeper
不同的任务存储器可以在调度器的配置中进行配置(见调度器)
Event 事件
Event 是 APScheduler 在进行某些操作时触发相应的事件,用户可以自定义一些函数来监听这些事件,当触发某些 Event 时,做一些具体的操作。常见的比如。Job 执行异常事件 EVENT_JOB_ERROR。Job 执行时间错过事件 EVENT_JOB_MISSED。
目前 APScheduler 定义的 Event:
-
EVENT_SCHEDULER_STARTED -
EVENT_SCHEDULER_START -
EVENT_SCHEDULER_SHUTDOWN -
EVENT_SCHEDULER_PAUSED -
EVENT_SCHEDULER_RESUMED -
EVENT_EXECUTOR_ADDED -
EVENT_EXECUTOR_REMOVED -
EVENT_JOBSTORE_ADDED -
EVENT_JOBSTORE_REMOVED -
EVENT_ALL_JOBS_REMOVED -
EVENT_JOB_ADDED -
EVENT_JOB_REMOVED -
EVENT_JOB_MODIFIED -
EVENT_JOB_EXECUTED -
EVENT_JOB_ERROR -
EVENT_JOB_MISSED -
EVENT_JOB_SUBMITTED -
EVENT_JOB_MAX_INSTANCES
Listener 表示用户自定义监听的一些 Event,比如当 Job 触发了 EVENT_JOB_MISSED 事件时可以根据需求做一些其他处理。
调度器
Scheduler 是 APScheduler 的核心,所有相关组件通过其定义。scheduler 启动之后,将开始按照配置的任务进行调度。除了依据所有定义 Job 的 trigger 生成的将要调度时间唤醒调度之外。当发生 Job 信息变更时也会触发调度。
APScheduler 支持的调度器方式如下,比较常用的为 BlockingScheduler 和 BackgroundScheduler
-
BlockingScheduler:适用于调度程序是进程中唯一运行的进程,调用 start 函数会阻塞当前线程,不能立即返回。 -
BackgroundScheduler:适用于调度程序在应用程序的后台运行,调用 start 后主线程不会阻塞。 -
AsyncIOScheduler:适用于使用了 asyncio 模块的应用程序。 -
GeventScheduler:适用于使用 gevent 模块的应用程序。 -
TwistedScheduler:适用于构建 Twisted 的应用程序。 -
QtScheduler:适用于构建 Qt 的应用程序。
Scheduler 的工作流程
Scheduler 添加 job 流程:
Scheduler 调度流程:
使用分布式消息系统 Celery 实现定时任务
Celery[6] 是一个简单,灵活,可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具,也可用于任务调度。Celery 的配置比较麻烦,如果你只是需要一个轻量级的调度工具,Celery 不会是一个好选择。
Celery 是一个强大的分布式任务队列,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)。异步任务比如是发送邮件、或者文件上传,图像处理等等一些比较耗时的操作 ,定时任务是需要在特定时间执行的任务。
需要注意,celery 本身并不具备任务的存储功能,在调度任务的时候肯定是要把任务存起来的,因此在使用 celery 的时候还需要搭配一些具备存储、访问功能的工具,比如:消息队列、Redis 缓存、数据库等。官方推荐的是消息队列 RabbitMQ,有些时候使用 Redis 也是不错的选择。
它的架构组成如下图:
Celery 架构,它采用典型的生产者-消费者模式,主要由以下部分组成:
-
Celery Beat,任务调度器,Beat 进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列。 -
Producer:需要在队列中进行的任务,一般由用户、触发器或其他操作将任务入队,然后交由 workers 进行处理。调用了 Celery 提供的 API、函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。 -
Broker,即消息中间件,在这指任务队列本身,Celery 扮演生产者和消费者的角色,brokers 就是生产者和消费者存放/获取产品的地方(队列)。 -
Celery Worker,执行任务的消费者,从队列中取出任务并执行。通常会在多台服务器运行多个消费者来提高执行效率。 -
Result Backend:任务处理完后保存状态信息和结果,以供查询。Celery 默认已支持 Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy 等方式。
实际应用中,用户从 Web 前端发起一个请求,我们只需要将请求所要处理的任务丢入任务队列 broker 中,由空闲的 worker 去处理任务即可,处理的结果会暂存在后台数据库 backend 中。我们可以在一台机器或多台机器上同时起多个 worker 进程来实现分布式地并行处理任务。
Celery 定时任务实例:
-
Python Celery & RabbitMQ Tutorial[7] -
Celery 配置实践笔记[8]
使用数据流工具 Apache Airflow 实现定时任务
Apache Airflow[9] 是 Airbnb 开源的一款数据流程工具,目前是 Apache 孵化项目。以非常灵活的方式来支持数据的 ETL 过程,同时还支持非常多的插件来完成诸如 HDFS 监控、邮件通知等功能。Airflow 支持单机和分布式两种模式,支持 Master-Slave 模式,支持 Mesos 等资源调度,有非常好的扩展性。被大量公司采用。
Airflow 使用 Python 开发,它通过 DAGs(Directed Acyclic Graph, 有向无环图)来表达一个工作流中所要执行的任务,以及任务之间的关系和依赖。比如,如下的工作流中,任务 T1 执行完成,T2 和 T3 才能开始执行,T2 和 T3 都执行完成,T4 才能开始执行。
Airflow 提供了各种 Operator 实现,可以完成各种任务实现:
-
BashOperator – 执行 bash 命令或脚本。 -
SSHOperator – 执行远程 bash 命令或脚本(原理同 paramiko 模块)。 -
PythonOperator – 执行 Python 函数。 -
EmailOperator – 发送 Email。 -
HTTPOperator – 发送一个 HTTP 请求。 -
MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, 等,执行 SQL 任务。 -
DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator…
除了以上这些 Operators 还可以方便的自定义 Operators 满足个性化的任务需求。
一些情况下,我们需要根据执行结果执行不同的任务,这样工作流会产生分支。如:
这种需求可以使用 BranchPythonOperator 来实现。
Airflow 产生的背景
通常,在一个运维系统,数据分析系统,或测试系统等大型系统中,我们会有各种各样的依赖需求。包括但不限于:
-
时间依赖:任务需要等待某一个时间点触发。 -
外部系统依赖:任务依赖外部系统需要调用接口去访问。 -
任务间依赖:任务 A 需要在任务 B 完成后启动,两个任务互相间会产生影响。 -
资源环境依赖:任务消耗资源非常多, 或者只能在特定的机器上执行。
crontab 可以很好地处理定时执行任务的需求,但仅能管理时间上的依赖。Airflow 的核心概念 DAG(有向无环图)—— 来表现工作流。
-
Airflow 是一种 WMS,即:它将任务以及它们的依赖看作代码,按照那些计划规范任务执行,并在实际工作进程之间分发需执行的任务。 -
Airflow 提供了一个用于显示当前活动任务和过去任务状态的优秀 UI,并允许用户手动管理任务的执行和状态。 -
Airflow 中的工作流是具有方向性依赖的任务集合。 -
DAG 中的每个节点都是一个任务,DAG 中的边表示的是任务之间的依赖(强制为有向无环,因此不会出现循环依赖,从而导致无限执行循环)。
Airflow 核心概念
-
DAGs:即有向无环图 (Directed Acyclic Graph),将所有需要运行的 tasks 按照依赖关系组织起来,描述的是所有 tasks 执行顺序。 -
Operators:可以简单理解为一个 class,描述了 DAG 中某个的 task 具体要做的事。其中,airflow 内置了很多 operators,如 BashOperator 执行一个 bash 命令,PythonOperator 调用任意的 Python 函数,EmailOperator 用于发送邮件,HTTPOperator 用于发送 HTTP 请求, SqlOperator 用于执行 SQL 命令等等,同时,用户可以自定义 Operator,这给用户提供了极大的便利性。 -
Tasks:Task 是 Operator 的一个实例,也就是 DAGs 中的一个 node。 -
Task Instance:task 的一次运行。Web 界面中可以看到 task instance 有自己的状态,包括”running”, “success”, “failed”, “skipped”, “up for retry”等。 -
Task Relationships:DAGs 中的不同 Tasks 之间可以有依赖关系,如 Task1 >> Task2,表明 Task2 依赖于 Task2 了。通过将 DAGs 和 Operators 结合起来,用户就可以创建各种复杂的 工作流(workflow)。
Airflow 的架构
在一个可扩展的生产环境中,Airflow 含有以下组件:
-
元数据库:这个数据库存储有关任务状态的信息。 -
调度器:Scheduler 是一种使用 DAG 定义结合元数据中的任务状态来决定哪些任务需要被执行以及任务执行优先级的过程。调度器通常作为服务运行。 -
执行器:Executor 是一个消息队列进程,它被绑定到调度器中,用于确定实际执行每个任务计划的工作进程。有不同类型的执行器,每个执行器都使用一个指定工作进程的类来执行任务。例如,LocalExecutor 使用与调度器进程在同一台机器上运行的并行进程执行任务。其他像 CeleryExecutor 的执行器使用存在于独立的工作机器集群中的工作进程执行任务。 -
Workers:这些是实际执行任务逻辑的进程,由正在使用的执行器确定。
Worker 的具体实现由配置文件中的 executor 来指定,airflow 支持多种 Executor:
-
SequentialExecutor: 单进程顺序执行,一般只用来测试 -
LocalExecutor: 本地多进程执行 -
CeleryExecutor: 使用 Celery 进行分布式任务调度 -
DaskExecutor:使用 Dask[10] 进行分布式任务调度 -
KubernetesExecutor: 1.10.0 新增,创建临时 POD 执行每次任务
生产环境一般使用 CeleryExecutor 和 KubernetesExecutor。
使用 CeleryExecutor 的架构如图:
使用 KubernetesExecutor 的架构如图:
其他参考:
-
Getting started with Apache Airflow[11] -
Understanding Apache Airflow’s key concepts[12]
本文为原创文章,版权归知行编程网所有,欢迎分享本文,转载请保留出处!
你可能也喜欢
- ♥ 当当图书五折封顶,可叠加使用300-60优惠码07/19
- ♥ 写python程序用什么IDE01/05
- ♥ 如何使用 Python 将 pdf 转换为 word10/01
- ♥ python如何模拟键盘操作?09/28
- ♥ 如何在python中注释09/21
- ♥ python如何替换(替换)多个字符?08/18
内容反馈