golang源码分析:boltdb(1)

2023-09-06 19:28:52 浏览数 (2)

boltdb是golang实现的一个基于b 树的存储系统,通过mvcc实现了单个写事务和多个读事务并发。结构也很清晰,由于比较稳定,已经归档,确实是学习数据库的最佳选择。而且不少出名的开源项目在使用它,比如etcd,InfluxDB等。下面我们先通过例子分析下它是如何使用的,后面再从源码的角度来分析下它的具体实现原理。

boltdb是通过内存映射的方式实现持久化的,每一个db对象代表了一个存储实例。实例内部通过Bucket来管理命名空间的,类似于mysql的表名,不同点是Bucket是可以嵌套的。每个Bucket都是一个基于B 树实现的k,v存储。

代码语言:javascript复制
package main

import (
  "log"
  "time"

  "github.com/boltdb/bolt"
)

func main() {
  // Open the my.db data file in your current directory.
  // It will be created if it doesn't exist.
  db, err := bolt.Open("my.db", 0600, &bolt.Options{Timeout: 1 * time.Second})
  if err != nil {
    log.Fatal(err)
  }
  defer db.Close()

  err = db.Update(func(tx *bolt.Tx) error {
    return nil
  })

  err = db.View(func(tx *bolt.Tx) error {
    return nil
  })

  err = db.Batch(func(tx *bolt.Tx) error {
    return nil
  })

  transaction(db)
}

func transaction(db *bolt.DB) error {
  // Start a writable transaction.
  tx, err := db.Begin(true)
  if err != nil {
    return err
  }
  defer tx.Rollback()

  // Use the transaction...
  _, err = tx.CreateBucket([]byte("MyBucket"))
  if err != nil {
    return err
  }

  // Commit the transaction and check for error.
  if err := tx.Commit(); err != nil {
    return err
  }
  return nil
}

通过例子我们看到它通过bolt.Open创建实例,然后通过db.Update开启一个写事务,它的参数是一个函数,在函数内部我们可以执行更新操作,操作中任意一步只要返回error就会引起事务的回滚,返回nil事务可以顺利提交。当然也可以支持批量事务对应接口是db.Batch,只读事物是通过db.View实现的,在其内部的更新操作是不会生效的,因为不会被提交。当然也可以通过下面系列接口,进行显式事物的声明和回滚。

代码语言:javascript复制
tx, err := db.Begin(true)
if err := tx.Commit(); err != nil {
defer tx.Rollback()

事务内部关于kv的操作可以参考下面的例子:

代码语言:javascript复制
package main

import (
  "bytes"
  "encoding/json"
  "fmt"
  "log"
  "os"
  "time"

  "github.com/boltdb/bolt"
)

func main() {
  // Open the my.db data file in your current directory.
  // It will be created if it doesn't exist.
  db, err := bolt.Open("my.db", 0600, &bolt.Options{Timeout: 1 * time.Second})
  if err != nil {
    log.Fatal(err)
  }
  defer db.Close()
  //Buckets are collections of key/value pairs within the database. All keys in a bucket must be unique
  db.Update(func(tx *bolt.Tx) error {
    b, err := tx.CreateBucket([]byte("MyBucket"))
    if err != nil {
      return fmt.Errorf("create bucket: %s", err)
    }
    fmt.Println(b)
    return nil
  })

  db.Update(func(tx *bolt.Tx) error {
    b := tx.Bucket([]byte("MyBucket"))
    err := b.Put([]byte("answer"), []byte("42"))
    return err
  })

  db.Update(func(tx *bolt.Tx) error {
    b, err := tx.CreateBucketIfNotExists([]byte("MyBucket"))
    if err != nil {
      return fmt.Errorf("create bucket: %s", err)
    }
    tx.DeleteBucket([]byte("MyBucket"))
    fmt.Println(b)
    return nil
  })

  db.View(func(tx *bolt.Tx) error {
    b := tx.Bucket([]byte("MyBucket"))
    fmt.Println(b, b == nil)
    if b == nil {
      b, _ = tx.CreateBucket([]byte("MyBucket1"))
      fmt.Println(b, b == nil)
    }
    return nil
  })

  db.Update(func(tx *bolt.Tx) error {
    b, err := tx.CreateBucket([]byte("MyBucket"))
    fmt.Println(b, b == nil)
    b, err = tx.CreateBucket([]byte("Events"))
    return err
  })

  db.View(func(tx *bolt.Tx) error {
    b := tx.Bucket([]byte("MyBucket"))
    v := b.Get([]byte("answer"))
    fmt.Printf("The answer is: %sn", v)
    b.Delete([]byte("answer"))
    return nil
  })

  db.View(func(tx *bolt.Tx) error {
    // Assume bucket exists and has keys
    b := tx.Bucket([]byte("MyBucket"))

    c := b.Cursor()

    for k, v := c.First(); k != nil; k, v = c.Next() {
      fmt.Printf("key=%s, value=%sn", k, v)
    }

    return nil
  })

  /*
      First()  Move to the first key.
    Last()   Move to the last key.
    Seek()   Move to a specific key.
    Next()   Move to the next key.
    Prev()   Move to the previous key.
  */

  db.View(func(tx *bolt.Tx) error {
    // Assume bucket exists and has keys
    c := tx.Bucket([]byte("MyBucket")).Cursor()

    prefix := []byte("1234")
    for k, v := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix); k, v = c.Next() {
      fmt.Printf("key=%s, value=%sn", k, v)
    }

    return nil
  })

  db.View(func(tx *bolt.Tx) error {
    // Assume our events bucket exists and has RFC3339 encoded time keys.
    c := tx.Bucket([]byte("Events")).Cursor()

    // Our time range spans the 90's decade.
    min := []byte("1990-01-01T00:00:00Z")
    max := []byte("2000-01-01T00:00:00Z")

    // Iterate over the 90's.
    for k, v := c.Seek(min); k != nil && bytes.Compare(k, max) <= 0; k, v = c.Next() {
      fmt.Printf("%s: %sn", k, v)
    }

    return nil
  })

  db.View(func(tx *bolt.Tx) error {
    // Assume bucket exists and has keys
    b := tx.Bucket([]byte("MyBucket"))

    b.ForEach(func(k, v []byte) error {
      fmt.Printf("key=%s, value=%sn", k, v)
      return nil
    })
    return nil
  })

  //Nested buckets
  /*
      func (*Bucket) CreateBucket(key []byte) (*Bucket, error)
    func (*Bucket) CreateBucketIfNotExists(key []byte) (*Bucket, error)
    func (*Bucket) DeleteBucket(key []byte) error
  */

  go func() {
    // Grab the initial stats.
    prev := db.Stats()

    for {
      // Wait for 10s.
      time.Sleep(10 * time.Second)

      // Grab the current stats and diff them.
      stats := db.Stats()
      diff := stats.Sub(&prev)

      // Encode stats to JSON and print to STDERR.
      json.NewEncoder(os.Stderr).Encode(diff)

      // Save stats for the next loop.
      prev = stats
    }
  }()
  select {}

}

/*
&{0xc0000ae1b0 0xc0000c6380 map[] 0xc0000d4010 <nil> map[] 0.5} false
The answer is:
{"FreePageN":0,"PendingPageN":2,"FreeAlloc":8192,"FreelistInuse":32,"TxN":0,"OpenTxN":0,"TxStats":{"PageCount":0,"PageAlloc":0,"CursorCount":0,"NodeCount":0,"NodeDeref":0,"Rebalance":0,"RebalanceTime":0,"Split":0,"Spill":0,"SpillTime":0,"Write":0,"WriteTime":0}}
{"FreePageN":0,"PendingPageN":2,"FreeAlloc":8192,"FreelistInuse":32,"TxN":0,"OpenTxN":0,"TxStats":{"PageCount":0,"PageAlloc":0,"CursorCount":0,"NodeCount":0,"NodeDeref":0,"Rebalance":0,"RebalanceTime":0,"Split":0,"Spill":0,"SpillTime":0,"Write":0,"WriteTime":0}}
*/

在进行k/v 的crud之前需要先创建Buckrt

代码语言:javascript复制
b, err := tx.CreateBucket([]byte("MyBucket"))

然后在bucket上进行写入和读取操作

代码语言:javascript复制
  err := b.Put([]byte("answer"), []byte("42"))
代码语言:javascript复制
b := tx.Bucket([]byte("MyBucket"))
    v := b.Get([]byte("answer"))

为了简化存储序列化过程,参数里的k/v类型都是byte数组。

0 人点赞