数据结构
PriorityQueue
首先作者利用golang
自带的heap接口实现了一个优先队列(priority_queue)。
1 | // The Interface type describes the requirements |
可以看到,heap接口定义了若干方法,用户可以使用任何类型的结构体来实现一个最小堆的能力,只要他实现上诉方法。如下面的定义所示,开源库的作者选择使用一个切片来作为这个最小堆的底层存储结构,这也是一种非常常见的方案,就是利用index的关系来构建一个树形数据结构。
1 | type item struct { |
DelayQueue
延迟队列的定义如下,使用C channel
来将到期的任务推出;一个锁来防止并发读写队列造成的竞争问题;一个基于最小堆实现的优先队列,来pop出最先到期的任务。同时为了避免重复空轮询造成的cpu浪费,作者还特意设置了休眠机制。
1 | // DelayQueue is an unbounded blocking queue of *Delayed* elements, in which |
作者定义了两个方法Offer
和Poll
,来往这个延迟队列里增加新任务和取出到期任务。
Bucket
1 | type bucket struct { |
bucket
的结构是一个list结构,把在同一时间点过期的所有任务放在一起,到期后,按照加入的顺序依次执行。注意,这里的同一时间点并不是绝对意义上的过期时间相同,这与时间轮每一个bucket
所表示的精度有关。
![[/images/work/timewheel/buckets.png]]
Timingwheel
1 | // TimingWheel is an implementation of Hierarchical Timing Wheels. |
整个时间轮如上图所示,由若干Bucket
所组成。每个时间轮都会维护一个独立的延迟队列,并将其所拥有的所有Bucket
放入其中。每当Bucket
所表示的时间到期,都会依次执行这个Bucket
所维护的所有timer
。
尤其要注意,这里如果待执行任务的延迟时间超过了首个时间轮的interval
,那么会再创建一个overflowwheel
。新创建的overflowwheel
的所有参数与首个时间轮基本一致,除了每个bucket
的精度。
1 | overflowwheel.tick = timingwheel.interval |
执行逻辑
1 | // Start starts the current timing wheel. |
首先会开启一个协程,轮询延迟队列,查看是否有到期需要执行的bucket
。其次,还会开启一个协程,不断的获取到期需要执行的bucket
。每当获取到需要执行的bucket
时,会修改当前时间轮的时间(用于新任务增加进来后,选择需要添加到哪个bucket
当中)。同时,会调用flush
方法,当当前bucket
的所有timer
重新选择一次bucket
(通过这种方式,位于overflowwheel
的timer
,就可以上升至一级timingwheel
中)。