0%

时间轮解析

数据结构

PriorityQueue

首先作者利用golang自带的heap接口实现了一个优先队列(priority_queue)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// The Interface type describes the requirements
// for a type using the routines in this package.
// Any type that implements it may be used as a
// min-heap with the following invariants (established after
// Init has been called or if the data is empty or sorted):

// !h.Less(j, i) for 0 <= i < h.Len() and 2*i+1 <= j <= 2*i+2 and j < h.Len()
// Note that Push and Pop in this interface are for package heap's
// implementation to call. To add and remove things from the heap,
// use heap.Push and heap.Pop.

type Interface interface {
sort.Interface
Push(x any) // add x as element Len()
Pop() any // remove and return element Len() - 1.
}

可以看到,heap接口定义了若干方法,用户可以使用任何类型的结构体来实现一个最小堆的能力,只要他实现上诉方法。如下面的定义所示,开源库的作者选择使用一个切片来作为这个最小堆的底层存储结构,这也是一种非常常见的方案,就是利用index的关系来构建一个树形数据结构。

1
2
3
4
5
6
7
8
type item struct {
Value interface{}
Priority int64
Index int
}
// this is a priority queue as implemented by a min heap
// ie. the 0th element is the *lowest* value
type priorityQueue []*item

DelayQueue

延迟队列的定义如下,使用C channel来将到期的任务推出;一个锁来防止并发读写队列造成的竞争问题;一个基于最小堆实现的优先队列,来pop出最先到期的任务。同时为了避免重复空轮询造成的cpu浪费,作者还特意设置了休眠机制。

1
2
3
4
5
6
7
8
9
10
11
12
// DelayQueue is an unbounded blocking queue of *Delayed* elements, in which
// an element can only be taken when its delay has expired. The head of the
// queue is the *Delayed* element whose delay expired furthest in the past.
type DelayQueue struct {
C chan interface{}
mu sync.Mutex
pq priorityQueue

// Similar to the sleeping state of runtime.timers.
sleeping int32
wakeupC chan struct{}
}

作者定义了两个方法OfferPoll,来往这个延迟队列里增加新任务和取出到期任务。

Bucket

1
2
3
4
5
6
7
8
9
10
11
type bucket struct {
// 64-bit atomic operations require 64-bit alignment, but 32-bit
// compilers do not ensure it. So we must keep the 64-bit field
// as the first field of the struct.

// For more explanations, see https://golang.org/pkg/sync/atomic/#pkg-note-BUG
// and https://go101.org/article/memory-layout.html.
expiration int64
mu sync.Mutex
timers *list.List
}

bucket的结构是一个list结构,把在同一时间点过期的所有任务放在一起,到期后,按照加入的顺序依次执行。注意,这里的同一时间点并不是绝对意义上的过期时间相同,这与时间轮每一个bucket所表示的精度有关。

![[/images/work/timewheel/buckets.png]]

Timingwheel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// TimingWheel is an implementation of Hierarchical Timing Wheels.
type TimingWheel struct {
// 每个bucket表示的时间精度
tick int64 // in milliseconds
// 一个时间轮有多少bucket
wheelSize int64

// 每个时间轮间隔时间,其值等于tick*wheelSize
interval int64 // in milliseconds
// 当前时间戳,用于判断目前执行到第几个buckets
currentTime int64 // in milliseconds

buckets []*bucket
queue *delayqueue.DelayQueue

// The higher-level overflow wheel.
// NOTE: This field may be updated and read concurrently, through Add().
overflowWheel unsafe.Pointer // type: *TimingWheel
exitC chan struct{}
waitGroup waitGroupWrapper
}

整个时间轮如上图所示,由若干Bucket所组成。每个时间轮都会维护一个独立的延迟队列,并将其所拥有的所有Bucket放入其中。每当Bucket所表示的时间到期,都会依次执行这个Bucket所维护的所有timer

尤其要注意,这里如果待执行任务的延迟时间超过了首个时间轮的interval,那么会再创建一个overflowwheel。新创建的overflowwheel的所有参数与首个时间轮基本一致,除了每个bucket的精度。

1
overflowwheel.tick = timingwheel.interval

执行逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Start starts the current timing wheel.
func (tw *TimingWheel) Start() {
tw.waitGroup.Wrap(func() {
tw.queue.Poll(tw.exitC, func() int64 {
return timeToMs(time.Now().UTC())
})
})

tw.waitGroup.Wrap(func() {
for {
select {
case elem := <-tw.queue.C:
b := elem.(*bucket)
tw.advanceClock(b.Expiration())
b.Flush(tw.addOrRun)
case <-tw.exitC:
return
}
}
})
}

首先会开启一个协程,轮询延迟队列,查看是否有到期需要执行的bucket。其次,还会开启一个协程,不断的获取到期需要执行的bucket。每当获取到需要执行的bucket时,会修改当前时间轮的时间(用于新任务增加进来后,选择需要添加到哪个bucket当中)。同时,会调用flush方法,当当前bucket的所有timer重新选择一次bucket(通过这种方式,位于overflowwheeltimer,就可以上升至一级timingwheel中)。