boltdb是一个k-v存储引擎,它的核心操作是读写数据。本文从使用者的角度,结合读写数据的实例,分析读&写数据是如何执行的,以及各个组件是如何串联起来工作的。
boltdb中有两种事务模式:只读事务和读写事务。读数据采用只读事务操作,写数据采用读写事务操作。任意时刻,最多只有一个读写事务在操作。即多个只读事务、多个只读事务 1个读写事务都是可以并发进行操作。
读数据执行流程
读数据示例
读取boltdb数据有两种操作模式,一种是用户手动管理事务,另一种是通过boltdb提供的func (db *DB) View(fn func(*Tx) error) error
方法,我们只要传入一个类型为func(*Tx) error
的函数。
我们先来看手动管理事务的代码,分为以下流程:
- 通过Open操作,创建一个boltdb实例
- 对boltdb实例执行Begin操作,传入false, 开启一个只读事务tx
- 通过事务tx执行我们自己的逻辑,可以获取Bucket,以及从Bucket中get数据,也可以对Bucket执行遍历等只读操作
- 执行回滚操作,对于只读事务来说,不能执行tx.Commit操作
func main() {
db, err := bolt.Open("test.db", 0600, nil)
if err != nil {
fmt.Println(err)
return
}
defer db.Close()
readDemo(db)
}
// 读模式
func readDemo(db *bolt.DB) {
// 传false,开启一个只读事务
tx, err := db.Begin(false)
if err != nil {
fmt.Println(err)
return
}
// 获取Bucket
bucket := tx.Bucket([]byte("bucket"))
if bucket != nil {
// 从Bucket获取数据
value := bucket.Get([]byte("key"))
fmt.Println(value)
}
// 回滚操作,只读事务不能执行tx.Commit操作
err = tx.Rollback()
if err != nil {
fmt.Println(err)
}
}
另一种读操作是通过db.View方法,只需传入我们的业务处理逻辑函数,由db.View来调用我们传入的函数,我们不用关心事务句柄tx的获取和Rollback操作。
代码语言:javascript复制err := db.View(func(tx *bolt.Tx) error {
...
return nil
})
结合下面的db.View方法,来看它做了哪些工作。总结起来有以下几点:
- 创建一个只读事务,设置t.managed值,该字段的含义是用户程序逻辑能否执行tx.Rollback操作,这里设置为true,也就是说我们传入的函数内部不能执行tx.Rollback操作,否则会引发panic,回滚操作交给框架代码db.View处理。
- 执行用户逻辑函数fn
- 恢复t.managed值为false,表示现在框架中要执行t.Rollback操作了。
// 执行只读事务
func (db *DB) View(fn func(*Tx) error) error {
t, err := db.Begin(false)
if err != nil {
return err
}
defer func() {
if t.db != nil {
t.rollback()
}
}()
// fn内部不能进行rollback,否则引发panic
t.managed = true
err = fn(t)
// 在下面执行rollback之前,要将状态值修改回来
t.managed = false
if err != nil {
_ = t.Rollback()
return err
}
// 因为是只读操作,所以不会执行t.Commit操作
if err := t.Rollback(); err != nil {
return err
}
return nil
}
工作流程
前面通过两个代码实例介绍了从boltdb中读数据的流程。本小节对这个操作流程做一个概括,从原理层面分析只读事务操作工作流程。
- 通过bolt.Open操作打开了一个db文件,返回一个DB对象,这个过程会将db文件中的关键页(两个meta page,freelist page和)信息转成了内存数据存在在DB对象中。具体该操作分为两个关键步骤. 1.1 通过mmap将db文件加载到内存,绑定到DB.data字段上 1.2 初始化DB.meta0和DB.meta1,以及DB.freelist
func Open(path string, mode os.FileMode, options *Options) (*DB, error) {
...
// 1.1 通过mmap将db文件加载到内存,绑定到db.data字段上,
// 并初始化 db.meta0和db.meta1
if err := db.mmap(options.InitialMmapSize); err != nil {
_ = db.close()
return nil, err
}
// 1.2 初始化db.freelist
db.freelist = newFreelist()
// 根据page 3 填充freelist
db.freelist.read(db.page(db.meta().freelist))
return db, nil
}
db.Begin(false)
开启一个只读事务,此操作会返回一个只读事务句柄tx。这个过程核心是对db文件创建了一个只读快照,将DB.meta信息拷贝了一份存储在tx中,主要是存储了root page id. tx内部会隐含创建一个伪根Bucket节点tx.root.
func (db *DB) Begin(writable bool) (*Tx, error) {
...
// 开启一个只读事务
return db.beginTx()
}
func (db *DB) beginTx() (*Tx, error) {
...
t := &Tx{}
t.init(db)
...
return t, nil
}
func (tx *Tx) init(db *DB) {
tx.db = db
tx.pages = nil
tx.meta = &meta{}
// 深拷贝db.meta,更改tx.meta不影响db
db.meta().copy(tx.meta)
// 创建根Bucket,相当于对B 做一个快照
tx.root = newBucket(tx)
tx.root.bucket = &bucket{}
*tx.root.bucket = tx.meta.root
...
}
- 通过桶名获取一个Bucket,通过Bucket提供的Get接口从桶中读取数据。这个过程涉及到对桶的遍历,是通过Cursor进行的。在boltdb源码分析系列-Bucket文章中已详细介绍过
func (b *Bucket) Get(key []byte) []byte
操作,Cursor工作原理在boltdb源码分析系列-迭代器文章中已有介绍,本文不在做重复分析说明,感兴趣的同学可以看本系列文章。
写数据执行流程
写数据示例
类似读取数据,向boltdb中写入数据也有两种模式,一种是用户自己管理事务,另一种是通过boltdb提供的API接口,我们只需传入一个事务操作函数,像func (db *DB) Update(fn func(*Tx) error) error
、func (db *DB) Batch(fn func(*Tx) error) error
.
对于手动进行写数据操作示例如下,操作与读数据基本相同,不同点有3处:
- 需要创建读写事务,即db.Begin需要传true,读数据创建的是只读事务
- 调用写数据的API接口Put操作,读数据用的是Get
- 最后需要执行Commit提交操作,读数据不需要
// 写模式
func writeDemo(db *bolt.DB) {
// 创建一个读写事务
tx, err := db.Begin(true)
if err != nil {
fmt.Println(err)
return
}
// 失败回滚操作
defer func() {
tx.Rollback()
}()
// 获取bucket
bucket := tx.Bucket([]byte("bucket"))
// 向bucket中添加key-value数据
err = bucket.Put([]byte("key"), []byte("value"))
if err != nil {
fmt.Println(err)
return
}
// 提交事务
err = tx.Commit()
if err != nil {
fmt.Println(err)
return
}
}
通过boltdb提供的写操作API接口,我们只需要提供一个事务逻辑函数即可。对于Update接口,它的内部逻辑与前面的读数据操作中的View是一致的,唯一不同的是多了Commit提交操作,不在重复分析。
因为写事务操作是有代价的,需要将数据写入磁盘,这个过程是比较费时的,boltdb提供了一个批量执行写事务的接口Batch,也就是说调用Batch(fn)传入的fn不会立即执行,需要等待一段时间,收集一批之后一起执行。具体等多久,这里有两种策略:
- 策略1,等待一段时间后执行,这个默认时间是10毫秒
- 策略2,批量收集的fn达到一定数量,也会立即执行,默认是1000个
const (
// 批量事务最多1000个
DefaultMaxBatchSize int = 1000
// 批量事务等待时间10毫秒
DefaultMaxBatchDelay = 10 * time.Millisecond
)
满足上面两种中的任意一种,批量事务操作就会执行。通过策略1和策略2,同时兼顾了时间与效率,非常好。
下面是Batch方法处理过程,核心是收集和执行两个步骤。
- 收集:收集阶段,获取到的的fn都会加入到db.batch中,即db.batch暂存了所有即将要批量被执行的fn. 在执行阶段,db.batch会被重置为nil, 所以在一个新收集时间开始时,db.batch为nil, 重新初始化为其赋值,并开启一个定时器。对应到代码里面的第一个if逻辑。「注意」,还有一种情况,就是收集的fn数量很快达到到了上限(默认1000个),这时有1001个fn过来,也会新分配db.batch,将第1001个fn加入新的db.batch
- 执行:执行阶段在db.batch.trigger(),代码中有两个调用点,对应到前面说的策略1和策略2, trigger内部对每个fn,调用db.Update方法执行事务操作。
func (db *DB) Batch(fn func(*Tx) error) error {
errCh := make(chan error, 1)
db.batchMu.Lock()
if (db.batch == nil) || (db.batch != nil && len(db.batch.calls) >= db.MaxBatchSize) {
db.batch = &batch{
db: db,
}
// 启动一个定时器,延时一段时间批量处理收集到的fn
db.batch.timer = time.AfterFunc(db.MaxBatchDelay, db.batch.trigger)
}
db.batch.calls = append(db.batch.calls, call{fn: fn, err: errCh})
// 已经收集够了tx,立即开启一个协程执行批量run,就不用等到db.MaxBatchDelay再来执行了
// 那有没有重复执行的可能,下面立即执行了,上面的定时器到了又会执行,不会的,batch中通过sync.Once
// 只执行一次
if len(db.batch.calls) >= db.MaxBatchSize {
// 这里为什么要开启一个协程执行run? 开启一个协程是为了尽早释放batchMu锁,不开一个协程
// 会延迟锁的释放,导致其他goroutine执行Batch卡死,fn加入不到db.batch
go db.batch.trigger()
}
db.batchMu.Unlock()
err := <-errCh
if err == trySolo {
// fn执行失败,在单独执行一次
err = db.Update(fn)
}
return err
}
看了Batch实现,也许有同学有下面的疑问点?小编这里做一个分析说明。
- 疑问1:代码中有两处调用db.batch.trigger,time.AfterFunc内部也是启用了一个goroutine,时间到了执行function,下面也是go db.batch.trigger(),会不会被执行两次?
答:不会被执行两次,虽然相当于在两个goroutine中都调用了db.batch.trigger,但是batch中start sync.Once
保证了多次调用也只会执行一次,「sync.Once用的恰到好处」
- 疑问2:
go db.batch.trigger()
中go可以去掉吗?
答:不可以,开启一个goroutine执行是为了尽早释放锁,否则会导致其他调用Batch会阻塞卡死,fn加入不到db.batch
- 疑问3:在第一处if逻辑中,收集到的fn数量超过db.MaxBatchSize,也对db.batch做了重新赋值操作,这里会影响
go db.batch.trigger()
执行吗?
答:不会的,if中新赋值的db.batch和go db.batch.trigger()中的db.batch不是同一个对象,不会影响。
Batch是一个阻塞操作,所以开启多个goroutine调用它才有意义。同时要注意,Batch中部分fn执行失败有重试执行操作,所以要考虑幂等性。
工作流程
前一小节通过示例介绍了写数据操作流程,本小节从原理层面概括写数据是如何执行的。在示例代码中可以看到,写数据操作和读数据操作是一致的。都有db.Open一个boltdb实例,然后开启事务,获取桶并在桶上进行读写操作。这里只介绍写数据流程中与读不同的地方。
写数据关键的两步是func (b *Bucket) Put(key []byte, value []byte) error
和func (tx *Tx) Commit() error
. 相比读操作,写操作复杂不少,因为读操作不涉及数据更改,所以直接从mmap映射后的内存page中读取。而写操作会修改数据,要支持数据库的ACID特性,boltdb处理方法是:
- 写数据写入内存中的node
- 执行事务Commit操作后,将node转换成脏页page,最后将脏页刷到硬盘上保存
下面结合源码对处理过程进行分析,boltdb是如何实现事务的在下篇文章中详细介绍。
- 创建读写事务,对db文件创建了一个读写快照,因为有修改操作,不能原地修改,要保持数据库的ACID特性,在拷贝后的数据上修改。
2.调用Bucket提供的Put方法写入key-value数据,因所有的数据写入操作都是在叶子节点中进行的,先创建一个迭代器,对Bucket遍历并定位到要写入的叶子节点,Cursor c游标最后的位置即要写入的叶子节点。
2.1 定位叶子节点后,执行c.node().put(key,value,0,0)
操作,这个操作分为两步,第一步是找到叶子节点node,「是node而不是page」,所有修改操作都是针对node的,对node不是很理解的可以看boltdb源码分析系列-内存结构文章。
func (b *Bucket) Put(key []byte, value []byte) error {
...
// 创建一个游标对象,对Bucket进行遍历,查找键值对为key的数据
c := b.Cursor()
k, _, flags := c.seek(key)
...
// 运行到这里,key有可能存在,也有可能不存在,不存在会创建一个key-value键值对,
// 存在会更新原来key的value,这些处理都是在put函数中实现的
// NOTE 对key是深拷贝,对value是浅拷贝
key = cloneBytes(key)
c.node().put(key, key, value, 0, 0)
return nil
}
Cursor的node方法返回叶子节点,如果node已存在,直接返回,否则从根节点开始从上往下,将游走路径上的page转成node,并保存到Bucket中的nodes。
代码语言:javascript复制func (c *Cursor) node() *node {
...
// 叶子节点已经在内存中,直接返回
if ref := &c.stack[len(c.stack)-1]; ref.node != nil && ref.isLeaf() {
return ref.node
}
// 从根节点开始,从上往下直到根节点,将游走路径上的page转成node,并保存到
// Bucket中的nodes
var n = c.stack[0].node
if n == nil {
n = c.bucket.node(c.stack[0].page.id, nil)
}
for _, ref := range c.stack[:len(c.stack)-1] {
_assert(!n.isLeaf, "expected branch node")
n = n.childAt(int(ref.index))
}
_assert(n.isLeaf, "expected leaf node")
return n
}
// 返回bucket中的第index孩子节点
func (n *node) childAt(index int) *node {
if n.isLeaf {
panic(fmt.Sprintf("invalid childAt(%d) on a leaf node", index))
}
return n.bucket.node(n.inodes[index].pgid, n)
}
将pgid对应的page转换成node,保存到Bucket中的nodes中。调用链为Cursor.node-->node.childAt-->Bucket.node. Cursor和node结构中都有一个记录Bucket字段,它们都来自读写事务tx. 所以经过这个操作之后,从根节点到有修改写入叶子节点路径上的所有节点都添加到了Bucket的nodes中。
代码语言:javascript复制func (b *Bucket) node(pgid pgid, parent *node) *node {
...
if n := b.nodes[pgid]; n != nil {
return n
}
// 同时,将这些新建的node保存到了已b.rootNode为根节点的tree中
// 在执行tx.Commit时,根据rootNode将它们转成page,最后刷新到磁盘
n := &node{bucket: b, parent: parent}
if parent == nil {
b.rootNode = n
} else {
parent.children = append(parent.children, n)
}
...
// 根据page页p来对节点n进一步初始化一些内容,主要是填充key-value信息
n.read(p)
// 将节点n缓存起来
b.nodes[pgid] = n
...
return n
}
2.2 调用node的put操作,将修改的内容写入,这里会传入两个key,一个是旧key,一个是新key,如果旧key存在,用新的key-value将其覆盖,如果,旧key不存在,相当于新添加了新的key-value.
代码语言:javascript复制func (n *node) put(oldKey, newKey, value []byte, pgid pgid, flags uint32) {
// 异常校验
...
// 在节点n的元素中查询旧key所在下标索引,即oldKey是它的第几个元素
index := sort.Search(len(n.inodes), func(i int) bool { return bytes.Compare(n.inodes[i].key, oldKey) != -1 })
// 进一步判断索引为index位置的元素的key是否是oldKey,进一步确认oldKey是否存在
exact := (len(n.inodes) > 0 && index < len(n.inodes) && bytes.Equal(n.inodes[index].key, oldKey))
// oldKey不存在,新增加一个节点
if !exact {
n.inodes = append(n.inodes, inode{})
copy(n.inodes[index 1:], n.inodes[index:])
}
// 更新index位置key-value数据
inode := &n.inodes[index]
inode.flags = flags
inode.key = newKey
inode.value = value
inode.pgid = pgid
_assert(len(inode.key) > 0, "put: zero-length inode key")
}
总结起来,Put操作的核心要点有两个:
- 修改写入数据都是针对node,不会在原始数据page做修改写入
- 写入数据发生在叶子节点上
- 叶子节点执行Put操作之后,从根节点到此叶子节点路径上所有的node都已缓存到tx关联的Bucket的nodes中
- Put操作后,从根节点到叶子节点路径上的node也保存到了以rootNode为根节点的tree中。
结合下图,boltdb文件被mmap到内存之后,以page形式表示,右侧是这些page转成node之后的树形表示。当执行Put操作时,在叶子节点node d写入数据,从根节点到node d路径上所有的节点都被保存到了Bucket.nodes中,同时这些nodes节点也保存到了以rootNode为根节点的树中,对应到图中的红色节点。「注意,图中灰色的node并没有在内存中真实存在,这里是为了对比说明,只有Put操作会影响到的page才会转成node,保存在Bucket中」。对于Put操作会影响到的page是指哪些page? Put操作是发生在某个leaf page上,从root page到某leaf page路径上的page都是会影响的page.
- 2中Put写入的数据当前在内存中,执行Commit操作才会将内存中的数据刷新到硬盘上。Commit操作比较复杂,详细实现细节在下一篇文章中介绍。执行tx.Commit操作,会将2中Bucket中nodes节点数据转成page,因为这些nodes是有更新或写入操作的,它们是脏nodes,转成page之后保存到tx.pages中,tx.pages中保存脏page.最后将tx.pages写入磁盘。
下面的Commit代码抽取了转换nodes和写入磁盘操作, 将脏nodes转成page在tx.root.spill函数中,将page写入磁盘是tx.write操作。
代码语言:javascript复制func (tx *Tx) Commit() error {
...
// 对B 进行溢出处理, 并将Bucket中nodes转成dirty page
if err := tx.root.spill(); err != nil {
tx.rollback()
return err
}
...
// 刷新脏页(包含新的freelist)到磁盘
if err := tx.write(); err != nil {
tx.rollback()
return err
}
...
}
将nodes转成page调用链很长,依次是tx.root.spill-->b.rootNode.spill-->tx.allocate.
代码语言:javascript复制func (b *Bucket) spill() error {
...
// 从根节点开始进行溢出处理
if err := b.rootNode.spill(); err != nil {
return err
}
...
}
注意这里被执行spill操作的node,是在前面的Bucket Put操作中,加入到以rootNode为根节点中的node. 也就是Put操作影响到的node,会转成脏page。对于没有影响到的page,它们不会被转成node.
代码语言:javascript复制func (n *node) spill() error {
...
for _, node := range nodes {
...
p, err := tx.allocate((node.size() / tx.db.pageSize) 1)
if err != nil {
return err
}
if p.id >= tx.meta.pgid {
panic(fmt.Sprintf("pgid (%d) above high water mark (%d)", p.id, tx.meta.pgid))
}
// 分配的page id值赋值给node
node.pgid = p.id
// 将node中的元素内容序列化化到page p中
node.write(p)
...
}
...
}
在tx.allocate中将node转成page. 分配给定数量的page,并保存到脏页缓存pages中.
代码语言:javascript复制func (tx *Tx) allocate(count int) (*page, error) {
p, err := tx.db.allocate(count)
if err != nil {
return nil, err
}
tx.pages[p.id] = p
tx.stats.PageCount
tx.stats.PageAlloc = count * tx.db.pageSize
return p, nil
}
tx.write()将dirty page保存到磁盘中,释放之前旧的page,将它们加入到freelist中,以便后续写入数据操作复用这些page.