前言
TRON 的数据还原点checkpoint
指的是数据在某一刻建立的一个快照的备份,给内存快照(snapshot)生成一个临时持久化存储。
作用
保存数据在内存中的状态到碰盘,用于服务异常数据异常恢复。
checkpoint 是将某一时刻在内存在还没有写入到磁盘中的数据,临时
写入到磁盘当中,当处理成功后删除本次的checkpoint,待下一次刷盘时,重新创建checkpoint,重复这个过程。
在此之前需要对TRON的内存快照机制有一定的了解。
刷盘机制
TRON中的刷盘和很多别的应用的刷盘一样,都是将内存中的数据刷入到磁盘当中。也就是说:TRON对数据的写入是先内存,后磁盘。 如果数据在内存当中就会存在一个问题,如果服务进程挂了,那内存中的数据就会丢失。举个例子: 比如:
A 给 B 转10块钱,这笔交易需要等待刷盘时机触发,才会写入到内存当中 此时,服务挂了,这个数据并没写入到磁盘中,那么这笔交易就丢失了。
为了解决这一问题的,使用 checkpoint 机制。
先看一下内存中的数据结构图,每一次对区块
操作,都会创建一个对应区块
的checkpoint。
SnapshotRoot
是对leveldb
的抽象,并不是一个内存的snapshot
,而SnapshotImpl
是对应每一次操作生成的一个内存快照,数据存储在ConcurrentHashMap
。
checkpoint 机制
在写入磁盘之前,先写入一个临时存储,这个临时存储就是一个checkpoint
。
破案了,那么checkpoint,实际上就是一个用来管理临时存储的功能
。
写入一个临时存储?那不就等于写入到了一个磁盘当中,那为什么要多此一举不如直接写入到磁盘当中费这个劲干什么呢,直接一把把所有内存数据写入对应的存储不就完了。
机制
主要做三件事,这三件事也是checkpoint的逻辑顺序:
- 检查checkpoint
- 删除checkpoint
- 创建checkpoint
启动服务时检查checkpoint
,在刷盘之前检查上一次是否存checkpoint并删除checkpoint
,最后创建checkpoint
,用于防止服务异常挂了后造成的数据库异常。
实现
看一下主要的处理逻辑:
代码语言:javascript复制
public void flush() {
if (unChecked) {
return;
}
if (shouldBeRefreshed()) {
try {
long start = System.currentTimeMillis();
// 删除checkpoint
deleteCheckpoint();
// 创建checkpoint
createCheckpoint();
long checkPointEnd = System.currentTimeMillis();
// 刷盘,将数据提交到leveldb当中
refresh();
flushCount = 0;
logger.info("flush cost:{}, create checkpoint cost:{}, refresh cost:{}",
System.currentTimeMillis() - start,
checkPointEnd - start,
System.currentTimeMillis() - checkPointEnd
);
} catch (TronDBException e) {
logger.error(" Find fatal error , program will be exited soon", e);
hitDown = true;
LockSupport.unpark(exitThread);
}
}
}
创建checkpoint
创建checkpoint
将内存中的数据放到临时存储CheckTmpStore
中去。
可以看下CheckTmpStore
的具体做了什么。
private void createCheckpoint() {
try {
Map<WrappedByteArray, WrappedByteArray> batch = new HashMap<>();
// 遍历内存数据,这里是所有未持久化的数据,以数据库为维度,如 account、block
for (Chainbase db : dbs) {
Snapshot head = db.getHead();
// root是 leveldb 或 rocksdb 数据库本自,结构为:
// snapshotRoot(leveldb)-->snapshot-->snapshot-->...-->snapshot
// 如果到了root说明链表已经固化完了
if (Snapshot.isRoot(head)) {
return;
}
String dbName = db.getDbName();
Snapshot next = head.getRoot();
// 1 或 500
// flush [block:1]
// flush [block:1]--[block2]--[block n]--[block:500]
for (int i = 0; i < flushCount; i) {
next = next.getNext();
SnapshotImpl snapshot = (SnapshotImpl) next;
DB<Key, Value> keyValueDB = snapshot.getDb();
for (Map.Entry<Key, Value> e : keyValueDB) {
Key k = e.getKey();
Value v = e.getValue();
batch.put(WrappedByteArray.of(Bytes.concat(simpleEncode(dbName), k.getBytes())),
WrappedByteArray.of(v.encode()));
}
}
}
// 进行存储
checkTmpStore.getDbSource().updateByBatch(batch.entrySet().stream()
.map(e -> Maps.immutableEntry(e.getKey().getBytes(), e.getValue().getBytes()))
.collect(HashMap::new, (m, k) -> m.put(k.getKey(), k.getValue()), HashMap::putAll),
WriteOptionsWrapper.getInstance().sync(CommonParameter
.getInstance().getStorage().isDbSync()));
} catch ( Exception e) {
throw new TronDBException(e);
}
}
创建checkpoint就是这么点事,那么问题来了 创建checkpoint过程中如果程序挂了,数据会不会有问题? 数据丢失了怎么办? 机器宕机了怎么办?
创建checkpoint过程中如果程序挂了,数据会不会有问题?
如查程序挂了,数据没有写入到数据库中,只会丢失内存部分的数据库。原始数据没有受到影响,缺失的部分从其它节点同步后获得。
数据丢失了怎么办?
checkpoint
创建成功后,如果服务挂了,重启后会先检查checkpoint
中的数据,如果存在就加载到内存当中。所以这个机制本身就是用来防止写库时丢失内存数据的。
机器宕机了怎么办?
宕机数据有可能会丢失!!!
如果正好createCheckpoint
进行调用checkTmpStore
写到一半机器宕机了,那数据有可能只有了一半,就会出现脏数据。这样的话,下次服务启动后,数据有可能对不上,服务就会一直处理异常状态。
checkTmpStore
作用:存储内存快照(Snapshot)数据。 也就是把内存的Snapshot数据,存在这里。
检查checkpoint
在java-tron
服务启动时,检查checkpoint
是否有数据,如果有数据,则将数据加载入内存当中。
启动流程:
代码语言:javascript复制Manager.init()
|---revokingStore.check();
|---SnapshotManager.check()
SnapshotManager.check() 中check的就是checkpoint的状态。
代码语言:javascript复制// ensure run this method first after process start.
@Override
public void check() {
for (Chainbase db : dbs) {
if (!Snapshot.isRoot(db.getHead())) {
throw new IllegalStateException("first check.");
}
}
// checkTmpStore 是临时存储库,会存储所有数据在内存中的状态
// 检查 checkTmpStore 里是否有数据
if (!checkTmpStore.getDbSource().allKeys().isEmpty()) {
// 把所有数据库转成一个Map,这里的数据,是上一次刷盘之前的数据!!!
Map<String, Chainbase> dbMap = dbs.stream()
.map(db -> Maps.immutableEntry(db.getDbName(), db))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
// advance 就是字面意思,在内存中开辟一个新的 snapshot,这个是链表结构
advance();
for (Map.Entry<byte[], byte[]> e : checkTmpStore.getDbSource()) {
byte[] key = e.getKey();
byte[] value = e.getValue();
// 遍历DB,db 是数据库名
String db = simpleDecode(key);
if (dbMap.get(db) == null) {
continue;
}
byte[] realKey = Arrays.copyOfRange(key, db.getBytes().length 4, key.length);
byte[] realValue = value.length == 1 ? null : Arrays.copyOfRange(value, 1, value.length);
if (realValue != null) {
// 如果checkTmpStore 里对应的值不为 null,就更新到数据库中
dbMap.get(db).getHead().put(realKey, realValue);
} else {
// 为空则说明key对应的value值为空,也移除掉这个值
// 如果key不存在,调用get方法返回就是null
dbMap.get(db).getHead().remove(realKey);
}
}
// 注意,这个是 merge 操作,就是合并的意思。意思是把前一块数据合并写到root中
dbs.forEach(db -> db.getHead().getRoot().merge(db.getHead()));
// 修改链表指针指向新一个head
retreat();
}
// 标志自己已经check过了
unChecked = false;
}
总结
checkPoint
机制可以有效的防止服务挂掉时保存内存中的数据,但是宕机的场景的话确实没有办法,包括像RocketMQ号称高可用也没有办法解决宕机队列数据丢失的问题。
如果数据不一致,最好的方法是删除数据库,重新下载备分库使用。