Go组件学习——cron定时器

1 前言

  转到Go已经将近三个月,写业务代码又找到了属于Go的条件反射了。

  后置声明和多参数返回这些Go风格代码写起来也不会那么蹩脚,甚至还有点小适应~

  反而,前几天在写Java的时候,发现Java怎么启动这么慢,Java怎么能够容忍这些用不到的代码还理直气壮的躺在那……等等,这些话在哪听过类似的???

  “Go为什么要后置声明,多别扭啊”

  “Go里面为啥要定义这么多的struct,看的头晕”

  ……

  其实,没有最好的语言,只有最适合的。

  前面《Go语言学习》系列主要介绍了一些Go的基础知识和相较于Java的一些新特性。后续如果有相关的体会和新的还会继续更新。

  从这篇开始,开始学习Go的一些工具类库和开源组件,希望在学习这些优秀的开源项目过程中,更深入的了解Go,发现Go的威力。

2 cron简介

  robfig/cron是一个第三方开源的任务调度库,也就是我们平时说的定时任务。

  Github:https://github.com/robfig/cron

  官方文档:https://godoc.org/github.com/robfig/cron

3 cron如何使用

1、新建文件cron-demo.go

package main

import (
	"fmt"
	"github.com/robfig/cron"
	"time"
)

func main() {
	c := cron.New()
	c.AddFunc("*/3 * * * * *", func() {
		fmt.Println("every 3 seconds executing")
	})

	go c.Start()
	defer c.Stop()

	select {
	case <-time.After(time.Second * 10):
		return
	}
}
  • cron.New创建一个定时器管理器
  • c.AddFunc添加一个定时任务,第一个参数是cron时间表达式,第二个参数是要触发执行的函数
  • go c.Start()新启一个协程,运行定时任务
  • c.Stop是等待停止信号结束任务

2、在cron-demo.go文件下执行go build

本项目采用go mod进行包管理,所以执行go build命令后,会在go.mod文件中生成对应的依赖版本如图所示

3、运行cron-demo.go

  可以看出每3秒执行一次,直到10秒后过期退出进程,任务结束。

  代码参见项目:go-demo项目(https://github.com/DMinerJackie/go-demo/tree/master/main/src/cron

看上去这个任务调度还是蛮好用的,那么具体是如何实现的呢,看了下源码,也是非常的短小精悍,目录结构如下。

下面通过几个问题一起看下cron是如何实现任务调度。

4 cron如何解析任务表达式

  上例我们看到添加“*/3 * * * * *”这样的表达式,就能实现每3秒执行一次。

  显然,这个表达式只是对人友好的一种约定表达形式,要真正在指定时间执行任务,cron肯定是要读取并解析这个c表达式,转化为具体的时间再执行。

  那我们来看看,这个具体是如何执行的。

  进入AddFunc函数实现

// AddFunc adds a func to the Cron to be run on the given schedule.
func (c *Cron) AddFunc(spec string, cmd func()) error {
	return c.AddJob(spec, FuncJob(cmd))
}

  

  这只是套了个壳,具体还要进入AddJob函数

// AddJob adds a Job to the Cron to be run on the given schedule.
func (c *Cron) AddJob(spec string, cmd Job) error {
	schedule, err := Parse(spec)
	if err != nil {
		return err
	}
	c.Schedule(schedule, cmd)
	return nil
}

  

  该函数第一行就是解析cron表达式,顺藤摸瓜,我们看到具体实现如下

// Parse returns a new crontab schedule representing the given spec.
// It returns a descriptive error if the spec is not valid.
// It accepts crontab specs and features configured by NewParser.
func (p Parser) Parse(spec string) (Schedule, error) {
	if len(spec) == 0 {
		return nil, fmt.Errorf("Empty spec string")
	}
	if spec[0] == ‘@‘ && p.options&Descriptor > 0 {
		return parseDescriptor(spec)
	}

	// Figure out how many fields we need
	max := 0
	for _, place := range places {
		if p.options&place > 0 {
			max++
		}
	}
	min := max - p.optionals

	// Split fields on whitespace
	fields := strings.Fields(spec)	// 使用空白符拆分cron表达式

	// Validate number of fields
	if count := len(fields); count < min || count > max {
		if min == max {
			return nil, fmt.Errorf("Expected exactly %d fields, found %d: %s", min, count, spec)
		}
		return nil, fmt.Errorf("Expected %d to %d fields, found %d: %s", min, max, count, spec)
	}

	// Fill in missing fields
	fields = expandFields(fields, p.options)

	var err error
	field := func(field string, r bounds) uint64 {	// 抽象出filed函数,方便下面调用
		if err != nil {
			return 0
		}
		var bits uint64
		bits, err = getField(field, r)
		return bits
	}

	var (
		second     = field(fields[0], seconds)
		minute     = field(fields[1], minutes)
		hour       = field(fields[2], hours)
		dayofmonth = field(fields[3], dom)
		month      = field(fields[4], months)
		dayofweek  = field(fields[5], dow)
	)
	if err != nil {
		return nil, err
	}

	return &SpecSchedule{
		Second: second,
		Minute: minute,
		Hour:   hour,
		Dom:    dayofmonth,
		Month:  month,
		Dow:    dayofweek,
	}, nil
}

  

  该函数主要是将cron表达式映射为“Second, Minute, Hour, Dom, Month, Dow”6个时间维度的结构体SpecSchedule。

  SpecSchedule是实现了方法“Next(time.Time) time.Time”的结构体,而“Next(time.Time) time.Time”是定义在Schedule接口中的

// The Schedule describes a job‘s duty cycle.
type Schedule interface {
	// Return the next activation time, later than the given time.
	// Next is invoked initially, and then each time the job is run.
	Next(time.Time) time.Time
}

  所以,最终可以理解是将cron解析后转换为下一次要执行的时刻,等待执行。

5 cron如何执行任务

  我们知道通过parser.go可以将人很好理解的表达式转换为cron可以读懂的要执行的时间。

  有了要执行的时间点,那么cron具体是如何执行这些任务的呢?

  我们看下Start函数的具体实现

// Start the cron scheduler in its own go-routine, or no-op if already started.
func (c *Cron) Start() {
	if c.running {
		return
	}
	c.running = true
	go c.run()
}

  这里会通过判定Cron的running字段是否在运行来巨额听是否要启动任务。

  显然这里running是false,因为在调用c.New初始化的时候running被设置为false。

  所以,这里新启一个协程用于执行定时任务,再次顺藤摸瓜,我们看到run函数的实现

// Run the scheduler. this is private just due to the need to synchronize
// access to the ‘running‘ state variable.
func (c *Cron) run() {
	// Figure out the next activation times for each entry.
	now := c.now()
	for _, entry := range c.entries {
		entry.Next = entry.Schedule.Next(now)
	}

	for {
		// Determine the next entry to run.
		sort.Sort(byTime(c.entries))

		var timer *time.Timer
		if len(c.entries) == 0 || c.entries[0].Next.IsZero() {	// 如果没有要执行的任务或者第一个任务的待执行时间为空,则睡眠
			// If there are no entries yet, just sleep - it still handles new entries
			// and stop requests.
			timer = time.NewTimer(100000 * time.Hour)
		} else {
			timer = time.NewTimer(c.entries[0].Next.Sub(now))	// 否则新建一个距离现在到下一个要触发执行的Timer
		}

		for {
			select {
			case now = <-timer.C:	// 触发时间到,执行任务
				now = now.In(c.location)
				// Run every entry whose next time was less than now
				for _, e := range c.entries {
					if e.Next.After(now) || e.Next.IsZero() {
						break
					}
					go c.runWithRecovery(e.Job)
					e.Prev = e.Next
					e.Next = e.Schedule.Next(now)
				}

			case newEntry := <-c.add:	// 添加任务
				timer.Stop()
				now = c.now()
				newEntry.Next = newEntry.Schedule.Next(now)
				c.entries = append(c.entries, newEntry)

			case <-c.snapshot:	// 调用c.Entries()返回一个现有任务列表的snapshot
				c.snapshot <- c.entrySnapshot()
				continue

			case <-c.stop:	// 任务结束,退出
				timer.Stop()
				return
			}

			break
		}
	}
}
  • 进入该函数,首先遍历所以任务,找到所有任务下一个要执行的时间。
  • 然后进入外层for循环,对于各个任务按照执行时间进行排序,保证离当前时间最近的先执行。
  • 再对任务列表进行判定,是否有任务如果没有,则休眠,否则初始化一个timer。

里层的for循环才是重头戏,下面主要分析这个for循环里面的任务加入和执行。

在此之前,需要了解下go标准库的timer

timer用于指定在某个时间间隔后,调用函数或者表达式。

使用NewTimer就可以创建一个Timer,在指定时间间隔到达后,可以通过<-timer.C接收值。

package main

import (
	"fmt"
	"time"
)

func main() {
	timer1 := time.NewTimer(2 * time.Second)

	<-timer1.C
	fmt.Println("Timer 1 expired")

	timer2 := time.NewTimer(time.Second)
	go func() {
		<-timer2.C
		fmt.Println("Timer 2 expired")
	}()

	stop2 := timer2.Stop()
	if stop2 {
		fmt.Println("Timer 2 stopped")
	}
}

  执行结果为

Timer 1 expired
Timer 2 stopped

  timer1表示2秒后到期,在此之前都是阻塞状态,2秒后<-timer1.C接收到信号,执行下面的打印语句。

  timer2表示1秒后到期,但是中途被Stop掉了,相当于清除了定时功能。

  有了这个背景之后,我们再来看run函数的里层for循环。

  接收到c.add信道

case newEntry := <-c.add:	// 添加任务
	timer.Stop()
	now = c.now()
	newEntry.Next = newEntry.Schedule.Next(now)
	c.entries = append(c.entries, newEntry)

  将timer停掉,清除设置的定时功能,并以当前时间点为起点,设置添加任务的下一次执行时间,并添加到entries任务队列中。

  接收到timer.C信道

case now = <-timer.C:	// 触发时间到,执行任务
	now = now.In(c.location)
    // Run every entry whose next time was less than now
    for _, e := range c.entries {
    	if e.Next.After(now) || e.Next.IsZero() {
    		break
    	}
    go c.runWithRecovery(e.Job)
    e.Prev = e.Next
    e.Next = e.Schedule.Next(now)
    }

  当定任务到点后,time.C就会接收到值,并新开协程执行真正需要执行的Job,之后再更新下一个要执行的任务列表。

  我们进入runWithRecovery函数,该函数从函数名就可以看出,即使出现panic也可以重新recovery,保证其他任务不受影响。

func (c *Cron) runWithRecovery(j Job) {
	defer func() {
		if r := recover(); r != nil {
			const size = 64 << 10
			buf := make([]byte, size)
			buf = buf[:runtime.Stack(buf, false)]
			c.logf("cron: panic running job: %v\n%s", r, buf)
		}
	}()
	j.Run()
}

追根溯源,我们发现真正执行Job的是j.Run()的执行。进入这个Run函数的实现,我们看到

func (f FuncJob) Run() { f() }

没错,我们要执行的任务一直从AddFunc一直往下传递,直到这里,我们通过调用Run函数,将包装的FuncJob类型的函数通过f()的形式进行执行。

这里说的可能比较模糊,举个例子,Go里面的闭包定义

func () {
    fmt.Println("test")
}()

如果这里定义后面没有"()"该函数就不会执行,所以结合这个看上面的定时任务是如何执行就更容易理解了。

6 代码阅读体会

1、channel的奥妙

  通过channel可以让感知变得轻而易举,比如timer.C就像是时间到了,自然会有人来敲门告诉你。而不需要我们自己主动去获取是否到期了。

2、常用类库的使用

  比如在parser里面我们看到了"fields := strings.Fields(spec)",在日常开发中,我们可以灵活使用这些API,避免自己造轮子的情况。

3、多思考

  之前做Java的时候,更多的是沉浸在各种工具和框架的使用,对于这些工具和框架的实现关注的不多。比如从Quartz到Spring Job,我们需要更新的是越来越好用的定时任务工具,而底层的实现升级Spring都帮我们考虑好了。这种对业务对项目有友好的,可以快速的实现业务功能开发,但是对于开发者并不友好,友好的设计麻痹了开发者对于底层原理的深究的欲望。

?

原文地址:https://www.cnblogs.com/bigdataZJ/p/go-opensource-cron.html

时间: 2024-10-06 21:05:13

Go组件学习——cron定时器的相关文章

JavaScript学习05 定时器

JavaScript学习05 定时器 定时器1 用以指定在一段特定的时间后执行某段程序. setTimeout(): 格式:[定时器对象名=] setTimeout(“<表达式>”,毫秒) 功能:执行<表达式>一次. 例子: <!DOCTYPE html> <html> <head> <title>timer1.html</title> <meta http-equiv="keywords" co

Android组件学习之ExpandableListView

一个简单的小例子: 可以展开的ListView,和Listview差不多,只是设置的Adapter不同.常用的Adapter有BaseExpandableListAdapter.SimpleExpandableListAdapter.SimpleCursorTreeAdapter 布局如下:(布局中我设置了android:groupIndicator,不知道为什么不起作用.另外,android:dividerHeight这个属性是组对象和子节点共用的.如果要定义比较复杂的组视图及子节点视图,还是

Indy10 控件的使用(2)TidTCpServer组件学习

Indy10 控件的使用(2)TidTCpServer组件学习 (2012-05-18 15:16:53) 转载▼ 标签: indy10 lazarus idtcpserver 分类: Indy10 以下来自英文原版帮助文件,文桓英语不好,翻译了老半天.有错误的地方见谅,别骂我. TIdTCPServer = class(TIdComponent) Description TIdTCPServer is a TIdComponent descendant that encapsulates a

GitHub--FoldAbleLayout可折叠组件学习(二)

接上文: GitHub–FoldAbleLayout可折叠组件学习(一) 遗留问题 同样是使用Picasso,图片存于drawable文件夹中,RecycleView的界面滑动十分卡顿.查看Github作者的例子,图片存在assets文件夹中存放图片,通过Picasso传入图片的路径就可以很流畅的加载出图片. 我分别实验了两种:图片放在drawable文件夹下和asset文件夹下. Drawable文件夹下,AS内存使用情况: 显示内存消耗过多,程序运行卡顿. 在Asset文件夹下,AS的内存使

JMeter学习-021-JMeter 定时器的应用

定时器类型 下面我们看下jmeter提供了哪些定时器组件: 固定定时器 高斯随机定时器 Uniform Random Timer Synchronizing Timer Poisson Random Timer JSR223 Timer Constant Throughput Timer BeanShell Timer 高斯随机定时器 高斯随机定时器,又可以称作正态分布随机定时器,该定时器可以设置在两个请求间随机延时时长.且总的延时是高斯分布(正态分布)的总和(均值:0.0.标准差1.0).在使

boost asio 学习(六) 定时器

http://www.gamedev.net/blog/950/entry-2249317-a-guide-to-getting- started-with-boostasio?pg=7 6 定时器 boost::asio 提供了一个 deadline_timer class来提供同步与异步的接口. BOOST文档提供了一组优秀示例.第一个例子,将创建一个间隔5秒的定时器. #include <boost/asio.hpp> #include <boost/shared_ptr.hpp&

react native组件学习(四)

PullToRefreshViewAndroid学习 PullToRefreshViewAndroid是一个视图,可以放置单个可滚动子视图,当子视图的竖直方向偏移(scrollY)为0时,将其下拉可以触发一个onRefresh事件. 在学习PullToRefreshViewAndroid组件之前,如果没有一定的基础,建议先阅读前面的文章,因为,这里我在之前学习ScrollView的基础上,添加PullToRefreshViewAndroid功能.当前的代码如下: 'use strict'; va

Android 四大组件学习之BroadcastReceiver一

本节课学习四大组件最后一个, 广播接受者. 顾名思义广播接受者就是接受广播呗.比如在现实社会中,以前每个人家都有一台收音机,这可就可以去接受广播发出来的消息.大家都知道,程序世界也是参照的显示生活设计出来的,那在Android系统中也引入了广播这个概念.那在Android系统中广播有什么作用呢? 举个例子:比如你正在玩游戏或者看视频突然手机电量过低,这时候就会弹出一个框,提醒您手机电量过低,请充电的提示.其实电量改变就是一种广播类型,当电量过低时,系统就会发生一条广播,这时候正在运行的程序就会收

Android 四大组件学习之Activity一

Activity是Android四大组件中最基础也是最常用的组件之一.Activity作为一个应用程序组件,提供了一个与用户交互的界面.可以这么说Activity是和用户操作有密切相关的的,常用来负责与用户交互,可以通过setContentView来显示组件. 今天我们学习如何去创建一个Activity 第一步: 创建一个Android Application Project, 填入应用名称,工程名称,包名 点击Next下一步:下面的选择都是默认的. 直到 你就可以选择一个Activity进行创