From 16cd44f0e6c33e928193d8253c6b74c7dd088b7e Mon Sep 17 00:00:00 2001
From: Calvin Kim
Date: Tue, 16 May 2023 23:27:01 +0900
Subject: [PATCH] blockchain, netsync, main, cmd/addblock: Use utxocache

This change is part of the effort to add utxocache support to btcd.

The utxo cache is now used by the BlockChain struct. It is enabled by
default and the cache size defaults to 250MiB.

This speeds up block/tx validation, as the cache allows for much faster
lookup of utxos. The initial block download in particular is improved,
as the cache remedies the db i/o bottleneck.
---
blockchain/chain.go | 123 +++++++++++----
blockchain/utxocache.go | 133 +++++++++++++++++
blockchain/utxocache_test.go | 282 +++++++++++++++++++++++++++++++++++
blockchain/utxoviewpoint.go | 60 ++++++--
blockchain/validate.go | 6 +-
cmd/addblock/import.go | 10 ++
config.go | 3 +
netsync/blocklogger.go | 9 +-
netsync/manager.go | 14 +-
server.go | 19 +--
10 files changed, 600 insertions(+), 59 deletions(-)
diff --git a/blockchain/chain.go b/blockchain/chain.go
index 937b2fa1..84d4a0f3 100644
--- a/blockchain/chain.go
+++ b/blockchain/chain.go
@@ -131,6 +131,10 @@ type BlockChain struct {
index *blockIndex
bestChain *chainView
+ // The UTXO state holds a cached view of the UTXO state of the chain.
+ // It is protected by the chain lock.
+ utxoCache *utxoCache
+
// These fields are related to handling of orphan blocks. They are
// protected by a combination of the chain lock and the orphan lock.
orphanLock sync.RWMutex
@@ -551,9 +555,14 @@ func (b *BlockChain) getReorganizeNodes(node *blockNode) (*list.List, *list.List
// connectBlock handles connecting the passed node/block to the end of the main
// (best) chain.
//
-// This passed utxo view must have all referenced txos the block spends marked
-// as spent and all of the new txos the block creates added to it. In addition,
-// the passed stxos slice must be populated with all of the information for the
+// Passing in a utxo view is optional. If the passed in utxo view is nil,
+// connectBlock will assume that the utxo cache has already connected all the
+// txs in the block being connected.
+// If a utxo view is passed in, this passed utxo view must have all referenced
+// txos the block spends marked as spent and all of the new txos the block creates
+// added to it.
+
+// The passed stxos slice must be populated with all of the information for the
// spent txos. This approach is used because the connection validation that
// must happen prior to calling this function requires the same details, so
// it would be inefficient to repeat it.
@@ -602,6 +611,18 @@ func (b *BlockChain) connectBlock(node *blockNode, block *btcutil.Block,
curTotalTxns+numTxns, CalcPastMedianTime(node),
)
+ // If a utxoviewpoint was passed in, we'll be writing that viewpoint
+ // directly to the database on disk. In order for the database to be
+ // consistent, we must flush the cache before writing the viewpoint.
+ if view != nil {
+ err = b.db.Update(func(dbTx database.Tx) error {
+ return b.utxoCache.flush(dbTx, FlushRequired, state)
+ })
+ if err != nil {
+ return err
+ }
+ }
+
// Atomically insert info into the database.
err = b.db.Update(func(dbTx database.Tx) error {
// If the pruneTarget isn't 0, we should attempt to delete older blocks
@@ -640,6 +661,8 @@ func (b *BlockChain) connectBlock(node *blockNode, block *btcutil.Block,
// Update the utxo set using the state of the utxo view. This
// entails removing all of the utxos spent and adding the new
// ones created by the block.
+ // + // A nil viewpoint is a no-op. err = dbPutUtxoView(dbTx, view) if err != nil { return err @@ -670,7 +693,9 @@ func (b *BlockChain) connectBlock(node *blockNode, block *btcutil.Block, // Prune fully spent entries and mark all entries in the view unmodified // now that the modifications have been committed to the database. - view.commit() + if view != nil { + view.commit() + } // This node is now the end of the best chain. b.bestChain.SetTip(node) @@ -691,7 +716,11 @@ func (b *BlockChain) connectBlock(node *blockNode, block *btcutil.Block, b.sendNotification(NTBlockConnected, block) b.chainLock.Lock() - return nil + // Since we may have changed the UTXO cache, we make sure it didn't exceed its + // maximum size. If we're pruned and have flushed already, this will be a no-op. + return b.db.Update(func(dbTx database.Tx) error { + return b.utxoCache.flush(dbTx, FlushIfNeeded, b.BestSnapshot()) + }) } // disconnectBlock handles disconnecting the passed node/block from the end of @@ -840,6 +869,15 @@ func (b *BlockChain) reorganizeChain(detachNodes, attachNodes *list.List) error return nil } + // The rest of the reorg depends on all STXOs already being in the database + // so we flush before reorg. + err := b.db.Update(func(dbTx database.Tx) error { + return b.utxoCache.flush(dbTx, FlushRequired, b.BestSnapshot()) + }) + if err != nil { + return err + } + // Ensure the provided nodes match the current best chain. tip := b.bestChain.Tip() if detachNodes.Len() != 0 { @@ -901,7 +939,7 @@ func (b *BlockChain) reorganizeChain(detachNodes, attachNodes *list.List) error // Load all of the utxos referenced by the block that aren't // already in the view. - err = view.fetchInputUtxos(b.db, block) + err = view.fetchInputUtxos(b.db, nil, block) if err != nil { return err } @@ -968,7 +1006,7 @@ func (b *BlockChain) reorganizeChain(detachNodes, attachNodes *list.List) error // checkConnectBlock gets skipped, we still need to update the UTXO // view. if b.index.NodeStatus(n).KnownValid() { - err = view.fetchInputUtxos(b.db, block) + err = view.fetchInputUtxos(b.db, nil, block) if err != nil { return err } @@ -1020,7 +1058,7 @@ func (b *BlockChain) reorganizeChain(detachNodes, attachNodes *list.List) error // Load all of the utxos referenced by the block that aren't // already in the view. - err := view.fetchInputUtxos(b.db, block) + err := view.fetchInputUtxos(b.db, nil, block) if err != nil { return err } @@ -1047,7 +1085,7 @@ func (b *BlockChain) reorganizeChain(detachNodes, attachNodes *list.List) error // Load all of the utxos referenced by the block that aren't // already in the view. - err := view.fetchInputUtxos(b.db, block) + err := view.fetchInputUtxos(b.db, nil, block) if err != nil { return err } @@ -1069,6 +1107,15 @@ func (b *BlockChain) reorganizeChain(detachNodes, attachNodes *list.List) error } } + // We call the flush at the end to update the last flush hash to the new + // best tip. + err = b.db.Update(func(dbTx database.Tx) error { + return b.utxoCache.flush(dbTx, FlushRequired, b.BestSnapshot()) + }) + if err != nil { + return err + } + // Log the point where the chain forked and old and new best chain // heads. if forkNode != nil { @@ -1121,11 +1168,21 @@ func (b *BlockChain) connectBestChain(node *blockNode, block *btcutil.Block, fla // Perform several checks to verify the block can be connected // to the main chain without violating any rules and without // actually connecting the block. 
- view := NewUtxoViewpoint()
- view.SetBestHash(parentHash)
- stxos := make([]SpentTxOut, 0, countSpentOutputs(block))
 if !fastAdd {
- err := b.checkConnectBlock(node, block, view, &stxos)
+ // We create a viewpoint here to avoid spending or adding new
+ // coins to the utxo cache.
+ //
+ // checkConnectBlock spends and adds utxos before doing the
+ // signature validation and if the signature validation fails,
+ // we would be forced to undo the utxo cache.
+ //
+ // TODO (kcalvinalvin): Doing all of the validation before connecting
+ // the tx inside check connect block would allow us to pass the utxo
+ // cache directly to the check connect block. This would save on the
+ // expensive memory allocation done by fetch input utxos.
+ view := NewUtxoViewpoint()
+ view.SetBestHash(parentHash)
+ err := b.checkConnectBlock(node, block, view, nil)
 if err == nil {
 b.index.SetStatusFlags(node, statusValid)
 } else if _, ok := err.(RuleError); ok {
@@ -1141,23 +1198,16 @@ func (b *BlockChain) connectBestChain(node *blockNode, block *btcutil.Block, fla
 }
 }
- // In the fast add case the code to check the block connection
- // was skipped, so the utxo view needs to load the referenced
- // utxos, spend them, and add the new utxos being created by
- // this block.
- if fastAdd {
- err := view.fetchInputUtxos(b.db, block)
- if err != nil {
- return false, err
- }
- err = view.connectTransactions(block, &stxos)
- if err != nil {
- return false, err
- }
+ // Connect the transactions to the cache. All the txs are considered valid
+ // at this point as they have passed validation or were considered valid already.
+ stxos := make([]SpentTxOut, 0, countSpentOutputs(block))
+ err := b.utxoCache.connectTransactions(block, &stxos)
+ if err != nil {
+ return false, err
 }
 // Connect the block to the main chain.
- err := b.connectBlock(node, block, view, stxos)
+ err = b.connectBlock(node, block, nil, stxos)
 if err != nil {
 // If we got hit with a rule error, then we'll mark
 // that status of the block as invalid and flush the
@@ -1785,6 +1835,11 @@ type Config struct {
 // This field is required.
 DB database.DB
+ // The maximum size in bytes of the UTXO cache.
+ //
+ // This field is required.
+ UtxoCacheMaxSize uint64
+
 // Interrupt specifies a channel the caller can close to signal that
 // long running operations, such as catching up indexes or performing
 // database migrations, should be interrupted.
@@ -1893,6 +1948,7 @@ func New(config *Config) (*BlockChain, error) {
 maxRetargetTimespan: targetTimespan * adjustmentFactor,
 blocksPerRetarget: int32(targetTimespan / targetTimePerBlock),
 index: newBlockIndex(config.DB, params),
+ utxoCache: newUtxoCache(config.DB, config.UtxoCacheMaxSize),
 hashCache: config.HashCache,
 bestChain: newChainView(nil),
 orphans: make(map[chainhash.Hash]*orphanBlock),
@@ -1942,10 +1998,23 @@ func New(config *Config) (*BlockChain, error) {
 return nil, err
 }
+ // Make sure the utxo state is caught up if it was left in an inconsistent
+ // state.
 bestNode := b.bestChain.Tip()
+ if err := b.InitConsistentState(bestNode, config.Interrupt); err != nil {
+ return nil, err
+ }
 log.Infof("Chain state (height %d, hash %v, totaltx %d, work %v)",
 bestNode.height, bestNode.hash, b.stateSnapshot.TotalTxns,
 bestNode.workSum)
 return &b, nil
}
+
+// CachedStateSize returns the total size of the cached state of the blockchain
+// in bytes.
+func (b *BlockChain) CachedStateSize() uint64 {
+ b.chainLock.Lock()
+ defer b.chainLock.Unlock()
+ return b.utxoCache.totalMemoryUsage()
+}
diff --git a/blockchain/utxocache.go b/blockchain/utxocache.go
index 3f3c246e..d36a36ff 100644
--- a/blockchain/utxocache.go
+++ b/blockchain/utxocache.go
@@ -5,6 +5,7 @@ package blockchain
import (
+ "container/list"
 "fmt"
 "sync"
 "time"
@@ -578,3 +579,135 @@ func (s *utxoCache) flush(dbTx database.Tx, mode FlushMode, bestState *BestState
 return nil
}
+
+// FlushUtxoCache flushes the UTXO state to the database if a flush is needed with the
+// given flush mode.
+//
+// This function is safe for concurrent access.
+func (b *BlockChain) FlushUtxoCache(mode FlushMode) error {
+ b.chainLock.Lock()
+ defer b.chainLock.Unlock()
+
+ return b.db.Update(func(dbTx database.Tx) error {
+ return b.utxoCache.flush(dbTx, mode, b.BestSnapshot())
+ })
+}
+
+// InitConsistentState checks the consistency status of the utxo state and
+// replays blocks if it lags behind the best state of the blockchain.
+//
+// It needs to be ensured that the chainView passed to this method does not
+// get changed during the execution of this method.
+func (b *BlockChain) InitConsistentState(tip *blockNode, interrupt <-chan struct{}) error {
+ s := b.utxoCache
+ // Load the consistency status from the database.
+ var statusBytes []byte
+ s.db.View(func(dbTx database.Tx) error {
+ statusBytes = dbFetchUtxoStateConsistency(dbTx)
+ return nil
+ })
+
+ // If no status was found, the database is old and didn't have a cached utxo
+ // state yet. In that case, we set the status to the best state and write
+ // this to the database.
+ if statusBytes == nil {
+ err := s.db.Update(func(dbTx database.Tx) error {
+ return dbPutUtxoStateConsistency(dbTx, &tip.hash)
+ })
+
+ // Set the last flush hash since it's currently the default value of 0s.
+ s.lastFlushHash = tip.hash
+
+ return err
+ }
+
+ statusHash, err := chainhash.NewHash(statusBytes)
+ if err != nil {
+ return err
+ }
+
+ // If state is consistent, we are done.
+ if statusHash.IsEqual(&tip.hash) {
+ log.Debugf("UTXO state consistent at (%d:%v)", tip.height, tip.hash)
+
+ // The last flush hash is set to the default value of all 0s. Set
+ // it to the tip since we checked it's consistent.
+ s.lastFlushHash = tip.hash
+
+ return nil
+ }
+
+ lastFlushNode := b.index.LookupNode(statusHash)
+ log.Infof("Reconstructing UTXO state after an unclean shutdown. The UTXO state is "+
+ "consistent at block %s (%d) but the chainstate is at block %s (%d). This may "+
+ "take a long time...", statusHash.String(), lastFlushNode.height,
+ tip.hash.String(), tip.height)
+
+ // Even though this should always be true, make sure the fetched hash is in
+ // the best chain.
+ fork := b.bestChain.FindFork(lastFlushNode)
+ if fork == nil {
+ return AssertError(fmt.Sprintf("last utxo consistency status contains "+
+ "hash that is not in best chain: %v", statusHash))
+ }
+
+ // We never need to disconnect blocks here, as the utxo state cannot be
+ // inconsistent during a reorganization. This is because the cache is flushed
+ // before the reorganization begins and the utxo set at each block disconnect
+ // is written atomically to the database.
+ node := lastFlushNode
+
+ // We replay the blocks from the last consistent state up to the best
+ // state. Iterate forward from the consistent node to the tip of the best
+ // chain.
+ attachNodes := list.New()
+ for n := tip; n.height >= 0; n = n.parent {
+ if n == fork {
+ break
+ }
+ attachNodes.PushFront(n)
+ }
+
+ for e := attachNodes.Front(); e != nil; e = e.Next() {
+ node = e.Value.(*blockNode)
+
+ var block *btcutil.Block
+ err := s.db.View(func(dbTx database.Tx) error {
+ block, err = dbFetchBlockByNode(dbTx, node)
+ if err != nil {
+ return err
+ }
+
+ return err
+ })
+ if err != nil {
+ return err
+ }
+
+ err = b.utxoCache.connectTransactions(block, nil)
+ if err != nil {
+ return err
+ }
+
+ // Flush the utxo cache if needed. This will in turn update the
+ // consistent state to this block.
+ err = s.db.Update(func(dbTx database.Tx) error {
+ return s.flush(dbTx, FlushIfNeeded, &BestState{Hash: node.hash, Height: node.height})
+ })
+ if err != nil {
+ return err
+ }
+
+ if interruptRequested(interrupt) {
+ log.Warn("UTXO state reconstruction interrupted")
+
+ return errInterruptRequested
+ }
+ }
+ log.Debug("UTXO state reconstruction done")
+
+ // Set the last flush hash and time now that the utxo state is caught up
+ // to the tip.
+ s.lastFlushHash = tip.hash
+ s.lastFlushTime = time.Now()
+
+ return nil
+}
diff --git a/blockchain/utxocache_test.go b/blockchain/utxocache_test.go
index aa748e50..7bad3b85 100644
--- a/blockchain/utxocache_test.go
+++ b/blockchain/utxocache_test.go
@@ -6,10 +6,16 @@ package blockchain
import (
 "crypto/sha256"
 "encoding/binary"
+ "fmt"
 "reflect"
 "sync"
 "testing"
+ "time"
+ "github.com/btcsuite/btcd/btcutil"
+ "github.com/btcsuite/btcd/chaincfg"
+ "github.com/btcsuite/btcd/chaincfg/chainhash"
+ "github.com/btcsuite/btcd/database"
 "github.com/btcsuite/btcd/wire"
)
@@ -305,3 +311,279 @@ func TestUtxoCacheEntrySize(t *testing.T) {
 }
 }
}
+
+// assertConsistencyState asserts the utxo consistency states of the blockchain.
+func assertConsistencyState(chain *BlockChain, hash *chainhash.Hash) error {
+ var bytes []byte
+ err := chain.db.View(func(dbTx database.Tx) (err error) {
+ bytes = dbFetchUtxoStateConsistency(dbTx)
+ return
+ })
+ if err != nil {
+ return fmt.Errorf("Error fetching utxo state consistency: %v", err)
+ }
+ actualHash, err := chainhash.NewHash(bytes)
+ if err != nil {
+ return err
+ }
+ if !actualHash.IsEqual(hash) {
+ return fmt.Errorf("Unexpected consistency hash: %v instead of %v",
+ actualHash, hash)
+ }
+
+ return nil
+}
+
+// assertNbEntriesOnDisk asserts that the total number of utxo entries on the
+// disk is equal to the given expected number.
+func assertNbEntriesOnDisk(chain *BlockChain, expectedNumber int) error {
+ var nb int
+ err := chain.db.View(func(dbTx database.Tx) error {
+ cursor := dbTx.Metadata().Bucket(utxoSetBucketName).Cursor()
+ nb = 0
+ for b := cursor.First(); b; b = cursor.Next() {
+ nb++
+ _, err := deserializeUtxoEntry(cursor.Value())
+ if err != nil {
+ return fmt.Errorf("Failed to deserialize entry: %v", err)
+ }
+ }
+ return nil
+ })
+ if err != nil {
+ return fmt.Errorf("Error fetching utxo entries: %v", err)
+ }
+ if nb != expectedNumber {
+ return fmt.Errorf("Expected %d elements in the UTXO set, but found %d",
+ expectedNumber, nb)
+ }
+
+ return nil
+}
+
+// utxoCacheTestChain creates a test BlockChain to be used for utxo cache tests.
+// It uses the regression test parameters, a coin maturity of 1 block, and sets
+// the cache size limit to 10 MiB.
+func utxoCacheTestChain(testName string) (*BlockChain, *chaincfg.Params, func()) {
+ params := chaincfg.RegressionNetParams
+ chain, tearDown, err := chainSetup(testName, &params)
+ if err != nil {
+ panic(fmt.Sprintf("error loading blockchain with database: %v", err))
+ }
+
+ chain.TstSetCoinbaseMaturity(1)
+ chain.utxoCache.maxTotalMemoryUsage = 10 * 1024 * 1024
+ chain.utxoCache.cachedEntries.maxTotalMemoryUsage = chain.utxoCache.maxTotalMemoryUsage
+
+ return chain, &params, tearDown
+}
+
+func TestUtxoCacheFlush(t *testing.T) {
+ chain, params, tearDown := utxoCacheTestChain("TestUtxoCacheFlush")
+ defer tearDown()
+ cache := chain.utxoCache
+ tip := btcutil.NewBlock(params.GenesisBlock)
+
+ // The chainSetup init triggers the consistency status write.
+ err := assertConsistencyState(chain, params.GenesisHash)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = assertNbEntriesOnDisk(chain, 0)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // LastFlushHash starts with genesis.
+ if cache.lastFlushHash != *params.GenesisHash {
+ t.Fatalf("lastFlushHash before first flush expected to be "+
+ "genesis block hash, instead was %v", cache.lastFlushHash)
+ }
+
+ // First, add 10 utxos without flushing.
+ outPoints := make([]wire.OutPoint, 10)
+ for i := range outPoints {
+ op := outpointFromInt(i)
+ outPoints[i] = op
+
+ // Add the txout.
+ txOut := wire.TxOut{Value: 10000, PkScript: getValidP2PKHScript()}
+ cache.addTxOut(op, &txOut, true, int32(i))
+ }
+
+ if cache.cachedEntries.length() != len(outPoints) {
+ t.Fatalf("Expected 10 entries, has %d instead", cache.cachedEntries.length())
+ }
+
+ // All entries should be fresh and modified.
+ for _, m := range cache.cachedEntries.maps {
+ for outpoint, entry := range m {
+ if entry == nil {
+ t.Fatalf("Unexpected nil entry found for %v", outpoint)
+ }
+ if !entry.isModified() {
+ t.Fatal("Entry should be marked modified")
+ }
+ if !entry.isFresh() {
+ t.Fatal("Entry should be marked fresh")
+ }
+ }
+ }
+
+ // Spend the last outpoint and pop it off from the outpoints slice.
+ var spendOp wire.OutPoint
+ spendOp, outPoints = outPoints[len(outPoints)-1], outPoints[:len(outPoints)-1]
+ cache.addTxIn(&wire.TxIn{PreviousOutPoint: spendOp}, nil)
+
+ if cache.cachedEntries.length() != len(outPoints) {
+ t.Fatalf("Expected %d entries, has %d instead",
+ len(outPoints), cache.cachedEntries.length())
+ }
+
+ // Not flushed yet.
+ err = assertConsistencyState(chain, params.GenesisHash)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = assertNbEntriesOnDisk(chain, 0)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // Flush.
+ err = chain.db.Update(func(dbTx database.Tx) error {
+ return cache.flush(dbTx, FlushRequired, chain.stateSnapshot)
+ })
+ if err != nil {
+ t.Fatalf("unexpected error while flushing cache: %v", err)
+ }
+ if cache.cachedEntries.length() != 0 {
+ t.Fatalf("Expected 0 entries, has %d instead", cache.cachedEntries.length())
+ }
+
+ err = assertConsistencyState(chain, tip.Hash())
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = assertNbEntriesOnDisk(chain, len(outPoints))
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // Fetch the flushed utxos.
+ entries, err := cache.fetchEntries(outPoints)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // Check that the returned entries are not marked fresh and modified.
+ for _, entry := range entries {
+ if entry.isFresh() {
+ t.Fatal("Entry should not be marked fresh")
+ }
+ if entry.isModified() {
+ t.Fatal("Entry should not be marked modified")
+ }
+ }
+
+ // Check that the fetched entries in the cache are not marked fresh and modified.
+ for _, m := range cache.cachedEntries.maps { + for outpoint, elem := range m { + if elem == nil { + t.Fatalf("Unexpected nil entry found for %v", outpoint) + } + if elem.isFresh() { + t.Fatal("Entry should not be marked fresh") + } + if elem.isModified() { + t.Fatal("Entry should not be marked modified") + } + } + } + + // Spend 5 utxos. + prevLen := len(outPoints) + for i := 0; i < 5; i++ { + spendOp, outPoints = outPoints[len(outPoints)-1], outPoints[:len(outPoints)-1] + cache.addTxIn(&wire.TxIn{PreviousOutPoint: spendOp}, nil) + } + + // Should still have the entries in cache so they can be flushed to disk. + if cache.cachedEntries.length() != prevLen { + t.Fatalf("Expected 10 entries, has %d instead", cache.cachedEntries.length()) + } + + // Flush. + err = chain.db.Update(func(dbTx database.Tx) error { + return cache.flush(dbTx, FlushRequired, chain.stateSnapshot) + }) + if err != nil { + t.Fatalf("unexpected error while flushing cache: %v", err) + } + if cache.cachedEntries.length() != 0 { + t.Fatalf("Expected 0 entries, has %d instead", cache.cachedEntries.length()) + } + + err = assertConsistencyState(chain, tip.Hash()) + if err != nil { + t.Fatal(err) + } + err = assertNbEntriesOnDisk(chain, len(outPoints)) + if err != nil { + t.Fatal(err) + } + + // Add 5 utxos without flushing and test for periodic flushes. + outPoints1 := make([]wire.OutPoint, 5) + for i := range outPoints1 { + // i + prevLen here to avoid collision since we're just hashing + // the int. + op := outpointFromInt(i + prevLen) + outPoints1[i] = op + + // Add the txout. + txOut := wire.TxOut{Value: 10000, PkScript: getValidP2PKHScript()} + cache.addTxOut(op, &txOut, true, int32(i+prevLen)) + } + if cache.cachedEntries.length() != len(outPoints1) { + t.Fatalf("Expected %d entries, has %d instead", + len(outPoints1), cache.cachedEntries.length()) + } + + // Attempt to flush with flush periodic. Shouldn't flush. + err = chain.db.Update(func(dbTx database.Tx) error { + return cache.flush(dbTx, FlushPeriodic, chain.stateSnapshot) + }) + if err != nil { + t.Fatalf("unexpected error while flushing cache: %v", err) + } + if cache.cachedEntries.length() == 0 { + t.Fatalf("Expected %d entries, has %d instead", + len(outPoints1), cache.cachedEntries.length()) + } + + // Arbitrarily set the last flush time to 6 minutes ago. + cache.lastFlushTime = time.Now().Add(-time.Minute * 6) + + // Attempt to flush with flush periodic. Should flush now. + err = chain.db.Update(func(dbTx database.Tx) error { + return cache.flush(dbTx, FlushPeriodic, chain.stateSnapshot) + }) + if err != nil { + t.Fatalf("unexpected error while flushing cache: %v", err) + } + if cache.cachedEntries.length() != 0 { + t.Fatalf("Expected 0 entries, has %d instead", cache.cachedEntries.length()) + } + + err = assertConsistencyState(chain, tip.Hash()) + if err != nil { + t.Fatal(err) + } + err = assertNbEntriesOnDisk(chain, len(outPoints)+len(outPoints1)) + if err != nil { + t.Fatal(err) + } +} diff --git a/blockchain/utxoviewpoint.go b/blockchain/utxoviewpoint.go index c5ff673b..fdd165c0 100644 --- a/blockchain/utxoviewpoint.go +++ b/blockchain/utxoviewpoint.go @@ -554,10 +554,43 @@ func (view *UtxoViewpoint) fetchUtxosMain(db database.DB, outpoints []wire.OutPo }) } +// fetchUtxosFromCache fetches unspent transaction output data about the provided +// set of outpoints from the point of view of the end of the main chain at the +// time of the call. 
It attempts to fetch them from the cache first; entries
+// that are not in the cache are fetched from the database and then added to the
+// cache.
+//
+// Upon completion of this function, the view will contain an entry for each
+// requested outpoint. Spent outputs, or those which otherwise don't exist,
+// will result in a nil entry in the view.
+func (view *UtxoViewpoint) fetchUtxosFromCache(cache *utxoCache, outpoints []wire.OutPoint) error {
+ // Nothing to do if there are no requested outputs.
+ if len(outpoints) == 0 {
+ return nil
+ }
+
+ // Load the requested set of unspent transaction outputs from the point
+ // of view of the end of the main chain. Any missing entries will be
+ // fetched from the database and be cached.
+ //
+ // NOTE: Missing entries are not considered an error here and instead
+ // will result in nil entries in the view. This is intentionally done
+ // so other code can use the presence of an entry in the store as a way
+ // to avoid unnecessarily attempting to reload it from the database.
+ entries, err := cache.fetchEntries(outpoints)
+ if err != nil {
+ return err
+ }
+ for i, entry := range entries {
+ view.entries[outpoints[i]] = entry.Clone()
+ }
+ return nil
+}
+
// fetchUtxos loads the unspent transaction outputs for the provided set of
// outputs into the view from the database as needed unless they already exist
// in the view in which case they are ignored.
-func (view *UtxoViewpoint) fetchUtxos(db database.DB, outpoints []wire.OutPoint) error {
+func (view *UtxoViewpoint) fetchUtxos(cache *utxoCache, outpoints []wire.OutPoint) error {
 // Nothing to do if there are no requested outputs.
 if len(outpoints) == 0 {
 return nil
@@ -575,7 +608,7 @@ func (view *UtxoViewpoint) fetchUtxos(db database.DB, outpoints []wire.OutPoint)
 }
 // Request the input utxos from the database.
- return view.fetchUtxosMain(db, needed)
+ return view.fetchUtxosFromCache(cache, needed)
}
// findInputsToFetch goes through all the blocks and returns all the outpoints of
@@ -633,10 +666,13 @@ func (view *UtxoViewpoint) findInputsToFetch(block *btcutil.Block) []wire.OutPoi
// fetchInputUtxos loads the unspent transaction outputs for the inputs
// referenced by the transactions in the given block into the view from the
-// database as needed. In particular, referenced entries that are earlier in
-// the block are added to the view and entries that are already in the view are
-// not modified.
-func (view *UtxoViewpoint) fetchInputUtxos(db database.DB, block *btcutil.Block) error {
+// database or the cache as needed. In particular, referenced entries that
+// are earlier in the block are added to the view and entries that are already
+// in the view are not modified.
+func (view *UtxoViewpoint) fetchInputUtxos(db database.DB, cache *utxoCache, block *btcutil.Block) error {
+ if cache != nil {
+ return view.fetchUtxosFromCache(cache, view.findInputsToFetch(block))
+ }
 // Request the input utxos from the cache.
 return view.fetchUtxosMain(db, view.findInputsToFetch(block))
}
@@ -678,7 +714,7 @@ func (b *BlockChain) FetchUtxoView(tx *btcutil.Tx) (*UtxoViewpoint, error) {
 // chain.
view := NewUtxoViewpoint() b.chainLock.RLock() - err := view.fetchUtxosMain(b.db, needed) + err := view.fetchUtxosFromCache(b.utxoCache, needed) b.chainLock.RUnlock() return view, err } @@ -697,16 +733,10 @@ func (b *BlockChain) FetchUtxoEntry(outpoint wire.OutPoint) (*UtxoEntry, error) b.chainLock.RLock() defer b.chainLock.RUnlock() - var entry *UtxoEntry - err := b.db.View(func(dbTx database.Tx) error { - var err error - utxoBucket := dbTx.Metadata().Bucket(utxoSetBucketName) - entry, err = dbFetchUtxoEntry(dbTx, utxoBucket, outpoint) - return err - }) + entries, err := b.utxoCache.fetchEntries([]wire.OutPoint{outpoint}) if err != nil { return nil, err } - return entry, nil + return entries[0], nil } diff --git a/blockchain/validate.go b/blockchain/validate.go index 438f4554..02d36134 100644 --- a/blockchain/validate.go +++ b/blockchain/validate.go @@ -889,7 +889,7 @@ func (b *BlockChain) checkBIP0030(node *blockNode, block *btcutil.Block, view *U fetch = append(fetch, prevOut) } } - err := view.fetchUtxos(b.db, fetch) + err := view.fetchUtxos(b.utxoCache, fetch) if err != nil { return err } @@ -1080,11 +1080,11 @@ func (b *BlockChain) checkConnectBlock(node *blockNode, block *btcutil.Block, vi } // Load all of the utxos referenced by the inputs for all transactions - // in the block don't already exist in the utxo view from the database. + // in the block don't already exist in the utxo view from the cache. // // These utxo entries are needed for verification of things such as // transaction inputs, counting pay-to-script-hashes, and scripts. - err := view.fetchInputUtxos(b.db, block) + err := view.fetchInputUtxos(nil, b.utxoCache, block) if err != nil { return err } diff --git a/cmd/addblock/import.go b/cmd/addblock/import.go index 7f4b9bb0..8eda8f8c 100644 --- a/cmd/addblock/import.go +++ b/cmd/addblock/import.go @@ -287,6 +287,16 @@ func (bi *blockImporter) Import() chan *importResults { // the status handler when done. go func() { bi.wg.Wait() + + // Flush the changes made to the blockchain. 
+ log.Info("Flushing blockchain caches to the disk...") + if err := bi.chain.FlushUtxoCache(blockchain.FlushRequired); err != nil { + log.Errorf("Error while flushing the blockchain state: %v", err) + bi.errChan <- err + return + } + log.Info("Done flushing blockchain caches to disk") + bi.doneChan <- true }() diff --git a/config.go b/config.go index 67c47dbd..1fe0767f 100644 --- a/config.go +++ b/config.go @@ -63,6 +63,7 @@ const ( defaultMaxOrphanTransactions = 100 defaultMaxOrphanTxSize = 100000 defaultSigCacheMaxSize = 100000 + defaultUtxoCacheMaxSizeMiB = 250 sampleConfigFilename = "sample-btcd.conf" defaultTxIndex = false defaultAddrIndex = false @@ -171,6 +172,7 @@ type config struct { TestNet3 bool `long:"testnet" description:"Use the test network"` TorIsolation bool `long:"torisolation" description:"Enable Tor stream isolation by randomizing user credentials for each connection."` TrickleInterval time.Duration `long:"trickleinterval" description:"Minimum time between attempts to send new inventory to a connected peer"` + UtxoCacheMaxSizeMiB uint `long:"utxocachemaxsize" description:"The maximum size in MiB of the UTXO cache"` TxIndex bool `long:"txindex" description:"Maintain a full hash-based transaction index which makes all transactions available via the getrawtransaction RPC"` UserAgentComments []string `long:"uacomment" description:"Comment to add to the user agent -- See BIP 14 for more information."` Upnp bool `long:"upnp" description:"Use UPnP to map our listening port outside of NAT"` @@ -439,6 +441,7 @@ func loadConfig() (*config, []string, error) { BlockPrioritySize: mempool.DefaultBlockPrioritySize, MaxOrphanTxs: defaultMaxOrphanTransactions, SigCacheMaxSize: defaultSigCacheMaxSize, + UtxoCacheMaxSizeMiB: defaultUtxoCacheMaxSizeMiB, Generate: defaultGenerate, TxIndex: defaultTxIndex, AddrIndex: defaultAddrIndex, diff --git a/netsync/blocklogger.go b/netsync/blocklogger.go index 10f83d57..31a6a4c5 100644 --- a/netsync/blocklogger.go +++ b/netsync/blocklogger.go @@ -5,9 +5,11 @@ package netsync import ( + "fmt" "sync" "time" + "github.com/btcsuite/btcd/blockchain" "github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btclog" ) @@ -41,7 +43,7 @@ func newBlockProgressLogger(progressMessage string, logger btclog.Logger) *block // LogBlockHeight logs a new block height as an information message to show // progress to the user. In order to prevent spam, it limits logging to one // message every 10 seconds with duration and totals included. 
-func (b *blockProgressLogger) LogBlockHeight(block *btcutil.Block) { +func (b *blockProgressLogger) LogBlockHeight(block *btcutil.Block, chain *blockchain.BlockChain) { b.Lock() defer b.Unlock() @@ -67,9 +69,10 @@ func (b *blockProgressLogger) LogBlockHeight(block *btcutil.Block) { if b.receivedLogTx == 1 { txStr = "transaction" } - b.subsystemLogger.Infof("%s %d %s in the last %s (%d %s, height %d, %s)", + cacheSizeStr := fmt.Sprintf("~%d MiB", chain.CachedStateSize()/1024/1024) + b.subsystemLogger.Infof("%s %d %s in the last %s (%d %s, height %d, %s, %s cache)", b.progressAction, b.receivedLogBlocks, blockStr, tDuration, b.receivedLogTx, - txStr, block.Height(), block.MsgBlock().Header.Timestamp) + txStr, block.Height(), block.MsgBlock().Header.Timestamp, cacheSizeStr) b.receivedLogBlocks = 0 b.receivedLogTx = 0 diff --git a/netsync/manager.go b/netsync/manager.go index fa3cf3d0..41ba70aa 100644 --- a/netsync/manager.go +++ b/netsync/manager.go @@ -816,7 +816,7 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { // When the block is not an orphan, log information about it and // update the chain state. - sm.progressLogger.LogBlockHeight(bmsg.block) + sm.progressLogger.LogBlockHeight(bmsg.block, sm.chain) // Update this peer's latest block height, for future // potential sync node candidacy. @@ -840,8 +840,13 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { } } - // Nothing more to do if we aren't in headers-first mode. + // If we are not in headers first mode, it's a good time to periodically + // flush the blockchain cache because we don't expect new blocks immediately. + // After that, there is nothing more to do. if !sm.headersFirstMode { + if err := sm.chain.FlushUtxoCache(blockchain.FlushPeriodic); err != nil { + log.Errorf("Error while flushing the blockchain cache: %v", err) + } return } @@ -1414,6 +1419,11 @@ out: } } + log.Debug("Block handler shutting down: flushing blockchain caches...") + if err := sm.chain.FlushUtxoCache(blockchain.FlushRequired); err != nil { + log.Errorf("Error while flushing blockchain caches: %v", err) + } + sm.wg.Done() log.Trace("Block handler done") } diff --git a/server.go b/server.go index 4e88d36c..356326ab 100644 --- a/server.go +++ b/server.go @@ -2826,15 +2826,16 @@ func newServer(listenAddrs, agentBlacklist, agentWhitelist []string, // Create a new block chain instance with the appropriate configuration. var err error s.chain, err = blockchain.New(&blockchain.Config{ - DB: s.db, - Interrupt: interrupt, - ChainParams: s.chainParams, - Checkpoints: checkpoints, - TimeSource: s.timeSource, - SigCache: s.sigCache, - IndexManager: indexManager, - HashCache: s.hashCache, - Prune: cfg.Prune * 1024 * 1024, + DB: s.db, + Interrupt: interrupt, + ChainParams: s.chainParams, + Checkpoints: checkpoints, + TimeSource: s.timeSource, + SigCache: s.sigCache, + IndexManager: indexManager, + HashCache: s.hashCache, + Prune: cfg.Prune * 1024 * 1024, + UtxoCacheMaxSize: uint64(cfg.UtxoCacheMaxSizeMiB) * 1024 * 1024, }) if err != nil { return nil, err
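The hunks above only wire the new cache into btcd's own server.go. For other embedders of the blockchain package, the sketch below shows roughly how the new UtxoCacheMaxSize field would be passed to blockchain.New. It is illustrative only and not part of the patch: the helper name newChainWithCache and the regtest parameters are assumptions, while Config, UtxoCacheMaxSize, NewMedianTime, and the MiB-to-bytes conversion mirror the server.go hunk and the 250MiB default added to config.go.

import (
	"github.com/btcsuite/btcd/blockchain"
	"github.com/btcsuite/btcd/chaincfg"
	"github.com/btcsuite/btcd/database"
)

// newChainWithCache is a sketch (not part of this patch) of constructing a
// BlockChain with the new UTXO cache limit. The 250*1024*1024 value mirrors
// defaultUtxoCacheMaxSizeMiB from config.go; db and interrupt are assumed to
// be set up by the caller.
func newChainWithCache(db database.DB, interrupt <-chan struct{}) (*blockchain.BlockChain, error) {
	return blockchain.New(&blockchain.Config{
		DB:               db,
		Interrupt:        interrupt,
		ChainParams:      &chaincfg.RegressionNetParams,
		TimeSource:       blockchain.NewMedianTime(),
		UtxoCacheMaxSize: 250 * 1024 * 1024, // MiB -> bytes, as in server.go
	})
}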
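Callers that drive the chain themselves are expected to flush explicitly, as cmd/addblock and netsync do above. The sketch below, again illustrative and not part of the patch (the helper names and log wording are assumptions), shows the two exported entry points this change adds for that purpose: CachedStateSize to report the cache footprint, and FlushUtxoCache with a FlushMode.

import (
	"log"

	"github.com/btcsuite/btcd/blockchain"
)

// maybeFlush reports the cache footprint and requests a periodic flush; the
// cache decides whether anything actually gets written, so this is cheap to
// call after every processed block.
func maybeFlush(chain *blockchain.BlockChain) error {
	log.Printf("utxo cache is using ~%d MiB", chain.CachedStateSize()/1024/1024)
	return chain.FlushUtxoCache(blockchain.FlushPeriodic)
}

// flushOnShutdown forces a flush so no cached utxo state is lost, mirroring
// the FlushRequired calls in the netsync block handler and cmd/addblock.
func flushOnShutdown(chain *blockchain.BlockChain) error {
	return chain.FlushUtxoCache(blockchain.FlushRequired)
}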