要么改变世界,要么适应世界

Go 借助 robfig/cron 实现定时任务

2024-08-21 22:28:16
0
目录

前言

在使用Go开发过程中,我们可能会经常使用定时器功能实现一些事情,对于简单的一些场景,我们可以借助time.sleep的方式,但是实际上这阻塞用法,会长时间占用CPU,当然我们也可以使用协程的方式避免主协程阻塞,然而如果我们还想做细粒度的控制,例如如果解决相邻两次任务时间重叠的问题,我们可能要花时间手动写一些编排任务列表控制的逻辑,实际上我们可以借助robfig/cron库,实现我们的功能。

安装

go get github.com/robfig/cron/v3@v3.0.0

【官方文档】

简单用法

指定时间间隔:

func testSimple() {
	c := cron.New()
	// 每 5 秒一次
	cronId, err := c.AddFunc("@every 5s", func() {
		fmt.Println(time.Now().String())
	})
	fmt.Println("taskId ", cronId, err)
	c.Start()
	// 等待 30 秒
	time.Sleep(30 * time.Second)
}

使用@every时,有两点需要注意:

  1. cron 不会在添加任务时立即将任务运行一遍,使用@every 5s时,第一次运行是5秒后
  2. 系统不会考虑每个任务执行的时间,如果使用@every 5s,每次任务花费3s,则任务之间间隔2s

多任务:

func testSimple() {
	c := cron.New()
	// 每 5 秒一次
	cronId1, err := c.AddFunc("@every 5s", func() {
		fmt.Println("task1 ", time.Now().String())
	})
	fmt.Println("task1 ", cronId1, err)

	// 每 8 秒一次
	cronId2, err := c.AddFunc("@every 8s", func() {
		fmt.Println("task2 ", time.Now().String())
	})
	fmt.Println("task2 ", cronId2, err)

	c.Start()
	// 等待 30 秒
	time.Sleep(30 * time.Second)
}

指定cron表达式:

【表达式说明】

func testCronExpression() {
	c := cron.New()

	// 每当分变为9时
	cronId, err := c.AddFunc("9 * * * *", func() {
		fmt.Println(time.Now().String())
	})
	fmt.Println("taskId ", cronId, err)
	c.Start()
	// 等待 150 秒
	time.Sleep(150 * time.Second)
}

传递函数参数(自定义Job):

如果一个结构体实现了Job接口,即实现了Run()函数,就可以作为任务传给 cron 对象。

type MyJob struct {
	Name string
	Cnt  int
}

func (j *MyJob) Run() {
	fmt.Printf("Jon: %s at:%s count:%v \n", j.Name, time.Now().String(), j.Cnt)
	j.Cnt++
}
func testMyJob() {
	c := cron.New()
	// 每 5 秒一次
	cronId, err := c.AddJob("@every 5s", &MyJob{Name: "myJob"})
	fmt.Println(cronId, err)

	c.Start()
	// 等待 30 秒
	time.Sleep(30 * time.Second)
}

高级用法

设定时区

默认情况下,系统使用当前时区的。在某些情况下,时区是很重要时,我们可以通过一下的方式设置:

func testWithZone() {
	
	location, _ := time.LoadLocation("Asia/Shanghai")
	c := cron.New(cron.WithLocation(location))
	/*
	亦或是
	c := cron.New()
		c.AddFunc("CRON_TZ=Asia/Shanghai 9 * * * *", func() {
		})
	 */
	// 每当分变为9时
	cronId, err := c.AddFunc("9 * * * *", func() {
		fmt.Println(time.Now().String())
	})
	fmt.Println("taskId ", cronId, err)
	c.Start()
	// 等待 150 秒
	time.Sleep(150 * time.Second)
}

自定义日志器Logger

func testWithLogger() {
	loggerOption := cron.WithLogger(cron.VerbosePrintfLogger(log.New(os.Stdout, "[cron-task]: ", log.LstdFlags)))
	c := cron.New(loggerOption)
	// 每 5 秒一次
	cronId1, err := c.AddFunc("@every 5s", func() {
		fmt.Println("task1 ", time.Now().String())
	})
	fmt.Println("task1 ", cronId1, err)

	c.Start()
	// 等待 30 秒
	time.Sleep(30 * time.Second)
}

添加包装器WithChain

包装器实际上是就是在执行任务前后增加一些逻辑,例如内置的:

  1. Recover:捕获运行过程产生的panic
  2. SkipIfStillRunning:如果上次任务还没运行完,本次任务跳过
  3. DelayIfStillRunning:如果上次任务还没运行完,本次任务延迟
func testSkipIfStillRunning() {
	chainOption := cron.WithChain(cron.SkipIfStillRunning(cron.DefaultLogger))
	c := cron.New(chainOption)
	// 每 5 秒一次
	cronId1, err := c.AddFunc("@every 5s", func() {
		fmt.Println("task1 ", time.Now().String())
		time.Sleep(8 * time.Second)
	})
	fmt.Println("task1 ", cronId1, err)

	c.Start()
	// 等待 30 秒
	time.Sleep(30 * time.Second)
}

支持秒

func testWithSecond() {
	c := cron.New(cron.WithSeconds())
	// 每秒执行一次
	cronId, err := c.AddFunc("* * * * * *", func() {
		fmt.Println(time.Now().String())
	})
	fmt.Println("taskId ", cronId, err)
	c.Start()
	// 等待 150 秒
	time.Sleep(150 * time.Second)
}

注意事项

cron会创建一个新的协程来执行触发回调。如果这些回调需要并发访问一些资源、数据,我们需要显式地做同步。

历史评论
开始评论