diff --git a/netsync/manager.go b/netsync/manager.go
index 3215a86a..5a4c4516 100644
--- a/netsync/manager.go
+++ b/netsync/manager.go
@@ -6,6 +6,7 @@ package netsync
 
 import (
 	"container/list"
+	"fmt"
 	"math/rand"
 	"net"
 	"sync"
@@ -203,6 +204,7 @@ type SyncManager struct {
 	headerList       *list.List
 	startHeader      *list.Element
 	nextCheckpoint   *chaincfg.Checkpoint
+	queuedBlocks     map[chainhash.Hash]*blockMsg
 
 	// An optional fee estimator.
 	feeEstimator *mempool.FeeEstimator
@@ -211,9 +213,9 @@ type SyncManager struct {
 // resetHeaderState sets the headers-first mode state to values appropriate for
 // syncing from a new peer.
 func (sm *SyncManager) resetHeaderState(newestHash *chainhash.Hash, newestHeight int32) {
-	sm.headersFirstMode = false
-	sm.headerList.Init()
-	sm.startHeader = nil
+	if sm.headerList.Len() != 0 {
+		return
+	}
 
 	// When there is a next checkpoint, add an entry for the latest known
 	// block into the header pool.  This allows the next downloaded header
@@ -328,18 +330,55 @@ func (sm *SyncManager) startSync() {
 		// Clear the requestedBlocks if the sync peer changes, otherwise
 		// we may ignore blocks we need that the last sync peer failed
 		// to send.
-		sm.requestedBlocks = make(map[chainhash.Hash]struct{})
-
-		locator, err := sm.chain.LatestBlockLocator()
-		if err != nil {
-			log.Errorf("Failed to get block locator for the "+
-				"latest block: %v", err)
-			return
+		//
+		// We don't reset it during headers-first mode since it's not
+		// used in that mode.
+		if !sm.headersFirstMode {
+			sm.requestedBlocks = make(map[chainhash.Hash]struct{})
 		}
 
 		log.Infof("Syncing to block height %d from peer %v",
 			bestPeer.LastBlock(), bestPeer.Addr())
 
+		sm.syncPeer = bestPeer
+
+		// Reset the last progress time now that we have a non-nil
+		// syncPeer to avoid instantly detecting it as stalled in the
+		// event the progress time hasn't been updated recently.
+		sm.lastProgressTime = time.Now()
+
+		// Check if we have some headers already downloaded.
+		var locator blockchain.BlockLocator
+		if sm.headerList.Len() > 0 && sm.nextCheckpoint != nil {
+			e := sm.headerList.Back()
+			node := e.Value.(*headerNode)
+
+			// If the final hash equals the next checkpoint, that
+			// means we've verified the downloaded headers and can
+			// start fetching blocks.
+			if node.hash.IsEqual(sm.nextCheckpoint.Hash) {
+				sm.startHeader = sm.headerList.Front()
+				sm.fetchHeaderBlocks()
+				return
+			}
+
+			// If the last hash doesn't equal the checkpoint,
+			// use the last hash as the locator.
+			locator = blockchain.BlockLocator(
+				[]*chainhash.Hash{node.hash})
+		}
+
+		// If we don't already have headers downloaded, we need to
+		// fetch the block locator from the chain.
+		if len(locator) == 0 {
+			locator, err = sm.chain.LatestBlockLocator()
+			if err != nil {
+				log.Errorf("Failed to get block locator for the "+
+					"latest block: %v", err)
+				return
+			}
+		}
+
 		// When the current height is less than a known checkpoint we
 		// can use block headers to learn about which blocks comprise
 		// the chain up to the checkpoint and perform less validation
@@ -369,12 +408,6 @@ func (sm *SyncManager) startSync() {
 		} else {
 			bestPeer.PushGetBlocksMsg(locator, &zeroHash)
 		}
-		sm.syncPeer = bestPeer
-
-		// Reset the last progress time now that we have a non-nil
-		// syncPeer to avoid instantly detecting it as stalled in the
-		// event the progress time hasn't been updated recently.
-		sm.lastProgressTime = time.Now()
 	} else {
 		log.Warnf("No sync peer candidates available")
 	}
@@ -565,12 +598,15 @@ func (sm *SyncManager) clearRequestedState(state *peerSyncState) {
 		delete(sm.requestedTxns, txHash)
 	}
 
-	// Remove requested blocks from the global map so that they will be
-	// fetched from elsewhere next time we get an inv.
-	// TODO: we could possibly here check which peers have these blocks
-	// and request them now to speed things up a little.
-	for blockHash := range state.requestedBlocks {
-		delete(sm.requestedBlocks, blockHash)
+	// The global requestedBlocks map is not used during headers-first mode.
+	if !sm.headersFirstMode {
+		// Remove requested blocks from the global map so that they will
+		// be fetched from elsewhere next time we get an inv.
+		// TODO: we could possibly here check which peers have these
+		// blocks and request them now to speed things up a little.
+		for blockHash := range state.requestedBlocks {
+			delete(sm.requestedBlocks, blockHash)
+		}
 	}
 }
 
@@ -710,30 +746,6 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
 		}
 	}
 
-	// When in headers-first mode, if the block matches the hash of the
-	// first header in the list of headers that are being fetched, it's
-	// eligible for less validation since the headers have already been
-	// verified to link together and are valid up to the next checkpoint.
-	// Also, remove the list entry for all blocks except the checkpoint
-	// since it is needed to verify the next round of headers links
-	// properly.
-	isCheckpointBlock := false
-	behaviorFlags := blockchain.BFNone
-	if sm.headersFirstMode {
-		firstNodeEl := sm.headerList.Front()
-		if firstNodeEl != nil {
-			firstNode := firstNodeEl.Value.(*headerNode)
-			if blockHash.IsEqual(firstNode.hash) {
-				behaviorFlags |= blockchain.BFFastAdd
-				if firstNode.hash.IsEqual(sm.nextCheckpoint.Hash) {
-					isCheckpointBlock = true
-				} else {
-					sm.headerList.Remove(firstNodeEl)
-				}
-			}
-		}
-	}
-
 	// Remove block from request maps.  Either chain will know about it and
 	// so we shouldn't have any more instances of trying to fetch it, or we
 	// will fail the insert and thus we'll retry next time we get an inv.
@@ -742,7 +754,7 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
 
 	// Process the block to include validation, best chain selection, orphan
 	// handling, etc.
-	_, isOrphan, err := sm.chain.ProcessBlock(bmsg.block, behaviorFlags)
+	_, isOrphan, err := sm.chain.ProcessBlock(bmsg.block, blockchain.BFNone)
 	if err != nil {
 		// When the error is a rule error, it means the block was simply
 		// rejected as opposed to something actually going wrong, so log
@@ -840,16 +852,131 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
 		}
 	}
 
-	// If we are not in headers first mode, it's a good time to periodically
-	// flush the blockchain cache because we don't expect new blocks immediately.
-	// After that, there is nothing more to do.
-	if !sm.headersFirstMode {
-		if err := sm.chain.FlushUtxoCache(blockchain.FlushPeriodic); err != nil {
-			log.Errorf("Error while flushing the blockchain cache: %v", err)
-		}
-		return
-	}
-
+	// It's a good time to periodically flush the blockchain cache because
+	// we don't expect new blocks immediately.  After that, there is
+	// nothing more to do.
+	if err := sm.chain.FlushUtxoCache(blockchain.FlushPeriodic); err != nil {
+		log.Errorf("Error while flushing the blockchain cache: %v", err)
+	}
+}
+
+// handleBlockMsgHeadersFirst handles block messages from all peers when the
+// sync manager is in headers-first mode. Blocks received out of order are
+// first kept in memory and sent to be processed once the next block from
+// the tip is available.
+func (sm *SyncManager) handleBlockMsgHeadersFirst(bmsg *blockMsg) {
+	peer := bmsg.peer
+	state, exists := sm.peerStates[peer]
+	if !exists {
+		log.Warnf("Received block message from unknown peer %s", peer)
+		return
+	}
+
+	// If we didn't ask for this block then the peer is misbehaving.
+	blockHash := bmsg.block.Hash()
+	if _, exists = state.requestedBlocks[*blockHash]; !exists {
+		// The regression test intentionally sends some blocks twice
+		// to test duplicate block insertion fails.  Don't disconnect
+		// the peer or ignore the block when we're in regression test
+		// mode in this case so the chain code is actually fed the
+		// duplicate blocks.
+		if sm.chainParams != &chaincfg.RegressionNetParams {
+			log.Warnf("Got unrequested block %v from %s -- "+
				"disconnecting", blockHash, peer.Addr())
+			peer.Disconnect()
+			return
+		}
+	}
+
+	// Add the block to the queue.
+	sm.queuedBlocks[*blockHash] = bmsg
+
+	// Remove the block from the request map as it's now in queuedBlocks.
+	delete(state.requestedBlocks, *blockHash)
+
+	// Update the last progress time as we've received a requested block.
+	sm.lastProgressTime = time.Now()
+
+	firstNodeEl := sm.headerList.Front()
+	if firstNodeEl == nil {
+		log.Errorf("missing first node in the header list on block %v",
+			bmsg.block.Hash())
+		return
+	}
+
+	// Look for blocks that we can process next.
+	processBlocks := make([]*blockMsg, 0, len(sm.queuedBlocks)+1)
+	var next *list.Element
+	for e := firstNodeEl; e != nil; e = next {
+		next = e.Next()
+		node, ok := e.Value.(*headerNode)
+		if !ok {
+			log.Warn("Header list node type is not a headerNode")
+			continue
+		}
+
+		b, found := sm.queuedBlocks[*node.hash]
+		if !found {
+			// Break when we're missing the next block in
+			// sequence.
+			break
+		}
+
+		// We have a block we can process, so remove it from the queue
+		// and add it to the processBlocks slice.
+		processBlocks = append(processBlocks, b)
+		delete(sm.queuedBlocks, *node.hash)
+
+		// Remove the block from the headerList.
+		//
+		// NOTE: We leave the checkpoint hash in place so that we can
+		// connect the headers from the next getheaders request.
+		if !node.hash.IsEqual(sm.nextCheckpoint.Hash) {
+			sm.headerList.Remove(e)
+		}
+	}
+
+	// If we have any blocks to process, process them now.
+	isCheckpointBlock := false
+	for _, processBlock := range processBlocks {
+		// Process the block to include validation, best chain selection,
+		// orphan handling, etc.
+		_, _, err := sm.chain.ProcessBlock(processBlock.block, blockchain.BFFastAdd)
+		if err != nil {
+			// This is a checkpointed block and therefore should never fail
+			// validation.  If it did, then the binary is likely corrupted.
+			if ruleErr, ok := err.(blockchain.RuleError); ok {
+				// If it's not a duplicate block error, we can panic.
+				if ruleErr.ErrorCode != blockchain.ErrDuplicateBlock {
+					panicErr := fmt.Errorf("Rejected checkpointed block %v from %s: %v"+
+						" -- the binary is likely corrupted and should "+
+						"not be trusted. Exiting.",
+						processBlock.block.Hash(), processBlock.peer, err)
+					panic(panicErr)
+				}
+			} else {
+				log.Errorf("Failed to process block %v: %v",
+					processBlock.block.Hash(), err)
+			}
+			if dbErr, ok := err.(database.Error); ok && dbErr.ErrorCode ==
+				database.ErrCorruption {
+				panic(dbErr)
+			}
+		}
+
+		// Check for the checkpoint block.
+		if processBlock.block.Hash().IsEqual(sm.nextCheckpoint.Hash) {
+			isCheckpointBlock = true
+		}
+
+		// If we're processing a lot of blocks, the last progress
+		// time will fall behind and we may incorrectly penalize a
+		// peer, so update the lastProgressTime after each processed
+		// block.
+		sm.lastProgressTime = time.Now()
+		sm.progressLogger.LogBlockHeight(processBlock.block, sm.chain)
+	}
+
 	// This is headers-first mode, so if the block is not a checkpoint
 	// request more blocks using the header list when the request queue is
 	// getting short.
@@ -889,7 +1016,7 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
 		sm.headerList.Init()
 		log.Infof("Reached the final checkpoint -- switching to normal mode")
 		locator := blockchain.BlockLocator([]*chainhash.Hash{blockHash})
-		err = peer.PushGetBlocksMsg(locator, &zeroHash)
+		err := peer.PushGetBlocksMsg(locator, &zeroHash)
 		if err != nil {
 			log.Warnf("Failed to send getblocks message to peer %s: %v",
 				peer.Addr(), err)
@@ -927,8 +1054,6 @@ func (sm *SyncManager) fetchHeaderBlocks() {
 		}
 		if !haveInv {
 			syncPeerState := sm.peerStates[sm.syncPeer]
-
-			sm.requestedBlocks[*node.hash] = struct{}{}
 			syncPeerState.requestedBlocks[*node.hash] = struct{}{}
 
 			// If we're fetching from a witness enabled peer
@@ -1076,7 +1201,11 @@ func (sm *SyncManager) handleNotFoundMsg(nfmsg *notFoundMsg) {
 		case wire.InvTypeBlock:
 			if _, exists := state.requestedBlocks[inv.Hash]; exists {
 				delete(state.requestedBlocks, inv.Hash)
-				delete(sm.requestedBlocks, inv.Hash)
+				// The global map of requestedBlocks is not used
+				// during headers-first mode.
+				if !sm.headersFirstMode {
+					delete(sm.requestedBlocks, inv.Hash)
+				}
 			}
 
 		case wire.InvTypeWitnessTx:
@@ -1186,6 +1315,11 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
 		}
 	}
 
+	// Don't request inventory while we're in headers-first mode.
+	if sm.headersFirstMode {
+		return
+	}
+
 	// Request the advertised inventory if we don't already have it.  Also,
 	// request parent blocks of orphans if we receive one we already have.
 	// Finally, attempt to detect potential stalls due to long side chains
@@ -1205,11 +1339,6 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
 		// for the peer.
 		peer.AddKnownInventory(iv)
 
-		// Ignore inventory when we're in headers-first mode.
-		if sm.headersFirstMode {
-			continue
-		}
-
 		// Request the inventory if we don't already have it.
 		haveInv, err := sm.haveInventory(iv)
 		if err != nil {
@@ -1297,6 +1426,9 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
 			case wire.InvTypeBlock:
 				// Request the block if there is not already a pending
 				// request.
+				//
+				// There's no headers-first mode check here since it
+				// was already done earlier in the method.
 				if _, exists := sm.requestedBlocks[iv.Hash]; !exists {
 					limitAdd(sm.requestedBlocks, iv.Hash, maxRequestedBlocks)
 					limitAdd(state.requestedBlocks, iv.Hash, maxRequestedBlocks)
@@ -1362,7 +1494,11 @@ out:
 			msg.reply <- struct{}{}
 
 		case *blockMsg:
-			sm.handleBlockMsg(msg)
+			if sm.headersFirstMode {
+				sm.handleBlockMsgHeadersFirst(msg)
+			} else {
+				sm.handleBlockMsg(msg)
+			}
 			msg.reply <- struct{}{}
 
 		case *invMsg:
@@ -1681,6 +1817,7 @@ func New(config *Config) (*SyncManager, error) {
 		progressLogger:  newBlockProgressLogger("Processed", log),
 		msgChan:         make(chan interface{}, config.MaxPeers*3),
 		headerList:      list.New(),
+		queuedBlocks:    make(map[chainhash.Hash]*blockMsg),
 		quit:            make(chan struct{}),
 		feeEstimator:    config.FeeEstimator,
 	}
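
The startSync changes restructure the headers-first resume path into a three-way decision: headers already verified up to the checkpoint, headers partially downloaded, or no headers at all. The toy model below isolates that decision; the resumeState type, its fields, and nextRequest are hypothetical names invented for illustration and are not part of the patch.

	package main

	import "fmt"

	// resumeState is a hypothetical stand-in for the sync manager state
	// the new startSync logic consults.
	type resumeState struct {
		headerTip  string // last header we have; "" when the list is empty
		checkpoint string // hash of the next checkpoint
	}

	// nextRequest mirrors the three-way decision startSync now makes when
	// a sync peer is chosen during headers-first download.
	func nextRequest(s resumeState) string {
		switch {
		case s.headerTip == "":
			// No headers yet: build a locator from the chain tip and
			// request headers from scratch.
			return "getheaders from chain locator"
		case s.headerTip == s.checkpoint:
			// Headers already verified up to the checkpoint: go
			// straight to fetching the checkpointed blocks.
			return "fetch header blocks"
		default:
			// Headers partially downloaded: resume from the last
			// header we have rather than starting over.
			return "getheaders from " + s.headerTip
		}
	}

	func main() {
		fmt.Println(nextRequest(resumeState{headerTip: "", checkpoint: "cp1"}))
		fmt.Println(nextRequest(resumeState{headerTip: "h42", checkpoint: "cp1"}))
		fmt.Println(nextRequest(resumeState{headerTip: "cp1", checkpoint: "cp1"}))
	}

This is also why the early return in resetHeaderState matters: a non-empty header list is preserved across sync peer changes so the middle case can resume instead of rebuilding the list.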
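The heart of the new handleBlockMsgHeadersFirst is an out-of-order buffer: blocks are queued by hash as they arrive and drained strictly in the order dictated by the header list. The sketch below isolates that pattern. It is a minimal illustration, not part of the patch; the orderedQueue type and all of its names are hypothetical stand-ins for the queuedBlocks map and headerList.

	package main

	import (
		"container/list"
		"fmt"
	)

	// orderedQueue buffers items that may arrive out of order and releases
	// them in the sequence given by the expected list.
	type orderedQueue struct {
		expected *list.List        // hashes in the order they must be processed
		pending  map[string]string // hash -> payload, buffered until its turn
	}

	// add buffers one arrival and returns every payload that is now ready,
	// i.e. the longest prefix of expected hashes that have all arrived.
	func (q *orderedQueue) add(hash, payload string) []string {
		q.pending[hash] = payload

		var ready []string
		var next *list.Element
		for e := q.expected.Front(); e != nil; e = next {
			next = e.Next()
			h := e.Value.(string)
			p, ok := q.pending[h]
			if !ok {
				// Missing the next item in sequence; stop draining.
				break
			}
			ready = append(ready, p)
			delete(q.pending, h)
			q.expected.Remove(e)
		}
		return ready
	}

	func main() {
		q := &orderedQueue{expected: list.New(), pending: make(map[string]string)}
		for _, h := range []string{"a", "b", "c"} {
			q.expected.PushBack(h)
		}

		fmt.Println(q.add("b", "block b")) // [] since "a" hasn't arrived yet
		fmt.Println(q.add("a", "block a")) // [block a block b], drained in order
		fmt.Println(q.add("c", "block c")) // [block c]
	}

In the patch itself the payloads are *blockMsg values keyed by chainhash.Hash, the expected order comes from the checkpointed header list (with the checkpoint hash deliberately left in place so the next getheaders request can connect to it), and each drained block is processed with blockchain.BFFastAdd since its header was already verified against the checkpoint.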