tron 接收交易和广播交易

2023-10-20 11:09:58 浏览数 (1)

前言

分析tron是如何接收到交易,并在接收到交易后,后续是如何处理的,交易处理细节可以看看:tron 交易处理--交易执行逻辑

接收交易

节点使用netty进行P2P连接,主要使用到的类:

  1. TransactionsMsgHandler: netty Handler处理器
  2. TronNetService: 消息分发
  3. AdvService: 消息广播
  4. FetchInvDataMsgHandler: 消息拉取

交易处理调用栈:

代码语言:javascript复制
TronNetHandler.channelRead0 接收消息
--TronNetService.onMessage 分发消息
   --transactionsMsgHandler.processMessage; 具体业务处理

TronNetService.onMessage 分发消息

代码语言:javascript复制
protected void onMessage(PeerConnection peer, TronMessage msg) {
  long startTime = System.currentTimeMillis();
  try {
    switch (msg.getType()) {
      case SYNC_BLOCK_CHAIN:
        syncBlockChainMsgHandler.processMessage(peer, msg);
        break;
      case BLOCK_CHAIN_INVENTORY:
        chainInventoryMsgHandler.processMessage(peer, msg);
        break;
      case INVENTORY:
        inventoryMsgHandler.processMessage(peer, msg);
        break;
      case FETCH_INV_DATA:
        fetchInvDataMsgHandler.processMessage(peer, msg);
        break;
      case BLOCK:
        blockMsgHandler.processMessage(peer, msg);
        break;
      case TRXS:
        // 交易处理入口
        transactionsMsgHandler.processMessage(peer, msg);
        break;
      case PBFT_COMMIT_MSG:
        pbftDataSyncHandler.processMessage(peer, msg);
        break;
      default:
        throw new P2pException(TypeEnum.NO_SUCH_MESSAGE, msg.getType().toString());
    }
  } catch (Exception e) {
    processException(peer, msg, e);
  } finally {
    long costs = System.currentTimeMillis() - startTime;
    if (costs > DURATION_STEP) {
      logger.info("Message processing costs {} ms, peer: {}, type: {}, time tag: {}",
          costs, peer.getInetAddress(), msg.getType(), getTimeTag(costs));
      Metrics.histogramObserve(MetricKeys.Histogram.MESSAGE_PROCESS_LATENCY,
          costs / Metrics.MILLISECONDS_PER_SECOND, msg.getType().name());
    }
  }
}

TransactionsMsgHandler

接收到的交易先放在线程池: trxHandlePool

再由trxHandlePool调用handleTransaction处理交易。

普通交易智能合约的交易,处理还不一样。

先看下交易缓冲池:

代码语言:javascript复制
// 无界队列
private BlockingQueue<Runnable> queue = new LinkedBlockingQueue();
// 工作线程数
private int threadNum = Args.getInstance().getValidateSignThreadNum();
private ExecutorService trxHandlePool = new ThreadPoolExecutor(threadNum, threadNum, 0L,
    TimeUnit.MILLISECONDS, queue);

processMessage 消息处理入口

区分普通交易和合约交易,另外会统计队列大小

代码语言:javascript复制
@Override
public void processMessage(PeerConnection peer, TronMessage msg) throws P2pException {
  TransactionsMessage transactionsMessage = (TransactionsMessage) msg;
  check(peer, transactionsMessage);
  int smartContractQueueSize = 0;
  int trxHandlePoolQueueSize = 0;
  int dropSmartContractCount = 0;

  // 遍历交易
  for (Transaction trx : transactionsMessage.getTransactions().getTransactionsList()) {
    int type = trx.getRawData().getContract(0).getType().getNumber();
    // 合约类型交易
    if (type == ContractType.TriggerSmartContract_VALUE
        || type == ContractType.CreateSmartContract_VALUE) {
      // 合约类型交易没有直接执行,而是添加到了 smartContractQueue 队列当中
      // 注意,这里用的是 !offer,也就是说插入失败了,超过限制
      // MAX_TRX_SIZE = 50_000
      if (!smartContractQueue.offer(new TrxEvent(peer, new TransactionMessage(trx)))) {
        smartContractQueueSize = smartContractQueue.size();
        // queue 是线程池的队列长度
        trxHandlePoolQueueSize = queue.size();
        dropSmartContractCount  ;
      }
      // 没有 else 处理,那这笔交易就丢掉了!!!
    } else {
    // 普通交易
      trxHandlePool.submit(() -> handleTransaction(peer, new TransactionMessage(trx)));
    }
  }

  // 上面没有else处理,但是这里加了判断,会打印出队列长度
  if (dropSmartContractCount > 0) {
    logger.warn("Add smart contract failed, drop count: {}, queueSize {}:{}",
        dropSmartContractCount, smartContractQueueSize, trxHandlePoolQueueSize);
  }
}

智能合约处理 handleSmartContract

智能合约交易,会有单独的线程来处理:

代码语言:javascript复制
private void handleSmartContract() {
  // 这是个单线程的延时处理线程池
  // 也就是智能合约的交易,20ms执行一次
  smartContractExecutor.scheduleWithFixedDelay(() -> {
    try {
      // 限制 MAX_SMART_CONTRACT_SUBMIT_SIZE = 100
      // 那 queue 里数据多了,还执行不了!!
      // 也就是 queue 一定要先消费到 < MAX_SMART_CONTRACT_SUBMIT_SIZE
      while (queue.size() < MAX_SMART_CONTRACT_SUBMIT_SIZE) {
        TrxEvent event = smartContractQueue.take();
        trxHandlePool.submit(() -> handleTransaction(event.getPeer(), event.getMsg()));
      }
    } catch (InterruptedException e) {
      logger.warn("Handle smart server interrupted");
      Thread.currentThread().interrupt();
    } catch (Exception e) {
      logger.error("Handle smart contract exception", e);
    }
  }, 1000, 20, TimeUnit.MILLISECONDS);
}

交易处理、广播 TransactionsMsgHandler.handleTransaction

调用栈

代码语言:javascript复制
TransactionsMsgHandler.handleTransaction
--AdvService.broadcast: 广播服务

在这里可以看到,每个tron节点在接到到交易到后:

  1. 先自己处理
  2. 再广播交易

广播也挺复杂,单独写个博客细扣。

代码语言:javascript复制
private void handleTransaction(PeerConnection peer, TransactionMessage trx) {
  if (peer.isDisconnect()) {
    logger.warn("Drop trx {} from {}, peer is disconnect", trx.getMessageId(),
        peer.getInetAddress());
    return;
  }

  // 广播队列验重
  if (advService.getMessage(new Item(trx.getMessageId(), InventoryType.TRX)) != null) {
    return;
  }

  try {
    tronNetDelegate.pushTransaction(trx.getTransactionCapsule());
    // 广播交易
    advService.broadcast(trx);
  } catch (P2pException e) {
    logger.warn("Trx {} from peer {} process failed. type: {}, reason: {}",
        trx.getMessageId(), peer.getInetAddress(), e.getType(), e.getMessage());
    // 如果是 BAD_TRX 断开连接
    if (e.getType().equals(TypeEnum.BAD_TRX)) {
      peer.disconnect(ReasonCode.BAD_TX);
    }
  } catch (Exception e) {
    logger.error("Trx {} from peer {} process failed", trx.getMessageId(), peer.getInetAddress(),
        e);
  }
}

广播数据 AdvService.broadcast

首先要明确一个点:广播过去的,并示是交易,而是交易ID!!

广播的方式并不是把交易直接广播到其它节点,而是广播ID,然后其它节点到这个节点来拉取交易信息!!

广播缓存,使用guave cache,最老淘汰机制,如果超过MAX_TRX_CACHE_SIZE大小则老数据会丢弃,已经验证过这个场景,不过一般超达不到这个限制,只有在极端测试环境下能达到。

数据也就保留1H,也就是超时就丢弃。

重要成员变量

代码语言:javascript复制
private final int MAX_TRX_CACHE_SIZE = 50_000;
// 广播缓存,MAX_TRX_CACHE_SIZE
// 提供缓存供外部获取、验重等作用
private Cache<Item, Message> trxCache = CacheBuilder.newBuilder()
    .maximumSize(MAX_TRX_CACHE_SIZE).expireAfterWrite(1, TimeUnit.HOURS)
    .recordStats().build();

// invToSpread 最大限制
private final int MAX_SPREAD_SIZE = 1_000
// 待发送队列
private ConcurrentHashMap<Item, Long> invToSpread = new ConcurrentHashMap<>();

广播逻辑

可以广播blocktransaction数据。

代码语言:javascript复制
public void broadcast(Message msg) {

   if (fastForward) {
     return;
   }

   // 校验交易缓存大小,这里会限制,不过一般超不过这个限制,可以适当调大或调小
   if (invToSpread.size() > MAX_SPREAD_SIZE) {
     logger.warn("Drop message, type: {}, ID: {}", msg.getType(), msg.getMessageId());
     return;
   }

   Item item;
   if (msg instanceof BlockMessage) {
     BlockMessage blockMsg = (BlockMessage) msg;
     item = new Item(blockMsg.getMessageId(), InventoryType.BLOCK);
     logger.info("Ready to broadcast block {}", blockMsg.getBlockId().getString());
     blockMsg.getBlockCapsule().getTransactions().forEach(transactionCapsule -> {
       Sha256Hash tid = transactionCapsule.getTransactionId();
       invToSpread.remove(tid);
       trxCache.put(new Item(tid, InventoryType.TRX),
           new TransactionMessage(transactionCapsule.getInstance()));
     });
     blockCache.put(item, msg);
   } else if (msg instanceof TransactionMessage) {
     TransactionMessage trxMsg = (TransactionMessage) msg;
     // 注意,trxMsg.getMessageId() 是交易id: transactionCapsule.getTransactionId()
     // 也就是这里构建了一条广播消息的Item,包含了:交易ID、交易类型 TRX
     item = new Item(trxMsg.getMessageId(), InventoryType.TRX);
     trxCount.add();
     trxCache.put(item, new TransactionMessage(trxMsg.getTransactionCapsule().getInstance()));
   } else {
     logger.error("Adv item is neither block nor trx, type: {}", msg.getType());
     return;
   }

   invToSpread.put(item, System.currentTimeMillis());

   if (InventoryType.BLOCK.equals(item.getType())) {
     consumerInvToSpread();
   }
 }

拉取数据 FetchInvDataMsgHandler

假设上面的的交易通过节点A广播到了节点B,节点B收到消息后,就会来拉取直正的交易数据。

B 节点会发送 FETCH_INV_DATA 类型消息来A节点获数据。

核心方法在:FetchInvDataMsgHandler.processMessage

代码语言:javascript复制
for (Sha256Hash hash : fetchInvDataMsg.getHashList()) {
  Item item = new Item(hash, type);
  // 遍历 advService 的缓存数据,getMessage 中包含之前已发送的数据
  Message message = advService.getMessage(item);
  if (message == null) {
    try {
      // type: block、trx
      message = tronNetDelegate.getData(hash, type);
    } catch (Exception e) {
      throw new P2pException(TypeEnum.DB_ITEM_NOT_FOUND,
              "Fetch item "   item   " failed. reason: "   e.getMessage());
    }
  }
  ···
}

FetchInvDataMsgHandler.processMessageAdvServicetrxCache中拉取之前缓存的数据,这样就完成了一个广播到获取数据的流程。

AdvService.getMessage

代码语言:javascript复制
public Message getMessage(Item item) {
  if (item.getType() == InventoryType.TRX) {
    return trxCache.getIfPresent(item);
  } else {
    return blockCache.getIfPresent(item);
  }
}

发送 consumerInvToSpread

发送数据由: consumerInvToSpread 方法执行,通过:

  1. spreadExecutor 定时执行
  2. broadcast 中判断类型为InventoryType.BLOCK则立即发送
代码语言:javascript复制
private synchronized void consumerInvToSpread() {

  List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
      .filter(peer -> !peer.isNeedSyncFromPeer() && !peer.isNeedSyncFromUs())
      .collect(Collectors.toList());

  if (invToSpread.isEmpty() || peers.isEmpty()) {
    return;
  }

  InvSender invSender = new InvSender();

  invToSpread.forEach((item, time) -> peers.forEach(peer -> {
    if (peer.getAdvInvReceive().getIfPresent(item) == null
        && peer.getAdvInvSpread().getIfPresent(item) == null
        && !(item.getType().equals(InventoryType.BLOCK)
        && System.currentTimeMillis() - time > BLOCK_PRODUCED_INTERVAL)) {
      peer.getAdvInvSpread().put(item, Time.getCurrentMillis());
      invSender.add(item, peer);
    }
    // 移除本次发送的数据,这样才不会越来越大
    invToSpread.remove(item);
  }));
  // 发送
  invSender.sendInv();
}

Transaction 交易结构

tron链使用protobuf进行序列化和反序列化,观察一下Transaction的结构:

代码语言:javascript复制
message Transaction {
  message Contract {
    enum ContractType {
      AccountCreateContract = 0;
      // 普通交易
      TransferContract = 1;
      // TRC10资产交易
      TransferAssetContract = 2;
      VoteAssetContract = 3;
      VoteWitnessContract = 4;
      WitnessCreateContract = 5;
      AssetIssueContract = 6;
      // 7 呢?
      WitnessUpdateContract = 8;
      ParticipateAssetIssueContract = 9;
      AccountUpdateContract = 10;
      // 冻结
      FreezeBalanceContract = 11;
      // 解冻
      UnfreezeBalanceContract = 12;
      // 提取奖励
      WithdrawBalanceContract = 13;
      UnfreezeAssetContract = 14;
      UpdateAssetContract = 15;
      ProposalCreateContract = 16;
      ProposalApproveContract = 17;
      ProposalDeleteContract = 18;
      SetAccountIdContract = 19;
      CustomContract = 20;
      CreateSmartContract = 30;
      TriggerSmartContract = 31;
      GetContract = 32;
      UpdateSettingContract = 33;
      ExchangeCreateContract = 41;
      ExchangeInjectContract = 42;
      ExchangeWithdrawContract = 43;
      ExchangeTransactionContract = 44;
      UpdateEnergyLimitContract = 45;
      AccountPermissionUpdateContract = 46;
      ClearABIContract = 48;
      UpdateBrokerageContract = 49;
      ShieldedTransferContract = 51;
      MarketSellAssetContract = 52;
      MarketCancelOrderContract = 53;
    }
    ContractType type = 1;
    google.protobuf.Any parameter = 2;
    bytes provider = 3;
    bytes ContractName = 4;
    int32 Permission_id = 5;
  }

  message Result {
    enum code {
      SUCESS = 0;
      FAILED = 1;
    }
    enum contractResult {
      DEFAULT = 0;
      SUCCESS = 1;
      REVERT = 2;
      BAD_JUMP_DESTINATION = 3;
      OUT_OF_MEMORY = 4;
      PRECOMPILED_CONTRACT = 5;
      STACK_TOO_SMALL = 6;
      STACK_TOO_LARGE = 7;
      ILLEGAL_OPERATION = 8;
      STACK_OVERFLOW = 9;
      OUT_OF_ENERGY = 10;
      OUT_OF_TIME = 11;
      JVM_STACK_OVER_FLOW = 12;
      UNKNOWN = 13;
      TRANSFER_FAILED = 14;
      INVALID_CODE = 15;
    }
    int64 fee = 1;
    code ret = 2;
    contractResult contractRet = 3;

    string assetIssueID = 14;
    int64 withdraw_amount = 15;
    int64 unfreeze_amount = 16;
    int64 exchange_received_amount = 18;
    int64 exchange_inject_another_amount = 19;
    int64 exchange_withdraw_another_amount = 20;
    int64 exchange_id = 21;
    int64 shielded_transaction_fee = 22;


    bytes orderId = 25;
    repeated MarketOrderDetail orderDetails = 26;
  }

  message raw {
    bytes ref_block_bytes = 1;
    int64 ref_block_num = 3;
    bytes ref_block_hash = 4;
    int64 expiration = 8;
    repeated authority auths = 9;
    // data not used
    bytes data = 10;
    //only support size = 1,  repeated list here for extension
    repeated Contract contract = 11;
    // scripts not used
    bytes scripts = 12;
    int64 timestamp = 14;
    int64 fee_limit = 18;
  }

  raw raw_data = 1;
  // only support size = 1,  repeated list here for muti-sig extension
  repeated bytes signature = 2;
  repeated Result ret = 5;
}

交易广播播代码:

TronNetService.java

AdvService.java

总结

了解这块代码的意义在于知道交易是怎么接收、处理、广播的,了解交易在所以节点之间的处理、流转。

0 人点赞