大家好,又见面了,我是你们的朋友全栈君。
- 利用while True: + sleep()实现定时任务
- 使用Timeloop库运行定时任务
- 利用threading.Timer实现定时任务
- 利用内置模块sched实现定时任务
- 利用调度模块schedule实现定时任务
- 利用任务框架APScheduler实现定时任务
- Job 作业
- Trigger 触发器
- Executor 执行器
- Jobstore 作业存储
- Event 事件 调度器
- APScheduler中的重要概念
- Scheduler的工作流程
- 使用分布式消息系统Celery实现定时任务
- 使用数据流工具Apache Airflow实现定时任务
- Airflow 产生的背景
- Airflow 核心概念
- Airflow 的架构
- 只能设定间隔,不能指定具体的时间,比如每天早上8:00
- sleep 是一个阻塞函数,也就是说 sleep 这一段时间,程序什么也不能操作。
- interval: 指定的时间
- function: 要执行的方法
- args/kwargs: 方法的参数
- enter(delay, priority, action, argument),安排一个事件来延迟delay个时间单位。
- -cancel(event):从队列中删除事件。如果事件不是当前队列中的事件,则该方法将跑出一个ValueError。
- -run():运行所有预定的事件。这个函数将等待(使用传递给构造函数的delayfunc()函数),然后执行事件,直到不再有预定的事件。
- 类似于 Liunx Cron 的调度程序(可选的开始/结束时间)
- 基于时间间隔的执行调度(周期性调度,可选的开始/结束时间)
- 一次性执行任务(在设定的日期/时间运行一次任务)
- 触发器(trigger) 包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行。除了他们自己初始配置意外,触发器完全是无状态的。
- 作业存储(job store) 存储被调度的作业,默认的作业存储是简单地把作业保存在内存中,其他的作业存储是将作业保存在数据库中。一个作业的数据讲在保存在持久化作业存储时被序列化,并在加载时被反序列化。调度器不能分享同一个作业存储。
- 执行器(executor) 处理作业的运行,他们通常通过在作业中提交制定的可调用对象到一个线程或者进城池来进行。当作业完成时,执行器将会通知调度器。
- 调度器(scheduler) 是其他的组成部分。你通常在应用只有一个调度器,应用的开发者通常不会直接处理作业存储、调度器和触发器,相反,调度器提供了处理这些的合适的接口。配置作业存储和执行器可以在调度器中完成,例如添加、修改和移除作业。通过配置executor、jobstore、trigger,使用线程池(ThreadPoolExecutor默认值20)或进程池(ProcessPoolExecutor 默认值5)并且默认最多3个(max_instances)任务实例同时运行,实现对job的增删改查等调度控制
- id:指定作业的唯一ID name:指定作业的名字
- trigger:apscheduler定义的触发器,用于确定Job的执行时间,根据设置的 trigger规则,计算得到下次执行此job的时间,满足时将会执行;
- executor:apscheduler定义的执行器,job创建时设置执行器的名字,根据字符串你名字到scheduler获取到执行此job的执行器,执行job指定的函数;
- max_instances:执行此job的最大实例数,executor执行job时,根据job的id来计算执行次数,根据设置的最大实例数来确定是否可执行;
- next_run_time:Job下次的执行时间,创建Job时可以指定一个时间[datetime],不指定的话则默认根据trigger获取触发时间;
- misfire_grace_time:Job的延迟执行时间,例如Job的计划执行时间是21:00:00,但因服务重启或其他原因导致21:00:31才执行,如果设置此key为40,则该job会继续执行,否则将会丢弃此job
- coalesce:Job是否合并执行,是一个bool值。例如scheduler停止20s后重启启动,而job的触发器设置为5s执行一次,因此此job错过了4个执行时间,如果设置为是,则会合并到一次执行,否则会逐个执行
- func:Job执行的函数
- args:Job执行函数需要的位置参数
- kwargs:Job执行函数需要的关键字参数
- 指定时间的DateTrigger
- 指定间隔时间的IntervalTrigger
- 像Linux的crontab一样的CronTrigger
- run_date (datetime|str) – the date/time to run the job at
- timezone(datetime.tzinfo|str) – time zone for run_date if it doesn’t have one already
- weeks (int) – 间隔几周
- days (int) – 间隔几天
- hours (int) – 间隔几小时
- minutes(int) – 间隔几分钟
- seconds (int) – 间隔多少秒
- start_date (datetime|str) – 开始日期
- end_date (datetime|str) – 结束日期
- timezone (datetime.tzinfo|str) – 时区
- (int|str) 表示参数既可以是int类型,也可以是str类型
- (datetime | str)表示参数既可以是datetime类型,也可以是str类型
- year (int|str) – 4-digit year-(表示四位数的年份,如2008年)
- month (int|str) – month (1-12) -(表示取值范围为1-12月)
- day (int|str) – day of the (1-31) -(表示取值范围为1-31日)
- week (int|str) – ISOweek (1-53) -(格里历2006年12月31日可以写成2006年-W52-7(扩展形式)或2006W527(紧凑形式))
- day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) – (表示一周中的第几天,既可以用0-6表示也可以用其英语缩写表示)
- hour (int|str) – hour (0-23) – (表示取值范围为0-23时)
- minute (int|str) – minute (0-59) – (表示取值范围为0-59分)
- second (int|str) – second (0-59) –(表示取值范围为0-59秒)
- start_date (datetime|str) – earliest possible date/time to trigger on (inclusive) – (表示开始时间)
- end_date(datetime|str) – latest possible date/time to trigger on (inclusive) – (表示结束时间)
- timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone) -(表示时区取值)
- executors.asyncio:同步io,阻塞
- executors.gevent:io多路复用,非阻塞
- executors.pool:线程ThreadPoolExecutor和进程ProcessPoolExecutor
- executors.twisted:基于事件驱动
- jobstores.memory:内存
- jobstores.mongodb:存储在mongodb
- jobstores.redis:存储在redis
- jobstores.rethinkdb:存储在rethinkdb
- jobstores.sqlalchemy:支持sqlalchemy的数据库如mysql,sqlite等
- jobstores.zookeeper:zookeeper
- EVENT_SCHEDULER_STARTED
- EVENT_SCHEDULER_START
- EVENT_SCHEDULER_SHUTDOWN
- EVENT_SCHEDULER_PAUSED
- EVENT_SCHEDULER_RESUMED
- EVENT_EXECUTOR_ADDED
- EVENT_EXECUTOR_REMOVED
- EVENT_JOBSTORE_ADDED
- EVENT_JOBSTORE_REMOVED
- EVENT_ALL_JOBS_REMOVED
- EVENT_JOB_ADDED
- EVENT_JOB_REMOVED
- EVENT_JOB_MODIFIED
- EVENT_JOB_EXECUTED
- EVENT_JOB_ERROR
- EVENT_JOB_MISSED
- EVENT_JOB_SUBMITTED
- EVENT_JOB_MAX_INSTANCES Listener表示用户自定义监听的一些Event,比如当Job触发了EVENT_JOB_MISSED事件时可以根据需求做一些其他处理。
- BlockingScheduler:适用于调度程序是进程中唯一运行的进程,调用start函数会阻塞当前线程,不能立即返回。
- BackgroundScheduler:适用于调度程序在应用程序的后台运行,调用start后主线程不会阻塞。
- AsyncIOScheduler:适用于使用了asyncio模块的应用程序。
- GeventScheduler:适用于使用gevent模块的应用程序。
- TwistedScheduler:适用于构建Twisted的应用程序。
- QtScheduler:适用于构建Qt的应用程序。
- Celery Beat,任务调度器,Beat进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列。
- Producer:需要在队列中进行的任务,一般由用户、触发器或其他操作将任务入队,然后交由workers进行处理。调用了Celery提供的API、函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。
- Broker,即消息中间件,在这指任务队列本身,Celery扮演生产者和消费者的角色,brokers就是生产者和消费者存放/获取产品的地方(队列)。
- Celery Worker,执行任务的消费者,从队列中取出任务并执行。通常会在多台服务器运行多个消费者来提高执行效率。 Result
- Backend:任务处理完后保存状态信息和结果,以供查询。Celery默认已支持Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等方式。
- Python Celery & RabbitMQ Tutorial
- Celery 配置实践笔记
- BashOperator – 执行 bash 命令或脚本。
- SSHOperator – 执行远程 bash 命令或脚本(原理同paramiko 模块)。
- PythonOperator – 执行 Python 函数。 EmailOperator – 发送Email。
- HTTPOperator – 发送一个 HTTP 请求。
- MySqlOperator,SqliteOperator,PostgresOperator,MsSqlOperator,OracleOperator, JdbcOperator, 等,执行SQL 任务。
- DockerOperator, HiveOperator,S3FileTransferOperator,PrestoToMysqlOperator, SlackOperator…
- 时间依赖:任务需要等待某一个时间点触发。
- 外部系统依赖:任务依赖外部系统需要调用接口去访问。
- 任务间依赖:任务 A 需要在任务 B完成后启动,两个任务互相间会产生影响。
- 资源环境依赖:任务消耗资源非常多, 或者只能在特定的机器上执行。
- Airflow 是一种 WMS,即:它将任务以及它们的依赖看作代码,按照那些计划规范任务执行,并在实际工作进程之间分发需执行的任务。
- Airflow 提供了一个用于显示当前活动任务和过去任务状态的优秀 UI,并允许用户手动管理任务的执行和状态。
- Airflow中的工作流是具有方向性依赖的任务集合。
- DAG 中的每个节点都是一个任务,DAG中的边表示的是任务之间的依赖(强制为有向无环,因此不会出现循环依赖,从而导致无限执行循环)。
- DAGs:即有向无环图(Directed AcyclicGraph),将所有需要运行的tasks按照依赖关系组织起来,描述的是所有tasks执行顺序。
- Operators:可以简单理解为一个class,描述了DAG中某个的task具体要做的事。其中,airflow内置了很多operators,如BashOperator执行一个bash 命令,PythonOperator 调用任意的Python 函数,EmailOperator用于发送邮件,HTTPOperator 用于发送HTTP请求, SqlOperator用于执行SQL命令等等,同时,用户可以自定义Operator,这给用户提供了极大的便利性。
- Tasks:Task 是Operator的一个实例,也就是DAGs中的一个node。
- Task Instance:task的一次运行。Web界面中可以看到task instance 有自己的状态,包括”running”, “success”, “failed”, “skipped”, “up for retry”等。
- TaskRelationships:DAGs中的不同Tasks之间可以有依赖关系,如 Task1 >>Task2,表明Task2依赖于Task2了。通过将DAGs和Operators结合起来,用户就可以创建各种复杂的工作流(workflow)。
- 元数据库:这个数据库存储有关任务状态的信息。
- 调度器:Scheduler 是一种使用 DAG 定义结合元数据中的任务状态来决定哪些任务需要被执行以及任务执行优先级的过程。调度器通常作为服务运行。
- 执行器:Executor 是一个消息队列进程,它被绑定到调度器中,用于确定实际执行每个任务计划的工作进程。有不同类型的执行器,每个执行器都使用一个指定工作进程的类来执行任务。例如,LocalExecutor 使用与调度器进程在同一台机器上运行的并行进程执行任务。其他像 CeleryExecutor 的执行器使用存在于独立的工作机器集群中的工作进程执行任务。
- Workers:这些是实际执行任务逻辑的进程,由正在使用的执行器确定。
- SequentialExecutor: 单进程顺序执行,一般只用来测试;
- LocalExecutor: 本地多进程执行;
- CeleryExecutor: 使用Celery进行分布式任务调度;
- DaskExecutor:使用Dask进行分布式任务调度;
- KubernetesExecutor: 1.10.0新增, 创建临时POD执行每次任务;
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/154741.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...