mirror of
https://github.com/btcsuite/btcd.git
synced 2024-11-19 18:00:11 +01:00
35d555d541
ok @davecgh
1735 lines
54 KiB
Go
1735 lines
54 KiB
Go
// Copyright (c) 2013-2014 Conformal Systems LLC.
|
|
// Use of this source code is governed by an ISC
|
|
// license that can be found in the LICENSE file.
|
|
|
|
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"container/list"
|
|
"fmt"
|
|
"github.com/conformal/btcchain"
|
|
"github.com/conformal/btcdb"
|
|
"github.com/conformal/btcutil"
|
|
"github.com/conformal/btcwire"
|
|
"github.com/conformal/go-socks"
|
|
"github.com/davecgh/go-spew/spew"
|
|
"net"
|
|
"strconv"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
const (
|
|
// maxProtocolVersion is the max protocol version the peer supports.
|
|
maxProtocolVersion = 70001
|
|
|
|
// outputBufferSize is the number of elements the output channels use.
|
|
outputBufferSize = 50
|
|
|
|
// invTrickleSize is the maximum amount of inventory to send in a single
|
|
// message when trickling inventory to remote peers.
|
|
maxInvTrickleSize = 1000
|
|
|
|
// maxKnownInventory is the maximum number of items to keep in the known
|
|
// inventory cache.
|
|
maxKnownInventory = 1000
|
|
|
|
// negotiateTimeoutSeconds is the number of seconds of inactivity before
|
|
// we timeout a peer that hasn't completed the initial version
|
|
// negotiation.
|
|
negotiateTimeoutSeconds = 30
|
|
|
|
// idleTimeoutMinutes is the number of minutes of inactivity before
|
|
// we time out a peer.
|
|
idleTimeoutMinutes = 5
|
|
|
|
// pingTimeoutMinutes is the number of minutes since we last sent a
|
|
// message requiring a reply before we will ping a host.
|
|
pingTimeoutMinutes = 2
|
|
)
|
|
|
|
var (
|
|
// userAgentName is the user agent name and is used to help identify
|
|
// ourselves to other bitcoin peers.
|
|
userAgentName = "btcd"
|
|
|
|
// userAgentVersion is the user agent version and is used to help
|
|
// identify ourselves to other bitcoin peers.
|
|
userAgentVersion = fmt.Sprintf("%d.%d.%d", appMajor, appMinor, appPatch)
|
|
)
|
|
|
|
// zeroHash is the zero value hash (all zeros). It is defined as a convenience.
|
|
var zeroHash btcwire.ShaHash
|
|
|
|
// minUint32 is a helper function to return the minimum of two uint32s.
|
|
// This avoids a math import and the need to cast to floats.
|
|
func minUint32(a, b uint32) uint32 {
|
|
if a < b {
|
|
return a
|
|
}
|
|
return b
|
|
}
|
|
|
|
// newNetAddress attempts to extract the IP address and port from the passed
|
|
// net.Addr interface and create a bitcoin NetAddress structure using that
|
|
// information.
|
|
func newNetAddress(addr net.Addr, services btcwire.ServiceFlag) (*btcwire.NetAddress, error) {
|
|
// addr will be a net.TCPAddr when not using a proxy.
|
|
if tcpAddr, ok := addr.(*net.TCPAddr); ok {
|
|
ip := tcpAddr.IP
|
|
port := uint16(tcpAddr.Port)
|
|
na := btcwire.NewNetAddressIPPort(ip, port, services)
|
|
return na, nil
|
|
}
|
|
|
|
// addr will be a socks.ProxiedAddr when using a proxy.
|
|
if proxiedAddr, ok := addr.(*socks.ProxiedAddr); ok {
|
|
ip := net.ParseIP(proxiedAddr.Host)
|
|
if ip == nil {
|
|
ip = net.ParseIP("0.0.0.0")
|
|
}
|
|
port := uint16(proxiedAddr.Port)
|
|
na := btcwire.NewNetAddressIPPort(ip, port, services)
|
|
return na, nil
|
|
}
|
|
|
|
// For the most part, addr should be one of the two above cases, but
|
|
// to be safe, fall back to trying to parse the information from the
|
|
// address string as a last resort.
|
|
host, portStr, err := net.SplitHostPort(addr.String())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ip := net.ParseIP(host)
|
|
port, err := strconv.ParseUint(portStr, 10, 16)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
na := btcwire.NewNetAddressIPPort(ip, uint16(port), services)
|
|
return na, nil
|
|
}
|
|
|
|
// TODO(davec): Rename and comment this
|
|
type outMsg struct {
|
|
msg btcwire.Message
|
|
doneChan chan bool
|
|
}
|
|
|
|
// peer provides a bitcoin peer for handling bitcoin communications. The
|
|
// overall data flow is split into 3 goroutines and a separate block manager.
|
|
// Inbound messages are read via the inHandler goroutine and generally
|
|
// dispatched to their own handler. For inbound data-related messages such as
|
|
// blocks, transactions, and inventory, the data is pased on to the block
|
|
// manager to handle it. Outbound messages are queued via QueueMessage or
|
|
// QueueInventory. QueueMessage is intended for all messages, including
|
|
// responses to data such as blocks and transactions. QueueInventory, on the
|
|
// other hand, is only intended for relaying inventory as it employs a trickling
|
|
// mechanism to batch the inventory together. The data flow for outbound
|
|
// messages uses two goroutines, queueHandler and outHandler. The first,
|
|
// queueHandler, is used as a way for external entities (mainly block manager)
|
|
// to queue messages quickly regardless of whether the peer is currently
|
|
// sending or not. It acts as the traffic cop between the external world and
|
|
// the actual goroutine which writes to the network socket. In addition, the
|
|
// peer contains several functions which are of the form pushX, that are used
|
|
// to push messages to the peer. Internally they use QueueMessage.
|
|
type peer struct {
|
|
server *server
|
|
btcnet btcwire.BitcoinNet
|
|
started int32
|
|
conn net.Conn
|
|
addr string
|
|
na *btcwire.NetAddress
|
|
inbound bool
|
|
connected int32
|
|
disconnect int32 // only to be used atomically
|
|
persistent bool
|
|
knownAddresses map[string]bool
|
|
knownInventory *MruInventoryMap
|
|
knownInvMutex sync.Mutex
|
|
requestedTxns map[btcwire.ShaHash]bool // owned by blockmanager
|
|
requestedBlocks map[btcwire.ShaHash]bool // owned by blockmanager
|
|
retryCount int64
|
|
prevGetBlocksBegin *btcwire.ShaHash // owned by blockmanager
|
|
prevGetBlocksStop *btcwire.ShaHash // owned by blockmanager
|
|
prevGetHdrsBegin *btcwire.ShaHash // owned by blockmanager
|
|
prevGetHdrsStop *btcwire.ShaHash // owned by blockmanager
|
|
requestQueue *list.List
|
|
continueHash *btcwire.ShaHash
|
|
outputQueue chan outMsg
|
|
sendQueue chan outMsg
|
|
sendDoneQueue chan bool
|
|
queueWg sync.WaitGroup // TODO(oga) wg -> single use channel?
|
|
outputInvChan chan *btcwire.InvVect
|
|
txProcessed chan bool
|
|
blockProcessed chan bool
|
|
quit chan bool
|
|
StatsMtx sync.Mutex // protects all statistics below here.
|
|
versionKnown bool
|
|
protocolVersion uint32
|
|
services btcwire.ServiceFlag
|
|
timeConnected time.Time
|
|
lastSend time.Time
|
|
lastRecv time.Time
|
|
bytesReceived uint64
|
|
bytesSent uint64
|
|
userAgent string
|
|
lastBlock int32
|
|
lastPingNonce uint64 // Set to nonce if we have a pending ping.
|
|
lastPingTime time.Time // Time we sent last ping.
|
|
lastPingMicros int64 // Time for last ping to return.
|
|
}
|
|
|
|
// String returns the peer's address and directionality as a human-readable
|
|
// string.
|
|
func (p *peer) String() string {
|
|
return fmt.Sprintf("%s (%s)", p.addr, directionString(p.inbound))
|
|
}
|
|
|
|
// isKnownInventory returns whether or not the peer is known to have the passed
|
|
// inventory. It is safe for concurrent access.
|
|
func (p *peer) isKnownInventory(invVect *btcwire.InvVect) bool {
|
|
p.knownInvMutex.Lock()
|
|
defer p.knownInvMutex.Unlock()
|
|
|
|
if p.knownInventory.Exists(invVect) {
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
// AddKnownInventory adds the passed inventory to the cache of known inventory
|
|
// for the peer. It is safe for concurrent access.
|
|
func (p *peer) AddKnownInventory(invVect *btcwire.InvVect) {
|
|
p.knownInvMutex.Lock()
|
|
defer p.knownInvMutex.Unlock()
|
|
|
|
p.knownInventory.Add(invVect)
|
|
}
|
|
|
|
// VersionKnown returns the whether or not the version of a peer is known locally.
|
|
// It is safe for concurrent access.
|
|
func (p *peer) VersionKnown() bool {
|
|
p.StatsMtx.Lock()
|
|
defer p.StatsMtx.Unlock()
|
|
|
|
return p.versionKnown
|
|
}
|
|
|
|
// ProtocolVersion returns the peer protocol version in a manner that is safe
|
|
// for concurrent access.
|
|
func (p *peer) ProtocolVersion() uint32 {
|
|
p.StatsMtx.Lock()
|
|
defer p.StatsMtx.Unlock()
|
|
|
|
return p.protocolVersion
|
|
}
|
|
|
|
// pushVersionMsg sends a version message to the connected peer using the
|
|
// current state.
|
|
func (p *peer) pushVersionMsg() error {
|
|
_, blockNum, err := p.server.db.NewestSha()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
theirNa := p.na
|
|
|
|
// If we are behind a proxy and the connection comes from the proxy then
|
|
// we return an unroutable address as their address. This is to prevent
|
|
// leaking the tor proxy address.
|
|
if cfg.Proxy != "" {
|
|
proxyaddress, _, err := net.SplitHostPort(cfg.Proxy)
|
|
// invalid proxy means poorly configured, be on the safe side.
|
|
if err != nil || p.na.IP.String() == proxyaddress {
|
|
theirNa = &btcwire.NetAddress{
|
|
Timestamp: time.Now(),
|
|
IP: net.IP([]byte{0, 0, 0, 0}),
|
|
}
|
|
}
|
|
}
|
|
|
|
// Version message.
|
|
msg := btcwire.NewMsgVersion(
|
|
p.server.addrManager.getBestLocalAddress(p.na), theirNa,
|
|
p.server.nonce, int32(blockNum))
|
|
msg.AddUserAgent(userAgentName, userAgentVersion)
|
|
|
|
// XXX: bitcoind appears to always enable the full node services flag
|
|
// of the remote peer netaddress field in the version message regardless
|
|
// of whether it knows it supports it or not. Also, bitcoind sets
|
|
// the services field of the local peer to 0 regardless of support.
|
|
//
|
|
// Realistically, this should be set as follows:
|
|
// - For outgoing connections:
|
|
// - Set the local netaddress services to what the local peer
|
|
// actually supports
|
|
// - Set the remote netaddress services to 0 to indicate no services
|
|
// as they are still unknown
|
|
// - For incoming connections:
|
|
// - Set the local netaddress services to what the local peer
|
|
// actually supports
|
|
// - Set the remote netaddress services to the what was advertised by
|
|
// by the remote peer in its version message
|
|
msg.AddrYou.Services = btcwire.SFNodeNetwork
|
|
|
|
// Advertise that we're a full node.
|
|
msg.Services = btcwire.SFNodeNetwork
|
|
|
|
// Advertise our max supported protocol version.
|
|
msg.ProtocolVersion = maxProtocolVersion
|
|
|
|
p.QueueMessage(msg, nil)
|
|
return nil
|
|
}
|
|
|
|
// updateAddresses potentially adds addresses to the address manager and
|
|
// requests known addresses from the remote peer depending on whether the peer
|
|
// is an inbound or outbound peer and other factors such as address routability
|
|
// and the negotiated protocol version.
|
|
func (p *peer) updateAddresses(msg *btcwire.MsgVersion) {
|
|
// Outbound connections.
|
|
if !p.inbound {
|
|
// TODO(davec): Only do this if not doing the initial block
|
|
// download and the local address is routable.
|
|
if !cfg.DisableListen /* && isCurrent? */ {
|
|
// Get address that best matches.
|
|
lna := p.server.addrManager.getBestLocalAddress(p.na)
|
|
if Routable(lna) {
|
|
addresses := []*btcwire.NetAddress{lna}
|
|
p.pushAddrMsg(addresses)
|
|
}
|
|
}
|
|
|
|
// Request known addresses if the server address manager needs
|
|
// more and the peer has a protocol version new enough to
|
|
// include a timestamp with addresses.
|
|
hasTimestamp := p.ProtocolVersion() >=
|
|
btcwire.NetAddressTimeVersion
|
|
if p.server.addrManager.NeedMoreAddresses() && hasTimestamp {
|
|
p.QueueMessage(btcwire.NewMsgGetAddr(), nil)
|
|
}
|
|
|
|
// Mark the address as a known good address.
|
|
p.server.addrManager.Good(p.na)
|
|
} else {
|
|
// A peer might not be advertising the same address that it
|
|
// actually connected from. One example of why this can happen
|
|
// is with NAT. Only add the address to the address manager if
|
|
// the addresses agree.
|
|
if NetAddressKey(&msg.AddrMe) == NetAddressKey(p.na) {
|
|
p.server.addrManager.AddAddress(p.na, p.na)
|
|
p.server.addrManager.Good(p.na)
|
|
}
|
|
}
|
|
}
|
|
|
|
// handleVersionMsg is invoked when a peer receives a version bitcoin message
|
|
// and is used to negotiate the protocol version details as well as kick start
|
|
// the communications.
|
|
func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) {
|
|
// Detect self connections.
|
|
if msg.Nonce == p.server.nonce {
|
|
peerLog.Debugf("Disconnecting peer connected to self %s", p)
|
|
p.Disconnect()
|
|
return
|
|
}
|
|
|
|
p.StatsMtx.Lock() // Updating a bunch of stats.
|
|
// Limit to one version message per peer.
|
|
if p.versionKnown {
|
|
p.logError("Only one version message per peer is allowed %s.",
|
|
p)
|
|
p.StatsMtx.Unlock()
|
|
p.Disconnect()
|
|
return
|
|
}
|
|
|
|
// Negotiate the protocol version.
|
|
p.protocolVersion = minUint32(p.protocolVersion, uint32(msg.ProtocolVersion))
|
|
p.versionKnown = true
|
|
peerLog.Debugf("Negotiated protocol version %d for peer %s",
|
|
p.protocolVersion, p)
|
|
p.lastBlock = msg.LastBlock
|
|
|
|
// Set the supported services for the peer to what the remote peer
|
|
// advertised.
|
|
p.services = msg.Services
|
|
|
|
// Set the remote peer's user agent.
|
|
p.userAgent = msg.UserAgent
|
|
|
|
p.StatsMtx.Unlock()
|
|
|
|
// Inbound connections.
|
|
if p.inbound {
|
|
// Set up a NetAddress for the peer to be used with AddrManager.
|
|
// We only do this inbound because outbound set this up
|
|
// at connection time and no point recomputing.
|
|
na, err := newNetAddress(p.conn.RemoteAddr(), p.services)
|
|
if err != nil {
|
|
p.logError("Can't get remote address: %v", err)
|
|
p.Disconnect()
|
|
return
|
|
}
|
|
p.na = na
|
|
|
|
// Send version.
|
|
err = p.pushVersionMsg()
|
|
if err != nil {
|
|
p.logError("Can't send version message to %s: %v",
|
|
p, err)
|
|
p.Disconnect()
|
|
return
|
|
}
|
|
}
|
|
|
|
// Send verack.
|
|
p.QueueMessage(btcwire.NewMsgVerAck(), nil)
|
|
|
|
// Update the address manager and request known addresses from the
|
|
// remote peer for outbound connections. This is skipped when running
|
|
// on the simulation test network since it is only intended to connect
|
|
// to specified peers and actively avoids advertising and connecting to
|
|
// discovered peers.
|
|
if !cfg.SimNet {
|
|
p.updateAddresses(msg)
|
|
}
|
|
|
|
// Signal the block manager this peer is a new sync candidate.
|
|
p.server.blockManager.NewPeer(p)
|
|
|
|
// TODO: Relay alerts.
|
|
}
|
|
|
|
// pushTxMsg sends a tx message for the provided transaction hash to the
|
|
// connected peer. An error is returned if the transaction hash is not known.
|
|
func (p *peer) pushTxMsg(sha *btcwire.ShaHash, doneChan, waitChan chan bool) error {
|
|
// Attempt to fetch the requested transaction from the pool. A
|
|
// call could be made to check for existence first, but simply trying
|
|
// to fetch a missing transaction results in the same behavior.
|
|
tx, err := p.server.txMemPool.FetchTransaction(sha)
|
|
if err != nil {
|
|
peerLog.Tracef("Unable to fetch tx %v from transaction "+
|
|
"pool: %v", sha, err)
|
|
return err
|
|
}
|
|
|
|
// Once we have fetched data wait for any previous operation to finish.
|
|
if waitChan != nil {
|
|
<-waitChan
|
|
}
|
|
|
|
p.QueueMessage(tx.MsgTx(), doneChan)
|
|
|
|
return nil
|
|
}
|
|
|
|
// pushBlockMsg sends a block message for the provided block hash to the
|
|
// connected peer. An error is returned if the block hash is not known.
|
|
func (p *peer) pushBlockMsg(sha *btcwire.ShaHash, doneChan, waitChan chan bool) error {
|
|
blk, err := p.server.db.FetchBlockBySha(sha)
|
|
if err != nil {
|
|
peerLog.Tracef("Unable to fetch requested block sha %v: %v",
|
|
sha, err)
|
|
return err
|
|
}
|
|
|
|
// Once we have fetched data wait for any previous operation to finish.
|
|
if waitChan != nil {
|
|
<-waitChan
|
|
}
|
|
|
|
// We only send the channel for this message if we aren't sending
|
|
// an inv straight after.
|
|
var dc chan bool
|
|
sendInv := p.continueHash != nil && p.continueHash.IsEqual(sha)
|
|
if !sendInv {
|
|
dc = doneChan
|
|
}
|
|
p.QueueMessage(blk.MsgBlock(), dc)
|
|
|
|
// When the peer requests the final block that was advertised in
|
|
// response to a getblocks message which requested more blocks than
|
|
// would fit into a single message, send it a new inventory message
|
|
// to trigger it to issue another getblocks message for the next
|
|
// batch of inventory.
|
|
if p.continueHash != nil && p.continueHash.IsEqual(sha) {
|
|
hash, _, err := p.server.db.NewestSha()
|
|
if err == nil {
|
|
invMsg := btcwire.NewMsgInvSizeHint(1)
|
|
iv := btcwire.NewInvVect(btcwire.InvTypeBlock, hash)
|
|
invMsg.AddInvVect(iv)
|
|
p.QueueMessage(invMsg, doneChan)
|
|
p.continueHash = nil
|
|
} else if doneChan != nil {
|
|
// Avoid deadlock when caller waits on channel.
|
|
go func() {
|
|
doneChan <- false
|
|
}()
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// PushGetBlocksMsg sends a getblocks message for the provided block locator
|
|
// and stop hash. It will ignore back-to-back duplicate requests.
|
|
func (p *peer) PushGetBlocksMsg(locator btcchain.BlockLocator, stopHash *btcwire.ShaHash) error {
|
|
// Extract the begin hash from the block locator, if one was specified,
|
|
// to use for filtering duplicate getblocks requests.
|
|
// request.
|
|
var beginHash *btcwire.ShaHash
|
|
if len(locator) > 0 {
|
|
beginHash = locator[0]
|
|
}
|
|
|
|
// Filter duplicate getblocks requests.
|
|
if p.prevGetBlocksStop != nil && p.prevGetBlocksBegin != nil &&
|
|
beginHash != nil && stopHash.IsEqual(p.prevGetBlocksStop) &&
|
|
beginHash.IsEqual(p.prevGetBlocksBegin) {
|
|
|
|
peerLog.Tracef("Filtering duplicate [getblocks] with begin "+
|
|
"hash %v, stop hash %v", beginHash, stopHash)
|
|
return nil
|
|
}
|
|
|
|
// Construct the getblocks request and queue it to be sent.
|
|
msg := btcwire.NewMsgGetBlocks(stopHash)
|
|
for _, hash := range locator {
|
|
err := msg.AddBlockLocatorHash(hash)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
p.QueueMessage(msg, nil)
|
|
|
|
// Update the previous getblocks request information for filtering
|
|
// duplicates.
|
|
p.prevGetBlocksBegin = beginHash
|
|
p.prevGetBlocksStop = stopHash
|
|
return nil
|
|
}
|
|
|
|
// PushGetHeadersMsg sends a getblocks message for the provided block locator
|
|
// and stop hash. It will ignore back-to-back duplicate requests.
|
|
func (p *peer) PushGetHeadersMsg(locator btcchain.BlockLocator, stopHash *btcwire.ShaHash) error {
|
|
// Extract the begin hash from the block locator, if one was specified,
|
|
// to use for filtering duplicate getheaders requests.
|
|
var beginHash *btcwire.ShaHash
|
|
if len(locator) > 0 {
|
|
beginHash = locator[0]
|
|
}
|
|
|
|
// Filter duplicate getheaders requests.
|
|
if p.prevGetHdrsStop != nil && p.prevGetHdrsBegin != nil &&
|
|
beginHash != nil && stopHash.IsEqual(p.prevGetHdrsStop) &&
|
|
beginHash.IsEqual(p.prevGetHdrsBegin) {
|
|
|
|
peerLog.Tracef("Filtering duplicate [getheaders] with begin "+
|
|
"hash %v", beginHash)
|
|
return nil
|
|
}
|
|
|
|
// Construct the getheaders request and queue it to be sent.
|
|
msg := btcwire.NewMsgGetHeaders()
|
|
msg.HashStop = *stopHash
|
|
for _, hash := range locator {
|
|
err := msg.AddBlockLocatorHash(hash)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
p.QueueMessage(msg, nil)
|
|
|
|
// Update the previous getheaders request information for filtering
|
|
// duplicates.
|
|
p.prevGetHdrsBegin = beginHash
|
|
p.prevGetHdrsStop = stopHash
|
|
return nil
|
|
}
|
|
|
|
// handleMemPoolMsg is invoked when a peer receives a mempool bitcoin message.
|
|
// It creates and sends an inventory message with the contents of the memory
|
|
// pool up to the maximum inventory allowed per message.
|
|
func (p *peer) handleMemPoolMsg(msg *btcwire.MsgMemPool) {
|
|
// Generate inventory message with the available transactions in the
|
|
// transaction memory pool. Limit it to the max allowed inventory
|
|
// per message. The the NewMsgInvSizeHint function automatically limits
|
|
// the passed hint to the maximum allowed, so it's safe to pass it
|
|
// without double checking it here.
|
|
hashes := p.server.txMemPool.TxShas()
|
|
invMsg := btcwire.NewMsgInvSizeHint(uint(len(hashes)))
|
|
for i, hash := range hashes {
|
|
// Another thread might have removed the transaction from the
|
|
// pool since the initial query.
|
|
if !p.server.txMemPool.IsTransactionInPool(hash) {
|
|
continue
|
|
}
|
|
|
|
iv := btcwire.NewInvVect(btcwire.InvTypeTx, hash)
|
|
invMsg.AddInvVect(iv)
|
|
if i+1 >= btcwire.MaxInvPerMsg {
|
|
break
|
|
}
|
|
}
|
|
|
|
// Send the inventory message if there is anything to send.
|
|
if len(invMsg.InvList) > 0 {
|
|
p.QueueMessage(invMsg, nil)
|
|
}
|
|
}
|
|
|
|
// handleTxMsg is invoked when a peer receives a tx bitcoin message. It blocks
|
|
// until the bitcoin transaction has been fully processed. Unlock the block
|
|
// handler this does not serialize all transactions through a single thread
|
|
// transactions don't rely on the previous one in a linear fashion like blocks.
|
|
func (p *peer) handleTxMsg(msg *btcwire.MsgTx) {
|
|
// Add the transaction to the known inventory for the peer.
|
|
// Convert the raw MsgTx to a btcutil.Tx which provides some convenience
|
|
// methods and things such as hash caching.
|
|
tx := btcutil.NewTx(msg)
|
|
iv := btcwire.NewInvVect(btcwire.InvTypeTx, tx.Sha())
|
|
p.AddKnownInventory(iv)
|
|
|
|
// Queue the transaction up to be handled by the block manager and
|
|
// intentionally block further receives until the transaction is fully
|
|
// processed and known good or bad. This helps prevent a malicious peer
|
|
// from queueing up a bunch of bad transactions before disconnecting (or
|
|
// being disconnected) and wasting memory.
|
|
p.server.blockManager.QueueTx(tx, p)
|
|
<-p.txProcessed
|
|
}
|
|
|
|
// handleBlockMsg is invoked when a peer receives a block bitcoin message. It
|
|
// blocks until the bitcoin block has been fully processed.
|
|
func (p *peer) handleBlockMsg(msg *btcwire.MsgBlock, buf []byte) {
|
|
// Convert the raw MsgBlock to a btcutil.Block which provides some
|
|
// convenience methods and things such as hash caching.
|
|
block := btcutil.NewBlockFromBlockAndBytes(msg, buf)
|
|
|
|
// Add the block to the known inventory for the peer.
|
|
hash, err := block.Sha()
|
|
if err != nil {
|
|
peerLog.Errorf("Unable to get block hash: %v", err)
|
|
return
|
|
}
|
|
iv := btcwire.NewInvVect(btcwire.InvTypeBlock, hash)
|
|
p.AddKnownInventory(iv)
|
|
|
|
// Queue the block up to be handled by the block
|
|
// manager and intentionally block further receives
|
|
// until the bitcoin block is fully processed and known
|
|
// good or bad. This helps prevent a malicious peer
|
|
// from queueing up a bunch of bad blocks before
|
|
// disconnecting (or being disconnected) and wasting
|
|
// memory. Additionally, this behavior is depended on
|
|
// by at least the block acceptance test tool as the
|
|
// reference implementation processes blocks in the same
|
|
// thread and therefore blocks further messages until
|
|
// the bitcoin block has been fully processed.
|
|
p.server.blockManager.QueueBlock(block, p)
|
|
<-p.blockProcessed
|
|
}
|
|
|
|
// handleInvMsg is invoked when a peer receives an inv bitcoin message and is
|
|
// used to examine the inventory being advertised by the remote peer and react
|
|
// accordingly. We pass the message down to blockmanager which will call
|
|
// QueueMessage with any appropriate responses.
|
|
func (p *peer) handleInvMsg(msg *btcwire.MsgInv) {
|
|
p.server.blockManager.QueueInv(msg, p)
|
|
}
|
|
|
|
// handleHeadersMsg is invoked when a peer receives a headers bitcoin message.
|
|
// The message is passed down to the block manager.
|
|
func (p *peer) handleHeadersMsg(msg *btcwire.MsgHeaders) {
|
|
p.server.blockManager.QueueHeaders(msg, p)
|
|
}
|
|
|
|
// handleGetData is invoked when a peer receives a getdata bitcoin message and
|
|
// is used to deliver block and transaction information.
|
|
func (p *peer) handleGetDataMsg(msg *btcwire.MsgGetData) {
|
|
numAdded := 0
|
|
notFound := btcwire.NewMsgNotFound()
|
|
|
|
// We wait on the this wait channel periodically to prevent queueing
|
|
// far more data than we can send in a reasonable time, wasting memory.
|
|
// The waiting occurs after the database fetch for the next one to
|
|
// provide a little pipelining.
|
|
var waitChan chan bool
|
|
doneChan := make(chan bool)
|
|
|
|
for i, iv := range msg.InvList {
|
|
var c chan bool
|
|
// If this will be the last message we send.
|
|
if i == len(msg.InvList)-1 && len(notFound.InvList) == 0 {
|
|
c = doneChan
|
|
} else if i > 0 && i+1%3 == 0 {
|
|
// buffered so as to not make the send goroutine block.
|
|
c = make(chan bool, 1)
|
|
}
|
|
var err error
|
|
switch iv.Type {
|
|
case btcwire.InvTypeTx:
|
|
err = p.pushTxMsg(&iv.Hash, c, waitChan)
|
|
case btcwire.InvTypeBlock:
|
|
err = p.pushBlockMsg(&iv.Hash, c, waitChan)
|
|
case btcwire.InvTypeFilteredBlock: // unhandled
|
|
continue
|
|
default:
|
|
peerLog.Warnf("Unknown type in inventory request %d",
|
|
iv.Type)
|
|
continue
|
|
}
|
|
if err != nil {
|
|
notFound.AddInvVect(iv)
|
|
}
|
|
numAdded++
|
|
waitChan = c
|
|
}
|
|
if len(notFound.InvList) != 0 {
|
|
p.QueueMessage(notFound, doneChan)
|
|
}
|
|
|
|
// Wait for messages to be sent. We can send quite a lot of data at this
|
|
// point and this will keep the peer busy for a decent amount of time.
|
|
// We don't process anything else by them in this time so that we
|
|
// have an idea of when we should hear back from them - else the idle
|
|
// timeout could fire when we were only half done sending the blocks.
|
|
if numAdded > 0 {
|
|
<-doneChan
|
|
}
|
|
}
|
|
|
|
// handleGetBlocksMsg is invoked when a peer receives a getdata bitcoin message.
|
|
func (p *peer) handleGetBlocksMsg(msg *btcwire.MsgGetBlocks) {
|
|
// Return all block hashes to the latest one (up to max per message) if
|
|
// no stop hash was specified.
|
|
// Attempt to find the ending index of the stop hash if specified.
|
|
endIdx := btcdb.AllShas
|
|
if !msg.HashStop.IsEqual(&zeroHash) {
|
|
height, err := p.server.db.FetchBlockHeightBySha(&msg.HashStop)
|
|
if err == nil {
|
|
endIdx = height + 1
|
|
}
|
|
}
|
|
|
|
// Find the most recent known block based on the block locator.
|
|
// Use the block after the genesis block if no other blocks in the
|
|
// provided locator are known. This does mean the client will start
|
|
// over with the genesis block if unknown block locators are provided.
|
|
// This mirrors the behavior in the reference implementation.
|
|
startIdx := int64(1)
|
|
for _, hash := range msg.BlockLocatorHashes {
|
|
height, err := p.server.db.FetchBlockHeightBySha(hash)
|
|
if err == nil {
|
|
// Start with the next hash since we know this one.
|
|
startIdx = height + 1
|
|
break
|
|
}
|
|
}
|
|
|
|
// Don't attempt to fetch more than we can put into a single message.
|
|
autoContinue := false
|
|
if endIdx-startIdx > btcwire.MaxBlocksPerMsg {
|
|
endIdx = startIdx + btcwire.MaxBlocksPerMsg
|
|
autoContinue = true
|
|
}
|
|
|
|
// Generate inventory message.
|
|
//
|
|
// The FetchBlockBySha call is limited to a maximum number of hashes
|
|
// per invocation. Since the maximum number of inventory per message
|
|
// might be larger, call it multiple times with the appropriate indices
|
|
// as needed.
|
|
invMsg := btcwire.NewMsgInv()
|
|
for start := startIdx; start < endIdx; {
|
|
// Fetch the inventory from the block database.
|
|
hashList, err := p.server.db.FetchHeightRange(start, endIdx)
|
|
if err != nil {
|
|
peerLog.Warnf("Block lookup failed: %v", err)
|
|
return
|
|
}
|
|
|
|
// The database did not return any further hashes. Break out of
|
|
// the loop now.
|
|
if len(hashList) == 0 {
|
|
break
|
|
}
|
|
|
|
// Add block inventory to the message.
|
|
for _, hash := range hashList {
|
|
hashCopy := hash
|
|
iv := btcwire.NewInvVect(btcwire.InvTypeBlock, &hashCopy)
|
|
invMsg.AddInvVect(iv)
|
|
}
|
|
start += int64(len(hashList))
|
|
}
|
|
|
|
// Send the inventory message if there is anything to send.
|
|
if len(invMsg.InvList) > 0 {
|
|
invListLen := len(invMsg.InvList)
|
|
if autoContinue && invListLen == btcwire.MaxBlocksPerMsg {
|
|
// Intentionally use a copy of the final hash so there
|
|
// is not a reference into the inventory slice which
|
|
// would prevent the entire slice from being eligible
|
|
// for GC as soon as it's sent.
|
|
continueHash := invMsg.InvList[invListLen-1].Hash
|
|
p.continueHash = &continueHash
|
|
}
|
|
p.QueueMessage(invMsg, nil)
|
|
}
|
|
}
|
|
|
|
// handleGetHeadersMsg is invoked when a peer receives a getheaders bitcoin
|
|
// message.
|
|
func (p *peer) handleGetHeadersMsg(msg *btcwire.MsgGetHeaders) {
|
|
// Attempt to look up the height of the provided stop hash.
|
|
endIdx := btcdb.AllShas
|
|
height, err := p.server.db.FetchBlockHeightBySha(&msg.HashStop)
|
|
if err == nil {
|
|
endIdx = height + 1
|
|
}
|
|
|
|
// There are no block locators so a specific header is being requested
|
|
// as identified by the stop hash.
|
|
if len(msg.BlockLocatorHashes) == 0 {
|
|
// No blocks with the stop hash were found so there is nothing
|
|
// to do. Just return. This behavior mirrors the reference
|
|
// implementation.
|
|
if endIdx == btcdb.AllShas {
|
|
return
|
|
}
|
|
|
|
// Fetch and send the requested block header.
|
|
header, err := p.server.db.FetchBlockHeaderBySha(&msg.HashStop)
|
|
if err != nil {
|
|
peerLog.Warnf("Lookup of known block hash failed: %v",
|
|
err)
|
|
return
|
|
}
|
|
|
|
headersMsg := btcwire.NewMsgHeaders()
|
|
headersMsg.AddBlockHeader(header)
|
|
p.QueueMessage(headersMsg, nil)
|
|
return
|
|
}
|
|
|
|
// Find the most recent known block based on the block locator.
|
|
// Use the block after the genesis block if no other blocks in the
|
|
// provided locator are known. This does mean the client will start
|
|
// over with the genesis block if unknown block locators are provided.
|
|
// This mirrors the behavior in the reference implementation.
|
|
startIdx := int64(1)
|
|
for _, hash := range msg.BlockLocatorHashes {
|
|
height, err := p.server.db.FetchBlockHeightBySha(hash)
|
|
if err == nil {
|
|
// Start with the next hash since we know this one.
|
|
startIdx = height + 1
|
|
break
|
|
}
|
|
}
|
|
|
|
// Don't attempt to fetch more than we can put into a single message.
|
|
if endIdx-startIdx > btcwire.MaxBlockHeadersPerMsg {
|
|
endIdx = startIdx + btcwire.MaxBlockHeadersPerMsg
|
|
}
|
|
|
|
// Generate headers message and send it.
|
|
//
|
|
// The FetchHeightRange call is limited to a maximum number of hashes
|
|
// per invocation. Since the maximum number of headers per message
|
|
// might be larger, call it multiple times with the appropriate indices
|
|
// as needed.
|
|
headersMsg := btcwire.NewMsgHeaders()
|
|
for start := startIdx; start < endIdx; {
|
|
// Fetch the inventory from the block database.
|
|
hashList, err := p.server.db.FetchHeightRange(start, endIdx)
|
|
if err != nil {
|
|
peerLog.Warnf("Header lookup failed: %v", err)
|
|
return
|
|
}
|
|
|
|
// The database did not return any further hashes. Break out of
|
|
// the loop now.
|
|
if len(hashList) == 0 {
|
|
break
|
|
}
|
|
|
|
// Add headers to the message.
|
|
for _, hash := range hashList {
|
|
header, err := p.server.db.FetchBlockHeaderBySha(&hash)
|
|
if err != nil {
|
|
peerLog.Warnf("Lookup of known block hash "+
|
|
"failed: %v", err)
|
|
continue
|
|
}
|
|
headersMsg.AddBlockHeader(header)
|
|
}
|
|
|
|
// Start at the next block header after the latest one on the
|
|
// next loop iteration.
|
|
start += int64(len(hashList))
|
|
}
|
|
p.QueueMessage(headersMsg, nil)
|
|
}
|
|
|
|
// handleGetAddrMsg is invoked when a peer receives a getaddr bitcoin message
|
|
// and is used to provide the peer with known addresses from the address
|
|
// manager.
|
|
func (p *peer) handleGetAddrMsg(msg *btcwire.MsgGetAddr) {
|
|
// Don't return any addresses when running on the simulation test
|
|
// network. This helps prevent the network from becoming another
|
|
// public test network since it will not be able to learn about other
|
|
// peers that have not specifically been provided.
|
|
if cfg.SimNet {
|
|
return
|
|
}
|
|
|
|
// Get the current known addresses from the address manager.
|
|
addrCache := p.server.addrManager.AddressCache()
|
|
|
|
// Push the addresses.
|
|
err := p.pushAddrMsg(addrCache)
|
|
if err != nil {
|
|
p.logError("Can't push address message to %s: %v", p, err)
|
|
p.Disconnect()
|
|
return
|
|
}
|
|
}
|
|
|
|
// pushAddrMsg sends one, or more, addr message(s) to the connected peer using
|
|
// the provided addresses.
|
|
func (p *peer) pushAddrMsg(addresses []*btcwire.NetAddress) error {
|
|
// Nothing to send.
|
|
if len(addresses) == 0 {
|
|
return nil
|
|
}
|
|
|
|
numAdded := 0
|
|
msg := btcwire.NewMsgAddr()
|
|
for _, na := range addresses {
|
|
// Filter addresses the peer already knows about.
|
|
if p.knownAddresses[NetAddressKey(na)] {
|
|
continue
|
|
}
|
|
|
|
// Add the address to the message.
|
|
err := msg.AddAddress(na)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
numAdded++
|
|
|
|
// Split into multiple messages as needed.
|
|
if numAdded > 0 && numAdded%btcwire.MaxAddrPerMsg == 0 {
|
|
p.QueueMessage(msg, nil)
|
|
|
|
// NOTE: This needs to be a new address message and not
|
|
// simply call ClearAddresses since the message is a
|
|
// pointer and queueing it does not make a copy.
|
|
msg = btcwire.NewMsgAddr()
|
|
}
|
|
}
|
|
|
|
// Send message with remaining addresses if needed.
|
|
if numAdded%btcwire.MaxAddrPerMsg != 0 {
|
|
p.QueueMessage(msg, nil)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// handleAddrMsg is invoked when a peer receives an addr bitcoin message and
|
|
// is used to notify the server about advertised addresses.
|
|
func (p *peer) handleAddrMsg(msg *btcwire.MsgAddr) {
|
|
// Ignore addresses when running on the simulation test network. This
|
|
// helps prevent the network from becoming another public test network
|
|
// since it will not be able to learn about other peers that have not
|
|
// specifically been provided.
|
|
if cfg.SimNet {
|
|
return
|
|
}
|
|
|
|
// Ignore old style addresses which don't include a timestamp.
|
|
if p.ProtocolVersion() < btcwire.NetAddressTimeVersion {
|
|
return
|
|
}
|
|
|
|
// A message that has no addresses is invalid.
|
|
if len(msg.AddrList) == 0 {
|
|
p.logError("Command [%s] from %s does not contain any addresses",
|
|
msg.Command(), p)
|
|
p.Disconnect()
|
|
return
|
|
}
|
|
|
|
for _, na := range msg.AddrList {
|
|
// Don't add more address if we're disconnecting.
|
|
if atomic.LoadInt32(&p.disconnect) != 0 {
|
|
return
|
|
}
|
|
|
|
// Set the timestamp to 5 days ago if it's more than 24 hours
|
|
// in the future so this address is one of the first to be
|
|
// removed when space is needed.
|
|
now := time.Now()
|
|
if na.Timestamp.After(now.Add(time.Minute * 10)) {
|
|
na.Timestamp = now.Add(-1 * time.Hour * 24 * 5)
|
|
}
|
|
|
|
// Add address to known addresses for this peer.
|
|
p.knownAddresses[NetAddressKey(na)] = true
|
|
}
|
|
|
|
// Add addresses to server address manager. The address manager handles
|
|
// the details of things such as preventing duplicate addresses, max
|
|
// addresses, and last seen updates.
|
|
// XXX bitcoind gives a 2 hour time penalty here, do we want to do the
|
|
// same?
|
|
p.server.addrManager.AddAddresses(msg.AddrList, p.na)
|
|
}
|
|
|
|
// handlePingMsg is invoked when a peer receives a ping bitcoin message. For
|
|
// recent clients (protocol version > BIP0031Version), it replies with a pong
|
|
// message. For older clients, it does nothing and anything other than failure
|
|
// is considered a successful ping.
|
|
func (p *peer) handlePingMsg(msg *btcwire.MsgPing) {
|
|
// Only Reply with pong is message comes from a new enough client.
|
|
if p.ProtocolVersion() > btcwire.BIP0031Version {
|
|
// Include nonce from ping so pong can be identified.
|
|
p.QueueMessage(btcwire.NewMsgPong(msg.Nonce), nil)
|
|
}
|
|
}
|
|
|
|
// handlePongMsg is invoked when a peer received a pong bitcoin message.
|
|
// recent clients (protocol version > BIP0031Version), and if we had send a ping
|
|
// previosuly we update our ping time statistics. If the client is too old or
|
|
// we had not send a ping we ignore it.
|
|
func (p *peer) handlePongMsg(msg *btcwire.MsgPong) {
|
|
p.StatsMtx.Lock()
|
|
defer p.StatsMtx.Unlock()
|
|
|
|
// Arguably we could use a buffered channel here sending data
|
|
// in a fifo manner whenever we send a ping, or a list keeping track of
|
|
// the times of each ping. For now we just make a best effort and
|
|
// only record stats if it was for the last ping sent. Any preceding
|
|
// and overlapping pings will be ignored. It is unlikely to occur
|
|
// without large usage of the ping rpc call since we ping
|
|
// infrequently enough that if they overlap we would have timed out
|
|
// the peer.
|
|
if p.protocolVersion > btcwire.BIP0031Version &&
|
|
p.lastPingNonce != 0 && msg.Nonce == p.lastPingNonce {
|
|
p.lastPingMicros = time.Now().Sub(p.lastPingTime).Nanoseconds()
|
|
p.lastPingMicros /= 1000 // convert to usec.
|
|
p.lastPingNonce = 0
|
|
}
|
|
}
|
|
|
|
// readMessage reads the next bitcoin message from the peer with logging.
|
|
func (p *peer) readMessage() (btcwire.Message, []byte, error) {
|
|
n, msg, buf, err := btcwire.ReadMessageN(p.conn, p.ProtocolVersion(),
|
|
p.btcnet)
|
|
p.StatsMtx.Lock()
|
|
p.bytesReceived += uint64(n)
|
|
p.StatsMtx.Unlock()
|
|
p.server.AddBytesReceived(uint64(n))
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// Use closures to log expensive operations so they are only run when
|
|
// the logging level requires it.
|
|
peerLog.Debugf("%v", newLogClosure(func() string {
|
|
// Debug summary of message.
|
|
summary := messageSummary(msg)
|
|
if len(summary) > 0 {
|
|
summary = " (" + summary + ")"
|
|
}
|
|
return fmt.Sprintf("Received %v%s from %s",
|
|
msg.Command(), summary, p)
|
|
}))
|
|
peerLog.Tracef("%v", newLogClosure(func() string {
|
|
return spew.Sdump(msg)
|
|
}))
|
|
peerLog.Tracef("%v", newLogClosure(func() string {
|
|
return spew.Sdump(buf)
|
|
}))
|
|
|
|
return msg, buf, nil
|
|
}
|
|
|
|
// writeMessage sends a bitcoin Message to the peer with logging.
|
|
func (p *peer) writeMessage(msg btcwire.Message) {
|
|
// Don't do anything if we're disconnecting.
|
|
if atomic.LoadInt32(&p.disconnect) != 0 {
|
|
return
|
|
}
|
|
if !p.VersionKnown() {
|
|
switch msg.(type) {
|
|
case *btcwire.MsgVersion:
|
|
// This is OK.
|
|
default:
|
|
// We drop all messages other than version if we
|
|
// haven't done the handshake already.
|
|
return
|
|
}
|
|
}
|
|
|
|
// Use closures to log expensive operations so they are only run when
|
|
// the logging level requires it.
|
|
peerLog.Debugf("%v", newLogClosure(func() string {
|
|
// Debug summary of message.
|
|
summary := messageSummary(msg)
|
|
if len(summary) > 0 {
|
|
summary = " (" + summary + ")"
|
|
}
|
|
return fmt.Sprintf("Sending %v%s to %s", msg.Command(),
|
|
summary, p)
|
|
}))
|
|
peerLog.Tracef("%v", newLogClosure(func() string {
|
|
return spew.Sdump(msg)
|
|
}))
|
|
peerLog.Tracef("%v", newLogClosure(func() string {
|
|
var buf bytes.Buffer
|
|
err := btcwire.WriteMessage(&buf, msg, p.ProtocolVersion(),
|
|
p.btcnet)
|
|
if err != nil {
|
|
return err.Error()
|
|
}
|
|
return spew.Sdump(buf.Bytes())
|
|
}))
|
|
|
|
// Write the message to the peer.
|
|
n, err := btcwire.WriteMessageN(p.conn, msg, p.ProtocolVersion(),
|
|
p.btcnet)
|
|
p.StatsMtx.Lock()
|
|
p.bytesSent += uint64(n)
|
|
p.StatsMtx.Unlock()
|
|
p.server.AddBytesSent(uint64(n))
|
|
if err != nil {
|
|
p.Disconnect()
|
|
p.logError("Can't send message to %s: %v", p, err)
|
|
return
|
|
}
|
|
}
|
|
|
|
// isAllowedByRegression returns whether or not the passed error is allowed by
|
|
// regression tests without disconnecting the peer. In particular, regression
|
|
// tests need to be allowed to send malformed messages without the peer being
|
|
// disconnected.
|
|
func (p *peer) isAllowedByRegression(err error) bool {
|
|
// Don't allow the error if it's not specifically a malformed message
|
|
// error.
|
|
if _, ok := err.(*btcwire.MessageError); !ok {
|
|
return false
|
|
}
|
|
|
|
// Don't allow the error if it's not coming from localhost or the
|
|
// hostname can't be determined for some reason.
|
|
host, _, err := net.SplitHostPort(p.addr)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
|
|
if host != "127.0.0.1" && host != "localhost" {
|
|
return false
|
|
}
|
|
|
|
// Allowed if all checks passed.
|
|
return true
|
|
}
|
|
|
|
// inHandler handles all incoming messages for the peer. It must be run as a
|
|
// goroutine.
|
|
func (p *peer) inHandler() {
|
|
// Peers must complete the initial version negotiation within a shorter
|
|
// timeframe than a general idle timeout. The timer is then reset below
|
|
// to idleTimeoutMinutes for all future messages.
|
|
idleTimer := time.AfterFunc(negotiateTimeoutSeconds*time.Second, func() {
|
|
if p.VersionKnown() {
|
|
peerLog.Warnf("Peer %s no answer for %d minutes, "+
|
|
"disconnecting", p, idleTimeoutMinutes)
|
|
}
|
|
p.Disconnect()
|
|
})
|
|
out:
|
|
for atomic.LoadInt32(&p.disconnect) == 0 {
|
|
rmsg, buf, err := p.readMessage()
|
|
// Stop the timer now, if we go around again we will reset it.
|
|
idleTimer.Stop()
|
|
if err != nil {
|
|
// In order to allow regression tests with malformed
|
|
// messages, don't disconnect the peer when we're in
|
|
// regression test mode and the error is one of the
|
|
// allowed errors.
|
|
if cfg.RegressionTest && p.isAllowedByRegression(err) {
|
|
peerLog.Errorf("Allowed regression test "+
|
|
"error from %s: %v", p, err)
|
|
idleTimer.Reset(idleTimeoutMinutes * time.Minute)
|
|
continue
|
|
}
|
|
|
|
// Only log the error if we're not forcibly disconnecting.
|
|
if atomic.LoadInt32(&p.disconnect) == 0 {
|
|
p.logError("Can't read message from %s: %v",
|
|
p, err)
|
|
}
|
|
break out
|
|
}
|
|
p.StatsMtx.Lock()
|
|
p.lastRecv = time.Now()
|
|
p.StatsMtx.Unlock()
|
|
|
|
// Ensure version message comes first.
|
|
if _, ok := rmsg.(*btcwire.MsgVersion); !ok && !p.VersionKnown() {
|
|
p.logError("A version message must precede all others")
|
|
break out
|
|
}
|
|
|
|
// Handle each supported message type.
|
|
markConnected := false
|
|
switch msg := rmsg.(type) {
|
|
case *btcwire.MsgVersion:
|
|
p.handleVersionMsg(msg)
|
|
markConnected = true
|
|
|
|
case *btcwire.MsgVerAck:
|
|
// Do nothing.
|
|
|
|
case *btcwire.MsgGetAddr:
|
|
p.handleGetAddrMsg(msg)
|
|
|
|
case *btcwire.MsgAddr:
|
|
p.handleAddrMsg(msg)
|
|
markConnected = true
|
|
|
|
case *btcwire.MsgPing:
|
|
p.handlePingMsg(msg)
|
|
markConnected = true
|
|
|
|
case *btcwire.MsgPong:
|
|
p.handlePongMsg(msg)
|
|
|
|
case *btcwire.MsgAlert:
|
|
p.server.BroadcastMessage(msg, p)
|
|
|
|
case *btcwire.MsgMemPool:
|
|
p.handleMemPoolMsg(msg)
|
|
|
|
case *btcwire.MsgTx:
|
|
p.handleTxMsg(msg)
|
|
|
|
case *btcwire.MsgBlock:
|
|
p.handleBlockMsg(msg, buf)
|
|
|
|
case *btcwire.MsgInv:
|
|
p.handleInvMsg(msg)
|
|
markConnected = true
|
|
|
|
case *btcwire.MsgHeaders:
|
|
p.handleHeadersMsg(msg)
|
|
|
|
case *btcwire.MsgNotFound:
|
|
// TODO(davec): Ignore this for now, but ultimately
|
|
// it should probably be used to detect when something
|
|
// we requested needs to be re-requested from another
|
|
// peer.
|
|
|
|
case *btcwire.MsgGetData:
|
|
p.handleGetDataMsg(msg)
|
|
markConnected = true
|
|
|
|
case *btcwire.MsgGetBlocks:
|
|
p.handleGetBlocksMsg(msg)
|
|
|
|
case *btcwire.MsgGetHeaders:
|
|
p.handleGetHeadersMsg(msg)
|
|
|
|
default:
|
|
peerLog.Debugf("Received unhandled message of type %v: Fix Me",
|
|
rmsg.Command())
|
|
}
|
|
|
|
// Mark the address as currently connected and working as of
|
|
// now if one of the messages that trigger it was processed.
|
|
if markConnected && atomic.LoadInt32(&p.disconnect) == 0 {
|
|
if p.na == nil {
|
|
peerLog.Warnf("we're getting stuff before we " +
|
|
"got a version message. that's bad")
|
|
continue
|
|
}
|
|
p.server.addrManager.Connected(p.na)
|
|
}
|
|
// ok we got a message, reset the timer.
|
|
// timer just calls p.Disconnect() after logging.
|
|
idleTimer.Reset(idleTimeoutMinutes * time.Minute)
|
|
}
|
|
|
|
idleTimer.Stop()
|
|
|
|
// Ensure connection is closed and notify the server that the peer is
|
|
// done.
|
|
p.Disconnect()
|
|
p.server.donePeers <- p
|
|
|
|
// Only tell block manager we are gone if we ever told it we existed.
|
|
if p.VersionKnown() {
|
|
p.server.blockManager.DonePeer(p)
|
|
}
|
|
|
|
peerLog.Tracef("Peer input handler done for %s", p)
|
|
}
|
|
|
|
// queueHandler handles the queueing of outgoing data for the peer. This runs
|
|
// as a muxer for various sources of input so we can ensure that blockmanager
|
|
// and the server goroutine both will not block on us sending a message.
|
|
// We then pass the data on to outHandler to be actually written.
|
|
func (p *peer) queueHandler() {
|
|
pendingMsgs := list.New()
|
|
invSendQueue := list.New()
|
|
trickleTicker := time.NewTicker(time.Second * 10)
|
|
|
|
// We keep the waiting flag so that we know if we have a message queued
|
|
// to the outHandler or not. We could use the presence of a head of
|
|
// the list for this but then we have rather racy concerns about whether
|
|
// it has gotten it at cleanup time - and thus who sends on the
|
|
// message's done channel. To avoid such confusion we keep a different
|
|
// flag and pendingMsgs only contains messages that we have not yet
|
|
// passed to outHandler.
|
|
waiting := false
|
|
|
|
// To avoid duplication below.
|
|
queuePacket := func(msg outMsg, list *list.List, waiting bool) bool {
|
|
if !waiting {
|
|
peerLog.Tracef("%s: sending to outHandler", p)
|
|
p.sendQueue <- msg
|
|
peerLog.Tracef("%s: sent to outHandler", p)
|
|
} else {
|
|
list.PushBack(msg)
|
|
}
|
|
// we are always waiting now.
|
|
return true
|
|
}
|
|
out:
|
|
for {
|
|
select {
|
|
case msg := <-p.outputQueue:
|
|
waiting = queuePacket(msg, pendingMsgs, waiting)
|
|
|
|
// This channel is notified when a message has been sent across
|
|
// the network socket.
|
|
case <-p.sendDoneQueue:
|
|
peerLog.Tracef("%s: acked by outhandler", p)
|
|
|
|
// No longer waiting if there are no more messages
|
|
// in the pending messages queue.
|
|
next := pendingMsgs.Front()
|
|
if next == nil {
|
|
waiting = false
|
|
continue
|
|
}
|
|
|
|
// Notify the outHandler about the next item to
|
|
// asynchronously send.
|
|
val := pendingMsgs.Remove(next)
|
|
peerLog.Tracef("%s: sending to outHandler", p)
|
|
p.sendQueue <- val.(outMsg)
|
|
peerLog.Tracef("%s: sent to outHandler", p)
|
|
|
|
case iv := <-p.outputInvChan:
|
|
// No handshake? They'll find out soon enough.
|
|
if p.VersionKnown() {
|
|
invSendQueue.PushBack(iv)
|
|
}
|
|
|
|
case <-trickleTicker.C:
|
|
// Don't send anything if we're disconnecting or there
|
|
// is no queued inventory.
|
|
// version is known if send queue has any entries.
|
|
if atomic.LoadInt32(&p.disconnect) != 0 ||
|
|
invSendQueue.Len() == 0 {
|
|
continue
|
|
}
|
|
|
|
// Create and send as many inv messages as needed to
|
|
// drain the inventory send queue.
|
|
invMsg := btcwire.NewMsgInv()
|
|
for e := invSendQueue.Front(); e != nil; e = invSendQueue.Front() {
|
|
iv := invSendQueue.Remove(e).(*btcwire.InvVect)
|
|
|
|
// Don't send inventory that became known after
|
|
// the initial check.
|
|
if p.isKnownInventory(iv) {
|
|
continue
|
|
}
|
|
|
|
invMsg.AddInvVect(iv)
|
|
if len(invMsg.InvList) >= maxInvTrickleSize {
|
|
waiting = queuePacket(
|
|
outMsg{msg: invMsg},
|
|
pendingMsgs, waiting)
|
|
invMsg = btcwire.NewMsgInv()
|
|
}
|
|
|
|
// Add the inventory that is being relayed to
|
|
// the known inventory for the peer.
|
|
p.AddKnownInventory(iv)
|
|
}
|
|
if len(invMsg.InvList) > 0 {
|
|
waiting = queuePacket(outMsg{msg: invMsg},
|
|
pendingMsgs, waiting)
|
|
}
|
|
|
|
case <-p.quit:
|
|
break out
|
|
}
|
|
}
|
|
|
|
// Drain any wait channels before we go away so we don't leave something
|
|
// waiting for us.
|
|
for e := pendingMsgs.Front(); e != nil; e = pendingMsgs.Front() {
|
|
val := pendingMsgs.Remove(e)
|
|
msg := val.(outMsg)
|
|
if msg.doneChan != nil {
|
|
msg.doneChan <- false
|
|
}
|
|
}
|
|
cleanup:
|
|
for {
|
|
select {
|
|
case msg := <-p.outputQueue:
|
|
if msg.doneChan != nil {
|
|
msg.doneChan <- false
|
|
}
|
|
case <-p.outputInvChan:
|
|
// Just drain channel
|
|
// sendDoneQueue is buffered so doesn't need draining.
|
|
default:
|
|
break cleanup
|
|
}
|
|
}
|
|
p.queueWg.Done()
|
|
peerLog.Tracef("Peer queue handler done for %s", p)
|
|
}
|
|
|
|
// outHandler handles all outgoing messages for the peer. It must be run as a
|
|
// goroutine. It uses a buffered channel to serialize output messages while
|
|
// allowing the sender to continue running asynchronously.
|
|
func (p *peer) outHandler() {
|
|
pingTimer := time.AfterFunc(pingTimeoutMinutes*time.Minute, func() {
|
|
nonce, err := btcwire.RandomUint64()
|
|
if err != nil {
|
|
peerLog.Errorf("Not sending ping on timeout to %s: %v",
|
|
p, err)
|
|
return
|
|
}
|
|
p.QueueMessage(btcwire.NewMsgPing(nonce), nil)
|
|
})
|
|
out:
|
|
for {
|
|
select {
|
|
case msg := <-p.sendQueue:
|
|
// If the message is one we should get a reply for
|
|
// then reset the timer, we only want to send pings
|
|
// when otherwise we would not receive a reply from
|
|
// the peer. We specifically do not count block or inv
|
|
// messages here since they are not sure of a reply if
|
|
// the inv is of no interest explicitly solicited invs
|
|
// should elicit a reply but we don't track them
|
|
// specially.
|
|
peerLog.Tracef("%s: received from queuehandler", p)
|
|
reset := true
|
|
switch m := msg.msg.(type) {
|
|
case *btcwire.MsgVersion:
|
|
// should get an ack
|
|
case *btcwire.MsgGetAddr:
|
|
// should get addresses
|
|
case *btcwire.MsgPing:
|
|
// expects pong
|
|
// Also set up statistics.
|
|
p.StatsMtx.Lock()
|
|
if p.protocolVersion > btcwire.BIP0031Version {
|
|
p.lastPingNonce = m.Nonce
|
|
p.lastPingTime = time.Now()
|
|
}
|
|
p.StatsMtx.Unlock()
|
|
case *btcwire.MsgMemPool:
|
|
// Should return an inv.
|
|
case *btcwire.MsgGetData:
|
|
// Should get us block, tx, or not found.
|
|
case *btcwire.MsgGetHeaders:
|
|
// Should get us headers back.
|
|
|
|
default:
|
|
// Not one of the above, no sure reply.
|
|
// We want to ping if nothing else
|
|
// interesting happens.
|
|
reset = false
|
|
}
|
|
if reset {
|
|
pingTimer.Reset(pingTimeoutMinutes * time.Minute)
|
|
}
|
|
p.writeMessage(msg.msg)
|
|
p.StatsMtx.Lock()
|
|
p.lastSend = time.Now()
|
|
p.StatsMtx.Unlock()
|
|
if msg.doneChan != nil {
|
|
msg.doneChan <- true
|
|
}
|
|
peerLog.Tracef("%s: acking queuehandler", p)
|
|
p.sendDoneQueue <- true
|
|
peerLog.Tracef("%s: acked queuehandler", p)
|
|
|
|
case <-p.quit:
|
|
break out
|
|
}
|
|
}
|
|
|
|
pingTimer.Stop()
|
|
|
|
p.queueWg.Wait()
|
|
|
|
// Drain any wait channels before we go away so we don't leave something
|
|
// waiting for us. We have waited on queueWg and thus we can be sure
|
|
// that we will not miss anything sent on sendQueue.
|
|
cleanup:
|
|
for {
|
|
select {
|
|
case msg := <-p.sendQueue:
|
|
if msg.doneChan != nil {
|
|
msg.doneChan <- false
|
|
}
|
|
// no need to send on sendDoneQueue since queueHandler
|
|
// has been waited on and already exited.
|
|
default:
|
|
break cleanup
|
|
}
|
|
}
|
|
peerLog.Tracef("Peer output handler done for %s", p)
|
|
}
|
|
|
|
// QueueMessage adds the passed bitcoin message to the peer send queue. It
|
|
// uses a buffered channel to communicate with the output handler goroutine so
|
|
// it is automatically rate limited and safe for concurrent access.
|
|
func (p *peer) QueueMessage(msg btcwire.Message, doneChan chan bool) {
|
|
// Avoid risk of deadlock if goroutine already exited. The goroutine
|
|
// we will be sending to hangs around until it knows for a fact that
|
|
// it is marked as disconnected. *then* it drains the channels.
|
|
if !p.Connected() {
|
|
// avoid deadlock...
|
|
if doneChan != nil {
|
|
go func() {
|
|
doneChan <- false
|
|
}()
|
|
}
|
|
return
|
|
}
|
|
p.outputQueue <- outMsg{msg: msg, doneChan: doneChan}
|
|
}
|
|
|
|
// QueueInventory adds the passed inventory to the inventory send queue which
|
|
// might not be sent right away, rather it is trickled to the peer in batches.
|
|
// Inventory that the peer is already known to have is ignored. It is safe for
|
|
// concurrent access.
|
|
func (p *peer) QueueInventory(invVect *btcwire.InvVect) {
|
|
// Don't add the inventory to the send queue if the peer is
|
|
// already known to have it.
|
|
if p.isKnownInventory(invVect) {
|
|
return
|
|
}
|
|
|
|
// Avoid risk of deadlock if goroutine already exited. The goroutine
|
|
// we will be sending to hangs around until it knows for a fact that
|
|
// it is marked as disconnected. *then* it drains the channels.
|
|
if !p.Connected() {
|
|
return
|
|
}
|
|
|
|
p.outputInvChan <- invVect
|
|
}
|
|
|
|
// Connected returns whether or not the peer is currently connected.
|
|
func (p *peer) Connected() bool {
|
|
return atomic.LoadInt32(&p.connected) != 0 &&
|
|
atomic.LoadInt32(&p.disconnect) == 0
|
|
}
|
|
|
|
// Disconnect disconnects the peer by closing the connection. It also sets
|
|
// a flag so the impending shutdown can be detected.
|
|
func (p *peer) Disconnect() {
|
|
// did we win the race?
|
|
if atomic.AddInt32(&p.disconnect, 1) != 1 {
|
|
return
|
|
}
|
|
peerLog.Tracef("disconnecting %s", p)
|
|
close(p.quit)
|
|
if atomic.LoadInt32(&p.connected) != 0 {
|
|
p.conn.Close()
|
|
}
|
|
}
|
|
|
|
// Start begins processing input and output messages. It also sends the initial
|
|
// version message for outbound connections to start the negotiation process.
|
|
func (p *peer) Start() error {
|
|
// Already started?
|
|
if atomic.AddInt32(&p.started, 1) != 1 {
|
|
return nil
|
|
}
|
|
|
|
peerLog.Tracef("Starting peer %s", p)
|
|
|
|
// Send an initial version message if this is an outbound connection.
|
|
if !p.inbound {
|
|
err := p.pushVersionMsg()
|
|
if err != nil {
|
|
p.logError("Can't send outbound version message %v", err)
|
|
p.Disconnect()
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Start processing input and output.
|
|
go p.inHandler()
|
|
// queueWg is kept so that outHandler knows when the queue has exited so
|
|
// it can drain correctly.
|
|
p.queueWg.Add(1)
|
|
go p.queueHandler()
|
|
go p.outHandler()
|
|
|
|
return nil
|
|
}
|
|
|
|
// Shutdown gracefully shuts down the peer by disconnecting it.
|
|
func (p *peer) Shutdown() {
|
|
peerLog.Tracef("Shutdown peer %s", p)
|
|
p.Disconnect()
|
|
}
|
|
|
|
// newPeerBase returns a new base bitcoin peer for the provided server and
|
|
// inbound flag. This is used by the newInboundPeer and newOutboundPeer
|
|
// functions to perform base setup needed by both types of peers.
|
|
func newPeerBase(s *server, inbound bool) *peer {
|
|
p := peer{
|
|
server: s,
|
|
protocolVersion: maxProtocolVersion,
|
|
btcnet: s.netParams.Net,
|
|
services: btcwire.SFNodeNetwork,
|
|
inbound: inbound,
|
|
knownAddresses: make(map[string]bool),
|
|
knownInventory: NewMruInventoryMap(maxKnownInventory),
|
|
requestedTxns: make(map[btcwire.ShaHash]bool),
|
|
requestedBlocks: make(map[btcwire.ShaHash]bool),
|
|
requestQueue: list.New(),
|
|
outputQueue: make(chan outMsg, outputBufferSize),
|
|
sendQueue: make(chan outMsg, 1), // nonblocking sync
|
|
sendDoneQueue: make(chan bool, 1), // nonblocking sync
|
|
outputInvChan: make(chan *btcwire.InvVect, outputBufferSize),
|
|
txProcessed: make(chan bool, 1),
|
|
blockProcessed: make(chan bool, 1),
|
|
quit: make(chan bool),
|
|
}
|
|
return &p
|
|
}
|
|
|
|
// newInboundPeer returns a new inbound bitcoin peer for the provided server and
|
|
// connection. Use Start to begin processing incoming and outgoing messages.
|
|
func newInboundPeer(s *server, conn net.Conn) *peer {
|
|
p := newPeerBase(s, true)
|
|
p.conn = conn
|
|
p.addr = conn.RemoteAddr().String()
|
|
p.timeConnected = time.Now()
|
|
atomic.AddInt32(&p.connected, 1)
|
|
return p
|
|
}
|
|
|
|
// newOutbountPeer returns a new outbound bitcoin peer for the provided server and
|
|
// address and connects to it asynchronously. If the connection is successful
|
|
// then the peer will also be started.
|
|
func newOutboundPeer(s *server, addr string, persistent bool) *peer {
|
|
p := newPeerBase(s, false)
|
|
p.addr = addr
|
|
p.persistent = persistent
|
|
|
|
// Setup p.na with a temporary address that we are connecting to with
|
|
// faked up service flags. We will replace this with the real one after
|
|
// version negotiation is successful. The only failure case here would
|
|
// be if the string was incomplete for connection so can't be split
|
|
// into address and port, and thus this would be invalid anyway. In
|
|
// which case we return nil to be handled by the caller. This must be
|
|
// done before we fork off the goroutine because as soon as this
|
|
// function returns the peer must have a valid netaddress.
|
|
host, portStr, err := net.SplitHostPort(addr)
|
|
if err != nil {
|
|
p.logError("Tried to create a new outbound peer with invalid "+
|
|
"address %s: %v", addr, err)
|
|
return nil
|
|
}
|
|
|
|
port, err := strconv.ParseUint(portStr, 10, 16)
|
|
if err != nil {
|
|
p.logError("Tried to create a new outbound peer with invalid "+
|
|
"port %s: %v", portStr, err)
|
|
return nil
|
|
}
|
|
|
|
p.na, err = hostToNetAddress(host, uint16(port), 0)
|
|
if err != nil {
|
|
p.logError("Can not turn host %s into netaddress: %v",
|
|
host, err)
|
|
return nil
|
|
}
|
|
|
|
go func() {
|
|
// Attempt to connect to the peer. If the connection fails and
|
|
// this is a persistent connection, retry after the retry
|
|
// interval.
|
|
for atomic.LoadInt32(&p.disconnect) == 0 {
|
|
srvrLog.Debugf("Attempting to connect to %s", addr)
|
|
conn, err := btcdDial("tcp", addr)
|
|
if err != nil {
|
|
p.retryCount++
|
|
srvrLog.Debugf("Failed to connect to %s: %v",
|
|
addr, err)
|
|
if !persistent {
|
|
p.server.donePeers <- p
|
|
return
|
|
}
|
|
scaledInterval := connectionRetryInterval.Nanoseconds() * p.retryCount / 2
|
|
scaledDuration := time.Duration(scaledInterval)
|
|
srvrLog.Debugf("Retrying connection to %s in "+
|
|
"%s", addr, scaledDuration)
|
|
time.Sleep(scaledDuration)
|
|
continue
|
|
}
|
|
|
|
// While we were sleeping trying to connect, the server
|
|
// may have scheduled a shutdown. In that case ditch
|
|
// the peer immediately.
|
|
if atomic.LoadInt32(&p.disconnect) == 0 {
|
|
p.timeConnected = time.Now()
|
|
p.server.addrManager.Attempt(p.na)
|
|
|
|
// Connection was successful so log it and start peer.
|
|
srvrLog.Debugf("Connected to %s",
|
|
conn.RemoteAddr())
|
|
p.conn = conn
|
|
atomic.AddInt32(&p.connected, 1)
|
|
p.retryCount = 0
|
|
p.Start()
|
|
}
|
|
|
|
return
|
|
}
|
|
}()
|
|
return p
|
|
}
|
|
|
|
// logError makes sure that we only log errors loudly on user peers.
|
|
func (p *peer) logError(fmt string, args ...interface{}) {
|
|
if p.persistent {
|
|
peerLog.Errorf(fmt, args...)
|
|
} else {
|
|
peerLog.Debugf(fmt, args...)
|
|
}
|
|
}
|