golang源码分析:btree

2023-09-20 08:28:50 浏览数 (1)

github.com/google/btree是golang实现的一个平衡多叉树。它是etcd索引使用的树形结构。它使用起来非常简单。

代码语言:javascript复制
package main

import (
  "fmt"
  "strconv"

  "github.com/google/btree"
)

type MyTree struct {
  Age  int
  Name string
}

func (m *MyTree) Less(item btree.Item) bool {
  return m.Age < (item.(*MyTree)).Age
}

func main() {
  tree := btree.New(2) //创建一个度为2的树,它最多有4个孩子节点和三个元素
  for i := 0; i < 100; i   {
    //插入数据
    tree.ReplaceOrInsert(&MyTree{Age: i, Name: "freedom"   strconv.Itoa(i)})
  }
  //查询数据
  tree.DescendRange(&MyTree{Age: 52}, &MyTree{Age: 48}, func(a btree.Item) bool {
    item := a.(*MyTree)
    fmt.Println(item)
    return true
  })
}

接着,我们复习下多叉平衡树的定义:对于一个度为m的多叉平衡树,最多有2*m 1个孩子节点和2*m个元素,所有非叶子非根节点至少有m个元素,所有叶子节点都在同一层级。

接着我们看下这个包里是如何实现的:它实现了两个版本go1.18以上的范型版本和go1.18 btree_generic.go以下的非范型版本btree.go。并且和左倾红黑树做了性能对比:btree_mem.go

这里我们重点看下非范型版本(范型版本逻辑一样,只是将一些基于接口实现的部分换成了范型实现),源码位于btree.go,如果要使用btree,需要实现Less接口即可。

代码语言:javascript复制
type Item interface {
  // Less tests whether the current item is less than the given argument.
  //
  // This must provide a strict weak ordering.
  // If !a.Less(b) && !b.Less(a), we treat this to mean a == b (i.e. we can only
  // hold one of either a or b in the tree).
  Less(than Item) bool
}

然后定义了FreeList里面保存了可以回收再利用的节点列表,用于减轻gc的压力

代码语言:javascript复制
  const (
  DefaultFreeListSize = 32
)
代码语言:javascript复制
type FreeList struct {
  mu       sync.Mutex
  freelist []*node
}

所有新节点的分配和节点释放都是基于freeList来实现的,可以认为是一个简化的内存管理器:每次从节点slice的最后一个位置分配节点,缩小slice,如果slice为空就new 新创建一个节点。

代码语言:javascript复制
func (f *FreeList) newNode() (n *node) {
  f.mu.Lock()
  index := len(f.freelist) - 1
  if index < 0 {
    f.mu.Unlock()
    return new(node)
  }
  n = f.freelist[index]
  f.freelist[index] = nil
  f.freelist = f.freelist[:index]

释放节点的过程刚好反过来,小于最大长度,把要释放的节点放在slice最后,超过长度不处理,让gc给处理掉。

代码语言:javascript复制
func (f *FreeList) freeNode(n *node) (out bool) {
  if len(f.freelist) < cap(f.freelist) {
    f.freelist = append(f.freelist, n)
    out = true

它管理的node也就是我们的多叉树节点,其定义如下

代码语言:javascript复制
type node struct {
  items    items //节点内元素
  children children //孩子节点
  cow      *copyOnWriteContext //实现copy on write
}
代码语言:javascript复制
type items []Item
type children []*node
代码语言:javascript复制
type copyOnWriteContext struct {
  freelist *FreeList
}
代码语言:javascript复制
func (c *copyOnWriteContext) newNode() (n *node) {
  n = c.freelist.newNode()
  n.cow = c
  return
}
代码语言:javascript复制
func (c *copyOnWriteContext) freeNode(n *node) freeType {
  if n.cow == c {
    // clear to allow GC
    n.items.truncate(0)
    n.children.truncate(0)
    n.cow = nil
    if c.freelist.freeNode(n) {

copyOnWriteContext也实现了节点分配和释放逻辑,分配的时候从freelist里面分配节点,并把当前context的地址赋值给新分配的节点。释放节点的时候,如果是相同context就释放,否则不操作。释放的时候先把元素和孩子节点指针清空,然后把节点放入当前context的freelist里面,放满了交给垃圾回收器处理。

通过逻辑保证孩子节点数永远比节点内元素个数多一个。BTree代表了一颗平衡多叉树,里面包含了这棵树的度和树里元素的总个数,是读并发安全的,但是不是写并发安全的。

代码语言:javascript复制
type BTree struct {
  degree int
  length int
  root   *node
  cow    *copyOnWriteContext
}

我们从New函数进入看下它是如何初始化的:

代码语言:javascript复制
func New(degree int) *BTree {
  return NewWithFreeList(degree, NewFreeList(DefaultFreeListSize))
}
代码语言:javascript复制
func NewWithFreeList(degree int, f *FreeList) *BTree {
  return &BTree{
  degree: degree,
  cow:    &copyOnWriteContext{freelist: f},
  }

在cow里存储了预分配的节点指针空间

代码语言:javascript复制
func NewFreeList(size int) *FreeList {
  return &FreeList{freelist: make([]*node, 0, size)}
}

在学习整棵树的增删改查之前,我们先看下节点内部元素的增删改查过程。

代码语言:javascript复制
type items []Item
代码语言:javascript复制
func (s *items) insertAt(index int, item Item) {
  *s = append(*s, nil)
  if index < len(*s) {
  copy((*s)[index 1:], (*s)[index:])
  }
  (*s)[index] = item

在slice的最后插入一个空指针,并把插入位置之后的所有元素后移一个位置,然后在空出的位置插入当前元素,删除是逆向过程,将当前位置之后的所有元素左移一位,最后位置置空

代码语言:javascript复制
func (s *items) removeAt(index int) Item {
   copy((*s)[index:], (*s)[index 1:])
  (*s)[len(*s)-1] = nil

返回最后的元素,并把最后的元素位置地址置空

代码语言:javascript复制
func (s *items) pop() (out Item) {
代码语言:javascript复制
var (
  nilItems    = make(items, 16)
  nilChildren = make(children, 16)
)

把指定位置之后的所有指针16个一组全部置为空

代码语言:javascript复制
func (s *items) truncate(index int) {  
  var toClear items
  *s, toClear = (*s)[:index], (*s)[index:]
  for len(toClear) > 0 {
    toClear = toClear[copy(toClear, nilItems):]
  }

查找的过程就是slice的二分查找

代码语言:javascript复制
func (s items) find(item Item) (index int, found bool) {
      i := sort.Search(len(s), func(i int) bool {
    return item.Less(s[i])
  })
  if i > 0 && !s[i-1].Less(item) {
    return i - 1, true
  }
  return i, false

对于孩子节点的操作是一模一样的

代码语言:javascript复制
type children []*node
代码语言:javascript复制
func (s *children) insertAt(index int, n *node) {
func (s *children) removeAt(index int) *node {
func (s *children) pop() (out *node) {
func (s *children) truncate(index int) {

分析完节点的内部元素和孩子节点后,我们看下树的节点,为了便于节点的遍历,定义了迭代器,如果返回false,迭代器停止遍历,否则继续在树上遍历

代码语言:javascript复制
type ItemIterator func(i Item) bool

mutableFor用于判断当前上下文中,可以修改的节点,如果不能修改,就复制一份用来修改,从而实现了copy on write,复制的时候会复制内部元素和孩子节点指针

代码语言:javascript复制
func (n *node) mutableFor(cow *copyOnWriteContext) *node {
      if n.cow == cow {
        return n
      out := cow.newNode()
      copy(out.items, n.items)
      copy(out.children, n.children)

获取某个孩子在当前上下文可以修改的节点。

代码语言:javascript复制
func (n *node) mutableChild(i int) *node {
  c := n.children[i].mutableFor(n.cow)
  n.children[i] = c
  return c
}

把节点的元素和孩子节点在指定位置拆分成两部分,返回当前元素和拆分后的新节点

代码语言:javascript复制
func (n *node) split(i int) (Item, *node) {
  item := n.items[i]
  
  next := n.cow.newNode()
  next.items = append(next.items, n.items[i 1:]...)
  n.items.truncate(i)
  
  if len(n.children) > 0 {
    next.children = append(next.children, n.children[i 1:]...)
    n.children.truncate(i   1)

检查孩子节点是否需要拆分,如果孩子节点需要拆分,返回true

代码语言:javascript复制
func (n *node) maybeSplitChild(i, maxItems int) bool {
  if len(n.children[i].items) < maxItems {
    return false
  first := n.mutableChild(i)
  item, second := first.split(maxItems / 2)
  n.items.insertAt(i, item)
  n.children.insertAt(i 1, second)

找到i位置的孩子节点,并把孩子节点从最大节点数的一半地方拆开,在当前节点i的位置插入切分位置的元素暗,并在i 1的孩子节点位置插入从i位置孩子节点上拆分出来的右半部分。

接着看下节点的插入函数的执行流程,首先找到元素,如果找到了,替换这个位置的元素返回旧元素;没有找到执行插入新节点的流程。

插入的过程分两种情况:叶子节点(没有孩子节点的时候),直接在当前节点位置i插入新元素即可;非叶子节点如果需要拆分孩子节点拆分孩子节点,如果当前节点比插入点元素小,在i位置的孩子节点插入元素;否则在i 1位置的孩子节点插入元素。非叶子节点如果不需要拆分孩子节点,在孩子i的位置插入节点。插入过程是递归进行的。

代码语言:javascript复制
func (n *node) insert(item Item, maxItems int) Item {
  i, found := n.items.find(item)
  if found {
    out := n.items[i]
    n.items[i] = item
    return out
  if len(n.children) == 0 {
    n.items.insertAt(i, item)
  if n.maybeSplitChild(i, maxItems) {
    inTree := n.items[i]
    switch {
    case item.Less(inTree):
      // no change, we want first split node
    case inTree.Less(item):
      i   // we want second split node
    default:
      out := n.items[i]
      n.items[i] = item
      return out
    }
  }
  return n.mutableChild(i).insert(item, maxItems)

查找的过程是先通过二分查找方式在当前节点查找元素,如果找到了返回当前节点对应位置的元素,否则,在孩子i的位置查找元素,递归进行

代码语言:javascript复制
func (n *node) get(key Item) Item {
   i, found := n.items.find(key)
  if found {
    return n.items[i]
  } else if len(n.children) > 0 {
    return n.children[i].get(key)
  }

获取最小元素的时候,先从最小孩子的最小元素开始递归查找,如果没有孩子,返回当前节点的最小元素

代码语言:javascript复制
func min(n *node) Item {
      for len(n.children) > 0 {
         n = n.children[0]
      return n.items[0]

最大元素查找是类似流程

代码语言:javascript复制
func max(n *node) Item {
   for len(n.children) > 0 {
    n = n.children[len(n.children)-1]
  return n.items[len(n.items)-1]

删除的过程比较复杂:1,如果是叶子节点,并且找到了这个元素,直接删除即可;2,如果i位置的孩子节点元素个数比最小个数少,调用函数growChildAndRemove;3,如果在孩子节点i处找到了元素,移除孩子节点i的最大元素,并将它填入当前节点的i位置;4,如果没有找到,递归在孩子节点i处继续查找。

代码语言:javascript复制
func (n *node) remove(item Item, minItems int, typ toRemove) Item {
 switch typ {
  case removeMax:
    if len(n.children) == 0 {
      return n.items.pop()
    }
    i = len(n.items)
  case removeMin:
    if len(n.children) == 0 {
      return n.items.removeAt(0)
    }
    i = 0
  case removeItem:
    i, found = n.items.find(item)
    if len(n.children) == 0 {
      if found {
        return n.items.removeAt(i)
      }
      return nil
    }
    default:
      panic("invalid type")
    }
  if len(n.children[i].items) <= minItems {
    return n.growChildAndRemove(i, item, minItems, typ)
  }
  child := n.mutableChild(i)
  if found {
    // The item exists at index 'i', and the child we've selected can give us a
    // predecessor, since if we've gotten here it's got > minItems items in it.
    out := n.items[i]
    // We use our special-case 'remove' call with typ=maxItem to pull the
    // predecessor of item i (the rightmost leaf of our immediate left child)
    // and set it into where we pulled the item from.
    n.items[i] = child.remove(nil, minItems, removeMax)
    return out
  }
return child.remove(item, minItems, typ)

分三种场景进行调整,调整完毕后再递归调用remove方法来进行节点删除:1,孩子的左兄弟节点位置的元素个数比最小元素个数大,在孩子的左兄弟节点偷取最大元素,存入i-1位置,并把i-1位置的元素存入孩子i的最左侧,并把被偷位置的孩子节点插入到孩子i的最左孩子位置。2,如果孩子的右兄弟元素个数比最小元素个数大,从孩子的右兄弟的最左孩子位置处偷元素。插入到孩子i的最右位置。

代码语言:javascript复制
func (n *node) growChildAndRemove(i int, item Item, minItems int, typ toRemove) Item {
   if i > 0 && len(n.children[i-1].items) > minItems {
    // Steal from left child
    child := n.mutableChild(i)
    stealFrom := n.mutableChild(i - 1)
    stolenItem := stealFrom.items.pop()
    child.items.insertAt(0, n.items[i-1])
    n.items[i-1] = stolenItem
    if len(stealFrom.children) > 0 {
      child.children.insertAt(0, stealFrom.children.pop())
    }
   } else if i < len(n.items) && len(n.children[i 1].items) > minItems {
        // steal from right child
        child := n.mutableChild(i)
        stealFrom := n.mutableChild(i   1)
        stolenItem := stealFrom.items.removeAt(0)
        child.items = append(child.items, n.items[i])
        n.items[i] = stolenItem
        if len(stealFrom.children) > 0 {
          child.children = append(child.children, stealFrom.children.removeAt(0))
        }
    } else {
      if i >= len(n.items) {
          i--
       }
      child := n.mutableChild(i)
      // merge with right child
      mergeItem := n.items.removeAt(i)
      mergeChild := n.children.removeAt(i   1)
      child.items = append(child.items, mergeItem)
      child.items = append(child.items, mergeChild.items...)
      child.children = append(child.children, mergeChild.children...)
      n.cow.freeNode(mergeChild)
    }

  return n.remove(item, minItems, typ)

3,如果不满足上述两种情况,先把i位置的元素删掉,然后把i 1孩子给删掉,把删掉的元素和删掉的孩子的元素追加到孩子i的元素末尾,把删掉的孩子的孩子,追加到孩子i的孩子末尾。处理完上述调整后,就继续进行递归的删除操作。

iterate实现了迭代器,在节点内通过二分查找快速定位元素,然后再树形父子结构上通过递归来定位元素,迭代的时候需要控制方向的正反。

代码语言:javascript复制
func (n *node) iterate(dir direction, start, stop Item, includeStart bool, hit bool, iter ItemIterator) (bool, bool) {
 switch dir {
  case ascend:
    if start != nil {
    index, _ = n.items.find(start)
    }
    for i := index; i < len(n.items); i   {
    if len(n.children) > 0 {
    if hit, ok = n.children[i].iterate(dir, start, stop, includeStart, hit, iter); !ok {
    return hit, false
    }

学习完node的增删改查算法后,BTree的增删改查只是对上述方法的一个包装而已。

Clone方法,通过对cow属性的简单复制,实现了懒复制。

代码语言:javascript复制
func (t *BTree) Clone() (t2 *BTree) {
  cow1, cow2 := *t.cow, *t.cow
  out := *t
  t.cow = &cow1
  out.cow = &cow2
  return &out  

定义了节点的最大元素个数和最小元素个数。

代码语言:javascript复制
func (t *BTree) maxItems() int {
  return t.degree*2 - 1
代码语言:javascript复制
func (t *BTree) minItems() int {
  return t.degree - 1

插入元素过程,如果根节点为空,直接插入元素;如果根节点的元素个数达到最大了,将根节点一份为二,新生成一个节点作为它们的根节点;在根节点处插入元素。

代码语言:javascript复制
func (t *BTree) ReplaceOrInsert(item Item) Item {
   if t.root == nil {
      t.root = t.cow.newNode()
      t.root.items = append(t.root.items, item)
      t.length  
      return nil
    t.root = t.root.mutableFor(t.cow)
      if len(t.root.items) >= t.maxItems() {
        item2, second := t.root.split(t.maxItems() / 2)
    out := t.root.insert(item, t.maxItems())
代码语言:javascript复制
func (t *BTree) Delete(item Item) Item {
  return t.deleteItem(item, removeItem)

删除节点的过程是,首先移除元素,如果移除后根节点为空,把第一个孩子节点当作根节点,删除老的根节点。

代码语言:javascript复制
func (t *BTree) deleteItem(item Item, typ toRemove) Item {
   t.root = t.root.mutableFor(t.cow)
   out := t.root.remove(item, t.minItems(), typ)
  if len(t.root.items) == 0 && len(t.root.children) > 0 {
    oldroot := t.root
    t.root = t.root.children[0]
    t.cow.freeNode(oldroot)
  }

range操作,就是迭代器的执行

代码语言:javascript复制
func (t *BTree) AscendRange(greaterOrEqual, lessThan Item, iterator ItemIterator) {
      t.root.iterate(ascend, greaterOrEqual, lessThan, true, false, iterator)
代码语言:javascript复制
func (t *BTree) Ascend(iterator ItemIterator) {
代码语言:javascript复制
func (t *BTree) DescendRange(lessOrEqual, greaterThan Item, iterator ItemIterator) {

查找类似

代码语言:javascript复制
func (t *BTree) Get(key Item) Item {
      return t.root.get(key)
代码语言:javascript复制
func (t *BTree) Clear(addNodesToFreelist bool) {
      t.root.reset(t.cow)    
代码语言:javascript复制
func (n *node) reset(c *copyOnWriteContext) bool {
      for _, child := range n.children {
    if !child.reset(c) {

BTree实现了b树,并通过cow属性, cow相等才能修改,否则只能复制 来实现了copy on write;通过编译选项,来控制使用范型版本还是非范型版本:

代码语言:javascript复制
//go:build !go1.18
//  build !go1.18

至此整个BTree的实现分析完毕。

0 人点赞