在分析完核心数据结构后,我们结合使用boltdb的核心过程了解下上述数据结构建立的过程,总结下来核心过程如下:
代码语言:javascript复制bolt.Open
db.Update
db.Begin
tx.CreateBucket
tx.Bucket
b.Get
b.Put
tx.Commit
在db.go中我们可以了解到Update函数其实也是调用了Begin来开启一个事务的。事务执行成功就提交,失败则回滚。
代码语言:javascript复制func (db *DB) Update(fn func(*Tx) error) error {
t, err := db.Begin(true)
defer func() {
if t.db != nil {
t.rollback()
t.managed = true
err = fn(t)
t.managed = false
if err != nil {
_ = t.Rollback()
return t.Commit()
开启事务的时候通过参数来决定开启一个只读事务还是一个写事务:
代码语言:javascript复制func (db *DB) Begin(writable bool) (*Tx, error) {
if writable {
return db.beginRWTx()
}
return db.beginTx()
}
开启写事务的时候会加排他锁,并且会给meta信息也加锁,然后创建一个Tx对象,并将它赋值给db对象,因为只能有一个写事务。
代码语言:javascript复制func (db *DB) beginRWTx() (*Tx, error) {
db.rwlock.Lock()
db.metalock.Lock()
defer db.metalock.Unlock()
if !db.opened {
db.rwlock.Unlock()
t := &Tx{writable: true}
t.init(db)
db.rwtx = t
代码语言:javascript复制func (db *DB) beginTx() (*Tx, error) {
db.metalock.Lock()
db.mmaplock.RLock()
db.mmaplock.RUnlock()
db.metalock.Unlock()
t := &Tx{}
t.init(db)
db.txs = append(db.txs, t)
n := len(db.txs)
db.metalock.Unlock()
db.statlock.Lock()
db.stats.TxN
db.stats.OpenTxN = n
db.statlock.Unlock()
View执行的过程类似,只不过它开启的是读事务。
代码语言:javascript复制 func (db *DB) View(fn func(*Tx) error) error {
t, err := db.Begin(false)
defer func() {
if t.db != nil {
t.rollback()
t.managed = true
// If an error is returned from the function then pass it through.
err = fn(t)
t.managed = false
if err != nil {
_ = t.Rollback()
if err := t.Rollback(); err != nil {
开启读事务的时候需要先给meta加锁,再获取mmap的读锁;可以同时开启多个读事务,它们以切片的形式保存在db.txs上。
代码语言:javascript复制
// beginTx starts a read-only transaction.
//
// It returns ErrDatabaseNotOpen (and a nil transaction) when the database has
// not been opened. On success the transaction is appended to db.txs and the
// transaction statistics are updated. The mmap read lock taken here is only
// released on the error path; on success it is still held when the function
// returns.
func (db *DB) beginTx() (*Tx, error) {
	// Lock the meta pages while we initialize the transaction. We obtain
	// the meta lock before the mmap lock because that's the order that the
	// write transaction will obtain them.
	db.metalock.Lock()

	// Obtain a read-only lock on the mmap. When the mmap is remapped it will
	// obtain a write lock so all transactions must finish before it can be
	// remapped.
	db.mmaplock.RLock()

	// Exit if the database is not open yet.
	if !db.opened {
		db.mmaplock.RUnlock()
		db.metalock.Unlock()
		return nil, ErrDatabaseNotOpen
	}

	// Create a transaction associated with the database.
	t := &Tx{}
	t.init(db)

	// Keep track of transaction until it closes.
	db.txs = append(db.txs, t)
	n := len(db.txs)

	// Unlock the meta pages.
	db.metalock.Unlock()

	// Update the transaction stats: one more transaction started, and the
	// number of currently open read transactions is the length of db.txs
	// captured above while the meta lock was still held.
	db.statlock.Lock()
	db.stats.TxN++
	db.stats.OpenTxN = n
	db.statlock.Unlock()

	return t, nil
}
Batch只不过是把多次调用攒成一批,再统一通过Update执行。
代码语言:javascript复制func (db *DB) Batch(fn func(*Tx) error) error {
db.batchMu.Lock()
db.batch = &batch{
db: db,
}
db.batch.timer = time.AfterFunc(db.MaxBatchDelay, db.batch.trigger)
db.batch.calls = append(db.batch.calls, call{fn: fn, err: errCh})
go db.batch.trigger()
db.batchMu.Unlock()
err = db.Update(fn)
代码语言:javascript复制func (b *batch) trigger() {
b.start.Do(b.run)
}
代码语言:javascript复制func (b *batch) run() {
for len(b.calls) > 0 {
err := b.db.Update(func(tx *Tx) error {
for i, c := range b.calls {
if err := safelyCall(c.fn, tx); err != nil {
c := b.calls[failIdx]
b.calls[failIdx], b.calls = b.calls[len(b.calls)-1], b.calls[:len(b.calls)-1]
代码语言:javascript复制type batch struct {
// db is the database whose Update method executes the queued calls.
db *DB
// timer invokes trigger once the database's MaxBatchDelay has elapsed.
timer *time.Timer
// start ensures that trigger executes run at most once.
start sync.Once
// calls holds the queued functions together with their error channels.
calls []call
}
事务的定义位于tx.go
代码语言:javascript复制type Tx struct {
// writable is true for read-write transactions.
writable bool
// managed is set while Update/View is running the caller's function.
managed bool
// db is the owning database; a nil db is reported as a closed transaction.
db *DB
// meta is this transaction's private copy of the database meta, made in init.
meta *meta
// root is the root bucket, created in init from the meta's root header.
root Bucket
// pages is allocated in init for writable transactions only.
pages map[pgid]*page
// stats holds the statistics gathered for this transaction.
stats TxStats
// commitHandlers are invoked at the end of Commit, after the transaction is closed.
commitHandlers []func()
// WriteFlag specifies the flag for write-related methods like WriteTo().
// Tx opens the database file with the specified flag to copy the data.
//
// By default, the flag is unset, which works well for mostly in-memory
// workloads. For databases that are much larger than available RAM,
// set the flag to syscall.O_DIRECT to avoid trashing the page cache.
WriteFlag int
}
定义完事务对象后,会进行初始化操作:它会复制一份meta信息;写事务的事务id会加1,读事务的事务id保持不变。
代码语言:javascript复制func (tx *Tx) init(db *DB) {
tx.meta = &meta{}
db.meta().copy(tx.meta)
tx.root = newBucket(tx)
tx.root.bucket = &bucket{}
*tx.root.bucket = tx.meta.root
if tx.writable {
tx.pages = make(map[pgid]*page)
tx.meta.txid = txid(1)
}
var minid txid = 0xFFFFFFFFFFFFFFFF
for _, t := range db.txs {
if t.meta.txid < minid {
minid = t.meta.txid
}
}
if minid > 0 {
db.freelist.release(minid - 1)
}
事务提交的时候会先进行rebalance操作,合并填充率不满足条件的页以避免空间浪费,然后通过spill操作拆分填充率过大的页。接着把释放的页记入freelist并落盘,最后写入meta完成事务的提交。
代码语言:javascript复制func (tx *Tx) Commit() error {
var startTime = time.Now()
tx.root.rebalance()
startTime = time.Now()
if err := tx.root.spill(); err != nil {
tx.rollback()
tx.meta.root.root = tx.root.root
opgid := tx.meta.pgid
tx.db.freelist.free(tx.meta.txid, tx.db.page(tx.meta.freelist))
p, err := tx.allocate((tx.db.freelist.size() / tx.db.pageSize) 1)
if err != nil {
tx.rollback()
if err := tx.db.freelist.write(p); err != nil {
tx.rollback()
tx.meta.freelist = p.id
if tx.meta.pgid > opgid {
if err := tx.db.grow(int(tx.meta.pgid 1) * tx.db.pageSize); err != nil {
tx.rollback()
if err := tx.write(); err != nil {
tx.rollback()
if tx.db.StrictMode {
ch := tx.Check()
if err := tx.writeMeta(); err != nil {
tx.rollback()
tx.close()
// Execute commit handlers now that the locks have been removed.
for _, fn := range tx.commitHandlers {
fn()
}
由于boltdb采用了MVCC模式,所以事务回滚很简单:直接重新加载meta信息,复原freelist即可。读事务回滚时不需要处理freelist,只需关闭事务。
代码语言:javascript复制func (tx *Tx) Rollback() error {
tx.rollback()
代码语言:javascript复制 func (tx *Tx) rollback() {
if tx.writable {
tx.db.freelist.rollback(tx.meta.txid)
tx.db.freelist.reload(tx.db.page(tx.db.meta().freelist))
}
tx.close()
创建和获取Bucket的过程如下:
代码语言:javascript复制 func (tx *Tx) CreateBucket(name []byte) (*Bucket, error) {
return tx.root.CreateBucket(name)
}
代码语言:javascript复制func (tx *Tx) Bucket(name []byte) *Bucket {
return tx.root.Bucket(name)
代码语言:javascript复制// TxStats represents statistics about the actions performed by the transaction.
type TxStats struct {
// Page statistics.
PageCount int // number of page allocations
PageAlloc int // total bytes allocated
// Cursor statistics.
CursorCount int // number of cursors created
// Node statistics
NodeCount int // number of node allocations
NodeDeref int // number of node dereferences
// Rebalance statistics.
Rebalance int // number of node rebalances
RebalanceTime time.Duration // total time spent rebalancing
// Split/Spill statistics.
Split int // number of nodes split
Spill int // number of nodes spilled
SpillTime time.Duration // total time spent spilling
// Write statistics.
Write int // number of writes performed
WriteTime time.Duration // total time spent writing to disk
}
通过key定位到插入或者查找的位置,然后进行数据的插入和查找
代码语言:javascript复制func (b *Bucket) CreateBucket(key []byte) (*Bucket, error) {
c := b.Cursor()
k, _, flags := c.seek(key)
var bucket = Bucket{
bucket: &bucket{},
rootNode: &node{isLeaf: true},
FillPercent: DefaultFillPercent,
}
var value = bucket.write()
key = cloneBytes(key)
c.node().put(key, key, value, 0, bucketLeafFlag)
return b.Bucket(key), nil
代码语言:javascript复制func (b *Bucket) Bucket(name []byte) *Bucket {
if b.buckets != nil {
if child := b.buckets[string(name)]; child != nil {
return child
c := b.Cursor()
k, v, flags := c.seek(name)
if !bytes.Equal(name, k) || (flags&bucketLeafFlag) == 0 {
return nil
var child = b.openBucket(v)
if b.buckets != nil {
b.buckets[string(name)] = child
func (b *Bucket) Put(key []byte, value []byte) error {
c := b.Cursor()
k, _, flags := c.seek(key)
key = cloneBytes(key)
c.node().put(key, key, value, 0, 0)
在Bucket内部获取某个key的值的过程类似
代码语言:javascript复制func (b *Bucket) Get(key []byte) []byte {
k, v, flags := b.Cursor().seek(key)
代码语言:javascript复制func (b *Bucket) Cursor() *Cursor {
// Update transaction statistics.
b.tx.stats.CursorCount
// Allocate and return a cursor.
return &Cursor{
bucket: b,
stack: make([]elemRef, 0),
}
}
最后看看Bucket的定义
代码语言:javascript复制type Bucket struct {
*bucket
tx *Tx // the associated transaction
buckets map[string]*Bucket // subbucket cache
page *page // inline page reference
rootNode *node // materialized node for the root page.
nodes map[pgid]*node // node cache
// Sets the threshold for filling nodes when they split. By default,
// the bucket will fill to 50% but it can be useful to increase this
// amount if you know that your write workloads are mostly append-only.
//
// This is non-persisted across transactions so it must be set in every Tx.
FillPercent float64
}
代码语言:javascript复制type bucket struct {
root pgid // page id of the bucket's root-level page
sequence uint64 // monotonically incrementing, used by NextSequence()
}
// bucket represents the on-file representation of a bucket.
// This is stored as the "value" of a bucket key. If the bucket is small enough,
// then its root page can be stored inline in the "value", after the bucket
// header. In the case of inline buckets, the "root" will be 0.
数据查询过程中,使用了游标,定义位于cursor.go
代码语言:javascript复制type elemRef struct {
page *page
node *node
index int
}
执行事务过程中获取页面信息的过程如下:
代码语言:javascript复制func (tx *Tx) Page(id int) (*PageInfo, error) {
if tx.db == nil {
return nil, ErrTxClosed
} else if pgid(id) >= tx.meta.pgid {
return nil, nil
}
// Build the page info.
p := tx.db.page(pgid(id))
info := &PageInfo{
ID: id,
Count: int(p.count),
OverflowCount: int(p.overflow),
}
// Determine the type (or if it's free).
if tx.db.freelist.freed(pgid(id)) {
info.Type = "free"
} else {
info.Type = p.typ()
}
return info, nil
}
从上面过程中我们可以看到,核心复杂逻辑就在提交事务过程中的rebalance和spill,对应代码位于bucket.go。获取根Bucket后,会对它的每一个node、每一个嵌套Bucket都进行rebalance。
代码语言:javascript复制func (b *Bucket) rebalance() {
for _, n := range b.nodes {
n.rebalance()
}
for _, child := range b.buckets {
child.rebalance()
}
}
代码语言:javascript复制func (n *node) rebalance() {
n.unbalanced = false
// Update statistics.
n.bucket.tx.stats.Rebalance
var threshold = n.bucket.tx.db.pageSize / 4
if n.size() > threshold && len(n.inodes) > n.minKeys() {
return
}
// Root node has special handling.
if n.parent == nil {
// If root node is a branch and only has one node then collapse it.
if !n.isLeaf && len(n.inodes) == 1 {
// Move root's child up.
child := n.bucket.node(n.inodes[0].pgid, n)
n.isLeaf = child.isLeaf
n.inodes = child.inodes[:]
n.children = child.children
// Reparent all child nodes being moved.
for _, inode := range n.inodes {
if child, ok := n.bucket.nodes[inode.pgid]; ok {
child.parent = n
}
}
// Remove old child.
child.parent = nil
delete(n.bucket.nodes, child.pgid)
child.free()
}
return
}
if n.numChildren() == 0 {
n.parent.del(n.key)
n.parent.removeChild(n)
delete(n.bucket.nodes, n.pgid)
n.free()
n.parent.rebalance()
return
}
var useNextSibling = (n.parent.childIndex(n) == 0)
if useNextSibling {
target = n.nextSibling()
} else {
target = n.prevSibling()
}
for _, inode := range target.inodes {
if child, ok := n.bucket.nodes[inode.pgid]; ok {
child.parent.removeChild(child)
child.parent = n
child.parent.children = append(child.parent.children, child)
}
}
// Copy over inodes from target and remove target.
n.inodes = append(n.inodes, target.inodes...)
n.parent.del(target.key)
n.parent.removeChild(target)
delete(n.bucket.nodes, target.pgid)
target.free()
for _, inode := range n.inodes {
if child, ok := n.bucket.nodes[inode.pgid]; ok {
child.parent.removeChild(child)
child.parent = target
child.parent.children = append(child.parent.children, child)
}
}
// Copy over inodes to target and remove node.
target.inodes = append(target.inodes, n.inodes...)
n.parent.del(n.key)
n.parent.removeChild(n)
delete(n.bucket.nodes, n.pgid)
n.free()
n.parent.rebalance()
代码语言:javascript复制
// spill writes all the nodes for this bucket to dirty pages.
//
// Cached child buckets are handled first and their resulting value is stored
// back under the child's name in this bucket; afterwards this bucket's own
// root node is spilled and the bucket's root page id is updated. It returns
// the first error reported by a child bucket or by the root node, and panics
// when a child's header cannot be found where expected or when the new root
// page id is not below the transaction's high water mark.
func (b *Bucket) spill() error {
	// Child buckets go first.
	for name, child := range b.buckets {
		// A child that is not inlineable is spilled like a normal bucket and
		// the parent value becomes a copy of its bucket header; an inlineable
		// child is freed and written inline into the parent's value instead.
		var value []byte
		if !child.inlineable() {
			if err := child.spill(); err != nil {
				return err
			}

			// Copy the child's bucket header into a freshly sized buffer.
			value = make([]byte, unsafe.Sizeof(bucket{}))
			hdr := (*bucket)(unsafe.Pointer(&value[0]))
			*hdr = *child.bucket
		} else {
			child.free()
			value = child.write()
		}

		// Nothing to write back when the child has no materialized nodes.
		if child.rootNode == nil {
			continue
		}

		// Locate the child's entry in this bucket and sanity-check it.
		cur := b.Cursor()
		key := []byte(name)
		found, _, flags := cur.seek(key)
		if !bytes.Equal(key, found) {
			panic(fmt.Sprintf("misplaced bucket header: %x -> %x", key, found))
		}
		if flags&bucketLeafFlag == 0 {
			panic(fmt.Sprintf("unexpected bucket header flag: %x", flags))
		}

		// Replace the entry's value with the freshly produced one.
		cur.node().put(key, key, value, 0, bucketLeafFlag)
	}

	// Without a materialized root node there is nothing further to spill.
	if b.rootNode == nil {
		return nil
	}

	// Spill this bucket's own nodes, then pick up the current root.
	if err := b.rootNode.spill(); err != nil {
		return err
	}
	b.rootNode = b.rootNode.root()

	// The root's page id must lie below the high water mark before it is
	// recorded as this bucket's root.
	if b.rootNode.pgid >= b.tx.meta.pgid {
		panic(fmt.Sprintf("pgid (%d) above high water mark (%d)", b.rootNode.pgid, b.tx.meta.pgid))
	}
	b.root = b.rootNode.pgid

	return nil
}