boltdb源码分析系列-读&写数据是如何执行的?

2022-08-15 15:04:17 浏览数 (1)

boltdb是一个k-v存储引擎,它的核心操作是读写数据。本文从使用者的角度,结合读写数据的实例,分析读&写数据是如何执行的,以及各个组件是如何串联起来工作的。

boltdb中有两种事务模式:只读事务和读写事务。读数据采用只读事务操作,写数据采用读写事务操作。任意时刻,最多只有一个读写事务在操作。即多个只读事务、多个只读事务 1个读写事务都是可以并发进行操作。

读数据执行流程

读数据示例

读取boltdb数据有两种操作模式,一种是用户手动管理事务,另一种是通过boltdb提供的func (db *DB) View(fn func(*Tx) error) error方法,我们只要传入一个类型为func(*Tx) error的函数。

我们先来看手动管理事务的代码,分为以下流程:

  1. 通过Open操作,创建一个boltdb实例
  2. 对boltdb实例执行Begin操作,传入false, 开启一个只读事务tx
  3. 通过事务tx执行我们自己的逻辑,可以获取Bucket,以及从Bucket中get数据,也可以对Bucket执行遍历等只读操作
  4. 执行回滚操作,对于只读事务来说,不能执行tx.Commit操作
代码语言:javascript复制
func main() {
 db, err := bolt.Open("test.db", 0600, nil)
 if err != nil {
  fmt.Println(err)
  return
 }
 defer db.Close()
 readDemo(db)
}

// 读模式
func readDemo(db *bolt.DB) {
 // 传false,开启一个只读事务
 tx, err := db.Begin(false)
 if err != nil {
  fmt.Println(err)
  return
 }
 // 获取Bucket
 bucket := tx.Bucket([]byte("bucket"))
 if bucket != nil {
  // 从Bucket获取数据
  value := bucket.Get([]byte("key"))
  fmt.Println(value)
 }

 // 回滚操作,只读事务不能执行tx.Commit操作
 err = tx.Rollback()
 if err != nil {
  fmt.Println(err)
 }
}

另一种读操作是通过db.View方法,只需传入我们的业务处理逻辑函数,由db.View来调用我们传入的函数,我们不用关心事务句柄tx的获取和Rollback操作。

代码语言:javascript复制
err := db.View(func(tx *bolt.Tx) error {
 ...
 return nil
})

结合下面的db.View方法,来看它做了哪些工作。总结起来有以下几点:

  1. 创建一个只读事务,设置t.managed值,该字段的含义是用户程序逻辑能否执行tx.Rollback操作,这里设置为true,也就是说我们传入的函数内部不能执行tx.Rollback操作,否则会引发panic,回滚操作交给框架代码db.View处理。
  2. 执行用户逻辑函数fn
  3. 恢复t.managed值为false,表示现在框架中要执行t.Rollback操作了。
代码语言:javascript复制
// 执行只读事务
func (db *DB) View(fn func(*Tx) error) error {
 t, err := db.Begin(false)
 if err != nil {
  return err
 }

 defer func() {
  if t.db != nil {
   t.rollback()
  }
 }()

 // fn内部不能进行rollback,否则引发panic
 t.managed = true

 err = fn(t)
 // 在下面执行rollback之前,要将状态值修改回来
 t.managed = false
 if err != nil {
  _ = t.Rollback()
  return err
 }
 // 因为是只读操作,所以不会执行t.Commit操作
 if err := t.Rollback(); err != nil {
  return err
 }

 return nil
}

工作流程

前面通过两个代码实例介绍了从boltdb中读数据的流程。本小节对这个操作流程做一个概括,从原理层面分析只读事务操作工作流程。

  1. 通过bolt.Open操作打开了一个db文件,返回一个DB对象,这个过程会将db文件中的关键页(两个meta page,freelist page和)信息转成了内存数据存在在DB对象中。具体该操作分为两个关键步骤. 1.1 通过mmap将db文件加载到内存,绑定到DB.data字段上 1.2 初始化DB.meta0和DB.meta1,以及DB.freelist
代码语言:javascript复制
func Open(path string, mode os.FileMode, options *Options) (*DB, error) {
 ...
    // 1.1 通过mmap将db文件加载到内存,绑定到db.data字段上,
    // 并初始化 db.meta0和db.meta1
 if err := db.mmap(options.InitialMmapSize); err != nil {
  _ = db.close()
  return nil, err
 }

    // 1.2 初始化db.freelist
 db.freelist = newFreelist()
 // 根据page 3 填充freelist
 db.freelist.read(db.page(db.meta().freelist))

 return db, nil
}
  1. db.Begin(false)开启一个只读事务,此操作会返回一个只读事务句柄tx。这个过程核心是对db文件创建了一个只读快照,将DB.meta信息拷贝了一份存储在tx中,主要是存储了root page id. tx内部会隐含创建一个伪根Bucket节点tx.root.
代码语言:javascript复制
func (db *DB) Begin(writable bool) (*Tx, error) {
 ...
 // 开启一个只读事务
 return db.beginTx()
}

func (db *DB) beginTx() (*Tx, error) {
 ...
 t := &Tx{}
 t.init(db)
        ...
 return t, nil
}

func (tx *Tx) init(db *DB) {
 tx.db = db
 tx.pages = nil
    
 tx.meta = &meta{}
 // 深拷贝db.meta,更改tx.meta不影响db
 db.meta().copy(tx.meta)

 // 创建根Bucket,相当于对B 做一个快照
 tx.root = newBucket(tx)
 tx.root.bucket = &bucket{}
 *tx.root.bucket = tx.meta.root
        ...
}

  1. 通过桶名获取一个Bucket,通过Bucket提供的Get接口从桶中读取数据。这个过程涉及到对桶的遍历,是通过Cursor进行的。在boltdb源码分析系列-Bucket文章中已详细介绍过func (b *Bucket) Get(key []byte) []byte操作,Cursor工作原理在boltdb源码分析系列-迭代器文章中已有介绍,本文不在做重复分析说明,感兴趣的同学可以看本系列文章。

写数据执行流程

写数据示例

类似读取数据,向boltdb中写入数据也有两种模式,一种是用户自己管理事务,另一种是通过boltdb提供的API接口,我们只需传入一个事务操作函数,像func (db *DB) Update(fn func(*Tx) error) errorfunc (db *DB) Batch(fn func(*Tx) error) error.

对于手动进行写数据操作示例如下,操作与读数据基本相同,不同点有3处:

  1. 需要创建读写事务,即db.Begin需要传true,读数据创建的是只读事务
  2. 调用写数据的API接口Put操作,读数据用的是Get
  3. 最后需要执行Commit提交操作,读数据不需要
代码语言:javascript复制
// 写模式
func writeDemo(db *bolt.DB) {
 // 创建一个读写事务
 tx, err := db.Begin(true)
 if err != nil {
  fmt.Println(err)
  return
 }
 // 失败回滚操作
 defer func() {
  tx.Rollback()
 }()

 // 获取bucket
 bucket := tx.Bucket([]byte("bucket"))
 // 向bucket中添加key-value数据
 err = bucket.Put([]byte("key"), []byte("value"))
 if err != nil {
  fmt.Println(err)
  return
 }
 // 提交事务
 err = tx.Commit()
 if err != nil {
  fmt.Println(err)
  return
 }
}

通过boltdb提供的写操作API接口,我们只需要提供一个事务逻辑函数即可。对于Update接口,它的内部逻辑与前面的读数据操作中的View是一致的,唯一不同的是多了Commit提交操作,不在重复分析。

因为写事务操作是有代价的,需要将数据写入磁盘,这个过程是比较费时的,boltdb提供了一个批量执行写事务的接口Batch,也就是说调用Batch(fn)传入的fn不会立即执行,需要等待一段时间,收集一批之后一起执行。具体等多久,这里有两种策略:

  1. 策略1,等待一段时间后执行,这个默认时间是10毫秒
  2. 策略2,批量收集的fn达到一定数量,也会立即执行,默认是1000个
代码语言:javascript复制
const (
 // 批量事务最多1000个
 DefaultMaxBatchSize  int = 1000
 // 批量事务等待时间10毫秒
 DefaultMaxBatchDelay     = 10 * time.Millisecond
)

满足上面两种中的任意一种,批量事务操作就会执行。通过策略1和策略2,同时兼顾了时间与效率,非常好。

下面是Batch方法处理过程,核心是收集和执行两个步骤。

  • 收集:收集阶段,获取到的的fn都会加入到db.batch中,即db.batch暂存了所有即将要批量被执行的fn. 在执行阶段,db.batch会被重置为nil, 所以在一个新收集时间开始时,db.batch为nil, 重新初始化为其赋值,并开启一个定时器。对应到代码里面的第一个if逻辑。「注意」,还有一种情况,就是收集的fn数量很快达到到了上限(默认1000个),这时有1001个fn过来,也会新分配db.batch,将第1001个fn加入新的db.batch
  • 执行:执行阶段在db.batch.trigger(),代码中有两个调用点,对应到前面说的策略1和策略2, trigger内部对每个fn,调用db.Update方法执行事务操作。
代码语言:javascript复制
func (db *DB) Batch(fn func(*Tx) error) error {
 errCh := make(chan error, 1)

 db.batchMu.Lock()
 if (db.batch == nil) || (db.batch != nil && len(db.batch.calls) >= db.MaxBatchSize) {
  db.batch = &batch{
   db: db,
  }
  // 启动一个定时器,延时一段时间批量处理收集到的fn
  db.batch.timer = time.AfterFunc(db.MaxBatchDelay, db.batch.trigger)
 }
 db.batch.calls = append(db.batch.calls, call{fn: fn, err: errCh})
 // 已经收集够了tx,立即开启一个协程执行批量run,就不用等到db.MaxBatchDelay再来执行了
 // 那有没有重复执行的可能,下面立即执行了,上面的定时器到了又会执行,不会的,batch中通过sync.Once
 // 只执行一次
 if len(db.batch.calls) >= db.MaxBatchSize {
  // 这里为什么要开启一个协程执行run? 开启一个协程是为了尽早释放batchMu锁,不开一个协程
  // 会延迟锁的释放,导致其他goroutine执行Batch卡死,fn加入不到db.batch
  go db.batch.trigger()
 }
 db.batchMu.Unlock()

 err := <-errCh
 if err == trySolo {
  // fn执行失败,在单独执行一次
  err = db.Update(fn)
 }
 return err
}

看了Batch实现,也许有同学有下面的疑问点?小编这里做一个分析说明。

  • 疑问1:代码中有两处调用db.batch.trigger,time.AfterFunc内部也是启用了一个goroutine,时间到了执行function,下面也是go db.batch.trigger(),会不会被执行两次?

答:不会被执行两次,虽然相当于在两个goroutine中都调用了db.batch.trigger,但是batch中start sync.Once保证了多次调用也只会执行一次,「sync.Once用的恰到好处」

  • 疑问2:go db.batch.trigger()中go可以去掉吗?

答:不可以,开启一个goroutine执行是为了尽早释放锁,否则会导致其他调用Batch会阻塞卡死,fn加入不到db.batch

  • 疑问3:在第一处if逻辑中,收集到的fn数量超过db.MaxBatchSize,也对db.batch做了重新赋值操作,这里会影响go db.batch.trigger()执行吗?

答:不会的,if中新赋值的db.batch和go db.batch.trigger()中的db.batch不是同一个对象,不会影响。

Batch是一个阻塞操作,所以开启多个goroutine调用它才有意义。同时要注意,Batch中部分fn执行失败有重试执行操作,所以要考虑幂等性。

工作流程

前一小节通过示例介绍了写数据操作流程,本小节从原理层面概括写数据是如何执行的。在示例代码中可以看到,写数据操作和读数据操作是一致的。都有db.Open一个boltdb实例,然后开启事务,获取桶并在桶上进行读写操作。这里只介绍写数据流程中与读不同的地方。

写数据关键的两步是func (b *Bucket) Put(key []byte, value []byte) errorfunc (tx *Tx) Commit() error. 相比读操作,写操作复杂不少,因为读操作不涉及数据更改,所以直接从mmap映射后的内存page中读取。而写操作会修改数据,要支持数据库的ACID特性,boltdb处理方法是:

  1. 写数据写入内存中的node
  2. 执行事务Commit操作后,将node转换成脏页page,最后将脏页刷到硬盘上保存

下面结合源码对处理过程进行分析,boltdb是如何实现事务的在下篇文章中详细介绍。

  1. 创建读写事务,对db文件创建了一个读写快照,因为有修改操作,不能原地修改,要保持数据库的ACID特性,在拷贝后的数据上修改。

2.调用Bucket提供的Put方法写入key-value数据,因所有的数据写入操作都是在叶子节点中进行的,先创建一个迭代器,对Bucket遍历并定位到要写入的叶子节点,Cursor c游标最后的位置即要写入的叶子节点。

2.1 定位叶子节点后,执行c.node().put(key,value,0,0)操作,这个操作分为两步,第一步是找到叶子节点node,「是node而不是page」,所有修改操作都是针对node的,对node不是很理解的可以看boltdb源码分析系列-内存结构文章。

代码语言:javascript复制
func (b *Bucket) Put(key []byte, value []byte) error {
 ...
 // 创建一个游标对象,对Bucket进行遍历,查找键值对为key的数据
 c := b.Cursor()
 k, _, flags := c.seek(key)
 ...
    
 // 运行到这里,key有可能存在,也有可能不存在,不存在会创建一个key-value键值对,
 // 存在会更新原来key的value,这些处理都是在put函数中实现的
 // NOTE 对key是深拷贝,对value是浅拷贝
 key = cloneBytes(key)
 c.node().put(key, key, value, 0, 0)

 return nil
}

Cursor的node方法返回叶子节点,如果node已存在,直接返回,否则从根节点开始从上往下,将游走路径上的page转成node,并保存到Bucket中的nodes。

代码语言:javascript复制
func (c *Cursor) node() *node {
 ...
 // 叶子节点已经在内存中,直接返回
 if ref := &c.stack[len(c.stack)-1]; ref.node != nil && ref.isLeaf() {
  return ref.node
 }

 // 从根节点开始,从上往下直到根节点,将游走路径上的page转成node,并保存到
 // Bucket中的nodes
 var n = c.stack[0].node
 if n == nil {
  n = c.bucket.node(c.stack[0].page.id, nil)
 }
 for _, ref := range c.stack[:len(c.stack)-1] {
  _assert(!n.isLeaf, "expected branch node")
  n = n.childAt(int(ref.index))
 }
 _assert(n.isLeaf, "expected leaf node")
 return n
}

// 返回bucket中的第index孩子节点
func (n *node) childAt(index int) *node {
 if n.isLeaf {
  panic(fmt.Sprintf("invalid childAt(%d) on a leaf node", index))
 }
 return n.bucket.node(n.inodes[index].pgid, n)
}

将pgid对应的page转换成node,保存到Bucket中的nodes中。调用链为Cursor.node-->node.childAt-->Bucket.node. Cursor和node结构中都有一个记录Bucket字段,它们都来自读写事务tx. 所以经过这个操作之后,从根节点到有修改写入叶子节点路径上的所有节点都添加到了Bucket的nodes中。

代码语言:javascript复制
func (b *Bucket) node(pgid pgid, parent *node) *node {
 ...
 if n := b.nodes[pgid]; n != nil {
  return n
 }
    // 同时,将这些新建的node保存到了已b.rootNode为根节点的tree中
 // 在执行tx.Commit时,根据rootNode将它们转成page,最后刷新到磁盘
 n := &node{bucket: b, parent: parent}
 if parent == nil {
  b.rootNode = n
 } else {
  parent.children = append(parent.children, n)
 }
    ...
 // 根据page页p来对节点n进一步初始化一些内容,主要是填充key-value信息
 n.read(p)
 // 将节点n缓存起来
 b.nodes[pgid] = n
    ...
 return n
}

2.2 调用node的put操作,将修改的内容写入,这里会传入两个key,一个是旧key,一个是新key,如果旧key存在,用新的key-value将其覆盖,如果,旧key不存在,相当于新添加了新的key-value.

代码语言:javascript复制
func (n *node) put(oldKey, newKey, value []byte, pgid pgid, flags uint32) {
 // 异常校验
    ...
 // 在节点n的元素中查询旧key所在下标索引,即oldKey是它的第几个元素
 index := sort.Search(len(n.inodes), func(i int) bool { return bytes.Compare(n.inodes[i].key, oldKey) != -1 })

 // 进一步判断索引为index位置的元素的key是否是oldKey,进一步确认oldKey是否存在
 exact := (len(n.inodes) > 0 && index < len(n.inodes) && bytes.Equal(n.inodes[index].key, oldKey))
 // oldKey不存在,新增加一个节点
 if !exact {
  n.inodes = append(n.inodes, inode{})
  copy(n.inodes[index 1:], n.inodes[index:])
 }
 // 更新index位置key-value数据
 inode := &n.inodes[index]
 inode.flags = flags
 inode.key = newKey
 inode.value = value
 inode.pgid = pgid
 _assert(len(inode.key) > 0, "put: zero-length inode key")
}

总结起来,Put操作的核心要点有两个:

  • 修改写入数据都是针对node,不会在原始数据page做修改写入
  • 写入数据发生在叶子节点上
  • 叶子节点执行Put操作之后,从根节点到此叶子节点路径上所有的node都已缓存到tx关联的Bucket的nodes中
  • Put操作后,从根节点到叶子节点路径上的node也保存到了以rootNode为根节点的tree中。

结合下图,boltdb文件被mmap到内存之后,以page形式表示,右侧是这些page转成node之后的树形表示。当执行Put操作时,在叶子节点node d写入数据,从根节点到node d路径上所有的节点都被保存到了Bucket.nodes中,同时这些nodes节点也保存到了以rootNode为根节点的树中,对应到图中的红色节点。「注意,图中灰色的node并没有在内存中真实存在,这里是为了对比说明,只有Put操作会影响到的page才会转成node,保存在Bucket中」。对于Put操作会影响到的page是指哪些page? Put操作是发生在某个leaf page上,从root page到某leaf page路径上的page都是会影响的page.

  1. 2中Put写入的数据当前在内存中,执行Commit操作才会将内存中的数据刷新到硬盘上。Commit操作比较复杂,详细实现细节在下一篇文章中介绍。执行tx.Commit操作,会将2中Bucket中nodes节点数据转成page,因为这些nodes是有更新或写入操作的,它们是脏nodes,转成page之后保存到tx.pages中,tx.pages中保存脏page.最后将tx.pages写入磁盘。

下面的Commit代码抽取了转换nodes和写入磁盘操作, 将脏nodes转成page在tx.root.spill函数中,将page写入磁盘是tx.write操作。

代码语言:javascript复制
func (tx *Tx) Commit() error {
 ...
 // 对B 进行溢出处理, 并将Bucket中nodes转成dirty page
 if err := tx.root.spill(); err != nil {
  tx.rollback()
  return err
 }
 ...
 // 刷新脏页(包含新的freelist)到磁盘
 if err := tx.write(); err != nil {
  tx.rollback()
  return err
 }

 ...
}

将nodes转成page调用链很长,依次是tx.root.spill-->b.rootNode.spill-->tx.allocate.

代码语言:javascript复制
func (b *Bucket) spill() error {
 ...
 // 从根节点开始进行溢出处理
 if err := b.rootNode.spill(); err != nil {
  return err
 }
 ...
}

注意这里被执行spill操作的node,是在前面的Bucket Put操作中,加入到以rootNode为根节点中的node. 也就是Put操作影响到的node,会转成脏page。对于没有影响到的page,它们不会被转成node.

代码语言:javascript复制
func (n *node) spill() error {
 ...
 for _, node := range nodes {
 ...
  p, err := tx.allocate((node.size() / tx.db.pageSize)   1)
        if err != nil {
   return err
  }

  if p.id >= tx.meta.pgid {
   panic(fmt.Sprintf("pgid (%d) above high water mark (%d)", p.id, tx.meta.pgid))
  }
  // 分配的page id值赋值给node
  node.pgid = p.id
  // 将node中的元素内容序列化化到page p中
  node.write(p)
 ...
 }
        ...
}

在tx.allocate中将node转成page. 分配给定数量的page,并保存到脏页缓存pages中.

代码语言:javascript复制
func (tx *Tx) allocate(count int) (*page, error) {
 p, err := tx.db.allocate(count)
 if err != nil {
  return nil, err
 }

 tx.pages[p.id] = p

 tx.stats.PageCount  
 tx.stats.PageAlloc  = count * tx.db.pageSize

 return p, nil
}

tx.write()将dirty page保存到磁盘中,释放之前旧的page,将它们加入到freelist中,以便后续写入数据操作复用这些page.

0 人点赞