boltdb是golang实现的一个基于b 树的存储系统,通过mvcc实现了单个写事务和多个读事务并发。结构也很清晰,由于比较稳定,已经归档,确实是学习数据库的最佳选择。而且不少出名的开源项目在使用它,比如etcd,InfluxDB等。下面我们先通过例子分析下它是如何使用的,后面再从源码的角度来分析下它的具体实现原理。
boltdb是通过内存映射的方式实现持久化的,每一个db对象代表了一个存储实例。实例内部通过Bucket来管理命名空间的,类似于mysql的表名,不同点是Bucket是可以嵌套的。每个Bucket都是一个基于B 树实现的k,v存储。
代码语言:javascript复制package main
import (
"log"
"time"
"github.com/boltdb/bolt"
)
func main() {
// Open the my.db data file in your current directory.
// It will be created if it doesn't exist.
db, err := bolt.Open("my.db", 0600, &bolt.Options{Timeout: 1 * time.Second})
if err != nil {
log.Fatal(err)
}
defer db.Close()
err = db.Update(func(tx *bolt.Tx) error {
return nil
})
err = db.View(func(tx *bolt.Tx) error {
return nil
})
err = db.Batch(func(tx *bolt.Tx) error {
return nil
})
transaction(db)
}
func transaction(db *bolt.DB) error {
// Start a writable transaction.
tx, err := db.Begin(true)
if err != nil {
return err
}
defer tx.Rollback()
// Use the transaction...
_, err = tx.CreateBucket([]byte("MyBucket"))
if err != nil {
return err
}
// Commit the transaction and check for error.
if err := tx.Commit(); err != nil {
return err
}
return nil
}
通过例子我们看到它通过bolt.Open创建实例,然后通过db.Update开启一个写事务,它的参数是一个函数,在函数内部我们可以执行更新操作,操作中任意一步只要返回error就会引起事务的回滚,返回nil事务可以顺利提交。当然也可以支持批量事务对应接口是db.Batch,只读事物是通过db.View实现的,在其内部的更新操作是不会生效的,因为不会被提交。当然也可以通过下面系列接口,进行显式事物的声明和回滚。
代码语言:javascript复制tx, err := db.Begin(true)
if err := tx.Commit(); err != nil {
defer tx.Rollback()
事务内部关于kv的操作可以参考下面的例子:
代码语言:javascript复制package main
import (
"bytes"
"encoding/json"
"fmt"
"log"
"os"
"time"
"github.com/boltdb/bolt"
)
func main() {
// Open the my.db data file in your current directory.
// It will be created if it doesn't exist.
db, err := bolt.Open("my.db", 0600, &bolt.Options{Timeout: 1 * time.Second})
if err != nil {
log.Fatal(err)
}
defer db.Close()
//Buckets are collections of key/value pairs within the database. All keys in a bucket must be unique
db.Update(func(tx *bolt.Tx) error {
b, err := tx.CreateBucket([]byte("MyBucket"))
if err != nil {
return fmt.Errorf("create bucket: %s", err)
}
fmt.Println(b)
return nil
})
db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("MyBucket"))
err := b.Put([]byte("answer"), []byte("42"))
return err
})
db.Update(func(tx *bolt.Tx) error {
b, err := tx.CreateBucketIfNotExists([]byte("MyBucket"))
if err != nil {
return fmt.Errorf("create bucket: %s", err)
}
tx.DeleteBucket([]byte("MyBucket"))
fmt.Println(b)
return nil
})
db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("MyBucket"))
fmt.Println(b, b == nil)
if b == nil {
b, _ = tx.CreateBucket([]byte("MyBucket1"))
fmt.Println(b, b == nil)
}
return nil
})
db.Update(func(tx *bolt.Tx) error {
b, err := tx.CreateBucket([]byte("MyBucket"))
fmt.Println(b, b == nil)
b, err = tx.CreateBucket([]byte("Events"))
return err
})
db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("MyBucket"))
v := b.Get([]byte("answer"))
fmt.Printf("The answer is: %sn", v)
b.Delete([]byte("answer"))
return nil
})
db.View(func(tx *bolt.Tx) error {
// Assume bucket exists and has keys
b := tx.Bucket([]byte("MyBucket"))
c := b.Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
fmt.Printf("key=%s, value=%sn", k, v)
}
return nil
})
/*
First() Move to the first key.
Last() Move to the last key.
Seek() Move to a specific key.
Next() Move to the next key.
Prev() Move to the previous key.
*/
db.View(func(tx *bolt.Tx) error {
// Assume bucket exists and has keys
c := tx.Bucket([]byte("MyBucket")).Cursor()
prefix := []byte("1234")
for k, v := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix); k, v = c.Next() {
fmt.Printf("key=%s, value=%sn", k, v)
}
return nil
})
db.View(func(tx *bolt.Tx) error {
// Assume our events bucket exists and has RFC3339 encoded time keys.
c := tx.Bucket([]byte("Events")).Cursor()
// Our time range spans the 90's decade.
min := []byte("1990-01-01T00:00:00Z")
max := []byte("2000-01-01T00:00:00Z")
// Iterate over the 90's.
for k, v := c.Seek(min); k != nil && bytes.Compare(k, max) <= 0; k, v = c.Next() {
fmt.Printf("%s: %sn", k, v)
}
return nil
})
db.View(func(tx *bolt.Tx) error {
// Assume bucket exists and has keys
b := tx.Bucket([]byte("MyBucket"))
b.ForEach(func(k, v []byte) error {
fmt.Printf("key=%s, value=%sn", k, v)
return nil
})
return nil
})
//Nested buckets
/*
func (*Bucket) CreateBucket(key []byte) (*Bucket, error)
func (*Bucket) CreateBucketIfNotExists(key []byte) (*Bucket, error)
func (*Bucket) DeleteBucket(key []byte) error
*/
go func() {
// Grab the initial stats.
prev := db.Stats()
for {
// Wait for 10s.
time.Sleep(10 * time.Second)
// Grab the current stats and diff them.
stats := db.Stats()
diff := stats.Sub(&prev)
// Encode stats to JSON and print to STDERR.
json.NewEncoder(os.Stderr).Encode(diff)
// Save stats for the next loop.
prev = stats
}
}()
select {}
}
/*
&{0xc0000ae1b0 0xc0000c6380 map[] 0xc0000d4010 <nil> map[] 0.5} false
The answer is:
{"FreePageN":0,"PendingPageN":2,"FreeAlloc":8192,"FreelistInuse":32,"TxN":0,"OpenTxN":0,"TxStats":{"PageCount":0,"PageAlloc":0,"CursorCount":0,"NodeCount":0,"NodeDeref":0,"Rebalance":0,"RebalanceTime":0,"Split":0,"Spill":0,"SpillTime":0,"Write":0,"WriteTime":0}}
{"FreePageN":0,"PendingPageN":2,"FreeAlloc":8192,"FreelistInuse":32,"TxN":0,"OpenTxN":0,"TxStats":{"PageCount":0,"PageAlloc":0,"CursorCount":0,"NodeCount":0,"NodeDeref":0,"Rebalance":0,"RebalanceTime":0,"Split":0,"Spill":0,"SpillTime":0,"Write":0,"WriteTime":0}}
*/
在进行k/v 的crud之前需要先创建Buckrt
代码语言:javascript复制b, err := tx.CreateBucket([]byte("MyBucket"))
然后在bucket上进行写入和读取操作
代码语言:javascript复制 err := b.Put([]byte("answer"), []byte("42"))
代码语言:javascript复制b := tx.Bucket([]byte("MyBucket"))
v := b.Get([]byte("answer"))
为了简化存储序列化过程,参数里的k/v类型都是byte数组。