Python下定时任务框架APScheduler的使用

1.APScheduler简介:

APScheduler是Python的一个定时任务框架,可以很方便的满足用户定时执行或者周期执行任务的需求,它提供了基于日期date、固定时间间隔interval 、以及类似于Linux上的定时任务crontab类型的定时任务。并且该框架不仅可以添加、删除定时任务,还可以将任务存储到数据库中,实现任务的持久化,所以使用起来非常方便。

2.APScheduler安装:

APScheduler的安装相对来说也非常简单,可以直接利用pip安装,如果没有pip可以下载源码,利用源码安装。

1).利用pip安装:(推荐)

# pip install apscheduler

2).基于源码安装:https://pypi.python.org/pypi/APScheduler/

# python setup.py install

3.基本概念

APScheduler有四种组件及相关说明:

1) triggers(触发器):触发器包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行,除了他们自己初始化配置外,触发器完全是无状态的。

2)job stores(作业存储):用来存储被调度的作业,默认的作业存储器是简单地把作业任务保存在内存中,其它作业存储器可以将任务作业保存到各种数据库中,支持MongoDB、Redis、SQLAlchemy存储方式。当对作业任务进行持久化存储的时候,作业的数据将被序列化,重新读取作业时在反序列化。

3) executors(执行器):执行器用来执行定时任务,只是将需要执行的任务放在新的线程或者线程池中运行。当作业任务完成时,执行器将会通知调度器。对于执行器,默认情况下选择ThreadPoolExecutor就可以了,但是如果涉及到一下特殊任务如比较消耗CPU的任务则可以选择ProcessPoolExecutor,当然根据根据实际需求可以同时使用两种执行器。

4) schedulers(调度器):调度器是将其它部分联系在一起,一般在应用程序中只有一个调度器,应用开发者不会直接操作触发器、任务存储以及执行器,相反调度器提供了处理的接口。通过调度器完成任务的存储以及执行器的配置操作,如可以添加。修改、移除任务作业。  

   APScheduler提供了多种调度器,可以根据具体需求来选择合适的调度器,常用的调度器有:

BlockingScheduler:适合于只在进程中运行单个任务的情况,通常在调度器是你唯一要运行的东西时使用。

BackgroundScheduler: 适合于要求任何在程序后台运行的情况,当希望调度器在应用后台执行时使用。

AsyncIOScheduler:适合于使用asyncio框架的情况

GeventScheduler: 适合于使用gevent框架的情况

TornadoScheduler: 适合于使用Tornado框架的应用

TwistedScheduler: 适合使用Twisted框架的应用

QtScheduler: 适合使用QT的情况

   1)下面一个简单的示例:

      import time
      from apscheduler.schedulers.blocking import BlockingScheduler
      def test_job():
        print time.strftime(‘%Y-%m-%d %H:%M:%S‘, time.localtime(time.time()))
      scheduler = BlockingScheduler()
      ‘‘‘
      #该示例代码生成了一个BlockingScheduler调度器,使用了默认的默认的任务存储MemoryJobStore,以及默认的执行器ThreadPoolExecutor,并且最大线程数为10。
      ‘‘‘
      scheduler.add_job(test_job, ‘interval‘, seconds=5, id=‘test_job‘)
      ‘‘‘
      #该示例中的定时任务采用固定时间间隔(interval)的方式,每隔5秒钟执行一次。
      #并且还为该任务设置了一个任务id    scheduler.start()

  2)如果想执行一些复杂任务,如上边所说的同时使用两种执行器,或者使用多种任务存储方式,并且需要根据具体情况对任务的一些默认参数进行调整。可以参考下面的方式。(源码解析:http://apscheduler.readthedocs.io/en/latest/userguide.html)

第一种方式:

      from pytz import utc
      from apscheduler.schedulers.background import BackgroundScheduler  # 导入调度器      from apscheduler.jobstores.mongodb import MongoDBJobStore          # 导入作业存储      from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore    # 导入作业存储      from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor  # 导入执行器      jobstores = {
        ‘mongo‘: MongoDBJobStore(),
        ‘default‘: SQLAlchemyJobStore(url=‘sqlite:///jobs.sqlite‘)
      }
      executors = {
        ‘default‘: ThreadPoolExecutor(20),
        ‘processpool‘: ProcessPoolExecutor(5)
      }
      job_defaults = {
        ‘coalesce‘: False,
        ‘max_instances‘: 3
      }
      scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

第二种方式:

      from apscheduler.schedulers.background import BackgroundScheduler
      scheduler = BackgroundScheduler({
        ‘apscheduler.jobstores.mongo‘: {
          ‘type‘: ‘mongodb‘
        },
        ‘apscheduler.jobstores.default‘: {
          ‘type‘: ‘sqlalchemy‘,
          ‘url‘: ‘sqlite:///jobs.sqlite‘
        },
        ‘apscheduler.executors.default‘: {
          ‘class‘: ‘apscheduler.executors.pool:ThreadPoolExecutor‘,
          ‘max_workers‘: ‘20‘
        },
        ‘apscheduler.executors.processpool‘: {
          ‘type‘: ‘processpool‘,
          ‘max_workers‘: ‘5‘
        },
        ‘apscheduler.job_defaults.coalesce‘: ‘false‘,
        ‘apscheduler.job_defaults.max_instances‘: ‘3‘,
        ‘apscheduler.timezone‘: ‘UTC‘,
      })

第三种方式:

      from pytz import utc
      from apscheduler.schedulers.background import BackgroundScheduler
      from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
      from apscheduler.executors.pool import ProcessPoolExecutor
      jobstores = {
        ‘mongo‘: {‘type‘: ‘mongodb‘},
        ‘default‘: SQLAlchemyJobStore(url=‘sqlite:///jobs.sqlite‘)
      }
      executors = {
        ‘default‘: {‘type‘: ‘threadpool‘, ‘max_workers‘: 20},
        ‘processpool‘: ProcessPoolExecutor(max_workers=5)
      }
      job_defaults = {
        ‘coalesce‘: False,
        ‘max_instances‘: 3
      }
      scheduler = BackgroundScheduler()
      scheduler.configure(jobstores=jobstores, executors=executors,job_defaults=job_defaults, timezone=utc)

5.对任务作业的基本操作:

1).添加作业有两种方式:第一种可以直接调用add_job(),第二种使用scheduled_job()修饰器。

而add_job()是使用最多的,它可以返回一个apscheduler.job.Job实例,因而可以对它进行修改或者删除,而使用修饰器添加的任务添加之后就不能进行修改。

      #!/usr/bin/env python
      #-*- coding:UTF-8
      import time
      import datetime
      from apscheduler.schedulers.blocking import BlockingScheduler
      def job1(f):
          print time.strftime(‘%Y-%m-%d %H:%M:%S‘, time.localtime(time.time())), f
      def job2(arg1, args2, f):
          print f, args1, args2
      def job3(**args):
          print args

APScheduler支持以下三种定时任务:

cron: crontab类型任务

interval: 固定时间间隔任务

date: 基于日期时间的一次性任务

scheduler = BlockingScheduler()

#循环任务示例

scheduler.add_job(job1, ‘interval‘, seconds=5, args=(1,), id=‘test_job1‘)

#定时任务示例

scheduler.add_job(job1, ‘cron‘, second=‘*/5‘, args=(1,2,3,), id=‘test_job2‘)

#一次性任务示例

scheduler.add_job(job1, next_run_time=(datetime.datetime.now() + datetime.timedelta(seconds=10)), args=(1,), id=‘test_job3‘)

传递参数的方式有元组(tuple)、列表(list)、字典(dict)

注意:不过需要注意采用元组传递参数时后边需要多加一个逗号

#基于list

scheduler.add_job(job2, ‘interval‘, seconds=5, args=[‘a‘,‘b‘,‘list‘], id=‘test_job4‘)

#基于tuple

scheduler.add_job(job2, ‘interval‘, seconds=5, args=(‘a‘,‘b‘,‘tuple‘,), id=‘test_job5‘)

#基于dict

scheduler.add_job(job3, ‘interval‘, seconds=5, kwargs={‘f‘:‘dict‘, ‘a‘:1,‘b‘:2}, id=‘test_job6)

print scheduler.get_jobs()

scheduler.start()

或者使用scheduled_job()修饰器来添加作业:

@sched.scheduled_job(‘cron‘, second=‘*/5‘ ,id=‘my_job_id‘,)

def test_task():

  print("Hello world!")

2).获得任务列表:

可以通过get_jobs方法来获取当前的任务列表,也可以通过get_job()来根据job_id来获得某个任务的信息。并且apscheduler还提供了一个print_jobs()方法来打印格式化的任务列表。

例如:

scheduler.add_job(my_job, ‘interval‘, seconds=5, id=‘my_job_id‘ name=‘test_job‘)

print scheduler.get_job(‘my_job_id‘)

print scheduler.get_jobs()

3).修改任务:

修改任务的属性可以使用apscheduler.job.Job.modify()或者modify_job()方法,可以修改除了id的其它任何属性。

例如:

job = scheduler.add_job(my_job, ‘interval‘, seconds=5, id=‘my_job‘ name=‘test_job‘)

job.modify(max_instances=5, name=‘my_job‘)

4).删除任务:

删除调度器中的任务有可以用remove_job()根据job ID来删除指定任务或者使用remove(),如果使用remove()需要事先保存在添加任务时返回的实例对象,任务删除后就不会在执行。

注意:通过scheduled_job()添加的任务只能使用remove_job()进行删除。

例如:

job = scheduler.add_job(my_job, ‘interval‘, seconds=5, id=‘my_job_id‘ name=‘test_job‘)

job.remove()

或者

scheduler.add_job(my_job, ‘interval‘, seconds=5, id=‘my_job_id‘ name=‘test_job‘)

scheduler.remove_job(‘my_job‘)

5).暂停与恢复任务:

暂停与恢复任务可以直接操作任务实例或者调度器来实现。当任务暂停时,它的运行时间会被重置,暂停期间不会计算时间。

暂停任务:

apscheduler.job.Job.pause()

apscheduler.schedulers.base.BaseScheduler.pause_job()

恢复任务

apscheduler.job.Job.resume()

apscheduler.schedulers.BaseScheduler.resume_job()

6).启动调度器

可以使用start()方法启动调度器,BlockingScheduler需要在初始化之后才能执行start(),对于其他的Scheduler,调用start()方法都会直接返回,然后可以继续执行后面的初始化操作。

例如:

from apscheduler.schedulers.blocking import BlockingScheduler

def my_job():

   print "Hello world!"

scheduler = BlockingScheduler()

scheduler.add_job(my_job, ‘interval‘, seconds=5)

scheduler.start()

7).关闭调度器:

使用下边方法关闭调度器:

scheduler.shutdown()

默认情况下调度器会关闭它的任务存储和执行器,并等待所有正在执行的任务完成,如果不想等待,可以进行如下操作:

scheduler.shutdown(wait=False)

注意:

当出现No handlers could be found for logger “apscheduler.scheduler”次错误信息时,说明没有 logging模块的logger存在,所以需要添加上,对应新增内容如下所示(仅供参):

import logging

logging.basicConfig(

    level=logging.DEBUG,

  format=‘%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s‘,

  datafmt=‘%a, %d %b %Y %H:%M:%S‘,

  filename=‘/var/log/aaa.txt‘,

  filemode=‘a‘

  )

时间: 2024-10-02 22:57:15

Python下定时任务框架APScheduler的使用的相关文章

[转]Python定时任务框架APScheduler

APScheduler是基于Quartz的 一个Python定时任务框架,实现了Quartz的所有功能,使用起来十分方便.提供了基于日期.固定时间间隔以及crontab类型的任务,并且可以 持久化任务.基于这些功能,我们可以很方便的实现一个python定时任务系统,写python还是要比java舒服多了. 1. 安装 安装过程很简单,可以基于easy_install和源码. easy_install apscheduler 或者下载源码,运行命令: python setup.py install

定时任务框架APScheduler学习详解

APScheduler简介 在平常的工作中几乎有一半的功能模块都需要定时任务来推动,例如项目中有一个定时统计程序,定时爬出网站的URL程序,定时检测钓鱼网站的程序等等,都涉及到了关于定时任务的问题,第一时间想到的是利用time模块的time.sleep()方法使程序休眠来达到定时任务的目的,虽然这样也可以,但是总觉得不是那么的专业,^_^所以就找到了python的定时任务模块APScheduler: APScheduler基于Quartz的一个Python定时任务框架,实现了Quartz的所有功

Python定时任务框架APScheduler 3.0.3 Cron示例

APScheduler是基于Quartz的一个Python定时任务框架,实现了Quartz的所有功能,使用起来十分方便.提供了基于日期.固定时间间隔以及crontab类型的任务,并且可以持久化任务.基于这些功能,我们可以很方便的实现一个python定时任务系统. 安装 安装过程很简单,可以基于pip和源码. Pip install apscheduler==3.0.3 或者下载源码,运行命令: Python setup.py install cron job例子 1: #coding=utf-8

python下django框架项目生成的文件解释

目录MyDjangoProject下表示工程的全局配置,分别为setttings.py.urls.py和wsgi.py,1.其中setttings.py包括了系统的数据库配置.应用配置和其他配置,2.urls.py则表示web工程Url映射的配置.3.子目录student则是在该工程下创建的app,包含了models.py.tests.py和views.py等文件4.templates目录则为模板文件的目录5.manage.py是Django提供的一个管理工具,可以同步数据库等等

Python下APScheduler的简单使用

今天准备实现一个功能需要用到定时执行任务,所以就看到了Python的一个定时任务框架APScheduler,试了一下感觉还不错. 1.APScheduler简介: APScheduler是Python的一个定时任务框架,可以很方便的满足用户定时执行或者周期执行任务的需求,它提供了基于日期date.固定时间间隔interval .以及类似于Linux上的定时任务crontab类型的定时任务.并且该框架不仅可以添加.删除定时任务,还可以将任务存储到数据库中,实现任务的持久化,所以使用起来非常方便.

apscheduler 定时任务框架

一.APScheduler简介: Python的一个定时任务框架,满足用户定时执行或者周期性执行任务的需求,提供了基于日期date.固定的时间间隔interval.以及类似于Linux上的定时任务crontab类型的定时任务.并且该框架不仅可以添加.删除定时任务,还可以将任务存储到数据库中,实现任务的持久化. Python的第三方库,用来提供Python的后台程序.包含四个组件,分别是: triggers:任务触发器组件,提供任务触发方式 job stores: 任务商店组件,提供任务保存方式

APScheduler定时任务框架

1.简介 APScheduler是一个Python**定时任务框架**,提供了**基于日期**.**固定时间间隔**以及**crontab**类型的任务,并且可以**持久化任务**.基于这些功能,我们可以很方便的实现一个python定时任务系统. 2.安装 pip install apscheduler 3.组成部分 触发器(triggers):触发器包含调度逻辑,描述一个任务何时被触发,按日期或按时间间隔或按 cronjob 表达式三种方式触发.每个作业都有它自己的触发器,除了初始配置之外,触

APScheduler轻量级定时任务框架

目录 一.APScheduler简介 支持的后端存储作业 集成的Python框架 二.APScheduler下载安装 三.APScheduler组件 各组件简介 调度器 作业存储器 执行器 触发器 四.使用 添加任务 指定时间执行任务,只执行一次 间隔时间执行任务 一.APScheduler简介 APScheduler(Advanced Python Scheduler)是一个轻量级的Python定时任务调度框架(Python库). APScheduler有三个内置的调度系统,其中包括: cro

[Dynamic Language] Python定时任务框架

APScheduler是一个Python定时任务框架,使用起来十分方便.提供了基于日期.固定时间间隔以及crontab类型的任务,并且可以持久化任务.并以daemon方式运行应用. 测试计划任务 mac-abeen:timetask abeen$ vim testtask.py 1 # !/usr/bin/env python 2 # -*- encoding:utf-8 -*- 3 4 from datetime import datetime 5 6 7 class TestTask(obj