btcd p2p 网络分析
比特币依赖于对等网络来实现信息的共享与传输,网络中的每个节点即可以是客户端也可以是服务端,本篇文章基于比特币go版本btcd探索比特币对等网络的实现原理,整个实现从底层到上层可以分为地址,连接,节点三层,每层都有自己的功能与职责。下面逐一的分析这三个部分的构成与功能
地址管理
连接管理对象结构,其中重要的两个成员是addrNew和addTried,前者维护了1024个地址桶,每个桶的尺寸为64,地址经过一个散列算法放入到桶里面,保存的是已经添加尚未确认的连接,后者则维护了64个list,每个确定完好的连接hash散列后放到里面。
代码语言:javascript复制 type AddrManager struct {
mtx sync.Mutex
peersFile string
lookupFunc func(string) ([]net.IP, error)
rand *rand.Rand
key [32]byte
addrIndex map[string]*KnownAddress // address key to ka for all addrs.
addrNew [newBucketCount]map[string]*KnownAddress
addrTried [triedBucketCount]*list.List
started int32
shutdown int32
wg sync.WaitGroup
quit chan struct{}
nTried int
nNew int
lamtx sync.Mutex
localAddresses map[string]*localAddress
}
当通过AddLocalAddress函数添加一个新的地址的时候,这个地址会先加到addrNew里面,GetAddress会有一半的几率从addrNew里面随机选取一个地址上来尝试进行网络连接校验,如果检验完成,则会调用Good方法,将这个地址从New移动到tred里面。
GetAddress函数选择addrNew和选择addrTried的几率一半一半,选择addrTried用于更新老的可用性差的节点。for循环保证能获取到节点,chance()用于用于桶内位置调整,访问的越频繁这个值越大,选中的可能性越小,这是为了避免多次访问重复的节点。为了防止经过多轮次也无法选中地址,通过factor变量来控制,随着轮次的增加factor增加进而减少上面的限制,意思是实在是找不到没用过的点到用老的也行了。
代码语言:javascript复制 func (a *AddrManager) GetAddress() *KnownAddress {
//省略
// Use a 50% chance for choosing between tried and new table entries.
if a.nTried > 0 && (a.nNew == 0 || a.rand.Intn(2) == 0) {
// Tried entry.
large := 1 << 30
factor := 1.0
for {
// pick a random bucket.
bucket := a.rand.Intn(len(a.addrTried))
if a.addrTried[bucket].Len() == 0 {
continue
}
randval := a.rand.Intn(large)
//控制几率避免段时间访问重复节点
if float64(randval) < (factor * ka.chance() * float64(large)) {
log.Tracef("Selected %v from tried bucket",
NetAddressKey(ka.na))
return ka
}
factor *= 1.2
}
} else {
// new node.
// XXX use a closure/function to avoid repeating this.
large := 1 << 30
factor := 1.0
for {
// Pick a random bucket.
bucket := a.rand.Intn(len(a.addrNew))
if len(a.addrNew[bucket]) == 0 {
continue
}
// Then, a random entry in it.
var ka *KnownAddress
nth := a.rand.Intn(len(a.addrNew[bucket]))
for _, value := range a.addrNew[bucket] {
if nth == 0 {
ka = value
}
nth--
}
randval := a.rand.Intn(large)
if float64(randval) < (factor * ka.chance() * float64(large)) {
log.Tracef("Selected %v from new bucket",
NetAddressKey(ka.na))
return ka
}
factor *= 1.2
}
}
}
连接管理
连接主要控制地址的可达性,通过一个状态即来控制连接的生命周期,每个连接都有如下五种状态,
代码语言:javascript复制 ConnPending ConnState = iota
ConnFailing
ConnCanceled
ConnEstablished
ConnDisconnected
函数connHandler监控几个chanel()的变化来对应处理连接的状态。
1)ConnPending 通过NewConn函数创建的对象进入该分支处理,将这个新的连接状态标记为ConnPending,并把连接挂到挂起连接里面
代码语言:javascript复制 case registerPending:
connReq := msg.c
connReq.updateState(ConnPending)
pending[msg.c.id] = connReq
close(msg.done)
2)ConnFailing 当Connect函数确认连接出现错误的时候会进入handfail分支标记连接错误
代码语言:javascript复制 case handleFailed:
connReq := msg.c
//一种特殊情况,当存在的peer过少,为了保证系统可用会把这个操作当作失败的情况处理
if _, ok := pending[connReq.id]; !ok {
log.Debugf("Ignoring connection for "
"canceled conn req: %v", connReq)
continue
}
connReq.updateState(ConnFailing)
log.Debugf("Failed to connect to %v: %v",
connReq, msg.err)
cm.handleFailedConn(connReq)
3)ConnCanceled 当用户主动断开连接且该连接尚未完全建立的时候视为ConnCanceled状态
代码语言:javascript复制 case handleDisconnected:
connReq, ok := conns[msg.id]
if !ok {
connReq, ok = pending[msg.id]
if !ok {
log.Errorf("Unknown connid=%d",
msg.id)
continue
}
// Pending connection was found, remove
// it from pending map if we should
// ignore a later, successful
// connection.
connReq.updateState(ConnCanceled)
log.Debugf("Canceling: %v", connReq)
delete(pending, msg.id)
continue
}
4)ConnEstablished 连接通过函数Connect确认建立成功后变成ConnEstablished
代码语言:javascript复制 case handleConnected:
connReq := msg.c
if _, ok := pending[connReq.id]; !ok {
if msg.conn != nil {
msg.conn.Close()
}
log.Debugf("Ignoring connection for "
"canceled connreq=%v", connReq)
continue
}
connReq.updateState(ConnEstablished)
connReq.conn = msg.conn
conns[connReq.id] = connReq
log.Debugf("Connected to %v", connReq)
connReq.retryCount = 0
cm.failedAttempts = 0
delete(pending, connReq.id)
if cm.cfg.OnConnection != nil {
go cm.cfg.OnConnection(connReq, msg.conn)
}
5)ConnDisconnected 用户主动断开已经建立好的连接,该连接状态会变成ConnDisconnected
代码语言:javascript复制 case handleDisconnected:
if connReq.conn != nil {
connReq.conn.Close()
}
if cm.cfg.OnDisconnection != nil {
go cm.cfg.OnDisconnection(connReq)
}
// All internal state has been cleaned up, if
// this connection is being removed, we will
// make no further attempts with this request.
if !msg.retry {
connReq.updateState(ConnDisconnected)
continue
}
连接管理里面还附带了一个用于考评连接质量的估分函数,该数值累加函数如下
代码语言:javascript复制 func (s *DynamicBanScore) increase(persistent, transient uint32, t time.Time) uint32 {
s.persistent = persistent
tu := t.Unix()
dt := tu - s.lastUnix
if transient > 0 {
if Lifetime < dt {
s.transient = 0
} else if s.transient > 1 && dt > 0 {
s.transient *= decayFactor(dt)
}
s.transient = float64(transient)
s.lastUnix = tu
}
return s.persistent uint32(s.transient)
}
参数和具体的操作有关系,大致来说会随着操作不断累加,操作越多增长的越快,当超过一定阈值后,该peer会休息一段时间,主要是为了防止恶意流量攻击。调用increase的位置都是可能出现大流量的位置(GetData,MemPool...)。
协议
协议层定义了网络消息的读写格式与应答方式,该协议定义了如下的消息类型.
代码语言:javascript复制 CmdVersion = "version" 版本
CmdVerAck = "verack" 联通
CmdGetAddr = "getaddr" 获取地址
CmdAddr = "addr" 发送地址
CmdGetBlocks = "getblocks" 获取区块
CmdInv = "inv" 发送inv(交易/区块)
CmdGetData = "getdata" 发送区块数据
CmdNotFound = "notfound"
CmdBlock = "block" 发送区块
CmdTx = "tx" 发送交易
CmdGetHeaders = "getheaders"获取区块头
CmdHeaders = "headers" 发送区块头
CmdPing = "ping"
CmdPong = "pong" 和ping成对使用 维护连接
CmdAlert = "alert" 无用
CmdMemPool = "mempool" 交易池(代码上看没有用处)
CmdFilterAdd = "filteradd"
CmdFilterClear = "filterclear"
CmdFilterLoad = "filterload"
CmdMerkleBlock = "merkleblock"
CmdReject = "reject"
CmdSendHeaders = "sendheaders"
CmdFeeFilter = "feefilter"
CmdGetCFilters = "getcfilters"
CmdGetCFHeaders = "getcfheaders"
CmdGetCFCheckpt = "getcfcheckpt"
CmdCFilter = "cfilter"
CmdCFHeaders = "cfheaders"
CmdCFCheckpt = "cfcheckpt"
inHandler
负责收消息,收到消息解析具体的消息类型,在调用一个更加具体的函数来处理这些消息,这个具体的函数常常是通过配置从上层传递下来的.
代码语言:javascript复制 for atomic.LoadInt32(&p.disconnect) == 0 {
rmsg, buf, err := p.readMessage(p.wireEncoding)
idleTimer.Stop()
//略
atomic.StoreInt64(&p.lastRecv, time.Now().Unix())
p.stallControl <- stallControlMsg{sccReceiveMessage, rmsg}
// Handle each supported message type.
p.stallControl <- stallControlMsg{sccHandlerStart, rmsg}
switch msg := rmsg.(type) {
case *wire.MsgVersion:
// Limit to one version message per peer.
p.PushRejectMsg(msg.Command(), wire.RejectDuplicate,
"duplicate version message", nil, true)
break out
case *wire.MsgVerAck:
// No read lock is necessary because verAckReceived is not written
// to in any other goroutine.
if p.verAckReceived {
log.Infof("Already received 'verack' from peer %v -- "
"disconnecting", p)
break out
}
p.flagsMtx.Lock()
p.verAckReceived = true
p.flagsMtx.Unlock()
if p.cfg.Listeners.OnVerAck != nil {
p.cfg.Listeners.OnVerAck(p, msg)
}
///略
case *wire.MsgSendHeaders:
p.flagsMtx.Lock()
p.sendHeadersPreferred = true
p.flagsMtx.Unlock()
if p.cfg.Listeners.OnSendHeaders != nil {
p.cfg.Listeners.OnSendHeaders(p, msg)
}
default:
log.Debugf("Received unhandled message of type %v "
"from %v", rmsg.Command(), p)
}
p.stallControl <- stallControlMsg{sccHandlerDone, rmsg}
// A message was received so reset the idle timer.
idleTimer.Reset(idleTimeout)
}
outHandler
通过for select监听sendQueue,如果有新消息进来调用writeMessage发送消息
序列化,反序列化
每条消息都是由消息头和消息体构成。每个消息都实现有一个Message接口,
代码语言:javascript复制 type Message interface {
BtcDecode(io.Reader, uint32, MessageEncoding) error
BtcEncode(io.Writer, uint32, MessageEncoding) error
Command() string
MaxPayloadLength(uint32) uint32
}
序列化的过程中先调用BtcDecode方法序列化一个二进制消息体,这里逐个写入消息字段,有兴趣的看wire文件夹下面的每个消息的BtcDecode消息的具体实现。然后双重hash该消息体取其前四位作为消息体校验位,在把网络号,消息名,消息长度,校验位置合起来构成一个消息头。反序列化则是个相反的过程
代码语言:javascript复制 err := msg.BtcEncode(&bw, pver, encoding)
if err != nil {
return totalBytes, err
}
payload := bw.Bytes()
lenp := len(payload)
//略
// Create header for the message.
hdr := messageHeader{}
hdr.magic = btcnet
hdr.command = cmd
hdr.length = uint32(lenp)
copy(hdr.checksum[:], chainhash.DoubleHashB(payload)[0:4])
交易控制
在实际应用中节点之间互相广播交易会占用很多流量,为了提高网路性能,节点会控制交易的重发,具体实现为每个peer带有一个knownInventory对象,记录已经发送过的交易Id,发送的时候会检查是否发过,发过的不会在重发发送.这个机制有时候会带来些奇怪的问题.之前在部署网络时候遇到个奇怪的现象就是矿机断电重启后,有些交易需要很久才能打包,原因则在于该机器直接相连的机器认为已经发送过交易,不会在二次重发。
代码语言:javascript复制 for e := invSendQueue.Front(); e != nil; e = invSendQueue.Front() {
iv := invSendQueue.Remove(e).(*wire.InvVect)
// Don't send inventory that became known after
// the initial check.
if p.knownInventory.Exists(iv) {
continue
}
invMsg.AddInvVect(iv)
if len(invMsg.InvList) >= maxInvTrickleSize {
waiting = queuePacket(
outMsg{msg: invMsg},
pendingMsgs, waiting)
invMsg = wire.NewMsgInvSizeHint(uint(invSendQueue.Len()))
}
// Add the inventory that is being relayed to
// the known inventory for the peer.
p.AddKnownInventory(iv)
}
peer管理
peer管理主要负责peer节点的维护,消息的应答方式等
peer维护
peerHandler中for select 监听几个chanel,每个chanel对应几种操作,增删改查之类的,即在系统内部使用也提供外部rpc功能.
代码语言:javascript复制 for {
select {
// New peers connected to the server.
case p := <-s.newPeers:
s.handleAddPeerMsg(state, p)
// Disconnected peers.
case p := <-s.donePeers:
s.handleDonePeerMsg(state, p)
// Block accepted in mainchain or orphan, update peer height.
case umsg := <-s.peerHeightsUpdate:
s.handleUpdatePeerHeights(state, umsg)
// Peer to ban.
case p := <-s.banPeers:
s.handleBanPeerMsg(state, p)
// New inventory to potentially be relayed to other peers.
case invMsg := <-s.relayInv:
s.handleRelayInvMsg(state, invMsg)
// Message to broadcast to all connected peers except those
// which are excluded by the message.
case bmsg := <-s.broadcast:
s.handleBroadcastMsg(state, &bmsg)
case qmsg := <-s.query:
s.handleQuery(state, qmsg)
case <-s.quit:
// Disconnect all peers on server shutdown.
state.forAllPeers(func(sp *serverPeer) {
srvrLog.Tracef("Shutdown peer %s", sp)
sp.Disconnect()
})
break out
}
}
这里有个可以说下的是交易重发机制,主要是解决节点刚启动时候同步交易池的问题,只要连接到节点,等待一段时间,周边的节点就会进行广播,这时候就能得到交易了,考虑到上面说过的knownInv问题,这时候最好是找个新的节点连接比较安全
peer的产生
server 启动时候会调用connManager的Start方法,其中会产生许多的连接对象,
代码语言:javascript复制 for i := atomic.LoadUint64(&cm.connReqCount); i < uint64 (cm.cfg.TargetOutbound); i {
go cm.NewConnReq()
}
NewConnReq函数里面会走上面提过的几种状态变化,最后进到ConnEstablished状态分支里面,该分支会调用OnConnection函数,这个函数就是outboundPeerConnected函数。这里结构有点差,整体上是这样子的,connManager中配置了一个函数变量OnConnect,而在p2p servver启动的时候会赋值connManager的函数,这里就是把outboundPeerConnected函数赋值给connManager的OnConnection变量(OnConnect:PeerConnected)。
代码语言:javascript复制 func (s *server) outboundPeerConnected(c *connmgr.ConnReq, conn net.Conn) {
sp := newServerPeer(s, c.Permanent)
p, err := peer.NewOutboundPeer(newPeerConfig(sp), c.Addr.String())
if err != nil {
srvrLog.Debugf("Cannot create outbound peer %s: %v", c.Addr, err)
s.connManager.Disconnect(c.ID())
}
sp.Peer = p
sp.connReq = c
sp.isWhitelisted = isWhitelisted(conn.RemoteAddr())
sp.AssociateConnection(conn)
go s.peerDoneHandler(sp)
s.addrManager.Attempt(sp.NA())
}
peer产生之后在调用AssociateConnection关联连接对象的时候就开始进行信息沟通. 调用流程是:AssociateConnection -> start -> negotiateOutboundProtocol -> readRemoteVersionMsg 具体代码就不贴了,该函数最后又会调用到server的OnVersion函数(OnVersion和OnConnect是相同的做法),该函数主要就是校验版本,服务之类的功能是否完整匹配,此后节点就建立成功,之后就可以进行数据的广播同步了.
代码语言:javascript复制 if !cfg.SimNet && !isInbound {
addrManager.SetServices(remoteAddr, msg.Services) //服务校验
}
// Ignore peers that have a protcol version that is too old. The peer
// negotiation logic will disconnect it after this callback returns.
if msg.ProtocolVersion < int32(peer.MinAcceptableProtocolVersion) { //版本校验
return nil
}
//这里是交换地址用的 协议是 getaddr/addr 当你连到一个点的时候互相交换地址,
hasTimestamp := sp.ProtocolVersion() >= wire.NetAddressTimeVersion
if addrManager.NeedMoreAddresses() && hasTimestamp {
sp.QueueMessage(wire.NewMsgGetAddr(), nil)
}
//这句上面说过会把newAddr移动到triedAddr 这句通过了才说能对象节点是完全可用的
// Mark the address as a known good address.
addrManager.Good(remoteAddr)
peer同步维持
有个SyncManager结构体负责区块,交易同步的维护,接上文,新的节点加进来之后刷新bestPeer(原则是高度最高的节点),之后向该节点发送getblocks请求,参数则是自己的高度状态
代码语言:javascript复制 if sm.nextCheckpoint != nil &&
best.Height < sm.nextCheckpoint.Height &&
sm.chainParams != &chaincfg.RegressionNetParams {
bestPeer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash)
sm.headersFirstMode = true
log.Infof("Downloading headers for blocks %d to "
"%d from peer %s", best.Height 1,
sm.nextCheckpoint.Height, bestPeer.Addr())
} else {
bestPeer.PushGetBlocksMsg(locator, &zeroHash)
}
对方收到请求之后,比对高度,查找到自己的BlockHash构造InvTypeBlock类型NewMsgInv消息返回,然后在逐个的请求区块
发送方 | 接收方 |
---|---|
PushGetBlocksMsg(当前自己高度) | InvTypeBlock类型NewMsgInv(返回对方没有的高度hash)) |
OnInv处理后发送NewMsgGetData(接受message后逐个getdata) | MsgBlock(区块详细信息) |
OnBlock处理附加到链上 |