Golang定时任务框架GoCron的源码分析
目录
- 背景说明
- GoCron任务调度库
- 调用实例
- 项目分析
- 综合分析
- 新的GoCron分析
- 项目架构
- 新功能
- gocron主要数据结构
- 综合分析
- 最后
背景说明
最近工作上有个开发定时任务的需求,调研一下后发现golang并没有十分完善的定时任务库。
整理我这边的需求如下:- 支持启动仅定时执行一次的任务;
- 任务在执行之前可以完成撤销;
- 服务重启之后,未完成的定时任务需要允许重新调度;
显然,现成的cron库无法满足我的需求。限定于工期,最终自己实现了一个粗糙的事件驱动定时器。
但这个事件驱动定时器具有以下的缺点:
- 事件订阅/通知机制不成熟
- 无法适用于更灵活的场景,例如多节点的分布式任务调度执行
- 模块之间的职责不清晰,例如其实Timer模块是Scheduler调度器的一部分,Event定时器相关的部分也是Scheduler调度器的一部分,而Executor执行模块也存在任务调度的功能,实际上它只需要负责完成调度器交给它的任务就好
- 没有设计任务调度池,也就是但凡新建计划任务,就会在后台启动一个协程持续监听;一旦任务数量太多,后台停留的协程会越来越多,进程总的消耗就会变得非常夸张,非常可怕
- 任务调度时不存在优先级的概念,假如相同时间内有多个任务同时执行,哪个任务被优先调度完全取决于GMP的系统调度
综上,我需要着重考察现有的Golang任务调度框架,对任务定时器进行重新设计。
GoCron任务调度库
https://github.com/jasonlvhit/gocron
调用实例
package main import ( "fmt" "time" "github.com/jasonlvhit/gocron" ) func task() { fmt.Println("I am running task.") } func taskWithParams(a int, b string) { fmt.Println(a, b) } func main() { // Do jobs without params gocron.Every(1).Second().Do(task) gocron.Every(2).Seconds().Do(task) gocron.Every(1).Minute().Do(task) gocron.Every(2).Minutes().Do(task) gocron.Every(1).Hour().Do(task) gocron.Every(2).Hours().Do(task) gocron.Every(1).Day().Do(task) gocron.Every(2).Days().Do(task) gocron.Every(1).Week().Do(task) gocron.Every(2).Weeks().Do(task) // Do jobs with params gocron.Every(1).Second().Do(taskWithParams, 1, "hello") // Do jobs on specific weekday gocron.Every(1).Monday().Do(task) gocron.Every(1).Thursday().Do(task) // Do a job at a specific time - 'hour:min:sec' - seconds optional gocron.Every(1).Day().At("10:30").Do(task) gocron.Every(1).Monday().At("18:30").Do(task) gocron.Every(1).Tuesday().At("18:30:59").Do(task) // Begin job immediately upon start gocron.Every(1).Hour().From(gocron.NextTick()).Do(task) // Begin job at a specific date/time t := time.Date(2019, time.November, 10, 15, 0, 0, 0, time.Local) gocron.Every(1).Hour().From(&t).Do(task) // NextRun gets the next running time _, time := gocron.NextRun() fmt.Println(time) // Remove a specific job gocron.Remove(task) // Clear all scheduled jobs gocron.Clear() // Start all the pending jobs <- gocron.Start() // also, you can create a new scheduler // to run two schedulers concurrently s := gocron.NewScheduler() s.Every(3).Seconds().Do(task) <- s.Start() }
项目分析
这个工具库仅有三个文件:
代码主要分为job和scheduler两个文件,gocron仅放置了回调方法和公共方法。项目整体架构如下:
gocron通过scheduler维护一个job列表,指定MAXJOBNUM最大工作队列,限制可执行的工作数大小。
// gocron/scheduler.go // Sc编程客栈heduler struct, the only data member is the list of jobs. // - implements the sort.Interface{} for sorting jobs, by the time nextRun type Scheduler struct { jobs [MAXJOBNUM]*Job // Array store jobs size int // Size of jobs which jobs holding. loc *time.Location // Location to use when scheduling jobs with specified times }
这里需要更正一下,并不是全局列表,仅仅只是跟随调度器的生命周期。实际上,代码确实存在全局的默认调度器:
var ( defaultScheduler = NewScheduler() )
因此,可以直接调用。当然也支持实例化自己的调度器:
s := gocron.NewScheduler() s.Every(3).Seconds().Do(task) <- s.Start()
gocron是典型的链式调用,scheduler对象通过返回job对象,完成job对象的封装操作之后,加入调度器内部的jobs列表,再通过Start方法启动调度器监控协程,轮询列表中的jobs,一旦找到可执行的任务,就会启动协程运行job的Func对象。
// Job struct keeping information about job type Job struct { interval uint64 // pause interval * unit between runs jobFunc string // the job jobFunc to run, func[jobFunc] //...... funcs map[string]interface{} // Map for the function task store fparams map[string][]interface{} // Map for function and params of function //...... }
funcs维护一个map,缓存funcName到func的映射关系。具体封装在Do方法:
// gocron/job.go // func (j *Job) Do(jobFun interface{}, params ...interface{}) error fname := getFunctionName(jobFun) j.funcs[fname] = jobFun j.fparams[fname] = params j.jobFunc = fname
在执行任务时,通过反射回调func:
// gocron/job.go // func (j *Job) run() ([]reflect.Value, error) result, err := callJobFuncWithParams(j.funcs[j.jobFunc], j.fparams[j.jobFunc]) if err != nil { return nil, err } // gocron/gocron.go func callJobFuncWithParams(jobFunc interface{}, params []interface{}) ([]reflect.Value, error) { f := reflect.ValueOf(jobFunc) if len(params) != f.Type().NumIn() { return nil, ErrParamsNotAdapted } in := make([]reflect.Value, len(params)) for k, param := range params { in[k] = reflect.ValueOf(param) } return f.Call(in), nil }
启动调度器时,启动监控协程:
// Start all the pending jobs // Add seconds ticker func (s *Scheduler) Start() cnrVMTzoIcZhan bool { stopped := make(chan bool, 1) // ticker每秒产生一个信号 ticker := time.NewTicker(1 * time.Second) go func() { for { // select选择器阻塞 // case接收到信号则执行 // 同时接收到多个信号则随机选择一个执行 select { // ticker每秒产生一次信号 // RunPending轮询jobs列表,寻找到了时间可执行的任务 case <-ticker.C: s.RunPending() // stopped接收到停止信号,退出调度器协程 case <-stopped: ticker.Stop() return } } }() return stopped }
一个调度器一个协程,通过统一的调度协程去监控调度器任务列表内的任务。
// RunPending runs all the jobs that are scheduled to run. func (s *Scheduler) RunPending() { // 轮询jobs列表,找到到时间可执行的任务,创建可执行任务列表 runnableJobs, n := s.getRunnableJobs() if n != 0 { for i := 0; i < n; i++ { // 启动协程运行 go runnableJobs[i].run() // 刷新job执行信息,等待下一轮调度 runnableJobs[i].lastRun = time.Now() runnableJobs[i].scheduleNextRun() } } }
综合分析
综上,gocron有如下好处:
- 链式调用简单易用
- scheduler和job职责清晰,项目架构非常容易理解
- 调度器一键启动协程监控,只有到了时间可执行的任务才会被加入到runablejobs列表,大大减少了进程中协程的数量,减少资源消耗
- 调度器维护的待执行任务池,存在预设的容量大小,限定了同时可执行的最大任务数量,不会导致超量
但它的缺陷也同样明显:
- 当不同的线程同时对同一个调度器进行操作,对任务列表产生的影响是不可预知的。因此这个框架下,最好是每个client维护自己的scheduler对象
- 虽然调度器维护一个jobs列表,但如果超过列表设定容量的任务便无法等待执行了……这一点gocron并没有理睬
- 几乎每秒,为了找到可执行的任务去构建runablejobs列表,都会轮询一次任务列表。为了追求结果的一致,它会对jobs进行排序,虽然Golang编译器对内置的sort方法进行了优化,会选举最快的方式对数据进行处理,但依然存在消耗
- 依然是内存操作,服务重启任务列表就不存在了。也没有考虑到多节点的场景。
新的GoCron分析
https://github.com/go-co-op/gocron
原gocron的作者居然住进ICU了,管理员说截止至2020年3月依然无法联系上他。愿他身体安康……gocron被fork后有了新的发展,赶紧扒下来学习一下
新的gocron新增了很多内容,依然围绕着Scheduler和Job进行链式操作,但新增了executor模块。executor仅负责执行Scheduler调度过来的任务。
项目架构
下面是项目README文档里公开的架构图:
新功能
新版gocron支持了cron格式的语法
// cron expressions supported s.Cron("*/1 * * * *").Do(task) // every minute
新增了异步和阻塞模式的两种调度方式
// you can start running the scheduler in two different ways: // starts the scheduler asynchronously s.StartAsync() // starts the scheduler and blocks current execution path s.StartBlocking()
通过设置信号量限制可同时运行的任务数量
// gocron/scheduler.go // SetMaxConcurrentJobs limits how many jobs can be running at the same time. // This is useful when running resource intensive jobs and a precise start time is not critical. func (s *Scheduler) SetMaxConcurrentJobs(n int, mode limitMode) { // 通过对n的配置修改并发任务数的大小 s.executor.maxRunningJobs = semaphore.NewWeighted(int64(n)) // limitMode即当可执行任务达到最大并发量时,应该如何处理的逻辑 // RescheduleMode:跳过本次执行,等待下一次调度 // WaitMode:持续等待,知道可执行队列空出。但,由于等待的任务数积累,可能导致不可预知的后果,某些任务可能一直等不到执行 s.executor.limitMode = mode } // gocron/executor.go // 通过信号量的方式从最大数量中取一位 // 若通过,下一步可以执行函数 if e.maxRunningJobs != nil { if !e.maxRunningJobs.TryAcquire(1) { switch e.limitMode { case RescheduleMode: return case WaitMode: select { case <-stopCtx.Done(): return case <-f.ctx.Done(): return default: } if err := e.maxRunningJobs.Acquire(f.ctx, 1); err != nil { break } } } defer e.maxRunningJobs.Release(1) }
gocron支持指定Job以单例模式运行。通过siglefilght工具库保证当前仅有一个可运行的Job
// gocron/job.go // SingletonMode prevents a new job from starting if the prior job has not yet // completed it's run // Note: If a job is added to a running scheduler and this method is then used // you may see the job run overrun itself as job is scheduled immediately // by default upon being added to the scheduler. It is recommended to use the // SingletonMode() func on the scheduler chain when scheduling the job. func (j *Job) SingletonMode() { j.mu.Lock() defer j.mu.Unlock() j.runConfig.mode = singletonMode j.jobFunction.limiter = &singleflight.Group{} } // gocron/executor.go switch f.runConfig.mode { case defaultMode: runJob() case singletonMode: // limiter是singlefilght对象,Do方法内仅会执行一次,保证一次只运行一个任务 _, _, _ = f.limiter.Do("main", func() (interface{}, error) { select { case <-stopCtx.Done(): return nil, nil case <-f.ctx.Done(): return nil, nil default: } runJob() return nil, nil }) }
gocron主要数据结构
主要分为schduler调度器,job任务,以及executor执行器对象
追踪一下调用链的工作流程:
- 初始化一个Scheduler;新版gocron似乎更鼓励用户使用自己的scheduler,而不是如同老版一样维护一个默认的全局调度器
func NewScheduler(loc *time.Location) *Scheduler { // 这时已经将executor同步初始化完毕 // scheduler和executor是一对一的关系 executor := newExecutor() return &Scheduler{ jobs: make([]*Job, 0), location: loc, running: false, time: &trueTime{}, executor: &executor, tagsUnique: false, timer: afterFunc, } }
- Every方法初始化一个Job,如果scheduler已经启动,即任务列表中已经存在一个等待封装的Job,那么直接取出相应的Job
if s.updateJob || s.jobCreated { job = s.getCurrentJob() }
接下来确定Job的运行周期,并加入到任务列表
s.setJobs(append(s.Jobs(), job))
Every方法返回了新增Job的scheduler,此时scheduler的任务队列中存在一个Job就绪,等待下一步调度。
- Do方法带着回调的函数和对应的参数开始执行,它从当前的scheduler中取出一个就绪的Job,进行最后的判断,如果Job不合格,那么将它从任务队列中移除,并返回报错
if job.error != nil { // delete the job from the scheduler as this job // cannot be executed s.RemoveByReference(job) return nil, job.error } // 还有很多判断条件,这里不一一列举
将Do方法将要执行的函数封装进Job。接下来判断schduler是否启动:如之前gocron一样,scheduler也是通过协程监听并执行启动任务协程的工作。
之前的scheduler,默认启动一个ticker,每秒去排序并轮询任务队列,从中取出满足条件的任务开始执行,效率非常低。而现在的改进是:scheduler启动监听协程后;不是以轮询而是以通知的方式,从channel中获取Job的Function,再启动协程去执行。
在这样的前提下,scheduler监听协程什么时候启动是位置的。此处添加一个判断,当scheduler启动时,同时启动runContinuous去完成Job的最后一步操作。若是scheduler没有启动,那么直接返回,等待scheduler启动后再完成操作。
// we should not schedule if not running since we can't foresee how long it will take for the scheduler to start if s.IsRunning() { s.runC编程客栈ontinuous(job) }
通过这样的设计,在最终启动scheduler前后,都可以以动态的方式添加/移除任务。
- scheduler提供了两种启动schduler的模式:异步和阻塞(也就是同步啦)
// StartAsync starts all jobs without blocking the current thread func (s *Scheduler) StartAsync() { if !s.IsRunning() { s.start() } } // StartBlocking starts all jobs and blocks the current thread. // This blocking method can be stopped with Stop() from a separate goroutine. func (s *Scheduler) StartBlocking() { s.StartAsync() s.startBlockingStopChanMutex.Lock() s.startBlockingStopChan = make(chan struct{},javascript 1) s.startBlockingStopChanMutex.Unlock() <-s.startBlockingStopChan }
一般情况下,我们通过异步模式,启动对所有任务的监控
// start starts the scheduler, scheduling and running jobs func (s *Scheduler) start() { // 启动监听协程,select选择器配合channel阻塞 // 直到Job准备执行发送通知 go s.executor.start() // 将scheduler置位为running s.setRunning(true) // 遍历所有任www.devze.com务,以递归的方式监控起来 s.runJobs(s.Jobs()) }
比较有意思的是这个部分:
func (s *Scheduler) runJobs(jobs []*Job) { for _, job := range jobs { // 这个函数是一个递归调用 // 这里对所有Job都以递归的方式监听着 s.runContinuous(job) } } // 这是runContinuous的部分代码 job.setTimer(s.timer(nextRun, func() { if !next.dateTime.IsZero() { for { n := s.now().UnixNano() - next.dateTime.UnixNano() // 某个任务满足执行条件了,退出循环 if n >= 0 { break } s.time.Sleep(time.Duration(n)) } } // 递归执行本方法 // runContinuous会判断当前Job是否可执行 // 若不则退出,若可以则将Job设置为立即执行,并刷新执行时间 // 若Job“立即执行”的标志已经置位,直接调用run发送通知给监听协程 s.runContinuous(job) }))
这样的设计太优雅了,大佬们的奇思妙想啊~
- 最后是executor的执行,前面已经提到过。通过select接收channel通知的形式执行下去,核心方法是这个:
runJob := func() { f.incrementRunState() callJobFunc(f.eventListeners.onBeforeJobExecution) callJobFuncWithParams(f.function, f.parameters) callJobFunc(f.eventListeners.onAfterJobExecution) f.decrementRunState() }
eventListeners封装了两个接口,用以在执行任务和完成任务后发送给用户事件通知。
综合分析
gocron进行了不少方面的优化:
- 在任务列表的维护上,可加入调度的任务数不再限定为某个值,而是以切片的方式自动增长。但最终能够并行执行的任务数却通过信号量多方式加以控制;
- 不再周期性地轮询任务列表,以期待获得可运行的任务;而是通过更巧妙的方式,任务递归监听,一旦发现可执行的任务,就自行通知scheduler,完成调度;
- 具备更丰富的语法和模式,用户可以根据场景自行选择;调度器同时支持异步及同步调用,而Job也支持周期性轮询和单点任务;
- scheduler内加锁了,对Jobs列表的操作都会加上读写锁,一些其它的参数也拥有自己的锁。这使得scheduler具备线程安全性,但某种程度上影响了对Jobs队列的操作。考虑到gocron不再鼓励使用全局Scheduler,而是每个client维护自己的Scheduler,那么被锁影响的场景会进一步减少,与最终优化获得的性能提升相比,都是值得的。
最后
最后的最后,gocron依然无法满足我当前的需求,但已经不妨碍我对源码进行下一步的改造:
- 我需要对Job进行上层的封装,并将要调用的方法和参数序列化后存入数据库,直到服务重启时,能够找到未完成的任务加载进scheduler重新执行
- 我的计划任务只需要执行一次,而无须重复执行,这一点已经有SingletonMode保证
- 我需要改造gocron,让它能够支持在某个时间范围内调度任务
到此这篇关于Golang定时任务框架GoCron的源码分析的文章就介绍到这了,更多相关Golang定时任务框架GoCron内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!
精彩评论