golang源码分析:boltdb(5)

2023-09-06 19:30:24 浏览数 (1)

在分析完核心数据结构后,我们结合使用boltdb的核心过程了解下上述数据结构建立的过程,总结下来核心过程如下:

代码语言:javascript复制
bolt.Open
db.Update
db.Begin
tx.CreateBucket
tx.Bucket
b.Get
b.Put
tx.Commit

在db.go中我们可以了解到Update函数其实也是调用了Begin来开启一个事务的。事务执行成功就提交,失败则回滚。

代码语言:javascript复制
func (db *DB) Update(fn func(*Tx) error) error {
        t, err := db.Begin(true)
          defer func() {
    if t.db != nil {
      t.rollback()
        t.managed = true
        err = fn(t)
        t.managed = false
        if err != nil {
    _ = t.Rollback()
        return t.Commit()

开启事务的时候通过参数来决定开启一个只读事务还是一个写事务:

代码语言:javascript复制
// Begin starts a new transaction and returns it.
// When writable is true a read-write transaction is opened via beginRWTx;
// otherwise a read-only transaction is opened via beginTx.
func (db *DB) Begin(writable bool) (*Tx, error) {
  if !writable {
    return db.beginTx()
  }
  return db.beginRWTx()
}

开启写事务的时候会加排他锁,并且会给meta信息也加锁,然后创建一个Tx对象,并将它赋值给db对象,因为只能有一个写事务。

代码语言:javascript复制
func (db *DB) beginRWTx() (*Tx, error) {
        db.rwlock.Lock()
          db.metalock.Lock()
  defer db.metalock.Unlock()
          if !db.opened {
    db.rwlock.Unlock()
          t := &Tx{writable: true}
  t.init(db)
  db.rwtx = t
代码语言:javascript复制
func (db *DB) beginTx() (*Tx, error) {
        db.metalock.Lock()
        db.mmaplock.RLock()
            db.mmaplock.RUnlock()
    db.metalock.Unlock()
          t := &Tx{}
  t.init(db)


          db.txs = append(db.txs, t)
  n := len(db.txs)
        db.metalock.Unlock()
          db.statlock.Lock()
  db.stats.TxN++
  db.stats.OpenTxN = n
  db.statlock.Unlock()

View执行的过程类似,只不过它开启的是读事务。

代码语言:javascript复制
  func (db *DB) View(fn func(*Tx) error) error {
        t, err := db.Begin(false)
          defer func() {
    if t.db != nil {
      t.rollback()
        t.managed = true


  // If an error is returned from the function then pass it through.
  err = fn(t)
  t.managed = false
          if err != nil {
    _ = t.Rollback()
        if err := t.Rollback(); err != nil {

开启读事务的时候需要先给meta加锁,再获取mmap的读锁;并且可以同时开启多个读事务,它们以切片的形式(db.txs)存储在db上。

代码语言:javascript复制

// beginTx starts a read-only transaction.
//
// It returns ErrDatabaseNotOpen when db.opened is false. On success the new
// transaction is appended to db.txs and the transaction statistics are updated.
func (db *DB) beginTx() (*Tx, error) {
  // Lock the meta pages while we initialize the transaction. We obtain
  // the meta lock before the mmap lock because that's the order that the
  // write transaction will obtain them.
  db.metalock.Lock()

  // Obtain a read-only lock on the mmap. When the mmap is remapped it will
  // obtain a write lock so all transactions must finish before it can be
  // remapped.
  db.mmaplock.RLock()

  // Exit if the database is not open yet, releasing both locks in the
  // reverse order of acquisition.
  if !db.opened {
    db.mmaplock.RUnlock()
    db.metalock.Unlock()
    return nil, ErrDatabaseNotOpen
  }

  // Create a transaction associated with the database.
  t := &Tx{}
  t.init(db)

  // Keep track of transaction until it closes.
  db.txs = append(db.txs, t)
  n := len(db.txs)

  // Unlock the meta pages. The mmap read lock is intentionally not released
  // here; it is still held when this function returns.
  db.metalock.Unlock()

  // Update the transaction stats: count one more started transaction and
  // record how many are currently open.
  db.statlock.Lock()
  db.stats.TxN++
  db.stats.OpenTxN = n
  db.statlock.Unlock()

  return t, nil
}

Batch只不过是批量调用Update

代码语言:javascript复制
func (db *DB) Batch(fn func(*Tx) error) error {
        db.batchMu.Lock()
          db.batch = &batch{
      db: db,
    }
    db.batch.timer = time.AfterFunc(db.MaxBatchDelay, db.batch.trigger)
        db.batch.calls = append(db.batch.calls, call{fn: fn, err: errCh})
        go db.batch.trigger()
        db.batchMu.Unlock()
        err = db.Update(fn)
代码语言:javascript复制
// trigger runs the batch through b.start, a sync.Once, so b.run is invoked
// by the first call only and later calls do nothing.
func (b *batch) trigger() {
  once := &b.start
  once.Do(b.run)
}
代码语言:javascript复制
func (b *batch) run() {
        for len(b.calls) > 0 {
              err := b.db.Update(func(tx *Tx) error {
      for i, c := range b.calls {
        if err := safelyCall(c.fn, tx); err != nil {
        c := b.calls[failIdx]
      b.calls[failIdx], b.calls = b.calls[len(b.calls)-1], b.calls[:len(b.calls)-1]
代码语言:javascript复制
// batch holds the calls queued by DB.Batch together with what is needed to
// run them.
type batch struct {
  // db is the database whose Update method executes the queued calls.
  db *DB
  // timer is created with time.AfterFunc(db.MaxBatchDelay, ...) and invokes
  // trigger when it fires.
  timer *time.Timer
  // start guards run so that trigger executes it at most once.
  start sync.Once
  // calls are the pending entries; each is appended by DB.Batch with the
  // caller's function and an error channel.
  calls []call
}

事务的定义位于tx.go

代码语言:javascript复制
// Tx is a transaction on the database. Whether it is read-only or read-write
// is recorded in the writable field.
type Tx struct {
  // writable is true for read-write transactions.
  writable bool
  // managed is set by DB.Update and DB.View while the caller's function runs.
  managed bool
  // db is the owning database; Tx.Page reports ErrTxClosed when it is nil.
  db *DB
  // meta is this transaction's own copy of the database meta, made in init.
  meta *meta
  // root is the root bucket, filled in from meta.root in init.
  root Bucket
  // pages is allocated in init for writable transactions only.
  pages map[pgid]*page
  // stats holds the per-transaction statistics (see TxStats).
  stats TxStats
  // commitHandlers are invoked in order at the end of Commit.
  commitHandlers []func()

  // WriteFlag specifies the flag for write-related methods like WriteTo().
  // Tx opens the database file with the specified flag to copy the data.
  //
  // By default, the flag is unset, which works well for mostly in-memory
  // workloads. For databases that are much larger than available RAM,
  // set the flag to syscall.O_DIRECT to avoid trashing the page cache.
  WriteFlag int
}

定义完事务对象后,会进行初始化操作,它会复制一份meta信息,写事务事务id会增加,读事务事务id不会增加。

代码语言:javascript复制
func (tx *Tx) init(db *DB) {
        tx.meta = &meta{}
  db.meta().copy(tx.meta)
          tx.root = newBucket(tx)
  tx.root.bucket = &bucket{}
  *tx.root.bucket = tx.meta.root
          if tx.writable {
    tx.pages = make(map[pgid]*page)
    tx.meta.txid += txid(1)
  }
        var minid txid = 0xFFFFFFFFFFFFFFFF
  for _, t := range db.txs {
    if t.meta.txid < minid {
      minid = t.meta.txid
    }
  }
  if minid > 0 {
    db.freelist.release(minid - 1)
  }

事务提交的时候会先进行rebalance操作来释放填充因子不满足条件的页避免空间浪费,然后通过spill操作来拆分填充因子过大的页。最后会把释放的页填入freelist,落盘。最后修改meta完成事务的提交。

代码语言:javascript复制
func (tx *Tx) Commit() error {
        var startTime = time.Now()
        tx.root.rebalance()
          startTime = time.Now()
  if err := tx.root.spill(); err != nil {
    tx.rollback()
          tx.meta.root.root = tx.root.root


  opgid := tx.meta.pgid
          tx.db.freelist.free(tx.meta.txid, tx.db.page(tx.meta.freelist))
  p, err := tx.allocate((tx.db.freelist.size() / tx.db.pageSize) + 1)
  if err != nil {
    tx.rollback()
          if err := tx.db.freelist.write(p); err != nil {
    tx.rollback()
        tx.meta.freelist = p.id
        if tx.meta.pgid > opgid {
    if err := tx.db.grow(int(tx.meta.pgid+1) * tx.db.pageSize); err != nil {
      tx.rollback()
          if err := tx.write(); err != nil {
    tx.rollback()
        if tx.db.StrictMode {
    ch := tx.Check()
        if err := tx.writeMeta(); err != nil {
    tx.rollback()
        tx.close()


  // Execute commit handlers now that the locks have been removed.
  for _, fn := range tx.commitHandlers {
    fn()
  }

由于boltdb采用了mvcc模式,所以事务回滚很简单,直接重新加载meta信息,复原freelist即可。读事务回滚不需要任何操作。

代码语言:javascript复制
func (tx *Tx) Rollback() error {
        tx.rollback()
代码语言:javascript复制
 func (tx *Tx) rollback() {
          if tx.writable {
    tx.db.freelist.rollback(tx.meta.txid)
    tx.db.freelist.reload(tx.db.page(tx.db.meta().freelist))
  }
  tx.close()

创建和获取Bucket的过程如下:

代码语言:javascript复制
      // CreateBucket creates a bucket with the given name by delegating to the
      // transaction's root bucket, and returns whatever that call returns.
      func (tx *Tx) CreateBucket(name []byte) (*Bucket, error) {
        created, err := tx.root.CreateBucket(name)
        return created, err
      }
代码语言:javascript复制
func (tx *Tx) Bucket(name []byte) *Bucket {
  return tx.root.Bucket(name)
代码语言:javascript复制
// TxStats represents statistics about the actions performed by the
// transaction. Counters are plain ints; durations accumulate total time.
type TxStats struct {
  // Page statistics.
  PageCount int // number of page allocations
  PageAlloc int // total bytes allocated

  // Cursor statistics.
  CursorCount int // number of cursors created (bumped by Bucket.Cursor)

  // Node statistics.
  NodeCount int // number of node allocations
  NodeDeref int // number of node dereferences

  // Rebalance statistics.
  Rebalance     int           // number of node rebalances
  RebalanceTime time.Duration // total time spent rebalancing

  // Split/Spill statistics.
  Split     int           // number of nodes split
  Spill     int           // number of nodes spilled
  SpillTime time.Duration // total time spent spilling

  // Write statistics.
  Write     int           // number of writes performed
  WriteTime time.Duration // total time spent writing to disk
}

通过key定位到插入或者查找的位置,然后进行数据的插入和查找

代码语言:javascript复制
func (b *Bucket) CreateBucket(key []byte) (*Bucket, error) {

        c := b.Cursor()

  k, _, flags := c.seek(key)

          var bucket = Bucket{

    bucket:      &bucket{},

    rootNode:    &node{isLeaf: true},

    FillPercent: DefaultFillPercent,

  }

        var value = bucket.write()

          key = cloneBytes(key)

  c.node().put(key, key, value, 0, bucketLeafFlag)

        return b.Bucket(key), nil
代码语言:javascript复制
func (b *Bucket) Bucket(name []byte) *Bucket {

          if b.buckets != nil {

    if child := b.buckets[string(name)]; child != nil {

      return child

          c := b.Cursor()

  k, v, flags := c.seek(name)

        if !bytes.Equal(name, k) || (flags&bucketLeafFlag) == 0 {

    return nil

        var child = b.openBucket(v)

  if b.buckets != nil {

    b.buckets[string(name)] = child

      func (b *Bucket) Put(key []byte, value []byte) error {

          c := b.Cursor()

  k, _, flags := c.seek(key)

          key = cloneBytes(key)

  c.node().put(key, key, value, 0, 0)

在Bucket内部获取某个key的值的过程类似

代码语言:javascript复制
func (b *Bucket) Get(key []byte) []byte {
        k, v, flags := b.Cursor().seek(key)    
代码语言:javascript复制
// Cursor creates a new cursor bound to this bucket.
// Each call counts one more cursor in the owning transaction's statistics and
// returns a cursor whose stack starts out empty.
func (b *Bucket) Cursor() *Cursor {
  // Update transaction statistics.
  b.tx.stats.CursorCount++

  // Allocate and return a cursor.
  return &Cursor{
    bucket: b,
    stack:  make([]elemRef, 0),
  }
}

最后看看Bucket的定义

代码语言:javascript复制
// Bucket is the in-memory handle for a bucket inside a transaction. It embeds
// the stored *bucket header and adds the per-transaction caches around it.
type Bucket struct {
  *bucket
  tx       *Tx                // the associated transaction
  buckets  map[string]*Bucket // subbucket cache, keyed by bucket name
  page     *page              // inline page reference
  rootNode *node              // materialized node for the root page.
  nodes    map[pgid]*node     // node cache, keyed by page id

  // Sets the threshold for filling nodes when they split. By default,
  // the bucket will fill to 50% but it can be useful to increase this
  // amount if you know that your write workloads are mostly append-only.
  //
  // This is non-persisted across transactions so it must be set in every Tx.
  FillPercent float64
}
代码语言:javascript复制
// bucket represents the on-file representation of a bucket.
// This is stored as the "value" of a bucket key. If the bucket is small enough,
// then its root page can be stored inline in the "value", after the bucket
// header. In the case of inline buckets, the "root" will be 0.
type bucket struct {
  root     pgid   // page id of the bucket's root-level page
  sequence uint64 // monotonically incrementing, used by NextSequence()
}

数据查询过程中,使用了游标,定义位于cursor.go

代码语言:javascript复制
// elemRef is the element type of a Cursor's stack (Bucket.Cursor creates the
// stack as an empty []elemRef).
// NOTE(review): presumably each entry points at either a page or a node plus
// a position inside it — confirm against cursor.go.
type elemRef struct {
  page  *page
  node  *node
  index int
}

执行事务过程中获取页面信息的过程如下:

代码语言:javascript复制
// Page returns information about the page with the given id.
//
// It returns ErrTxClosed when the transaction has no database attached, and
// (nil, nil) when id is at or above the transaction's high water mark
// (tx.meta.pgid).
func (tx *Tx) Page(id int) (*PageInfo, error) {
  if tx.db == nil {
    return nil, ErrTxClosed
  }

  target := pgid(id)
  if target >= tx.meta.pgid {
    return nil, nil
  }

  // Gather the counters straight from the page header.
  pg := tx.db.page(target)
  info := &PageInfo{
    ID:            id,
    Count:         int(pg.count),
    OverflowCount: int(pg.overflow),
  }

  // A page listed as freed is reported as "free"; any other page reports
  // the type string from its own header.
  switch {
  case tx.db.freelist.freed(target):
    info.Type = "free"
  default:
    info.Type = pg.typ()
  }

  return info, nil
}

从上面过程中我们可以看到,核心复杂逻辑就在提交事务过程的rebalance和spill,对应代码位于bucket.go。获取根Bucket后,会对它每一个node、每一个嵌套Bucket都进行rebalance。

代码语言:javascript复制
// rebalance rebalances every node held in this bucket's node cache and then
// recurses into every bucket held in its sub-bucket cache.
func (b *Bucket) rebalance() {
  // Nodes cached by this bucket come first.
  for _, cached := range b.nodes {
    cached.rebalance()
  }

  // Then descend into the cached sub-buckets.
  for _, sub := range b.buckets {
    sub.rebalance()
  }
}
代码语言:javascript复制
func (n *node) rebalance() {
          n.unbalanced = false


  // Update statistics.
  n.bucket.tx.stats.Rebalance++
          var threshold = n.bucket.tx.db.pageSize / 4
  if n.size() > threshold && len(n.inodes) > n.minKeys() {
    return
  }
        // Root node has special handling.
  if n.parent == nil {
    // If root node is a branch and only has one node then collapse it.
    if !n.isLeaf && len(n.inodes) == 1 {
      // Move root's child up.
      child := n.bucket.node(n.inodes[0].pgid, n)
      n.isLeaf = child.isLeaf
      n.inodes = child.inodes[:]
      n.children = child.children


      // Reparent all child nodes being moved.
      for _, inode := range n.inodes {
        if child, ok := n.bucket.nodes[inode.pgid]; ok {
          child.parent = n
        }
      }


      // Remove old child.
      child.parent = nil
      delete(n.bucket.nodes, child.pgid)
      child.free()
    }


    return
  }
          if n.numChildren() == 0 {
    n.parent.del(n.key)
    n.parent.removeChild(n)
    delete(n.bucket.nodes, n.pgid)
    n.free()
    n.parent.rebalance()
    return
  }
          var useNextSibling = (n.parent.childIndex(n) == 0)
  if useNextSibling {
    target = n.nextSibling()
  } else {
    target = n.prevSibling()
  }
        for _, inode := range target.inodes {
      if child, ok := n.bucket.nodes[inode.pgid]; ok {
        child.parent.removeChild(child)
        child.parent = n
        child.parent.children = append(child.parent.children, child)
      }
    }


    // Copy over inodes from target and remove target.
    n.inodes = append(n.inodes, target.inodes...)
    n.parent.del(target.key)
    n.parent.removeChild(target)
    delete(n.bucket.nodes, target.pgid)
    target.free()
        for _, inode := range n.inodes {
      if child, ok := n.bucket.nodes[inode.pgid]; ok {
        child.parent.removeChild(child)
        child.parent = target
        child.parent.children = append(child.parent.children, child)
      }
    }


    // Copy over inodes to target and remove node.
    target.inodes = append(target.inodes, n.inodes...)
    n.parent.del(n.key)
    n.parent.removeChild(n)
    delete(n.bucket.nodes, n.pgid)
    n.free()
        n.parent.rebalance()
代码语言:javascript复制

// spill writes all the nodes for this bucket to dirty pages.
//
// Cached child buckets are handled first (recursively for the ones that are
// not inlineable), their headers are written back into this bucket, and only
// then is this bucket's own root node spilled. The first error from a child
// or from the root node is returned unchanged. It panics if a child's header
// key cannot be found again, if that key is not flagged as a bucket, or if
// the resulting root page id is not below the transaction's high water mark.
func (b *Bucket) spill() error {
  // Spill all child buckets first.
  for name, child := range b.buckets {
    // If the child bucket is small enough and it has no child buckets then
    // write it inline into the parent bucket's page. Otherwise spill it
    // like a normal bucket and make the parent value a pointer to the page.
    var value []byte
    if child.inlineable() {
      child.free()
      value = child.write()
    } else {
      if err := child.spill(); err != nil {
        return err
      }

      // Update the child bucket header in this bucket.
      // value is sized to exactly one bucket header and the header is copied
      // into it through an unsafe pointer to its first byte.
      value = make([]byte, unsafe.Sizeof(bucket{}))
      var bucket = (*bucket)(unsafe.Pointer(&value[0]))
      *bucket = *child.bucket
    }

    // Skip writing the bucket if there are no materialized nodes.
    if child.rootNode == nil {
      continue
    }

    // Update parent node.
    // The key is looked up again through a fresh cursor; it must exist and
    // must carry the bucket flag, otherwise the tree is inconsistent.
    var c = b.Cursor()
    k, _, flags := c.seek([]byte(name))
    if !bytes.Equal([]byte(name), k) {
      panic(fmt.Sprintf("misplaced bucket header: %x -> %x", []byte(name), k))
    }
    if flags&bucketLeafFlag == 0 {
      panic(fmt.Sprintf("unexpected bucket header flag: %x", flags))
    }
    c.node().put([]byte(name), []byte(name), value, 0, bucketLeafFlag)
  }

  // Ignore if there's not a materialized root node.
  if b.rootNode == nil {
    return nil
  }

  // Spill nodes.
  if err := b.rootNode.spill(); err != nil {
    return err
  }
  // Re-read the root from the spilled node, since rootNode is replaced by
  // whatever root() reports afterwards.
  b.rootNode = b.rootNode.root()

  // Update the root node for this bucket.
  if b.rootNode.pgid >= b.tx.meta.pgid {
    panic(fmt.Sprintf("pgid (%d) above high water mark (%d)", b.rootNode.pgid, b.tx.meta.pgid))
  }
  b.root = b.rootNode.pgid

  return nil
}

0 人点赞