定时器实现原理剖析

2022-08-15 14:39:43 浏览数 (1)

时间对于我们来说都不陌生,每时每刻都能感受到它的存在。时间又是一个抽象的概念,看不见摸不着。在我们编写程序的时候,对时间的使用非常频繁。本文讲述Go中时间相关函数的使用和实现原理,时间相关的源码在src下的time包和runtime包,下面的分析基于的Go的1.14版本。

常用的API

获取当前的时间戳

time.Now().Unix()返回自从1970年1月1日以来到现在的秒数。它不依赖地理位置时区。

代码语言:javascript复制
func main() {
 fmt.Println(time.Now().Unix())
}
当前时间字符串表示

time.Now().Format("2006-01-02 15:04:05")将当前的时间按照Format中的格式以字符串的形式显示。下面实例会将当前的时间以YYYY-MM-DD hh:mm:ss格式显示。

代码语言:javascript复制
fmt.Println(time.Now().Format("2006-01-02 15:04:05"))
date转时间戳

time.Date将一个日期时间转成time.Time类型,通过Time提供的Unix()函数可以知道当前的时间戳

代码语言:javascript复制
 currentTime := time.Date(2021, 6, 6, 10, 18, 11, 0, time.Local)
 fmt.Println(currentTime.Unix())
 // 1622945891
time.Parse

time.Parse将时间格式的字符串转成tim.Time

代码语言:javascript复制
currentTime, _ = time.Parse("2006-01-02 15:04:05", "2021-06-06 10:24:40")
 fmt.Println(currentTime.Unix())
 // 1622975080

前面已经初步简要介绍了几个time常见的API.下面介绍定时器的内容,定时器与时间息息相关。在Go中,定时器并不是通过sigalarm signal实现的,而是通过堆(四叉堆)实现的。

定时器API

下面介绍time包提供定时器相关的API,具体列举如下:

代码语言:javascript复制
t:=time.Tick(time.Second)
 <-t

 t=time.After(time.Second)
 <-t
 
 timer:=time.NewTimer(time.Second)
 <-timer.C
 
 ticker:=time.NewTicker(time.Second)
 <-ticker.C
 
 timer=time.AfterFunc(time.Second,func() {})
 

实现原理

先来看看Timer的结构,它包含一个Time类型的Chan一个runtimeTimer类型,C是长度为1的通道。r是timer的真正实现结构体,在runtime包中也有一个timer定义,与这里的runtimeTimer结构是一模一样的。timer的真正实现逻辑是在runtime包处理的。runtimeTimer结构包含一个定时器触发的时间点when, 执行P的地址pp, timer会被挂在P的结构中的timers中,pp执行P可以很方便的从定时器找到他所挂的P. f时间触发之后要执行的逻辑功能,arg是传递给执行函数f的参数。period是下一次被触发的时间,即两次触发之间的间隔时间。

代码语言:javascript复制
// Timer Timer结构体,包含一个Time类型的chan和一个runtimeTimer结构体
// chan用来在时间达到后,发送一个通知给调用方
type Timer struct {
 C <-chan Time
 r runtimeTimer
}

// runTimer 是timer的真正实现结构体,runtime包中的timer定义与这里是一致的
// timer真正实现的相关操作是在runtime中完成的
type runtimeTimer struct {
 // 指向P(GMP模型中的处理器)的地址
 pp uintptr
 // 定时器在什么时候触发
 when int64
 // 下一次触发的时间,即两次触发之间的间隔时间
 period int64
 // 时间触发后执行的函数处理
 f func(interface{}, uintptr) // NOTE: must not be closure
 // 传递给处理函数f的参数
 arg interface{}
 // seq看源码中并没有使用
 seq uintptr
 // 当timer被修改时,timer位于修改状态下,下一次触发时间
 nextwhen int64
 // 定时timer的状态
 status uint32
}

下面看timer是如何创建的,time包提供了构造方法NewTimer创建一个*timer类型的对象,time类型的通道会初始为1,填充触发时间点when和执行函数f及参数。然后执行startTimer启动定时器,startTime真正实现在runtime包中的time.go中

代码语言:javascript复制
// 创建一个定时器timer结构,可以在d duration后从timer的通道中读取通知信息
func NewTimer(d Duration) *Timer {
 // 定义一个有缓冲的Time类型的Chan,缓冲区的大小为1
 c := make(chan Time, 1)
 t := &Timer{
  C: c,
  r: runtimeTimer{
   // when是触发时间,当前时间 d
   when: when(d),
   // 触发时执行的操作,这里是向c中发送当前时间
   f:    sendTime,
   // 当f被执行的时候,传递给f的参数,这里是Chan c
   arg:  c,
  },
 }
 // 启动定时器,startTimer真正实现在runtime包中的time.go中
 startTimer(&t.r)
 return t
}

// when 计算一个过期时间值,例如3秒后过期,将返回当前时间 3s后的时间值
// 如果传入的时间值<=0
func when(d Duration) int64 {
 if d <= 0 {
  return runtimeNano()
 }
 // t是一个精确到纳秒的时间
 t := runtimeNano()   int64(d)
 if t < 0 {
  t = 1<<63 - 1 // math.MaxInt64
 }
 return t
}

time包中的startTimer会链接到下面的处理函数,startTimer真正处理是在addtimer函数中,继续看addtimer具体做了什么。

  • 会设置t的状态为timerWaiting
  • 获取当前运行的P,对P中的定时器执行cleantimers
  • 将t加入到P中
  • 唤醒netPoller中休眠的线程
代码语言:javascript复制
// time包中的startTimer会链接到这里,真正处理的是addtimer函数
func startTimer(t *timer) {
 if raceenabled {
  racerelease(unsafe.Pointer(t))
 }
 addtimer(t)
}

// addtimer 将一个timer添加到P中,该函数只在新建一个timer的时候被调用
//
func addtimer(t *timer) {
 // when不能为负数,否则可能会导致增量计算时溢出,导致runtime中的timer永远不会过期

 if t.when < 0 {
  t.when = maxWhen
 }
 // 刚添加的timer,走到这里还未初始化状态,如果状态不是timerNoStatus说明出问题了
 if t.status != timerNoStatus {
  throw("addtimer called with initialized timer")
 }
 // 设置timer的状态为timerWaiting
 t.status = timerWaiting

 when := t.when

 // pp是当前调度G的P
 pp := getg().m.p.ptr()
 lock(&pp.timersLock)
 // 清理掉pp中timers队列中头部处于timerDeleted、timerModifiedEarlier、
 // timerModifiedLater状态的timer
 cleantimers(pp)
 // 将定时器t添加到P中的定时器切片中保存起来,t和P进行了双向绑定
 // 即t.pp指向了P,P对象中的timers维护了所有的timers
 doaddtimer(pp, t)
 unlock(&pp.timersLock)
    // 唤醒netPoller中休眠的线程
 wakeNetPoller(when)
}

接下进一步分析上面addtimer处理逻辑中的几个重要调用函数。cleantimers会清理P中维护的定时器队列中头部处于删除状态、修改为更早、更晚触发的定时器。

代码语言:javascript复制
// cleantimers 清理P中维护的定时器队列中头部处于删除状态、修改为更早、更晚触发的定时器
func cleantimers(pp *p) {
 for {
  if len(pp.timers) == 0 {
   return
  }
  t := pp.timers[0]
  if t.pp.ptr() != pp {
   throw("cleantimers: bad p")
  }
  switch s := atomic.Load(&t.status); s {
  case timerDeleted:
   // 将t的状态从timerDeleted修改为timerRemoving
   if !atomic.Cas(&t.status, s, timerRemoving) {
    continue
   }
   // 将定时器t从pp的队列中删除
   dodeltimer0(pp)
   // 将t的状态从timerRemoving修改为timerRemoved
   if !atomic.Cas(&t.status, timerRemoving, timerRemoved) {
    badTimer()
   }
   // 将pp中的需要删除的定时器数量减少一个
   atomic.Xadd(&pp.deletedTimers, -1)
  case timerModifiedEarlier, timerModifiedLater:
   // 将t的状态从timerModifiedEarlier/timerModifiedLater修改为timerMoving状态
   if !atomic.Cas(&t.status, s, timerMoving) {
    continue
   }
   // Now we can change the when field.
   // 因为时间已经调整,将调整后的时间nextwhen给when,成为下次触发时间
   t.when = t.nextwhen
   // Move t to the right position.
   // 通过先删除后添加的方式将t放到正确的位置
   dodeltimer0(pp)
   doaddtimer(pp, t)
   // 记录向前调整的定时器数量减少1个
   if s == timerModifiedEarlier {
    atomic.Xadd(&pp.adjustTimers, -1)
   }
   // 将t的状态从timeMoving修改为timerWaiting
   if !atomic.Cas(&t.status, timerMoving, timerWaiting) {
    badTimer()
   }
  default:
   // Head of timers does not need adjustment.
   // 其他状态的定时器不需要调整,直接返回
   return
  }
 }
}

doaddtimer将定时器t加入到P的tiemrs切片中,然后对P的timers进行调整,要调整成四叉堆的结构,调节的顺序是按t.when即触发时间,处于堆顶的元素是最早要触发的定时器。

代码语言:javascript复制
// doaddtimer将定时器t加入到P的堆中
func doaddtimer(pp *p, t *timer) {
 // Timers rely on the network poller, so make sure the poller
 // has started.
 // timer依赖与网络轮询器的调度,所以添加之前要确保轮询器poller已经开始
 if netpollInited == 0 {
  // 初始化轮询器
  netpollGenericInit()
 }

 // 还没加入到pp中,这时的t是还没绑定到P的,它的pp是默认值0
 if t.pp != 0 {
  throw("doaddtimer: P already set in timer")
 }
 // 将定时器t绑定到P上
 t.pp.set(pp)
 i := len(pp.timers)
 // 将当前的定时器t加入到P中timer中
 pp.timers = append(pp.timers, t)
 // 将刚加入的定时器t调整到四叉堆的合适位置
 siftupTimer(pp.timers, i)
 // 如果t是最早触发的定时器,将触发的时间保存到pp的time0When中
 if t == pp.timers[0] {
  atomic.Store64(&pp.timer0When, uint64(t.when))
 }
 // 将P中记录定时器数量的变量numTimers加1
 atomic.Xadd(&pp.numTimers, 1)
}

wakeNetPoller 唤醒正在netpoll休眠的线程,唤醒的条件有两个:

  • when的值小于pollUntil时间
  • pollUntil为0 timer为什么需要唤醒netpoll,跟netpoll有什么关系呢?在1.14版中将timer和netpoll统一了起来,不管是网络轮休还是定时器都是某个条件满足了,对于网络来说是有数据包了,对定时器来说是时间到了。netpoll中的poll_runtime_pollSetDeadline函数用来设置pollDesc结构体的deadline相关字段,可以为每个fd设置读超时以及写超时。其原理就是为pollDesc结构体添加一个timer,timer的f函数设置为netpolldeadlineimpl函数。netpolldeadlineimpl函数会运行pollDesc结构体的协程,即使pollDesc中的事件没有被epoll触发,因为deadline到了。
代码语言:javascript复制
// wakeNetPoller 唤醒正在netpoll休眠的线程, 唤醒的条件是when的值小于
// pollUntil的时间,或者pollUntil为0
func wakeNetPoller(when int64) {
 if atomic.Load64(&sched.lastpoll) == 0 {
  pollerPollUntil := int64(atomic.Load64(&sched.pollUntil))
  if pollerPollUntil == 0 || pollerPollUntil > when {
   netpollBreak()
  }
 }
}

netpollBreak进行唤醒操作,实现方法是往管道netpollBreakWr写数据,这样netpoll自然会被唤醒。

代码语言:javascript复制
// netpollBreak 进行唤醒操作,实现方式是,netpollBreakWr是一个管道,用
// write给netpollBreakWr写数据,这样netpoll自然会被唤醒
func netpollBreak() {
 for {
  var b byte
  n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
  if n == 1 || n == -_EAGAIN {
   break
  }
  if n == -_EINTR {
   continue
  }
  println("runtime: netpollBreak write failed with", -n)
  throw("runtime: netpollBreak write failed")
 }
}

下面看Stop定时器,执行了哪些操作. time包中Stop操作,核心调的是stopTimer函数,该函数在runtime包中实现。

代码语言:javascript复制
// Stop 停止定时器接口,阻止定时器被触发。如果定时器已经过期或者被停止掉,次函数将返回
// false,否则返回true, 调用Stop并不会关闭定时器中的Chan,以防止通道不能成功的读取
// Stop不能并发的执行, 对于通过AfterFunc创建的timer, 如果定时器已经到期并且f在它自己的goroutine中运行
// 调用Stop将返回false. 如果调用者想知道f是否执行完毕,需要自己实现与f进行协调

func (t *Timer) Stop() bool {
 if t.r.f == nil {
  panic("time: Stop called on uninitialized Timer")
 }
 return stopTimer(&t.r)
}

stopTimer处理逻辑调用的是deltimer函数,根据t的状态会做不同的处理,如果定时器处于timerWaiting/timerModifiedLater/timerModifiedEarlier状态,先将其状态修改为timerModifying状态,最后修改为timerDeleted状态, 如果定时器处于其他状态,待状态改变或直接返回。deltimer函数并不是直接删除定时器,而是将其状态标记为删除状态,是为了防止并发冲突。真正执行删除的逻辑是在各个P上完成的。

代码语言:javascript复制
// 停止掉定时器
func stopTimer(t *timer) bool {
 return deltimer(t)
}

// deltimer 删除定时器,将定时器t从P上删除。注意这里的删除并不是真正从P移除了,而是
// 将t的状态标记为删除,是为了防止并发冲突。然后每个P真正删除逻辑会将自己上面的标记为
// 删除状态的定时器删除
func deltimer(t *timer) bool {
 for {
  switch s := atomic.Load(&t.status); s {
  case timerWaiting, timerModifiedLater:
   
   // 对处于等待状态,延迟触发的定时器将其状态标记为timerModifying
   mp := acquirem()
   if atomic.Cas(&t.status, s, timerModifying) {
   
    // t的pp字段指向了P的位置,可以方便的从t知道它被挂在哪个P下
    tpp := t.pp.ptr()
    if !atomic.Cas(&t.status, timerModifying, timerDeleted) {
     badTimer()
    }
    releasem(mp)
    // 对P进行操作,将记录删除定时器的数量减1
    atomic.Xadd(&tpp.deletedTimers, 1)
    // Timer was not yet run.
    // 因为定时器还未被触发,返回true
    return true
   } else {
    releasem(mp)
   }
  case timerModifiedEarlier:
   
   // 定时器处于提前触发状态,将其状态修改为timerModifying
   mp := acquirem()
   if atomic.Cas(&t.status, s, timerModifying) {
   
    tpp := t.pp.ptr()
    // 定时器的状态将被修改为timerDeleted,所以维护调整状态的定时器数量-1
    atomic.Xadd(&tpp.adjustTimers, -1)
    if !atomic.Cas(&t.status, timerModifying, timerDeleted) {
     badTimer()
    }
    releasem(mp)
    // 对P进行操作,将记录删除定时器的数量减1
    atomic.Xadd(&tpp.deletedTimers, 1)
    // Timer was not yet run.
    // 因为定时器还未被触发,返回true
    return true
   } else {
    releasem(mp)
   }
  case timerDeleted, timerRemoving, timerRemoved:
   // Timer was already run.
   // 定时器已经处于移除状态了,返回false处理,这种情况在重复调用t.Stop会发生
   return false
  case timerRunning, timerMoving:
   
   // 定时器已经被触发或正在移动状态,让出当前的处理
   osyield()
  case timerNoStatus:
   // 定时器还未添加或已经运行完了,返回false
   return false
  case timerModifying:
   // 定时器处于修改状态,并发的调用deltimer和modtimer会可能会走到这里,让出当前处理
   osyield()
  default:
   badTimer()
  }
 }
}

定时器Reset操作,用于将一个已有定时器的触发时间进行修改,可以往前修改也可以往后修改。Reset应该在定时器停止之后或过期之后并且通道数据取走之后被调用,如果程序已经从t.C中取走了值,定时器已经到期,可以直接执行t.Reset操作,如果程序还未从t.C中取走值,在调用t.Reset之前必须先停止掉定时器,并将通道的数据取干净。

代码语言:javascript复制
// Reset应该在定时器停止之后或过期之后并且通道数据取走之后被调用
// 如果程序已经从t.C中取走了值,定时器已经到期,可以直接执行t.Reset操作
// 如果程序还未从t.C中取走值,在调用t.Reset之前必须先停止掉定时器,并将通道的数据取干净

func (t *Timer) Reset(d Duration) bool {
 if t.r.f == nil {
  panic("time: Reset called on uninitialized Timer")
 }
 w := when(d)
 active := stopTimer(&t.r)
 resetTimer(&t.r, w)
 return active
}

resettimer完成Reset的真正处理逻辑,如果一个定时器处于不活动状态,调用resettimer将变成激活状态。如果一个定时器已经被使用或者可能被使用,应该调用resettimer而不是addtimer.

代码语言:javascript复制
// resettimer 重置一个定时器的触发时间,如果一个定时器处于不活动状态,调用resettimer
// 将变成活动状态。如果一个定时器已经被使用或者可能被使用,应该调用resettimer而不是addtimer
func resettimer(t *timer, when int64) {
 modtimer(t, when, t.period, t.f, t.arg, t.seq)
}

// modtimer 修改已存在定时器的触发时间
func modtimer(t *timer, when, period int64, f func(interface{}, uintptr), arg interface{}, seq uintptr) {
 if when < 0 {
  when = maxWhen
 }

 status := uint32(timerNoStatus)
 wasRemoved := false
 var mp *m
loop:
 for {
  switch status = atomic.Load(&t.status); status {
  case timerWaiting, timerModifiedEarlier, timerModifiedLater:
   
   // 将定时器修改为正在修改状态
   mp = acquirem()
   if atomic.Cas(&t.status, status, timerModifying) {
    break loop
   }
   releasem(mp)
  case timerNoStatus, timerRemoved:
   
   mp = acquirem()

   // 将定时器修改为正在修改状态
   if atomic.Cas(&t.status, status, timerModifying) {
    wasRemoved = true
    break loop
   }
   releasem(mp)
  case timerDeleted:
   
   // 将定时器修改为正在修改状态
   mp = acquirem()
   if atomic.Cas(&t.status, status, timerModifying) {
    atomic.Xadd(&t.pp.ptr().deletedTimers, -1)
    break loop
   }
   releasem(mp)
  case timerRunning, timerRemoving, timerMoving:
   
   // 定时器t正在运行、正在被移除、正在移动状态,让出当前处理
   osyield()
  case timerModifying:
   // 为啥不跟上面的合并处理呢? 处于修改状态的定时器,也让出当前处理
   osyield()
  default:
   badTimer()
  }
 }

 // 更新t的period、执行函数、参数、seq
 t.period = period
 t.f = f
 t.arg = arg
 t.seq = seq

 // 处于timerNoStatus, timerRemoved下的定时器已进不在P中,需要重新加入
 if wasRemoved {
  t.when = when
  pp := getg().m.p.ptr()
  lock(&pp.timersLock)
  // 将t加入到pp中
  doaddtimer(pp, t)
  unlock(&pp.timersLock)
  if !atomic.Cas(&t.status, timerModifying, timerWaiting) {
   badTimer()
  }
  releasem(mp)
  // 唤醒net poller进行处理
  wakeNetPoller(when)
 } else {
  
  // when不能直接给t.when, 此时的t还在某个P中,如果直接修改将导致无序,
  // 所以这里将when给t.nextwhen,让各自的P在处理的时候将nextwhen的值
  // 赋值给when
  t.nextwhen = when

  // newStatus记录本次时间调整对对应的状态,是提前还是延迟
  newStatus := uint32(timerModifiedLater)
  if when < t.when {
   newStatus = timerModifiedEarlier
  }

  // 更新adjust的值,如果是将定时器从timerModifiedEarlier移除,
  // adjust-1 ,如果是将定时器调整到timerModifiedEarlier,
  // adjust 1
  adjust := int32(0)
  if status == timerModifiedEarlier {
   adjust--
  }
  if newStatus == timerModifiedEarlier {
   adjust  
  }
  if adjust != 0 {
   atomic.Xadd(&t.pp.ptr().adjustTimers, adjust)
  }

  // 将定时器的状态设置为新的状态
  if !atomic.Cas(&t.status, timerModifying, newStatus) {
   badTimer()
  }
  releasem(mp)

  if newStatus == timerModifiedEarlier {
   wakeNetPoller(when)
  }
 }
}

timer是如何被触发运行的?有两种方式会触发timer运行.

  • 第一种是在调度循环中直接检查是否有满足定时器直接触发
  • 第二种是Go的后台监控中会定时检查是否有定时器需要触发。在调度循环中触发定时器的函数有2个函数,schedule和findrunnable。下面从这两个函数中抽取出于定时器处理有关的逻辑。schedule中会调用checkTimers, checkTimers会对需要进行调整的timer进行调整,如果没有需要执行的定时器,直接返回,如果下一个要执行的定时器timer没有到期并且需要删除的定时器占整个定时器的比例小于1/4也会直接返回。之后调用adjusttimers进行定时器的调整,调整成四叉堆的结构,在调用runtimer查找堆中是否存在需要执行的timer。最后根据当前goroutine的P和传入的 P 相同,并且需要删除的 timer 超过了 timer 列表数量的四分之一,那么调用clearDeletedTimers 清理需要删除的timer.
代码语言:javascript复制
func schedule() {
    ...
 checkTimers(pp, 0)

 var gp *g
 var inheritTime bool
    ...
 if gp == nil {
  gp, inheritTime = findrunnable() // blocks until work is available
 }
    ...
}


// checkTimers检查P中的定时器触发时间是否已满足
func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {
 // If there are no timers to adjust, and the first timer on
 // the heap is not yet ready to run, then there is nothing to do.
 // 检查是否有处于调整时间过程中的定时器,如果没有会进入if逻辑。检查最早被触发的定时
 // 器的运行时间,即pp.time0When,如果当前时间还没达到最早定时器的触发时间。检查
 // 待删除的定时器数量是否小于等待所有定时器的数量的1/4, 如果是直接返回。如果不是
 // 执行清理操作,将deleted状态的定时器删除
 if atomic.Load(&pp.adjustTimers) == 0 {
  next := int64(atomic.Load64(&pp.timer0When))
  if next == 0 {
   return now, 0, false
  }
  if now == 0 {
   now = nanotime()
  }
  if now < next {
   if pp != getg().m.p.ptr() || int(atomic.Load(&pp.deletedTimers)) <= int(atomic.Load(&pp.numTimers)/4) {
    return now, next, false
   }
  }
 }

 lock(&pp.timersLock)

 // 调整定时器
 adjusttimers(pp)

 rnow = now
 if len(pp.timers) > 0 {
  if rnow == 0 {
   rnow = nanotime()
  }
  for len(pp.timers) > 0 {
   // 运行定时器
   if tw := runtimer(pp, rnow); tw != 0 {
    if tw > 0 {
     pollUntil = tw
    }
    break
   }
   ran = true
  }
 }

 // If this is the local P, and there are a lot of deleted timers,
 // clear them out. We only do this for the local P to reduce
 // lock contention on timersLock.
 // P中待删除的定时器数大于所有定时器数量的1/4,执行删除操作
 if pp == getg().m.p.ptr() && int(atomic.Load(&pp.deletedTimers)) > len(pp.timers)/4 {
  clearDeletedTimers(pp)
 }

 unlock(&pp.timersLock)

 return rnow, pollUntil, ran
}

findrunnable主要是窃取可运行的G,在窃取前先会调用checkTimers 检查 P 中可执行的 timer,如果 netpoll 中有等待的 waiter,那么会调用 netpoll 尝试无阻塞的从netpoller获取Glist,如果获取不到可执行的 G,那么就会开始执行窃取。窃取的时候会调用 checkTimers 随机从其他的 P 中获取 timer,窃取完毕后也没有可执行的 timer,那么会继续往下,休眠前再次检查 netpoll 网络,调用 netpoll函数进行阻塞调用。

代码语言:javascript复制
func findrunnable() (gp *g, inheritTime bool) {
 ...
 // 从其他P中偷G到当前的处理P中
 for i := 0; i < 4; i   {
  for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
   if sched.gcwaiting != 0 {
    goto top
   }
   // 在循环的最后一轮,如果其他P运行队列中没有G,将从其他队列的runnext中获取
   stealRunNextG := i > 2 // first look for ready queues with more than 1 g
   p2 := allp[enum.position()]
   if _p_ == p2 {
    continue
   }
   // 从其他的P的运行队列中获取一般的G到当前的队列
   if gp := runqsteal(_p_, p2, stealRunNextG); gp != nil {
    return gp, false
   }
            ...
   // 如果运行队列中没有G,那么从timers中获取可执行的timer
   if i > 2 && shouldStealTimers(p2) {
    tnow, w, ran := checkTimers(p2, now)
    now = tnow
    if w != 0 && (pollUntil == 0 || w < pollUntil) {
     pollUntil = w
    }
  ...
}

监控线程sysmon会通过timeSleepUntil 遍历所有的 P 的 timer 列表,找到下一个需要执行的 timer,如果超过 10ms 没有 poll,则 poll 一下网络,如果有 timer 到期,这个时候直接启动新的 M 处理 timer.

代码语言:javascript复制
func sysmon() {
       ...
  // poll network if not polled for more than 10ms
  // 获取上次poll 轮休的时间
  lastpoll := int64(atomic.Load64(&sched.lastpoll))
  // 如果上次轮询 network距离现在已经超过了10ms,则轮询一下网络
  if netpollinited() && lastpoll != 0 && lastpoll 10*1000*1000 < now {
   atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
   // 非阻塞,返回G列表
   list := netpoll(0)
   // 如果G列表非空,将获取到的G列表插入到空闲的P中或全局列表中
   if !list.empty() {
    incidlelocked(-1)
    injectglist(&list)
    incidlelocked(1)
   }
  }
  // 如果有定时器到期,启动新的M处理定时器
  if next < now {
   startm(nil, false)
  }
  ...
}

总结

上述分析的是1.14版的定时器实现原理,在1.14版本之前,定时器的实现方法与上面不太一样。在之前的版本,维护了一个桶结构,桶的大小为64,申请的每个定时器会分配到这64个桶中的某个桶上。然后对每个桶进行调度处理里面定时器任务。在1.14版本将 timer列表直接挂到了 P 上面,这不仅减少了上下文切换带来的性能损耗,也减少了在锁之间的争抢问题。

0 人点赞