go-zero 如何应对海量定时/延迟任务?

一个系统中存在着大量的调度任务 , 同时调度任务存在时间的滞后性 , 而大量的调度任务如果每一个都使用自己的调度器来管理任务的生命周期的话 , 浪费cpu的资源而且很低效 。
本文来介绍 go-zero 中 延迟操作 , 它可能让开发者调度多个任务时 , 只需关注具体的业务执行函数和执行时间「立即或者延迟」 。 而 延迟操作 , 通常可以采用两个方案:

  1. Timer:定时器维护一个优先队列 , 到时间点执行 , 然后把需要执行的 task 存储在 map 中
  2. collection 中的 timingWheel, 维护一个存放任务组的数组 , 每一个槽都维护一个存储task的双向链表 。 开始执行时 , 计时器每隔指定时间执行一个槽里面的tasks 。
方案2把维护task从 优先队列 O(nlog(n)) 降到 双向链表 O(1) , 而执行task也只要轮询一个时间点的tasks O(N) , 不需要像优先队列 , 放入和删除元素 O(nlog(n))
我们先看看 go-zero 中自己对 timingWheel 的使用 :
cache 中的 timingWheel首先我们先来在 collectioncache 中关于 timingWheel 的使用:
timingWheel, err := NewTimingWheel(time.Second, slots, func(k, v interface{}) {key, ok := k.(string)if !ok {return}cache.Del(key)})if err != nil {return nil, err}cache.timingWheel = timingWheel这是 cache 初始化中也同时初始化 timingWheel 做key的过期处理 , 参数依次代表:
  • interval:时间划分刻度
  • numSlots:时间槽
  • execute:时间点执行函数
cache 中执行函数则是 删除过期key , 而这个过期则由 timingWheel 来控制推进时间 。
接下来 , 就通过 cachetimingWheel 的使用来认识 。
初始化// 真正做初始化func newTimingWheelWithClock(interval time.Duration, numSlots int, execute Execute, ticker timex.Ticker) (*TimingWheel, error) {tw :=e != nil; {task := e.Value.(*timingEntry)// 标记删除 , 在 scan 中做真正的删除 「删除map的data」if task.removed {next := e.Next()l.Remove(e)tw.timers.Del(task.key)e = nextcontinue} else if task.circle > 0 {// 当前执行点已经过期 , 但是同时不在第一层 , 所以当前层即然已经完成了 , 就会降到下一层// 但是并没有修改 postask.circle--e = e.Next()continue} else if task.diff > 0 {// 因为之前已经标注了diff , 需要再进入队列next := e.Next()l.Remove(e)pos := (tw.tickedPos + task.diff) % tw.numSlotstw.slots[pos].PushBack(task)tw.setTimerPosition(pos, task)task.diff = 0e = nextcontinue}// 以上的情况都是不能执行的情况 , 能够执行的会被加入tasks中tasks = append(tasks, timingTask{key:task.key,value: task.value,})next := e.Next()l.Remove(e)tw.timers.Del(task.key)e = next}// for range tasks , 然后把每个 task->execute 执行即可tw.runTasks(tasks)}具体的分支情况在注释中说明了 , 在看的时候可以和前面的 moveTask() 结合起来 , 其中 circle 下降 , diff 的计算是关联两个函数的重点 。
至于 diff 计算就涉及到 pos, circle 的计算:
// interval: 4min, d: 60min, numSlots: 16, tickedPos = 15// step = 15, pos = 14, circle = 0func (tw *TimingWheel) getPositionAndCircle(d time.Duration) (pos int, circle int) {steps := int(d / tw.interval)pos = (tw.tickedPos + steps) % tw.numSlotscircle = (steps - 1) / tw.numSlotsreturn}上面的过程可以简化成下面:
steps = d / intervalpos = step % numSlots - 1circle = (step - 1) / numSlots总结
  1. timingWheel 靠定时器推动 , 时间前进的同时会取出当前时间格中 list「双向链表」的task , 传递到 execute 中执行 。 因为是是靠 internal 固定时间刻度推进 , 可能就会出现:一个 60s 的task , internal = 1s , 这样就会空跑59次loop 。
  2. 而在扩展时间上 , 采取 circle 分层 , 这样就可以不断复用原有的 numSlots, 因为定时器在不断 loop , 而执行可以把上层的 slot 下降到下层 , 在不断 loop 中就可以执行到上层的task 。 这样的设计可以在不创造额外的数据结构 , 突破长时间的限制 。
同时在 go-zero 中还有很多实用的组件工具 , 用好工具对于提升服务性能和开发效率都有很大的帮助 , 希望本篇文章能给大家带来一些收获 。
项目地址