AdvService 作用
AdvService
负责将数据广播到tron
网络当中。
基础框架是netty
,在此之上开发AdvService
对业务进行了封装。
数据包括:
- 交易
- 区块
需要注意的是,tron
的AdvService
的这套广播逻辑,不是单向广播,而是双向互动。
啥意思,就是说,一般理解,一条数据广播出去后,就广播到对方节上了。
但是tron的广播不是这样,而是先广播一个交易ID到目录节点上,目标节点收到ID后,再发一条请求接取的网络请求,把数据接回去!!!!
是不是有点反直觉!!!
AdvService 主要成员
invToFetch: invToSpread: 待广播的数据:交易、区块 invToFetchCache:
主要方法
consumerInvToSpread: 处理发送队列 consumerInvToFetch: 处理拉取队列 broadcast: 广播
处理流程
- broadcast: 构建广播消息体:只包含ID
- 将数据添加入trxCache/blockCache
- 封装item
- 保存待发送消息: invToSpread.put(item)
- consumerInvToSpread: 处理 invToSpread
广播 id
需要发送的数据如:交易、区块,通过调用AdvService.broadcast
将id
广播。
但是广播并不是一调用broadcast
就发送出去的,还需要在各个队列中导来导去好几次。
调用栈
broadcast()
--consumerInvToSpread()
--invSender.sendInv();
--peer.sendMessage(new InventoryMessage(value, key));
--msgQueue.sendMessage(message);
--requestQueue.add(new MessageRoundTrip(msg));
步骤:
- 判断block/trx blockCache //接收到的数据进缓存 trxCache //接收到的数据进缓存
- 将数据装成 item
- invToSpread.put(item)
- consumerInvToSpread //处理 invToSpread 的中数据
具体实现:
代码语言:javascript复制public void broadcast(Message msg) {
// 如果是 fastForward 节点不广播
if (fastForward) {
return;
}
// MAX_SPREAD_SIZE = 1_000
if (invToSpread.size() > MAX_SPREAD_SIZE) {
logger.warn("Drop message, type: {}, ID: {}", msg.getType(), msg.getMessageId());
return;
}
Item item;
if (msg instanceof BlockMessage) {
BlockMessage blockMsg = (BlockMessage) msg;
// 注意这里,是id,而不是区块本身
item = new Item(blockMsg.getMessageId(), InventoryType.BLOCK);
logger.info("Ready to broadcast block {}", blockMsg.getBlockId().getString());
// 把Block中的交易放到trxCache
blockMsg.getBlockCapsule().getTransactions().forEach(transactionCapsule -> {
Sha256Hash tid = transactionCapsule.getTransactionId();
invToSpread.remove(tid);
trxCache.put(new Item(tid, InventoryType.TRX),
new TransactionMessage(transactionCapsule.getInstance()));
});
blockCache.put(item, msg);
} else if (msg instanceof TransactionMessage) {
TransactionMessage trxMsg = (TransactionMessage) msg;
// getMessageId 是交易id
item = new Item(trxMsg.getMessageId(), InventoryType.TRX);
trxCount.add();
trxCache.put(item, new TransactionMessage(trxMsg.getTransactionCapsule().getInstance()));
} else {
logger.error("Adv item is neither block nor trx, type: {}", msg.getType());
return;
}
invToSpread.put(item, System.currentTimeMillis());
if (InventoryType.BLOCK.equals(item.getType())) {
consumerInvToSpread();
}
}
@Override
public Sha256Hash getMessageId() {
return this.transactionCapsule.getTransactionId();
}
@Override
public Sha256Hash getMessageId() {
return getBlockCapsule().getBlockId();
}
consumerInvToSpread 中处理
处理过程:
- 拿到所有活跃PeerConnection
- 创建InvSender
- 把数据copy到 peer 的AdvInvSpread
private synchronized void consumerInvToSpread() {
// 判断 peer 状态,这段代码不能忽略
// peer.isNeedSyncFromPeer() 表示这个peer的状态,正在同步,区块状态不完整,不完整的不要广播给它
// peer.isNeedSyncFromUs() 表示peer,正在给别的节点同步数据:
// 区块还不完整,那么链的状态也是不完整的,广播过去的交易,它们也处理不了
List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
.filter(peer -> !peer.isNeedSyncFromPeer() && !peer.isNeedSyncFromUs())
.collect(Collectors.toList());
if (invToSpread.isEmpty() || peers.isEmpty()) {
return;
}
InvSender invSender = new InvSender();
invToSpread.forEach((item, time) -> peers.forEach(peer -> {
if (peer.getAdvInvReceive().getIfPresent(item) == null
&& peer.getAdvInvSpread().getIfPresent(item) == null
&& !(item.getType().equals(InventoryType.BLOCK) // 如果是Block 且 超过3秒就不广播了
&& System.currentTimeMillis() - time > BLOCK_PRODUCED_INTERVAL)) {
// 把交易塞到 peer 的 AdvInvSpread 队列
peer.getAdvInvSpread().put(item, Time.getCurrentMillis());
invSender.add(item, peer);
}
// 移除提交易
invToSpread.remove(item);
}));
invSender.sendInv();
}
//TODO