前言
分析tron
是如何接收到交易,并在接收到交易后,后续是如何处理的,交易处理细节可以看看:tron 交易处理--交易执行逻辑
接收交易
节点使用netty进行P2P连接,主要使用到的类:
- TransactionsMsgHandler: netty Handler处理器
- TronNetService: 消息分发
- AdvService: 消息广播
- FetchInvDataMsgHandler: 消息拉取
交易处理调用栈:
代码语言:javascript复制TronNetHandler.channelRead0 接收消息
--TronNetService.onMessage 分发消息
--transactionsMsgHandler.processMessage; 具体业务处理
TronNetService.onMessage 分发消息
代码语言:javascript复制protected void onMessage(PeerConnection peer, TronMessage msg) {
long startTime = System.currentTimeMillis();
try {
switch (msg.getType()) {
case SYNC_BLOCK_CHAIN:
syncBlockChainMsgHandler.processMessage(peer, msg);
break;
case BLOCK_CHAIN_INVENTORY:
chainInventoryMsgHandler.processMessage(peer, msg);
break;
case INVENTORY:
inventoryMsgHandler.processMessage(peer, msg);
break;
case FETCH_INV_DATA:
fetchInvDataMsgHandler.processMessage(peer, msg);
break;
case BLOCK:
blockMsgHandler.processMessage(peer, msg);
break;
case TRXS:
// 交易处理入口
transactionsMsgHandler.processMessage(peer, msg);
break;
case PBFT_COMMIT_MSG:
pbftDataSyncHandler.processMessage(peer, msg);
break;
default:
throw new P2pException(TypeEnum.NO_SUCH_MESSAGE, msg.getType().toString());
}
} catch (Exception e) {
processException(peer, msg, e);
} finally {
long costs = System.currentTimeMillis() - startTime;
if (costs > DURATION_STEP) {
logger.info("Message processing costs {} ms, peer: {}, type: {}, time tag: {}",
costs, peer.getInetAddress(), msg.getType(), getTimeTag(costs));
Metrics.histogramObserve(MetricKeys.Histogram.MESSAGE_PROCESS_LATENCY,
costs / Metrics.MILLISECONDS_PER_SECOND, msg.getType().name());
}
}
}
TransactionsMsgHandler
接收到的交易先放在线程池: trxHandlePool
再由trxHandlePool
调用handleTransaction
处理交易。
普通交易和智能合约的交易,处理还不一样。
先看下交易缓冲池:
代码语言:javascript复制// 无界队列
private BlockingQueue<Runnable> queue = new LinkedBlockingQueue();
// 工作线程数
private int threadNum = Args.getInstance().getValidateSignThreadNum();
private ExecutorService trxHandlePool = new ThreadPoolExecutor(threadNum, threadNum, 0L,
TimeUnit.MILLISECONDS, queue);
processMessage 消息处理入口
区分普通交易和合约交易,另外会统计队列大小
代码语言:javascript复制@Override
public void processMessage(PeerConnection peer, TronMessage msg) throws P2pException {
TransactionsMessage transactionsMessage = (TransactionsMessage) msg;
check(peer, transactionsMessage);
int smartContractQueueSize = 0;
int trxHandlePoolQueueSize = 0;
int dropSmartContractCount = 0;
// 遍历交易
for (Transaction trx : transactionsMessage.getTransactions().getTransactionsList()) {
int type = trx.getRawData().getContract(0).getType().getNumber();
// 合约类型交易
if (type == ContractType.TriggerSmartContract_VALUE
|| type == ContractType.CreateSmartContract_VALUE) {
// 合约类型交易没有直接执行,而是添加到了 smartContractQueue 队列当中
// 注意,这里用的是 !offer,也就是说插入失败了,超过限制
// MAX_TRX_SIZE = 50_000
if (!smartContractQueue.offer(new TrxEvent(peer, new TransactionMessage(trx)))) {
smartContractQueueSize = smartContractQueue.size();
// queue 是线程池的队列长度
trxHandlePoolQueueSize = queue.size();
dropSmartContractCount ;
}
// 没有 else 处理,那这笔交易就丢掉了!!!
} else {
// 普通交易
trxHandlePool.submit(() -> handleTransaction(peer, new TransactionMessage(trx)));
}
}
// 上面没有else处理,但是这里加了判断,会打印出队列长度
if (dropSmartContractCount > 0) {
logger.warn("Add smart contract failed, drop count: {}, queueSize {}:{}",
dropSmartContractCount, smartContractQueueSize, trxHandlePoolQueueSize);
}
}
智能合约处理 handleSmartContract
智能合约交易,会有单独的线程来处理:
代码语言:javascript复制private void handleSmartContract() {
// 这是个单线程的延时处理线程池
// 也就是智能合约的交易,20ms执行一次
smartContractExecutor.scheduleWithFixedDelay(() -> {
try {
// 限制 MAX_SMART_CONTRACT_SUBMIT_SIZE = 100
// 那 queue 里数据多了,还执行不了!!
// 也就是 queue 一定要先消费到 < MAX_SMART_CONTRACT_SUBMIT_SIZE
while (queue.size() < MAX_SMART_CONTRACT_SUBMIT_SIZE) {
TrxEvent event = smartContractQueue.take();
trxHandlePool.submit(() -> handleTransaction(event.getPeer(), event.getMsg()));
}
} catch (InterruptedException e) {
logger.warn("Handle smart server interrupted");
Thread.currentThread().interrupt();
} catch (Exception e) {
logger.error("Handle smart contract exception", e);
}
}, 1000, 20, TimeUnit.MILLISECONDS);
}
交易处理、广播 TransactionsMsgHandler.handleTransaction
调用栈
代码语言:javascript复制TransactionsMsgHandler.handleTransaction
--AdvService.broadcast: 广播服务
在这里可以看到,每个tron
节点在接到到交易到后:
- 先自己处理
- 再广播交易
广播也挺复杂,单独写个博客细扣。
代码语言:javascript复制private void handleTransaction(PeerConnection peer, TransactionMessage trx) {
if (peer.isDisconnect()) {
logger.warn("Drop trx {} from {}, peer is disconnect", trx.getMessageId(),
peer.getInetAddress());
return;
}
// 广播队列验重
if (advService.getMessage(new Item(trx.getMessageId(), InventoryType.TRX)) != null) {
return;
}
try {
tronNetDelegate.pushTransaction(trx.getTransactionCapsule());
// 广播交易
advService.broadcast(trx);
} catch (P2pException e) {
logger.warn("Trx {} from peer {} process failed. type: {}, reason: {}",
trx.getMessageId(), peer.getInetAddress(), e.getType(), e.getMessage());
// 如果是 BAD_TRX 断开连接
if (e.getType().equals(TypeEnum.BAD_TRX)) {
peer.disconnect(ReasonCode.BAD_TX);
}
} catch (Exception e) {
logger.error("Trx {} from peer {} process failed", trx.getMessageId(), peer.getInetAddress(),
e);
}
}
广播数据 AdvService.broadcast
首先要明确一个点:广播过去的,并示是交易,而是交易ID!!
广播的方式并不是把交易直接广播到其它节点,而是广播ID,然后其它节点到这个节点来拉取交易信息!!
广播缓存,使用guave cache
,最老淘汰机制,如果超过MAX_TRX_CACHE_SIZE
大小则老数据会丢弃,已经验证过这个场景,不过一般超达不到这个限制,只有在极端测试环境下能达到。
数据也就保留1H,也就是超时就丢弃。
重要成员变量
代码语言:javascript复制private final int MAX_TRX_CACHE_SIZE = 50_000;
// 广播缓存,MAX_TRX_CACHE_SIZE
// 提供缓存供外部获取、验重等作用
private Cache<Item, Message> trxCache = CacheBuilder.newBuilder()
.maximumSize(MAX_TRX_CACHE_SIZE).expireAfterWrite(1, TimeUnit.HOURS)
.recordStats().build();
// invToSpread 最大限制
private final int MAX_SPREAD_SIZE = 1_000
// 待发送队列
private ConcurrentHashMap<Item, Long> invToSpread = new ConcurrentHashMap<>();
广播逻辑
可以广播block
和transaction
数据。
public void broadcast(Message msg) {
if (fastForward) {
return;
}
// 校验交易缓存大小,这里会限制,不过一般超不过这个限制,可以适当调大或调小
if (invToSpread.size() > MAX_SPREAD_SIZE) {
logger.warn("Drop message, type: {}, ID: {}", msg.getType(), msg.getMessageId());
return;
}
Item item;
if (msg instanceof BlockMessage) {
BlockMessage blockMsg = (BlockMessage) msg;
item = new Item(blockMsg.getMessageId(), InventoryType.BLOCK);
logger.info("Ready to broadcast block {}", blockMsg.getBlockId().getString());
blockMsg.getBlockCapsule().getTransactions().forEach(transactionCapsule -> {
Sha256Hash tid = transactionCapsule.getTransactionId();
invToSpread.remove(tid);
trxCache.put(new Item(tid, InventoryType.TRX),
new TransactionMessage(transactionCapsule.getInstance()));
});
blockCache.put(item, msg);
} else if (msg instanceof TransactionMessage) {
TransactionMessage trxMsg = (TransactionMessage) msg;
// 注意,trxMsg.getMessageId() 是交易id: transactionCapsule.getTransactionId()
// 也就是这里构建了一条广播消息的Item,包含了:交易ID、交易类型 TRX
item = new Item(trxMsg.getMessageId(), InventoryType.TRX);
trxCount.add();
trxCache.put(item, new TransactionMessage(trxMsg.getTransactionCapsule().getInstance()));
} else {
logger.error("Adv item is neither block nor trx, type: {}", msg.getType());
return;
}
invToSpread.put(item, System.currentTimeMillis());
if (InventoryType.BLOCK.equals(item.getType())) {
consumerInvToSpread();
}
}
拉取数据 FetchInvDataMsgHandler
假设上面的的交易通过节点A广播到了节点B,节点B收到消息后,就会来拉取直正的交易数据。
B 节点会发送 FETCH_INV_DATA
类型消息来A节点获数据。
核心方法在:FetchInvDataMsgHandler.processMessage
for (Sha256Hash hash : fetchInvDataMsg.getHashList()) {
Item item = new Item(hash, type);
// 遍历 advService 的缓存数据,getMessage 中包含之前已发送的数据
Message message = advService.getMessage(item);
if (message == null) {
try {
// type: block、trx
message = tronNetDelegate.getData(hash, type);
} catch (Exception e) {
throw new P2pException(TypeEnum.DB_ITEM_NOT_FOUND,
"Fetch item " item " failed. reason: " e.getMessage());
}
}
···
}
FetchInvDataMsgHandler.processMessage
从AdvService
的trxCache
中拉取之前缓存的数据,这样就完成了一个广播到获取数据的流程。
AdvService.getMessage
public Message getMessage(Item item) {
if (item.getType() == InventoryType.TRX) {
return trxCache.getIfPresent(item);
} else {
return blockCache.getIfPresent(item);
}
}
发送 consumerInvToSpread
发送数据由: consumerInvToSpread
方法执行,通过:
spreadExecutor
定时执行broadcast
中判断类型为InventoryType.BLOCK
则立即发送
private synchronized void consumerInvToSpread() {
List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
.filter(peer -> !peer.isNeedSyncFromPeer() && !peer.isNeedSyncFromUs())
.collect(Collectors.toList());
if (invToSpread.isEmpty() || peers.isEmpty()) {
return;
}
InvSender invSender = new InvSender();
invToSpread.forEach((item, time) -> peers.forEach(peer -> {
if (peer.getAdvInvReceive().getIfPresent(item) == null
&& peer.getAdvInvSpread().getIfPresent(item) == null
&& !(item.getType().equals(InventoryType.BLOCK)
&& System.currentTimeMillis() - time > BLOCK_PRODUCED_INTERVAL)) {
peer.getAdvInvSpread().put(item, Time.getCurrentMillis());
invSender.add(item, peer);
}
// 移除本次发送的数据,这样才不会越来越大
invToSpread.remove(item);
}));
// 发送
invSender.sendInv();
}
Transaction 交易结构
tron
链使用protobuf
进行序列化和反序列化,观察一下Transaction
的结构:
message Transaction {
message Contract {
enum ContractType {
AccountCreateContract = 0;
// 普通交易
TransferContract = 1;
// TRC10资产交易
TransferAssetContract = 2;
VoteAssetContract = 3;
VoteWitnessContract = 4;
WitnessCreateContract = 5;
AssetIssueContract = 6;
// 7 呢?
WitnessUpdateContract = 8;
ParticipateAssetIssueContract = 9;
AccountUpdateContract = 10;
// 冻结
FreezeBalanceContract = 11;
// 解冻
UnfreezeBalanceContract = 12;
// 提取奖励
WithdrawBalanceContract = 13;
UnfreezeAssetContract = 14;
UpdateAssetContract = 15;
ProposalCreateContract = 16;
ProposalApproveContract = 17;
ProposalDeleteContract = 18;
SetAccountIdContract = 19;
CustomContract = 20;
CreateSmartContract = 30;
TriggerSmartContract = 31;
GetContract = 32;
UpdateSettingContract = 33;
ExchangeCreateContract = 41;
ExchangeInjectContract = 42;
ExchangeWithdrawContract = 43;
ExchangeTransactionContract = 44;
UpdateEnergyLimitContract = 45;
AccountPermissionUpdateContract = 46;
ClearABIContract = 48;
UpdateBrokerageContract = 49;
ShieldedTransferContract = 51;
MarketSellAssetContract = 52;
MarketCancelOrderContract = 53;
}
ContractType type = 1;
google.protobuf.Any parameter = 2;
bytes provider = 3;
bytes ContractName = 4;
int32 Permission_id = 5;
}
message Result {
enum code {
SUCESS = 0;
FAILED = 1;
}
enum contractResult {
DEFAULT = 0;
SUCCESS = 1;
REVERT = 2;
BAD_JUMP_DESTINATION = 3;
OUT_OF_MEMORY = 4;
PRECOMPILED_CONTRACT = 5;
STACK_TOO_SMALL = 6;
STACK_TOO_LARGE = 7;
ILLEGAL_OPERATION = 8;
STACK_OVERFLOW = 9;
OUT_OF_ENERGY = 10;
OUT_OF_TIME = 11;
JVM_STACK_OVER_FLOW = 12;
UNKNOWN = 13;
TRANSFER_FAILED = 14;
INVALID_CODE = 15;
}
int64 fee = 1;
code ret = 2;
contractResult contractRet = 3;
string assetIssueID = 14;
int64 withdraw_amount = 15;
int64 unfreeze_amount = 16;
int64 exchange_received_amount = 18;
int64 exchange_inject_another_amount = 19;
int64 exchange_withdraw_another_amount = 20;
int64 exchange_id = 21;
int64 shielded_transaction_fee = 22;
bytes orderId = 25;
repeated MarketOrderDetail orderDetails = 26;
}
message raw {
bytes ref_block_bytes = 1;
int64 ref_block_num = 3;
bytes ref_block_hash = 4;
int64 expiration = 8;
repeated authority auths = 9;
// data not used
bytes data = 10;
//only support size = 1, repeated list here for extension
repeated Contract contract = 11;
// scripts not used
bytes scripts = 12;
int64 timestamp = 14;
int64 fee_limit = 18;
}
raw raw_data = 1;
// only support size = 1, repeated list here for muti-sig extension
repeated bytes signature = 2;
repeated Result ret = 5;
}
交易广播播代码:
TronNetService.java
AdvService.java
总结
了解这块代码的意义在于知道交易是怎么接收、处理、广播的,了解交易在所以节点之间的处理、流转。