APScheduler轻量级定时任务框架

目录

  • 一、APScheduler简介

    • 支持的后端存储作业
    • 集成的Python框架
  • 二、APScheduler下载安装
  • 三、APScheduler组件
    • 各组件简介

      • 调度器
      • 作业存储器
      • 执行器
      • 触发器
  • 四、使用
    • 添加任务
    • 指定时间执行任务,只执行一次
    • 间隔时间执行任务

一、APScheduler简介

APScheduler(Advanced Python Scheduler)是一个轻量级的Python定时任务调度框架(Python库)。

APScheduler有三个内置的调度系统,其中包括:

  • cron式调度(可选开始/结束时间)
  • 基于间隔的执行(以偶数间隔运行作业,也可以选择开始/结束时间)
  • 一次性延迟执行任务(在指定的日期/时间内运行作业一次)

支持的后端存储作业

APScheduler可以任意混合和匹配调度系统和作业存储的后端,其中支持后端存储作业包括:

  • Memory
  • SQLAlchemy
  • MongoDB
  • Redis
  • RethinkDB
  • ZooKeeper

集成的Python框架

APScheduler内继承了几个常见的Python框架:

  • asyncio
  • gevent
  • tornado
  • qt

二、APScheduler下载安装

使用pip安装:

pip install apscheduler
pip install apscheduler==3.6.3

如果超时或者出现别的情况,可以选择:

# 法1使用豆瓣源下载
pip install -i https://pypi.doubanio.com/simple/ apscheduler
# 法2使用清华源下载
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple apscheduler

要是再不行,点击该链接或者pypi官网下载了。下载并解压缩,进入跟setup.py文件同级的目录,打开cmd,使用命令进行下载:

python setup.py install

三、APScheduler组件

APScheduler共有4种组件,分别是:

  • 触发器(trigger),触发器中包含调度逻辑,每个作业都有自己的触发器来决定下次运行时间。除了它们自己初始配置以外,触发器完全是无状态的。
  • 作业存储器(job store),存储被调度的作业,默认的作业存储器只是简单地把作业保存在内存中,其他的作业存储器则是将作业保存在数据库中,当作业被保存在一个持久化的作业存储器中的时候,该作业的数据会被序列化,并在加载时被反序列化,需要说明的是,作业存储器不能共享调度器。
  • 执行器(executor),处理作业的运行,通常通过在作业中提交指定的可调用对象到一个线程或者进程池来进行,当作业完成时,执行器会将通知调度器。
  • 调度器(scheduler),配置作业存储器和执行器可以在调度器中完成。例如添加、修改、移除作业,根据不同的应用场景,可以选择不同的调度器,可选的将在下一小节展示。

各组件简介

调度器

  • BlockingScheduler : 当调度器是你应用中唯一要运行的东西时。
  • BackgroundScheduler : 当你没有运行任何其他框架并希望调度器在你应用的后台执行时使用(充电桩即使用此种方式)。
  • AsyncIOScheduler : 当你的程序使用了asyncio(一个异步框架)的时候使用。
  • GeventScheduler : 当你的程序使用了gevent(高性能的Python并发框架)的时候使用。
  • TornadoScheduler : 当你的程序基于Tornado(一个web框架)的时候使用。
  • TwistedScheduler : 当你的程序使用了Twisted(一个异步框架)的时候使用
  • QtScheduler : 如果你的应用是一个Qt应用的时候可以使用。

作业存储器

如果你的应用在每次启动的时候都会重新创建作业,那么使用默认的作业存储器(MemoryJobStore)即可,但是如果你需要在调度器重启或者应用程序奔溃的情况下任然保留作业,你应该根据你的应用环境来选择具体的作业存储器。例如:使用Mongo或者SQLAlchemy JobStore (用于支持大多数RDBMS)

执行器

对执行器的选择取决于你使用上面哪些框架,大多数情况下,使用默认的ThreadPoolExecutor已经能够满足需求。如果你的应用涉及到CPU密集型操作,你可以考虑使用ProcessPoolExecutor来使用更多的CPU核心。你也可以同时使用两者,将ProcessPoolExecutor作为第二执行器。

触发器

当你调度作业的时候,你需要为这个作业选择一个触发器,用来描述这个作业何时被触发,APScheduler有三种内置的触发器类型:

  • date 一次性指定日期
  • interval 在某个时间范围内间隔多长时间执行一次
  • cron 和Linux crontab格式兼容,最为强大

四、使用

当你需要调度作业的时候,你需要为这个作业选择一个触发器,用来描述该作业将在何时被触发,APScheduler有3中内置的触发器类型:

  • 新建一个调度器(scheduler)
  • 添加一个调度任务(job store)
  • 运行调度任务

添加任务

有两种方式可以添加一个新的作业:

  • add_job来添加作业
  • 装饰器模式添加作业

指定时间执行任务,只执行一次

import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
def job2(text):
    print(‘job2‘, datetime.datetime.now(), text)
scheduler = BlockingScheduler()
scheduler.add_job(job2, ‘date‘, run_date=datetime.datetime(2020, 2, 25, 19, 5, 6), args=[‘text‘], id=‘job2‘)
scheduler.start()

上例中,只在2010-2-25 19:05:06执行一次,args传递一个text参数。

间隔时间执行任务

下面来个简单的例子,作业每个5秒执行一次:

import datetime
from apscheduler.schedulers.blocking import BlockingScheduler

def job1():
    print(‘job1‘, datetime.datetime.now())
scheduler = BlockingScheduler()
scheduler.add_job(job1, ‘interval‘, seconds=5, id=‘job1‘)  # 每隔5秒执行一次
scheduler.start()

每天凌晨1点30分50秒执行一次

# 装饰器的方式

from apscheduler.schedulers.blocking import BlockingScheduler  # 后台运行
sc = BlockingScheduler()
f = open(‘t1.text‘, ‘a‘, encoding=‘utf8‘)

@sc.scheduled_job(‘cron‘, day_of_week=‘*‘, hour=1, minute=‘30‘, second=‘50‘)
def check_db():
    print(111111111111)
if __name__ == ‘__main__‘:
    try:
        sc.start()
        f.write(‘定时任务成功执行‘)
    except Exception as e:
        sc.shutdown()
        f.write(‘定时任务执行失败‘)
    finally:
        f.close()

每几分钟执行一次:

import datetime
from apscheduler.schedulers.blocking import BlockingScheduler

def job1():
    print(‘job1‘, datetime.datetime.now())
scheduler = BlockingScheduler()
# 每隔2分钟执行一次, */1:每隔1分钟执行一次
scheduler.add_job(job1, ‘cron‘, minute="*/2", id=‘job1‘)
scheduler.start()

每小时执行一次:

import datetime
from apscheduler.schedulers.blocking import BlockingScheduler

def job1():
    print(‘job1‘, datetime.datetime.now())
scheduler = BlockingScheduler()
# 每小时执行一次
scheduler.add_job(job1, ‘interval‘, hours=1, id=‘job1‘)
# 每小时执行一次,上下浮动120秒区间内
# scheduler.add_job(job1, ‘interval‘, hours=1, id=‘job1‘, jitter=120)
scheduler.start()

原文地址:https://www.cnblogs.com/guyouyin123/p/12688029.html

时间: 2024-11-15 00:02:30

APScheduler轻量级定时任务框架的相关文章

[转]Python定时任务框架APScheduler

APScheduler是基于Quartz的 一个Python定时任务框架,实现了Quartz的所有功能,使用起来十分方便.提供了基于日期.固定时间间隔以及crontab类型的任务,并且可以 持久化任务.基于这些功能,我们可以很方便的实现一个python定时任务系统,写python还是要比java舒服多了. 1. 安装 安装过程很简单,可以基于easy_install和源码. easy_install apscheduler 或者下载源码,运行命令: python setup.py install

Python定时任务框架APScheduler 3.0.3 Cron示例

APScheduler是基于Quartz的一个Python定时任务框架,实现了Quartz的所有功能,使用起来十分方便.提供了基于日期.固定时间间隔以及crontab类型的任务,并且可以持久化任务.基于这些功能,我们可以很方便的实现一个python定时任务系统. 安装 安装过程很简单,可以基于pip和源码. Pip install apscheduler==3.0.3 或者下载源码,运行命令: Python setup.py install cron job例子 1: #coding=utf-8

Python下定时任务框架APScheduler的使用

1.APScheduler简介: APScheduler是Python的一个定时任务框架,可以很方便的满足用户定时执行或者周期执行任务的需求,它提供了基于日期date.固定时间间隔interval .以及类似于Linux上的定时任务crontab类型的定时任务.并且该框架不仅可以添加.删除定时任务,还可以将任务存储到数据库中,实现任务的持久化,所以使用起来非常方便. 2.APScheduler安装: APScheduler的安装相对来说也非常简单,可以直接利用pip安装,如果没有pip可以下载源

定时任务框架APScheduler学习详解

APScheduler简介 在平常的工作中几乎有一半的功能模块都需要定时任务来推动,例如项目中有一个定时统计程序,定时爬出网站的URL程序,定时检测钓鱼网站的程序等等,都涉及到了关于定时任务的问题,第一时间想到的是利用time模块的time.sleep()方法使程序休眠来达到定时任务的目的,虽然这样也可以,但是总觉得不是那么的专业,^_^所以就找到了python的定时任务模块APScheduler: APScheduler基于Quartz的一个Python定时任务框架,实现了Quartz的所有功

apscheduler 定时任务框架

一.APScheduler简介: Python的一个定时任务框架,满足用户定时执行或者周期性执行任务的需求,提供了基于日期date.固定的时间间隔interval.以及类似于Linux上的定时任务crontab类型的定时任务.并且该框架不仅可以添加.删除定时任务,还可以将任务存储到数据库中,实现任务的持久化. Python的第三方库,用来提供Python的后台程序.包含四个组件,分别是: triggers:任务触发器组件,提供任务触发方式 job stores: 任务商店组件,提供任务保存方式

APScheduler定时任务框架

1.简介 APScheduler是一个Python**定时任务框架**,提供了**基于日期**.**固定时间间隔**以及**crontab**类型的任务,并且可以**持久化任务**.基于这些功能,我们可以很方便的实现一个python定时任务系统. 2.安装 pip install apscheduler 3.组成部分 触发器(triggers):触发器包含调度逻辑,描述一个任务何时被触发,按日期或按时间间隔或按 cronjob 表达式三种方式触发.每个作业都有它自己的触发器,除了初始配置之外,触

[Dynamic Language] Python定时任务框架

APScheduler是一个Python定时任务框架,使用起来十分方便.提供了基于日期.固定时间间隔以及crontab类型的任务,并且可以持久化任务.并以daemon方式运行应用. 测试计划任务 mac-abeen:timetask abeen$ vim testtask.py 1 # !/usr/bin/env python 2 # -*- encoding:utf-8 -*- 3 4 from datetime import datetime 5 6 7 class TestTask(obj

C# 的轻量级 RPC 框架

Redola.Rpc 的一个小目标 Redola.Rpc 的一个小目标 Redola.Rpc 的一个小目标:20000 tps. Concurrency level: 8 threads Complete requests: 20000 Time taken for tests: 0.886 seconds Time per request: 0.044 ms (avg) Requests per second: 22573 [#/sec] (avg) Concurrency level: 8

微博轻量级RPC框架Motan正式开源:支撑千亿调用

支撑微博千亿调用的轻量级 RPC 框架 Motan 正式开源了,项目地址为https://github.com/weibocom/motan. 微博轻量级RPC框架Motan正式开源 Motan 是微博技术团队研发的基于 Java 的轻量级 RPC 框架,已在微博内部大规模应用多年,每天稳定支撑微博上亿次的内部调用.Motan 基于微博的高并发和高负载场景优化,成为一套简单.易用.高可用的 RPC 服务框架. Motan 功能特点:简单.易用.高可用 无侵入集成.简单易用,通过 Spring 配