blockchain: Add utxocache

The implemented utxocache implements connectTransactions just like
utxoviewpoint and can be used as a drop in replacement for
connectTransactions.

One thing to note is that unlike the utxoViewpoint, the utxocache
immediately deletes the spent entry from the cache.  This means that the
utxocache is unfit for functions like checkConnectBlock where you expect
the entry to still exist but be marked as spent.

disconnectTransactions is purposely not implemented as using the cache
during reorganizations may leave the utxo state inconsistent if there is
an unexpected shutdown.  The utxoViewpoint will still have to be used
for reorganizations.
This commit is contained in:
Calvin Kim 2023-05-16 18:09:31 +09:00
parent 053ef330f2
commit 3c11e48dd2
2 changed files with 553 additions and 0 deletions

View File

@ -5,8 +5,14 @@
package blockchain
import (
"fmt"
"sync"
"time"
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/database"
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
)
@ -165,3 +171,410 @@ func (ms *mapSlice) deleteMaps() {
ms.maxEntries = []int{size}
ms.maps = ms.maps[:1]
}
const (
// utxoFlushPeriodicInterval is the interval at which a flush is performed
// when the flush mode FlushPeriodic is used. This is used when the initial
// block download is complete and it's useful to flush periodically in case
// of unforseen shutdowns.
utxoFlushPeriodicInterval = time.Minute * 5
)
// FlushMode is used to indicate the different urgency types for a flush.
type FlushMode uint8
const (
// FlushRequired is the flush mode that means a flush must be performed
// regardless of the cache state. For example right before shutting down.
FlushRequired FlushMode = iota
// FlushPeriodic is the flush mode that means a flush can be performed
// when it would be almost needed. This is used to periodically signal when
// no I/O heavy operations are expected soon, so there is time to flush.
FlushPeriodic
// FlushIfNeeded is the flush mode that means a flush must be performed only
// if the cache is exceeding a safety threshold very close to its maximum
// size. This is used mostly internally in between operations that can
// increase the cache size.
FlushIfNeeded
)
// utxoCache is a cached utxo view in the chainstate of a BlockChain.
type utxoCache struct {
db database.DB
// maxTotalMemoryUsage is the maximum memory usage in bytes that the state
// should contain in normal circumstances.
maxTotalMemoryUsage uint64
// cachedEntries keeps the internal cache of the utxo state. The tfModified
// flag indicates that the state of the entry (potentially) deviates from the
// state in the database. Explicit nil values in the map are used to
// indicate that the database does not contain the entry.
cachedEntries mapSlice
totalEntryMemory uint64 // Total memory usage in bytes.
// Below fields are used to indicate when the last flush happened.
lastFlushHash chainhash.Hash
lastFlushTime time.Time
}
// newUtxoCache initiates a new utxo cache instance with its memory usage limited
// to the given maximum.
func newUtxoCache(db database.DB, maxTotalMemoryUsage uint64) *utxoCache {
// While the entry isn't included in the map size, add the average size to the
// bucket size so we get some leftover space for entries to take up.
numMaxElements := calculateMinEntries(int(maxTotalMemoryUsage), bucketSize+avgEntrySize)
numMaxElements -= 1
log.Infof("Pre-alloacting for %d MiB: ", maxTotalMemoryUsage/(1024*1024)+1)
m := make(map[wire.OutPoint]*UtxoEntry, numMaxElements)
return &utxoCache{
db: db,
maxTotalMemoryUsage: maxTotalMemoryUsage,
cachedEntries: mapSlice{
maps: []map[wire.OutPoint]*UtxoEntry{m},
maxEntries: []int{numMaxElements},
maxTotalMemoryUsage: maxTotalMemoryUsage,
},
}
}
// totalMemoryUsage returns the total memory usage in bytes of the UTXO cache.
func (s *utxoCache) totalMemoryUsage() uint64 {
// Total memory is the map size + the size that the utxo entries are
// taking up.
size := uint64(s.cachedEntries.size())
size += s.totalEntryMemory
return size
}
// fetchEntries returns the UTXO entries for the given outpoints. The function always
// returns as many entries as there are outpoints and the returns entries are in the
// same order as the outpoints. It returns nil if there is no entry for the outpoint
// in the UTXO set.
//
// The returned entries are NOT safe for concurrent access.
func (s *utxoCache) fetchEntries(outpoints []wire.OutPoint) ([]*UtxoEntry, error) {
entries := make([]*UtxoEntry, len(outpoints))
var missingOps []wire.OutPoint
var missingOpsIdx []int
for i := range outpoints {
if entry, ok := s.cachedEntries.get(outpoints[i]); ok {
entries[i] = entry
continue
}
// At this point, we have missing outpoints. Allocate them now
// so that we never allocate if the cache never misses.
if len(missingOps) == 0 {
missingOps = make([]wire.OutPoint, 0, len(outpoints))
missingOpsIdx = make([]int, 0, len(outpoints))
}
missingOpsIdx = append(missingOpsIdx, i)
missingOps = append(missingOps, outpoints[i])
}
// Return early and don't attempt access the database if we don't have any
// missing outpoints.
if len(missingOps) == 0 {
return entries, nil
}
// Fetch the missing outpoints in the cache from the database.
dbEntries := make([]*UtxoEntry, len(missingOps))
err := s.db.View(func(dbTx database.Tx) error {
utxoBucket := dbTx.Metadata().Bucket(utxoSetBucketName)
for i := range missingOps {
entry, err := dbFetchUtxoEntry(dbTx, utxoBucket, missingOps[i])
if err != nil {
return err
}
dbEntries[i] = entry
}
return nil
})
if err != nil {
return nil, err
}
// Add each of the entries to the UTXO cache and update their memory
// usage.
//
// NOTE: When the fetched entry is nil, it is still added to the cache
// as a miss; this prevents future lookups to perform the same database
// fetch.
for i := range dbEntries {
s.cachedEntries.put(missingOps[i], dbEntries[i], s.totalEntryMemory)
s.totalEntryMemory += dbEntries[i].memoryUsage()
}
// Fill in the entries with the ones fetched from the database.
for i := range missingOpsIdx {
entries[missingOpsIdx[i]] = dbEntries[i]
}
return entries, nil
}
// addTxOut adds the specified output to the cache if it is not provably
// unspendable. When the cache already has an entry for the output, it will be
// overwritten with the given output. All fields will be updated for existing
// entries since it's possible it has changed during a reorg.
func (s *utxoCache) addTxOut(
outpoint wire.OutPoint, txOut *wire.TxOut, isCoinBase bool, blockHeight int32) error {
// Don't add provably unspendable outputs.
if txscript.IsUnspendable(txOut.PkScript) {
return nil
}
entry := new(UtxoEntry)
entry.amount = txOut.Value
// Deep copy the script when the script in the entry differs from the one in
// the txout. This is required since the txout script is a subslice of the
// overall contiguous buffer that the msg tx houses for all scripts within
// the tx. It is deep copied here since this entry may be added to the utxo
// cache, and we don't want the utxo cache holding the entry to prevent all
// of the other tx scripts from getting garbage collected.
entry.pkScript = make([]byte, len(txOut.PkScript))
copy(entry.pkScript, txOut.PkScript)
entry.blockHeight = blockHeight
entry.packedFlags = tfFresh | tfModified
if isCoinBase {
entry.packedFlags |= tfCoinBase
}
s.cachedEntries.put(outpoint, entry, s.totalEntryMemory)
s.totalEntryMemory += entry.memoryUsage()
return nil
}
// addTxOuts adds all outputs in the passed transaction which are not provably
// unspendable to the view. When the view already has entries for any of the
// outputs, they are simply marked unspent. All fields will be updated for
// existing entries since it's possible it has changed during a reorg.
func (s *utxoCache) addTxOuts(tx *btcutil.Tx, blockHeight int32) error {
// Loop all of the transaction outputs and add those which are not
// provably unspendable.
isCoinBase := IsCoinBase(tx)
prevOut := wire.OutPoint{Hash: *tx.Hash()}
for txOutIdx, txOut := range tx.MsgTx().TxOut {
// Update existing entries. All fields are updated because it's
// possible (although extremely unlikely) that the existing
// entry is being replaced by a different transaction with the
// same hash. This is allowed so long as the previous
// transaction is fully spent.
prevOut.Index = uint32(txOutIdx)
err := s.addTxOut(prevOut, txOut, isCoinBase, blockHeight)
if err != nil {
return err
}
}
return nil
}
// addTxIn will add the given input to the cache if the previous outpoint the txin
// is pointing to exists in the utxo set. The utxo that is being spent by the input
// will be marked as spent and if the utxo is fresh (meaning that the database on disk
// never saw it), it will be removed from the cache.
func (s *utxoCache) addTxIn(txIn *wire.TxIn, stxos *[]SpentTxOut) error {
// Ensure the referenced utxo exists in the view. This should
// never happen unless there is a bug is introduced in the code.
entries, err := s.fetchEntries([]wire.OutPoint{txIn.PreviousOutPoint})
if err != nil {
return err
}
if len(entries) != 1 || entries[0] == nil {
return AssertError(fmt.Sprintf("missing input %v",
txIn.PreviousOutPoint))
}
// Only create the stxo details if requested.
entry := entries[0]
if stxos != nil {
// Populate the stxo details using the utxo entry.
var stxo = SpentTxOut{
Amount: entry.Amount(),
PkScript: entry.PkScript(),
Height: entry.BlockHeight(),
IsCoinBase: entry.IsCoinBase(),
}
*stxos = append(*stxos, stxo)
}
// Mark the entry as spent.
entry.Spend()
// If an entry is fresh it indicates that this entry was spent before it could be
// flushed to the database. Because of this, we can just delete it from the map of
// cached entries.
if entry.isFresh() {
// If the entry is fresh, we will always have it in the cache.
s.cachedEntries.delete(txIn.PreviousOutPoint)
s.totalEntryMemory -= entry.memoryUsage()
} else {
// Can leave the entry to be garbage collected as the only purpose
// of this entry now is so that the entry on disk can be deleted.
entry = nil
s.totalEntryMemory -= entry.memoryUsage()
}
return nil
}
// addTxIns will add the given inputs of the tx if it's not a coinbase tx and if
// the previous output that the input is pointing to exists in the utxo set. The
// utxo that is being spent by the input will be marked as spent and if the utxo
// is fresh (meaning that the database on disk never saw it), it will be removed
// from the cache.
func (s *utxoCache) addTxIns(tx *btcutil.Tx, stxos *[]SpentTxOut) error {
// Coinbase transactions don't have any inputs to spend.
if IsCoinBase(tx) {
return nil
}
for _, txIn := range tx.MsgTx().TxIn {
err := s.addTxIn(txIn, stxos)
if err != nil {
return err
}
}
return nil
}
// connectTransaction updates the cache by adding all new utxos created by the
// passed transaction and marking and/or removing all utxos that the transactions
// spend as spent. In addition, when the 'stxos' argument is not nil, it will
// be updated to append an entry for each spent txout. An error will be returned
// if the cache and the database does not contain the required utxos.
func (s *utxoCache) connectTransaction(
tx *btcutil.Tx, blockHeight int32, stxos *[]SpentTxOut) error {
err := s.addTxIns(tx, stxos)
if err != nil {
return err
}
// Add the transaction's outputs as available utxos.
return s.addTxOuts(tx, blockHeight)
}
// connectTransactions updates the cache by adding all new utxos created by all
// of the transactions in the passed block, marking and/or removing all utxos
// the transactions spend as spent, and setting the best hash for the view to
// the passed block. In addition, when the 'stxos' argument is not nil, it will
// be updated to append an entry for each spent txout.
func (s *utxoCache) connectTransactions(block *btcutil.Block, stxos *[]SpentTxOut) error {
for _, tx := range block.Transactions() {
err := s.connectTransaction(tx, block.Height(), stxos)
if err != nil {
return err
}
}
return nil
}
// writeCache writes all the entries that are cached in memory to the database atomically.
func (s *utxoCache) writeCache(dbTx database.Tx, bestState *BestState) error {
// Update commits and flushes the cache to the database.
// NOTE: The database has its own cache which gets atomically written
// to leveldb.
utxoBucket := dbTx.Metadata().Bucket(utxoSetBucketName)
for i := range s.cachedEntries.maps {
for outpoint, entry := range s.cachedEntries.maps[i] {
// If the entry is nil or spent, remove the entry from the database
// and the cache.
if entry == nil || entry.IsSpent() {
err := dbDeleteUtxoEntry(utxoBucket, outpoint)
if err != nil {
return err
}
delete(s.cachedEntries.maps[i], outpoint)
continue
}
// No need to update the cache if the entry was not modified.
if !entry.isModified() {
delete(s.cachedEntries.maps[i], outpoint)
continue
}
// Entry is fresh and needs to be put into the database.
err := dbPutUtxoEntry(utxoBucket, outpoint, entry)
if err != nil {
return err
}
delete(s.cachedEntries.maps[i], outpoint)
}
}
s.cachedEntries.deleteMaps()
s.totalEntryMemory = 0
// When done, store the best state hash in the database to indicate the state
// is consistent until that hash.
err := dbPutUtxoStateConsistency(dbTx, &bestState.Hash)
if err != nil {
return err
}
// The best state is the new last flush hash.
s.lastFlushHash = bestState.Hash
s.lastFlushTime = time.Now()
return nil
}
// flush flushes the UTXO state to the database if a flush is needed with the given flush mode.
//
// This function MUST be called with the chain state lock held (for writes).
func (s *utxoCache) flush(dbTx database.Tx, mode FlushMode, bestState *BestState) error {
var threshold uint64
switch mode {
case FlushRequired:
threshold = 0
case FlushIfNeeded:
// If we performed a flush in the current best state, we have nothing to do.
if bestState.Hash == s.lastFlushHash {
return nil
}
threshold = s.maxTotalMemoryUsage
case FlushPeriodic:
// If the time since the last flush is over the periodic interval,
// force a flush. Otherwise just flush when the cache is full.
if time.Since(s.lastFlushTime) > utxoFlushPeriodicInterval {
threshold = 0
} else {
threshold = s.maxTotalMemoryUsage
}
}
if s.totalMemoryUsage() >= threshold {
// Add one to round up the integer division.
totalMiB := s.totalMemoryUsage() / ((1024 * 1024) + 1)
log.Infof("Flushing UTXO cache of %d MiB with %d entries to disk. For large sizes, "+
"this can take up to several minutes...", totalMiB, s.cachedEntries.length())
return s.writeCache(dbTx, bestState)
}
return nil
}

View File

@ -165,3 +165,143 @@ func TestMapsliceConcurrency(t *testing.T) {
wg.Wait()
}
}
// getValidP2PKHScript returns a valid P2PKH script. Useful as unspendables cannot be
// added to the cache.
func getValidP2PKHScript() []byte {
validP2PKHScript := []byte{
// OP_DUP
0x76,
// OP_HASH160
0xa9,
// OP_DATA_20
0x14,
// <20-byte pubkey hash>
0xf0, 0x7a, 0xb8, 0xce, 0x72, 0xda, 0x4e, 0x76,
0x0b, 0x74, 0x7d, 0x48, 0xd6, 0x65, 0xec, 0x96,
0xad, 0xf0, 0x24, 0xf5,
// OP_EQUALVERIFY
0x88,
// OP_CHECKSIG
0xac,
}
return validP2PKHScript
}
// outpointFromInt generates an outpoint from an int by hashing the int and making
// the given int the index.
func outpointFromInt(i int) wire.OutPoint {
// Boilerplate to create an outpoint.
var buf [4]byte
binary.BigEndian.PutUint32(buf[:], uint32(i))
hash := sha256.Sum256(buf[:])
return wire.OutPoint{Hash: hash, Index: uint32(i)}
}
func TestUtxoCacheEntrySize(t *testing.T) {
type block struct {
txOuts []*wire.TxOut
outOps []wire.OutPoint
txIns []*wire.TxIn
}
tests := []struct {
name string
blocks []block
expectedSize uint64
}{
{
name: "one entry",
blocks: func() []block {
return []block{
{
txOuts: []*wire.TxOut{
{Value: 10000, PkScript: getValidP2PKHScript()},
},
outOps: []wire.OutPoint{
outpointFromInt(0),
},
},
}
}(),
expectedSize: pubKeyHashLen + baseEntrySize,
},
{
name: "10 entries, 4 spend",
blocks: func() []block {
blocks := make([]block, 0, 10)
for i := 0; i < 10; i++ {
op := outpointFromInt(i)
block := block{
txOuts: []*wire.TxOut{
{Value: 10000, PkScript: getValidP2PKHScript()},
},
outOps: []wire.OutPoint{
op,
},
}
// Spend all outs in blocks less than 4.
if i < 4 {
block.txIns = []*wire.TxIn{
{PreviousOutPoint: op},
}
}
blocks = append(blocks, block)
}
return blocks
}(),
// Multipled by 6 since we'll have 6 entries left.
expectedSize: (pubKeyHashLen + baseEntrySize) * 6,
},
{
name: "spend everything",
blocks: func() []block {
blocks := make([]block, 0, 500)
for i := 0; i < 500; i++ {
op := outpointFromInt(i)
block := block{
txOuts: []*wire.TxOut{
{Value: 1000, PkScript: getValidP2PKHScript()},
},
outOps: []wire.OutPoint{
op,
},
}
// Spend all outs in blocks less than 4.
block.txIns = []*wire.TxIn{
{PreviousOutPoint: op},
}
blocks = append(blocks, block)
}
return blocks
}(),
expectedSize: 0,
},
}
for _, test := range tests {
// Size is just something big enough so that the mapslice doesn't
// run out of memory.
s := newUtxoCache(nil, 1*1024*1024)
for height, block := range test.blocks {
for i, out := range block.txOuts {
s.addTxOut(block.outOps[i], out, true, int32(height))
}
for _, in := range block.txIns {
s.addTxIn(in, nil)
}
}
if s.totalEntryMemory != test.expectedSize {
t.Errorf("Failed test %s. Expected size of %d, got %d",
test.name, test.expectedSize, s.totalEntryMemory)
}
}
}