This commit is contained in:
Calvin Kim 2025-03-11 00:04:03 -05:00 committed by GitHub
commit 2a1dfb31f5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -6,6 +6,7 @@ package netsync
import (
"container/list"
"fmt"
"math/rand"
"net"
"sync"
@ -203,6 +204,7 @@ type SyncManager struct {
headerList *list.List
startHeader *list.Element
nextCheckpoint *chaincfg.Checkpoint
queuedBlocks map[chainhash.Hash]*blockMsg
// An optional fee estimator.
feeEstimator *mempool.FeeEstimator
@ -211,9 +213,9 @@ type SyncManager struct {
// resetHeaderState sets the headers-first mode state to values appropriate for
// syncing from a new peer.
func (sm *SyncManager) resetHeaderState(newestHash *chainhash.Hash, newestHeight int32) {
sm.headersFirstMode = false
sm.headerList.Init()
sm.startHeader = nil
if sm.headerList.Len() != 0 {
return
}
// When there is a next checkpoint, add an entry for the latest known
// block into the header pool. This allows the next downloaded header
@ -328,18 +330,55 @@ func (sm *SyncManager) startSync() {
// Clear the requestedBlocks if the sync peer changes, otherwise
// we may ignore blocks we need that the last sync peer failed
// to send.
sm.requestedBlocks = make(map[chainhash.Hash]struct{})
locator, err := sm.chain.LatestBlockLocator()
if err != nil {
log.Errorf("Failed to get block locator for the "+
"latest block: %v", err)
return
//
// We don't reset it during headersFirstMode since it's not used
// during headersFirstMode.
if !sm.headersFirstMode {
sm.requestedBlocks = make(map[chainhash.Hash]struct{})
}
log.Infof("Syncing to block height %d from peer %v",
bestPeer.LastBlock(), bestPeer.Addr())
sm.syncPeer = bestPeer
// Reset the last progress time now that we have a non-nil
// syncPeer to avoid instantly detecting it as stalled in the
// event the progress time hasn't been updated recently.
sm.lastProgressTime = time.Now()
// Check if we have some headers already downloaded.
var locator blockchain.BlockLocator
if sm.headerList.Len() > 0 && sm.nextCheckpoint != nil {
e := sm.headerList.Back()
node := e.Value.(*headerNode)
// If the final hash equals next checkpoint, that
// means we've verified the downloaded headers and
// can start fetching blocks.
if node.hash.IsEqual(sm.nextCheckpoint.Hash) {
sm.startHeader = sm.headerList.Front()
sm.fetchHeaderBlocks()
return
}
// If the last hash doesn't equal the checkpoint,
// make the locator as the last hash.
locator = blockchain.BlockLocator(
[]*chainhash.Hash{node.hash})
}
// If we don't already have headers downloaded we need to fetch
// the block locator from chain.
if len(locator) == 0 {
locator, err = sm.chain.LatestBlockLocator()
if err != nil {
log.Errorf("Failed to get block locator for the "+
"latest block: %v", err)
return
}
}
// When the current height is less than a known checkpoint we
// can use block headers to learn about which blocks comprise
// the chain up to the checkpoint and perform less validation
@ -369,12 +408,6 @@ func (sm *SyncManager) startSync() {
} else {
bestPeer.PushGetBlocksMsg(locator, &zeroHash)
}
sm.syncPeer = bestPeer
// Reset the last progress time now that we have a non-nil
// syncPeer to avoid instantly detecting it as stalled in the
// event the progress time hasn't been updated recently.
sm.lastProgressTime = time.Now()
} else {
log.Warnf("No sync peer candidates available")
}
@ -565,12 +598,15 @@ func (sm *SyncManager) clearRequestedState(state *peerSyncState) {
delete(sm.requestedTxns, txHash)
}
// Remove requested blocks from the global map so that they will be
// fetched from elsewhere next time we get an inv.
// TODO: we could possibly here check which peers have these blocks
// and request them now to speed things up a little.
for blockHash := range state.requestedBlocks {
delete(sm.requestedBlocks, blockHash)
// The global map of requestedBlocks is not used during headersFirstMode.
if !sm.headersFirstMode {
// Remove requested blocks from the global map so that they will
// be fetched from elsewhere next time we get an inv.
// TODO: we could possibly here check which peers have these
// blocks and request them now to speed things up a little.
for blockHash := range state.requestedBlocks {
delete(sm.requestedBlocks, blockHash)
}
}
}
@ -710,30 +746,6 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
}
}
// When in headers-first mode, if the block matches the hash of the
// first header in the list of headers that are being fetched, it's
// eligible for less validation since the headers have already been
// verified to link together and are valid up to the next checkpoint.
// Also, remove the list entry for all blocks except the checkpoint
// since it is needed to verify the next round of headers links
// properly.
isCheckpointBlock := false
behaviorFlags := blockchain.BFNone
if sm.headersFirstMode {
firstNodeEl := sm.headerList.Front()
if firstNodeEl != nil {
firstNode := firstNodeEl.Value.(*headerNode)
if blockHash.IsEqual(firstNode.hash) {
behaviorFlags |= blockchain.BFFastAdd
if firstNode.hash.IsEqual(sm.nextCheckpoint.Hash) {
isCheckpointBlock = true
} else {
sm.headerList.Remove(firstNodeEl)
}
}
}
}
// Remove block from request maps. Either chain will know about it and
// so we shouldn't have any more instances of trying to fetch it, or we
// will fail the insert and thus we'll retry next time we get an inv.
@ -742,7 +754,7 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
// Process the block to include validation, best chain selection, orphan
// handling, etc.
_, isOrphan, err := sm.chain.ProcessBlock(bmsg.block, behaviorFlags)
_, isOrphan, err := sm.chain.ProcessBlock(bmsg.block, blockchain.BFNone)
if err != nil {
// When the error is a rule error, it means the block was simply
// rejected as opposed to something actually going wrong, so log
@ -840,16 +852,131 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
}
}
// If we are not in headers first mode, it's a good time to periodically
// flush the blockchain cache because we don't expect new blocks immediately.
// After that, there is nothing more to do.
if !sm.headersFirstMode {
if err := sm.chain.FlushUtxoCache(blockchain.FlushPeriodic); err != nil {
log.Errorf("Error while flushing the blockchain cache: %v", err)
}
// It's a good time to periodically flush the blockchain cache because
// we don't expect new blocks immediately. After that, there is
// nothing more to do.
if err := sm.chain.FlushUtxoCache(blockchain.FlushPeriodic); err != nil {
log.Errorf("Error while flushing the blockchain cache: %v", err)
}
}
// handleBlockMsgInHeadersFirst handles block messages from all peers when the
// sync manager is in headers first mode. For blocks received out of order, it
// first keeps them in memory and sends them to be processed when the next block
// from the tip is available.
func (sm *SyncManager) handleBlockMsgHeadersFirst(bmsg *blockMsg) {
peer := bmsg.peer
state, exists := sm.peerStates[peer]
if !exists {
log.Warnf("Received block message from unknown peer %s", peer)
return
}
// If we didn't ask for this block then the peer is misbehaving.
blockHash := bmsg.block.Hash()
if _, exists = state.requestedBlocks[*blockHash]; !exists {
// The regression test intentionally sends some blocks twice
// to test duplicate block insertion fails. Don't disconnect
// the peer or ignore the block when we're in regression test
// mode in this case so the chain code is actually fed the
// duplicate blocks.
if sm.chainParams != &chaincfg.RegressionNetParams {
log.Warnf("Got unrequested block %v from %s -- "+
"disconnecting", blockHash, peer.Addr())
peer.Disconnect()
return
}
}
// Add the block to the queue.
sm.queuedBlocks[*blockHash] = bmsg
// Remove block from the request map as we've added it to queuedBlocks.
delete(state.requestedBlocks, *blockHash)
// Log the progress as we have received the block.
sm.lastProgressTime = time.Now()
firstNodeEl := sm.headerList.Front()
if firstNodeEl == nil {
log.Errorf("missing first node in the headerlist on block %v",
bmsg.block.Hash())
return
}
// Look for blocks that we can process next.
processBlocks := make([]*blockMsg, 0, len(sm.queuedBlocks)+1)
var next *list.Element
for e := firstNodeEl; e != nil; e = next {
next = e.Next()
node, ok := e.Value.(*headerNode)
if !ok {
log.Warn("Header list node type is not a headerNode")
continue
}
b, found := sm.queuedBlocks[*node.hash]
if !found {
// Break when we're missing the next block in
// sequence.
break
}
// We have a block we can process so remove from the queue and
// add it to the processBlocks slice.
processBlocks = append(processBlocks, b)
delete(sm.queuedBlocks, *node.hash)
// Remove the block from the headerList.
//
// NOTE We leave in the checkpointed hash so that we can connect
// headers on the next get headers request.
if !node.hash.IsEqual(sm.nextCheckpoint.Hash) {
sm.headerList.Remove(e)
}
}
// If we have any blocks to process, process them now.
isCheckpointBlock := false
for _, processBlock := range processBlocks {
// Process the block to include validation, best chain selection, orphan
// handling, etc.
_, _, err := sm.chain.ProcessBlock(processBlock.block, blockchain.BFFastAdd)
if err != nil {
// This is a checkpointed block and therefore should never fail
// validation. If it did, then the binary is likely corrupted.
if ruleErr, ok := err.(blockchain.RuleError); ok {
// If it's not a duplicate block error, we can panic.
if ruleErr.ErrorCode != blockchain.ErrDuplicateBlock {
panicErr := fmt.Errorf("Rejected checkpointed block %v from %s: %v"+
" -- The binary is likely corrupted and should "+
"not be trusted. Exiting.",
blockHash, peer, err)
panic(panicErr)
}
} else {
log.Errorf("Failed to process block %v: %v",
blockHash, err)
}
if dbErr, ok := err.(database.Error); ok && dbErr.ErrorCode ==
database.ErrCorruption {
panic(dbErr)
}
}
// Look for checkpointed blocks.
if processBlock.block.Hash().IsEqual(sm.nextCheckpoint.Hash) {
isCheckpointBlock = true
}
// If we have a lot of blocks we're processing, then the last
// progress time will be in the past and we may incorrectly
// punish a peer thus we need to update the lastProgressTime
// now.
sm.lastProgressTime = time.Now()
sm.progressLogger.LogBlockHeight(bmsg.block, sm.chain)
}
// This is headers-first mode, so if the block is not a checkpoint
// request more blocks using the header list when the request queue is
// getting short.
@ -889,7 +1016,7 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
sm.headerList.Init()
log.Infof("Reached the final checkpoint -- switching to normal mode")
locator := blockchain.BlockLocator([]*chainhash.Hash{blockHash})
err = peer.PushGetBlocksMsg(locator, &zeroHash)
err := peer.PushGetBlocksMsg(locator, &zeroHash)
if err != nil {
log.Warnf("Failed to send getblocks message to peer %s: %v",
peer.Addr(), err)
@ -927,8 +1054,6 @@ func (sm *SyncManager) fetchHeaderBlocks() {
}
if !haveInv {
syncPeerState := sm.peerStates[sm.syncPeer]
sm.requestedBlocks[*node.hash] = struct{}{}
syncPeerState.requestedBlocks[*node.hash] = struct{}{}
// If we're fetching from a witness enabled peer
@ -1076,7 +1201,11 @@ func (sm *SyncManager) handleNotFoundMsg(nfmsg *notFoundMsg) {
case wire.InvTypeBlock:
if _, exists := state.requestedBlocks[inv.Hash]; exists {
delete(state.requestedBlocks, inv.Hash)
delete(sm.requestedBlocks, inv.Hash)
// The global map of requestedBlocks is not used
// during headersFirstMode.
if !sm.headersFirstMode {
delete(sm.requestedBlocks, inv.Hash)
}
}
case wire.InvTypeWitnessTx:
@ -1186,6 +1315,11 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
}
}
// Don't request on inventory messages when we're in headers-first mode.
if sm.headersFirstMode {
return
}
// Request the advertised inventory if we don't already have it. Also,
// request parent blocks of orphans if we receive one we already have.
// Finally, attempt to detect potential stalls due to long side chains
@ -1205,11 +1339,6 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
// for the peer.
peer.AddKnownInventory(iv)
// Ignore inventory when we're in headers-first mode.
if sm.headersFirstMode {
continue
}
// Request the inventory if we don't already have it.
haveInv, err := sm.haveInventory(iv)
if err != nil {
@ -1297,6 +1426,9 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
case wire.InvTypeBlock:
// Request the block if there is not already a pending
// request.
//
// No check for if we're in headers first since it's
// already done so earlier in the method.
if _, exists := sm.requestedBlocks[iv.Hash]; !exists {
limitAdd(sm.requestedBlocks, iv.Hash, maxRequestedBlocks)
limitAdd(state.requestedBlocks, iv.Hash, maxRequestedBlocks)
@ -1362,7 +1494,11 @@ out:
msg.reply <- struct{}{}
case *blockMsg:
sm.handleBlockMsg(msg)
if sm.headersFirstMode {
sm.handleBlockMsgHeadersFirst(msg)
} else {
sm.handleBlockMsg(msg)
}
msg.reply <- struct{}{}
case *invMsg:
@ -1681,6 +1817,7 @@ func New(config *Config) (*SyncManager, error) {
progressLogger: newBlockProgressLogger("Processed", log),
msgChan: make(chan interface{}, config.MaxPeers*3),
headerList: list.New(),
queuedBlocks: make(map[chainhash.Hash]*blockMsg),
quit: make(chan struct{}),
feeEstimator: config.FeeEstimator,
}