2万字深入分析GC源码和实现原理

2022-08-15 14:47:12 浏览数 (2)

GC工作流程

本文通过详细分析Go中GC源码,深入了解其实现原理细节,分析的Go版本为1.14。我们知道Go垃圾回收采用的是标记清扫算法,根据算法名也可以猜到有标记和清扫两个过程,这是逻辑上的划分。在工程层面,具体实现的时候需要做一些其他工作,例如过渡阶段的处理。根据mgc.go注解的说明,GC一共分为4个阶段:

  1. 清理终止阶段(sweep termination)
  2. 标记阶段(mark phase)
  3. 标记终止阶段(mark termination)
  4. 清理阶段(sweep phase)

读到这里,可能有同学对这里4个阶段的顺序有疑问?不应该是先标记然后才有清理吗,为啥这里第1个节点是清理终止阶段,理论上应该是按照标记阶段、标记终止阶段、清理阶段、清理终止阶段呀。确实,这里的4个阶段是按照编码实现的角度顺序来说的,因为在实现的时候用了一点花式动作。GC的阶段有一个gcphase变量来记录当前GC所处的阶段,实际上这个gcphase有3种状态。

代码语言:javascript复制
// 标记阶段
setGCPhase(_GCmark)
// 标记终止阶段
setGCPhase(_GCmarktermination)
// 标记关闭
setGCPhase(_GCoff)

可以看到gcphase只是记录了标记阶段和标记终止阶段。清理阶段和清理终止阶段是隐含的阶段,没有显示状态记录在gcphase中。清理终止阶段是通过检查mheap_.sweepdone是否为1来实现的。所以看到在实现的时候(gcStart函数)上来就开始检查是否清理终止,检查上一轮清理是否结束,而不是执行标记阶段。下面用概览图描述整个GC流程,方便我们从全局理解。至于实现各个阶段的处理逻辑,有个整体认识即可,后面会结合源码进行说明。

GC触发策略

触发执行GC有3种方式,分别是定时周期性触发、满足一定内存条件触发、手动触发。整个触发方式入口判断在下面的test方法中做的,此函数会检查当前GC的状态,如果处于_GCoff阶段或者还没有开启GC或者发送过panic,则不会执行GC,直接返回。

  1. gcTriggerHeap:内存占用超过某个值会触发,具体值是多少有一个计算公式,依据上次存活以及标记的数量做自动动态变更
  2. gcTriggerTime: 周期性触发,每2分钟会强制触发执行GC,当前forcegcperiod为2分钟
  3. gcTriggerCycle:手动触发,用户程序手动调用runtime.GC()。尽管这里有3种触发执行垃圾回收的方式,但在同一个时间只能有一种触发方式触发一轮GC.这个是通过gcphase来判断的,一旦开始执行GC,gcphase就处于非_GCoff阶段,其他调用方再来执行test的时候会返回false.
代码语言:javascript复制
func (t gcTrigger) test() bool {
 if !memstats.enablegc || panicking != 0 || gcphase != _GCoff {
  return false
 }
 switch t.kind {
 case gcTriggerHeap:
  // 满足内存条件触发,当前已分配的内存memstats.heap_live超过某个设定的值memstats.gc_trigger时触发
  return memstats.heap_live >= memstats.gc_trigger
 case gcTriggerTime:
  // 定时周期性触发,即上次GC完成到当前的时间差超过forcegcperiod会执行,forcegcperiod为2分钟
  // 也就是每2分钟会强制触发执行GC 
  if gcpercent < 0 {
   return false
  }
  lastgc := int64(atomic.Load64(&memstats.last_gc_nanotime))
  return lastgc != 0 && t.now-lastgc > forcegcperiod
 case gcTriggerCycle:
  // 手动触发,用户程序手动调用runtime.GC()
  return int32(t.n-work.cycles) > 0
 }
 return true
}

执行GC的入口函数为gcStart,此函数需要传入一个gcTrigger类型的参数,gcTrigger表示触发的策略,就是上面讲的三种触发方式。下面看gcStart函数是在哪里被调用的,也就是哪些地方call了gcStart。因为有三种触发方式,所以有3个地方调用了gcStart。

「第1处」是在forcegchelper(runtime/proc.go)中,forcegchelper是一个死循环函数,每隔两分钟会进行一次gcStart调用,对应到上面的周期性触发方式。forcegchelper函数运行在一个单独的goroutine中,默认处于暂停状态,由sysmon监控线程通知它恢复运行。

代码语言:javascript复制
func init() {
 go forcegchelper()
}

// forcegchelper检查是否需要启动GC
func forcegchelper() {
 forcegc.g = getg()
 for {
  lock(&forcegc.lock)
  if forcegc.idle != 0 {
   throw("forcegc: phase error")
  }
  atomic.Store(&forcegc.idle, 1)
  goparkunlock(&forcegc.lock, waitReasonForceGGIdle, traceEvGoBlock, 1)
  // forcegchelper默认是挂起状态,由sysmon监控线程通知它是否开始运行
  if debug.gctrace > 0 {
   println("GC forced")
  }
  // 调用gcStart,进行垃圾回收
  gcStart(gcTrigger{kind: gcTriggerTime, now: nanotime()})
 }
}

「第2处」在GC函数,该函数名是大写的是可导出的,用户程序可以使用runtime.GC()手动执行垃圾回收,对应到触发方式就是手动GC.需要注意的时,GC()会阻塞调用者,直到完成该轮垃圾回收处理,包含清理阶段。最后发布堆使用描述信息。

代码语言:javascript复制
// GC 运行垃圾回收并阻塞调用者,直到垃圾回收完成。它也可能阻塞整个程序。
func GC() {
 // work.cycles记录了第N次GC,每执行一次GC,work.cycles 1
 n := atomic.Load(&work.cycles)
 gcWaitOnMark(n)

 // 调用gcStart进行第n 1次垃圾回收,走到这里,第n次GC流程可能还没有完全走完,
 // 即还有可能处于第n次GC的清扫阶段或者清扫终止阶段,不过没关系,在gcStart函数中
 // 会等待之前的清扫阶段完成,才会进行新一轮GC
 gcStart(gcTrigger{kind: gcTriggerCycle, n: n   1})

 // 等待当前这轮的GC进入标记终止阶段
 gcWaitOnMark(n   1)

 // 等待当前这轮GC的清扫阶段完成,为什么会走完整个GC流程,是因为 runtime.GC() 通常用作测试和
 // 基准测试的一部分,以使系统进入相对稳定和隔离的状态。
 for atomic.Load(&work.cycles) == n 1 && sweepone() != ^uintptr(0) {
  sweep.nbgsweep  
  Gosched()
 }
 
 // 但现在堆使用情况信息仍然反映的是第N次GC的,而不是N 1。等第N 1轮GC流程的清除完成,
 // 这个时候堆的使用情况可以反映出当前轮GC的效果。虽然扫描队列上没有更多的跨度,但
 // 可能正在同时扫描跨度,所以必须等待,等到所有的清扫完毕。
 for atomic.Load(&work.cycles) == n 1 && atomic.Load(&mheap_.sweepers) != 0 {
  Gosched()
 }

 mp := acquirem()
 cycle := atomic.Load(&work.cycles)
 if cycle == n 1 || (gcphase == _GCmark && cycle == n 2) {
  // 发布堆使用情况描述信息
  mProf_PostSweep()
 }
 releasem(mp)
}

「第3处」在mallocgc(runtime/malloc.go)函数中,mallocgc是对象分配的入口函数,在Go内存管理-下篇文章中详细的分析,这里只是挑出关键的部分。mallocgc对应到触发方式是内存占用超过了某个值。因为只有进行了内存分配,内存占用才会增加,才有可能触发GC. 在mallocgc函数中,shouldhelpgc变量用于确定是否进行GC,在分配大对象(>32KB)对象的时候,shouldhelpgc直接被赋值为true,就是说在分配大对象时必须检查是否需要进行GC,在分配小对象和微对象的时候,只有调用了c.nextFree函数,也就是在当前线程的内存管理单元中不存在空闲空间时,需要从中心缓存或者页堆中获取新的管理单元时,才有可能检查是否需要进行GC。

代码语言:javascript复制
func mallocgc(size uintptr, typ *_type, needzero bool) unsafe.Pointer {
 shouldhelpgc := false
 ...
 if size <= maxSmallSize {
  if noscan && size < maxTinySize {
   ...
   v := nextFreeFast(span)
   if v == 0 {
    v, _, shouldhelpgc = c.nextFree(tinySpanClass)
   }
   ...
  } else {
   ...
   v := nextFreeFast(span)
   if v == 0 {
    v, span, shouldhelpgc = c.nextFree(spc)
   }
    ...
  }
 } else {
  shouldhelpgc = true
  ...
 }
 ...
 if shouldhelpgc {
  if t := (gcTrigger{kind: gcTriggerHeap}); t.test() {
   gcStart(t)
  }
 }

 return x
}

内存占用满足memstats.heap_live >= memstats.gc_trigger会执行GC操作,现在我们来看这两个变量的含义。memstats.heap_live表示当前占用的堆内存大小,memstats.gc_trigger是一个目标堆内存占用值。

memstats.heap_live是怎么计算的呢?涉及heap_live操作的有3个地方,第一个地方是从中心缓存分配mspan的时候,增加heap_live的值,第二个地方是将mspan对象归还给中心缓存的时候,减少heap_live的值,第三个地方是在分配大对象的时候,增加heap_live的值。

第一个地方:中心缓存分配mspan的时候,增加heap_live的值

代码语言:javascript复制
func (c *mcentral) cacheSpan() *mspan {
 spanBytes := uintptr(class_to_allocnpages[c.spanclass.sizeclass()]) * _PageSize
 deductSweepCredit(spanBytes, 0)
...
 usedBytes := uintptr(s.allocCount) * s.elemsize
 atomic.Xadd64(&memstats.heap_live, int64(spanBytes)-int64(usedBytes))
...
 return s
}

第二个地方:将mspan对象归还给中心缓存的时候,减少heap_live的值

代码语言:javascript复制
func (c *mcentral) uncacheSpan(s *mspan) {
 ...
 n := int(s.nelems) - int(s.allocCount)
    ...
   atomic.Xadd64(&memstats.heap_live, -int64(n)*int64(s.elemsize))
 ...
}

第三个地方:分配大对象的时候,增加heap_live的值

代码语言:javascript复制
func (h *mheap) allocSpan(npages uintptr, manual bool, spanclass spanClass, sysStat *uint64) (s *mspan) {
 ...
       // spanclass.sizeclass()为0表示分配大对象
  if spanclass.sizeclass() == 0 {
   mheap_.largealloc  = uint64(npages * pageSize)
   mheap_.nlargealloc  
   atomic.Xadd64(&memstats.heap_live, int64(npages*pageSize))
  }

...
 return s
}

可见运行时只会在中心缓存分配或者释放内存管理单元以及在堆上分配大对象时才会更新,虽然牺牲了微小的精确性,因为分配出的mspan对象,有可能只分配出去了部分object,但在heap_live统计的时候是整个mspan管理的大小,这样做的目的是为了减少锁的竞争,还是值得的。

GC工作流程

gcStart是GC的启动函数,下面结合gcStart函数分析GC具体的工作流程。为了便于理解,暂时先不看gcStart的最先的清理终止阶段的处理逻辑和标记阶段前的准备工作,直接进入第一个部分标记阶段。

代码语言:javascript复制
func gcStart(trigger gcTrigger) {
...
 setGCPhase(_GCmark)
 gcBgMarkPrepare() // Must happen before assist enable.
 gcMarkRootPrepare()
 gcMarkTinyAllocs()
 atomic.Store(&gcBlackenEnabled, 1)
 gcController.markStartTime = now

 systemstack(func() {
  now = startTheWorldWithSema(trace.enabled)
  work.pauseNS  = now - work.pauseStart
  work.tMark = now
 })
...
}

在标记阶段,会做如下工作:

  1. 设置GC的状态从_GCoff修改为_GCmark,设置写屏障为启用状态
  2. gcBgMarkPrepare后台扫描需要的状态的初始化
  3. 调用gcMarkRootRrepare进行根初始化,将栈上和全局变量等根对象加入到标记队列
  4. 调用gcMarkTinyAllocs多微对象进行标记
  5. 将gcBlackenEnabled设置为1,表示用户程序和标记任务可以将对象进行涂黑操作了
  6. 调用startTheWorldWithSema启动goroutine的执行,这个时候用户程序可以运行了,后台任务也会开始标记堆中的对象。

下面对上面几个工作流程结合代码做一个功能说明:「gcBgMarkPrepare」:work.nproc和work.nwait都设置为uint32的最大值,它们开始时相等的,每个worker或辅助GC worker在开始进行标记前会将work.nwait减1,完成标记工作后又会将work.nwait加1。而work.nproc的值是不变的,所以当所有的work完成标记,work.nwait的值是和work.nproc相等的。通过work.nproc==work.nwait作为没有work在进行标记工作的一个判定。该函数要在辅助GC worker进行工作前执行。

代码语言:javascript复制
func gcBgMarkPrepare() {
 work.nproc = ^uint32(0)
 work.nwait = ^uint32(0)
}

「gcMarkRootPrepare」:收集根节点信息,包含全局变量、栈对象、根span对象等,计算出根节点标记的任务量,记录在work.markrootJobs中。这里着重说明下work.nSpanRoots,它表示从mheap分配出去的span对象数。

代码语言:javascript复制
// gcMarkRootPrepare统计需要扫描的对象数(包括全局变量、栈对象、spanRoot)
func gcMarkRootPrepare() {
 work.nFlushCacheRoots = 0

 // 计算编译后进程中data段和BSS段有多少个根对象块,rootBlockBytes为256KB
 nBlocks := func(bytes uintptr) int {
  return int((bytes   rootBlockBytes - 1) / rootBlockBytes)
 }

 work.nDataRoots = 0
 work.nBSSRoots = 0

 // 扫描全量变量的数据区
 for _, datap := range activeModules() {
  nDataRoots := nBlocks(datap.edata - datap.data)
  if nDataRoots > work.nDataRoots {
   work.nDataRoots = nDataRoots
  }
 }
 // 扫描全局变量的BSS区
 for _, datap := range activeModules() {
  nBSSRoots := nBlocks(datap.ebss - datap.bss)
  if nBSSRoots > work.nBSSRoots {
   work.nBSSRoots = nBSSRoots
  }
 }

 // 根span数
 work.nSpanRoots = mheap_.sweepSpans[mheap_.sweepgen/2%2].numBlocks()

 // 栈对象数
 work.nStackRoots = int(atomic.Loaduintptr(&allglen))

 work.markrootNext = 0
 // 上述所有对象数的总和=2 缓存的根对象 全局数据区块数 全局BSS区块数 span根对象数 栈对象数
 work.markrootJobs = uint32(fixedRootCount   work.nFlushCacheRoots   work.nDataRoots   work.nBSSRoots   work.nSpanRoots   work.nStackRoots)
}

「gcMarkTinyAllocs」:对每P中的微对象内存进行标记,将每个P使用本地缓存mcache分配的小型数据结构地址标记为灰色并加入到p.gcw队列中。

代码语言:javascript复制
// 对活跃的微对象内存标记为灰色,执行该操作的时候,是已经stop the world,所以不用担心标记出错误回收。
func gcMarkTinyAllocs() {
 for _, p := range allp {
  c := p.mcache
  if c == nil || c.tiny == 0 {
   continue
  }
  _, span, objIndex := findObject(c.tiny, 0, 0)
  gcw := &p.gcw
        // 将c.tiny标记为灰色
  greyobject(c.tiny, 0, 0, span, gcw, objIndex)
 }
}

「startTheWorldWithSema」:从网络轮询器中获取等待处理的任务加入到全局队列中,如果处理器P的数量有更改,调整为修改后的值,进行扩大或缩小全局处理器的数量,procresize返回有可运行任务的P的链表,通过notewakeup和newm依次唤醒处理器或者为处理器创建新的线程M来运行P,最后如果有空闲的P,并且没有自旋的M则唤醒或者创建一个M。

代码语言:javascript复制
// startTheWorldWithSema重新启动世界
func startTheWorldWithSema(emitTraceEvent bool) int64 {
 // 禁止被抢占
 mp := acquirem() 
 if netpollinited() {
  // 从网络轮询器中获取待处理的任务并加入全局队列
  list := netpoll(0) // non-blocking
  injectglist(&list)
 }
 lock(&sched.lock)
 // 如果处理器P的数量有更改,调整为修改后的值
 procs := gomaxprocs
 if newprocs != 0 {
  procs = newprocs
  newprocs = 0
 }
 // 进行扩大或缩小全局处理器的数量,返回有可运行任务的P的链表
 p1 := procresize(procs)
 sched.gcwaiting = 0
 // 如果sysmon goroutine处于等待中则唤醒它
 if sched.sysmonwait != 0 {
  sched.sysmonwait = 0
  notewakeup(&sched.sysmonnote)
 }
 unlock(&sched.lock)
 // 通过notewakeup和newm依次唤醒处理器或者为处理器创建新的线程M来运行P
 for p1 != nil {
  p := p1
  p1 = p1.link.ptr()
  if p.m != 0 {
   mp := p.m.ptr()
   p.m = 0
   if mp.nextp != 0 {
    throw("startTheWorld: inconsistent mp->nextp")
   }
   mp.nextp.set(p)
   notewakeup(&mp.park)
  } else {
   newm(nil, p)
  }
 }
 startTime := nanotime()
 if emitTraceEvent {
  traceGCSTWDone()
 }

 // 如果有空闲的P,并且没有自旋的M则唤醒或者创建一个M
 if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
  wakep()
 }

 releasem(mp)

 return startTime
}

上面介绍完了标记的初始阶段,当设置完atomic.Store(&gcBlackenEnabled, 1)之后,就开启了并发标记模式,并发标记处理是个异步的过程。在阅读源码的时候,在gcStart里面看不到并发的逻辑。这个异步处理过程如下:

调用函数schedule检测到gcBlackenEnabled被设置为1之后,会执行gcController.findRunnableGCWorker函数,该函数会查找是否有可运行的GC worker goroutine(后台GC标记协程),如果有,就会执行它,也就是对对象进行标记工作。

GC worker goroutine是提前准备好的,每个P都有一个,是在进行标记前的准备工作中创建的,创建完之后只是处于暂停状态。具体创建函数是在gcStart中的gcBgMarkStartWorkers函数。

代码语言:javascript复制
// 为每个P创建一个标记对象的goroutine
func gcBgMarkStartWorkers() {
 for _, p := range allp {
  // p.gcBgMarkWorker存储的运行的G的地址,如果它为0,表示该P还没有创建gc work goroutine
  if p.gcBgMarkWorker == 0 {
   // 给每个p开启一个goroutine,先开启暂时不运行
   go gcBgMarkWorker(p)
   notetsleepg(&work.bgMarkReady, -1)
   noteclear(&work.bgMarkReady)
  }
 }
}

虽然每个P都有一个GC work goroutine,但不是每个都会执行标记工作。因为标记工作和用户程序是并发执行的,不会让所有的CPU资源都拿来运行标记goroutine,runtime中默认的是用25%的CPU资源来运行后台标记协程,其他会分给用户程序执行用。判决一个P是否能够运行GC work goroutine逻辑就是在上面提到的gcController.findRunnableGCWorker函数,判决的标准使用不超过25%的资源。下面具体看看该函数的实现细节。

代码语言:javascript复制
// 查找一个可运行的GCWorker
func (c *gcControllerState) findRunnableGCWorker(_p_ *p) *g {
 // gcBlackenEnabled为1表示可以进行标记了,为0还不能进行标记
 if gcBlackenEnabled == 0 {
  throw("gcControllerState.findRunnable: blackening not enabled")
 }
 if _p_.gcBgMarkWorker == 0 {
  // 与此P关联的标记工作者被阻止执行标记转换。我们无法运行它,
  // 因为它可能在其他运行或等待队列中
  return nil
 }

 if !gcMarkWorkAvailable(_p_) {
  return nil
 }

 decIfPositive := func(ptr *int64) bool {
  if *ptr > 0 {
   if atomic.Xaddint64(ptr, -1) >= 0 {
    return true
   }
   // We lost a race
   atomic.Xaddint64(ptr,  1)
  }
  return false
 }

 // c.dedicatedMarkWorkersNeeded表示有多少个P需要专门用于对象标记,每调用一次decIfPositive如果分配出去了一个P,则c.dedicatedMarkWorkersNeeded减少一个,当减少到0时结束
 // 例如总共有8个P,更加25%的CPU用于对象标记,则需要8*0.25=2个P
 // 假设有6个P,则6*0.25=1.5,则有1个P专门进行对象标记,还有1.5-1=0.5
 // 进行部分时间进行标记模式,%5(0.05)的值存储在c.fractionalUtilizationGoal中
 if decIfPositive(&c.dedicatedMarkWorkersNeeded) {
  // 当前的P专门用于标记直到并发标记阶段结束
  _p_.gcMarkWorkerMode = gcMarkWorkerDedicatedMode
 } else if c.fractionalUtilizationGoal == 0 {
  // 不需要当前P进行标记
  return nil
 } else {
  delta := nanotime() - gcController.markStartTime
  if delta > 0 && float64(_p_.gcFractionalMarkTime)/float64(delta) > c.fractionalUtilizationGoal {
   return nil
  }
  // 当前的P部分时间用于标记,但是可以被抢占运行其他任务
  _p_.gcMarkWorkerMode = gcMarkWorkerFractionalMode
 }
 // 将当前P上的GC worker goroutine状态从等待状态修改为可运行状态,后台启动运行GC worker goroutine
 gp := _p_.gcBgMarkWorker.ptr()
 casgstatus(gp, _Gwaiting, _Grunnable)
 if trace.enabled {
  traceGoUnpark(gp, 0)
 }
 return gp
}

前面说了,在GC标记阶段会将25%的CPU用于垃圾回收,也就是说只有25%P中的GC worker goroutine会运行对象标记,其他P中的GC worker goroutine不会运行。考虑到25%P的数量不一定是一个整数,小数部分会用P的部分时间进行标记,该P上运行的GC worker goroutine是可以被抢占的。促成了GC worker goroutine有三种不同的工作模式,分别是:

  1. gcMarkWorkerDedicatedMode:标为该模式的P专门标记对象,不会被调度器抢占,直到标记阶段结束。
  2. gcMarkWorkerFractionalMode:标为该模式的P只占用一个CPU的部分资源,可以被抢占。当25%P的数有小数时,会存在该模式,帮助GC时标记工作CPU使用率到达整体的25%的目标。
  3. gcMarkWorkerIdleMode:P为空闲模式,即P没有运行任何goroutine,它也会进行对象标记,直到被抢占。因为闲着也是闲着,可以用来帮助干点GC的事情。

在上面的代码中,已看到对P设置gcMarkWorkerDedicatedMode或gcMarkWorkerFractionalMode模式,第3种模式是在哪里设置的呢?是在findrunnable中,findrunnable是被调度函数schedule执行的。在findrunnable函数中,会先后依次查找本地队列、全局队列、网络轮询、尝试从其他P偷取可运行的任务goroutine来执行,如果都没有的话,当前的P会被标记为gcMarkWorkerIdleMode模式, 运行挂在P中的gcBgMarkWorker中的GC worker goroutine赋值标记对象。

代码语言:javascript复制
func schedule() {
  ...
 if gp == nil && gcBlackenEnabled != 0 {
  gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
  tryWakeP = tryWakeP || gp != nil
 }
 if gp == nil {
  if _g_.m.p.ptr().schedticka == 0 && sched.runqsize > 0 {
   lock(&sched.lock)
   gp = globrunqget(_g_.m.p.ptr(), 1)
   unlock(&sched.lock)
  }
 }
 if gp == nil {
  gp, inheritTime = runqget(_g_.m.p.ptr())
 }
 if gp == nil {
    // 查找可运行的goroutine
  gp, inheritTime = findrunnable() // blocks until work is available
 }

 ...
 execute(gp, inheritTime)
}

func findrunnable() (gp *g, inheritTime bool) {
 ...
    // 前面的逻辑已省略,走到这里说明没有需要运行任务的goroutine,将当前的P标记为空闲模式,辅助标记对象
 if gcBlackenEnabled != 0 && _p_.gcBgMarkWorker != 0 && gcMarkWorkAvailable(_p_) {
  _p_.gcMarkWorkerMode = gcMarkWorkerIdleMode
  gp := _p_.gcBgMarkWorker.ptr()
  casgstatus(gp, _Gwaiting, _Grunnable)
  if trace.enabled {
   traceGoUnpark(gp, 0)
  }
  return gp, false
 }
...
}

前面讲了gcBgMarkWorker是在标记阶段前的准备阶段开启的goroutine,每个P一个,只是开启后处于休眠挂起状态,等待被唤醒执行。标记被schedule调度器唤醒时,会根据处理器P上的gcMarkWorkerMode模式决定标记的策略。等所有的标记任务都完成之后,进入标记完成阶段gcMarkDone.可以结合下面的源码解析备注理解。

代码语言:javascript复制
func gcBgMarkWorker(_p_ *p) {
 gp := getg()

 // 用于休眠后重新获取P的结构体
 type parkInfo struct {
  m      muintptr // Release this m on park.
  attach puintptr // If non-nil, attach to this p on park.
 }
 
 gp.m.preemptoff = "GC worker init"
 park := new(parkInfo)
 gp.m.preemptoff = ""

 // 设置当前的M并禁止抢占
 park.m.set(acquirem())
 // 设置需要关联到的P为当前的P
 park.attach.set(_p_)
 // 通知gcBgMarkStartWorker可以继续处理
 notewakeup(&work.bgMarkReady)

 for {
  // 将当前的g gopark,进入休眠
  gopark(func(g *g, parkp unsafe.Pointer) bool {
   park := (*parkInfo)(parkp)

   // 重新允许抢占
   releasem(park.m.ptr())

   // 设置关联的P,把当前的G挂到P的gcBgMarkWorker上,下次findRunnableGCWorker会使用
   // 设置失败时不休眠
   if park.attach != 0 {
    p := park.attach.ptr()
    park.attach.set(nil)
  
    if !p.gcBgMarkWorker.cas(0, guintptr(unsafe.Pointer(g))) {
     return false
    }
   }
   return true
  }, unsafe.Pointer(park), waitReasonGCWorkerIdle, traceEvGoBlock, 0)

  // 检查P的gcBgMarkWorker上挂的G与当前的G是否一致,不一致直接结束任务退出
  if _p_.gcBgMarkWorker.ptr() != gp {
   break
  }

  // 禁止G被抢占
  park.m.set(acquirem())

  if gcBlackenEnabled == 0 {
   throw("gcBgMarkWorker: blackening not enabled")
  }

  startTime := nanotime()
  _p_.gcMarkWorkerStartTime = startTime

  decnwait := atomic.Xadd(&work.nwait, -1)
  if decnwait == work.nproc {
   println("runtime: work.nwait=", decnwait, "work.nproc=", work.nproc)
   throw("work.nwait was > work.nproc")
  }
  // 切换到系统栈G0运行
  systemstack(func() {
  
   // 设置G的状态从运行修改为等待状态,使得它的栈可以被扫描
   casgstatus(gp, _Grunning, _Gwaiting)
   // 根据后台标记任务的模式执行对应的工作策略
   switch _p_.gcMarkWorkerMode {
   default:
    throw("gcBgMarkWorker: unexpected gcMarkWorkerMode")
   case gcMarkWorkerDedicatedMode:
    // gcMarkWorkerDedicatedMode模式下P会专门执行标记
    // 执行标记,直到被抢占,根据需要计算后台的扫描量来减少辅助GC
    // 和唤醒等待中的G
    gcDrain(&_p_.gcw, gcDrainUntilPreempt|gcDrainFlushBgCredit)
    // gp被抢占,把它本地运行队列中的所有G都转移到全局队列中,因为接下来
    // 当前的P要全力运行标记任务,直到没有标记任务结束
    if gp.preempt {
     lock(&sched.lock)
     for {
      gp, _ := runqget(_p_)
      if gp == nil {
       break
      }
      globrunqput(gp)
     }
     unlock(&sched.lock)
    }
    
    // 继续执行标记,直到没有标记认为结束
    gcDrain(&_p_.gcw, gcDrainFlushBgCredit)
   case gcMarkWorkerFractionalMode:
    // 该模式下P会适量的执行标记,直到被抢占
    gcDrain(&_p_.gcw, gcDrainFractional|gcDrainUntilPreempt|gcDrainFlushBgCredit)
   case gcMarkWorkerIdleMode:
    // 该模式下P只会在空闲时执行标记
    gcDrain(&_p_.gcw, gcDrainIdle|gcDrainUntilPreempt|gcDrainFlushBgCredit)
   }
   // 恢复G的状态到运行中
   casgstatus(gp, _Gwaiting, _Grunning)
  })

  ...
  // 判断标记任务是否已完成
  if incnwait == work.nproc && !gcMarkWorkAvailable(nil) {
   _p_.gcBgMarkWorker.set(nil)
   releasem(park.m.ptr())

   // 准备进入完成标记阶段
   gcMarkDone()
   park.m.set(acquirem())
   park.attach.set(_p_)
  }
 }
}
并发扫描

前面讲了扫描GC worker goroutine的启动,会用25%的CPU资源进行并发对象扫描,真正执行的扫描启动函数在gcDrain。gcDrain函数是扫描和标记对象最为核心的方法,也是GC处理流程中最为核心的内容。整个处理流程很复杂,下面会先根据源码分析处理细节,辅助以图说明,最后在从三色标记的角度进一步阐述这个流程。整个gcDrain从大的方面说,做两件事:

  1. 标记处理完所有的根对象,将根对象引用的对象加入到gcw, 对应到代码中是markroot逻辑
  2. 从gcw中取出对象,对该对象进行扫描,将其引用的对象加入到gcw中,对应到代码中是scanobject逻辑。

会并发标记处理完所有1中的根对象,才会进行2中的逻辑的处理。可以将gcw理解为一个队列,生产者是gcDrain,消费者也是gcDrain。markroot会提供最原始的一批对象供gcDrain消费,gcDrain在2中消费对象的同时也生产对象添加到gcw.

除了上面最核心的两件事,还需要根据工作模式(隐含在gcDrainFlags中)做一些其他检查处理。例如在idle模式下,在markroot阶段,需要每标记一个job之后,检查是否有任务(其他goroutine)需要运行,如果有则会转为任务的处理。也会批量的将当前扫描的对象数加入到全局变量计数器 gcController.scanWork中,用于计算后台完成的标记任务量以减少并发标记期间的辅助垃圾收集的用户程序的工作量。

代码语言:javascript复制
func gcDrain(gcw *gcWork, flags gcDrainFlags) {
 if !writeBarrier.needed {
  throw("gcDrain phase incorrect")
 }

 gp := getg().m.curg
 // 获取是否设置了抢占标记使能
 preemptible := flags&gcDrainUntilPreempt != 0
 // 获取是否计算后台的扫描量来减少辅助GC和唤醒等待中的G
 flushBgCredit := flags&gcDrainFlushBgCredit != 0
 // 获取是否设置了空闲工作模式
 idle := flags&gcDrainIdle != 0
 // 记录开始时已扫描的数量
 initScanWork := gcw.scanWork

 checkWork := int64(1<<63 - 1)
 var check func() bool
 if flags&(gcDrainIdle|gcDrainFractional) != 0 {
  checkWork = initScanWork   drainCheckThreshold
  if idle {
   check = pollWork
  } else if flags&gcDrainFractional != 0 {
   check = pollFractionalWorkerExit
  }
 }

 // 如果根对象还没有扫描完,先执行对根对象的扫描
 if work.markrootNext < work.markrootJobs {
  // 如果标记了可被抢占,循环直到被抢占
  for !(preemptible && gp.preempt) {
   // 从根对象扫描队列中取出一个值,进行扫描,每扫描一个,已标记的根对象计数器work.markrootNext会加 1
   job := atomic.Xadd(&work.markrootNext,  1) - 1
   if job >= work.markrootJobs {
    break
   }
   // 执行第job个根对象扫描工作
   markroot(gcw, job)
   // 如果是空闲工作模式或是部分资源进行标记模式,检查是否有其他工作要执行
   // 如果有其他工作则执行,例如在空闲模式,检查是否有任务需要执行。
   if check != nil && check() {
    goto done
   }
  }
 }

 // 处理堆对象的标记,如果对象已在标记队列,对其进行处理。如果标记了可被抢占,循环直到被抢占
 for !(preemptible && gp.preempt) {
  // 如果全局标记队列为空,则把本地标记队列的一部分工作分过去,因为本地队列有两个缓冲区
  // wbuf1和wbuf2,如果wbuf2不为空则将wbuf2移动过去,否则只移动wbuf1中一半的标记对象过去
  if work.full == 0 {
   gcw.balance()
  }

  // 优先从本地标记队列中获取扫描对象,获取不到则从全局标记队列中获取,
  // 快速非阻塞获取标记对象
  b := gcw.tryGetFast()
  if b == 0 {
   // 阻塞获取
   b = gcw.tryGet()
   if b == 0 {
    wbBufFlush(nil, 0)
    b = gcw.tryGet()
   }
  }
  // 如果获取不到对象,说明标记队列中的已经没有可标记对象了,直接跳出循环
  if b == 0 {
   break
  }
  // 扫描获取到的对象
  scanobject(b, gcw)
  // 如果已经扫描的对象数量超过了2000,将扫描的数量信息同步到全局变量gcController.scanWork中
  // ,这样用户程序辅助标记可以利用这个值调整辅助标记策略
  if gcw.scanWork >= gcCreditSlack {
   // 把扫描的对象数量添加到全局变量中
   atomic.Xaddint64(&gcController.scanWork, gcw.scanWork)
   // 减少辅助GC的工作量和唤醒等待中的G
   if flushBgCredit {
    gcFlushBgCredit(gcw.scanWork - initScanWork)
    initScanWork = 0
   }
   checkWork -= gcw.scanWork
   gcw.scanWork = 0

   // 已扫描的对象数量达到了checkWork个,检查是否有其他任务goroutine需要运行,如果有跳出循环
   if checkWork <= 0 {
    checkWork  = drainCheckThreshold
    if check != nil && check() {
     break
    }
   }
  }
 }

done:
 // 将扫描的还未添加到全局变量中的对象数添加到全局变量中
 if gcw.scanWork > 0 {
  atomic.Xaddint64(&gcController.scanWork, gcw.scanWork)
  // 减少辅助GC的工作量和唤醒等待中的G
  if flushBgCredit {
   gcFlushBgCredit(gcw.scanWork - initScanWork)
  }
  gcw.scanWork = 0
 }
}

下面分析markroot的处理逻辑。markroot会根据i的值决定处理的任务类型

每个i值对应一个根job,i的值被划分成了多个区间段,例如在[baseData,baseBSS)范围代表一个BSS段内的标记job.这里着重分析markrootBlock和scanblock处理逻辑。scanblock是更底层且通用的函数,markrootBlock和markrootSpans处理逻辑中也都调用了scanblock。下面会先分scanblock。

代码语言:javascript复制
func markroot(gcw *gcWork, i uint32) {
 // 根据i的值决定处理的任务类型
 
 // baseFlushCache对应的索引区域为[2,baseData)
 baseFlushCache := uint32(fixedRootCount)
 // 数据段对应的索引区域[baseData,baseBSS)
 baseData := baseFlushCache   uint32(work.nFlushCacheRoots)
 // BSS段对应的索引区域[baseBSS,baseSpans)
 baseBSS := baseData   uint32(work.nDataRoots)
 // 根Spans对应的索引区域[baseSpans,baseStacks)
 baseSpans := baseBSS   uint32(work.nBSSRoots)
 // 栈对应的索引区域[baseStacks,end)
 baseStacks := baseSpans   uint32(work.nSpanRoots)
 end := baseStacks   uint32(work.nStackRoots)

 switch {
 case baseFlushCache <= i && i < baseData:
  // 释放每个P中mcache中所有的span
  flushmcache(int(i - baseFlushCache))

 case baseData <= i && i < baseBSS:
  // 扫描可以读写的data段,每次会把每个activeModules中的所有的索引区域为i的内存块进行扫描
  // 每个内存块rootBlockBytes大小为256KB,扫描时会传入这个内存块中哪些区域有指针的bitmap数据(datap.gcdatamask.bytedata)
  for _, datap := range activeModules() {
   markrootBlock(datap.data, datap.edata-datap.data, datap.gcdatamask.bytedata, gcw, int(i-baseData))
  }

 case baseBSS <= i && i < baseSpans:
  // 处理只读的BSS段,处理逻辑与上面类似
  for _, datap := range activeModules() {
   markrootBlock(datap.bss, datap.ebss-datap.bss, datap.gcbssmask.bytedata, gcw, int(i-baseBSS))
  }

 case i == fixedRootFinalizers:
  // i为0,扫描析构器队列
  for fb := allfin; fb != nil; fb = fb.alllink {
   cnt := uintptr(atomic.Load(&fb.cnt))
   scanblock(uintptr(unsafe.Pointer(&fb.fin[0])), cnt*unsafe.Sizeof(fb.fin[0]), &finptrmask[0], gcw, nil)
  }

 case i == fixedRootFreeGStacks:
  // i为1,释放已终止的G的栈
  systemstack(markrootFreeGStacks)

 case baseSpans <= i && i < baseStacks:
  // 扫描根span对象,每次会标记处理一个span对象
  markrootSpans(gcw, int(i-baseSpans))

 default:
  // 扫描各个G的栈
  var gp *g
  if baseStacks <= i && i < end {
   gp = allgs[i-baseStacks]
  } else {
   throw("markroot: bad index")
  }

  status := readgstatus(gp) 
  if (status == _Gwaiting || status == _Gsyscall) && gp.waitsince == 0 {
   gp.waitsince = work.tstart
  }

  systemstack(func() {
   userG := getg().m.curg
   selfScan := gp == userG && readgstatus(userG) == _Grunning
   // 如果扫描的是自己的栈,为了防止死锁,将状态从_Grunning切换到_Gwaiting
   if selfScan {
    casgstatus(userG, _Grunning, _Gwaiting)
    userG.waitreason = waitReasonGarbageCollectionScan
   }

   stopped := suspendG(gp)
   if stopped.dead {
    gp.gcscandone = true
    return
   }
   if gp.gcscandone {
    throw("g already scanned")
   }
   // 扫描各个G的栈对象
   scanstack(gp, gcw)
   gp.gcscandone = true
   resumeG(stopped)

   if selfScan {
    // 如果扫描的是自己的栈,把它的状态从_Gwaiting改为_Grunning,因为前面在处理自己的栈的时候
    // 为了防止死锁,将状态从_Grunning改为了_Gwaiting,这里处理完毕,需要将状态还原
    casgstatus(userG, _Gwaiting, _Grunning)
   }
  })
 }
}

scanblock检查扫描[b0,b0 n0)这个内存地址范围中是否有指针对象,下面对源码做了详细的注解。在加一个图示加以说明,这样很好理解了。

每次取ptrmask中的一个字节判断是否为0,如果是0,说明里面的每个bit都为0,与当前字节对应的b0中64个字节都没有引用指针,直接跳过。如果不为0,需要找到哪个几个bit位不为0,并对其关联的b0中的8字节p进行判断,如果p如果是非0,表示是一个有效的内存地址,需要将这个指针指向的可达的内存对象置灰并加入到gcw中。

代码语言:javascript复制
// scanblock检查扫描[b0,b0 n0)这个内存地址范围中是否有指针对象
// 重点关注b0、no、ptrmask和gcw四个参数,b0是一个内存地址,表示从这里
// 开始扫描,n0表示扫描n0个字节。ptrmask是一个uint8数组,也就是一个字节数组
// ,它是bitmap,ptrmask数组中每个字节中每个bit表示[b0,b0 n0)区域内的一个位置(8Byte)是否是
// 指针, 这里说的是64位系统,所以一个指针长度为8Byte,所以一个ptrmask中的元素对应8*8Byte=64Byte
// 中的区域有多少个指针。
func scanblock(b0, n0 uintptr, ptrmask *uint8, gcw *gcWork, stk *stackScanState) {
 b := b0
 n := n0

 // 扫描到n0个字节结束
 for i := uintptr(0); i < n; {
  // 计算当前扫描的位置对应到bitmask数组中的第几个元素,i的步进位64
  // 所以i值在[0,64)对应到ptrmask[0], [64,128)对应到ptrmask[1]
  // 依次类推。
  bits := uint32(*addb(ptrmask, i/(sys.PtrSize*8)))
  // 如果bits为0,说明这64字节(8个8字节)都没有引用指针,可以不用扫描了,直接跳到下一轮循环
  if bits == 0 {
   i  = sys.PtrSize * 8
   continue
  }
  // bits不为0,说明这64字节内部有指针引用,需要对bits(大小为1Byte)中的每个bit检查,看那个bit位置为1
  // 对应到的[b0,b0 no)的8个字节是指针引用。
  for j := 0; j < 8 && i < n; j   {
   if bits&1 != 0 {
    // 当前指针的位置p如果是非0,表示是一个有效的内存地址,需要将这个指针指向的可达的内存对象
    // 置灰并加入到gcw中
    p := *(*uintptr)(unsafe.Pointer(b   i))
    if p != 0 {
     if obj, span, objIndex := findObject(p, b, i); obj != 0 {
      greyobject(obj, b, i, span, gcw, objIndex)
     } else if stk != nil && p >= stk.stack.lo && p < stk.stack.hi {
      stk.putPtr(p, false)
     }
    }
   }
   // 右移一个bit,判断下一个bit
   bits >>= 1
   // i步进8
   i  = sys.PtrSize
  }
 }
}

分析完了scanblock,我们来看他是怎么被markrootBlock调用的以及markrootBlock的处理细节。在分析之前,先看入口判断。入口判断有两处,case baseData <= i && i < baseBSS和baseBSS <= i && i < baseSpans。而baseData和baseBSS的值是根据各个段内的block累加出来的。具体各段的索引区域见上面的markroot注解。这里在补充一点work.nDataRoots来源,如下图所示,是在gcMarkRootPrepare准备阶段初始化的,work.nBSSRoots与之类似。

markrootBlock主要做的是参数的合理性检查,然后处理逻辑调用的还是scanblock。参数检查细节见下面代码中的分析,还是很容易理解。

代码语言:javascript复制
// markrootBlock对第shard分片内的内存区域中的进行扫描,分析并标记里面的指针对象
// b0是data段或是BSS段的起始地址,n0是段内内存地址大小,ptrmask0是段的指针对象bitmap
func markrootBlock(b0, n0 uintptr, ptrmask0 *uint8, gcw *gcWork, shard int) {
 if rootBlockBytes%(8*sys.PtrSize) != 0 {
  throw("rootBlockBytes must be a multiple of 8*ptrSize")
 }

 // 段的内存地址范围在[b0,b0 n0)的范围内,也就说段内的内存大小为n0,shard是段内的第几个分片,
 // 一个分片rootBlockBytes,大小是256KB,如果shard分片对应的内存大小超出n0,说明已经不再处理范围了,
 // 直接返回
 off := uintptr(shard) * rootBlockBytes
 if off >= n0 {
  return
 }
 // 求解第shard个分片所在的内存地址值,根据段的首地址b0和shard在段内的偏移大小为off
 // 可以得到它的内存地址为b0 off
 b := b0   off
 // 求解上面对应要检查区域指针bitmap所在ptrmask0数组中的位置
 ptrmask := (*uint8)(add(unsafe.Pointer(ptrmask0), uintptr(shard)*(rootBlockBytes/(8*sys.PtrSize))))
 n := uintptr(rootBlockBytes)
 // 因为段内存大小可能不是256KB的整数倍,所以最后一个block大小可能不够256KB
 // 如果off n>n0说明最后不够,需要将其调整为实际的大小=n0-偏移的位置=n0-off
 if off n > n0 {
  n = n0 - off
 }

 // 调用scanblock函数对[b,b n)内存地址范围的区域进行扫描
 scanblock(b, n, ptrmask, gcw, nil)
}

scanobject核心功能就是扫描对象b,找出对象b哪些字段是指针类型,并将这些指针类型指向的对象加入到待扫描队列中,就是进一步发现引用关系,这样顺藤摸瓜,把所有可达对象都扫描出来

代码语言:javascript复制
// scanobject核心功能就是扫描对象b,找出对象b哪些字段是指针类型,并将这些指针
// 类型指向的对象加入到待扫描队列中,就是进一步发现引用关系,这样顺藤摸瓜,把所有可达对象
// 都扫描出来
func scanobject(b uintptr, gcw *gcWork) {
 // 获取对象b所占用内存对应的hbits, hbits描述的是一块内存中哪些地方是指针信息
 hbits := heapBitsForAddr(b)
 // 获取对象b所在的span
 s := spanOfUnchecked(b)
 // span中对象的大小,每个span对象的大小都相同的
 n := s.elemsize
 if n == 0 {
  throw("scanobject n == 0")
 }
 // 对象大小超过128KB是需要做分割处理,每次最多扫描128KB
 if n > maxObletBytes {
  if b == s.base() {
   // 如果当前的span的spanclass是非扫描class中,直接返回,因为
   // noscan中的对象都不会是指针
   if s.spanclass.noscan() {
    // Bypass the whole scan.
    gcw.bytesMarked  = uint64(n)
    return
   }
           // 将分割的子扫描内存块加入到gcw中
   for oblet := b   maxObletBytes; oblet < s.base() s.elemsize; oblet  = maxObletBytes {
    if !gcw.putFast(oblet) {
     gcw.put(oblet)
    }
   }
  }
  n = s.base()   s.elemsize - b
  if n > maxObletBytes {
   n = maxObletBytes
  }
 }

 var i uintptr
 // 扫描对象b中的指针信息,对象b(没有超过128KB的时候)占用的内存为[b,b n)
 // 依次扫描这段内存,判断每8个字节(64位系统)中的值,如果它是一个指针,将其指向
 // 的对象设置为灰色并加入待扫描队列
 for i = 0; i < n; i  = sys.PtrSize {
  if i != 0 {
   hbits = hbits.next()
  }
  // 获取到内存块的bitmap
  bits := hbits.bits()
  // 如果整个内存块都不是指针,直接跳出循环
  if i != 1*sys.PtrSize && bits&bitScan == 0 {
   break 
  }
  // 判断bits最低bit位的值是否0,如果是0,说明它对应的8字节不是指针,
  // 跳过进行下一轮循环处理,如果是指针,需要做下面的进一步处理
  if bits&bitPointer == 0 {
   continue 
  }
  // 把指针指向的内容取出来,它指向的是一个别的对象的地址
  obj := *(*uintptr)(unsafe.Pointer(b   i))

  // 指向的地址是一个有效的地址(非0),并且指向的地址与b不是在同一个span中(obj-b>=n)
  // 才会进行设置为灰色并加入待处理队列。因为如果跟当前的b在同一个span,现在已经在处理这个
  // 这个span了,不需要继续处理了,否则会导致死循环
  if obj != 0 && obj-b >= n {
   // 如果obj是一个有效的对象,把obj对象设为灰色,并加入到扫描队列
   if obj, span, objIndex := findObject(obj, b, i); obj != 0 {
    greyobject(obj, b, i, span, gcw, objIndex)
   }
  }
 }
 // 更新扫描过对象的的大小和对象的数量
 gcw.bytesMarked  = uint64(n)
 gcw.scanWork  = int64(i)
}

greyobject核心工作是将span中第objIndex个对象标记为已扫描,并把第objIndex个对象obj加入到队列gcw中,为什么要把obj加入到gcw中呢?因为需要继续检查obj对象中所有的字段包不包含指针,在下一轮scanobject时,会将扫描检查obj中的每个字段,如果是指针,会将指针指向的对象进行置灰并加入gcw操作(greyobject)。

代码语言:javascript复制
// greyobject核心工作是将span中第objIndex个对象标记为已扫描,并把第objIndex个对象obj加入到队列gcw中
// 重点关注obj、span、gcw和objIndex参数,base和off参数是调试用的,可以不用过多关注
// span表示待标记的对象所在的mspan, objIndex表示待标记的对象在是span中第几个对象
// obj是带标记的对象的地址
func greyobject(obj, base, off uintptr, span *mspan, gcw *gcWork, objIndex uintptr) {
 // obj对齐检查
 if obj&(sys.PtrSize-1) != 0 {
  throw("greyobject: obj not pointer-aligned")
 }
 mbits := span.markBitsForIndex(objIndex)

 // useCheckmark为true,在出错debug时使用,用于检查是否所有可达的对象都被正确的标记
 // 这里我们先不过多关注
 if useCheckmark {
  if !mbits.isMarked() {
  ...
  }
 } else {
  if debug.gccheckmark > 0 && span.isFree(objIndex) {
   print("runtime: marking free object ", hex(obj), " found at *(", hex(base), " ", hex(off), ")n")
   gcDumpObject("base", base, off)
   gcDumpObject("obj", obj, ^uintptr(0))
   getg().m.traceback = 2
   throw("marking free object")
  }
  // 如果对象obj所在的span中的gcmarkBits中对应的bit位已经被设置为1,说明已经处理过了
  // 直接返回,也不用将obj加入到gcw队列了,因为它已经被加过了。
  if mbits.isMarked() {
   return
  }
  // 将对象obj所在的span中的gcmarkBits对应的bit位设置为1
  mbits.setMarked()

  arena, pageIdx, pageMask := pageIndexOf(span.base())
  if arena.pageMarks[pageIdx]&pageMask == 0 {
   atomic.Or8(&arena.pageMarks[pageIdx], pageMask)
  }

  // 如果当前obj对象的所在的span的class位noscan,说明obj对象中所有的字段
  // 都不是指针,也就是ojb中不存在指向其他对象的字段,所以不用将obj加入到gcw中了。
  if span.spanclass.noscan() {
   gcw.bytesMarked  = uint64(span.elemsize)
   return
  }
 }

 // 把对象放入到标记队列gcw中,放入策略为,先放入本地标记队列(gcw.putFast),如果放入失败,例如
 // 本地队列满了的情况,在调动gcw.put, gcw.put会把部分本地标记队列中的部分工作转给全局标记队列
 // 然后在将obj放入到本地队列
 if !gcw.putFast(obj) {
  gcw.put(obj)
 }
}

scanstack对goroutine栈上的内存对象依次进行处理,对于一个对象,怎么知道它对应的内存中哪些地方是指针呢?是通过这个对象的type结构知道的,我们知道每个对象都有一个type结构,type结构定义见下面的代码,type.gcdata标记了这个对象中哪些地方是指针对象,gcdata是编译器在编译期间设置的,type.ptrdata限定了指针所在的边界,所以通过对象的type可以确定它的扫描区域和指针分布情况。

代码语言:javascript复制
func scanstack(gp *g, gcw *gcWork) {
 ...
 // 扫描栈上所有可达的对象
 state.buildIndex()
 for {
  p, conservative := state.getPtr()
  if p == 0 {
   break
  }
  // 获取到栈上一个对象
  obj := state.findObject(p)
  if obj == nil {
   continue
  }
  // 取得这个对象的类型
  t := obj.typ
  if t == nil {
   continue
  }
  obj.setType(nil) 
 ...
  // 获取到这个对象的内存块的ptr的bitmap
  gcdata := t.gcdata
  var s *mspan
  if t.kind&kindGCProg != 0 {
   s = materializeGCProg(t.ptrdata, gcdata)
   gcdata = (*byte)(unsafe.Pointer(s.startAddr))
  }
  // 待扫描对象的起始地址为state.stack.lo   uintptr(obj.off), 终止地址为gcdata
  b := state.stack.lo   uintptr(obj.off)
  if conservative {
   scanConservative(b, t.ptrdata, gcdata, gcw, &state)
  } else {
   // 调用scanblock对这对象内存块进行扫描,找到里面哪些字段是指针
   scanblock(b, t.ptrdata, gcdata, gcw, &state)
  }

  if s != nil {
   dematerializeGCProg(s)
  }
 }
...
}

type _type struct {
 size       uintptr
 ptrdata    uintptr 
 ...
 gcdata    *byte
 str       nameOff
 ptrToThis typeOff
}

上面已完整的分析了GC扫描和标记的流程,处理逻辑非常抽象复杂,针对核心处理函数功能都做了细致的说明,函数内部的逻辑也做了详细的注解,相信大家看了上面的分析,就很容易理解了。没有罗列的markrootSpans,它做了一些处理之后内部也会调用上的scanobject或scanblock函数。

整个处理流程可以概括为生产者和消费者模型,生产者有markrootBlock、markrootSpans、scanstack等函数,它们会将根对象加入到队列gcw中,scanobject会从队列gcw中获取对象标记它,并将获取对象的引用的对象加入到队列gcw中,所以scanobject即是生产者也是消费者函数。

每个P都有一个gcw队列,这样每个P上可以进行独立的扫描标记工作,减少了锁的竞争。但是这会遇到一个问题,不同P上的待扫描标记对象数量是不均匀的,可能会导致有些P没有事情可做,runtime引入了工作窃取机制来解决该问题,工作窃取机制思想还用在goroutine的调度的时候,如果Local P没有可运行的G会从Global P获取,如果Global P也没有可运行的G,则会从其他P的Local P中窃取可运行的G。

最后这里再从三色标记的角度阐述下扫描标记的过程,在深入理解屏障技术文章中说了,Go的GC采用了三色标记法进行对象的回收。上面从源码分析了整个处理流程。在实现层面来说,每个对象并不是真有表示白色、灰色和黑色的标记字段,默认所有的对象都是白色的,加入到gcw队列中的对象理解为灰色,从gcw队列中出队的对象会将它的gcmarkBits的bit位设置为1,这时对象已标记为黑色。当全部标记完成之后,所有的对象分为两类,已标记的为黑色,剩余的全部为白色。

辅助标记

为了防止heap增速太快,在执行GC的过程中如果有用户程序运行的G分配了内存,那么这个G会被要求辅助GC做一部分工作,分配多少内存就需要完成多少标记任务。整个辅助标记模型是一个债务系统,标记扫描程序是还债方,内存分配程序是借债方。通过这个债务系统,既能够让G在正常运行时不会被垃圾回收造成太大的压力,又能保证在达到堆大小目标时完成对象标记。标记系统当前收支结余的是runtime.gcFlushBgCredit。

辅助标记的工作是在内存对象统一分配入口mallocgc函数中检查启动的。

代码语言:javascript复制
func mallocgc(size uintptr, typ *_type, needzero bool) unsafe.Pointer {
...
 var assistG *g
 // gcBlackenEnabled在GC标记阶段会被设置为1
 if gcBlackenEnabled != 0 {
  // assistG为执行当前用户程序任务的G
  assistG = getg()
  if assistG.m.curg != nil {
   assistG = assistG.m.curg
  }
  // assistG.gcAssistBytes记录每个G需要辅助标记的字节数,每个G都有一个gcAssistBytes字段
  // 在GC准备阶段,每个G的gcAssistBytes被设置为0.所以在第一次分配内存的时候,assistG.gcAssistBytes
  // 减去要分配出去的内存(size)之后肯定是一个负数,会执行下面的gcAssistAlloc。
  assistG.gcAssistBytes -= int64(size)

  if assistG.gcAssistBytes < 0 {
   gcAssistAlloc(assistG)
  }
 }

...
 if assistG != nil {
  assistG.gcAssistBytes -= int64(size - dataSize)
 }
...
}

每个G有一个gp.gcAssistBytes表示当前的G需要协助标记的字节数。根据gcAssistBytes和辅助标记的系数相乘得到一个需要标记的工作量scanWork,具体是多少见下面的代码。如果在完成辅助标记任务后,当前的G任然处于亏损状态,如果G是可以被抢占的,会让出调用,如果不被抢占,会将当前的G休眠,并加入全局的辅助标记队列等待后台标记任务gcDrain的唤醒。

代码语言:javascript复制
func gcAssistAlloc(gp *g) {
 // 检查运行的gp是否是用户程序G,调用gcAssistAlloc必须是用户程序G
 if getg() == gp.m.g0 {
  return
 }
 // 检查是否在抢占模式下,必须在启用抢占的模式下才能调用gcAssistAlloc
 if mp := getg().m; mp.locks > 0 || mp.preemptoff != "" {
  return
 }

 traced := false
retry:
 // 需要借债的字节数
 debtBytes := -gp.gcAssistBytes
 // scanWork表示工作量,需要G帮忙扫描工作量个对象
 scanWork := int64(gcController.assistWorkPerByte * float64(debtBytes))
 if scanWork < gcOverAssistWork {
  scanWork = gcOverAssistWork
  debtBytes = int64(gcController.assistBytesPerWork * float64(scanWork))
 }
 // 从后台 GC 的扫描信用中窃取尽可能多的信用。这很活泼,如果两个突变器同时窃取,
 // 可能会将背景信用降至 0 以下。这只会导致抢断失败,直到信用再次累积,
 // 所以从长远来看这并不重要,但我们必须处理负面信用情况
 bgScanCredit := atomic.Loadint64(&gcController.bgScanCredit)
 stolen := int64(0)
 if bgScanCredit > 0 {
  if bgScanCredit < scanWork {
   stolen = bgScanCredit
   gp.gcAssistBytes  = 1   int64(gcController.assistBytesPerWork*float64(stolen))
  } else {
   stolen = scanWork
   gp.gcAssistBytes  = debtBytes
  }
  atomic.Xaddint64(&gcController.bgScanCredit, -stolen)

  scanWork -= stolen
  // 走到这里说明,bgScanCredit是大于scanWork,也就是有足够的信用,
  // 本次可以不用进行辅助标记
  if scanWork == 0 {
   if traced {
    traceGCMarkAssistDone()
   }
   return
  }
 }

 if trace.enabled && !traced {
  traced = true
  traceGCMarkAssistStart()
 }

 // 全局的信用不够,切换到系统栈辅助进行标记任务,只辅助扫描scanWork工作量的任务
 systemstack(func() {
  gcAssistAlloc1(gp, scanWork)
 })

 completed := gp.param != nil
 gp.param = nil
 if completed {
  gcMarkDone()
 }

 if gp.gcAssistBytes < 0 {
  // 如果可以被抢占,然后调度
  if gp.preempt {
   Gosched()
   goto retry
  }

  // 如果全局信用任然不够,将当前goroutine陷入休眠,加入全局的辅助标记队列并等待后台标记任务的唤醒
  if !gcParkAssist() {
   goto retry
  }
 }
 if traced {
  traceGCMarkAssistDone()
 }
}

在后台标记任务执行gcDrain处理中会调用gcFlushBgCredit,gcFlushBgCredit会获取辅助标记休眠队列中的G,如果当前的信用足够,会唤醒辅助标记的G,这里的G是在上面的函数处理中因信用不够而挂起的G.如果信用还有剩余,会剩余的信用加入到全局信用中。

代码语言:javascript复制
func gcFlushBgCredit(scanWork int64) {
 // 如果辅助标记队列中没有等待的goroutine,直接将当前的扫描工作量scanWork添加到全局的信用gcController.bgScanCredit
 // 然后返回
 if work.assistQueue.q.empty() {
  atomic.Xaddint64(&gcController.bgScanCredit, scanWork)
  return
 }
 // 将当前的扫描工作量scanWork乘以一个系数得到扫描的字节数
 scanBytes := int64(float64(scanWork) * gcController.assistBytesPerWork)

 lock(&work.assistQueue.lock)
 // 如果辅助标记队列不为空并且扫描的字节数大于0,相当于是盈余状态,尝试唤醒goroutine
 for !work.assistQueue.q.empty() && scanBytes > 0 {
  gp := work.assistQueue.q.pop()
  // 如果盈余值scanBytes和G辅助标记的字节数gcAssistBytes之和为正数
  // 那么当前的goroutine gp可以别唤醒执行了,否则又将gp加回辅助标记队列中
  if scanBytes gp.gcAssistBytes >= 0 {
   scanBytes  = gp.gcAssistBytes
   gp.gcAssistBytes = 0
   ready(gp, 0, false)
  } else {
   gp.gcAssistBytes  = scanBytes
   scanBytes = 0
   // 又加入辅助标记队列
   work.assistQueue.q.pushBack(gp)
   break
  }
 }
    // 如果扫描标记的字节数任然有结余,将结余加入到全局信用gcController.bgScanCredit中
 if scanBytes > 0 {
  scanWork = int64(float64(scanBytes) * gcController.assistWorkPerByte)
  atomic.Xaddint64(&gcController.bgScanCredit, scanWork)
 }
 unlock(&work.assistQueue.lock)
}
标记终止阶段

标记终止阶段就是检查所有待扫描标记的对象是否全部标记完成,因为标记是并发的,也就是分散在gc worker goroutine和用户辅助标记程序上,所以判断所有标记的G已标记完毕并且没有标记对象还有点小麻烦,代码中通过work.nwait和work.nproc是否相等来判断标记的G是否已标记完,这两个字段具体作用见前面gcBgMarkPrepare中的说明。整个标记终止过程对应的核心函数是下面的gcMarkDone和gcMarkTermination两个函数。下面分别来说明它们各具体做了哪些事情。gcMarkDone主要工作是做两件事情:

  1. 检查所有可达对象是否都被标记,如果都被标记,GC的状态将切换到_GCmarktermination,如果本地队列中还有没有处理完的任务,会将这些任务加入到全局队列,让其他G协助处理。
  2. 关闭混合写屏障,唤醒所有辅助标记的用户程序,恢复用户程序goroutine的调度,最后调用gcMarkTermination进行到标记终止阶段。
代码语言:javascript复制
func gcMarkDone() {
 semacquire(&work.markDoneSema)

top:
 // 检查标记任务是否已执行完毕
 if !(gcphase == _GCmark && work.nwait == work.nproc && !gcMarkWorkAvailable(nil)) {
  semrelease(&work.markDoneSema)
  return
 }

 gcMarkDoneFlushed = 0
 systemstack(func() {
  gp := getg().m.curg
  // 设置gp的状态,从运行状态修改为等待状态
  casgstatus(gp, _Grunning, _Gwaiting)
  // 遍历所有的P,执行下面的处理
  forEachP(func(_p_ *p) {
   // 将P对应的write barrier buffer中对象加入到gcw中
   wbBufFlush1(_p_)
   if debugCachedWork {
    b := &_p_.wbBuf
    b.end = uintptr(unsafe.Pointer(&b.buf[wbBufEntryPointers]))
    b.debugGen = gcWorkPauseGen
   }
   // 将gcw中的缓存对象加入到全局队列中
   _p_.gcw.dispose()
   ...
  })
  casgstatus(gp, _Gwaiting, _Grunning)
 })
...
 // 禁止辅助GC和后台标记任务的运行
 atomic.Store(&gcBlackenEnabled, 0)
 // 唤醒因辅助标记而休眠的G
 gcWakeAllAssists()
 semrelease(&work.markDoneSema)
 schedEnableUser(true)
 // 计算下一次触发GC需要达到的堆内存大小
 nextTriggerRatio := gcController.endCycle()
 // 执行标记终止
 gcMarkTermination(nextTriggerRatio)
}

gcMarkTermination函数主要做一些统计和刷新工作。该函数会将GC阶段标记gcphase更新到_GCmarktermination,在通过gcMark确认是否所有的GC标记工作都已完成,最后会更新GC状态为_GCoff,调用gcSweep进行垃圾清理。另一块内容是统计GC时间、CPU利用率等数据。还有一项重要的工作是将每个P中mcache中缓存的所有mspan都归还给mcentral,然后将本地的mcache中的mspan设置为空span(emptymspan),为什么需要把mspan归还给mcentral,是为了垃圾清扫,详细的分析会在下面的清扫阶段说明。

代码语言:javascript复制
func gcMarkTermination(nextTriggerRatio float64) {
 // 执行下面逻辑的时候是stop the world模式下
 // 扫描标记置灰最后设置为黑色标记关闭,设置GC阶段gcphase为_GCmarktermination
 atomic.Store(&gcBlackenEnabled, 0)
 setGCPhase(_GCmarktermination)

 work.heap1 = memstats.heap_live
 startTime := nanotime()

 mp := acquirem()
 mp.preemptoff = "gcing"
 _g_ := getg()
 _g_.m.traceback = 2
 gp := _g_.m.curg
 // 将G的状态从运行修改为等待,使得它的栈可以被扫描
 casgstatus(gp, _Grunning, _Gwaiting)
 gp.waitreason = waitReasonGarbageCollection

 // 切换到系统栈,开始STW中的标记
 systemstack(func() {
  gcMark(startTime)
 })

 systemstack(func() {
  work.heap2 = work.bytesMarked
  // 如果启用了checkmark则执行检查, 检查是否所有可到达的对象都有标记
  if debug.gccheckmark > 0 {
   gcResetMarkState()
   initCheckmarks()
   gcw := &getg().m.p.ptr().gcw
   gcDrain(gcw, 0)
   wbBufFlush1(getg().m.p.ptr())
   gcw.dispose()
   clearCheckmarks()
  }
  // 设置GC的阶段为_GCoff,设置完成之后,完成了写屏障的关闭
  setGCPhase(_GCoff)
  // 唤醒后台清扫任务
  gcSweep(work.mode)
 })

 _g_.m.traceback = 0
 // 设置G的状态为运行中
 casgstatus(gp, _Gwaiting, _Grunning)
...
 // 重置清扫状态
 sweep.nbgsweep = 0
 sweep.npausesweep = 0

 // 统计强制开始GC的次数
 if work.userForced {
  memstats.numforcedgc  
 }

 // 统计执行GC的次数然后唤醒等待清扫的G
 lock(&work.sweepWaiters.lock)
 memstats.numgc  
 injectglist(&work.sweepWaiters.list)
 unlock(&work.sweepWaiters.lock)
    
 mProf_NextCycle()
 // 重新启动世界
 systemstack(func() { startTheWorldWithSema(true) })
...
 // 移动标记队列使用的缓冲区到自由列表,使得他们可以被回收
 prepareFreeWorkbufs()

 // 释放未使用的栈
 systemstack(freeStackSpans)
   // 每个P中mcache都被flush准备清扫
 systemstack(func() {
  forEachP(func(_p_ *p) {
   _p_.mcache.prepareForSweep()
  })
 })
...
 // 如果不是并行进行GC,则让出当前的调度
 if !concurrentSweep {
  // give the queued finalizers, if any, a chance to run
  Gosched()
 }
}
清扫阶段

清扫让程序知道哪些内存可以重新分配使用,清扫的过程并不是把不再使用的对象内存设置为0,而是在这些对象占用的内存在重新分配出去给其他对象时被覆盖。下面会介绍清理的基本原理,最后会结合源码做一些补充说明。每个span内有一个bitmap allocBits,它表示上一次GC之后每一个 object的分配情况,1表示已分配,0表示未使用或释放。为了加深理解,我们在来看下mspan的结构体定义,它有一个allocBits和gcmarkBits字段,这两个字段的类型都是一样的。

代码语言:javascript复制
type mspan struct {
 ...
 allocBits  *gcBits
 gcmarkBits *gcBits
    ...
}

在前面的标记阶段分析中,可以看到一个对象被标记为黑色是将它所在的mspan中的gcmarkBits对应的bit位设置为1. GC会用gcmarkBits位图来跟踪在使用中的内存。使用gcmarkBits精确查看可用于分配的内存,将gcmarkBits赋值给allocBits的这个操作就是内存清理。然而必须每个mspan都来一次类似的处理,需要耗费大量时间。Go 的目标是在清理内存时不阻碍执行,并为此提供了两种策略.

  1. 在后台启动一个worker goroutine等待清理内存,一个一个mspan处理
  2. 当申请分配内存的时候lazy触发

第一种策略是在开始运行程序时,runtime将设置一个后台运行的worker goroutine,它的任务只有一个,就是干清理内存的工作。开始设置时不会运行,会进入睡眠状态等待内存扫描。在上面的gcMarkTermination处理中会唤醒它。

第二种策略lazy触发是即时执行。但是,由于这些内存段已经被分发到每一个处理器P的本地缓存mcache中,因为辅助清扫在申请新span时才会检查,所以Go首先将所有P本地mcache中的span动到mcentral,然后将本地的mcache设置为emptymspan,然后,新申请内存时,本地mcache中没有span,会让本地缓存mcache再次请求它们,去即时清理。将本地mcache中的span动到mcentral的处理是在上面的标记终止阶段的gcMarkTermination中做的,详情见上面的分析。

即时扫描确保所有内存段在保存资源的过程中都会得到清理,同时会保存资源以及不会阻塞程序执行.由于后台只有一个worker在清理内存块,清理过程可能会花费一些时间。但是,我们可能想知道如果另一个GC周期在一次清理过程中启动会发生什么。在这种情况下,这个运行GC的 goroutine就会在开始标记阶段前去协助完成剩余的清理工作。这也是在本文开头介绍GC的四个处理阶段时说为啥要将清扫放在gcStart处理第一步的原因。理解了上面介绍的清理处理的原理,下面结合清理的关键源码做一点分析,进一步加深对清理流程的认识。

gcSweep主要的任务是重置清理阶段的相关状态,然后唤醒sweep清扫的goroutine。后台的清扫goroutine在是runtime main函数中的bgsweep启动的。

代码语言:javascript复制
func gcSweep(mode gcMode) {
 if gcphase != _GCoff {
  throw("gcSweep being done but phase is not GCoff")
 }

 lock(&mheap_.lock)
 // 增加sweepgen的值,这样sweepSpans中两个队列角色会交换,所有span都会变为待清扫的span
 mheap_.sweepgen  = 2
 mheap_.sweepdone = 0
 if mheap_.sweepSpans[mheap_.sweepgen/2%2].index != 0 {
  throw("non-empty swept list")
 }
 mheap_.pagesSwept = 0
 mheap_.sweepArenas = mheap_.allArenas
 mheap_.reclaimIndex = 0
 mheap_.reclaimCredit = 0
 unlock(&mheap_.lock)
    ...
 lock(&sweep.lock)
 // 唤醒后台清扫任务
 if sweep.parked {
  sweep.parked = false
  ready(sweep.g, 0, true)
 }
 unlock(&sweep.lock)
}

gcenable中启动异步清扫goroutine

代码语言:javascript复制
func gcenable() {
 c := make(chan int, 2)
 // 设置异步清扫
 go bgsweep(c)
 go bgscavenge(c)
 <-c
 <-c
 memstats.enablegc = true
}

bgsweep会调用sweepone进行清理,它会在堆内存中查找待清理的span,并且返回清扫了多少Page到heap中。

代码语言:javascript复制
func bgsweep(c chan int) {
 // 设置清扫goroutine
 sweep.g = getg()
 // 等待唤醒
 lock(&sweep.lock)
 sweep.parked = true
 c <- 1
 goparkunlock(&sweep.lock, waitReasonGCSweepWait, traceEvGoBlock, 1)

 // 循环清扫
 for {
  // 清扫一个span,然后进入调度
  for sweepone() != ^uintptr(0) {
   sweep.nbgsweep  
   Gosched()
  }
  // 释放一些没有使用的标记队列缓冲区到heap
  for freeSomeWbufs(true) {
   Gosched()
  }
  lock(&sweep.lock)
  // 如果清扫还没有完成则继续循环
  if !isSweepDone() {
   unlock(&sweep.lock)
   continue
  }
  // 否则让后台清扫任务进入休眠,让当前M继续调度
  sweep.parked = true
  goparkunlock(&sweep.lock, waitReasonGCSweepWait, traceEvGoBlock, 1)
 }
}
代码语言:javascript复制
func sweepone() uintptr {
 _g_ := getg()
 sweepRatio := mheap_.sweepPagesPerByte 
 _g_.m.locks  
 // 检查是否清扫完成,清扫完成时mheap_.sweepdone为1
 if atomic.Load(&mheap_.sweepdone) != 0 {
  _g_.m.locks--
  return ^uintptr(0)
 }
 atomic.Xadd(&mheap_.sweepers,  1)

 // 查找一个span并释放
 var s *mspan
 sg := mheap_.sweepgen
 for {
  s = mheap_.sweepSpans[1-sg/2%2].pop()
  if s == nil {
   atomic.Store(&mheap_.sweepdone, 1)
   break
  }
  if state := s.state.get(); state != mSpanInUse {
   if !(s.sweepgen == sg || s.sweepgen == sg 3) {
    print("runtime: bad span s.state=", state, " s.sweepgen=", s.sweepgen, " sweepgen=", sg, "n")
    throw("non in-use span in unswept list")
   }
   continue
  }
  // span的sweepgen等于mheap.sweepgen-2表示当前span需要被清理
  if s.sweepgen == sg-2 && atomic.Cas(&s.sweepgen, sg-2, sg-1) {
   break
  }
 }

 // 清理span
 npages := ^uintptr(0)
 if s != nil {
  npages = s.npages
  // 回收内存
  if s.sweep(false) {
  atomic.Xadduintptr(&mheap_.reclaimCredit, npages)
  } else {
   npages = 0
  }
 }
...
 return npages
}

mspan的sweep方法清扫单个span对象,核心操作就是把gcmarkBits赋值给allocBits,这这个操作就是相对于把内存清理了。

代码语言:javascript复制
func (s *mspan) sweep(preserve bool) bool {
 _g_ := getg()
 if _g_.m.locks == 0 && _g_.m.mallocing == 0 && _g_ != _g_.m.g0 {
  throw("mspan.sweep: m is not locked")
 }
 sweepgen := mheap_.sweepgen
 ...
 // 设置新的allocCount
 s.allocCount = nalloc
 // 判断span中是否有无未分配的对象
 wasempty := s.nextFreeIndex() == s.nelems
 // 重置freeindex为0
 s.freeindex = 0 
 if trace.enabled {
  getg().m.p.ptr().traceReclaimed  = uintptr(nfreed) * s.elemsize
 }

 // 将gcmarkBits赋值给allocBits,这这个操作就是相对于把内存清理了
 // 然后重新分配一块全部为0的gcmarkBits,下次分配对象时可以根据allocBits得知哪些元素是未分配的
 s.allocBits = s.gcmarkBits
 s.gcmarkBits = newMarkBits(s.nelems)

 // 更新freeindex开始的allocCache
 s.refillAllocCache(0)

 // 如果span中已经没有存活的对象则更新sweepgen到最新,然后把span加入到mcentral或者mheap
 if freeToHeap || nfreed == 0 {
  if state := s.state.get(); state != mSpanInUse || s.sweepgen != sweepgen-1 {
   print("mspan.sweep: state=", state, " sweepgen=", s.sweepgen, " mheap.sweepgen=", sweepgen, "n")
   throw("mspan.sweep: bad span state after sweep")
  }
  atomic.Store(&s.sweepgen, sweepgen)
 }

 if nfreed > 0 && spc.sizeclass() != 0 {
  // 把span加到mcentral,res等于1表示添加成功
  c.local_nsmallfree[spc.sizeclass()]  = uintptr(nfreed)
  res = mheap_.central[spc].mcentral.freeSpan(s, preserve, wasempty)
 } else if freeToHeap {
  // 把span释放到mheap
  if debug.efence > 0 {
   s.limit = 0 
   sysFault(unsafe.Pointer(s.base()), size)
  } else {
   mheap_.freeSpan(s)
  }
  c.local_nlargefree  
  c.local_largefree  = size
  res = true
 }
 // 如果span没有添加到mcentral或者添加到mheap,表示span任然在使用
 if !res {
  // 把任然在使用的span添加到sweepSpans的已清扫队列中
  mheap_.sweepSpans[sweepgen/2%2].push(s)
 }
 return res
}

上面通过原理和源码分析了清扫阶段的流程,清理的过程核心就是将gcmarkBits赋值给allocBits,还要一个问题在这里说下,是怎么把不用的内存归还的呢?是分级归还的,将mcache中的归还给mcentral,然后mcentral在归还给mheap.具体的归还核心处理见下面的代码,想进一步深入细节的可以研究细节逻辑。

代码语言:javascript复制
func freemcache(c *mcache) {
    systemstack(func() {
        c.releaseAll()
        ...

        lock(&mheap_.lock)
        purgecachedstats(c)
        mheap_.cachealloc.free(unsafe.Pointer(c))
        unlock(&mheap_.lock)
    })
}

func (c *mcache) releaseAll() {
    for i := 0; i < _NumSizeClasses; i   {
        s := c.alloc[i]
        if s != &emptymspan {
            mheap_.central[i].mcentral.uncacheSpan(s)
            c.alloc[i] = &emptymspan
        }
    }
    c.tiny = 0
    c.tinyoffset = 0
}

func (c *mcentral) freeSpan(s *mspan, preserve bool, wasempty bool) bool {
    ...
    lock(&c.lock)
    ...
    if s.allocCount != 0 {
        unlock(&c.lock)
        return false
    }

    c.nonempty.remove(s)
    unlock(&c.lock)
    mheap_.freeSpan(s, 0)
    return true
}

总结

全文结合源码深入分析了GC的四个阶段,重点在标记阶段的gcBgMarkWorker处理逻辑和清理的本质原理。小编通过深入源码细节和图示相结合的方式分析了垃圾回收的实现细节,相信各位同学在学习本文之后对GC的实现会有深入的理解。

Garbage Collection In Go[1]garbage-collection[2]Golang源码探索(三) GC的实现原理[3]深度剖析 Golang 的 GC 扫描对象实现[4]golang-garbage-collector[5]Go:内存管理与内存清理[6]

Reference

[1]

Garbage Collection In Go: https://www.ardanlabs.com/blog/2018/12/garbage-collection-in-go-part1-semantics.html

[2]

garbage-collection: https://github.com/ardanlabs/gotraining/tree/master/reading#garbage-collection

[3]

Golang源码探索(三) GC的实现原理: https://www.cnblogs.com/zkweb/p/7880099.html

[4]

深度剖析 Golang 的 GC 扫描对象实现: https://www.jianshu.com/p/ebd8b012572e

[5]

golang-garbage-collector: https://draveness.me/golang/docs/part3-runtime/ch07-memory/golang-garbage-collector

[6]

Go:内存管理与内存清理: https://zhuanlan.zhihu.com/p/373438878

0 人点赞