tron-网络模型-AdvService广播服务

2023-10-23 14:51:08 浏览数 (1)

AdvService 作用

AdvService 负责将数据广播到tron网络当中。 基础框架是netty,在此之上开发AdvService对业务进行了封装。

数据包括:

  1. 交易
  2. 区块

需要注意的是,tronAdvService的这套广播逻辑,不是单向广播,而是双向互动。 啥意思,就是说,一般理解,一条数据广播出去后,就广播到对方节上了。 但是tron的广播不是这样,而是先广播一个交易ID到目录节点上,目标节点收到ID后,再发一条请求接取的网络请求,把数据接回去!!!! 是不是有点反直觉!!!

AdvService 主要成员

invToFetch: invToSpread: 待广播的数据:交易、区块 invToFetchCache:

主要方法

consumerInvToSpread: 处理发送队列 consumerInvToFetch: 处理拉取队列 broadcast: 广播

处理流程

  1. broadcast: 构建广播消息体:只包含ID
    1. 将数据添加入trxCache/blockCache
    2. 封装item
    3. 保存待发送消息: invToSpread.put(item)
  2. consumerInvToSpread: 处理 invToSpread

广播 id

需要发送的数据如:交易区块,通过调用AdvService.broadcastid广播。 但是广播并不是一调用broadcast就发送出去的,还需要在各个队列中导来导去好几次。 调用栈

代码语言:javascript复制
broadcast()
--consumerInvToSpread()
   --invSender.sendInv();
       --peer.sendMessage(new InventoryMessage(value, key));
          --msgQueue.sendMessage(message);
             --requestQueue.add(new MessageRoundTrip(msg));

步骤:

  1. 判断block/trx blockCache //接收到的数据进缓存 trxCache //接收到的数据进缓存
  2. 将数据装成 item
  3. invToSpread.put(item)
  4. consumerInvToSpread //处理 invToSpread 的中数据

具体实现:

代码语言:javascript复制
public void broadcast(Message msg) {
  // 如果是 fastForward 节点不广播
  if (fastForward) {
    return;
  }

  // MAX_SPREAD_SIZE = 1_000
  if (invToSpread.size() > MAX_SPREAD_SIZE) {
    logger.warn("Drop message, type: {}, ID: {}", msg.getType(), msg.getMessageId());
    return;
  }

  Item item;
  if (msg instanceof BlockMessage) {
    BlockMessage blockMsg = (BlockMessage) msg;
    // 注意这里,是id,而不是区块本身
    item = new Item(blockMsg.getMessageId(), InventoryType.BLOCK);
    logger.info("Ready to broadcast block {}", blockMsg.getBlockId().getString());
    // 把Block中的交易放到trxCache
    blockMsg.getBlockCapsule().getTransactions().forEach(transactionCapsule -> {
      Sha256Hash tid = transactionCapsule.getTransactionId();
      invToSpread.remove(tid);
      trxCache.put(new Item(tid, InventoryType.TRX),
          new TransactionMessage(transactionCapsule.getInstance()));
    });
    blockCache.put(item, msg);
  } else if (msg instanceof TransactionMessage) {
    TransactionMessage trxMsg = (TransactionMessage) msg;
    // getMessageId 是交易id
    item = new Item(trxMsg.getMessageId(), InventoryType.TRX);
    trxCount.add();
    trxCache.put(item, new TransactionMessage(trxMsg.getTransactionCapsule().getInstance()));
  } else {
    logger.error("Adv item is neither block nor trx, type: {}", msg.getType());
    return;
  }

  invToSpread.put(item, System.currentTimeMillis());

  if (InventoryType.BLOCK.equals(item.getType())) {
    consumerInvToSpread();
  }
}

@Override
public Sha256Hash getMessageId() {
  return this.transactionCapsule.getTransactionId();
}

@Override
public Sha256Hash getMessageId() {
  return getBlockCapsule().getBlockId();
}

consumerInvToSpread 中处理

处理过程:

  1. 拿到所有活跃PeerConnection
  2. 创建InvSender
  3. 把数据copy到 peer 的AdvInvSpread
代码语言:javascript复制
private synchronized void consumerInvToSpread() {
  // 判断 peer 状态,这段代码不能忽略
  // peer.isNeedSyncFromPeer() 表示这个peer的状态,正在同步,区块状态不完整,不完整的不要广播给它
  // peer.isNeedSyncFromUs() 表示peer,正在给别的节点同步数据:
  //   区块还不完整,那么链的状态也是不完整的,广播过去的交易,它们也处理不了
  List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
      .filter(peer -> !peer.isNeedSyncFromPeer() && !peer.isNeedSyncFromUs())
      .collect(Collectors.toList());

  if (invToSpread.isEmpty() || peers.isEmpty()) {
    return;
  }

  InvSender invSender = new InvSender();

  invToSpread.forEach((item, time) -> peers.forEach(peer -> {
    if (peer.getAdvInvReceive().getIfPresent(item) == null
        && peer.getAdvInvSpread().getIfPresent(item) == null
        && !(item.getType().equals(InventoryType.BLOCK)       // 如果是Block 且 超过3秒就不广播了
        && System.currentTimeMillis() - time > BLOCK_PRODUCED_INTERVAL)) {
      // 把交易塞到 peer 的 AdvInvSpread 队列
      peer.getAdvInvSpread().put(item, Time.getCurrentMillis());
      invSender.add(item, peer);
    }
    // 移除提交易
    invToSpread.remove(item);
  }));

  invSender.sendInv();
}

//TODO

0 人点赞