diff --git a/peer.go b/peer.go index 29dc0ae3..a4ae8f0e 100644 --- a/peer.go +++ b/peer.go @@ -136,30 +136,20 @@ type outMsg struct { // to push messages to the peer. Internally they use QueueMessage. type peer struct { server *server - protocolVersion uint32 btcnet btcwire.BitcoinNet - services btcwire.ServiceFlag started int32 conn net.Conn addr string na *btcwire.NetAddress - timeConnected time.Time - lastSend time.Time - lastRecv time.Time - bytesReceived uint64 - bytesSent uint64 inbound bool connected int32 disconnect int32 // only to be used atomically persistent bool - versionKnown bool - versionMutex sync.Mutex knownAddresses map[string]bool knownInventory *MruInventoryMap knownInvMutex sync.Mutex requestedTxns map[btcwire.ShaHash]bool // owned by blockmanager requestedBlocks map[btcwire.ShaHash]bool // owned by blockmanager - lastBlock int32 retryCount int64 prevGetBlocksBegin *btcwire.ShaHash // owned by blockmanager prevGetBlocksStop *btcwire.ShaHash // owned by blockmanager @@ -175,11 +165,20 @@ type peer struct { txProcessed chan bool blockProcessed chan bool quit chan bool + StatsMtx sync.Mutex // protects all statistics below here. + versionKnown bool + protocolVersion uint32 + services btcwire.ServiceFlag + timeConnected time.Time + lastSend time.Time + lastRecv time.Time + bytesReceived uint64 + bytesSent uint64 userAgent string - pingStatsMtx sync.Mutex // protects lastPing* - lastPingNonce uint64 // Set to nonce if we have a pending ping. - lastPingTime time.Time // Time we sent last ping. - lastPingMicros int64 // Time for last ping to return. + lastBlock int32 + lastPingNonce uint64 // Set to nonce if we have a pending ping. + lastPingTime time.Time // Time we sent last ping. + lastPingMicros int64 // Time for last ping to return. } // String returns the peer's address and directionality as a human-readable @@ -212,12 +211,21 @@ func (p *peer) AddKnownInventory(invVect *btcwire.InvVect) { // VersionKnown returns the whether or not the version of a peer is known locally. // It is safe for concurrent access. func (p *peer) VersionKnown() bool { - p.versionMutex.Lock() - defer p.versionMutex.Unlock() + p.StatsMtx.Lock() + defer p.StatsMtx.Unlock() return p.versionKnown } +// ProtocolVersion returns the peer protocol version in a manner that is safe +// for concurrent access. +func (p *peer) ProtocolVersion() uint32 { + p.StatsMtx.Lock() + defer p.StatsMtx.Unlock() + + return p.protocolVersion +} + // pushVersionMsg sends a version message to the connected peer using the // current state. func (p *peer) pushVersionMsg() error { @@ -287,12 +295,12 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) { return } + p.StatsMtx.Lock() // Updating a bunch of stats. // Limit to one version message per peer. - p.versionMutex.Lock() if p.versionKnown { p.logError("Only one version message per peer is allowed %s.", p) - p.versionMutex.Unlock() + p.StatsMtx.Unlock() p.Disconnect() return } @@ -300,7 +308,6 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) { // Negotiate the protocol version. p.protocolVersion = minUint32(p.protocolVersion, uint32(msg.ProtocolVersion)) p.versionKnown = true - p.versionMutex.Unlock() peerLog.Debugf("Negotiated protocol version %d for peer %s", p.protocolVersion, p) p.lastBlock = msg.LastBlock @@ -312,6 +319,8 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) { // Set the remote peer's user agent. p.userAgent = msg.UserAgent + p.StatsMtx.Unlock() + // Inbound connections. if p.inbound { // Set up a NetAddress for the peer to be used with AddrManager. @@ -354,7 +363,8 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) { // Request known addresses if the server address manager needs // more and the peer has a protocol version new enough to // include a timestamp with addresses. - hasTimestamp := p.protocolVersion >= btcwire.NetAddressTimeVersion + hasTimestamp := p.ProtocolVersion() >= + btcwire.NetAddressTimeVersion if p.server.addrManager.NeedMoreAddresses() && hasTimestamp { p.QueueMessage(btcwire.NewMsgGetAddr(), nil) } @@ -908,7 +918,7 @@ func (p *peer) pushAddrMsg(addresses []*btcwire.NetAddress) error { // is used to notify the server about advertised addresses. func (p *peer) handleAddrMsg(msg *btcwire.MsgAddr) { // Ignore old style addresses which don't include a timestamp. - if p.protocolVersion < btcwire.NetAddressTimeVersion { + if p.ProtocolVersion() < btcwire.NetAddressTimeVersion { return } @@ -952,7 +962,7 @@ func (p *peer) handleAddrMsg(msg *btcwire.MsgAddr) { // is considered a successful ping. func (p *peer) handlePingMsg(msg *btcwire.MsgPing) { // Only Reply with pong is message comes from a new enough client. - if p.protocolVersion > btcwire.BIP0031Version { + if p.ProtocolVersion() > btcwire.BIP0031Version { // Include nonce from ping so pong can be identified. p.QueueMessage(btcwire.NewMsgPong(msg.Nonce), nil) } @@ -963,8 +973,8 @@ func (p *peer) handlePingMsg(msg *btcwire.MsgPing) { // previosuly we update our ping time statistics. If the client is too old or // we had not send a ping we ignore it. func (p *peer) handlePongMsg(msg *btcwire.MsgPong) { - p.pingStatsMtx.Lock() - defer p.pingStatsMtx.Unlock() + p.StatsMtx.Lock() + defer p.StatsMtx.Unlock() // Arguably we could use a buffered channel here sending data // in a fifo manner whenever we send a ping, or a list keeping track of @@ -984,8 +994,11 @@ func (p *peer) handlePongMsg(msg *btcwire.MsgPong) { // readMessage reads the next bitcoin message from the peer with logging. func (p *peer) readMessage() (btcwire.Message, []byte, error) { - n, msg, buf, err := btcwire.ReadMessageN(p.conn, p.protocolVersion, p.btcnet) + n, msg, buf, err := btcwire.ReadMessageN(p.conn, p.ProtocolVersion(), + p.btcnet) + p.StatsMtx.Lock() p.bytesReceived += uint64(n) + p.StatsMtx.Unlock() p.server.AddBytesReceived(uint64(n)) if err != nil { return nil, nil, err @@ -1045,7 +1058,8 @@ func (p *peer) writeMessage(msg btcwire.Message) { })) peerLog.Tracef("%v", newLogClosure(func() string { var buf bytes.Buffer - err := btcwire.WriteMessage(&buf, msg, p.protocolVersion, p.btcnet) + err := btcwire.WriteMessage(&buf, msg, p.ProtocolVersion(), + p.btcnet) if err != nil { return err.Error() } @@ -1053,8 +1067,11 @@ func (p *peer) writeMessage(msg btcwire.Message) { })) // Write the message to the peer. - n, err := btcwire.WriteMessageN(p.conn, msg, p.protocolVersion, p.btcnet) + n, err := btcwire.WriteMessageN(p.conn, msg, p.ProtocolVersion(), + p.btcnet) + p.StatsMtx.Lock() p.bytesSent += uint64(n) + p.StatsMtx.Unlock() p.server.AddBytesSent(uint64(n)) if err != nil { p.Disconnect() @@ -1126,7 +1143,9 @@ out: } break out } + p.StatsMtx.Lock() p.lastRecv = time.Now() + p.StatsMtx.Unlock() // Ensure version message comes first. if _, ok := rmsg.(*btcwire.MsgVersion); !ok && !p.VersionKnown() { @@ -1395,12 +1414,12 @@ out: case *btcwire.MsgPing: // expects pong // Also set up statistics. - p.pingStatsMtx.Lock() + p.StatsMtx.Lock() if p.protocolVersion > btcwire.BIP0031Version { p.lastPingNonce = m.Nonce p.lastPingTime = time.Now() } - p.pingStatsMtx.Unlock() + p.StatsMtx.Unlock() case *btcwire.MsgMemPool: // Should return an inv. case *btcwire.MsgGetData: @@ -1418,7 +1437,9 @@ out: pingTimer.Reset(pingTimeoutMinutes * time.Minute) } p.writeMessage(msg.msg) + p.StatsMtx.Lock() p.lastSend = time.Now() + p.StatsMtx.Unlock() if msg.doneChan != nil { msg.doneChan <- true } diff --git a/server.go b/server.go index a4591f72..5592912c 100644 --- a/server.go +++ b/server.go @@ -352,6 +352,7 @@ func (s *server) handleQuery(querymsg interface{}, state *peerState) { // however it is statistics for purely informational purposes // and we don't really care if they are raced to get the new // version. + p.StatsMtx.Lock() info := &btcjson.GetPeerInfoResult{ Addr: p.addr, Services: fmt.Sprintf("%08d", p.services), @@ -367,14 +368,13 @@ func (s *server) handleQuery(querymsg interface{}, state *peerState) { BanScore: 0, SyncNode: p == syncPeer, } - p.pingStatsMtx.Lock() info.PingTime = p.lastPingMicros if p.lastPingNonce != 0 { wait := time.Now().Sub(p.lastPingTime).Nanoseconds() // We actually want microseconds. info.PingWait = wait / 1000 } - p.pingStatsMtx.Unlock() + p.StatsMtx.Unlock() infos = append(infos, info) }) msg.reply <- infos