apscheduler 定时任务框架

一、APScheduler简介:

Python的一个定时任务框架,满足用户定时执行或者周期性执行任务的需求,提供了基于日期date、固定的时间间隔interval、以及类似于Linux上的定时任务crontab类型的定时任务。并且该框架不仅可以添加、删除定时任务,还可以将任务存储到数据库中,实现任务的持久化。

Python的第三方库,用来提供Python的后台程序。包含四个组件,分别是:

triggers:任务触发器组件,提供任务触发方式

job stores: 任务商店组件,提供任务保存方式

executors:任务调度组件,提供任务调度方式

schedulers: 任务调度组件,提供任务工作方式

二、APScheduler安装

1)利用pip安装(推荐)

# pip install apscheduler
2 基于源码:https://pypi.python.org/pypi/APScheduler/

# python setup.py install

三、基本概念

1、 APScheduler有四种组件及相关说明

1) triggers(触发器):触发器包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行,除了他们自己初始化配置外,触发器完全是无状态的。
2 )job stores(作业存储):用来存储被调度的作业,默认的作业存储器是简单地把作业任务保存在内存中,其他作业存储器可以将任务保存到各种数据库中,支持MongoDB、Redis、SQLAlchemy存储方式。当对作业任务进行持久化存储的时候,作业的数据将被序列化,重新读取作业时在反序列化。

3)executors(执行器):执行器用来执行定时任务,只是将需要执行的任务放在新的线程或者线程池中运行。当作业任务完成时,执行器就会通知调度器。对于执行器,默认情况下选择ThreadPoolExecutor就可以了,但是如果涉及到一下特殊任务如比较消耗CPU的任务则可以选择ProcessPoolExecutor,当然根据实际需求可以同时使用两种执行器

4)schedulers(调度器):调度器是将其他部分联系在一起,一般在应用程序中只有一个调度器,应用开发者不会直接操作触发器、任务存储以及执行器。相反调度器提供了处理的接口。通过调度器完成任务的存储以及执行器的配置操作,如可以添加、移除、修改任务作业。

APScheduler提供了多种调度器,可以根据具体需求来选择合适的调度器,常用的调度器有:

BlockingScheduler:适合于只在进程中运行单个任务的情况,通常在调度器是你唯一要运行的东西时使用

BackgroundScheduler:适合于要求任务在程序后台运行的情况,当希望调度器在应用后台执行时使用。

AsyncIOScheduler:适合于使用asyncio框架的情况

GeventScheduler:适合于使用gevent框架的情况

TornadoScheduler:适合于使用Tornado框架的应用

TwistedScheduler:适合使用Twisted框架的应用

QtScheduler:适合使用QT的情况

2、配置调度器

APScheduler提供了许多不同的方式来配置调度器,你可以使用一个配置字典或者作为参数关键字的方式传入。你也可以先创建调度器。在配置和添加作业,这样可以在不同的环境中得到更大的灵活性。

3、简单的实例

from apscheduler.schedulers.blocking import BlockingScheduler
import time
#实例化一个调度器
scheduler = BlockingScheduler()

def job1():
print "%s: 执行任务" % time.asctime()

# 添加任务并设置触发方式为3s一次

scheduler.add_job(job1, ‘interval‘, seconds=3)
#开始运行调度器
scheduler.start()

四、各组件功能

1、trigger组件

trigger提供任务的触发方式,共三种方式:

date:只在某个时间点执行一次run_date(datetime|str)

scheduler.add_job(my_job, ‘date‘, run_date=date(2017, 9, 8), args=[])
scheduler.add_job(my_job, ‘date‘, run_date=datetime(2017, 9, 8, 21, 30, 5), args=[])
scheduler.add_job(my_job, ‘date‘, run_date=‘2019-6-12 21:30:05‘, args=[])
# The ‘date‘ trigger and datetime.now() as run_date are implicit
sched.add_job(my_job, args=[[])

interval:每隔一段时间执行一次weeks=0 | days=0 | hours=0 | minutes=0 | seconds=0,

start_date=None, end_date=None, timezone=None

scheduler.add_job(my_job, ‘interval‘, hours=2)
scheduler.add_job(my_job, ‘interval‘, hours=2, start_date=‘2017-9-8 21:30:00‘,
end_date=‘2019-06-12 21:30:00)
@scheduler.scheduled_job(‘interval‘, id=‘my_job_id‘, hours=2)
def my_job():
    print("Hello World")

cron:使用Linux下crontab的方式(year=None, month=None, day=None, week=None, day_of_week=None, hour=None, minute=None, second=None, start_date=None, end_date=None, timezone=None)

      sched.add_job(my_job, ‘cron‘, hour=3, minute=30)

    sched.add_job(my_job, ‘cron‘, day_of_week=‘mon-fri‘, hour=5, minute=30, end_date=‘2017-10-
30‘)
    @sched.scheduled_job(‘cron‘, id=‘my_job_id‘, day=‘last sun‘)
    def some_decorated_task():
        print("I am printed at 00:00:00 on the last Sunday of every month!")

2、scheduler组件

scheduler组件提供执行的方式,在不同的运行环境中选择合适的方式

BlockingScheduler:进程中只运行调度器时的方式

from apscheduler.schedulers.blocking import BlockingScheduler
import time
scheduler = BlockingScheduler()
def job1():

print "%s: 执行任务" % time.asctime()

scheduler.add_job(job1, ‘interval‘, seconds=3)
scheduler.start()

BackgroundScheduler:不想使用任何框架时的方式

from apscheduler.schedulers.background import BackgroundScheduler
import time
scheduler = BackgroundScheduler()
def job1():

print "%s:执行任务 " % time.asctime()

scheduler.add_job(job1, ‘interval‘, seconds=3)

scheduler.start()
while True:
    pass

AsyncIOScheduler: asyncio module的方式( Python3)

from apscheduler.schedulers.asyncio import AsyncIOScheduler
try:
    import asyncio
except ImportError:
    import trollius as asyncio
...
...

# while True pass

try:
    asyncio.get_event_loop().run_forever()
except (KeyboardInterrupt, SystemExit):
    pass

GeventScheduler: gevent方式

from apscheduler.schedulers.gevent import GeventScheduler
...

...

g = scheduler.start()
# while True:pass
try:
    g.join()
except (KeyboardInterrupt, SystemExit):
    pass

TornadoScheduler: Tornado方式

from tornado.ioloop import IOLoop
from apscheduler.schedulers.tornado import TornadoScheduler

...

...

# while True:pass
try:
    IOLoop.instance().start()
except (KeyboardInterrupt, SystemExit):
    pass

TwistedScheduler: Twisted方式

from twisted.internet import reactor
from apscheduler.schedulers.twisted import TwistedScheduler

...

...

# while True:pass
try:
    reactor.run()
except (KeyboardInterrupt, SystemExit):
    pass

QtScheduler: Qt方式

3、executors组件

executors组件提供任务的调度方式

base

debug
gevent

pool(max_workers=10)

twisted 

4、jobstore组件

jobstore提供任务的各种持久化方式

base

memory
mongodb
    scheduler.add_jobstore(‘mongodb‘, collection=‘example_jobs‘)

redis

scheduler.add_jobstore(‘redis‘, jobs_key=‘example.jobs‘, run_times_key=‘example.run_times‘)
rethinkdb
    scheduler.add_jobstore(‘rethinkdb‘, database=‘apscheduler_example‘)

sqlalchemy

  scheduler.add_jobstore(‘sqlalchemy‘, url=url)

zookeeper

  scheduler.add_jobstore(‘zookeeper‘, path=‘/example_jobs‘)

五、任务操作

1、添加任务add_job(如上)

如果使用了任务的存储,开启时最好添加replace_existing=True,否则每次开启时都会创建任务的副本,开启后任务不会马上启动,可修改triger参数

2、删除任务remove_job

#根据任务实例删除

job = scheduler.add_job(myfunc, ‘interval‘, minutes=2)
job.remove()

# 根据任务id删除

scheduler.add_job(myfunc, ‘interval‘, minutes=2, id=‘my_job_id‘)
scheduler.remove_job(‘my_job_id‘)

3、任务的暂停pause_job和继续resume_job

job = scheduler.add_job(myfunc, ‘interval‘, minutes=2)
#根据任务实例
job.pause()
job.resume()

# 根据任务id暂停

scheduler.add_job(myfunc, ‘interval‘, minutes=2, id=‘my_job_id‘)
scheduler.pause_job(‘my_job_id‘)
4、任务的修饰modify和重设reschedule_job

修饰:job.modify(max_instances=6, name=‘Alternate name‘)

重设:scheduler.reschedule_job(‘my_job_id‘, trigger=‘cron‘, minute=‘*/5‘)

5、调度器操作

开启:scheduler.start()

关闭:scheduler.shotdown(wait=True | False)

暂停:scheduler.pause()

继续:scheduler.resume()

监听:http://apscheduler.readthedocs.io/en/v3.3.0/modules/events.html#module-apscheduler.events

def my_listener(event):
    if event.exception:
        print(‘The job crashed :(‘)
    else:
        print(‘The job worked :)‘)
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

官方实例

from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
jobstores = {
    ‘mongo‘: MongoDBJobStore(),
    ‘default‘: SQLAlchemyJobStore(url=‘sqlite:///jobs.sqlite‘)
}
executors = {
    ‘default‘: ThreadPoolExecutor(20),
    ‘processpool‘: ProcessPoolExecutor(5)
}
job_defaults = {
    ‘coalesce‘: False,
    ‘max_instances‘: 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors,
job_defaults=job_default

原文地址:https://www.cnblogs.com/DBA-3306/p/11013421.html

时间: 2024-11-14 12:27:31

apscheduler 定时任务框架的相关文章

APScheduler定时任务框架

1.简介 APScheduler是一个Python**定时任务框架**,提供了**基于日期**.**固定时间间隔**以及**crontab**类型的任务,并且可以**持久化任务**.基于这些功能,我们可以很方便的实现一个python定时任务系统. 2.安装 pip install apscheduler 3.组成部分 触发器(triggers):触发器包含调度逻辑,描述一个任务何时被触发,按日期或按时间间隔或按 cronjob 表达式三种方式触发.每个作业都有它自己的触发器,除了初始配置之外,触

[转]Python定时任务框架APScheduler

APScheduler是基于Quartz的 一个Python定时任务框架,实现了Quartz的所有功能,使用起来十分方便.提供了基于日期.固定时间间隔以及crontab类型的任务,并且可以 持久化任务.基于这些功能,我们可以很方便的实现一个python定时任务系统,写python还是要比java舒服多了. 1. 安装 安装过程很简单,可以基于easy_install和源码. easy_install apscheduler 或者下载源码,运行命令: python setup.py install

Python定时任务框架APScheduler 3.0.3 Cron示例

APScheduler是基于Quartz的一个Python定时任务框架,实现了Quartz的所有功能,使用起来十分方便.提供了基于日期.固定时间间隔以及crontab类型的任务,并且可以持久化任务.基于这些功能,我们可以很方便的实现一个python定时任务系统. 安装 安装过程很简单,可以基于pip和源码. Pip install apscheduler==3.0.3 或者下载源码,运行命令: Python setup.py install cron job例子 1: #coding=utf-8

Python下定时任务框架APScheduler的使用

1.APScheduler简介: APScheduler是Python的一个定时任务框架,可以很方便的满足用户定时执行或者周期执行任务的需求,它提供了基于日期date.固定时间间隔interval .以及类似于Linux上的定时任务crontab类型的定时任务.并且该框架不仅可以添加.删除定时任务,还可以将任务存储到数据库中,实现任务的持久化,所以使用起来非常方便. 2.APScheduler安装: APScheduler的安装相对来说也非常简单,可以直接利用pip安装,如果没有pip可以下载源

定时任务框架APScheduler学习详解

APScheduler简介 在平常的工作中几乎有一半的功能模块都需要定时任务来推动,例如项目中有一个定时统计程序,定时爬出网站的URL程序,定时检测钓鱼网站的程序等等,都涉及到了关于定时任务的问题,第一时间想到的是利用time模块的time.sleep()方法使程序休眠来达到定时任务的目的,虽然这样也可以,但是总觉得不是那么的专业,^_^所以就找到了python的定时任务模块APScheduler: APScheduler基于Quartz的一个Python定时任务框架,实现了Quartz的所有功

[Dynamic Language] Python定时任务框架

APScheduler是一个Python定时任务框架,使用起来十分方便.提供了基于日期.固定时间间隔以及crontab类型的任务,并且可以持久化任务.并以daemon方式运行应用. 测试计划任务 mac-abeen:timetask abeen$ vim testtask.py 1 # !/usr/bin/env python 2 # -*- encoding:utf-8 -*- 3 4 from datetime import datetime 5 6 7 class TestTask(obj

atititt.java定时任务框架选型Spring Quartz 注解总结

atititt.java定时任务框架选型Spring Quartz 总结 1. .Spring Quartz  (ati recomm) 1 2. Spring Quartz具体配置 2 2.1. 增加context,task命名空间xml: 2 2.2. 增加xsi:schemaLocation valide 2 2.3. 我们的task任务扫描注解in spr.xml 2 2.4. 设置运行方法 3 2.5. 设置输出日志 3 3. 运行测试sprX走ok兰. 4 4. Quartz Sch

QUARTZ.NET 一个定时任务框架

<1> Quartz.NET-1.0.3文件下载地址 (这是老版本了,现在已经有新版本了.用法好像不一样了) 首先要添加Quartz.NET-1.0.3 文件下面的  bin/3.5/Release/Quartz/ 目录下面的Common.Logging.dll文件和Quartz.dll文件 然后添加引用.将连个文件引入到项目中来 using Quartz; using Quartz.Impl; using System; using System.Collections.Generic; u

Quartz.Net的使用(简单配置方法)定时任务框架

Quartz.dll 安装nuget在线获取dll包管理器,从中获取最新版 Quartz.Net是一个定时任务框架,可以实现异常灵活的定时任务,开发人员只要编写少量的代码就可以实现“每隔1小时执行”.“每天22点执行”.“每月18日的下午执行8次”等各种定时任务. Quartz.Net中的概念:计划者(IScheduler).工作(IJob).触发器(Trigger).给计划者一个工作,让他在Trigger(什么条件下做这件事)触发的条件下执行这个工作 将要定时执行的任务的代码写到实现IJob接