go 中其实不复杂的 timer

2022-09-01 14:49:59 浏览数 (1)

在 go 中当我们需要延迟一段时间后执行,或者需要间隔固定时间去执行某个行为的时候就需要使用到 timer,那么 timer 到底是如何实现的呢?我们今天就来看看 timer 里面是什么样的。

同时因为 1.14 版本前后 timer 的实现有很大的区别,我们顺便来了解一下之前的版本和现在的版本有什么样的不一样,到底做了什么样的优化。

前置知识点

有以下的知识点支持才能更好的理解今天的分析

  • 需要有 GMP 模型的基础
  • 需要有 go 调度相关的基础
  • 需要有数据结构中’堆‘的基础

ticker

要看 timer 可以先从 ticker 入手,ticker 其实我们经常使用到,ticker 顾名思义就是每次间隔一段时间触发一次,下面我们就来看看它的具体实现

带着问题

  • Ticker 如果当前时间到了,没有及时处理,下一次时间到了,会保留吗?是都在后面排队,还是直接被丢弃了?
  • NewTicker()Tick() 有什么区别?使用上需要注意什么?

数据结构

代码语言:javascript复制
// A Ticker holds a channel that delivers ``ticks'' of a clock
// at intervals.
type Ticker struct {
	C <-chan Time // The channel on which the ticks are delivered.
	r runtimeTimer
}

可以看到它的数据结构非常简单,就是一个 channel 当时间到达就会向这个 channel 里面发送一个触发的时间

Start Stop Reset

代码语言:javascript复制
// NewTicker returns a new Ticker containing a channel that will send
// the time on the channel after each tick. The period of the ticks is
// specified by the duration argument. The ticker will adjust the time
// interval or drop ticks to make up for slow receivers.
// The duration d must be greater than zero; if not, NewTicker will
// panic. Stop the ticker to release associated resources.
func NewTicker(d Duration) *Ticker {
	if d <= 0 {
		panic(errors.New("non-positive interval for NewTicker"))
	}
	// Give the channel a 1-element time buffer.
	// If the client falls behind while reading, we drop ticks
	// on the floor until the client catches up.
	c := make(chan Time, 1)
	t := &Ticker{
		C: c,
		r: runtimeTimer{
			when:   when(d),
			period: int64(d),
			f:      sendTime,
			arg:    c,
		},
	}
	startTimer(&t.r)
	return t
}

// Stop turns off a ticker. After Stop, no more ticks will be sent.
// Stop does not close the channel, to prevent a concurrent goroutine
// reading from the channel from seeing an erroneous "tick".
func (t *Ticker) Stop() {
	stopTimer(&t.r)
}

// Reset stops a ticker and resets its period to the specified duration.
// The next tick will arrive after the new period elapses.
func (t *Ticker) Reset(d Duration) {
	if t.r.f == nil {
		panic("time: Reset called on uninitialized Ticker")
	}
	modTimer(&t.r, when(d), int64(d), t.r.f, t.r.arg, t.r.seq)
}

注意点有以下几个:

  • ticker 中的 channel 长度为 1,这也就意味着里面只能放一个触发的时间,也就是说如果当前这次触发没有处理完成,下次触发来了可以先存着,但是再下一次就直接会被抛弃了。你是不是奇怪为什么要单独提出这一点来说,想要说明的是,ticker 的使用并不能保证一定间隔相同的时间触发,如果你再处理过程中阻塞住了,间隔时间短就可能出现连续,所以处理一定要保证及时。
  • stop 并不会关闭 channel,因为并发的时候可能同时到了触发时间,如果关闭了 channel 就有可能出现往一个关闭的 channel 中发消息的 panic;但也只有 stop 了之后相关的资源才会得到释放,所以用完之后记得关闭

可以看到三个方法都比较简单,主要就是利用 timer 去实现的 ticker,所以我们主要需要关注在 startTimer stopTimer modTimer 方法上

Tick 方法

代码语言:javascript复制
// Tick is a convenience wrapper for NewTicker providing access to the ticking
// channel only. While Tick is useful for clients that have no need to shut down
// the Ticker, be aware that without a way to shut it down the underlying
// Ticker cannot be recovered by the garbage collector; it "leaks".
// Unlike NewTicker, Tick will return nil if d <= 0.
func Tick(d Duration) <-chan Time {
	if d <= 0 {
		return nil
	}
	return NewTicker(d).C
}

从这里我们很明显可以看到,其实 Tick 方法就是对 NewTicker 的一个封装,让使用 Ticker 更加简单,直接一行代码搞定,但是随之带来的就是你没有办法去关闭这个 Ticker 了。 这也就意味着会导致内存泄露,所以一般在项目中都会使用 NewTicker 方法,除非你的项目当 Tick 停止时就已经直接退出了,那也不必考虑这个问题。

好了,现在我们可以聚焦到这次我们的主角 Timer 上了

go1.13 的 Timer

老版本的 timer 实现比较简单,代码也比较清晰

startTimer

代码语言:javascript复制
func startTimer(t *timer) {
	if raceenabled {
		racerelease(unsafe.Pointer(t))
	}
	addtimer(t)
}

这里我们可以看到,首先是分配了一个 bucket 然后加锁之后开始 addtimerLocked

代码语言:javascript复制
func addtimer(t *timer) {
	tb := t.assignBucket()
	lock(&tb.lock)
	ok := tb.addtimerLocked(t)
	unlock(&tb.lock)
	if !ok {
		badTimer()
	}
}
代码语言:javascript复制
const timersLen = 64
func (t *timer) assignBucket() *timersBucket {
	id := uint8(getg().m.p.ptr().id) % timersLen
	t.tb = &timers[id].timersBucket
	return t.tb
}
代码语言:javascript复制
// Add a timer to the heap and start or kick timerproc if the new timer is
// earlier than any of the others.
// Timers are locked.
// Returns whether all is well: false if the data structure is corrupt
// due to user-level races.
func (tb *timersBucket) addtimerLocked(t *timer) bool {
	// when must never be negative; otherwise timerproc will overflow
	// during its delta calculation and never expire other runtime timers.
	if t.when < 0 {
		t.when = 1<<63 - 1
	}
	t.i = len(tb.t)
	tb.t = append(tb.t, t)
	if !siftupTimer(tb.t, t.i) {
		return false
	}
	if t.i == 0 {
		// siftup moved to top: new earliest deadline.
		if tb.sleeping && tb.sleepUntil > t.when {
			tb.sleeping = false
			notewakeup(&tb.waitnote)
		}
		if tb.rescheduling {
			tb.rescheduling = false
			goready(tb.gp, 0)
		}
		if !tb.created {
			tb.created = true
			go timerproc(tb)
		}
	}
	return true
}

当看到 siftupTimer 这个方法的时候你应该就豁然开朗了,因为这个很明显的就是一个堆的操作,只不过这里的堆是一个 4 叉堆,你看它找父节点的时候是 /4 的,siftdownTimer 也是类似这里也不多赘述了

代码语言:javascript复制
func siftupTimer(t []*timer, i int) bool {
	if i >= len(t) {
		return false
	}
	when := t[i].when
	tmp := t[i]
	for i > 0 {
		p := (i - 1) / 4 // parent
		if when >= t[p].when {
			break
		}
		t[i] = t[p]
		t[i].i = i
		i = p
	}
	if tmp != t[i] {
		t[i] = tmp
		t[i].i = i
	}
	return true
}

总的来说启动一个 timer 就是三步走

  • 加锁
  • 将新的 timer 添加到数组末尾
  • 堆化

stopTimer

其实知道了启动停止就不难了,也是类似的,从数组中删除之后然后堆化就可以了

代码语言:javascript复制
// stopTimer removes t from the timer heap if it is there.
// It returns true if t was removed, false if t wasn't even there.
//go:linkname stopTimer time.stopTimer
func stopTimer(t *timer) bool {
	return deltimer(t)
}
代码语言:javascript复制
// Delete timer t from the heap.
// Do not need to update the timerproc: if it wakes up early, no big deal.
func deltimer(t *timer) bool {
	if t.tb == nil {
		// t.tb can be nil if the user created a timer
		// directly, without invoking startTimer e.g
		//    time.Ticker{C: c}
		// In this case, return early without any deletion.
		// See Issue 21874.
		return false
	}

	tb := t.tb

	lock(&tb.lock)
	removed, ok := tb.deltimerLocked(t)
	unlock(&tb.lock)
	if !ok {
		badTimer()
	}
	return removed
}
代码语言:javascript复制
func (tb *timersBucket) deltimerLocked(t *timer) (removed, ok bool) {
	// t may not be registered anymore and may have
	// a bogus i (typically 0, if generated by Go).
	// Verify it before proceeding.
	i := t.i
	last := len(tb.t) - 1
	if i < 0 || i > last || tb.t[i] != t {
		return false, true
	}
	if i != last {
		tb.t[i] = tb.t[last]
		tb.t[i].i = i
	}
	tb.t[last] = nil
	tb.t = tb.t[:last]
	ok = true
	if i != last {
		if !siftupTimer(tb.t, i) {
			ok = false
		}
		if !siftdownTimer(tb.t, i) {
			ok = false
		}
	}
	return true, ok
}

何时触发?

那么问题来了,时间到了之后什么地方触发往 timer 中的 channel 中发数据呢?其实前面的源码中已经给出了细节,在 addtimerLocked 方法中:

代码语言:javascript复制
if !tb.created {
	tb.created = true
	// 这里创建了一个 goroutine 专门来运行 timerproc 方法
	go timerproc(tb)
}

创建的时候会调用 timerproc 方法,我们来看看这个方法里面做了什么。

代码语言:javascript复制
// Timerproc runs the time-driven events.
// It sleeps until the next event in the tb heap.
// If addtimer inserts a new earlier event, it wakes timerproc early.
func timerproc(tb *timersBucket) {
	tb.gp = getg()
	for {
		lock(&tb.lock)
		tb.sleeping = false
		now := nanotime()
		delta := int64(-1)
		for {
			if len(tb.t) == 0 {
				delta = -1
				break
			}
			// 获取堆顶元素
			t := tb.t[0]
			// 看是否满足触发时间
			delta = t.when - now
			if delta > 0 {
				break
			}
			ok := true
			// 当 period > 0 则说明这是一个需要周期性触发的 timer 也就是 ticker,否则就触发一次后直接从堆里面移除
			if t.period > 0 {
				// leave in heap but adjust next time to fire
				// 修改当前元素的触发时间,然后直接开始堆化即可,自己就排到后面去了
				t.when  = t.period * (1   -delta/t.period)
				if !siftdownTimer(tb.t, 0) {
					ok = false
				}
			} else {
				// remove from heap
				last := len(tb.t) - 1
				if last > 0 {
					tb.t[0] = tb.t[last]
					tb.t[0].i = 0
				}
				tb.t[last] = nil
				tb.t = tb.t[:last]
				if last > 0 {
					if !siftdownTimer(tb.t, 0) {
						ok = false
					}
				}
				t.i = -1 // mark as removed
			}
			f := t.f
			arg := t.arg
			seq := t.seq
			unlock(&tb.lock)
			if !ok {
				badTimer()
			}
			if raceenabled {
				raceacquire(unsafe.Pointer(t))
			}
			// 这里就是真正触发定时方法的地方,如果是 ticker 的话就是初始化的 sendTime 方法,就是将当前时间发送到 channel 中
			f(arg, seq)
			lock(&tb.lock)
		}
		if delta < 0 || faketime > 0 {
			// No timers left - put goroutine to sleep.
			tb.rescheduling = true
			goparkunlock(&tb.lock, waitReasonTimerGoroutineIdle, traceEvGoBlock, 1)
			continue
		}
		// At least one timer pending. Sleep until then.
		// 这里可以看到,如果对顶元素还没有到对应的触发时间,那么就睡眠相对应的时间即可
		tb.sleeping = true
		tb.sleepUntil = now   delta
		noteclear(&tb.waitnote)
		unlock(&tb.lock)
		notetsleepg(&tb.waitnote, delta)
	}
}

可以看到整体思路很清晰,就是将最先触发的元素拿出来,然后判断是否到时间,如果到了时间就触发,如果没到,就睡眠一个 delta 的时间等待触发。当然在 addtimerLocked 方法中也会尝试唤醒 (调用notewakeup方法),因为新加入的 timer 肯定会影响当前整个堆的下一次触发时间。

所以总的来说在 go1.13 版本中,timer 的实现还是比较简单清晰的

go1.17 的 Timer

那么我们来看看现在版本的 timer 是如何实现的,因为我们上面详细看过,这里就省略其中部分。

在当前新的版本中对于 timer 的定义有了各种状态的表示,下面的注释也很清晰,标识了各种状态所出现的情况,至于状态的转换这里就不给出具体的状态图了。

代码语言:javascript复制
// Values for the timer status field.
const (
	// Timer has no status set yet.
	timerNoStatus = iota

	// Waiting for timer to fire.
	// The timer is in some P's heap.
	timerWaiting

	// Running the timer function.
	// A timer will only have this status briefly.
	timerRunning

	// The timer is deleted and should be removed.
	// It should not be run, but it is still in some P's heap.
	timerDeleted

	// The timer is being removed.
	// The timer will only have this status briefly.
	timerRemoving

	// The timer has been stopped.
	// It is not in any P's heap.
	timerRemoved

	// The timer is being modified.
	// The timer will only have this status briefly.
	timerModifying

	// The timer has been modified to an earlier time.
	// The new when value is in the nextwhen field.
	// The timer is in some P's heap, possibly in the wrong place.
	timerModifiedEarlier

	// The timer has been modified to the same or a later time.
	// The new when value is in the nextwhen field.
	// The timer is in some P's heap, possibly in the wrong place.
	timerModifiedLater

	// The timer has been modified and is being moved.
	// The timer will only have this status briefly.
	timerMoving
)

addtimer

代码语言:javascript复制
// addtimer adds a timer to the current P.
// This should only be called with a newly created timer.
// That avoids the risk of changing the when field of a timer in some P's heap,
// which could cause the heap to become unsorted.
func addtimer(t *timer) {
	//.................

	when := t.when

	// Disable preemption while using pp to avoid changing another P's heap.
	mp := acquirem()

	// 获取当前 g 所绑定的 m,然后再拿到绑定的 p
	pp := getg().m.p.ptr()
	lock(&pp.timersLock)
	// 首先做清除,清除那些已经标记为删除的 timer
	cleantimers(pp)
	// 然后将当前的 timer 加入到当前 p 所属的 timer 列表中
	doaddtimer(pp, t)
	unlock(&pp.timersLock)

	wakeNetPoller(when)

	releasem(mp)
}
代码语言:javascript复制
// doaddtimer adds t to the current P's heap.
// The caller must have locked the timers for pp.
func doaddtimer(pp *p, t *timer) {
	// ...............
	
	t.pp.set(pp)
	i := len(pp.timers)
	// 这里的操作和之前类似,只不过这次不是放在桶里了,而是放到了 P 上,放完之后依旧是堆化
	pp.timers = append(pp.timers, t)
	siftupTimer(pp.timers, i)
	if t == pp.timers[0] {
		atomic.Store64(&pp.timer0When, uint64(t.when))
	}
	atomic.Xadd(&pp.numTimers, 1)
}
代码语言:javascript复制
type p struct {
	// ..............

	// Lock for timers. We normally access the timers while running
	// on this P, but the scheduler can also do it from a different P.
	timersLock mutex

	// P 里面是有一个专门的地方来保存这个 timer 堆的
	// Actions to take at some time. This is used to implement the
	// standard library's time package.
	// Must hold timersLock to access.
	timers []*timer

	// Number of timers in P's heap.
	// Modified using atomic instructions.
	numTimers uint32

	// Number of timerDeleted timers in P's heap.
	// Modified using atomic instructions.
	deletedTimers uint32
	// ..............
}

这里可以看到我们的 timer 堆已经不再是一个放在全局各个桶下面的了,而是在 P 内部保存 timer 堆,其他和原来的基本思路一致

deltimer

删除和原来的操作就不一样了,原先删除后会直接进行堆的操作,而在新版本中不是的,只是标记了状态,根据当前不同的状态进行操作,如:没有运行怎么办,或已经运行了怎么办,当前还未被添加….

而在 cleantimers 方法中会对已经标记为删除的 timer 做相对应的处理

代码语言:javascript复制
// deltimer deletes the timer t. It may be on some other P, so we can't
// actually remove it from the timers heap. We can only mark it as deleted.
// It will be removed in due course by the P whose heap it is on.
// Reports whether the timer was removed before it was run.
func deltimer(t *timer) bool {
	for {
		switch s := atomic.Load(&t.status); s {
		case timerWaiting, timerModifiedLater:
			// Prevent preemption while the timer is in timerModifying.
			// This could lead to a self-deadlock. See #38070.
			mp := acquirem()
			if atomic.Cas(&t.status, s, timerModifying) {
				// Must fetch t.pp before changing status,
				// as cleantimers in another goroutine
				// can clear t.pp of a timerDeleted timer.
				tpp := t.pp.ptr()
				if !atomic.Cas(&t.status, timerModifying, timerDeleted) {
					badTimer()
				}
				releasem(mp)
				atomic.Xadd(&tpp.deletedTimers, 1)
				// Timer was not yet run.
				return true
			} else {
				releasem(mp)
			}
		case timerModifiedEarlier:
			// Prevent preemption while the timer is in timerModifying.
			// This could lead to a self-deadlock. See #38070.
			mp := acquirem()
			if atomic.Cas(&t.status, s, timerModifying) {
				// Must fetch t.pp before setting status
				// to timerDeleted.
				tpp := t.pp.ptr()
				if !atomic.Cas(&t.status, timerModifying, timerDeleted) {
					badTimer()
				}
				releasem(mp)
				atomic.Xadd(&tpp.deletedTimers, 1)
				// Timer was not yet run.
				return true
			} else {
				releasem(mp)
			}
		case timerDeleted, timerRemoving, timerRemoved:
			// Timer was already run.
			return false
		case timerRunning, timerMoving:
			// The timer is being run or moved, by a different P.
			// Wait for it to complete.
			osyield()
		case timerNoStatus:
			// Removing timer that was never added or
			// has already been run. Also see issue 21874.
			return false
		case timerModifying:
			// Simultaneous calls to deltimer and modtimer.
			// Wait for the other call to complete.
			osyield()
		default:
			badTimer()
		}
	}
}

何时触发?

那么问题来了,在新版本里面是什么时候出发的。其实如果之前没有看过调度相关的源码还真的有点难找。

  1. schedule -> checkTimers -> runtimer
  2. stealWork -> checkTimers -> runtimer
  3. findrunnable -> checkTimers -> runtimer

其实是当调度的时候触发的 timer 检查,检查的时候触发的对应执行。而且如果你第一次看你会觉得神奇,为什么 work steal 的时候还会进行 timer 的检查呢?我们慢慢往下看。

代码语言:javascript复制
// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {
	_g_ := getg()

	// ....................

top:
	pp := _g_.m.p.ptr()
	pp.preempt = false

	if sched.gcwaiting != 0 {
		gcstopm()
		goto top
	}
	if pp.runSafePointFn != 0 {
		runSafePointFn()
	}

	// Sanity check: if we are spinning, the run queue should be empty.
	// Check this before calling checkTimers, as that might call
	// goready to put a ready goroutine on the local run queue.
	if _g_.m.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) {
		throw("schedule: spinning with local work")
	}

	checkTimers(pp, 0)

	// ..................
}
代码语言:javascript复制
// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from local or global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
	_g_ := getg()

	// The conditions here and in handoffp must agree: if
	// findrunnable would return a G to run, handoffp must start
	// an M.

top:
	_p_ := _g_.m.p.ptr()
	if sched.gcwaiting != 0 {
		gcstopm()
		goto top
	}
	if _p_.runSafePointFn != 0 {
		runSafePointFn()
	}

	now, pollUntil, _ := checkTimers(_p_, 0)

	// ..................

这里我们可以看到,确实是在调度的时候触发的 checkTimers 方法

代码语言:javascript复制
func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {
	// If it's not yet time for the first timer, or the first adjusted
	// timer, then there is nothing to do.
	next := int64(atomic.Load64(&pp.timer0When))
	nextAdj := int64(atomic.Load64(&pp.timerModifiedEarliest))
	if next == 0 || (nextAdj != 0 && nextAdj < next) {
		next = nextAdj
	}

	if next == 0 {
		// No timers to run or adjust.
		return now, 0, false
	}

	if now == 0 {
		now = nanotime()
	}
	if now < next {
		// Next timer is not ready to run, but keep going
		// if we would clear deleted timers.
		// This corresponds to the condition below where
		// we decide whether to call clearDeletedTimers.
		// 当下一次触发实现还没有到的时候,这里有一个小细节,当需要删除 timer 个数小于 1/4 的时候是不操作的,直接返回,也就是说等着批量一起处理
		if pp != getg().m.p.ptr() || int(atomic.Load(&pp.deletedTimers)) <= int(atomic.Load(&pp.numTimers)/4) {
			return now, next, false
		}
	}

	lock(&pp.timersLock)

	if len(pp.timers) > 0 {
		// 进行当前 p 的 timer 堆的调整,这个方法里面还有很多细节,这里不展开,推荐看一眼
		adjusttimers(pp, now)
		for len(pp.timers) > 0 {
			// Note that runtimer may temporarily unlock
			// pp.timersLock.
			// 如果有需要执行的 timer 的话,那么就调用 runtimer 方法去执行
			if tw := runtimer(pp, now); tw != 0 {
				if tw > 0 {
					pollUntil = tw
				}
				break
			}
			ran = true
		}
	}

	// If this is the local P, and there are a lot of deleted timers,
	// clear them out. We only do this for the local P to reduce
	// lock contention on timersLock.
	if pp == getg().m.p.ptr() && int(atomic.Load(&pp.deletedTimers)) > len(pp.timers)/4 {
		clearDeletedTimers(pp)
	}

	unlock(&pp.timersLock)

	return now, pollUntil, ran
}

那么我们就来赶紧看看 runtimer 方法到底是如何运行的吧

runtimer

这个方法其实非常简单,就是将堆顶元素取出来看状态,根据不同状态去处理,如果满足运行时间则运行

代码语言:javascript复制
func runtimer(pp *p, now int64) int64 {
	for {
		// 取出堆顶元素
		t := pp.timers[0]
		if t.pp.ptr() != pp {
			throw("runtimer: bad p")
		}
		switch s := atomic.Load(&t.status); s {
		case timerWaiting:
			if t.when > now {
				// Not ready to run.
				return t.when
			}

			if !atomic.Cas(&t.status, s, timerRunning) {
				continue
			}
			// Note that runOneTimer may temporarily unlock
			// pp.timersLock.
			// 如果已经到了当前触发时间,就运行当前这个 timer
			runOneTimer(pp, t, now)
			return 0
		case timerDeleted:
			//.....................
		case timerModifiedEarlier, timerModifiedLater:
			//.....................
		case timerModifying:
			osyield()
		case timerNoStatus, timerRemoved:
			badTimer()
		case timerRunning, timerRemoving, timerMoving:
			badTimer()
		default:
			badTimer()
		}
	}
}

运行其实和原来的逻辑是一样的

代码语言:javascript复制
func runOneTimer(pp *p, t *timer, now int64) {
	//.....................

	f := t.f
	arg := t.arg
	seq := t.seq

	// 这里的逻辑和原来的 timerproc 中的逻辑是一致的
	if t.period > 0 {
		// Leave in heap but adjust next time to fire.
		delta := t.when - now
		t.when  = t.period * (1   -delta/t.period)
		if t.when < 0 { // check for overflow.
			t.when = maxWhen
		}
		siftdownTimer(pp.timers, 0)
		if !atomic.Cas(&t.status, timerRunning, timerWaiting) {
			badTimer()
		}
		updateTimer0When(pp)
	} else {
		// Remove from heap.
		dodeltimer0(pp)
		if !atomic.Cas(&t.status, timerRunning, timerNoStatus) {
			badTimer()
		}
	}

	//.....................

	unlock(&pp.timersLock)

	// 这里就是真正执行触发的方法了
	f(arg, seq)

	lock(&pp.timersLock)

	//.....................
}

moveTimer

你以为这样就没有了?还有什么问题我们没有考虑到呢?我们现在已经知道新版本的 timer 堆是在 P 上的了,那么问题来了,当 P 被销毁的时候,可能当前的 P 上还有 timer 呢,那这些 timer 应该怎么办?当然是移走咯

代码语言:javascript复制
func (pp *p) destroy() {
	assertLockHeld(&sched.lock)
	assertWorldStopped()

	//.....................

	if len(pp.timers) > 0 {
		// 找一个别的 P
		plocal := getg().m.p.ptr()
		// The world is stopped, but we acquire timersLock to
		// protect against sysmon calling timeSleepUntil.
		// This is the only case where we hold the timersLock of
		// more than one P, so there are no deadlock concerns.
		lock(&plocal.timersLock)
		lock(&pp.timersLock)
		// 这里把当前 P 上的 timer 都移走
		moveTimers(plocal, pp.timers)
	//.....................
}

版本对比

看完了源码你会发现,1.14 前后 timer 变化点主要在两个方面:

  1. 存储方式由原来的放在全局的桶里转而放到了 P 上
  2. 触发方式的由原来的单个 goroutine 方法循环定期触发改为调度中触发

接下来就是篇的最后重点部分了:为什么 1.14 前后 timer 需要做这样的优化?更快了吗?

我用图来让你快速明白为什么会有这个改动。

存储结构

存储结构上的改变很容易看出来,就是从原来的桶换成了 P,那么为什么呢?

问题关键

PS:图中的 TP 意思是 运行 timerproc 的 G

可以看到,改动之前,timer 的触发需要频繁的做 M 和 P 的绑定和解绑操作。

就这?对这就是问题的关键。我们举个例子,如果有一个 ticker 每秒触发一次,每触发一次就需要绑定一次 M 解绑一次,而当系统中的 timer 越来越多,那么随之带来的就是越加频繁的切换了。

而改动之后,timer 的触发是在调度循环里面,而且存储在本地的 P 中,所以没有了绑定和解绑的过程,也不再需要一个运行 timerproc goroutine 单独去维护触发。

总结

下面回顾总结几个点:

  1. timer 堆从原有的桶移动到了 P 上,是为了解决频繁切换 MP 的问题。
  2. 因为 checkTimers 是在调度循环里面执行的,所以一些操作被延后执行,比如删除 timer 的操作只是修改状态,而懒到后面一起去执行。
  3. 其实 timer 的设计说到底还是一个堆的存储,然后堆顶就是下一次最近要执行的 timer。

总的来说 timer 的实现还是比较清晰的,其实更老的版本中,一开始 timer 的实现的堆只有一个,而为了优化全局锁的并发性能才出现了 64 个桶这样的结构,然后又发现了切换的性能问题,继续优化才有了现在的 timer。所以其实现在看来很多 go 里面复杂的设计原本都是也是由一个非常简单的设计演变而来的。

0 人点赞