tron checkpoint数据还原点

2023-10-23 14:40:02 浏览数 (1)

前言

TRON 的数据还原点checkpoint指的是数据在某一刻建立的一个快照的备份,给内存快照(snapshot)生成一个临时持久化存储。

作用

保存数据在内存中的状态到碰盘,用于服务异常数据异常恢复。 checkpoint 是将某一时刻在内存在还没有写入到磁盘中的数据,临时写入到磁盘当中,当处理成功后删除本次的checkpoint,待下一次刷盘时,重新创建checkpoint,重复这个过程。

在此之前需要对TRON的内存快照机制有一定的了解。

刷盘机制

TRON中的刷盘和很多别的应用的刷盘一样,都是将内存中的数据刷入到磁盘当中。也就是说:TRON对数据的写入是先内存,后磁盘。 如果数据在内存当中就会存在一个问题,如果服务进程挂了,那内存中的数据就会丢失。举个例子: 比如:

A 给 B 转10块钱,这笔交易需要等待刷盘时机触发,才会写入到内存当中 此时,服务挂了,这个数据并没写入到磁盘中,那么这笔交易就丢失了。

为了解决这一问题的,使用 checkpoint 机制。

先看一下内存中的数据结构图,每一次对区块操作,都会创建一个对应区块的checkpoint。 SnapshotRoot是对leveldb的抽象,并不是一个内存的snapshot,而SnapshotImpl是对应每一次操作生成的一个内存快照,数据存储在ConcurrentHashMap

checkpoint 机制

在写入磁盘之前,先写入一个临时存储,这个临时存储就是一个checkpoint。 破案了,那么checkpoint,实际上就是一个用来管理临时存储的功能。 写入一个临时存储?那不就等于写入到了一个磁盘当中,那为什么要多此一举不如直接写入到磁盘当中费这个劲干什么呢,直接一把把所有内存数据写入对应的存储不就完了。

机制

主要做三件事,这三件事也是checkpoint的逻辑顺序:

  1. 检查checkpoint
  2. 删除checkpoint
  3. 创建checkpoint

启动服务时检查checkpoint,在刷盘之前检查上一次是否存checkpoint并删除checkpoint,最后创建checkpoint,用于防止服务异常挂了后造成的数据库异常。

实现

看一下主要的处理逻辑:

代码语言:javascript复制

public void flush() {
  if (unChecked) {
    return;
  }

  if (shouldBeRefreshed()) {
    try {
      long start = System.currentTimeMillis();
      // 删除checkpoint
      deleteCheckpoint();
      // 创建checkpoint
      createCheckpoint();
      long checkPointEnd = System.currentTimeMillis();
      // 刷盘,将数据提交到leveldb当中
      refresh();
      flushCount = 0;
      logger.info("flush cost:{}, create checkpoint cost:{}, refresh cost:{}",
          System.currentTimeMillis() - start,
          checkPointEnd - start,
          System.currentTimeMillis() - checkPointEnd
      );
    } catch (TronDBException e) {
      logger.error(" Find fatal error , program will be exited soon", e);
      hitDown = true;
      LockSupport.unpark(exitThread);
    }
  }
}

创建checkpoint

创建checkpoint将内存中的数据放到临时存储CheckTmpStore中去。 可以看下CheckTmpStore的具体做了什么。

代码语言:javascript复制
private void createCheckpoint() {
  try {
    Map<WrappedByteArray, WrappedByteArray> batch = new HashMap<>();
    // 遍历内存数据,这里是所有未持久化的数据,以数据库为维度,如 account、block
    for (Chainbase db : dbs) {
      Snapshot head = db.getHead();
      // root是 leveldb 或 rocksdb 数据库本自,结构为:
      // snapshotRoot(leveldb)-->snapshot-->snapshot-->...-->snapshot
      // 如果到了root说明链表已经固化完了
      if (Snapshot.isRoot(head)) {
        return;
      }

      String dbName = db.getDbName();
      Snapshot next = head.getRoot();
      // 1 或 500
      // flush [block:1]
      // flush [block:1]--[block2]--[block n]--[block:500]
      for (int i = 0; i < flushCount;   i) {
        next = next.getNext();
        SnapshotImpl snapshot = (SnapshotImpl) next;
        DB<Key, Value> keyValueDB = snapshot.getDb();
        for (Map.Entry<Key, Value> e : keyValueDB) {
          Key k = e.getKey();
          Value v = e.getValue();
          batch.put(WrappedByteArray.of(Bytes.concat(simpleEncode(dbName), k.getBytes())),
              WrappedByteArray.of(v.encode()));
        }
      }
    }

    // 进行存储
    checkTmpStore.getDbSource().updateByBatch(batch.entrySet().stream()
            .map(e -> Maps.immutableEntry(e.getKey().getBytes(), e.getValue().getBytes()))
            .collect(HashMap::new, (m, k) -> m.put(k.getKey(), k.getValue()), HashMap::putAll),
        WriteOptionsWrapper.getInstance().sync(CommonParameter
            .getInstance().getStorage().isDbSync()));

  } catch ( Exception e) {
    throw new TronDBException(e);
  }
}

创建checkpoint就是这么点事,那么问题来了 创建checkpoint过程中如果程序挂了,数据会不会有问题? 数据丢失了怎么办? 机器宕机了怎么办?

创建checkpoint过程中如果程序挂了,数据会不会有问题?

如查程序挂了,数据没有写入到数据库中,只会丢失内存部分的数据库。原始数据没有受到影响,缺失的部分从其它节点同步后获得。

数据丢失了怎么办?

checkpoint创建成功后,如果服务挂了,重启后会先检查checkpoint中的数据,如果存在就加载到内存当中。所以这个机制本身就是用来防止写库时丢失内存数据的。

机器宕机了怎么办?

宕机数据有可能会丢失!!! 如果正好createCheckpoint进行调用checkTmpStore写到一半机器宕机了,那数据有可能只有了一半,就会出现脏数据。这样的话,下次服务启动后,数据有可能对不上,服务就会一直处理异常状态。

checkTmpStore

作用:存储内存快照(Snapshot)数据。 也就是把内存的Snapshot数据,存在这里。

检查checkpoint

java-tron服务启动时,检查checkpoint是否有数据,如果有数据,则将数据加载入内存当中。

启动流程:

代码语言:javascript复制
Manager.init()
    |---revokingStore.check();
           |---SnapshotManager.check()

SnapshotManager.check() 中check的就是checkpoint的状态。

代码语言:javascript复制
// ensure run this method first after process start.
@Override
public void check() {
  for (Chainbase db : dbs) {
    if (!Snapshot.isRoot(db.getHead())) {
      throw new IllegalStateException("first check.");
    }
  }

  // checkTmpStore 是临时存储库,会存储所有数据在内存中的状态
  // 检查 checkTmpStore 里是否有数据
  if (!checkTmpStore.getDbSource().allKeys().isEmpty()) {
    // 把所有数据库转成一个Map,这里的数据,是上一次刷盘之前的数据!!!
    Map<String, Chainbase> dbMap = dbs.stream()
        .map(db -> Maps.immutableEntry(db.getDbName(), db))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    // advance 就是字面意思,在内存中开辟一个新的 snapshot,这个是链表结构
    advance();
    for (Map.Entry<byte[], byte[]> e : checkTmpStore.getDbSource()) {
      byte[] key = e.getKey();
      byte[] value = e.getValue();
      // 遍历DB,db 是数据库名
      String db = simpleDecode(key);
      if (dbMap.get(db) == null) {
        continue;
      }
      byte[] realKey = Arrays.copyOfRange(key, db.getBytes().length   4, key.length);
      byte[] realValue = value.length == 1 ? null : Arrays.copyOfRange(value, 1, value.length);
      if (realValue != null) {
        // 如果checkTmpStore 里对应的值不为 null,就更新到数据库中
        dbMap.get(db).getHead().put(realKey, realValue);
      } else {
        // 为空则说明key对应的value值为空,也移除掉这个值
        // 如果key不存在,调用get方法返回就是null
        dbMap.get(db).getHead().remove(realKey);
      }
    }
    // 注意,这个是 merge 操作,就是合并的意思。意思是把前一块数据合并写到root中
    dbs.forEach(db -> db.getHead().getRoot().merge(db.getHead()));
    // 修改链表指针指向新一个head
    retreat();
  }

  // 标志自己已经check过了
  unChecked = false;
}

总结

checkPoint机制可以有效的防止服务挂掉时保存内存中的数据,但是宕机的场景的话确实没有办法,包括像RocketMQ号称高可用也没有办法解决宕机队列数据丢失的问题。 如果数据不一致,最好的方法是删除数据库,重新下载备分库使用。

0 人点赞