diff --git a/btcd.go b/btcd.go index 7839b629..8d4b8802 100644 --- a/btcd.go +++ b/btcd.go @@ -20,6 +20,13 @@ import ( "github.com/btcsuite/btcd/limits" ) +const ( + // blockDbNamePrefix is the prefix for the block database name. The + // database type is appended to this value to form the full block + // database name. + blockDbNamePrefix = "blocks" +) + var ( cfg *config ) diff --git a/netsync/README.md b/netsync/README.md index 4734d331..d5e14115 100644 --- a/netsync/README.md +++ b/netsync/README.md @@ -7,12 +7,12 @@ netsync ## Overview -This package implements a concurrency safe block syncing protocol. The provided -implementation of SyncManager communicates with connected peers to perform an -initial block download, keep the chain in sync, and announce new blocks -connected to the chain. Currently the sync manager selects a single sync peer -that it downloads all blocks from until it is up to date with the longest chain -the sync peer is aware of. +This package implements a concurrency safe block syncing protocol. The +SyncManager communicates with connected peers to perform an initial block +download, keep the chain and unconfirmed transaction pool in sync, and announce +new blocks connected to the chain. Currently the sync manager selects a single +sync peer that it downloads all blocks from until it is up to date with the +longest chain the sync peer is aware of. ## Installation and Updating diff --git a/netsync/doc.go b/netsync/doc.go index 8fd29aad..7f45286d 100644 --- a/netsync/doc.go +++ b/netsync/doc.go @@ -3,11 +3,11 @@ // license that can be found in the LICENSE file. /* -The netsync package implements a concurrency safe block syncing protocol. The -provided implementation of SyncManager communicates with connected peers to -perform an initial block download, keep the chain and mempool in sync, and -announce new blocks connected to the chain. Currently the block manager selects -a single sync peer that it downloads all blocks from until it is up to date with -the longest chain the sync peer is aware of. +Package netsync implements a concurrency safe block syncing protocol. The +SyncManager communicates with connected peers to perform an initial block +download, keep the chain and unconfirmed transaction pool in sync, and announce +new blocks connected to the chain. Currently the sync manager selects a single +sync peer that it downloads all blocks from until it is up to date with the +longest chain the sync peer is aware of. */ package netsync diff --git a/netsync/interface.go b/netsync/interface.go index 69cd2421..61a230ac 100644 --- a/netsync/interface.go +++ b/netsync/interface.go @@ -37,58 +37,3 @@ type Config struct { DisableCheckpoints bool MaxPeers int } - -// SyncManager is the interface used to communicate block related messages with -// peers. The SyncManager is started as by executing Start() in a goroutine. -// Once started, it selects peers to sync from and starts the initial block -// download. Once the chain is in sync, the SyncManager handles incoming block -// and header notifications and relays announcements of new blocks to peers. -type SyncManager interface { - // NewPeer informs the SyncManager of a newly active peer. - NewPeer(p *peer.Peer) - - // QueueTx adds the passed transaction message and peer to the block - // handling queue. - QueueTx(tx *btcutil.Tx, p *peer.Peer, done chan struct{}) - - // QueueBlock adds the passed block message and peer to the block handling - // queue. - QueueBlock(block *btcutil.Block, p *peer.Peer, done chan struct{}) - - // QueueInv adds the passed inv message and peer to the block handling - // queue. - QueueInv(inv *wire.MsgInv, p *peer.Peer) - - // QueueHeaders adds the passed headers message and peer to the block - // handling queue. - QueueHeaders(headers *wire.MsgHeaders, p *peer.Peer) - - // DonePeer informs the SyncManager that a peer has disconnected. - DonePeer(p *peer.Peer) - - // Start begins the core block handler which processes block and inv - // messages. - Start() - - // Stop gracefully shuts down the SyncManager by stopping all asynchronous - // handlers and waiting for them to finish. - Stop() error - - // SyncPeerID returns the ID of the current sync peer, or 0 if there is - // none. - SyncPeerID() int32 - - // ProcessBlock makes use of ProcessBlock on an internal instance of a block - // chain. - ProcessBlock(block *btcutil.Block, flags blockchain.BehaviorFlags) (bool, error) - - // IsCurrent returns whether or not the SyncManager believes it is synced - // with the connected peers. - IsCurrent() bool - - // Pause pauses the SyncManager until the returned channel is closed. - // - // Note that while paused, all peer and block processing is halted. The - // message sender should avoid pausing the SyncManager for long durations. - Pause() chan<- struct{} -} diff --git a/netsync/manager.go b/netsync/manager.go index cb46ffab..7f1d3605 100644 --- a/netsync/manager.go +++ b/netsync/manager.go @@ -27,11 +27,6 @@ const ( // more. minInFlightBlocks = 10 - // blockDbNamePrefix is the prefix for the block database name. The - // database type is appended to this value to form the full block - // database name. - blockDbNamePrefix = "blocks" - // maxRejectedTxns is the maximum number of rejected transactions // hashes to store in memory. maxRejectedTxns = 1000 @@ -113,14 +108,14 @@ type processBlockMsg struct { } // isCurrentMsg is a message type to be sent across the message channel for -// requesting whether or not the block manager believes it is synced with -// the currently connected peers. +// requesting whether or not the sync manager believes it is synced with the +// currently connected peers. type isCurrentMsg struct { reply chan bool } // pauseMsg is a message type to be sent across the message channel for -// pausing the block manager. This effectively provides the caller with +// pausing the sync manager. This effectively provides the caller with // exclusive access over the manager until a receive is performed on the // unpause channel. type pauseMsg struct { @@ -134,7 +129,7 @@ type headerNode struct { hash *chainhash.Hash } -// peerSyncState stores additional information that the blockManager tracks +// peerSyncState stores additional information that the SyncManager tracks // about a peer. type peerSyncState struct { syncCandidate bool @@ -143,9 +138,12 @@ type peerSyncState struct { requestedBlocks map[chainhash.Hash]struct{} } -// blockManager provides a concurrency safe block manager for handling all -// incoming blocks. -type blockManager struct { +// SyncManager is used to communicate block related messages with peers. The +// SyncManager is started as by executing Start() in a goroutine. Once started, +// it selects peers to sync from and starts the initial block download. Once the +// chain is in sync, the SyncManager handles incoming block and header +// notifications and relays announcements of new blocks to peers. +type SyncManager struct { peerNotifier PeerNotifier started int32 shutdown int32 @@ -173,17 +171,17 @@ type blockManager struct { // resetHeaderState sets the headers-first mode state to values appropriate for // syncing from a new peer. -func (b *blockManager) resetHeaderState(newestHash *chainhash.Hash, newestHeight int32) { - b.headersFirstMode = false - b.headerList.Init() - b.startHeader = nil +func (sm *SyncManager) resetHeaderState(newestHash *chainhash.Hash, newestHeight int32) { + sm.headersFirstMode = false + sm.headerList.Init() + sm.startHeader = nil // When there is a next checkpoint, add an entry for the latest known // block into the header pool. This allows the next downloaded header // to prove it links to the chain properly. - if b.nextCheckpoint != nil { + if sm.nextCheckpoint != nil { node := headerNode{height: newestHeight, hash: newestHash} - b.headerList.PushBack(&node) + sm.headerList.PushBack(&node) } } @@ -191,8 +189,8 @@ func (b *blockManager) resetHeaderState(newestHash *chainhash.Hash, newestHeight // It returns nil when there is not one either because the height is already // later than the final checkpoint or some other reason such as disabled // checkpoints. -func (b *blockManager) findNextHeaderCheckpoint(height int32) *chaincfg.Checkpoint { - checkpoints := b.chain.Checkpoints() +func (sm *SyncManager) findNextHeaderCheckpoint(height int32) *chaincfg.Checkpoint { + checkpoints := sm.chain.Checkpoints() if len(checkpoints) == 0 { return nil } @@ -219,24 +217,24 @@ func (b *blockManager) findNextHeaderCheckpoint(height int32) *chaincfg.Checkpoi // download/sync the blockchain from. When syncing is already running, it // simply returns. It also examines the candidates for any which are no longer // candidates and removes them as needed. -func (b *blockManager) startSync() { +func (sm *SyncManager) startSync() { // Return now if we're already syncing. - if b.syncPeer != nil { + if sm.syncPeer != nil { return } // Once the segwit soft-fork package has activated, we only // want to sync from peers which are witness enabled to ensure // that we fully validate all blockchain data. - segwitActive, err := b.chain.IsDeploymentActive(chaincfg.DeploymentSegwit) + segwitActive, err := sm.chain.IsDeploymentActive(chaincfg.DeploymentSegwit) if err != nil { log.Errorf("Unable to query for segwit soft-fork state: %v", err) return } - best := b.chain.BestSnapshot() + best := sm.chain.BestSnapshot() var bestPeer *peerpkg.Peer - for peer, state := range b.peerStates { + for peer, state := range sm.peerStates { if !state.syncCandidate { continue } @@ -267,9 +265,9 @@ func (b *blockManager) startSync() { // Clear the requestedBlocks if the sync peer changes, otherwise // we may ignore blocks we need that the last sync peer failed // to send. - b.requestedBlocks = make(map[chainhash.Hash]struct{}) + sm.requestedBlocks = make(map[chainhash.Hash]struct{}) - locator, err := b.chain.LatestBlockLocator() + locator, err := sm.chain.LatestBlockLocator() if err != nil { log.Errorf("Failed to get block locator for the "+ "latest block: %v", err) @@ -296,19 +294,19 @@ func (b *blockManager) startSync() { // and fully validate them. Finally, regression test mode does // not support the headers-first approach so do normal block // downloads when in regression test mode. - if b.nextCheckpoint != nil && - best.Height < b.nextCheckpoint.Height && - b.chainParams != &chaincfg.RegressionNetParams { + if sm.nextCheckpoint != nil && + best.Height < sm.nextCheckpoint.Height && + sm.chainParams != &chaincfg.RegressionNetParams { - bestPeer.PushGetHeadersMsg(locator, b.nextCheckpoint.Hash) - b.headersFirstMode = true + bestPeer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash) + sm.headersFirstMode = true log.Infof("Downloading headers for blocks %d to "+ "%d from peer %s", best.Height+1, - b.nextCheckpoint.Height, bestPeer.Addr()) + sm.nextCheckpoint.Height, bestPeer.Addr()) } else { bestPeer.PushGetBlocksMsg(locator, &zeroHash) } - b.syncPeer = bestPeer + sm.syncPeer = bestPeer } else { log.Warnf("No sync peer candidates available") } @@ -316,11 +314,11 @@ func (b *blockManager) startSync() { // isSyncCandidate returns whether or not the peer is a candidate to consider // syncing from. -func (b *blockManager) isSyncCandidate(peer *peerpkg.Peer) bool { +func (sm *SyncManager) isSyncCandidate(peer *peerpkg.Peer) bool { // Typically a peer is not a candidate for sync if it's not a full node, // however regression test is special in that the regression tool is // not a full node and still needs to be considered a sync candidate. - if b.chainParams == &chaincfg.RegressionNetParams { + if sm.chainParams == &chaincfg.RegressionNetParams { // The peer is not a candidate if it's not coming from localhost // or the hostname can't be determined for some reason. host, _, err := net.SplitHostPort(peer.Addr()) @@ -335,7 +333,7 @@ func (b *blockManager) isSyncCandidate(peer *peerpkg.Peer) bool { // The peer is not a candidate for sync if it's not a full // node. Additionally, if the segwit soft-fork package has // activated, then the peer must also be upgraded. - segwitActive, err := b.chain.IsDeploymentActive(chaincfg.DeploymentSegwit) + segwitActive, err := sm.chain.IsDeploymentActive(chaincfg.DeploymentSegwit) if err != nil { log.Errorf("Unable to query for segwit "+ "soft-fork state: %v", err) @@ -354,25 +352,25 @@ func (b *blockManager) isSyncCandidate(peer *peerpkg.Peer) bool { // handleNewPeerMsg deals with new peers that have signalled they may // be considered as a sync peer (they have already successfully negotiated). It // also starts syncing if needed. It is invoked from the syncHandler goroutine. -func (b *blockManager) handleNewPeerMsg(peer *peerpkg.Peer) { +func (sm *SyncManager) handleNewPeerMsg(peer *peerpkg.Peer) { // Ignore if in the process of shutting down. - if atomic.LoadInt32(&b.shutdown) != 0 { + if atomic.LoadInt32(&sm.shutdown) != 0 { return } log.Infof("New valid peer %s (%s)", peer, peer.UserAgent()) // Initialize the peer state - isSyncCandidate := b.isSyncCandidate(peer) - b.peerStates[peer] = &peerSyncState{ + isSyncCandidate := sm.isSyncCandidate(peer) + sm.peerStates[peer] = &peerSyncState{ syncCandidate: isSyncCandidate, requestedTxns: make(map[chainhash.Hash]struct{}), requestedBlocks: make(map[chainhash.Hash]struct{}), } // Start syncing by choosing the best candidate if needed. - if isSyncCandidate && b.syncPeer == nil { - b.startSync() + if isSyncCandidate && sm.syncPeer == nil { + sm.startSync() } } @@ -380,22 +378,22 @@ func (b *blockManager) handleNewPeerMsg(peer *peerpkg.Peer) { // removes the peer as a candidate for syncing and in the case where it was // the current sync peer, attempts to select a new best peer to sync from. It // is invoked from the syncHandler goroutine. -func (b *blockManager) handleDonePeerMsg(peer *peerpkg.Peer) { - state, exists := b.peerStates[peer] +func (sm *SyncManager) handleDonePeerMsg(peer *peerpkg.Peer) { + state, exists := sm.peerStates[peer] if !exists { log.Warnf("Received done peer message for unknown peer %s", peer) return } // Remove the peer from the list of candidate peers. - delete(b.peerStates, peer) + delete(sm.peerStates, peer) log.Infof("Lost peer %s", peer) // Remove requested transactions from the global map so that they will // be fetched from elsewhere next time we get an inv. for txHash := range state.requestedTxns { - delete(b.requestedTxns, txHash) + delete(sm.requestedTxns, txHash) } // Remove requested blocks from the global map so that they will be @@ -403,26 +401,26 @@ func (b *blockManager) handleDonePeerMsg(peer *peerpkg.Peer) { // TODO: we could possibly here check which peers have these blocks // and request them now to speed things up a little. for blockHash := range state.requestedBlocks { - delete(b.requestedBlocks, blockHash) + delete(sm.requestedBlocks, blockHash) } // Attempt to find a new peer to sync from if the quitting peer is the // sync peer. Also, reset the headers-first state if in headers-first // mode so - if b.syncPeer == peer { - b.syncPeer = nil - if b.headersFirstMode { - best := b.chain.BestSnapshot() - b.resetHeaderState(&best.Hash, best.Height) + if sm.syncPeer == peer { + sm.syncPeer = nil + if sm.headersFirstMode { + best := sm.chain.BestSnapshot() + sm.resetHeaderState(&best.Hash, best.Height) } - b.startSync() + sm.startSync() } } // handleTxMsg handles transaction messages from all peers. -func (b *blockManager) handleTxMsg(tmsg *txMsg) { +func (sm *SyncManager) handleTxMsg(tmsg *txMsg) { peer := tmsg.peer - state, exists := b.peerStates[peer] + state, exists := sm.peerStates[peer] if !exists { log.Warnf("Received tx message from unknown peer %s", peer) return @@ -441,7 +439,7 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) { // Ignore transactions that we have already rejected. Do not // send a reject message here because if the transaction was already // rejected, the transaction was unsolicited. - if _, exists = b.rejectedTxns[*txHash]; exists { + if _, exists = sm.rejectedTxns[*txHash]; exists { log.Debugf("Ignoring unsolicited previously rejected "+ "transaction %v from %s", txHash, peer) return @@ -449,7 +447,7 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) { // Process the transaction to include validation, insertion in the // memory pool, orphan handling, etc. - acceptedTxs, err := b.txMemPool.ProcessTransaction(tmsg.tx, + acceptedTxs, err := sm.txMemPool.ProcessTransaction(tmsg.tx, true, true, mempool.Tag(peer.ID())) // Remove transaction from request maps. Either the mempool/chain @@ -457,13 +455,13 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) { // instances of trying to fetch it, or we failed to insert and thus // we'll retry next time we get an inv. delete(state.requestedTxns, *txHash) - delete(b.requestedTxns, *txHash) + delete(sm.requestedTxns, *txHash) if err != nil { // Do not request this transaction again until a new block // has been processed. - b.rejectedTxns[*txHash] = struct{}{} - b.limitMap(b.rejectedTxns, maxRejectedTxns) + sm.rejectedTxns[*txHash] = struct{}{} + sm.limitMap(sm.rejectedTxns, maxRejectedTxns) // When the error is a rule error, it means the transaction was // simply rejected as opposed to something actually going wrong, @@ -484,34 +482,34 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) { return } - b.peerNotifier.AnnounceNewTransactions(acceptedTxs) + sm.peerNotifier.AnnounceNewTransactions(acceptedTxs) } // current returns true if we believe we are synced with our peers, false if we // still have blocks to check -func (b *blockManager) current() bool { - if !b.chain.IsCurrent() { +func (sm *SyncManager) current() bool { + if !sm.chain.IsCurrent() { return false } // if blockChain thinks we are current and we have no syncPeer it // is probably right. - if b.syncPeer == nil { + if sm.syncPeer == nil { return true } // No matter what chain thinks, if we are below the block we are syncing // to we are not current. - if b.chain.BestSnapshot().Height < b.syncPeer.LastBlock() { + if sm.chain.BestSnapshot().Height < sm.syncPeer.LastBlock() { return false } return true } // handleBlockMsg handles block messages from all peers. -func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { +func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { peer := bmsg.peer - state, exists := b.peerStates[peer] + state, exists := sm.peerStates[peer] if !exists { log.Warnf("Received block message from unknown peer %s", peer) return @@ -525,7 +523,7 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // the peer or ignore the block when we're in regression test // mode in this case so the chain code is actually fed the // duplicate blocks. - if b.chainParams != &chaincfg.RegressionNetParams { + if sm.chainParams != &chaincfg.RegressionNetParams { log.Warnf("Got unrequested block %v from %s -- "+ "disconnecting", blockHash, peer.Addr()) peer.Disconnect() @@ -542,16 +540,16 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // properly. isCheckpointBlock := false behaviorFlags := blockchain.BFNone - if b.headersFirstMode { - firstNodeEl := b.headerList.Front() + if sm.headersFirstMode { + firstNodeEl := sm.headerList.Front() if firstNodeEl != nil { firstNode := firstNodeEl.Value.(*headerNode) if blockHash.IsEqual(firstNode.hash) { behaviorFlags |= blockchain.BFFastAdd - if firstNode.hash.IsEqual(b.nextCheckpoint.Hash) { + if firstNode.hash.IsEqual(sm.nextCheckpoint.Hash) { isCheckpointBlock = true } else { - b.headerList.Remove(firstNodeEl) + sm.headerList.Remove(firstNodeEl) } } } @@ -561,11 +559,11 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // so we shouldn't have any more instances of trying to fetch it, or we // will fail the insert and thus we'll retry next time we get an inv. delete(state.requestedBlocks, *blockHash) - delete(b.requestedBlocks, *blockHash) + delete(sm.requestedBlocks, *blockHash) // Process the block to include validation, best chain selection, orphan // handling, etc. - _, isOrphan, err := b.chain.ProcessBlock(bmsg.block, behaviorFlags) + _, isOrphan, err := sm.chain.ProcessBlock(bmsg.block, behaviorFlags) if err != nil { // When the error is a rule error, it means the block was simply // rejected as opposed to something actually going wrong, so log @@ -624,8 +622,8 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { } } - orphanRoot := b.chain.GetOrphanRoot(blockHash) - locator, err := b.chain.LatestBlockLocator() + orphanRoot := sm.chain.GetOrphanRoot(blockHash) + locator, err := sm.chain.LatestBlockLocator() if err != nil { log.Warnf("Failed to get block locator for the "+ "latest block: %v", err) @@ -635,16 +633,16 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { } else { // When the block is not an orphan, log information about it and // update the chain state. - b.progressLogger.LogBlockHeight(bmsg.block) + sm.progressLogger.LogBlockHeight(bmsg.block) // Update this peer's latest block height, for future // potential sync node candidacy. - best := b.chain.BestSnapshot() + best := sm.chain.BestSnapshot() heightUpdate = best.Height blkHashUpdate = &best.Hash // Clear the rejected transactions. - b.rejectedTxns = make(map[chainhash.Hash]struct{}) + sm.rejectedTxns = make(map[chainhash.Hash]struct{}) } // Update the block height for this peer. But only send a message to @@ -653,14 +651,14 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // if we're syncing the chain from scratch. if blkHashUpdate != nil && heightUpdate != 0 { peer.UpdateLastBlockHeight(heightUpdate) - if isOrphan || b.current() { - go b.peerNotifier.UpdatePeerHeights(blkHashUpdate, heightUpdate, + if isOrphan || sm.current() { + go sm.peerNotifier.UpdatePeerHeights(blkHashUpdate, heightUpdate, peer) } } // Nothing more to do if we aren't in headers-first mode. - if !b.headersFirstMode { + if !sm.headersFirstMode { return } @@ -668,9 +666,9 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // request more blocks using the header list when the request queue is // getting short. if !isCheckpointBlock { - if b.startHeader != nil && + if sm.startHeader != nil && len(state.requestedBlocks) < minInFlightBlocks { - b.fetchHeaderBlocks() + sm.fetchHeaderBlocks() } return } @@ -679,28 +677,28 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // there is a next checkpoint, get the next round of headers by asking // for headers starting from the block after this one up to the next // checkpoint. - prevHeight := b.nextCheckpoint.Height - prevHash := b.nextCheckpoint.Hash - b.nextCheckpoint = b.findNextHeaderCheckpoint(prevHeight) - if b.nextCheckpoint != nil { + prevHeight := sm.nextCheckpoint.Height + prevHash := sm.nextCheckpoint.Hash + sm.nextCheckpoint = sm.findNextHeaderCheckpoint(prevHeight) + if sm.nextCheckpoint != nil { locator := blockchain.BlockLocator([]*chainhash.Hash{prevHash}) - err := peer.PushGetHeadersMsg(locator, b.nextCheckpoint.Hash) + err := peer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash) if err != nil { log.Warnf("Failed to send getheaders message to "+ "peer %s: %v", peer.Addr(), err) return } log.Infof("Downloading headers for blocks %d to %d from "+ - "peer %s", prevHeight+1, b.nextCheckpoint.Height, - b.syncPeer.Addr()) + "peer %s", prevHeight+1, sm.nextCheckpoint.Height, + sm.syncPeer.Addr()) return } // This is headers-first mode, the block is a checkpoint, and there are // no more checkpoints, so switch to normal mode by requesting blocks // from the block after this one up to the end of the chain (zero hash). - b.headersFirstMode = false - b.headerList.Init() + sm.headersFirstMode = false + sm.headerList.Init() log.Infof("Reached the final checkpoint -- switching to normal mode") locator := blockchain.BlockLocator([]*chainhash.Hash{blockHash}) err = peer.PushGetBlocksMsg(locator, &zeroHash) @@ -713,9 +711,9 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // fetchHeaderBlocks creates and sends a request to the syncPeer for the next // list of blocks to be downloaded based on the current list of headers. -func (b *blockManager) fetchHeaderBlocks() { +func (sm *SyncManager) fetchHeaderBlocks() { // Nothing to do if there is no start header. - if b.startHeader == nil { + if sm.startHeader == nil { log.Warnf("fetchHeaderBlocks called with no start header") return } @@ -723,9 +721,9 @@ func (b *blockManager) fetchHeaderBlocks() { // Build up a getdata request for the list of blocks the headers // describe. The size hint will be limited to wire.MaxInvPerMsg by // the function, so no need to double check it here. - gdmsg := wire.NewMsgGetDataSizeHint(uint(b.headerList.Len())) + gdmsg := wire.NewMsgGetDataSizeHint(uint(sm.headerList.Len())) numRequested := 0 - for e := b.startHeader; e != nil; e = e.Next() { + for e := sm.startHeader; e != nil; e = e.Next() { node, ok := e.Value.(*headerNode) if !ok { log.Warn("Header list node type is not a headerNode") @@ -733,43 +731,43 @@ func (b *blockManager) fetchHeaderBlocks() { } iv := wire.NewInvVect(wire.InvTypeBlock, node.hash) - haveInv, err := b.haveInventory(iv) + haveInv, err := sm.haveInventory(iv) if err != nil { log.Warnf("Unexpected failure when checking for "+ "existing inventory during header block "+ "fetch: %v", err) } if !haveInv { - syncPeerState := b.peerStates[b.syncPeer] + syncPeerState := sm.peerStates[sm.syncPeer] - b.requestedBlocks[*node.hash] = struct{}{} + sm.requestedBlocks[*node.hash] = struct{}{} syncPeerState.requestedBlocks[*node.hash] = struct{}{} // If we're fetching from a witness enabled peer // post-fork, then ensure that we receive all the // witness data in the blocks. - if b.syncPeer.IsWitnessEnabled() { + if sm.syncPeer.IsWitnessEnabled() { iv.Type = wire.InvTypeWitnessBlock } gdmsg.AddInvVect(iv) numRequested++ } - b.startHeader = e.Next() + sm.startHeader = e.Next() if numRequested >= wire.MaxInvPerMsg { break } } if len(gdmsg.InvList) > 0 { - b.syncPeer.QueueMessage(gdmsg, nil) + sm.syncPeer.QueueMessage(gdmsg, nil) } } // handleHeadersMsg handles block header messages from all peers. Headers are // requested when performing a headers-first sync. -func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { +func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) { peer := hmsg.peer - _, exists := b.peerStates[peer] + _, exists := sm.peerStates[peer] if !exists { log.Warnf("Received headers message from unknown peer %s", peer) return @@ -778,7 +776,7 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { // The remote peer is misbehaving if we didn't request headers. msg := hmsg.headers numHeaders := len(msg.Headers) - if !b.headersFirstMode { + if !sm.headersFirstMode { log.Warnf("Got %d unrequested headers from %s -- "+ "disconnecting", numHeaders, peer.Addr()) peer.Disconnect() @@ -799,7 +797,7 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { finalHash = &blockHash // Ensure there is a previous header to compare against. - prevNodeEl := b.headerList.Back() + prevNodeEl := sm.headerList.Back() if prevNodeEl == nil { log.Warnf("Header list does not contain a previous" + "element as expected -- disconnecting peer") @@ -813,9 +811,9 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { prevNode := prevNodeEl.Value.(*headerNode) if prevNode.hash.IsEqual(&blockHeader.PrevBlock) { node.height = prevNode.height + 1 - e := b.headerList.PushBack(&node) - if b.startHeader == nil { - b.startHeader = e + e := sm.headerList.PushBack(&node) + if sm.startHeader == nil { + sm.startHeader = e } } else { log.Warnf("Received block header that does not "+ @@ -826,8 +824,8 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { } // Verify the header at the next checkpoint height matches. - if node.height == b.nextCheckpoint.Height { - if node.hash.IsEqual(b.nextCheckpoint.Hash) { + if node.height == sm.nextCheckpoint.Height { + if node.hash.IsEqual(sm.nextCheckpoint.Hash) { receivedCheckpoint = true log.Infof("Verified downloaded block "+ "header against checkpoint at height "+ @@ -838,7 +836,7 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { "expected checkpoint hash of %s -- "+ "disconnecting", node.height, node.hash, peer.Addr(), - b.nextCheckpoint.Hash) + sm.nextCheckpoint.Hash) peer.Disconnect() return } @@ -853,11 +851,11 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { // that is already in the database and is only used to ensure // the next header links properly, it must be removed before // fetching the blocks. - b.headerList.Remove(b.headerList.Front()) + sm.headerList.Remove(sm.headerList.Front()) log.Infof("Received %v block headers: Fetching blocks", - b.headerList.Len()) - b.progressLogger.SetLastLogTime(time.Now()) - b.fetchHeaderBlocks() + sm.headerList.Len()) + sm.progressLogger.SetLastLogTime(time.Now()) + sm.fetchHeaderBlocks() return } @@ -865,7 +863,7 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { // headers starting from the latest known header and ending with the // next checkpoint. locator := blockchain.BlockLocator([]*chainhash.Hash{finalHash}) - err := peer.PushGetHeadersMsg(locator, b.nextCheckpoint.Hash) + err := peer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash) if err != nil { log.Warnf("Failed to send getheaders message to "+ "peer %s: %v", peer.Addr(), err) @@ -878,27 +876,27 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { // inventory can be when it is in different states such as blocks that are part // of the main chain, on a side chain, in the orphan pool, and transactions that // are in the memory pool (either the main pool or orphan pool). -func (b *blockManager) haveInventory(invVect *wire.InvVect) (bool, error) { +func (sm *SyncManager) haveInventory(invVect *wire.InvVect) (bool, error) { switch invVect.Type { case wire.InvTypeWitnessBlock: fallthrough case wire.InvTypeBlock: // Ask chain if the block is known to it in any form (main // chain, side chain, or orphan). - return b.chain.HaveBlock(&invVect.Hash) + return sm.chain.HaveBlock(&invVect.Hash) case wire.InvTypeWitnessTx: fallthrough case wire.InvTypeTx: // Ask the transaction memory pool if the transaction is known // to it in any form (main pool or orphan). - if b.txMemPool.HaveTransaction(&invVect.Hash) { + if sm.txMemPool.HaveTransaction(&invVect.Hash) { return true, nil } // Check if the transaction exists from the point of view of the // end of the main chain. - entry, err := b.chain.FetchUtxoEntry(&invVect.Hash) + entry, err := sm.chain.FetchUtxoEntry(&invVect.Hash) if err != nil { return false, err } @@ -912,9 +910,9 @@ func (b *blockManager) haveInventory(invVect *wire.InvVect) (bool, error) { // handleInvMsg handles inv messages from all peers. // We examine the inventory advertised by the remote peer and act accordingly. -func (b *blockManager) handleInvMsg(imsg *invMsg) { +func (sm *SyncManager) handleInvMsg(imsg *invMsg) { peer := imsg.peer - state, exists := b.peerStates[peer] + state, exists := sm.peerStates[peer] if !exists { log.Warnf("Received inv message from unknown peer %s", peer) return @@ -936,20 +934,20 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { // announced block for this peer. We'll use this information later to // update the heights of peers based on blocks we've accepted that they // previously announced. - if lastBlock != -1 && (peer != b.syncPeer || b.current()) { + if lastBlock != -1 && (peer != sm.syncPeer || sm.current()) { peer.UpdateLastAnnouncedBlock(&invVects[lastBlock].Hash) } // Ignore invs from peers that aren't the sync if we are not current. // Helps prevent fetching a mass of orphans. - if peer != b.syncPeer && !b.current() { + if peer != sm.syncPeer && !sm.current() { return } // If our chain is current and a peer announces a block we already // know of, then update their current block height. - if lastBlock != -1 && b.current() { - blkHeight, err := b.chain.BlockHeightByHash(&invVects[lastBlock].Hash) + if lastBlock != -1 && sm.current() { + blkHeight, err := sm.chain.BlockHeightByHash(&invVects[lastBlock].Hash) if err == nil { peer.UpdateLastBlockHeight(blkHeight) } @@ -975,12 +973,12 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { peer.AddKnownInventory(iv) // Ignore inventory when we're in headers-first mode. - if b.headersFirstMode { + if sm.headersFirstMode { continue } // Request the inventory if we don't already have it. - haveInv, err := b.haveInventory(iv) + haveInv, err := sm.haveInventory(iv) if err != nil { log.Warnf("Unexpected failure when checking for "+ "existing inventory during inv message "+ @@ -991,7 +989,7 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { if iv.Type == wire.InvTypeTx { // Skip the transaction if it has already been // rejected. - if _, exists := b.rejectedTxns[iv.Hash]; exists { + if _, exists := sm.rejectedTxns[iv.Hash]; exists { continue } } @@ -1020,12 +1018,12 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { // resending the orphan block as an available block // to signal there are more missing blocks that need to // be requested. - if b.chain.IsKnownOrphan(&iv.Hash) { + if sm.chain.IsKnownOrphan(&iv.Hash) { // Request blocks starting at the latest known // up to the root of the orphan that just came // in. - orphanRoot := b.chain.GetOrphanRoot(&iv.Hash) - locator, err := b.chain.LatestBlockLocator() + orphanRoot := sm.chain.GetOrphanRoot(&iv.Hash) + locator, err := sm.chain.LatestBlockLocator() if err != nil { log.Errorf("PEER: Failed to get block "+ "locator for the latest block: "+ @@ -1044,7 +1042,7 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { // Request blocks after this one up to the // final one the remote peer knows about (zero // stop hash). - locator := b.chain.BlockLocatorFromHash(&iv.Hash) + locator := sm.chain.BlockLocatorFromHash(&iv.Hash) peer.PushGetBlocksMsg(locator, &zeroHash) } } @@ -1066,9 +1064,9 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { case wire.InvTypeBlock: // Request the block if there is not already a pending // request. - if _, exists := b.requestedBlocks[iv.Hash]; !exists { - b.requestedBlocks[iv.Hash] = struct{}{} - b.limitMap(b.requestedBlocks, maxRequestedBlocks) + if _, exists := sm.requestedBlocks[iv.Hash]; !exists { + sm.requestedBlocks[iv.Hash] = struct{}{} + sm.limitMap(sm.requestedBlocks, maxRequestedBlocks) state.requestedBlocks[iv.Hash] = struct{}{} if peer.IsWitnessEnabled() { @@ -1084,9 +1082,9 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { case wire.InvTypeTx: // Request the transaction if there is not already a // pending request. - if _, exists := b.requestedTxns[iv.Hash]; !exists { - b.requestedTxns[iv.Hash] = struct{}{} - b.limitMap(b.requestedTxns, maxRequestedTxns) + if _, exists := sm.requestedTxns[iv.Hash]; !exists { + sm.requestedTxns[iv.Hash] = struct{}{} + sm.limitMap(sm.requestedTxns, maxRequestedTxns) state.requestedTxns[iv.Hash] = struct{}{} // If the peer is capable, request the txn @@ -1113,7 +1111,7 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { // limitMap is a helper function for maps that require a maximum limit by // evicting a random transaction if adding a new value would cause it to // overflow the maximum allowed. -func (b *blockManager) limitMap(m map[chainhash.Hash]struct{}, limit int) { +func (sm *SyncManager) limitMap(m map[chainhash.Hash]struct{}, limit int) { if len(m)+1 > limit { // Remove a random entry from the map. For most compilers, Go's // range statement iterates starting at a random item although @@ -1128,47 +1126,47 @@ func (b *blockManager) limitMap(m map[chainhash.Hash]struct{}, limit int) { } } -// blockHandler is the main handler for the block manager. It must be run -// as a goroutine. It processes block and inv messages in a separate goroutine +// blockHandler is the main handler for the sync manager. It must be run as a +// goroutine. It processes block and inv messages in a separate goroutine // from the peer handlers so the block (MsgBlock) messages are handled by a // single thread without needing to lock memory data structures. This is -// important because the block manager controls which blocks are needed and how +// important because the sync manager controls which blocks are needed and how // the fetching should proceed. -func (b *blockManager) blockHandler() { +func (sm *SyncManager) blockHandler() { out: for { select { - case m := <-b.msgChan: + case m := <-sm.msgChan: switch msg := m.(type) { case *newPeerMsg: - b.handleNewPeerMsg(msg.peer) + sm.handleNewPeerMsg(msg.peer) case *txMsg: - b.handleTxMsg(msg) + sm.handleTxMsg(msg) msg.reply <- struct{}{} case *blockMsg: - b.handleBlockMsg(msg) + sm.handleBlockMsg(msg) msg.reply <- struct{}{} case *invMsg: - b.handleInvMsg(msg) + sm.handleInvMsg(msg) case *headersMsg: - b.handleHeadersMsg(msg) + sm.handleHeadersMsg(msg) case *donePeerMsg: - b.handleDonePeerMsg(msg.peer) + sm.handleDonePeerMsg(msg.peer) case getSyncPeerMsg: var peerID int32 - if b.syncPeer != nil { - peerID = b.syncPeer.ID() + if sm.syncPeer != nil { + peerID = sm.syncPeer.ID() } msg.reply <- peerID case processBlockMsg: - _, isOrphan, err := b.chain.ProcessBlock( + _, isOrphan, err := sm.chain.ProcessBlock( msg.block, msg.flags) if err != nil { msg.reply <- processBlockResponse{ @@ -1183,7 +1181,7 @@ out: } case isCurrentMsg: - msg.reply <- b.current() + msg.reply <- sm.current() case pauseMsg: // Wait until the sender unpauses the manager. @@ -1194,26 +1192,26 @@ out: "handler: %T", msg) } - case <-b.quit: + case <-sm.quit: break out } } - b.wg.Done() + sm.wg.Done() log.Trace("Block handler done") } // handleBlockchainNotification handles notifications from blockchain. It does // things such as request orphan block parents and relay accepted blocks to // connected peers. -func (b *blockManager) handleBlockchainNotification(notification *blockchain.Notification) { +func (sm *SyncManager) handleBlockchainNotification(notification *blockchain.Notification) { switch notification.Type { // A block has been accepted into the block chain. Relay it to other // peers. case blockchain.NTBlockAccepted: // Don't relay if we are not current. Other peers that are // current should already know about it. - if !b.current() { + if !sm.current() { return } @@ -1225,7 +1223,7 @@ func (b *blockManager) handleBlockchainNotification(notification *blockchain.Not // Generate the inventory vector and relay it. iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash()) - b.peerNotifier.RelayInventory(iv, block.MsgBlock().Header) + sm.peerNotifier.RelayInventory(iv, block.MsgBlock().Header) // A block has been connected to the main block chain. case blockchain.NTBlockConnected: @@ -1243,12 +1241,12 @@ func (b *blockManager) handleBlockchainNotification(notification *blockchain.Not // transaction are NOT removed recursively because they are still // valid. for _, tx := range block.Transactions()[1:] { - b.txMemPool.RemoveTransaction(tx, false) - b.txMemPool.RemoveDoubleSpends(tx) - b.txMemPool.RemoveOrphan(tx) - b.peerNotifier.TransactionConfirmed(tx) - acceptedTxs := b.txMemPool.ProcessOrphans(tx) - b.peerNotifier.AnnounceNewTransactions(acceptedTxs) + sm.txMemPool.RemoveTransaction(tx, false) + sm.txMemPool.RemoveDoubleSpends(tx) + sm.txMemPool.RemoveOrphan(tx) + sm.peerNotifier.TransactionConfirmed(tx) + acceptedTxs := sm.txMemPool.ProcessOrphans(tx) + sm.peerNotifier.AnnounceNewTransactions(acceptedTxs) } // A block has been disconnected from the main block chain. @@ -1262,152 +1260,151 @@ func (b *blockManager) handleBlockchainNotification(notification *blockchain.Not // Reinsert all of the transactions (except the coinbase) into // the transaction pool. for _, tx := range block.Transactions()[1:] { - _, _, err := b.txMemPool.MaybeAcceptTransaction(tx, + _, _, err := sm.txMemPool.MaybeAcceptTransaction(tx, false, false) if err != nil { // Remove the transaction and all transactions // that depend on it if it wasn't accepted into // the transaction pool. - b.txMemPool.RemoveTransaction(tx, true) + sm.txMemPool.RemoveTransaction(tx, true) } } } } -// NewPeer informs the block manager of a newly active peer. -func (b *blockManager) NewPeer(peer *peerpkg.Peer) { +// NewPeer informs the sync manager of a newly active peer. +func (sm *SyncManager) NewPeer(peer *peerpkg.Peer) { // Ignore if we are shutting down. - if atomic.LoadInt32(&b.shutdown) != 0 { + if atomic.LoadInt32(&sm.shutdown) != 0 { return } - b.msgChan <- &newPeerMsg{peer: peer} + sm.msgChan <- &newPeerMsg{peer: peer} } // QueueTx adds the passed transaction message and peer to the block handling // queue. Responds to the done channel argument after the tx message is // processed. -func (b *blockManager) QueueTx(tx *btcutil.Tx, peer *peerpkg.Peer, done chan struct{}) { +func (sm *SyncManager) QueueTx(tx *btcutil.Tx, peer *peerpkg.Peer, done chan struct{}) { // Don't accept more transactions if we're shutting down. - if atomic.LoadInt32(&b.shutdown) != 0 { + if atomic.LoadInt32(&sm.shutdown) != 0 { done <- struct{}{} return } - b.msgChan <- &txMsg{tx: tx, peer: peer, reply: done} + sm.msgChan <- &txMsg{tx: tx, peer: peer, reply: done} } // QueueBlock adds the passed block message and peer to the block handling // queue. Responds to the done channel argument after the block message is // processed. -func (b *blockManager) QueueBlock(block *btcutil.Block, peer *peerpkg.Peer, done chan struct{}) { +func (sm *SyncManager) QueueBlock(block *btcutil.Block, peer *peerpkg.Peer, done chan struct{}) { // Don't accept more blocks if we're shutting down. - if atomic.LoadInt32(&b.shutdown) != 0 { + if atomic.LoadInt32(&sm.shutdown) != 0 { done <- struct{}{} return } - b.msgChan <- &blockMsg{block: block, peer: peer, reply: done} + sm.msgChan <- &blockMsg{block: block, peer: peer, reply: done} } // QueueInv adds the passed inv message and peer to the block handling queue. -func (b *blockManager) QueueInv(inv *wire.MsgInv, peer *peerpkg.Peer) { +func (sm *SyncManager) QueueInv(inv *wire.MsgInv, peer *peerpkg.Peer) { // No channel handling here because peers do not need to block on inv // messages. - if atomic.LoadInt32(&b.shutdown) != 0 { + if atomic.LoadInt32(&sm.shutdown) != 0 { return } - b.msgChan <- &invMsg{inv: inv, peer: peer} + sm.msgChan <- &invMsg{inv: inv, peer: peer} } // QueueHeaders adds the passed headers message and peer to the block handling // queue. -func (b *blockManager) QueueHeaders(headers *wire.MsgHeaders, peer *peerpkg.Peer) { +func (sm *SyncManager) QueueHeaders(headers *wire.MsgHeaders, peer *peerpkg.Peer) { // No channel handling here because peers do not need to block on // headers messages. - if atomic.LoadInt32(&b.shutdown) != 0 { + if atomic.LoadInt32(&sm.shutdown) != 0 { return } - b.msgChan <- &headersMsg{headers: headers, peer: peer} + sm.msgChan <- &headersMsg{headers: headers, peer: peer} } // DonePeer informs the blockmanager that a peer has disconnected. -func (b *blockManager) DonePeer(peer *peerpkg.Peer) { +func (sm *SyncManager) DonePeer(peer *peerpkg.Peer) { // Ignore if we are shutting down. - if atomic.LoadInt32(&b.shutdown) != 0 { + if atomic.LoadInt32(&sm.shutdown) != 0 { return } - b.msgChan <- &donePeerMsg{peer: peer} + sm.msgChan <- &donePeerMsg{peer: peer} } // Start begins the core block handler which processes block and inv messages. -func (b *blockManager) Start() { +func (sm *SyncManager) Start() { // Already started? - if atomic.AddInt32(&b.started, 1) != 1 { + if atomic.AddInt32(&sm.started, 1) != 1 { return } - log.Trace("Starting block manager") - b.wg.Add(1) - go b.blockHandler() + log.Trace("Starting sync manager") + sm.wg.Add(1) + go sm.blockHandler() } -// Stop gracefully shuts down the block manager by stopping all asynchronous +// Stop gracefully shuts down the sync manager by stopping all asynchronous // handlers and waiting for them to finish. -func (b *blockManager) Stop() error { - if atomic.AddInt32(&b.shutdown, 1) != 1 { - log.Warnf("Block manager is already in the process of " + +func (sm *SyncManager) Stop() error { + if atomic.AddInt32(&sm.shutdown, 1) != 1 { + log.Warnf("Sync manager is already in the process of " + "shutting down") return nil } - log.Infof("Block manager shutting down") - close(b.quit) - b.wg.Wait() + log.Infof("Sync manager shutting down") + close(sm.quit) + sm.wg.Wait() return nil } // SyncPeerID returns the ID of the current sync peer, or 0 if there is none. -func (b *blockManager) SyncPeerID() int32 { +func (sm *SyncManager) SyncPeerID() int32 { reply := make(chan int32) - b.msgChan <- getSyncPeerMsg{reply: reply} + sm.msgChan <- getSyncPeerMsg{reply: reply} return <-reply } // ProcessBlock makes use of ProcessBlock on an internal instance of a block -// chain. It is funneled through the block manager since btcchain is not safe -// for concurrent access. -func (b *blockManager) ProcessBlock(block *btcutil.Block, flags blockchain.BehaviorFlags) (bool, error) { +// chain. +func (sm *SyncManager) ProcessBlock(block *btcutil.Block, flags blockchain.BehaviorFlags) (bool, error) { reply := make(chan processBlockResponse, 1) - b.msgChan <- processBlockMsg{block: block, flags: flags, reply: reply} + sm.msgChan <- processBlockMsg{block: block, flags: flags, reply: reply} response := <-reply return response.isOrphan, response.err } -// IsCurrent returns whether or not the block manager believes it is synced with +// IsCurrent returns whether or not the sync manager believes it is synced with // the connected peers. -func (b *blockManager) IsCurrent() bool { +func (sm *SyncManager) IsCurrent() bool { reply := make(chan bool) - b.msgChan <- isCurrentMsg{reply: reply} + sm.msgChan <- isCurrentMsg{reply: reply} return <-reply } -// Pause pauses the block manager until the returned channel is closed. +// Pause pauses the sync manager until the returned channel is closed. // // Note that while paused, all peer and block processing is halted. The -// message sender should avoid pausing the block manager for long durations. -func (b *blockManager) Pause() chan<- struct{} { +// message sender should avoid pausing the sync manager for long durations. +func (sm *SyncManager) Pause() chan<- struct{} { c := make(chan struct{}) - b.msgChan <- pauseMsg{c} + sm.msgChan <- pauseMsg{c} return c } -// newBlockManager returns a new bitcoin block manager. -// Use Start to begin processing asynchronous block and inv updates. -func newBlockManager(config *blockManagerConfig) (*blockManager, error) { - bm := blockManager{ +// New constructs a new SyncManager. Use Start to begin processing asynchronous +// block, tx, and inv updates. +func New(config *Config) (*SyncManager, error) { + sm := SyncManager{ peerNotifier: config.PeerNotifier, chain: config.Chain, txMemPool: config.TxMemPool, @@ -1422,18 +1419,18 @@ func newBlockManager(config *blockManagerConfig) (*blockManager, error) { quit: make(chan struct{}), } - best := bm.chain.BestSnapshot() + best := sm.chain.BestSnapshot() if !config.DisableCheckpoints { // Initialize the next checkpoint based on the current height. - bm.nextCheckpoint = bm.findNextHeaderCheckpoint(best.Height) - if bm.nextCheckpoint != nil { - bm.resetHeaderState(&best.Hash, best.Height) + sm.nextCheckpoint = sm.findNextHeaderCheckpoint(best.Height) + if sm.nextCheckpoint != nil { + sm.resetHeaderState(&best.Hash, best.Height) } } else { log.Info("Checkpoints are disabled") } - bm.chain.Subscribe(bm.handleBlockchainNotification) + sm.chain.Subscribe(sm.handleBlockchainNotification) - return &bm, nil + return &sm, nil } diff --git a/rpcadapters.go b/rpcadapters.go index 771985e4..7d2c3a14 100644 --- a/rpcadapters.go +++ b/rpcadapters.go @@ -10,6 +10,7 @@ import ( "github.com/btcsuite/btcd/blockchain" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/mempool" + "github.com/btcsuite/btcd/netsync" "github.com/btcsuite/btcd/peer" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" @@ -225,8 +226,8 @@ func (cm *rpcConnManager) RelayTransactions(txns []*mempool.TxDesc) { // rpcSyncMgr provides a block manager for use with the RPC server and // implements the rpcserverSyncManager interface. type rpcSyncMgr struct { - server *server - blockMgr *blockManager + server *server + syncMgr *netsync.SyncManager } // Ensure rpcSyncMgr implements the rpcserverSyncManager interface. @@ -238,7 +239,7 @@ var _ rpcserverSyncManager = (*rpcSyncMgr)(nil) // This function is safe for concurrent access and is part of the // rpcserverSyncManager interface implementation. func (b *rpcSyncMgr) IsCurrent() bool { - return b.blockMgr.IsCurrent() + return b.syncMgr.IsCurrent() } // SubmitBlock submits the provided block to the network after processing it @@ -247,7 +248,7 @@ func (b *rpcSyncMgr) IsCurrent() bool { // This function is safe for concurrent access and is part of the // rpcserverSyncManager interface implementation. func (b *rpcSyncMgr) SubmitBlock(block *btcutil.Block, flags blockchain.BehaviorFlags) (bool, error) { - return b.blockMgr.ProcessBlock(block, flags) + return b.syncMgr.ProcessBlock(block, flags) } // Pause pauses the sync manager until the returned channel is closed. @@ -255,7 +256,7 @@ func (b *rpcSyncMgr) SubmitBlock(block *btcutil.Block, flags blockchain.Behavior // This function is safe for concurrent access and is part of the // rpcserverSyncManager interface implementation. func (b *rpcSyncMgr) Pause() chan<- struct{} { - return b.blockMgr.Pause() + return b.syncMgr.Pause() } // SyncPeerID returns the peer that is currently the peer being used to sync @@ -264,7 +265,7 @@ func (b *rpcSyncMgr) Pause() chan<- struct{} { // This function is safe for concurrent access and is part of the // rpcserverSyncManager interface implementation. func (b *rpcSyncMgr) SyncPeerID() int32 { - return b.blockMgr.SyncPeerID() + return b.syncMgr.SyncPeerID() } // LocateBlocks returns the hashes of the blocks after the first known block in diff --git a/server.go b/server.go index 626c8ee0..f29e6d2f 100644 --- a/server.go +++ b/server.go @@ -32,6 +32,7 @@ import ( "github.com/btcsuite/btcd/mempool" "github.com/btcsuite/btcd/mining" "github.com/btcsuite/btcd/mining/cpuminer" + "github.com/btcsuite/btcd/netsync" "github.com/btcsuite/btcd/peer" "github.com/btcsuite/btcd/txscript" "github.com/btcsuite/btcd/wire" @@ -67,6 +68,9 @@ var ( userAgentVersion = fmt.Sprintf("%d.%d.%d", appMajor, appMinor, appPatch) ) +// zeroHash is the zero value hash (all zeros). It is defined as a convenience. +var zeroHash chainhash.Hash + // onionAddr implements the net.Addr interface and represents a tor address. type onionAddr struct { addr string @@ -177,7 +181,7 @@ type server struct { sigCache *txscript.SigCache hashCache *txscript.HashCache rpcServer *rpcServer - blockManager *blockManager + syncManager *netsync.SyncManager chain *blockchain.BlockChain txMemPool *mempool.TxPool cpuMiner *cpuminer.CPUMiner @@ -245,7 +249,7 @@ func newServerPeer(s *server, isPersistent bool) *serverPeer { // newestBlock returns the current best block hash and height using the format // required by the configuration for the peer package. func (sp *serverPeer) newestBlock() (*chainhash.Hash, int32, error) { - best := sp.server.blockManager.chain.BestSnapshot() + best := sp.server.chain.BestSnapshot() return &best.Hash, best.Height, nil } @@ -343,8 +347,8 @@ func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) { // the local clock to keep the network time in sync. sp.server.timeSource.AddTimeSample(sp.Addr(), msg.Timestamp) - // Signal the block manager this peer is a new sync candidate. - sp.server.blockManager.NewPeer(sp.Peer) + // Signal the sync manager this peer is a new sync candidate. + sp.server.syncManager.NewPeer(sp.Peer) // Choose whether or not to relay transactions before a filter command // is received. @@ -363,7 +367,7 @@ func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) { // After soft-fork activation, only make outbound // connection to peers if they flag that they're segwit // enabled. - chain := sp.server.blockManager.chain + chain := sp.server.chain segwitActive, err := chain.IsDeploymentActive(chaincfg.DeploymentSegwit) if err != nil { peerLog.Errorf("Unable to query for segwit "+ @@ -475,12 +479,12 @@ func (sp *serverPeer) OnTx(_ *peer.Peer, msg *wire.MsgTx) { iv := wire.NewInvVect(wire.InvTypeTx, tx.Hash()) sp.AddKnownInventory(iv) - // Queue the transaction up to be handled by the block manager and + // Queue the transaction up to be handled by the sync manager and // intentionally block further receives until the transaction is fully // processed and known good or bad. This helps prevent a malicious peer // from queuing up a bunch of bad transactions before disconnecting (or // being disconnected) and wasting memory. - sp.server.blockManager.QueueTx(tx, sp.Peer, sp.txProcessed) + sp.server.syncManager.QueueTx(tx, sp.Peer, sp.txProcessed) <-sp.txProcessed } @@ -506,7 +510,7 @@ func (sp *serverPeer) OnBlock(_ *peer.Peer, msg *wire.MsgBlock, buf []byte) { // reference implementation processes blocks in the same // thread and therefore blocks further messages until // the bitcoin block has been fully processed. - sp.server.blockManager.QueueBlock(block, sp.Peer, sp.blockProcessed) + sp.server.syncManager.QueueBlock(block, sp.Peer, sp.blockProcessed) <-sp.blockProcessed } @@ -517,7 +521,7 @@ func (sp *serverPeer) OnBlock(_ *peer.Peer, msg *wire.MsgBlock, buf []byte) { func (sp *serverPeer) OnInv(_ *peer.Peer, msg *wire.MsgInv) { if !cfg.BlocksOnly { if len(msg.InvList) > 0 { - sp.server.blockManager.QueueInv(msg, sp.Peer) + sp.server.syncManager.QueueInv(msg, sp.Peer) } return } @@ -543,14 +547,14 @@ func (sp *serverPeer) OnInv(_ *peer.Peer, msg *wire.MsgInv) { } if len(newInv.InvList) > 0 { - sp.server.blockManager.QueueInv(newInv, sp.Peer) + sp.server.syncManager.QueueInv(newInv, sp.Peer) } } // OnHeaders is invoked when a peer receives a headers bitcoin -// message. The message is passed down to the block manager. +// message. The message is passed down to the sync manager. func (sp *serverPeer) OnHeaders(_ *peer.Peer, msg *wire.MsgHeaders) { - sp.server.blockManager.QueueHeaders(msg, sp.Peer) + sp.server.syncManager.QueueHeaders(msg, sp.Peer) } // handleGetData is invoked when a peer receives a getdata bitcoin message and @@ -646,7 +650,7 @@ func (sp *serverPeer) OnGetBlocks(_ *peer.Peer, msg *wire.MsgGetBlocks) { // over with the genesis block if unknown block locators are provided. // // This mirrors the behavior in the reference implementation. - chain := sp.server.blockManager.chain + chain := sp.server.chain hashList := chain.LocateBlocks(msg.BlockLocatorHashes, &msg.HashStop, wire.MaxBlocksPerMsg) @@ -676,7 +680,7 @@ func (sp *serverPeer) OnGetBlocks(_ *peer.Peer, msg *wire.MsgGetBlocks) { // message. func (sp *serverPeer) OnGetHeaders(_ *peer.Peer, msg *wire.MsgGetHeaders) { // Ignore getheaders requests if not in sync. - if !sp.server.blockManager.IsCurrent() { + if !sp.server.syncManager.IsCurrent() { return } @@ -690,7 +694,7 @@ func (sp *serverPeer) OnGetHeaders(_ *peer.Peer, msg *wire.MsgGetHeaders) { // over with the genesis block if unknown block locators are provided. // // This mirrors the behavior in the reference implementation. - chain := sp.server.blockManager.chain + chain := sp.server.chain headers := chain.LocateHeaders(msg.BlockLocatorHashes, &msg.HashStop) if len(headers) == 0 { // Nothing to send. @@ -1075,7 +1079,7 @@ func (s *server) pushBlockMsg(sp *serverPeer, hash *chainhash.Hash, doneChan cha // to trigger it to issue another getblocks message for the next // batch of inventory. if sendInv { - best := sp.server.blockManager.chain.BestSnapshot() + best := sp.server.chain.BestSnapshot() invMsg := wire.NewMsgInvSizeHint(1) iv := wire.NewInvVect(wire.InvTypeBlock, &best.Hash) invMsg.AddInvVect(iv) @@ -1101,7 +1105,7 @@ func (s *server) pushMerkleBlockMsg(sp *serverPeer, hash *chainhash.Hash, } // Fetch the raw block bytes from the database. - blk, err := sp.server.blockManager.chain.BlockByHash(hash) + blk, err := sp.server.chain.BlockByHash(hash) if err != nil { peerLog.Tracef("Unable to fetch requested block hash %v: %v", hash, err) @@ -1616,9 +1620,9 @@ func (s *server) peerDoneHandler(sp *serverPeer) { sp.WaitForDisconnect() s.donePeers <- sp - // Only tell block manager we are gone if we ever told it we existed. + // Only tell sync manager we are gone if we ever told it we existed. if sp.VersionKnown() { - s.blockManager.DonePeer(sp.Peer) + s.syncManager.DonePeer(sp.Peer) // Evict any remaining orphans that were sent by the peer. numEvicted := s.txMemPool.RemoveOrphansByTag(mempool.Tag(sp.ID())) @@ -1635,13 +1639,13 @@ func (s *server) peerDoneHandler(sp *serverPeer) { // peers to and from the server, banning peers, and broadcasting messages to // peers. It must be run in a goroutine. func (s *server) peerHandler() { - // Start the address manager and block manager, both of which are needed + // Start the address manager and sync manager, both of which are needed // by peers. This is done here since their lifecycle is closely tied // to this handler and rather than adding more channels to sychronize // things, it's easier and slightly faster to simply start and stop them // in this handler. s.addrManager.Start() - s.blockManager.Start() + s.syncManager.Start() srvrLog.Tracef("Starting peer handler") @@ -1709,7 +1713,7 @@ out: } s.connManager.Stop() - s.blockManager.Stop() + s.syncManager.Stop() s.addrManager.Stop() // Drain channels before exiting so nothing is left waiting around @@ -2366,7 +2370,7 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param } s.txMemPool = mempool.New(&txC) - s.blockManager, err = newBlockManager(&blockManagerConfig{ + s.syncManager, err = netsync.New(&netsync.Config{ PeerNotifier: &s, Chain: s.chain, TxMemPool: s.txMemPool, @@ -2398,9 +2402,9 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param ChainParams: chainParams, BlockTemplateGenerator: blockTemplateGenerator, MiningAddrs: cfg.miningAddrs, - ProcessBlock: s.blockManager.ProcessBlock, + ProcessBlock: s.syncManager.ProcessBlock, ConnectedCount: s.ConnectedCount, - IsCurrent: s.blockManager.IsCurrent, + IsCurrent: s.syncManager.IsCurrent, }) // Only setup a function to return new addresses to connect to when @@ -2500,9 +2504,9 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param Listeners: rpcListeners, StartupTime: s.startupTime, ConnMgr: &rpcConnManager{&s}, - SyncMgr: &rpcSyncMgr{&s, s.blockManager}, + SyncMgr: &rpcSyncMgr{&s, s.syncManager}, TimeSource: s.timeSource, - Chain: s.blockManager.chain, + Chain: s.chain, ChainParams: chainParams, DB: db, TxMemPool: s.txMemPool,