server/storage/backend/backend.go 定义了后端存储的核心接口和具体实现，本质上是对 boltdb 相关接口的一层封装。
代码语言:javascript复制type Backend interface {
// Backend is the storage abstraction over a bolt database: it hands out
// read transactions, a single batched write transaction, snapshots, and
// maintenance operations (hashing, defragmentation, size reporting).

// ReadTx returns a read transaction. It is replaced by ConcurrentReadTx in the main data path, see #10523.
ReadTx() ReadTx
// BatchTx returns the backend's batched write transaction; the backend
// implementation returns the same *batchTxBuffered on every call.
BatchTx() BatchTx
// ConcurrentReadTx returns a non-blocking read transaction.
ConcurrentReadTx() ReadTx
// Snapshot commits the pending batch and returns a Snapshot backed by a
// read-only bolt transaction.
Snapshot() Snapshot
// Hash returns a CRC-32 (Castagnoli) checksum over every bucket name and
// the key/value pairs selected by ignores.
// NOTE(review): in the implementation, a key/value pair is hashed only when
// ignores is non-nil AND returns false for it, so a nil ignores hashes
// bucket names only — confirm this is intended.
Hash(ignores func(bucketName, keyName []byte) bool) (uint32, error)
// Size returns the current size of the backend physically allocated.
// The backend can hold DB space that is not utilized at the moment,
// since it can conduct pre-allocation or spare unused space for recycling.
// Use SizeInUse() instead for the actual DB size.
Size() int64
// SizeInUse returns the current size of the backend logically in use.
// Since the backend can manage free space in a non-byte unit such as
// number of pages, the returned value can be not exactly accurate in bytes.
SizeInUse() int64
// OpenReadTxN returns the number of currently open read transactions in the backend.
OpenReadTxN() int64
// Defrag compacts the database. The implementation (defrag) opens a
// temporary bolt file and copies all data into it via defragdb.
Defrag() error
// ForceCommit presumably commits the pending batch immediately instead of
// waiting for the batch interval — confirm against the implementation.
ForceCommit()
// Close shuts the backend down. NOTE(review): presumably signals the run
// loop via stopc, which performs a final CommitAndStop — confirm.
Close() error
// SetTxPostLockInsideApplyHook sets a txPostLockInsideApplyHook.
SetTxPostLockInsideApplyHook(func())
}
Snapshot 接口只有三个核心方法：获取大小（Size）、写出到 io.Writer（WriteTo）和关闭（Close）：
代码语言:javascript复制type Snapshot interface {
// A Snapshot is produced by Backend.Snapshot and wraps a read-only bolt
// transaction. Callers should Close it when done; presumably this releases
// that transaction — confirm.

// Size gets the size of the snapshot.
Size() int64
// WriteTo writes the snapshot into the given writer.
WriteTo(w io.Writer) (n int64, err error)
// Close closes the snapshot.
Close() error
}
读事务的 buffer 需要加锁访问，因此缓存结构 txReadBufferCache 中带有一把互斥锁：
代码语言:javascript复制type txReadBufferCache struct {
// mu serializes access to buf and bufVersion.
mu sync.Mutex
// buf is a cached copy of readTx's txReadBuffer, reused when creating
// concurrentReadTx instances.
buf *txReadBuffer
// bufVersion is the txReadBuffer version that buf was copied from; when it
// still matches the live buffer's version, the copy can be skipped.
bufVersion uint64
}
backend 是 Backend 接口的具体实现：
代码语言:javascript复制type backend struct {
// size and commits are used with atomic operations so they must be
// 64-bit aligned, otherwise 32-bit tests will crash
// size is the number of bytes allocated in the backend
size int64
// sizeInUse is the number of bytes actually used in the backend
sizeInUse int64
// commits counts number of commits since start
commits int64
// openReadTxN is the number of currently open read transactions in the backend
openReadTxN int64
// mlock prevents backend database file to be swapped
mlock bool
// NOTE(review): what mu guards is not visible in this excerpt (presumably
// db, e.g. while Defrag swaps files) — confirm.
mu sync.RWMutex
// bopts are the bolt options db was opened with.
bopts *bolt.Options
// db is the underlying bolt database.
db *bolt.DB
// batchInterval is the maximum time before the batch tx is flushed
// (from BackendConfig.BatchInterval); run() resets its timer with it.
batchInterval time.Duration
// batchLimit is the maximum number of puts before the batch tx is flushed
// (from BackendConfig.BatchLimit).
batchLimit int
// batchTx is the buffered write transaction returned by BatchTx().
batchTx *batchTxBuffered
// readTx is the read transaction returned by ReadTx().
readTx *readTx
// txReadBufferCache mirrors "txReadBuffer" within "readTx" -- readTx.baseReadTx.buf.
// When creating "concurrentReadTx":
// - if the cache is up-to-date, "readTx.baseReadTx.buf" copy can be skipped
// - if the cache is empty or outdated, "readTx.baseReadTx.buf" copy is required
txReadBufferCache txReadBufferCache
// stopc signals the run loop to do a final CommitAndStop and exit.
stopc chan struct{}
// donec is presumably closed by run() once it has exited — confirm.
donec chan struct{}
// hooks are executed during the lifecycle of the backend's transactions.
hooks Hooks
// txPostLockInsideApplyHook is called each time right after locking the tx.
txPostLockInsideApplyHook func()
// lg is the backend's logger.
lg *zap.Logger
}
相应的 BackendConfig 定义了初始化 backend 对象所需的配置项：
代码语言:javascript复制type BackendConfig struct {
// BackendConfig carries the parameters newBackend uses to open the bolt
// database and to configure batching, logging and hooks.

// Path is the file path to the backend file.
Path string
// BatchInterval is the maximum time before flushing the BatchTx.
BatchInterval time.Duration
// BatchLimit is the maximum puts before flushing the BatchTx.
BatchLimit int
// BackendFreelistType is the backend boltdb's freelist type.
BackendFreelistType bolt.FreelistType
// MmapSize is the number of bytes to mmap for the backend.
// It is converted to an int by BackendConfig.mmapSize.
MmapSize uint64
// Logger logs backend-side operations.
Logger *zap.Logger
// UnsafeNoFsync disables all uses of fsync.
UnsafeNoFsync bool `json:"unsafe-no-fsync"`
// Mlock prevents backend database file to be swapped
Mlock bool
// Hooks are getting executed during lifecycle of Backend's transactions.
Hooks Hooks
}
newBackend 先用 bolt.Open 打开数据库，再初始化读事务和批量写事务（及其 buffer），最后启动后台协程 b.run() 负责定时提交（以下为节选）：
代码语言:javascript复制func newBackend(bcfg BackendConfig) *backend {
db, err := bolt.Open(bcfg.Path, 0600, bopts)
b := &backend{
bopts: bopts,
db: db,
readTx: &readTx{
baseReadTx:baseReadTx{
buf: txReadBuffer{
b.batchTx = newBatchTxBuffered(b)
go b.run()
代码语言:javascript复制func (b *backend) BatchTx() BatchTx {
return b.batchTx
}
代码语言:javascript复制func (b *backend) SetTxPostLockInsideApplyHook(hook func()) {
Snapshot 先提交当前的批量事务，再返回一个 Snapshot 对象，其中包含一个 boltdb 只读事务（以下为节选）：
代码语言:javascript复制func (b *backend) Snapshot() Snapshot {
b.batchTx.Commit()
tx, err := b.db.Begin(false)
go func() {
case <-stopc:
snapshotTransferSec.Observe(time.Since(start).Seconds())
return &snapshot{tx, stopc, donec}
Hash 方法遍历所有 bucket 及其中的 k/v，用 CRC-32（Castagnoli 多项式）计算整体校验和。注意：从下面的代码看，bucket 名总会计入校验和，而某个 k/v 只有在 ignores 非 nil 且对其返回 false 时才会计入（以下为节选）：
代码语言:javascript复制func (b *backend) Hash(ignores func(bucketName, keyName []byte) bool) (uint32, error) {
h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
err := b.db.View(func(tx *bolt.Tx) error {
c := tx.Cursor()
for next, _ := c.First(); next != nil; next, _ = c.Next() {
b := tx.Bucket(next)
h.Write(next)
b.ForEach(func(k, v []byte) error {
if ignores != nil && !ignores(next, k) {
h.Write(k)
h.Write(v)
return h.Sum32(), nil
}
run 方法按 batchInterval 定时提交批量事务，并在收到停止信号（stopc）时执行最后一次提交（CommitAndStop）（以下为节选）：
代码语言:javascript复制func (b *backend) run() {
for {
select {
case <-t.C:
case <-b.stopc:
b.batchTx.CommitAndStop()
if b.batchTx.safePending() != 0 {
b.batchTx.Commit()
}
t.Reset(b.batchInterval)
defrag 执行碎片整理：先打开一个临时 bolt 数据库，再调用 defragdb 把数据拷贝过去（以下为节选）：
代码语言:javascript复制 func (b *backend) defrag() error {
tmpdb, err := bolt.Open(tdbp, 0600, &options)
err = defragdb(b.db, tmpdb, defragLimit)
defragdb 会遍历原数据库中每个 bucket 的所有 k/v，并写入临时数据库中对应的 bucket（以下为节选）：
代码语言:javascript复制func defragdb(odb, tmpdb *bolt.DB, limit int) error {
tx, err := odb.Begin(false)
for next, _ := c.First(); next != nil; next, _ = c.Next() {
b := tx.Bucket(next)
if err = b.ForEach(func(k, v []byte) error {
tmptx, err = tmpdb.Begin(true)
tmpb = tmptx.Bucket(next)
代码语言:javascript复制func (b *backend) begin(write bool) *bolt.Tx {
size := tx.Size()
db := tx.DB()
stats := db.Stats()
atomic.StoreInt64(&b.size, size)
server/storage/backend/batch_tx.go 定义了 Bucket 和批量事务相关的接口，它们与 bolt 中的概念一一对应。
代码语言:javascript复制type Bucket interface {
// ID returns a unique identifier of a bucket.
// The id must NOT be persisted and can be used as lightweight identificator
// in the in-memory maps.
ID() BucketID
// Name returns the bucket's name; it is the name passed to bolt, e.g.
// tx.CreateBucket(bucket.Name()).
Name() []byte
// String implements Stringer (human readable name).
String() string
// IsSafeRangeBucket is a hack to avoid inadvertently reading duplicate keys;
// overwrites on a bucket should only fetch with limit=1, but safeRangeBucket
// is known to never overwrite any key so range is safe.
IsSafeRangeBucket() bool
}
代码语言:javascript复制type BatchTx interface {
// BatchTx is a writable transaction that also supports reads (it embeds
// ReadTx). NOTE(review): the Unsafe* methods presumably require the caller
// to already hold the transaction lock — confirm.
ReadTx
// UnsafeCreateBucket creates the bucket in the underlying bolt transaction.
UnsafeCreateBucket(bucket Bucket)
// UnsafeDeleteBucket deletes the bucket.
UnsafeDeleteBucket(bucket Bucket)
// UnsafePut stores value under key in bucket.
UnsafePut(bucket Bucket, key []byte, value []byte)
// UnsafeSeqPut is like UnsafePut, for buckets whose keys are written in
// monotonically increasing order (compare txWriteBuffer.bucket2seq).
UnsafeSeqPut(bucket Bucket, key []byte, value []byte)
// UnsafeDelete deletes key from bucket.
UnsafeDelete(bucket Bucket, key []byte)
// Commit commits a previous tx and begins a new writable one.
Commit()
// CommitAndStop commits the previous tx and does not create a new one.
CommitAndStop()
// LockInsideApply and LockOutsideApply lock the transaction. The split
// presumably records whether the caller is inside the apply path (see
// backend.txPostLockInsideApplyHook and ValidateCalledInsideApply) — confirm.
LockInsideApply()
LockOutsideApply()
}
batchTx 的具体实现并没有嵌入 bolt 的 Tx，而是持有一个 *bolt.Tx 字段（tx），同时嵌入了 sync.Mutex 用于加锁：
代码语言:javascript复制type batchTx struct {
// The embedded mutex locks the batch transaction as a whole.
sync.Mutex
// tx is the current writable bolt transaction; Commit replaces it with a
// new one, CommitAndStop does not.
tx *bolt.Tx
// backend is the owning backend.
backend *backend
// pending presumably counts operations written to tx since the last
// commit; run() only commits when safePending() is non-zero.
pending int
}
代码语言:javascript复制func (t *batchTx) UnsafeCreateBucket(bucket Bucket) {
_, err := t.tx.CreateBucket(bucket.Name())
代码语言:javascript复制func (t *batchTx) UnsafeDeleteBucket(bucket Bucket) {
unsafePut 最终调用的就是 bolt bucket 的 Put 操作（以下为节选）：
代码语言:javascript复制func (t *batchTx) unsafePut(bucketType Bucket, key []byte, value []byte, seq bool) {
bucket := t.tx.Bucket(bucketType.Name())
if err := bucket.Put(key, value); err != nil {
代码语言:javascript复制func (t *batchTx) UnsafeRange(bucketType Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
代码语言:javascript复制func (t *batchTx) commit(stop bool) {
err := t.tx.Commit()
batchTxBuffered 在 batchTx 的基础上增加了一个写缓冲 txWriteBuffer：
代码语言:javascript复制type batchTxBuffered struct {
// batchTxBuffered is a batchTx plus an in-memory write buffer.
batchTx
// buf records writes made through this transaction. NOTE(review):
// presumably merged into the read buffer on commit so readers see them — confirm.
buf txWriteBuffer
}
server/storage/backend/config_default.go 中定义了通用的配置方法 mmapSize：
代码语言:javascript复制func (bcfg *BackendConfig) mmapSize() int { return int(bcfg.MmapSize) }
不同平台的配置略有差别，例如 server/storage/backend/config_linux.go 中为 bolt 额外设置了 MAP_POPULATE 和 NoFreelistSync：
代码语言:javascript复制var boltOpenOptions = &bolt.Options{
// Linux-specific bolt open options.
// MAP_POPULATE asks the kernel to pre-fault the memory-mapped file (see mmap(2)).
MmapFlags: syscall.MAP_POPULATE,
// NoFreelistSync skips persisting bolt's freelist on commit
// (per the bbolt Options documentation).
NoFreelistSync: true,
}
代码语言:javascript复制func (bcfg *BackendConfig) mmapSize() int { return int(bcfg.MmapSize) }
server/storage/backend/hooks.go 中定义了 Hooks 接口，方便在使用 Backend 时设置在事务提交前（pre-commit）执行的 Hook：
代码语言:javascript复制type HookFunc func(tx BatchTx) // HookFunc is a callback invoked with an already-locked batch transaction; see Hooks.
代码语言:javascript复制type Hooks interface {
// Hooks lets callers run custom logic at points in a backend transaction's
// lifecycle; currently the only such point is just before commit.

// OnPreCommitUnsafe is executed before Commit of transactions.
// The given transaction is already locked.
OnPreCommitUnsafe(tx BatchTx)
}
并提供了具体实现 hooks：
代码语言:javascript复制type hooks struct {
// onPreCommitUnsafe is the function OnPreCommitUnsafe delegates to.
onPreCommitUnsafe HookFunc
}
代码语言:javascript复制func (h hooks) OnPreCommitUnsafe(tx BatchTx) {
h.onPreCommitUnsafe(tx)
}
server/storage/backend/metrics.go 中定义并注册了对应的 Prometheus 监控指标（以下为节选）：
代码语言:javascript复制prometheus.MustRegister(commitSec)
prometheus.MustRegister(rebalanceSec)
server/storage/backend/read_tx.go 定义了读事务的接口和实现：
代码语言:javascript复制type ReadTx interface {
// Lock/Unlock and RLock/RUnlock guard the transaction. NOTE(review): the
// Unsafe* methods below presumably expect the caller to hold one of these
// locks — confirm.
Lock()
Unlock()
RLock()
RUnlock()
// UnsafeRange returns the keys and values in bucket between key and
// endKey, at most limit of them. NOTE(review): the exact semantics of
// endKey and limit are not visible here — confirm.
UnsafeRange(bucket Bucket, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
// UnsafeForEach calls visitor for the key/value pairs in bucket and
// returns an error if the iteration fails.
UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error
}
具体实现同样是对 bolt 事务的封装，并在其前面加了一层读缓冲 txReadBuffer（UnsafeRange 会先查询 buf）：
代码语言:javascript复制type baseReadTx struct {
// baseReadTx holds the state shared by readTx and concurrentReadTx: an
// in-memory read buffer (UnsafeRange starts by ranging over buf) and the
// bolt transaction behind it.

// mu protects accesses to the txReadBuffer
mu sync.RWMutex
buf txReadBuffer
// TODO: group and encapsulate {txMu, tx, buckets, txWg}, as they share the same lifecycle.
// txMu protects accesses to buckets and tx on Range requests.
txMu *sync.RWMutex
tx *bolt.Tx
// buckets maps BucketID to bolt bucket handles within tx.
buckets map[BucketID]*bolt.Bucket
// txWg protects tx from being rolled back at the end of a batch interval until all reads using this tx are done.
txWg *sync.WaitGroup
}
代码语言:javascript复制func (baseReadTx *baseReadTx) UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error {
代码语言:javascript复制func (baseReadTx *baseReadTx) UnsafeRange(bucketType Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
keys, vals := baseReadTx.buf.Range(bucketType, key, endKey, limit)
读事务有两种实现：普通读（readTx）和并发读（concurrentReadTx）。
代码语言:javascript复制type readTx struct {
// readTx is the transaction returned by Backend.ReadTx; in the main data
// path it has been replaced by concurrentReadTx (see #10523).
baseReadTx
}
代码语言:javascript复制type concurrentReadTx struct {
// concurrentReadTx is the non-blocking read transaction returned by
// Backend.ConcurrentReadTx. Its buffer is a copy of readTx's read buffer
// (possibly taken from backend.txReadBufferCache).
baseReadTx
}
server/storage/backend/tx_buffer.go 中的事务 buffer 是一个以 BucketID 为键的 map，每个 bucket 单独维护一个 bucketBuffer：
代码语言:javascript复制type txBuffer struct {
// buckets holds one bucketBuffer per bucket, keyed by BucketID.
buckets map[BucketID]*bucketBuffer
}
写 buffer 同样基于 txBuffer，并额外记录每个 bucket 是否按顺序写入：
代码语言:javascript复制type txWriteBuffer struct {
// txWriteBuffer buffers the writes of a batch transaction, per bucket.
txBuffer
// Map from bucket ID into information whether this bucket is edited
// sequentially (i.e. keys are growing monotonically).
bucket2seq map[BucketID]bool
}
代码语言:javascript复制func (txw *txWriteBuffer) put(bucket Bucket, k, v []byte) {
代码语言:javascript复制type txReadBuffer struct {
// txReadBuffer is the per-bucket buffer that read transactions consult
// before bolt (baseReadTx.UnsafeRange ranges over it first).
txBuffer
// bufVersion is used to check if the buffer is modified recently
bufVersion uint64
}
代码语言:javascript复制func (txr *txReadBuffer) Range(bucket Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
代码语言:javascript复制func (txr *txReadBuffer) ForEach(bucket Bucket, visitor func(k, v []byte) error) error {
kv 中的 key 和 val 都是 []byte，与 bolt 的接口保持一致，省去了额外的编解码：
代码语言:javascript复制type kv struct {
// kv is a single buffered key/value pair, kept as raw bytes like bolt's API.
key []byte
val []byte
}
代码语言:javascript复制type bucketBuffer struct {
// buf holds the buffered pairs; per used below, only the first used
// entries are live and the rest are kept for reuse.
buf []kv
// used tracks number of elements in use so buf can be reused without reallocation.
used int
}
代码语言:javascript复制func (bb *bucketBuffer) Range(key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) {
server/storage/backend/verify.go 中提供了一些运行时校验（是否启用由环境变量控制），例如用于检查调用是否发生在 apply 流程内的 ValidateCalledInsideApply（以下为节选）：
代码语言:javascript复制func ValidateCalledInsideApply(lg *zap.Logger) {