python使用apscheduler实现定时任务

安装apscheduler

pip install apscheduler

基本概念

1、触发器triggers。 触发器包含调度逻辑。每个作业都有自己的触发器,用于确定下一个任务何时运行。触发器有三种内建的trigger:

  1. data: 特定的时间触发
  2. interval: 固定的时间间隔触发
  3. cron: 在特定时间周期性地触发

2、 任务存储器 job stores。 用于存放任务,把任务存放在内存(为默认MemoryJobStore)或数据库中

3、执行器 executors。 执行器是将任务提交到线程池或进程池中运行,当任务完成时,执行器通知调度器触发相应的事件

4、调度器scheduler。 把上方三个组件作为参数,通过创建调度器实例来运行

根据开发需求选择对应的组件

BlockingScheduler            阻塞式调度器:适用于只跑调度器的程序。
BackgroundScheduler      后台调度器:适用于非阻塞的情况,调度器会在后台独立运行。
AsyncIOScheduler            AsyncIO调度器,适用于应用使用AsnycIO的情况。
GeventScheduler              Gevent调度器,适用于应用通过Gevent的情况。
TornadoScheduler            Tornado调度器,适用于构建Tornado应用。
TwistedScheduler             Twisted调度器,适用于构建Twisted应用。
QtScheduler                      Qt调度器,适用于构建Qt应用。

使用步骤

1、新建一个调度器schedulers

2、添加调度任务

3、运行调度任务

使用示例

触发器date:特点的时间点触发,只执行一次

from datetime import datetime
from datetime import date
from apscheduler.schedulers.blocking import BlockingScheduler

def job(text):
    print(text)

scheduler = BlockingScheduler()
# 在 2019-8-30 运行一次 job 方法
scheduler.add_job(job, ‘date‘, run_date=date(2019, 8, 30), args=[‘text1‘])
# 在 2019-8-30 01:00:00 运行一次 job 方法
scheduler.add_job(job, ‘date‘, run_date=datetime(2019, 8, 30, 1, 0, 0), args=[‘text2‘])
# 在 2019-8-30 01:00:01 运行一次 job 方法
scheduler.add_job(job, ‘date‘, run_date=‘2019-8-30 01:00:00‘, args=[‘text3‘])

scheduler.start()

触发器interval:固定时间间隔触发。

参数 说明
weeks(int) 间隔几周
days(int) 间隔几天
hours(int) 间隔几小时
minutes(int) 间隔几分钟
seconds(int) 间隔多少秒
start_date(datetime或str) 开始日期
end_date(datetime或str) 结束日期
timezone(datetime.tzinfo 或str)
from apscheduler.schedulers.blocking import BlockingScheduler

def run(text):
    print(text)

if __name__ == ‘__main__‘:
    scheduler = BlockingScheduler()
    # 每隔 1分钟 运行一次 job 方法
    scheduler.add_job(run, ‘interval‘, minutes=1, args=[‘job1‘])
    # 在 2019-08-29 22:15:00至2019-08-29 22:17:00期间,每隔1分30秒 运行一次 job 方法
    scheduler.add_job(run, ‘interval‘, minutes=1, seconds=30, start_date=‘2019-08-29 22:15:00‘, end_date=‘2019-08-29 22:17:00‘, args=[‘job2‘])
    scheduler.start()

触发器cron: 在特定的时间周期性地触发。

参数 说明
year (int 或 str) 年,4位数字
month (int 或 str) 月 (范围1-12)
day (int 或 str) 日 (范围1-31)
week (int 或 str) 周 (范围1-53)
day_of_week (int 或 str) 周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun)
hour (int 或 str) 时 (范围0-23)
minute (int 或 str) 分 (范围0-59)
second (int 或 str) 秒 (范围0-59)
start_date (datetime 或 str) 最早开始日期(包含)
end_date (datetime 或 str) 最晚结束时间(包含)
timezone (datetime.tzinfo 或str) 指定时区
from apscheduler.schedulers.blocking import BlockingScheduler

def exec_job(text):
    print(text)

if __name__ == ‘__main__‘:
    scheduler = BlockingScheduler()
    # 在每天的22点,每隔一分钟 运行一次exec_job方法
    scheduler.add_job(exec_job, ‘cron‘, hour=22, minute=‘*/1‘, args=[‘job_one‘])
    # 在每天的22点和23点的25分, 运行一次exec_job方法
    scheduler.add_job(exec_job, ‘cron‘, hour=‘22-23‘, minute=‘25‘, args=[‘result‘])
    scheduler.start()

添加任务

添加任务的方法有两种

(1)、通过add_job()添加任务

(2)、通过装饰器scheduled_job()

from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler()

def tick(message=‘working‘):
    print(‘Tick! The time is: %s‘ % datetime.now())
    print("worker status is:", message)

def tick_other(status="relaxing"):
    print(‘Tick! The time is: %s‘ % datetime.now())
    print("worker status is:", status)

# 添加任务的方法有两种 1、通过调用add_job(), 2、通过装饰器scheduled_job()
# add_job()方法返回一个apscheduler.job.Job实例,可以使用该实例稍后修改或删除该任务
@scheduler.scheduled_job(‘interval‘, seconds=5)
def tick_another(status="restart"):
    print(‘Tick! The time is: %s‘ % datetime.now())
    print("worker status is:", status)

if __name__ == ‘__main__‘:
    # 在指定期间内, 每隔1分10秒运行一次tick方法
    scheduler.add_job(tick, ‘interval‘, minutes=1, seconds=10,
                      start_date=‘2020-01-06 14:19:00‘, end_date=‘2020-01-06 14:24:00‘, args=[‘sleep‘])
    # 在每天的11点25分,运行一次tick_other方法
    scheduler.add_job(tick_other, ‘cron‘, hour=‘11‘, minute=‘25‘, args=[‘hard working‘])
    scheduler.start()

参考来源

在线文档:在线文档

转载来源:文档

原文地址:https://www.cnblogs.com/ppwang06/p/12534754.html

时间: 2024-08-30 16:43:41

python使用apscheduler实现定时任务的相关文章

APScheduler轻量级定时任务框架

目录 一.APScheduler简介 支持的后端存储作业 集成的Python框架 二.APScheduler下载安装 三.APScheduler组件 各组件简介 调度器 作业存储器 执行器 触发器 四.使用 添加任务 指定时间执行任务,只执行一次 间隔时间执行任务 一.APScheduler简介 APScheduler(Advanced Python Scheduler)是一个轻量级的Python定时任务调度框架(Python库). APScheduler有三个内置的调度系统,其中包括: cro

Python下APScheduler的简单使用

今天准备实现一个功能需要用到定时执行任务,所以就看到了Python的一个定时任务框架APScheduler,试了一下感觉还不错. 1.APScheduler简介: APScheduler是Python的一个定时任务框架,可以很方便的满足用户定时执行或者周期执行任务的需求,它提供了基于日期date.固定时间间隔interval .以及类似于Linux上的定时任务crontab类型的定时任务.并且该框架不仅可以添加.删除定时任务,还可以将任务存储到数据库中,实现任务的持久化,所以使用起来非常方便.

使用python crontab设置linux定时任务

熟悉linux的朋友应该知道在linux中可以使用crontab设置定时任务.可以通过命令crontab -e编写任务.当然也可以直接写配置文件设置任务. 但是有时候希望通过脚本自动设置,比如我们应用程序部署时等.有需求当然就得想办法解决,不然在程序猿界混(一群自得其乐的猿). 下面进入正题,开始想通过以写文件的形式设置,通过在配置文件中直接追加一行即可.但是读写文件难免有点繁琐,再比如:设置任务时要检查任务是否已经存在:根据输入参数设置相应的任务等.以读写文件难免不太合适.所以想到了"万能&q

python(flask)+apscheduler定时邮件重发两次的问题

工作中遇到一个需求,要在现有系统(airbnb家的开源平台superset)上添加一个定时邮件的功能. 定时邮件功能使用的是apscheduler这个库,关于怎么用这里就不多赘述了反正网上都有. 主要记录一个问题,使用过程中发现邮件有的时候会重发两次,经过研究之后发现是runserver的时候,调度器实例被创建了两次. 这个主要和系统使用的flask的一个reload机制有关(FLASK_USE_RELOAD = True),reload主要用于代码的热更新(简单解释就是,当你runserver

Flask(十一)flash与APScheduler 实现定时任务

from flask import Flask from flask_apscheduler import APScheduler # 引入APScheduler class Config(object): # 创建配置,用类 JOBS = [ # 任务列表 { # 任务字典(细节) 'id': 'job1', 'func': '__main__:job_1', 'args': (1, 2), 'trigger': 'cron', 'hour': 19, 'minute': 27 }, { #

详解使用python crontab设置linux定时任务

熟悉linux的朋友应该知道在linux中可以使用crontab设置定时任务.可以通过命令crontab -e编写任务.当然也可以直接写配置文件设置任务. 但是有时候希望通过脚本自动设置,比如我们应用程序部署时等.有需求当然就得想办法解决,不然在程序猿界混(一群自得其乐的猿). 下面进入正题,开始想通过以写文件的形式设置,通过在配置文件中直接追加一行即可.但是读写文件难免有点繁琐,再比如:设置任务时要检查任务是否已经存在:根据输入参数设置相应的任务等.以读写文件难免不太合适.所以想到了“万能”的

44. Python Celery多实例 定时任务

celery是一个分布式的任务调度模块,那么celery是如何和分布式挂钩呢? celery可以支持多台不同的计算机执行不同的任务或者相同的任务. 如果要说celery的分布式应用的话,就要提到celery的消息路由机制,提到AMQP协议. 具体可以查看AMQP文档详细了解. 简单理解: 可以有多个"消息队列"(message Queue),不同的消息可以指定发送给不同的Message Queue, 而这是通过Exchange来实现的,发送消息到"消息队列"中时,可

Celery+python+redis异步执行定时任务

我之前的一篇文章中写了[Celery+django+redis异步执行任务] 博文:http://blog.csdn.net/apple9005/article/details/54236212 你会发现,这些代码并不依赖django框架,随便写到一个py文件中,就可以轻松的执行成功,这是因为这些代码并没有用到django-celery,django-redis等依附于django框架的东西. 今天,参照官方文档示例,测试一下celery的异步执行定时任务如何.我先是在django框架内执行了一

[转]Python定时任务框架APScheduler

APScheduler是基于Quartz的 一个Python定时任务框架,实现了Quartz的所有功能,使用起来十分方便.提供了基于日期.固定时间间隔以及crontab类型的任务,并且可以 持久化任务.基于这些功能,我们可以很方便的实现一个python定时任务系统,写python还是要比java舒服多了. 1. 安装 安装过程很简单,可以基于easy_install和源码. easy_install apscheduler 或者下载源码,运行命令: python setup.py install