使用sched库完成周期定时任务

我们经常需要定时的执行某个任务,在Linux下我们有强大的crontab

我们经常需要定时执行某个任务,在linux下有强大的crontab,在python里除了第三方模块外,python自带了sched模块和Timer类,用于完成周期任务。timer类在threading模块里。

下面主要说明sched模块的使用

scheduler.scheduler(timefunc, delayfunc)

scheduler对像有如下方法和属性:

scheduler.enterabs(time,priority,action,argument)   生成一个调度事件,它的返回值可用于取消这个调度事件.相同时间执行的事件将按它们的优先级顺序处理

scheduler.enter(delay,priority,action,argument)     生成一个调度事件,返回值和enterabs一样

scheduler.cancel(event)    取消一个调度事件,参数为enterabs,enter的返回值。

scheduler.empty()    判断调度事件队列是否为空,空则返回True

scheduler.run()     运行所有调度事件,阻塞等待完成。这个方法使用构造方法中的delayfunc阻塞等下一个事件,然后执行它直至完成调度队列里的所有调度事件。cancel方法和run方法需不在一个线程里,如果在同一个线程需要在run之前cancel调度事件

scheduler.queue    调度队列

sched模块是一个延时调度处理模块,每次要执行某个任务时都必须写入一个调度任务(调用的方法)。

使用如下:

  1. 生成一个调度器:

    import time,sched

    s=sched.scheduler(time.time,time.sleep)

    说明:第一个参数必须是不带参数的时间函数,并且必须返回数字,比如此例返回的是一个时间戳,第二个参数是阻塞函数,该函数需带一个参数,兼容时间函数的输出。

  2. 添加第一调度事件:

    添加调度事件的方法有2个enter和enterabs,以enter为例:

    def print_time():

print "From print_time", time.time()

s.enter(5,1,print_time,())

3.执行

s.run()

 需要注意的是sched模块不是循环的,一次调度被执行完后就结束了,如果要再执行,请再次enter, enter后也无需再次执行run.若在一个方法里重复调用run,当超过最大的递归深度时会导致异常,终止任务执行。

#! /usr/bin/env python
#coding=utf-8
import time, os, sched 
    
# 第一个参数确定任务的时间,返回从某个特定的时间到现在经历的秒数 
# 第二个参数以某种人为的方式衡量时间 
schedule = sched.scheduler(time.time, time.sleep) 
    
def run_command(cmd, inc): 
    # 安排inc秒后再次运行自己,即周期运行 
    schedule.enter(inc, 0, run_command, (cmd, inc)) 
    os.system(cmd) 
        
def timming_exe(cmd, inc = 60): 
    # enter用来安排某事件的发生时间,从现在起第n秒开始启动 
    schedule.enter(inc, 0, run_command, (cmd, inc)) 
    # 持续运行,直到计划时间队列变成空为止 
    schedule.run() 
        
    
print("show time after 10 seconds:") 
if os.name == "nt":
    timming_exe("echo %time%", 10)
elif os.name=="posix":
    timming_exe("echo `date`", 10)
时间: 2024-11-03 00:02:27

使用sched库完成周期定时任务的相关文章

HTTP API 自动化测试从手工测试到平台的演变

不管是 Web 系统,还是移动 APP,前后端逻辑的分离设计已经是常态化,相互之间通过 API 调用进行数据交互.在基于 API 约定的开发模式下,如何加速请求 / 响应的 API 测试,让研发人员及早参与到调试中来呢?既然 API 是基于约定开发,为何不按照这个规范编写测试用例,直接进入待测试状态,使用自动化的方式来推进研发过程的质量改进呢?遵循:测试 -> 重构 -> 测试 -> 重构,这样的闭环,过程产出的质量会更加可控,在重构的同时进行快速的功能回归验证,大大提高效率.下面主要讲

python第三方库系列之十四--集群化部署定时任务apscheduler库

如果将定时任务部署在一台服务器上,那么这个定时任务就是整个系统的单点,这台服务器出现故障的话会影响服务.对于可以冗余的任务(重复运行不影响服务),可以部署在多台服务器上,让他们同时执行,这样就可以很简单的避免单点.但是如果任务不允许冗余,最多只能有一台服务器执行任务,那么前面的方法显然行不通.本篇文章就向大家介绍如何避免这种互斥任务的单点问题,最后再介绍一下基于APScheduler的分布式定时任务框架,这个框架是通过多个项目的实践总结而成的. 对于运行在同一台服务器上的两个进程,可以通过加锁实

python标准库之sched 定时事件调度器

# -*- coding: utf-8 -*-# 作者:新手__author__ = 'Administrator'#py标准库之schedimport timeimport sched#定时事件调度器#使用time来掌握当前时间,还有一个是延迟(delay)来指定一个时间段#调用time是不带任何参数的,返回当前一个时间数,delay要提供一个整数的参数,#有延迟运行事件"""4个方法1:表示延迟的数:2:优先级值:3:要调用的函数;4:函数参数的元组""

python定时任务-sched模块

通过sched模块可以实现通过自定义时间,自定义函数,自定义优先级来执行函数. 范例一 1 import time 2 import sched 3 4 schedule = sched.scheduler( time.time,time.sleep) 5 6 def func(string1): 7 print "now excuted func is %s"%string1 8 9 print "start" 10 schedule.enter(2,0,func

python使用sched模块执行周期性任务和定时任务

执行周期性任务 sched模块是一个通用的事件调度程序,可以对任务进行延迟调度,基于此,可以用它来实现周期性任务. # coding:utf8 import time import sched # 初始化scheduler类 # 第一个参数是一个可以返回时间戳的函数,第二个参数可以在定时未到达之前阻塞. s = sched.scheduler(time.time, time.sleep) # 被周期性调度的任务 def task(): print("run time: {}".form

Python标准库笔记(5) — sched模块

事件调度 sched模块内容很简单,只定义了一个类.它用来最为一个通用的事件调度模块. class sched.scheduler(timefunc, delayfunc)这个类定义了调度事件的通用接口,它需要外部传入两个参数,timefunc是一个没有参数的返回时间类型数字的函数(常用使用的如time模块里面的time),delayfunc应该是一个需要一个参数来调用.与timefunc的输出兼容.并且作用为延迟多个时间单位的函数(常用的如time模块的sleep). 下面是一个列子: imp

mysql定时任务按天建表并跨库同步数据

创建定时任务完成:创建ASR识别记录表,每天自动从小云AI对话详情表同步数据.*/DROP PROCEDURE IF EXISTS `create_o_asr_record_call`;DELIMITER ;;CREATE PROCEDURE `create_o_asr_record_call`(IN `dayInt` bigint,out result int) COMMENT 'ASR识别结果表--按日--建表'BEGIN set @sql_tmp3 = CONCAT('create tab

apscheduler 定时任务框架

一.APScheduler简介: Python的一个定时任务框架,满足用户定时执行或者周期性执行任务的需求,提供了基于日期date.固定的时间间隔interval.以及类似于Linux上的定时任务crontab类型的定时任务.并且该框架不仅可以添加.删除定时任务,还可以将任务存储到数据库中,实现任务的持久化. Python的第三方库,用来提供Python的后台程序.包含四个组件,分别是: triggers:任务触发器组件,提供任务触发方式 job stores: 任务商店组件,提供任务保存方式

python的sched模块--延时调度

我们经常需要定时的执行某个任务,在Linux下我们有强大的crontab,但是在Python这个粒度(定时执行函数),如何处理呢?除了第三方的模块外,标准库为我们提供了sched模块和Timer类. 先说sched模块,准确的说,它是一个调度(延时处理机制),每次想要定时执行某任务都必须写入一个调度.使用步骤如下:(1)生成调度器:s = sched.scheduler(time.time,time.sleep)第一个参数是一个可以返回时间戳的函数,第二个参数可以在定时未到达之前阻塞.可以说sc