github.com/google/btree是golang实现的一个平衡多叉树。它是etcd索引使用的树形结构。它使用起来非常简单。
代码语言:javascript复制package main
import (
"fmt"
"strconv"
"github.com/google/btree"
)
type MyTree struct {
Age int
Name string
}
func (m *MyTree) Less(item btree.Item) bool {
return m.Age < (item.(*MyTree)).Age
}
func main() {
tree := btree.New(2) //创建一个度为2的树,它最多有4个孩子节点和三个元素
for i := 0; i < 100; i {
//插入数据
tree.ReplaceOrInsert(&MyTree{Age: i, Name: "freedom" strconv.Itoa(i)})
}
//查询数据
tree.DescendRange(&MyTree{Age: 52}, &MyTree{Age: 48}, func(a btree.Item) bool {
item := a.(*MyTree)
fmt.Println(item)
return true
})
}
接着,我们复习下多叉平衡树的定义:对于一个度为m的多叉平衡树,最多有2*m 1个孩子节点和2*m个元素,所有非叶子非根节点至少有m个元素,所有叶子节点都在同一层级。
接着我们看下这个包里是如何实现的:它实现了两个版本go1.18以上的范型版本和go1.18 btree_generic.go以下的非范型版本btree.go。并且和左倾红黑树做了性能对比:btree_mem.go
这里我们重点看下非范型版本(范型版本逻辑一样,只是将一些基于接口实现的部分换成了范型实现),源码位于btree.go,如果要使用btree,需要实现Less接口即可。
代码语言:javascript复制type Item interface {
// Less tests whether the current item is less than the given argument.
//
// This must provide a strict weak ordering.
// If !a.Less(b) && !b.Less(a), we treat this to mean a == b (i.e. we can only
// hold one of either a or b in the tree).
Less(than Item) bool
}
然后定义了FreeList里面保存了可以回收再利用的节点列表,用于减轻gc的压力
代码语言:javascript复制 const (
DefaultFreeListSize = 32
)
代码语言:javascript复制type FreeList struct {
mu sync.Mutex
freelist []*node
}
所有新节点的分配和节点释放都是基于freeList来实现的,可以认为是一个简化的内存管理器:每次从节点slice的最后一个位置分配节点,缩小slice,如果slice为空就new 新创建一个节点。
代码语言:javascript复制func (f *FreeList) newNode() (n *node) {
f.mu.Lock()
index := len(f.freelist) - 1
if index < 0 {
f.mu.Unlock()
return new(node)
}
n = f.freelist[index]
f.freelist[index] = nil
f.freelist = f.freelist[:index]
释放节点的过程刚好反过来,小于最大长度,把要释放的节点放在slice最后,超过长度不处理,让gc给处理掉。
代码语言:javascript复制func (f *FreeList) freeNode(n *node) (out bool) {
if len(f.freelist) < cap(f.freelist) {
f.freelist = append(f.freelist, n)
out = true
它管理的node也就是我们的多叉树节点,其定义如下
代码语言:javascript复制type node struct {
items items //节点内元素
children children //孩子节点
cow *copyOnWriteContext //实现copy on write
}
代码语言:javascript复制type items []Item
type children []*node
代码语言:javascript复制type copyOnWriteContext struct {
freelist *FreeList
}
代码语言:javascript复制func (c *copyOnWriteContext) newNode() (n *node) {
n = c.freelist.newNode()
n.cow = c
return
}
代码语言:javascript复制func (c *copyOnWriteContext) freeNode(n *node) freeType {
if n.cow == c {
// clear to allow GC
n.items.truncate(0)
n.children.truncate(0)
n.cow = nil
if c.freelist.freeNode(n) {
copyOnWriteContext也实现了节点分配和释放逻辑,分配的时候从freelist里面分配节点,并把当前context的地址赋值给新分配的节点。释放节点的时候,如果是相同context就释放,否则不操作。释放的时候先把元素和孩子节点指针清空,然后把节点放入当前context的freelist里面,放满了交给垃圾回收器处理。
通过逻辑保证孩子节点数永远比节点内元素个数多一个。BTree代表了一颗平衡多叉树,里面包含了这棵树的度和树里元素的总个数,是读并发安全的,但是不是写并发安全的。
代码语言:javascript复制type BTree struct {
degree int
length int
root *node
cow *copyOnWriteContext
}
我们从New函数进入看下它是如何初始化的:
代码语言:javascript复制func New(degree int) *BTree {
return NewWithFreeList(degree, NewFreeList(DefaultFreeListSize))
}
代码语言:javascript复制func NewWithFreeList(degree int, f *FreeList) *BTree {
return &BTree{
degree: degree,
cow: ©OnWriteContext{freelist: f},
}
在cow里存储了预分配的节点指针空间
代码语言:javascript复制func NewFreeList(size int) *FreeList {
return &FreeList{freelist: make([]*node, 0, size)}
}
在学习整棵树的增删改查之前,我们先看下节点内部元素的增删改查过程。
代码语言:javascript复制type items []Item
代码语言:javascript复制func (s *items) insertAt(index int, item Item) {
*s = append(*s, nil)
if index < len(*s) {
copy((*s)[index 1:], (*s)[index:])
}
(*s)[index] = item
在slice的最后插入一个空指针,并把插入位置之后的所有元素后移一个位置,然后在空出的位置插入当前元素,删除是逆向过程,将当前位置之后的所有元素左移一位,最后位置置空
代码语言:javascript复制func (s *items) removeAt(index int) Item {
copy((*s)[index:], (*s)[index 1:])
(*s)[len(*s)-1] = nil
返回最后的元素,并把最后的元素位置地址置空
代码语言:javascript复制func (s *items) pop() (out Item) {
代码语言:javascript复制var (
nilItems = make(items, 16)
nilChildren = make(children, 16)
)
把指定位置之后的所有指针16个一组全部置为空
代码语言:javascript复制func (s *items) truncate(index int) {
var toClear items
*s, toClear = (*s)[:index], (*s)[index:]
for len(toClear) > 0 {
toClear = toClear[copy(toClear, nilItems):]
}
查找的过程就是slice的二分查找
代码语言:javascript复制func (s items) find(item Item) (index int, found bool) {
i := sort.Search(len(s), func(i int) bool {
return item.Less(s[i])
})
if i > 0 && !s[i-1].Less(item) {
return i - 1, true
}
return i, false
对于孩子节点的操作是一模一样的
代码语言:javascript复制type children []*node
代码语言:javascript复制func (s *children) insertAt(index int, n *node) {
func (s *children) removeAt(index int) *node {
func (s *children) pop() (out *node) {
func (s *children) truncate(index int) {
分析完节点的内部元素和孩子节点后,我们看下树的节点,为了便于节点的遍历,定义了迭代器,如果返回false,迭代器停止遍历,否则继续在树上遍历
代码语言:javascript复制type ItemIterator func(i Item) bool
mutableFor用于判断当前上下文中,可以修改的节点,如果不能修改,就复制一份用来修改,从而实现了copy on write,复制的时候会复制内部元素和孩子节点指针
代码语言:javascript复制func (n *node) mutableFor(cow *copyOnWriteContext) *node {
if n.cow == cow {
return n
out := cow.newNode()
copy(out.items, n.items)
copy(out.children, n.children)
获取某个孩子在当前上下文可以修改的节点。
代码语言:javascript复制func (n *node) mutableChild(i int) *node {
c := n.children[i].mutableFor(n.cow)
n.children[i] = c
return c
}
把节点的元素和孩子节点在指定位置拆分成两部分,返回当前元素和拆分后的新节点
代码语言:javascript复制func (n *node) split(i int) (Item, *node) {
item := n.items[i]
next := n.cow.newNode()
next.items = append(next.items, n.items[i 1:]...)
n.items.truncate(i)
if len(n.children) > 0 {
next.children = append(next.children, n.children[i 1:]...)
n.children.truncate(i 1)
检查孩子节点是否需要拆分,如果孩子节点需要拆分,返回true
代码语言:javascript复制func (n *node) maybeSplitChild(i, maxItems int) bool {
if len(n.children[i].items) < maxItems {
return false
first := n.mutableChild(i)
item, second := first.split(maxItems / 2)
n.items.insertAt(i, item)
n.children.insertAt(i 1, second)
找到i位置的孩子节点,并把孩子节点从最大节点数的一半地方拆开,在当前节点i的位置插入切分位置的元素暗,并在i 1的孩子节点位置插入从i位置孩子节点上拆分出来的右半部分。
接着看下节点的插入函数的执行流程,首先找到元素,如果找到了,替换这个位置的元素返回旧元素;没有找到执行插入新节点的流程。
插入的过程分两种情况:叶子节点(没有孩子节点的时候),直接在当前节点位置i插入新元素即可;非叶子节点如果需要拆分孩子节点拆分孩子节点,如果当前节点比插入点元素小,在i位置的孩子节点插入元素;否则在i 1位置的孩子节点插入元素。非叶子节点如果不需要拆分孩子节点,在孩子i的位置插入节点。插入过程是递归进行的。
代码语言:javascript复制func (n *node) insert(item Item, maxItems int) Item {
i, found := n.items.find(item)
if found {
out := n.items[i]
n.items[i] = item
return out
if len(n.children) == 0 {
n.items.insertAt(i, item)
if n.maybeSplitChild(i, maxItems) {
inTree := n.items[i]
switch {
case item.Less(inTree):
// no change, we want first split node
case inTree.Less(item):
i // we want second split node
default:
out := n.items[i]
n.items[i] = item
return out
}
}
return n.mutableChild(i).insert(item, maxItems)
查找的过程是先通过二分查找方式在当前节点查找元素,如果找到了返回当前节点对应位置的元素,否则,在孩子i的位置查找元素,递归进行
代码语言:javascript复制func (n *node) get(key Item) Item {
i, found := n.items.find(key)
if found {
return n.items[i]
} else if len(n.children) > 0 {
return n.children[i].get(key)
}
获取最小元素的时候,先从最小孩子的最小元素开始递归查找,如果没有孩子,返回当前节点的最小元素
代码语言:javascript复制func min(n *node) Item {
for len(n.children) > 0 {
n = n.children[0]
return n.items[0]
最大元素查找是类似流程
代码语言:javascript复制func max(n *node) Item {
for len(n.children) > 0 {
n = n.children[len(n.children)-1]
return n.items[len(n.items)-1]
删除的过程比较复杂:1,如果是叶子节点,并且找到了这个元素,直接删除即可;2,如果i位置的孩子节点元素个数比最小个数少,调用函数growChildAndRemove;3,如果在孩子节点i处找到了元素,移除孩子节点i的最大元素,并将它填入当前节点的i位置;4,如果没有找到,递归在孩子节点i处继续查找。
代码语言:javascript复制func (n *node) remove(item Item, minItems int, typ toRemove) Item {
switch typ {
case removeMax:
if len(n.children) == 0 {
return n.items.pop()
}
i = len(n.items)
case removeMin:
if len(n.children) == 0 {
return n.items.removeAt(0)
}
i = 0
case removeItem:
i, found = n.items.find(item)
if len(n.children) == 0 {
if found {
return n.items.removeAt(i)
}
return nil
}
default:
panic("invalid type")
}
if len(n.children[i].items) <= minItems {
return n.growChildAndRemove(i, item, minItems, typ)
}
child := n.mutableChild(i)
if found {
// The item exists at index 'i', and the child we've selected can give us a
// predecessor, since if we've gotten here it's got > minItems items in it.
out := n.items[i]
// We use our special-case 'remove' call with typ=maxItem to pull the
// predecessor of item i (the rightmost leaf of our immediate left child)
// and set it into where we pulled the item from.
n.items[i] = child.remove(nil, minItems, removeMax)
return out
}
return child.remove(item, minItems, typ)
分三种场景进行调整,调整完毕后再递归调用remove方法来进行节点删除:1,孩子的左兄弟节点位置的元素个数比最小元素个数大,在孩子的左兄弟节点偷取最大元素,存入i-1位置,并把i-1位置的元素存入孩子i的最左侧,并把被偷位置的孩子节点插入到孩子i的最左孩子位置。2,如果孩子的右兄弟元素个数比最小元素个数大,从孩子的右兄弟的最左孩子位置处偷元素。插入到孩子i的最右位置。
代码语言:javascript复制func (n *node) growChildAndRemove(i int, item Item, minItems int, typ toRemove) Item {
if i > 0 && len(n.children[i-1].items) > minItems {
// Steal from left child
child := n.mutableChild(i)
stealFrom := n.mutableChild(i - 1)
stolenItem := stealFrom.items.pop()
child.items.insertAt(0, n.items[i-1])
n.items[i-1] = stolenItem
if len(stealFrom.children) > 0 {
child.children.insertAt(0, stealFrom.children.pop())
}
} else if i < len(n.items) && len(n.children[i 1].items) > minItems {
// steal from right child
child := n.mutableChild(i)
stealFrom := n.mutableChild(i 1)
stolenItem := stealFrom.items.removeAt(0)
child.items = append(child.items, n.items[i])
n.items[i] = stolenItem
if len(stealFrom.children) > 0 {
child.children = append(child.children, stealFrom.children.removeAt(0))
}
} else {
if i >= len(n.items) {
i--
}
child := n.mutableChild(i)
// merge with right child
mergeItem := n.items.removeAt(i)
mergeChild := n.children.removeAt(i 1)
child.items = append(child.items, mergeItem)
child.items = append(child.items, mergeChild.items...)
child.children = append(child.children, mergeChild.children...)
n.cow.freeNode(mergeChild)
}
return n.remove(item, minItems, typ)
3,如果不满足上述两种情况,先把i位置的元素删掉,然后把i 1孩子给删掉,把删掉的元素和删掉的孩子的元素追加到孩子i的元素末尾,把删掉的孩子的孩子,追加到孩子i的孩子末尾。处理完上述调整后,就继续进行递归的删除操作。
iterate实现了迭代器,在节点内通过二分查找快速定位元素,然后再树形父子结构上通过递归来定位元素,迭代的时候需要控制方向的正反。
代码语言:javascript复制func (n *node) iterate(dir direction, start, stop Item, includeStart bool, hit bool, iter ItemIterator) (bool, bool) {
switch dir {
case ascend:
if start != nil {
index, _ = n.items.find(start)
}
for i := index; i < len(n.items); i {
if len(n.children) > 0 {
if hit, ok = n.children[i].iterate(dir, start, stop, includeStart, hit, iter); !ok {
return hit, false
}
学习完node的增删改查算法后,BTree的增删改查只是对上述方法的一个包装而已。
Clone方法,通过对cow属性的简单复制,实现了懒复制。
代码语言:javascript复制func (t *BTree) Clone() (t2 *BTree) {
cow1, cow2 := *t.cow, *t.cow
out := *t
t.cow = &cow1
out.cow = &cow2
return &out
定义了节点的最大元素个数和最小元素个数。
代码语言:javascript复制func (t *BTree) maxItems() int {
return t.degree*2 - 1
代码语言:javascript复制func (t *BTree) minItems() int {
return t.degree - 1
插入元素过程,如果根节点为空,直接插入元素;如果根节点的元素个数达到最大了,将根节点一份为二,新生成一个节点作为它们的根节点;在根节点处插入元素。
代码语言:javascript复制func (t *BTree) ReplaceOrInsert(item Item) Item {
if t.root == nil {
t.root = t.cow.newNode()
t.root.items = append(t.root.items, item)
t.length
return nil
t.root = t.root.mutableFor(t.cow)
if len(t.root.items) >= t.maxItems() {
item2, second := t.root.split(t.maxItems() / 2)
out := t.root.insert(item, t.maxItems())
代码语言:javascript复制func (t *BTree) Delete(item Item) Item {
return t.deleteItem(item, removeItem)
删除节点的过程是,首先移除元素,如果移除后根节点为空,把第一个孩子节点当作根节点,删除老的根节点。
代码语言:javascript复制func (t *BTree) deleteItem(item Item, typ toRemove) Item {
t.root = t.root.mutableFor(t.cow)
out := t.root.remove(item, t.minItems(), typ)
if len(t.root.items) == 0 && len(t.root.children) > 0 {
oldroot := t.root
t.root = t.root.children[0]
t.cow.freeNode(oldroot)
}
range操作,就是迭代器的执行
代码语言:javascript复制func (t *BTree) AscendRange(greaterOrEqual, lessThan Item, iterator ItemIterator) {
t.root.iterate(ascend, greaterOrEqual, lessThan, true, false, iterator)
代码语言:javascript复制func (t *BTree) Ascend(iterator ItemIterator) {
代码语言:javascript复制func (t *BTree) DescendRange(lessOrEqual, greaterThan Item, iterator ItemIterator) {
查找类似
代码语言:javascript复制func (t *BTree) Get(key Item) Item {
return t.root.get(key)
代码语言:javascript复制func (t *BTree) Clear(addNodesToFreelist bool) {
t.root.reset(t.cow)
代码语言:javascript复制func (n *node) reset(c *copyOnWriteContext) bool {
for _, child := range n.children {
if !child.reset(c) {
BTree实现了b树,并通过cow属性, cow相等才能修改,否则只能复制 来实现了copy on write;通过编译选项,来控制使用范型版本还是非范型版本:
代码语言:javascript复制//go:build !go1.18
// build !go1.18
至此整个BTree的实现分析完毕。