diff --git a/blockchain/chain.go b/blockchain/chain.go index 1026c2e4..aacda4b8 100644 --- a/blockchain/chain.go +++ b/blockchain/chain.go @@ -166,6 +166,7 @@ type BlockChain struct { chainParams *chaincfg.Params notifications NotificationCallback sigCache *txscript.SigCache + indexManager IndexManager // chainLock protects concurrent access to the vast majority of the // fields in this struct below this point. @@ -732,6 +733,20 @@ func (b *BlockChain) getReorganizeNodes(node *blockNode) (*list.List, *list.List return detachNodes, attachNodes } +// dbMaybeStoreBlock stores the provided block in the database if it's not +// already there. +func dbMaybeStoreBlock(dbTx database.Tx, block *btcutil.Block) error { + hasBlock, err := dbTx.HasBlock(block.Sha()) + if err != nil { + return err + } + if hasBlock { + return nil + } + + return dbTx.StoreBlock(block) +} + // connectBlock handles connecting the passed node/block to the end of the main // (best) chain. // @@ -797,12 +812,19 @@ func (b *BlockChain) connectBlock(node *blockNode, block *btcutil.Block, view *U } // Insert the block into the database if it's not already there. - hasBlock, err := dbTx.HasBlock(block.Sha()) + err = dbMaybeStoreBlock(dbTx, block) if err != nil { return err } - if !hasBlock { - return dbTx.StoreBlock(block) + + // Allow the index manager to call each of the currently active + // optional indexes with the block being connected so they can + // update themselves accordingly. + if b.indexManager != nil { + err := b.indexManager.ConnectBlock(dbTx, block, view) + if err != nil { + return err + } } return nil @@ -913,6 +935,16 @@ func (b *BlockChain) disconnectBlock(node *blockNode, block *btcutil.Block, view return err } + // Allow the index manager to call each of the currently active + // optional indexes with the block being disconnected so they + // can update themselves accordingly. 
// IndexManager provides a generic interface that is called when blocks are
// connected to and disconnected from the tip of the main chain for the
// purpose of supporting optional indexes.
//
// The chain only invokes the manager when one has been configured; a nil
// index manager is skipped.
type IndexManager interface {
	// Init is invoked during chain initialization in order to allow the
	// index manager to initialize itself and any indexes it is managing.
	// A non-nil error causes creation of the chain instance to fail.
	Init(*BlockChain) error

	// ConnectBlock is invoked when a new block has been connected to the
	// main chain. It receives the database transaction, block, and utxo
	// view that the chain is using to connect the block, and any error it
	// returns is propagated by the chain.
	ConnectBlock(database.Tx, *btcutil.Block, *UtxoViewpoint) error

	// DisconnectBlock is invoked when a block has been disconnected from
	// the main chain. It receives the database transaction, block, and
	// utxo view that the chain is using to disconnect the block, and any
	// error it returns is propagated by the chain.
	DisconnectBlock(database.Tx, *btcutil.Block, *UtxoViewpoint) error
}
@@ -1399,6 +1455,7 @@ func New(config *Config) (*BlockChain, error) { chainParams: params, notifications: config.Notifications, sigCache: config.SigCache, + indexManager: config.IndexManager, root: nil, bestNode: nil, index: make(map[wire.ShaHash]*blockNode), @@ -1415,6 +1472,14 @@ func New(config *Config) (*BlockChain, error) { return nil, err } + // Initialize and catch up all of the currently active optional indexes + // as needed. + if config.IndexManager != nil { + if err := config.IndexManager.Init(&b); err != nil { + return nil, err + } + } + log.Infof("Chain state (height %d, hash %v, totaltx %d, work %v)", b.bestNode.height, b.bestNode.hash, b.stateSnapshot.TotalTxns, b.bestNode.workSum) diff --git a/blockchain/chainio.go b/blockchain/chainio.go index 02869366..531814b3 100644 --- a/blockchain/chainio.go +++ b/blockchain/chainio.go @@ -1286,6 +1286,19 @@ func dbMainChainHasBlock(dbTx database.Tx, hash *wire.ShaHash) bool { return hashIndex.Get(hash[:]) != nil } +// MainChainHasBlock returns whether or not the block with the given hash is in +// the main chain. +// +// This function is safe for concurrent access. +func (b *BlockChain) MainChainHasBlock(hash *wire.ShaHash) (bool, error) { + var exists bool + err := b.db.View(func(dbTx database.Tx) error { + exists = dbMainChainHasBlock(dbTx, hash) + return nil + }) + return exists, err +} + // BlockHeightByHash returns the height of the block with the given hash in the // main chain. // diff --git a/blockchain/indexers/README.md b/blockchain/indexers/README.md new file mode 100644 index 00000000..8ec12e2b --- /dev/null +++ b/blockchain/indexers/README.md @@ -0,0 +1,44 @@ +indexers +======== + +[![Build Status](https://travis-ci.org/btcsuite/btcd.png?branch=master)] +(https://travis-ci.org/btcsuite/btcd) + +Package indexers implements optional block chain indexes. + +These indexes are typically used to enhance the amount of information available +via an RPC interface. 
const (
	// addrIndexName is the human-readable name for the index.
	addrIndexName = "address index"

	// level0MaxEntries is the maximum number of transactions that are
	// stored in level 0 of an address index entry. Subsequent levels store
	// 2^n * level0MaxEntries entries, or in other words, double the maximum
	// of the previous level.
	level0MaxEntries = 8

	// addrKeySize is the number of bytes an address key consumes in the
	// index. It consists of 1 byte address type + 20 bytes hash160.
	addrKeySize = 1 + 20

	// levelKeySize is the number of bytes a level key in the address index
	// consumes. It consists of the address key + 1 byte for the level.
	levelKeySize = addrKeySize + 1

	// levelOffset is the offset in the level key which identifies the
	// level. It is the final byte of the level key.
	levelOffset = levelKeySize - 1

	// addrKeyTypePubKeyHash is the address type in an address key which
	// represents both a pay-to-pubkey-hash and a pay-to-pubkey address.
	// This is done because both are identical for the purposes of the
	// address index.
	addrKeyTypePubKeyHash = 0

	// addrKeyTypeScriptHash is the address type in an address key which
	// represents a pay-to-script-hash address. This is necessary because
	// the hash of a pubkey address might be the same as that of a script
	// hash.
	addrKeyTypeScriptHash = 1

	// txEntrySize is the number of bytes a single serialized transaction
	// entry consumes. It consists of 4 bytes block id + 4 bytes offset +
	// 4 bytes length.
	txEntrySize = 4 + 4 + 4
)

var (
	// addrIndexKey is the key of the address index and the db bucket used
	// to house it.
	addrIndexKey = []byte("txbyaddridx")

	// errUnsupportedAddressType is an error that is used to signal an
	// unsupported address type has been used.
	errUnsupportedAddressType = errors.New("address type is not supported " +
		"by the address index")
)
Transactions are stored +// according to their order of appearance in the blockchain. That is to say +// first by block height and then by offset inside the block. It is also +// important to note that this implementation requires the transaction index +// since it is needed in order to catch up old blocks due to the fact the spent +// outputs will already be pruned from the utxo set. +// +// The approach used to store the index is similar to a log-structured merge +// tree (LSM tree) and is thus similar to how leveldb works internally. +// +// Every address consists of one or more entries identified by a level starting +// from 0 where each level holds a maximum number of entries such that each +// subsequent level holds double the maximum of the previous one. In equation +// form, the number of entries level n holds is 2^n * level0MaxEntries. +// +// New transactions are appended to level 0 until it becomes full at which point +// the entire level 0 entry is appended to the level 1 entry and level 0 is +// cleared. This process continues until level 1 becomes full at which point it +// will be appended to level 2 and cleared and so on. +// +// The result of this is the lower levels contain newer transactions and the +// transactions within each level are ordered from oldest to newest. +// +// The intent of this approach is to provide a balance between space efficiency +// and indexing cost. Storing one entry per transaction would have the lowest +// indexing cost, but would waste a lot of space because the same address hash +// would be duplicated for every transaction key. On the other hand, storing a +// single entry with all transactions would be the most space efficient, but +// would cause indexing cost to grow quadratically with the number of +// transactions involving the same address. The approach used here provides +// logarithmic insertion and retrieval.
+// +// The serialized key format is: +// +// +// +// Field Type Size +// addr type uint8 1 byte +// addr hash hash160 20 bytes +// level uint8 1 byte +// ----- +// Total: 22 bytes +// +// The serialized value format is: +// +// [,...] +// +// Field Type Size +// block id uint32 4 bytes +// start offset uint32 4 bytes +// tx length uint32 4 bytes +// ----- +// Total: 12 bytes per indexed tx +// ----------------------------------------------------------------------------- + +// fetchBlockHashFunc defines a callback function to use in order to convert a +// serialized block ID to an associated block hash. +type fetchBlockHashFunc func(serializedID []byte) (*wire.ShaHash, error) + +// serializeAddrIndexEntry serializes the provided block id and transaction +// location according to the format described in detail above. +func serializeAddrIndexEntry(blockID uint32, txLoc wire.TxLoc) []byte { + // Serialize the entry. + serialized := make([]byte, 12) + byteOrder.PutUint32(serialized, blockID) + byteOrder.PutUint32(serialized[4:], uint32(txLoc.TxStart)) + byteOrder.PutUint32(serialized[8:], uint32(txLoc.TxLen)) + return serialized +} + +// deserializeAddrIndexEntry decodes the passed serialized byte slice into the +// provided region struct according to the format described in detail above and +// uses the passed block hash fetching function in order to conver the block ID +// to the associated block hash. +func deserializeAddrIndexEntry(serialized []byte, region *database.BlockRegion, fetchBlockHash fetchBlockHashFunc) error { + // Ensure there are enough bytes to decode. 
+ if len(serialized) < txEntrySize { + return errDeserialize("unexpected end of data") + } + + hash, err := fetchBlockHash(serialized[0:4]) + if err != nil { + return err + } + region.Hash = hash + region.Offset = byteOrder.Uint32(serialized[4:8]) + region.Len = byteOrder.Uint32(serialized[8:12]) + return nil +} + +// keyForLevel returns the key for a specific address and level in the address +// index entry. +func keyForLevel(addrKey [addrKeySize]byte, level uint8) [levelKeySize]byte { + var key [levelKeySize]byte + copy(key[:], addrKey[:]) + key[levelOffset] = level + return key +} + +// dbPutAddrIndexEntry updates the address index to include the provided entry +// according to the level-based scheme described in detail above. +func dbPutAddrIndexEntry(bucket internalBucket, addrKey [addrKeySize]byte, blockID uint32, txLoc wire.TxLoc) error { + // Start with level 0 and its initial max number of entries. + curLevel := uint8(0) + maxLevelBytes := level0MaxEntries * txEntrySize + + // Simply append the new entry to level 0 and return now when it will + // fit. This is the most common path. + newData := serializeAddrIndexEntry(blockID, txLoc) + level0Key := keyForLevel(addrKey, 0) + level0Data := bucket.Get(level0Key[:]) + if len(level0Data)+len(newData) <= maxLevelBytes { + mergedData := newData + if len(level0Data) > 0 { + mergedData = make([]byte, len(level0Data)+len(newData)) + copy(mergedData, level0Data) + copy(mergedData[len(level0Data):], newData) + } + return bucket.Put(level0Key[:], mergedData) + } + + // At this point, level 0 is full, so merge each level into higher + // levels as many times as needed to free up level 0. + prevLevelData := level0Data + for { + // Each new level holds twice as much as the previous one. + curLevel++ + maxLevelBytes *= 2 + + // Move to the next level as long as the current level is full. 
+ curLevelKey := keyForLevel(addrKey, curLevel) + curLevelData := bucket.Get(curLevelKey[:]) + if len(curLevelData) == maxLevelBytes { + prevLevelData = curLevelData + continue + } + + // The current level has room for the data in the previous one, + // so merge the data from previous level into it. + mergedData := prevLevelData + if len(curLevelData) > 0 { + mergedData = make([]byte, len(curLevelData)+ + len(prevLevelData)) + copy(mergedData, curLevelData) + copy(mergedData[len(curLevelData):], prevLevelData) + } + err := bucket.Put(curLevelKey[:], mergedData) + if err != nil { + return err + } + + // Move all of the levels before the previous one up a level. + for mergeLevel := curLevel - 1; mergeLevel > 0; mergeLevel-- { + mergeLevelKey := keyForLevel(addrKey, mergeLevel) + prevLevelKey := keyForLevel(addrKey, mergeLevel-1) + prevData := bucket.Get(prevLevelKey[:]) + err := bucket.Put(mergeLevelKey[:], prevData) + if err != nil { + return err + } + } + break + } + + // Finally, insert the new entry into level 0 now that it is empty. + return bucket.Put(level0Key[:], newData) +} + +// dbFetchAddrIndexEntries returns block regions for transactions referenced by +// the given address key and the number of entries skipped since it could have +// been less in the case where there are less total entries than the requested +// number of entries to skip. +func dbFetchAddrIndexEntries(bucket internalBucket, addrKey [addrKeySize]byte, numToSkip, numRequested uint32, reverse bool, fetchBlockHash fetchBlockHashFunc) ([]database.BlockRegion, uint32, error) { + // When the reverse flag is not set, all levels need to be fetched + // because numToSkip and numRequested are counted from the oldest + // transactions (highest level) and thus the total count is needed. + // However, when the reverse flag is set, only enough records to satisfy + // the requested amount are needed. 
+ var level uint8 + var serialized []byte + for !reverse || len(serialized) < int(numToSkip+numRequested)*txEntrySize { + curLevelKey := keyForLevel(addrKey, level) + levelData := bucket.Get(curLevelKey[:]) + if levelData == nil { + // Stop when there are no more levels. + break + } + + // Higher levels contain older transactions, so prepend them. + prepended := make([]byte, len(serialized)+len(levelData)) + copy(prepended, levelData) + copy(prepended[len(levelData):], serialized) + serialized = prepended + level++ + } + + // When the requested number of entries to skip is larger than the + // number available, skip them all and return now with the actual number + // skipped. + numEntries := uint32(len(serialized) / txEntrySize) + if numToSkip >= numEntries { + return nil, numEntries, nil + } + + // Nothing more to do when there are no requested entries. + if numRequested == 0 { + return nil, numToSkip, nil + } + + // Limit the number to load based on the number of available entries, + // the number to skip, and the number requested. + numToLoad := numEntries - numToSkip + if numToLoad > numRequested { + numToLoad = numRequested + } + + // Start the offset after all skipped entries and load the calculated + // number. + results := make([]database.BlockRegion, numToLoad) + for i := uint32(0); i < numToLoad; i++ { + // Calculate the read offset according to the reverse flag. + var offset uint32 + if reverse { + offset = (numEntries - numToSkip - i - 1) * txEntrySize + } else { + offset = (numToSkip + i) * txEntrySize + } + + // Deserialize and populate the result. + err := deserializeAddrIndexEntry(serialized[offset:], + &results[i], fetchBlockHash) + if err != nil { + // Ensure any deserialization errors are returned as + // database corruption errors. 
+ if isDeserializeErr(err) { + err = database.Error{ + ErrorCode: database.ErrCorruption, + Description: fmt.Sprintf("failed to "+ + "deserialized address index "+ + "for key %x: %v", addrKey, err), + } + } + + return nil, 0, err + } + } + + return results, numToSkip, nil +} + +// minEntriesToReachLevel returns the minimum number of entries that are +// required to reach the given address index level. +func minEntriesToReachLevel(level uint8) int { + maxEntriesForLevel := level0MaxEntries + minRequired := 1 + for l := uint8(1); l <= level; l++ { + minRequired += maxEntriesForLevel + maxEntriesForLevel *= 2 + } + return minRequired +} + +// maxEntriesForLevel returns the maximum number of entries allowed for the +// given address index level. +func maxEntriesForLevel(level uint8) int { + numEntries := level0MaxEntries + for l := level; l > 0; l-- { + numEntries *= 2 + } + return numEntries +} + +// dbRemoveAddrIndexEntries removes the specified number of entries from from +// the address index for the provided key. An assertion error will be returned +// if the count exceeds the total number of entries in the index. +func dbRemoveAddrIndexEntries(bucket internalBucket, addrKey [addrKeySize]byte, count int) error { + // Nothing to do if no entries are being deleted. + if count <= 0 { + return nil + } + + // Make use of a local map to track pending updates and define a closure + // to apply it to the database. This is done in order to reduce the + // number of database reads and because there is more than one exit + // path that needs to apply the updates. 
+ pendingUpdates := make(map[uint8][]byte) + applyPending := func() error { + for level, data := range pendingUpdates { + curLevelKey := keyForLevel(addrKey, level) + if len(data) == 0 { + err := bucket.Delete(curLevelKey[:]) + if err != nil { + return err + } + continue + } + err := bucket.Put(curLevelKey[:], data) + if err != nil { + return err + } + } + return nil + } + + // Loop fowards through the levels while removing entries until the + // specified number has been removed. This will potentially result in + // entirely empty lower levels which will be backfilled below. + var highestLoadedLevel uint8 + numRemaining := count + for level := uint8(0); numRemaining > 0; level++ { + // Load the data for the level from the database. + curLevelKey := keyForLevel(addrKey, level) + curLevelData := bucket.Get(curLevelKey[:]) + if len(curLevelData) == 0 && numRemaining > 0 { + return AssertError(fmt.Sprintf("dbRemoveAddrIndexEntries "+ + "not enough entries for address key %x to "+ + "delete %d entries", addrKey, count)) + } + pendingUpdates[level] = curLevelData + highestLoadedLevel = level + + // Delete the entire level as needed. + numEntries := len(curLevelData) / txEntrySize + if numRemaining >= numEntries { + pendingUpdates[level] = nil + numRemaining -= numEntries + continue + } + + // Remove remaining entries to delete from the level. + offsetEnd := len(curLevelData) - (numRemaining * txEntrySize) + pendingUpdates[level] = curLevelData[:offsetEnd] + break + } + + // When all elements in level 0 were not removed there is nothing left + // to do other than updating the database. + if len(pendingUpdates[0]) != 0 { + return applyPending() + } + + // At this point there are one or more empty levels before the current + // level which need to be backfilled and the current level might have + // had some entries deleted from it as well. 
Since all levels after + // level 0 are required to either be empty, half full, or completely + // full, the current level must be adjusted accordingly by backfilling + // each previous levels in a way which satisfies the requirements. Any + // entries that are left are assigned to level 0 after the loop as they + // are guaranteed to fit by the logic in the loop. In other words, this + // effectively squashes all remaining entries in the current level into + // the lowest possible levels while following the level rules. + // + // Note that the level after the current level might also have entries + // and gaps are not allowed, so this also keeps track of the lowest + // empty level so the code below knows how far to backfill in case it is + // required. + lowestEmptyLevel := uint8(255) + curLevelData := pendingUpdates[highestLoadedLevel] + curLevelMaxEntries := maxEntriesForLevel(highestLoadedLevel) + for level := highestLoadedLevel; level > 0; level-- { + // When there are not enough entries left in the current level + // for the number that would be required to reach it, clear the + // the current level which effectively moves them all up to the + // previous level on the next iteration. Otherwise, there are + // are sufficient entries, so update the current level to + // contain as many entries as possible while still leaving + // enough remaining entries required to reach the level. + numEntries := len(curLevelData) / txEntrySize + prevLevelMaxEntries := curLevelMaxEntries / 2 + minPrevRequired := minEntriesToReachLevel(level - 1) + if numEntries < prevLevelMaxEntries+minPrevRequired { + lowestEmptyLevel = level + pendingUpdates[level] = nil + } else { + // This level can only be completely full or half full, + // so choose the appropriate offset to ensure enough + // entries remain to reach the level. 
+ var offset int + if numEntries-curLevelMaxEntries >= minPrevRequired { + offset = curLevelMaxEntries * txEntrySize + } else { + offset = prevLevelMaxEntries * txEntrySize + } + pendingUpdates[level] = curLevelData[:offset] + curLevelData = curLevelData[offset:] + } + + curLevelMaxEntries = prevLevelMaxEntries + } + pendingUpdates[0] = curLevelData + if len(curLevelData) == 0 { + lowestEmptyLevel = 0 + } + + // When the highest loaded level is empty, it's possible the level after + // it still has data and thus that data needs to be backfilled as well. + for len(pendingUpdates[highestLoadedLevel]) == 0 { + // When the next level is empty too, the is no data left to + // continue backfilling, so there is nothing left to do. + // Otherwise, populate the pending updates map with the newly + // loaded data and update the highest loaded level accordingly. + level := highestLoadedLevel + 1 + curLevelKey := keyForLevel(addrKey, level) + levelData := bucket.Get(curLevelKey[:]) + if len(levelData) == 0 { + break + } + pendingUpdates[level] = levelData + highestLoadedLevel = level + + // At this point the highest level is not empty, but it might + // be half full. When that is the case, move it up a level to + // simplify the code below which backfills all lower levels that + // are still empty. This also means the current level will be + // empty, so the loop will perform another another iteration to + // potentially backfill this level with data from the next one. + curLevelMaxEntries := maxEntriesForLevel(level) + if len(levelData)/txEntrySize != curLevelMaxEntries { + pendingUpdates[level] = nil + pendingUpdates[level-1] = levelData + level-- + curLevelMaxEntries /= 2 + } + + // Backfill all lower levels that are still empty by iteratively + // halfing the data until the lowest empty level is filled. 
+ for level > lowestEmptyLevel { + offset := (curLevelMaxEntries / 2) * txEntrySize + pendingUpdates[level] = levelData[:offset] + levelData = levelData[offset:] + pendingUpdates[level-1] = levelData + level-- + curLevelMaxEntries /= 2 + } + + // The lowest possible empty level is now the highest loaded + // level. + lowestEmptyLevel = highestLoadedLevel + } + + // Apply the pending updates. + return applyPending() +} + +// addrToKey converts known address types to an addrindex key. An error is +// returned for unsupported types. +func addrToKey(addr btcutil.Address) ([addrKeySize]byte, error) { + switch addr := addr.(type) { + case *btcutil.AddressPubKeyHash: + var result [addrKeySize]byte + result[0] = addrKeyTypePubKeyHash + copy(result[1:], addr.Hash160()[:]) + return result, nil + + case *btcutil.AddressScriptHash: + var result [addrKeySize]byte + result[0] = addrKeyTypeScriptHash + copy(result[1:], addr.Hash160()[:]) + return result, nil + + case *btcutil.AddressPubKey: + var result [addrKeySize]byte + result[0] = addrKeyTypePubKeyHash + copy(result[1:], addr.AddressPubKeyHash().Hash160()[:]) + return result, nil + } + + return [addrKeySize]byte{}, errUnsupportedAddressType +} + +// AddrIndex implements a transaction by address index. That is to say, it +// supports querying all transactions that reference a given address because +// they are either crediting or debiting the address. The returned transactions +// are ordered according to their order of appearance in the blockchain. In +// other words, first by block height and then by offset inside the block. +// +// In addition, support is provided for a memory-only index of unconfirmed +// transactions such as those which are kept in the memory pool before inclusion +// in a block. +type AddrIndex struct { + // The following fields are set when the instance is created and can't + // be changed afterwards, so there is no need to protect them with a + // separate mutex. 
+ db database.DB + chainParams *chaincfg.Params + + // The following fields are used to quickly link transactions and + // addresses that have not been included into a block yet when an + // address index is being maintained. The are protected by the + // unconfirmedLock field. + // + // The txnsByAddr field is used to keep an index of all transactions + // which either create an output to a given address or spend from a + // previous output to it keyed by the address. + // + // The addrsByTx field is essentially the reverse and is used to + // keep an index of all addresses which a given transaction involves. + // This allows fairly efficient updates when transactions are removed + // once they are included into a block. + unconfirmedLock sync.RWMutex + txnsByAddr map[[addrKeySize]byte]map[wire.ShaHash]*btcutil.Tx + addrsByTx map[wire.ShaHash]map[[addrKeySize]byte]struct{} +} + +// Ensure the AddrIndex type implements the Indexer interface. +var _ Indexer = (*AddrIndex)(nil) + +// Ensure the AddrIndex type implements the NeedsInputser interface. +var _ NeedsInputser = (*AddrIndex)(nil) + +// NeedsInputs signals that the index requires the referenced inputs in order +// to properly create the index. +// +// This implements the NeedsInputser interface. +func (idx *AddrIndex) NeedsInputs() bool { + return true +} + +// Init is only provided to satisfy the Indexer interface as there is nothing to +// initialize for this index. +// +// This is part of the Indexer interface. +func (idx *AddrIndex) Init() error { + // Nothing to do. + return nil +} + +// Key returns the database key to use for the index as a byte slice. +// +// This is part of the Indexer interface. +func (idx *AddrIndex) Key() []byte { + return addrIndexKey +} + +// Name returns the human-readable name of the index. +// +// This is part of the Indexer interface. 
+func (idx *AddrIndex) Name() string {
+	return addrIndexName
+}
+
+// Create is invoked when the indexer manager determines the index needs
+// to be created for the first time. It creates the bucket for the address
+// index.
+//
+// This is part of the Indexer interface.
+func (idx *AddrIndex) Create(dbTx database.Tx) error {
+	_, err := dbTx.Metadata().CreateBucket(addrIndexKey)
+	return err
+}
+
+// writeIndexData represents the address index data to be written for one block.
+// It consists of the address mapped to an ordered list of the transactions
+// that involve the address in the block. It is ordered so the transactions can
+// be stored in the order they appear in the block.
+//
+// Each list element is the position of a transaction within the block.
+type writeIndexData map[[addrKeySize]byte][]int
+
+// indexPkScript extracts all standard addresses from the passed public key
+// script and maps each of them to the associated transaction using the passed
+// map. The transaction is identified by txIdx, its position in the block.
+func (idx *AddrIndex) indexPkScript(data writeIndexData, pkScript []byte, txIdx int) {
+	// Nothing to index if the script is non-standard or otherwise doesn't
+	// contain any addresses.
+	_, addrs, _, err := txscript.ExtractPkScriptAddrs(pkScript,
+		idx.chainParams)
+	if err != nil || len(addrs) == 0 {
+		return
+	}
+
+	for _, addr := range addrs {
+		addrKey, err := addrToKey(addr)
+		if err != nil {
+			// Ignore unsupported address types.
+			continue
+		}
+
+		// Avoid inserting the transaction more than once. Since the
+		// transactions are indexed serially any duplicates will be
+		// indexed in a row, so checking the most recent entry for the
+		// address is enough to detect duplicates.
+		indexedTxns := data[addrKey]
+		numTxns := len(indexedTxns)
+		if numTxns > 0 && indexedTxns[numTxns-1] == txIdx {
+			continue
+		}
+		indexedTxns = append(indexedTxns, txIdx)
+		data[addrKey] = indexedTxns
+	}
+}
+
+// indexBlock extracts all of the standard addresses from all of the
+// transactions in the passed block and maps each of them to the associated
+// transaction using the passed map. Both the scripts spent by the inputs
+// (looked up through the view) and the scripts of the outputs are indexed.
+func (idx *AddrIndex) indexBlock(data writeIndexData, block *btcutil.Block, view *blockchain.UtxoViewpoint) {
+	for txIdx, tx := range block.Transactions() {
+		// Coinbases do not reference any inputs. Since the block is
+		// required to have already gone through full validation, it has
+		// already been proven that the first transaction in the block
+		// is a coinbase.
+		if txIdx != 0 {
+			for _, txIn := range tx.MsgTx().TxIn {
+				// The view should always have the input since
+				// the index contract requires it, however, be
+				// safe and simply ignore any missing entries.
+				origin := &txIn.PreviousOutPoint
+				entry := view.LookupEntry(&origin.Hash)
+				if entry == nil {
+					continue
+				}
+
+				pkScript := entry.PkScriptByIndex(origin.Index)
+				idx.indexPkScript(data, pkScript, txIdx)
+			}
+		}
+
+		for _, txOut := range tx.MsgTx().TxOut {
+			idx.indexPkScript(data, txOut.PkScript, txIdx)
+		}
+	}
+}
+
+// ConnectBlock is invoked by the index manager when a new block has been
+// connected to the main chain. This indexer adds a mapping for each address
+// the transactions in the block involve.
+//
+// This is part of the Indexer interface.
+func (idx *AddrIndex) ConnectBlock(dbTx database.Tx, block *btcutil.Block, view *blockchain.UtxoViewpoint) error {
+	// The offset and length of the transactions within the serialized
+	// block.
+	txLocs, err := block.TxLoc()
+	if err != nil {
+		return err
+	}
+
+	// Get the internal block ID associated with the block.
+	blockID, err := dbFetchBlockIDByHash(dbTx, block.Sha())
+	if err != nil {
+		return err
+	}
+
+	// Build all of the address to transaction mappings in a local map.
+	addrsToTxns := make(writeIndexData)
+	idx.indexBlock(addrsToTxns, block, view)
+
+	// Add all of the index entries for each address. Each entry pairs the
+	// block ID with the location of the transaction inside the block.
+	addrIdxBucket := dbTx.Metadata().Bucket(addrIndexKey)
+	for addrKey, txIdxs := range addrsToTxns {
+		for _, txIdx := range txIdxs {
+			err := dbPutAddrIndexEntry(addrIdxBucket, addrKey,
+				blockID, txLocs[txIdx])
+			if err != nil {
+				return err
+			}
+		}
+	}
+
+	return nil
+}
+
+// DisconnectBlock is invoked by the index manager when a block has been
+// disconnected from the main chain. This indexer removes the address mappings
+// that the transactions in the block involve.
+//
+// This is part of the Indexer interface.
+func (idx *AddrIndex) DisconnectBlock(dbTx database.Tx, block *btcutil.Block, view *blockchain.UtxoViewpoint) error {
+	// Build all of the address to transaction mappings in a local map.
+	addrsToTxns := make(writeIndexData)
+	idx.indexBlock(addrsToTxns, block, view)
+
+	// Remove all of the index entries for each address. Only the number of
+	// entries is passed per address.
+	// NOTE(review): presumably dbRemoveAddrIndexEntries removes the most
+	// recently added entries for the address - confirm against its
+	// definition.
+	bucket := dbTx.Metadata().Bucket(addrIndexKey)
+	for addrKey, txIdxs := range addrsToTxns {
+		err := dbRemoveAddrIndexEntries(bucket, addrKey, len(txIdxs))
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// TxRegionsForAddress returns a slice of block regions which identify each
+// transaction that involves the passed address according to the specified
+// number to skip, number requested, and whether or not the results should be
+// reversed. It also returns the number actually skipped since it could be less
+// in the case where there are not enough entries.
+//
+// NOTE: These results only include transactions confirmed in blocks. See the
+// UnconfirmedTxnsForAddress method for obtaining unconfirmed transactions
+// that involve a given address.
+//
+// This function is safe for concurrent access.
+func (idx *AddrIndex) TxRegionsForAddress(dbTx database.Tx, addr btcutil.Address, numToSkip, numRequested uint32, reverse bool) ([]database.BlockRegion, uint32, error) {
+	// Unsupported address types are reported to the caller as an error.
+	addrKey, err := addrToKey(addr)
+	if err != nil {
+		return nil, 0, err
+	}
+
+	// NOTE(review): the dbTx parameter of this method is never read. The
+	// closure below declares its own dbTx, which shadows the parameter,
+	// so the lookup always runs inside a new view opened on idx.db.
+	var regions []database.BlockRegion
+	var skipped uint32
+	err = idx.db.View(func(dbTx database.Tx) error {
+		// Create closure to lookup the block hash given the ID using
+		// the database transaction.
+		fetchBlockHash := func(id []byte) (*wire.ShaHash, error) {
+			// Deserialize and populate the result.
+			return dbFetchBlockHashBySerializedID(dbTx, id)
+		}
+
+		// The results are assigned to the variables declared outside
+		// of the closure so they can be returned to the caller.
+		var err error
+		addrIdxBucket := dbTx.Metadata().Bucket(addrIndexKey)
+		regions, skipped, err = dbFetchAddrIndexEntries(addrIdxBucket,
+			addrKey, numToSkip, numRequested, reverse,
+			fetchBlockHash)
+		return err
+	})
+
+	return regions, skipped, err
+}
+
+// indexUnconfirmedAddresses modifies the unconfirmed (memory-only) address
+// index to include mappings for the addresses encoded by the passed public key
+// script to the transaction.
+//
+// This function is safe for concurrent access.
+func (idx *AddrIndex) indexUnconfirmedAddresses(pkScript []byte, tx *btcutil.Tx) {
+	// The error is ignored here since the only reason it can fail is if the
+	// script fails to parse and it was already validated before being
+	// admitted to the mempool.
+	_, addresses, _, _ := txscript.ExtractPkScriptAddrs(pkScript,
+		idx.chainParams)
+	for _, addr := range addresses {
+		// Ignore unsupported address types.
+		addrKey, err := addrToKey(addr)
+		if err != nil {
+			continue
+		}
+
+		// Add a mapping from the address to the transaction. The lock
+		// is acquired and released once per address.
+		idx.unconfirmedLock.Lock()
+		addrIndexEntry := idx.txnsByAddr[addrKey]
+		if addrIndexEntry == nil {
+			addrIndexEntry = make(map[wire.ShaHash]*btcutil.Tx)
+			idx.txnsByAddr[addrKey] = addrIndexEntry
+		}
+		addrIndexEntry[*tx.Sha()] = tx
+
+		// Add a mapping from the transaction to the address. This
+		// reverse map is what RemoveUnconfirmedTx walks to find the
+		// address entries to clean up.
+		addrsByTxEntry := idx.addrsByTx[*tx.Sha()]
+		if addrsByTxEntry == nil {
+			addrsByTxEntry = make(map[[addrKeySize]byte]struct{})
+			idx.addrsByTx[*tx.Sha()] = addrsByTxEntry
+		}
+		addrsByTxEntry[addrKey] = struct{}{}
+		idx.unconfirmedLock.Unlock()
+	}
+}
+
+// AddUnconfirmedTx adds all addresses related to the transaction to the
+// unconfirmed (memory-only) address index.
+//
+// NOTE: This transaction MUST have already been validated by the memory pool
+// before calling this function with it and have all of the inputs available in
+// the provided utxo view. Failure to do so could result in some or all
+// addresses not being indexed.
+//
+// This function is safe for concurrent access.
+func (idx *AddrIndex) AddUnconfirmedTx(tx *btcutil.Tx, utxoView *blockchain.UtxoViewpoint) {
+	// Index addresses of all referenced previous transaction outputs.
+	//
+	// Inputs are expected to exist since this is only called after the
+	// transaction has already been validated. A missing entry is skipped
+	// rather than treated as an error.
+	for _, txIn := range tx.MsgTx().TxIn {
+		entry := utxoView.LookupEntry(&txIn.PreviousOutPoint.Hash)
+		if entry == nil {
+			// Ignore missing entries. This should never happen
+			// in practice since the function comments specifically
+			// call out all inputs must be available.
+			continue
+		}
+		pkScript := entry.PkScriptByIndex(txIn.PreviousOutPoint.Index)
+		idx.indexUnconfirmedAddresses(pkScript, tx)
+	}
+
+	// Index addresses of all created outputs.
+	for _, txOut := range tx.MsgTx().TxOut {
+		idx.indexUnconfirmedAddresses(txOut.PkScript, tx)
+	}
+}
+
+// RemoveUnconfirmedTx removes the passed transaction from the unconfirmed
+// (memory-only) address index.
+//
+// This function is safe for concurrent access.
+func (idx *AddrIndex) RemoveUnconfirmedTx(hash *wire.ShaHash) {
+	idx.unconfirmedLock.Lock()
+	defer idx.unconfirmedLock.Unlock()
+
+	txHash := *hash
+
+	// Walk every address the transaction was indexed under, drop the
+	// transaction from that address, and discard the address entry itself
+	// once nothing else references it.
+	for key := range idx.addrsByTx[txHash] {
+		txns := idx.txnsByAddr[key]
+		delete(txns, txHash)
+		if len(txns) == 0 {
+			delete(idx.txnsByAddr, key)
+		}
+	}
+
+	// Finally forget the reverse (transaction to addresses) mapping.
+	delete(idx.addrsByTx, txHash)
+}
+
+// UnconfirmedTxnsForAddress returns every transaction in the unconfirmed
+// (memory-only) address index that involves the passed address. A nil slice
+// is returned when the address type is unsupported or when the address has no
+// unconfirmed transactions.
+//
+// This function is safe for concurrent access.
+func (idx *AddrIndex) UnconfirmedTxnsForAddress(addr btcutil.Address) []*btcutil.Tx {
+	// Unsupported address types simply have no results.
+	key, err := addrToKey(addr)
+	if err != nil {
+		return nil
+	}
+
+	idx.unconfirmedLock.RLock()
+	defer idx.unconfirmedLock.RUnlock()
+
+	txns, found := idx.txnsByAddr[key]
+	if !found {
+		return nil
+	}
+
+	// Copy the transactions into a fresh slice so the caller never shares
+	// the internal map with the index.
+	result := make([]*btcutil.Tx, 0, len(txns))
+	for _, tx := range txns {
+		result = append(result, tx)
+	}
+	return result
+}
+
+// NewAddrIndex returns a new instance of an indexer that is used to create a
+// mapping of all addresses in the blockchain to the respective transactions
+// that involve them.
+//
+// It implements the Indexer interface which plugs into the IndexManager that in
+// turn is used by the blockchain package. This allows the index to be
+// seamlessly maintained along with the chain.
+func NewAddrIndex(db database.DB, chainParams *chaincfg.Params) *AddrIndex {
+	return &AddrIndex{
+		db:          db,
+		chainParams: chainParams,
+		txnsByAddr:  make(map[[addrKeySize]byte]map[wire.ShaHash]*btcutil.Tx),
+		addrsByTx:   make(map[wire.ShaHash]map[[addrKeySize]byte]struct{}),
+	}
+}
+
+// DropAddrIndex drops the address index from the provided database if it
+// exists.
+func DropAddrIndex(db database.DB) error {
+	return dropIndex(db, addrIndexKey, addrIndexName)
+}
diff --git a/blockchain/indexers/addrindex_test.go b/blockchain/indexers/addrindex_test.go
new file mode 100644
index 00000000..92fce538
--- /dev/null
+++ b/blockchain/indexers/addrindex_test.go
@@ -0,0 +1,275 @@
+// Copyright (c) 2016 The btcsuite developers
+// Use of this source code is governed by an ISC
+// license that can be found in the LICENSE file.
+
+package indexers
+
+import (
+	"bytes"
+	"fmt"
+	"testing"
+
+	"github.com/btcsuite/btcd/wire"
+)
+
+// addrIndexBucket provides a mock address index database bucket by implementing
+// the internalBucket interface.
+type addrIndexBucket struct {
+	levels map[[levelKeySize]byte][]byte
+}
+
+// Clone returns a deep copy of the mock address index bucket.
+func (b *addrIndexBucket) Clone() *addrIndexBucket {
+	levels := make(map[[levelKeySize]byte][]byte)
+	for k, v := range b.levels {
+		vCopy := make([]byte, len(v))
+		copy(vCopy, v)
+		levels[k] = vCopy
+	}
+	return &addrIndexBucket{levels: levels}
+}
+
+// Get returns the value associated with the key from the mock address index
+// bucket.
+//
+// This is part of the internalBucket interface.
+func (b *addrIndexBucket) Get(key []byte) []byte {
+	var levelKey [levelKeySize]byte
+	copy(levelKey[:], key)
+	return b.levels[levelKey]
+}
+
+// Put stores the provided key/value pair to the mock address index bucket.
+//
+// This is part of the internalBucket interface.
+func (b *addrIndexBucket) Put(key []byte, value []byte) error {
+	var levelKey [levelKeySize]byte
+	copy(levelKey[:], key)
+	b.levels[levelKey] = value
+	return nil
+}
+
+// Delete removes the provided key from the mock address index bucket.
+//
+// This is part of the internalBucket interface.
+func (b *addrIndexBucket) Delete(key []byte) error {
+	var levelKey [levelKeySize]byte
+	copy(levelKey[:], key)
+	delete(b.levels, levelKey)
+	return nil
+}
+
+// printLevels returns a string with a visual representation of the provided
+// address key taking into account the max size of each level. It is useful
+// when creating and debugging test cases.
+func (b *addrIndexBucket) printLevels(addrKey [addrKeySize]byte) string {
+	// Find the highest level stored for the key.
+	highestLevel := uint8(0)
+	for k := range b.levels {
+		if !bytes.Equal(k[:levelOffset], addrKey[:]) {
+			continue
+		}
+		level := uint8(k[levelOffset])
+		if level > highestLevel {
+			highestLevel = level
+		}
+	}
+
+	// Print one row per level. The capacity doubles with each level and
+	// unused slots are shown as underscores.
+	var levelBuf bytes.Buffer
+	_, _ = levelBuf.WriteString("\n")
+	maxEntries := level0MaxEntries
+	for level := uint8(0); level <= highestLevel; level++ {
+		data := b.levels[keyForLevel(addrKey, level)]
+		numEntries := len(data) / txEntrySize
+		for i := 0; i < numEntries; i++ {
+			start := i * txEntrySize
+			num := byteOrder.Uint32(data[start:])
+			_, _ = levelBuf.WriteString(fmt.Sprintf("%02d ", num))
+		}
+		for i := numEntries; i < maxEntries; i++ {
+			_, _ = levelBuf.WriteString("_ ")
+		}
+		_, _ = levelBuf.WriteString("\n")
+		maxEntries *= 2
+	}
+
+	return levelBuf.String()
+}
+
+// sanityCheck ensures that all data stored in the bucket for the given address
+// adheres to the level-based rules described by the address index
+// documentation. It verifies the per-level entry counts, that the total number
+// of entries matches expectedTotal, and that the leading number of every entry
+// counts up from zero when reading from the highest level down to level 0.
+func (b *addrIndexBucket) sanityCheck(addrKey [addrKeySize]byte, expectedTotal int) error {
+	// Find the highest level for the key.
+	highestLevel := uint8(0)
+	for k := range b.levels {
+		if !bytes.Equal(k[:levelOffset], addrKey[:]) {
+			continue
+		}
+		level := uint8(k[levelOffset])
+		if level > highestLevel {
+			highestLevel = level
+		}
+	}
+
+	// Ensure the expected total number of entries are present and that
+	// all levels adhere to the rules described in the address index
+	// documentation.
+	var totalEntries int
+	maxEntries := level0MaxEntries
+	for level := uint8(0); level <= highestLevel; level++ {
+		// Level 0 can't have more entries than the max allowed and, if
+		// the levels after it have data, it can't be empty. All other
+		// levels must either be half full or full.
+		data := b.levels[keyForLevel(addrKey, level)]
+		numEntries := len(data) / txEntrySize
+		totalEntries += numEntries
+		if level == 0 {
+			if (highestLevel != 0 && numEntries == 0) ||
+				numEntries > maxEntries {
+
+				return fmt.Errorf("level %d has %d entries",
+					level, numEntries)
+			}
+		} else if numEntries != maxEntries && numEntries != maxEntries/2 {
+			return fmt.Errorf("level %d has %d entries", level,
+				numEntries)
+		}
+		maxEntries *= 2
+	}
+	if totalEntries != expectedTotal {
+		return fmt.Errorf("expected %d entries - got %d", expectedTotal,
+			totalEntries)
+	}
+
+	// Ensure all of the numbers are in order starting from the highest
+	// level moving to the lowest level.
+	//
+	// The loop variable is unsigned, so it counts from highestLevel+1 down
+	// to 1 and the level actually inspected is level-1. This makes the
+	// walk cover highestLevel through 0 without underflowing.
+	expectedNum := uint32(0)
+	for level := highestLevel + 1; level > 0; level-- {
+		data := b.levels[keyForLevel(addrKey, level-1)]
+		numEntries := len(data) / txEntrySize
+		for i := 0; i < numEntries; i++ {
+			start := i * txEntrySize
+			num := byteOrder.Uint32(data[start:])
+			if num != expectedNum {
+				return fmt.Errorf("level %d offset %d does "+
+					"not contain the expected number of "+
+					"%d - got %d", level-1, i,
+					expectedNum, num)
+			}
+			expectedNum++
+		}
+	}
+
+	return nil
+}
+
+// TestAddrIndexLevels ensures that adding and deleting entries to the address
+// index creates multiple levels as described by the address index documentation.
+func TestAddrIndexLevels(t *testing.T) {
+	t.Parallel()
+
+	tests := []struct {
+		name        string
+		key         [addrKeySize]byte
+		numInsert   int
+		printLevels bool // Set to help debug a specific test.
+	}{
+		{
+			name:      "level 0 not full",
+			numInsert: level0MaxEntries - 1,
+		},
+		{
+			name:      "level 1 half",
+			numInsert: level0MaxEntries + 1,
+		},
+		{
+			name:      "level 1 full",
+			numInsert: level0MaxEntries*2 + 1,
+		},
+		{
+			name:      "level 2 half, level 1 half",
+			numInsert: level0MaxEntries*3 + 1,
+		},
+		{
+			name:      "level 2 half, level 1 full",
+			numInsert: level0MaxEntries*4 + 1,
+		},
+		{
+			name:      "level 2 full, level 1 half",
+			numInsert: level0MaxEntries*5 + 1,
+		},
+		{
+			name:      "level 2 full, level 1 full",
+			numInsert: level0MaxEntries*6 + 1,
+		},
+		{
+			name:      "level 3 half, level 2 half, level 1 half",
+			numInsert: level0MaxEntries*7 + 1,
+		},
+		{
+			name:      "level 3 full, level 2 half, level 1 full",
+			numInsert: level0MaxEntries*12 + 1,
+		},
+	}
+
+nextTest:
+	for testNum, test := range tests {
+		// Insert entries in order.
+		populatedBucket := &addrIndexBucket{
+			levels: make(map[[levelKeySize]byte][]byte),
+		}
+		for i := 0; i < test.numInsert; i++ {
+			txLoc := wire.TxLoc{TxStart: i * 2}
+			err := dbPutAddrIndexEntry(populatedBucket, test.key,
+				uint32(i), txLoc)
+			if err != nil {
+				t.Errorf("dbPutAddrIndexEntry #%d (%s) - "+
+					"unexpected error: %v", testNum,
+					test.name, err)
+				continue nextTest
+			}
+		}
+		if test.printLevels {
+			t.Log(populatedBucket.printLevels(test.key))
+		}
+
+		// Delete entries from the populated bucket until all entries
+		// have been deleted. The bucket is reset to the fully
+		// populated bucket on each iteration so every combination is
+		// tested. Notice the upper limit purposely exceeds the number
+		// of entries to ensure attempting to delete more entries than
+		// there are works correctly.
+		for numDelete := 0; numDelete <= test.numInsert+1; numDelete++ {
+			// Clone populated bucket to run each delete against.
+			bucket := populatedBucket.Clone()
+
+			// Remove the number of entries for this iteration.
+			err := dbRemoveAddrIndexEntries(bucket, test.key,
+				numDelete)
+			if err != nil {
+				if numDelete <= test.numInsert {
+					t.Errorf("dbRemoveAddrIndexEntries (%s) "+
+						" delete %d - unexpected error: "+
+						"%v", test.name, numDelete, err)
+					continue nextTest
+				}
+			}
+			if test.printLevels {
+				t.Log(bucket.printLevels(test.key))
+			}
+
+			// Sanity check the levels to ensure they adhere to all
+			// rules.
+			numExpected := test.numInsert
+			if numDelete <= test.numInsert {
+				numExpected -= numDelete
+			}
+			err = bucket.sanityCheck(test.key, numExpected)
+			if err != nil {
+				t.Errorf("sanity check fail (%s) delete %d: %v",
+					test.name, numDelete, err)
+				continue nextTest
+			}
+		}
+	}
+}
diff --git a/blockchain/indexers/blocklogger.go b/blockchain/indexers/blocklogger.go
new file mode 100644
index 00000000..88d6f269
--- /dev/null
+++ b/blockchain/indexers/blocklogger.go
@@ -0,0 +1,76 @@
+// Copyright (c) 2016 The btcsuite developers
+// Use of this source code is governed by an ISC
+// license that can be found in the LICENSE file.
+
+package indexers
+
+import (
+	"sync"
+	"time"
+
+	"github.com/btcsuite/btclog"
+	"github.com/btcsuite/btcutil"
+)
+
+// blockProgressLogger provides periodic logging for other services in order
+// to show users progress of certain "actions" involving some or all current
+// blocks. Ex: syncing to best chain, indexing all blocks, etc.
+type blockProgressLogger struct {
+	receivedLogBlocks int64
+	receivedLogTx     int64
+	lastBlockLogTime  time.Time
+
+	subsystemLogger btclog.Logger
+	progressAction  string
+	sync.Mutex
+}
+
+// newBlockProgressLogger returns a new block progress logger.
+// The progress message is templated as follows:
+//  {progressAction} {numProcessed} {blocks|block} in the last {timePeriod}
+//  ({numTxs}, height {lastBlockHeight}, {lastBlockTimeStamp})
+func newBlockProgressLogger(progressMessage string, logger btclog.Logger) *blockProgressLogger {
+	return &blockProgressLogger{
+		lastBlockLogTime: time.Now(),
+		progressAction:   progressMessage,
+		subsystemLogger:  logger,
+	}
+}
+
+// LogBlockHeight logs a new block height as an information message to show
+// progress to the user. In order to prevent spam, it limits logging to one
+// message every 10 seconds with duration and totals included.
+func (b *blockProgressLogger) LogBlockHeight(block *btcutil.Block) {
+	b.Lock()
+	defer b.Unlock()
+
+	b.receivedLogBlocks++
+	b.receivedLogTx += int64(len(block.MsgBlock().Transactions))
+
+	now := time.Now()
+	duration := now.Sub(b.lastBlockLogTime)
+	if duration < time.Second*10 {
+		return
+	}
+
+	// Truncate the duration to 10s of milliseconds.
+	durationMillis := int64(duration / time.Millisecond)
+	tDuration := 10 * time.Millisecond * time.Duration(durationMillis/10)
+
+	// Log information about new block height.
+	blockStr := "blocks"
+	if b.receivedLogBlocks == 1 {
+		blockStr = "block"
+	}
+	txStr := "transactions"
+	if b.receivedLogTx == 1 {
+		txStr = "transaction"
+	}
+	b.subsystemLogger.Infof("%s %d %s in the last %s (%d %s, height %d, %s)",
+		b.progressAction, b.receivedLogBlocks, blockStr, tDuration, b.receivedLogTx,
+		txStr, block.Height(), block.MsgBlock().Header.Timestamp)
+
+	b.receivedLogBlocks = 0
+	b.receivedLogTx = 0
+	b.lastBlockLogTime = now
+}
diff --git a/blockchain/indexers/common.go b/blockchain/indexers/common.go
new file mode 100644
index 00000000..e18fdcdd
--- /dev/null
+++ b/blockchain/indexers/common.go
@@ -0,0 +1,90 @@
+// Copyright (c) 2016 The btcsuite developers
+// Use of this source code is governed by an ISC
+// license that can be found in the LICENSE file.
+
+/*
+Package indexers implements optional block chain indexes.
+*/
+package indexers
+
+import (
+	"encoding/binary"
+
+	"github.com/btcsuite/btcd/blockchain"
+	database "github.com/btcsuite/btcd/database2"
+	"github.com/btcsuite/btcutil"
+)
+
+var (
+	// byteOrder is the preferred byte order used for serializing numeric
+	// fields for storage in the database.
+	byteOrder = binary.LittleEndian
+)
+
+// NeedsInputser provides a generic interface for an indexer to specify that it
+// requires the ability to look up inputs for a transaction.
+type NeedsInputser interface {
+	NeedsInputs() bool
+}
+
+// Indexer provides a generic interface for an indexer that is managed by an
+// index manager such as the Manager type provided by this package.
+type Indexer interface {
+	// Key returns the key of the index as a byte slice.
+	Key() []byte
+
+	// Name returns the human-readable name of the index.
+	Name() string
+
+	// Create is invoked when the indexer manager determines the index needs
+	// to be created for the first time.
+	Create(dbTx database.Tx) error
+
+	// Init is invoked when the index manager is first initializing the
+	// index. This differs from the Create method in that it is called on
+	// every load, including the case the index was just created.
+	Init() error
+
+	// ConnectBlock is invoked when the index manager is notified that a new
+	// block has been connected to the main chain.
+	ConnectBlock(dbTx database.Tx, block *btcutil.Block, view *blockchain.UtxoViewpoint) error
+
+	// DisconnectBlock is invoked when the index manager is notified that a
+	// block has been disconnected from the main chain.
+	DisconnectBlock(dbTx database.Tx, block *btcutil.Block, view *blockchain.UtxoViewpoint) error
+}
+
+// AssertError identifies an error that indicates an internal code consistency
+// issue and should be treated as a critical and unrecoverable error.
+type AssertError string
+
+// Error returns the assertion error as a human-readable string and satisfies
+// the error interface.
+func (e AssertError) Error() string {
+	return "assertion failed: " + string(e)
+}
+
+// errDeserialize signifies that a problem was encountered when deserializing
+// data.
+type errDeserialize string
+
+// Error implements the error interface.
+func (e errDeserialize) Error() string {
+	return string(e)
+}
+
+// isDeserializeErr returns whether or not the passed error is an errDeserialize
+// error.
+func isDeserializeErr(err error) bool {
+	_, ok := err.(errDeserialize)
+	return ok
+}
+
+// internalBucket is an abstraction over a database bucket. It is used to make
+// the code easier to test since it allows mock objects in the tests to only
+// implement these functions instead of everything a database.Bucket supports.
+type internalBucket interface {
+	Get(key []byte) []byte
+	Put(key []byte, value []byte) error
+	Delete(key []byte) error
+}
diff --git a/blockchain/indexers/log.go b/blockchain/indexers/log.go
new file mode 100644
index 00000000..0172da07
--- /dev/null
+++ b/blockchain/indexers/log.go
@@ -0,0 +1,30 @@
+// Copyright (c) 2016 The btcsuite developers
+// Use of this source code is governed by an ISC
+// license that can be found in the LICENSE file.
+
+package indexers
+
+import "github.com/btcsuite/btclog"
+
+// log is a logger that is initialized with no output filters. This
+// means the package will not perform any logging by default until the caller
+// requests it.
+var log btclog.Logger
+
+// The default amount of logging is none.
+func init() {
+	DisableLog()
+}
+
+// DisableLog disables all library log output. Logging output is disabled
+// by default until either UseLogger or SetLogWriter are called.
+func DisableLog() {
+	log = btclog.Disabled
+}
+
+// UseLogger uses a specified Logger to output package logging info.
+// This should be used in preference to SetLogWriter if the caller is also
+// using btclog.
+func UseLogger(logger btclog.Logger) {
+	log = logger
+}
diff --git a/blockchain/indexers/manager.go b/blockchain/indexers/manager.go
new file mode 100644
index 00000000..928b1f36
--- /dev/null
+++ b/blockchain/indexers/manager.go
@@ -0,0 +1,653 @@
+// Copyright (c) 2016 The btcsuite developers
+// Use of this source code is governed by an ISC
+// license that can be found in the LICENSE file.
+
+package indexers
+
+import (
+	"bytes"
+	"fmt"
+
+	"github.com/btcsuite/btcd/blockchain"
+	database "github.com/btcsuite/btcd/database2"
+	"github.com/btcsuite/btcd/wire"
+	"github.com/btcsuite/btcutil"
+)
+
+var (
+	// indexTipsBucketName is the name of the db bucket used to house the
+	// current tip of each index.
+	indexTipsBucketName = []byte("idxtips")
+)
+
+// -----------------------------------------------------------------------------
+// The index manager tracks the current tip of each index by using a parent
+// bucket that contains an entry for each index.
+//
+// The serialized format for an index tip is:
+//
+//   [<block hash><block height>],...
+//
+//   Field           Type            Size
+//   block hash      wire.ShaHash    wire.HashSize
+//   block height    uint32          4 bytes
+// -----------------------------------------------------------------------------
+
+// dbPutIndexerTip uses an existing database transaction to update or add the
+// current tip for the given index to the provided values.
+func dbPutIndexerTip(dbTx database.Tx, idxKey []byte, hash *wire.ShaHash, height int32) error {
+	serialized := make([]byte, wire.HashSize+4)
+	copy(serialized, hash[:])
+	byteOrder.PutUint32(serialized[wire.HashSize:], uint32(height))
+
+	indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName)
+	return indexesBucket.Put(idxKey, serialized)
+}
+
+// dbFetchIndexerTip uses an existing database transaction to retrieve the
+// hash and height of the current tip for the provided index.
+func dbFetchIndexerTip(dbTx database.Tx, idxKey []byte) (*wire.ShaHash, int32, error) {
+	tipData := dbTx.Metadata().Bucket(indexTipsBucketName).Get(idxKey)
+
+	// A tip entry is a block hash followed by a 4-byte height, so anything
+	// shorter (including a missing entry) is treated as corruption.
+	if len(tipData) < wire.HashSize+4 {
+		desc := fmt.Sprintf("unexpected end of data for "+
+			"index %q tip", string(idxKey))
+		return nil, 0, database.Error{
+			ErrorCode:   database.ErrCorruption,
+			Description: desc,
+		}
+	}
+
+	tipHash := new(wire.ShaHash)
+	copy(tipHash[:], tipData[:wire.HashSize])
+	tipHeight := int32(byteOrder.Uint32(tipData[wire.HashSize:]))
+	return tipHash, tipHeight, nil
+}
+
+// dbIndexConnectBlock hands the given block to the provided indexer so it can
+// add its index entries and then records the block as the new tip of that
+// index. The block must directly extend the current index tip; otherwise an
+// AssertError is returned and nothing is indexed.
+func dbIndexConnectBlock(dbTx database.Tx, indexer Indexer, block *btcutil.Block, view *blockchain.UtxoViewpoint) error {
+	key := indexer.Key()
+	tipHash, _, err := dbFetchIndexerTip(dbTx, key)
+	if err != nil {
+		return err
+	}
+
+	// The previous block of the one being connected has to be the
+	// current tip of the index.
+	prevHash := &block.MsgBlock().Header.PrevBlock
+	if !tipHash.IsEqual(prevHash) {
+		msg := fmt.Sprintf("dbIndexConnectBlock must be "+
+			"called with a block that extends the current index "+
+			"tip (%s, tip %s, block %s)", indexer.Name(),
+			tipHash, block.Sha())
+		return AssertError(msg)
+	}
+
+	// Let the indexer process the newly connected block.
+	err = indexer.ConnectBlock(dbTx, block, view)
+	if err != nil {
+		return err
+	}
+
+	// The index now extends through this block.
+	return dbPutIndexerTip(dbTx, key, block.Sha(), block.Height())
+}
+
+// dbIndexDisconnectBlock removes all of the index entries associated with the
+// given block using the provided indexer and updates the tip of the indexer
+// accordingly. An error will be returned if the current tip for the indexer is
+// not the passed block.
+func dbIndexDisconnectBlock(dbTx database.Tx, indexer Indexer, block *btcutil.Block, view *blockchain.UtxoViewpoint) error {
+	key := indexer.Key()
+	tipHash, _, err := dbFetchIndexerTip(dbTx, key)
+	if err != nil {
+		return err
+	}
+
+	// Only the block that is currently the tip of the index may be
+	// disconnected from it.
+	blockHash := block.Sha()
+	if !tipHash.IsEqual(blockHash) {
+		msg := fmt.Sprintf("dbIndexDisconnectBlock must "+
+			"be called with the block at the current index tip "+
+			"(%s, tip %s, block %s)", indexer.Name(),
+			tipHash, blockHash)
+		return AssertError(msg)
+	}
+
+	// Let the indexer remove everything it stored for the block.
+	err = indexer.DisconnectBlock(dbTx, block, view)
+	if err != nil {
+		return err
+	}
+
+	// The index tip moves back to the parent of the disconnected block.
+	parentHash := &block.MsgBlock().Header.PrevBlock
+	parentHeight := block.Height() - 1
+	return dbPutIndexerTip(dbTx, key, parentHash, parentHeight)
+}
+
+// Manager is an index manager that drives a set of optional indexes. It
+// implements the blockchain.IndexManager interface so it can be plugged
+// directly into normal chain processing.
+type Manager struct {
+	db             database.DB
+	enabledIndexes []Indexer
+}
+
+// Compile-time check that Manager satisfies blockchain.IndexManager.
+var _ blockchain.IndexManager = (*Manager)(nil)
+
+// indexDropKey returns the key which marks the index identified by idxKey as
+// being in the process of being dropped. It is the index key prefixed with
+// the byte 'd'.
+func indexDropKey(idxKey []byte) []byte {
+	dropKey := make([]byte, 0, len(idxKey)+1)
+	dropKey = append(dropKey, 'd')
+	dropKey = append(dropKey, idxKey...)
+	return dropKey
+}
+
+// maybeFinishDrops determines if each of the enabled indexes is in the middle
+// of being dropped and finishes dropping them when they are. This is necessary
+// because dropping an index has to be done in several atomic steps rather than
+// one big atomic step due to the massive number of entries.
+func (m *Manager) maybeFinishDrops() error {
+	// indexNeedsDrop is parallel to m.enabledIndexes and records which of
+	// the enabled indexes have a drop in progress.
+	indexNeedsDrop := make([]bool, len(m.enabledIndexes))
+	err := m.db.View(func(dbTx database.Tx) error {
+		// None of the indexes needs to be dropped if the index tips
+		// bucket hasn't been created yet.
+		indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName)
+		if indexesBucket == nil {
+			return nil
+		}
+
+		// Mark the indexer as requiring a drop if one is already in
+		// progress, which is signalled by the presence of its drop
+		// key in the index tips bucket.
+		for i, indexer := range m.enabledIndexes {
+			dropKey := indexDropKey(indexer.Key())
+			if indexesBucket.Get(dropKey) != nil {
+				indexNeedsDrop[i] = true
+			}
+		}
+
+		return nil
+	})
+	if err != nil {
+		return err
+	}
+
+	// Finish dropping any of the enabled indexes that are already in the
+	// middle of being dropped. This happens outside of the view above
+	// because dropIndex is handed the database itself rather than a
+	// transaction.
+	for i, indexer := range m.enabledIndexes {
+		if !indexNeedsDrop[i] {
+			continue
+		}
+
+		log.Infof("Resuming %s drop", indexer.Name())
+		err := dropIndex(m.db, indexer.Key(), indexer.Name())
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// maybeCreateIndexes determines if each of the enabled indexes have already
+// been created and creates them if not. An index counts as created when it
+// has a tip entry in the index tips bucket.
+//
+// NOTE(review): the index tips bucket is used without a nil check, so it is
+// presumably expected to exist before this is called - confirm against the
+// caller.
+func (m *Manager) maybeCreateIndexes(dbTx database.Tx) error {
+	indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName)
+	for _, indexer := range m.enabledIndexes {
+		// Nothing to do if the index tip already exists.
+		idxKey := indexer.Key()
+		if indexesBucket.Get(idxKey) != nil {
+			continue
+		}
+
+		// The tip for the index does not exist, so create it and
+		// invoke the create callback for the index so it can perform
+		// any one-time initialization it requires.
+		if err := indexer.Create(dbTx); err != nil {
+			return err
+		}
+
+		// Set the tip for the index to values which represent an
+		// uninitialized index (an all-zero hash and a height of -1).
+		err := dbPutIndexerTip(dbTx, idxKey, &wire.ShaHash{}, -1)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// Init initializes the enabled indexes.
This is called during chain +// initialization and primarily consists of catching up all indexes to the +// current best chain tip. This is necessary since each index can be disabled +// and re-enabled at any time and attempting to catch-up indexes at the same +// time new blocks are being downloaded would lead to an overall longer time to +// catch up due to the I/O contention. +// +// This is part of the blockchain.IndexManager interface. +func (m *Manager) Init(chain *blockchain.BlockChain) error { + // Nothing to do when no indexes are enabled. + if len(m.enabledIndexes) == 0 { + return nil + } + + // Finish and drops that were previously interrupted. + if err := m.maybeFinishDrops(); err != nil { + return err + } + + // Create the initial state for the indexes as needed. + err := m.db.Update(func(dbTx database.Tx) error { + // Create the bucket for the current tips as needed. + meta := dbTx.Metadata() + _, err := meta.CreateBucketIfNotExists(indexTipsBucketName) + if err != nil { + return err + } + + return m.maybeCreateIndexes(dbTx) + }) + if err != nil { + return err + } + + // Initialize each of the enabled indexes. + for _, indexer := range m.enabledIndexes { + if err := indexer.Init(); err != nil { + return err + } + } + + // Rollback indexes to the main chain if their tip is an orphaned fork. + // This is fairly unlikely, but it can happen if the chain is + // reorganized while the index is disabled. This has to be done in + // reverse order because later indexes can depend on earlier ones. + for i := len(m.enabledIndexes); i > 0; i-- { + indexer := m.enabledIndexes[i-1] + + // Fetch the current tip for the index. + var height int32 + var hash *wire.ShaHash + err := m.db.View(func(dbTx database.Tx) error { + idxKey := indexer.Key() + hash, height, err = dbFetchIndexerTip(dbTx, idxKey) + if err != nil { + return err + } + return nil + }) + if err != nil { + return err + } + + // Nothing to do if the index does not have any entries yet. 
+ if height == -1 { + continue + } + + // Loop until the tip is a block that exists in the main chain. + initialHeight := height + for { + exists, err := chain.MainChainHasBlock(hash) + if err != nil { + return err + } + if exists { + break + } + + // At this point the index tip is orphaned, so load the + // orphaned block from the database directly and + // disconnect it from the index. The block has to be + // loaded directly since it is no longer in the main + // chain and thus the chain.BlockByHash function would + // error. + err = m.db.Update(func(dbTx database.Tx) error { + blockBytes, err := dbTx.FetchBlock(hash) + if err != nil { + return err + } + block, err := btcutil.NewBlockFromBytes(blockBytes) + if err != nil { + return err + } + block.SetHeight(height) + + // When the index requires all of the referenced + // txouts they need to be retrieved from the + // transaction index. + var view *blockchain.UtxoViewpoint + if indexNeedsInputs(indexer) { + var err error + view, err = makeUtxoView(dbTx, block) + if err != nil { + return err + } + } + + // Remove all of the index entries associated + // with the block and update the indexer tip. + err = dbIndexDisconnectBlock(dbTx, indexer, + block, view) + if err != nil { + return err + } + + // Update the tip to the previous block. + hash = &block.MsgBlock().Header.PrevBlock + height-- + + return nil + }) + if err != nil { + return err + } + } + + if initialHeight != height { + log.Infof("Removed %d orphaned blocks from %s "+ + "(heights %d to %d)", initialHeight-height, + indexer.Name(), height+1, initialHeight) + } + } + + // Fetch the current tip heights for each index along with tracking the + // lowest one so the catchup code only needs to start at the earliest + // block and is able to skip connecting the block for the indexes that + // don't need it. 
+ bestHeight := chain.BestSnapshot().Height + lowestHeight := bestHeight + indexerHeights := make([]int32, len(m.enabledIndexes)) + err = m.db.View(func(dbTx database.Tx) error { + for i, indexer := range m.enabledIndexes { + idxKey := indexer.Key() + hash, height, err := dbFetchIndexerTip(dbTx, idxKey) + if err != nil { + return err + } + + log.Debugf("Current %s tip (height %d, hash %v)", + indexer.Name(), height, hash) + indexerHeights[i] = height + if height < lowestHeight { + lowestHeight = height + } + } + return nil + }) + if err != nil { + return err + } + + // Nothing to index if all of the indexes are caught up. + if lowestHeight == bestHeight { + return nil + } + + // Create a progress logger for the indexing process below. + progressLogger := newBlockProgressLogger("Indexed", log) + + // At this point, one or more indexes are behind the current best chain + // tip and need to be caught up, so log the details and loop through + // each block that needs to be indexed. + log.Infof("Catching up indexes from height %d to %d", lowestHeight, + bestHeight) + for height := lowestHeight + 1; height <= bestHeight; height++ { + // Load the block for the height since it is required to index + // it. + block, err := chain.BlockByHeight(height) + if err != nil { + return err + } + + // Connect the block for all indexes that need it. + var view *blockchain.UtxoViewpoint + for i, indexer := range m.enabledIndexes { + // Skip indexes that don't need to be updated with this + // block. + if indexerHeights[i] >= height { + continue + } + + err := m.db.Update(func(dbTx database.Tx) error { + // When the index requires all of the referenced + // txouts and they haven't been loaded yet, they + // need to be retrieved from the transaction + // index. 
+ if view == nil && indexNeedsInputs(indexer) { + var err error + view, err = makeUtxoView(dbTx, block) + if err != nil { + return err + } + } + return dbIndexConnectBlock(dbTx, indexer, block, + view) + }) + if err != nil { + return err + } + indexerHeights[i] = height + } + + // Log indexing progress. + progressLogger.LogBlockHeight(block) + } + + log.Infof("Indexes caught up to height %d", bestHeight) + return nil +} + +// indexNeedsInputs returns whether or not the index needs access to the txouts +// referenced by the transaction inputs being indexed. +func indexNeedsInputs(index Indexer) bool { + if idx, ok := index.(NeedsInputser); ok { + return idx.NeedsInputs() + } + + return false +} + +// dbFetchTx looks up the passed transaction hash in the transaction index and +// loads it from the database. +func dbFetchTx(dbTx database.Tx, hash *wire.ShaHash) (*wire.MsgTx, error) { + // Look up the location of the transaction. + blockRegion, err := dbFetchTxIndexEntry(dbTx, hash) + if err != nil { + return nil, err + } + if blockRegion == nil { + return nil, fmt.Errorf("transaction %v not found", hash) + } + + // Load the raw transaction bytes from the database. + txBytes, err := dbTx.FetchBlockRegion(blockRegion) + if err != nil { + return nil, err + } + + // Deserialize the transaction. + var msgTx wire.MsgTx + err = msgTx.Deserialize(bytes.NewReader(txBytes)) + if err != nil { + return nil, err + } + + return &msgTx, nil +} + +// makeUtxoView creates a mock unspent transaction output view by using the +// transaction index in order to look up all inputs referenced by the +// transactions in the block. This is sometimes needed when catching indexes up +// because many of the txouts could actually already be spent however the +// associated scripts are still required to index them. 
+func makeUtxoView(dbTx database.Tx, block *btcutil.Block) (*blockchain.UtxoViewpoint, error) {
+	utxoView := blockchain.NewUtxoViewpoint()
+
+	// Start at index 1: the block has already been through full
+	// validation, so its first transaction is known to be the coinbase and
+	// coinbases do not reference any inputs.
+	txns := block.Transactions()
+	for i := 1; i < len(txns); i++ {
+		// Resolve every referenced input through the transaction index
+		// and add the outputs of the originating transaction to the
+		// view.
+		for _, input := range txns[i].MsgTx().TxIn {
+			prevTx, err := dbFetchTx(dbTx, &input.PreviousOutPoint.Hash)
+			if err != nil {
+				return nil, err
+			}
+
+			utxoView.AddTxOuts(btcutil.NewTx(prevTx), 0)
+		}
+	}
+
+	return utxoView, nil
+}
+
+// ConnectBlock must be invoked when a block is extending the main chain. It
+// passes the block to every enabled index, in order, through
+// dbIndexConnectBlock and stops at the first error.
+//
+// This is part of the blockchain.IndexManager interface.
+func (m *Manager) ConnectBlock(dbTx database.Tx, block *btcutil.Block, view *blockchain.UtxoViewpoint) error {
+	// Give each of the currently active optional indexes the chance to
+	// update itself with the block being connected.
+	for i := range m.enabledIndexes {
+		if err := dbIndexConnectBlock(dbTx, m.enabledIndexes[i], block, view); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// DisconnectBlock must be invoked when a block is being disconnected from the
+// end of the main chain. It passes the block to every enabled index so the
+// index entries associated with the block are removed.
+//
+// This is part of the blockchain.IndexManager interface.
+func (m *Manager) DisconnectBlock(dbTx database.Tx, block *btcutil.Block, view *blockchain.UtxoViewpoint) error {
+	// Call each of the currently active optional indexes with the block
+	// being disconnected so they can update accordingly.
+	for _, index := range m.enabledIndexes {
+		err := dbIndexDisconnectBlock(dbTx, index, block, view)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// NewManager returns a new index manager with the provided indexes enabled.
+//
+// The manager returned satisfies the blockchain.IndexManager interface and thus
+// cleanly plugs into the normal blockchain processing path.
+func NewManager(db database.DB, enabledIndexes []Indexer) *Manager {
+	return &Manager{
+		db:             db,
+		enabledIndexes: enabledIndexes,
+	}
+}
+
+// dropIndex drops the passed index from the database. Since indexes can be
+// massive, it deletes the index in multiple database transactions in order to
+// keep memory usage to reasonable levels. It also marks the drop as being in
+// progress so that it can be resumed if it is interrupted before it completes
+// and before the index can be used again.
+//
+// idxKey is both the key of the index tip in the index tips bucket and the
+// name of the bucket that houses the index entries. idxName is only used for
+// logging and to detect the transaction index, which needs extra cleanup.
+func dropIndex(db database.DB, idxKey []byte, idxName string) error {
+	// Nothing to do if the index doesn't already exist.
+	var needsDelete bool
+	err := db.View(func(dbTx database.Tx) error {
+		indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName)
+		if indexesBucket != nil && indexesBucket.Get(idxKey) != nil {
+			needsDelete = true
+		}
+		return nil
+	})
+	if err != nil {
+		return err
+	}
+	if !needsDelete {
+		log.Infof("Not dropping %s because it does not exist", idxName)
+		return nil
+	}
+
+	// Mark that the index is in the process of being dropped so that it
+	// can be resumed on the next start if interrupted before the process is
+	// complete.
+	log.Infof("Dropping all %s entries. This might take a while...",
+		idxName)
+	err = db.Update(func(dbTx database.Tx) error {
+		indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName)
+		return indexesBucket.Put(indexDropKey(idxKey), idxKey)
+	})
+	if err != nil {
+		return err
+	}
+
+	// Since the indexes can be so large, attempting to simply delete
+	// the bucket in a single database transaction would result in massive
+	// memory usage and likely crash many systems due to ulimits. In order
+	// to avoid this, use a cursor to delete a maximum number of entries out
+	// of the bucket at a time.
+	//
+	// The outer loop runs one transaction per batch and stops as soon as a
+	// batch deletes fewer than maxDeletions entries.
+	const maxDeletions = 2000000
+	var totalDeleted uint64
+	for numDeleted := maxDeletions; numDeleted == maxDeletions; {
+		numDeleted = 0
+		err := db.Update(func(dbTx database.Tx) error {
+			bucket := dbTx.Metadata().Bucket(idxKey)
+			cursor := bucket.Cursor()
+			for ok := cursor.First(); ok; ok = cursor.Next() &&
+				numDeleted < maxDeletions {
+
+				if err := cursor.Delete(); err != nil {
+					return err
+				}
+				numDeleted++
+			}
+			return nil
+		})
+		if err != nil {
+			return err
+		}
+
+		if numDeleted > 0 {
+			totalDeleted += uint64(numDeleted)
+			log.Infof("Deleted %d keys (%d total) from %s",
+				numDeleted, totalDeleted, idxName)
+		}
+	}
+
+	// Call extra index specific deinitialization for the transaction index.
+	// NOTE(review): this runs in its own transaction, separate from the
+	// final tip removal below.
+	if idxName == txIndexName {
+		if err := dropBlockIDIndex(db); err != nil {
+			return err
+		}
+	}
+
+	// Remove the index tip, index bucket, and in-progress drop flag now
+	// that all index entries have been removed. All three are removed in a
+	// single transaction.
+	err = db.Update(func(dbTx database.Tx) error {
+		meta := dbTx.Metadata()
+		indexesBucket := meta.Bucket(indexTipsBucketName)
+		if err := indexesBucket.Delete(idxKey); err != nil {
+			return err
+		}
+
+		if err := meta.DeleteBucket(idxKey); err != nil {
+			return err
+		}
+
+		return indexesBucket.Delete(indexDropKey(idxKey))
+	})
+	if err != nil {
+		return err
+	}
+
+	log.Infof("Dropped %s", idxName)
+	return nil
+}
diff --git a/blockchain/indexers/txindex.go b/blockchain/indexers/txindex.go
new file mode 100644
index 00000000..0c6b207d
--- /dev/null
+++ b/blockchain/indexers/txindex.go
@@ -0,0 +1,477 @@
+// Copyright (c) 2016 The btcsuite developers
+// Use of this source code is governed by an ISC
+// license that can be found in the LICENSE file.
+
+package indexers
+
+import (
+	"errors"
+	"fmt"
+
+	"github.com/btcsuite/btcd/blockchain"
+	database "github.com/btcsuite/btcd/database2"
+	"github.com/btcsuite/btcd/wire"
+	"github.com/btcsuite/btcutil"
+)
+
+const (
+	// txIndexName is the human-readable name for the index.
+	txIndexName = "transaction index"
+)
+
+var (
+	// txIndexKey is the key of the transaction index and the db bucket used
+	// to house it.
+	txIndexKey = []byte("txbyhashidx")
+
+	// idByHashIndexBucketName is the name of the db bucket used to house
+	// the block hash -> block id index (ids are looked up by hash).
+	idByHashIndexBucketName = []byte("idbyhashidx")
+
+	// hashByIDIndexBucketName is the name of the db bucket used to house
+	// the block id -> block hash index (hashes are looked up by id).
+	hashByIDIndexBucketName = []byte("hashbyididx")
+
+	// errNoBlockIDEntry is an error that indicates a requested entry does
+	// not exist in the block ID index.
+	errNoBlockIDEntry = errors.New("no entry in the block ID index")
+)
+
+// -----------------------------------------------------------------------------
+// The transaction index consists of an entry for every transaction in the main
+// chain.
+// In order to significantly optimize the space requirements, a separate index
+// is maintained which provides an internal mapping between each block that has
+// been indexed and a unique ID for use within the hash to location mappings.
+// The ID is simply a sequentially incremented uint32. This is useful because
+// it is only 4 bytes versus 32 bytes hashes and thus saves a ton of space in
+// the index.
+//
+// There are three buckets used in total. The first bucket maps the hash of
+// each transaction to the specific block location. The second bucket maps the
+// hash of each block to the unique ID and the third maps that ID back to the
+// block hash.
+//
+// NOTE: Although it is technically possible for multiple transactions to have
+// the same hash as long as the previous transaction with the same hash is fully
+// spent, this code only stores the most recent one because doing otherwise
+// would add a non-trivial amount of space and overhead for something that will
+// realistically never happen per the probability and even if it did, the old
+// one must be fully spent and so the most likely transaction a caller would
+// want for a given hash is the most recent one anyways.
+//
+// The serialized format for keys and values in the block hash to ID bucket is:
+//   <hash> = <ID>
+//
+//   Field           Type              Size
+//   hash            wire.ShaHash      32 bytes
+//   ID              uint32            4 bytes
+//   -----
+//   Total: 36 bytes
+//
+// The serialized format for keys and values in the ID to block hash bucket is:
+//   <ID> = <hash>
+//
+//   Field           Type              Size
+//   ID              uint32            4 bytes
+//   hash            wire.ShaHash      32 bytes
+//   -----
+//   Total: 36 bytes
+//
+// The serialized format for the keys and values in the tx index bucket is:
+//
+//   <txhash> = <block id><start offset><tx length>
+//
+//   Field           Type              Size
+//   txhash          wire.ShaHash      32 bytes
+//   block id        uint32            4 bytes
+//   start offset    uint32            4 bytes
+//   tx length       uint32            4 bytes
+//   -----
+//   Total: 44 bytes
+// -----------------------------------------------------------------------------
+
+// dbPutBlockIDIndexEntry uses an existing database transaction to update or add
+// the index entries for the hash to id and id to hash mappings for the provided
+// values.
+func dbPutBlockIDIndexEntry(dbTx database.Tx, hash *wire.ShaHash, id uint32) error {
+	// Serialize the block ID once; it is the value in the first bucket and
+	// the key in the second.
+	var rawID [4]byte
+	byteOrder.PutUint32(rawID[:], id)
+
+	// Store the block hash -> ID mapping.
+	meta := dbTx.Metadata()
+	err := meta.Bucket(idByHashIndexBucketName).Put(hash[:], rawID[:])
+	if err != nil {
+		return err
+	}
+
+	// Store the reverse block ID -> hash mapping.
+	return meta.Bucket(hashByIDIndexBucketName).Put(rawID[:], hash[:])
+}
+
+// dbRemoveBlockIDIndexEntry uses an existing database transaction to remove the
+// index entries from the hash to id and id to hash mappings for the provided
+// hash. It is a no-op when the hash has no entry.
+func dbRemoveBlockIDIndexEntry(dbTx database.Tx, hash *wire.ShaHash) error {
+	meta := dbTx.Metadata()
+
+	// The stored ID is needed to locate the reverse mapping, so read it
+	// before removing the block hash -> ID mapping.
+	byHash := meta.Bucket(idByHashIndexBucketName)
+	rawID := byHash.Get(hash[:])
+	if rawID == nil {
+		return nil
+	}
+	err := byHash.Delete(hash[:])
+	if err != nil {
+		return err
+	}
+
+	// Remove the block ID -> hash mapping.
+	return meta.Bucket(hashByIDIndexBucketName).Delete(rawID)
+}
+
+// dbFetchBlockIDByHash uses an existing database transaction to retrieve the
+// block id for the provided hash from the index. errNoBlockIDEntry is returned
+// when the hash has no entry.
+func dbFetchBlockIDByHash(dbTx database.Tx, hash *wire.ShaHash) (uint32, error) {
+	rawID := dbTx.Metadata().Bucket(idByHashIndexBucketName).Get(hash[:])
+	if rawID == nil {
+		return 0, errNoBlockIDEntry
+	}
+
+	return byteOrder.Uint32(rawID), nil
+}
+
+// dbFetchBlockHashBySerializedID uses an existing database transaction to
+// retrieve the hash for the provided serialized block id from the index.
+// errNoBlockIDEntry is returned when the id has no entry.
+func dbFetchBlockHashBySerializedID(dbTx database.Tx, serializedID []byte) (*wire.ShaHash, error) {
+	stored := dbTx.Metadata().Bucket(hashByIDIndexBucketName).Get(serializedID)
+	if stored == nil {
+		return nil, errNoBlockIDEntry
+	}
+
+	// Copy the bytes out of the database into a hash owned by the caller.
+	blockHash := new(wire.ShaHash)
+	copy(blockHash[:], stored)
+	return blockHash, nil
+}
+
+// dbFetchBlockHashByID uses an existing database transaction to retrieve the
+// hash for the provided block id from the index.
+func dbFetchBlockHashByID(dbTx database.Tx, id uint32) (*wire.ShaHash, error) {
+	var rawID [4]byte
+	byteOrder.PutUint32(rawID[:], id)
+	return dbFetchBlockHashBySerializedID(dbTx, rawID[:])
+}
+
+// putTxIndexEntry serializes the provided values according to the format
+// described above for a transaction index entry. The target byte slice must
+// be at least large enough to handle the number of bytes defined by the
+// txEntrySize constant or it will panic.
+func putTxIndexEntry(target []byte, blockID uint32, txLoc wire.TxLoc) {
+	// Layout: block id (bytes 0-3), start offset (4-7), tx length (8-11).
+	byteOrder.PutUint32(target, blockID)
+	byteOrder.PutUint32(target[4:], uint32(txLoc.TxStart))
+	byteOrder.PutUint32(target[8:], uint32(txLoc.TxLen))
+}
+
+// dbPutTxIndexEntry uses an existing database transaction to update the
+// transaction index given the provided serialized data that is expected to have
+// been serialized by putTxIndexEntry.
+func dbPutTxIndexEntry(dbTx database.Tx, txHash *wire.ShaHash, serializedData []byte) error {
+	txIndex := dbTx.Metadata().Bucket(txIndexKey)
+	return txIndex.Put(txHash[:], serializedData)
+}
+
+// dbFetchTxIndexEntry uses an existing database transaction to fetch the block
+// region for the provided transaction hash from the transaction index. When
+// there is no entry for the provided hash, nil will be returned for both the
+// region and the error.
+//
+// A database.Error with code database.ErrCorruption is returned when the
+// stored entry is too short or refers to a block ID that is not in the block
+// ID index.
+func dbFetchTxIndexEntry(dbTx database.Tx, txHash *wire.ShaHash) (*database.BlockRegion, error) {
+	// Load the record from the database and return now if it doesn't exist.
+	txIndex := dbTx.Metadata().Bucket(txIndexKey)
+	serializedData := txIndex.Get(txHash[:])
+	if len(serializedData) == 0 {
+		return nil, nil
+	}
+
+	// Ensure the serialized data has enough bytes to properly deserialize.
+	// Entries are written with exactly txEntrySize bytes, so use the same
+	// constant here rather than a separate literal.
+	if len(serializedData) < txEntrySize {
+		return nil, database.Error{
+			ErrorCode: database.ErrCorruption,
+			Description: fmt.Sprintf("corrupt transaction index "+
+				"entry for %s", txHash),
+		}
+	}
+
+	// Load the block hash associated with the block ID.
+	hash, err := dbFetchBlockHashBySerializedID(dbTx, serializedData[0:4])
+	if err != nil {
+		return nil, database.Error{
+			ErrorCode: database.ErrCorruption,
+			Description: fmt.Sprintf("corrupt transaction index "+
+				"entry for %s: %v", txHash, err),
+		}
+	}
+
+	// Deserialize the final entry.
+	region := database.BlockRegion{Hash: &wire.ShaHash{}}
+	copy(region.Hash[:], hash[:])
+	region.Offset = byteOrder.Uint32(serializedData[4:8])
+	region.Len = byteOrder.Uint32(serializedData[8:12])
+
+	return &region, nil
+}
+
+// dbAddTxIndexEntries uses an existing database transaction to add a
+// transaction index entry for every transaction in the passed block.
+func dbAddTxIndexEntries(dbTx database.Tx, block *btcutil.Block, blockID uint32) error {
+	// The offset and length of the transactions within the serialized
+	// block.
+	txLocs, err := block.TxLoc()
+	if err != nil {
+		return err
+	}
+
+	// As an optimization, allocate a single slice big enough to hold all
+	// of the serialized transaction index entries for the block and
+	// serialize them directly into the slice. Then, pass the appropriate
+	// subslice to the database to be written. This approach significantly
+	// cuts down on the number of required allocations.
+	txns := block.Transactions()
+	offset := 0
+	serializedValues := make([]byte, len(txns)*txEntrySize)
+	for i, tx := range txns {
+		putTxIndexEntry(serializedValues[offset:], blockID, txLocs[i])
+		endOffset := offset + txEntrySize
+
+		// The three-index slice caps the capacity at the end of this
+		// entry so the callee cannot reach into the next one.
+		err := dbPutTxIndexEntry(dbTx, tx.Sha(),
+			serializedValues[offset:endOffset:endOffset])
+		if err != nil {
+			return err
+		}
+		offset += txEntrySize
+	}
+
+	return nil
+}
+
+// dbRemoveTxIndexEntry uses an existing database transaction to remove the most
+// recent transaction index entry for the given hash. An error is returned when
+// the hash has no entry.
+func dbRemoveTxIndexEntry(dbTx database.Tx, txHash *wire.ShaHash) error {
+	txIndex := dbTx.Metadata().Bucket(txIndexKey)
+	serializedData := txIndex.Get(txHash[:])
+	if len(serializedData) == 0 {
+		return fmt.Errorf("can't remove non-existent transaction %s "+
+			"from the transaction index", txHash)
+	}
+
+	return txIndex.Delete(txHash[:])
+}
+
+// dbRemoveTxIndexEntries uses an existing database transaction to remove the
+// latest transaction entry for every transaction in the passed block.
+func dbRemoveTxIndexEntries(dbTx database.Tx, block *btcutil.Block) error {
+	txns := block.Transactions()
+	for i := range txns {
+		if err := dbRemoveTxIndexEntry(dbTx, txns[i].Sha()); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// TxIndex implements a transaction by hash index. That is to say, it supports
+// querying all transactions by their hash.
+type TxIndex struct {
+	db         database.DB
+	curBlockID uint32
+}
+
+// Ensure the TxIndex type implements the Indexer interface.
+var _ Indexer = (*TxIndex)(nil)
+
+// Init initializes the hash-based transaction index. In particular, it finds
+// the highest used block ID and stores it for later use when connecting or
+// disconnecting blocks.
+//
+// This is part of the Indexer interface.
+func (idx *TxIndex) Init() error {
+	// Searching for the latest used internal block id once at startup is a
+	// lot cheaper than writing an extra value to the database on every
+	// update.
+	err := idx.db.View(func(dbTx database.Tx) error {
+		// known reports whether the block ID index has an entry for
+		// the given id.
+		known := func(id uint32) bool {
+			_, err := dbFetchBlockHashByID(dbTx, id)
+			return err == nil
+		}
+
+		// Step forward in large gaps until an id that doesn't exist
+		// yet is found. That id is the upper bound for the binary
+		// search below.
+		const step = uint32(100000)
+		var highestKnown uint32
+		probe := uint32(1)
+		for known(probe) {
+			highestKnown = probe
+			probe += step
+		}
+		nextUnknown := probe
+		log.Tracef("Forward scan (highest known %d, next unknown %d)",
+			highestKnown, nextUnknown)
+
+		// A brand new database has no used block IDs at all.
+		if nextUnknown == 1 {
+			return nil
+		}
+
+		// Narrow the range with a binary search until the two bounds
+		// are adjacent. This takes at most ceil(log_2(step)) attempts.
+		for {
+			mid := (highestKnown + nextUnknown) / 2
+			if known(mid) {
+				highestKnown = mid
+			} else {
+				nextUnknown = mid
+			}
+			log.Tracef("Binary scan (highest known %d, next unknown %d)",
+				highestKnown, nextUnknown)
+			if highestKnown+1 == nextUnknown {
+				break
+			}
+		}
+
+		idx.curBlockID = highestKnown
+		return nil
+	})
+	if err != nil {
+		return err
+	}
+
+	log.Debugf("Current internal block ID: %d", idx.curBlockID)
+	return nil
+}
+
+// Key returns the database key to use for the index as a byte slice.
+//
+// This is part of the Indexer interface.
+func (idx *TxIndex) Key() []byte {
+	return txIndexKey
+}
+
+// Name returns the human-readable name of the index.
+//
+// This is part of the Indexer interface.
+func (idx *TxIndex) Name() string {
+	return txIndexName
+}
+
+// Create is invoked when the indexer manager determines the index needs
+// to be created for the first time. It creates the buckets for the hash-based
+// transaction index and the internal block ID indexes.
+//
+// This is part of the Indexer interface.
+func (idx *TxIndex) Create(dbTx database.Tx) error {
+	meta := dbTx.Metadata()
+
+	// The two block ID index buckets are created first, then the bucket
+	// for the transaction index itself.
+	bucketNames := [][]byte{
+		idByHashIndexBucketName,
+		hashByIDIndexBucketName,
+		txIndexKey,
+	}
+	for _, name := range bucketNames {
+		if _, err := meta.CreateBucket(name); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// ConnectBlock is invoked by the index manager when a new block has been
+// connected to the main chain. This indexer adds a hash-to-transaction mapping
+// for every transaction in the passed block.
+//
+// This is part of the Indexer interface.
+func (idx *TxIndex) ConnectBlock(dbTx database.Tx, block *btcutil.Block, view *blockchain.UtxoViewpoint) error {
+	// The block being connected gets the next internal block ID. Index all
+	// of its transactions under that ID.
+	nextID := idx.curBlockID + 1
+	if err := dbAddTxIndexEntries(dbTx, block, nextID); err != nil {
+		return err
+	}
+
+	// Record the ID for the block and only then advance the in-memory
+	// counter.
+	if err := dbPutBlockIDIndexEntry(dbTx, block.Sha(), nextID); err != nil {
+		return err
+	}
+	idx.curBlockID = nextID
+	return nil
+}
+
+// DisconnectBlock is invoked by the index manager when a block has been
+// disconnected from the main chain. This indexer removes the
+// hash-to-transaction mapping for every transaction in the block.
+//
+// This is part of the Indexer interface.
+func (idx *TxIndex) DisconnectBlock(dbTx database.Tx, block *btcutil.Block, view *blockchain.UtxoViewpoint) error {
+	// Drop the index entries for every transaction in the block.
+	err := dbRemoveTxIndexEntries(dbTx, block)
+	if err != nil {
+		return err
+	}
+
+	// Drop the block ID index entry for the block and step the in-memory
+	// counter back to account for it.
+	err = dbRemoveBlockIDIndexEntry(dbTx, block.Sha())
+	if err != nil {
+		return err
+	}
+	idx.curBlockID--
+	return nil
+}
+
+// TxBlockRegion returns the block region for the provided transaction hash
+// from the transaction index. The block region can in turn be used to load the
+// raw transaction bytes. When there is no entry for the provided hash, nil
+// will be returned for both the entry and the error.
+//
+// This function is safe for concurrent access.
+func (idx *TxIndex) TxBlockRegion(hash *wire.ShaHash) (*database.BlockRegion, error) {
+	var region *database.BlockRegion
+	err := idx.db.View(func(dbTx database.Tx) error {
+		var err error
+		region, err = dbFetchTxIndexEntry(dbTx, hash)
+		return err
+	})
+	return region, err
+}
+
+// NewTxIndex returns a new instance of an indexer that is used to create a
+// mapping of the hashes of all transactions in the blockchain to the respective
+// block, location within the block, and size of the transaction.
+//
+// It implements the Indexer interface which plugs into the IndexManager that in
+// turn is used by the blockchain package. This allows the index to be
+// seamlessly maintained along with the chain.
+func NewTxIndex(db database.DB) *TxIndex {
+	return &TxIndex{db: db}
+}
+
+// dropBlockIDIndex drops the internal block id index.
+//
+// Buckets that no longer exist are skipped rather than treated as an error.
+// dropIndex calls this in a transaction of its own before it removes the
+// index tip, so a drop interrupted between the two steps is resumed with the
+// buckets already gone and must still be able to complete.
+func dropBlockIDIndex(db database.DB) error {
+	return db.Update(func(dbTx database.Tx) error {
+		meta := dbTx.Metadata()
+		bucketNames := [][]byte{
+			idByHashIndexBucketName,
+			hashByIDIndexBucketName,
+		}
+		for _, name := range bucketNames {
+			// Already removed by an earlier, interrupted drop.
+			if meta.Bucket(name) == nil {
+				continue
+			}
+			if err := meta.DeleteBucket(name); err != nil {
+				return err
+			}
+		}
+
+		return nil
+	})
+}
+
+// DropTxIndex drops the transaction index from the provided database if it
+// exists. Since the address index relies on it, the address index will also be
+// dropped when it exists.
+func DropTxIndex(db database.DB) error {
+	// Drop the dependent address index first.
+	if err := dropIndex(db, addrIndexKey, addrIndexName); err != nil {
+		return err
+	}
+
+	return dropIndex(db, txIndexKey, txIndexName)
+}
diff --git a/blockmanager.go b/blockmanager.go
index 074101a0..c50aeca1 100644
--- a/blockmanager.go
+++ b/blockmanager.go
@@ -576,6 +576,10 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { bmgrLog.Errorf("Failed to process block %v: %v", blockSha, err) } + if dbErr, ok := err.(database.Error); ok && dbErr.ErrorCode == + database.ErrCorruption { + panic(dbErr) + } // Convert the error into an appropriate reject message and // send it. 
@@ -1172,7 +1176,6 @@ func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) { case blockchain.NTBlockAccepted: // Don't relay if we are not current. Other peers that are // current should already know about it. - if !b.current() { return } @@ -1379,7 +1382,7 @@ func (b *blockManager) Pause() chan<- struct{} { // newBlockManager returns a new bitcoin block manager. // Use Start to begin processing asynchronous block and inv updates. -func newBlockManager(s *server) (*blockManager, error) { +func newBlockManager(s *server, indexManager blockchain.IndexManager) (*blockManager, error) { bm := blockManager{ server: s, rejectedTxns: make(map[wire.ShaHash]struct{}), @@ -1398,6 +1401,7 @@ func newBlockManager(s *server) (*blockManager, error) { ChainParams: s.chainParams, Notifications: bm.handleNotifyMsg, SigCache: s.sigCache, + IndexManager: indexManager, }) if err != nil { return nil, err diff --git a/btcd.go b/btcd.go index c6da3a56..e1fdf04b 100644 --- a/btcd.go +++ b/btcd.go @@ -13,6 +13,7 @@ import ( "runtime" "runtime/pprof" + "github.com/btcsuite/btcd/blockchain/indexers" "github.com/btcsuite/btcd/limits" ) @@ -87,6 +88,27 @@ func btcdMain(serverChan chan<- *server) error { db.Close() }) + // Drop indexes and exit if requested. + // + // NOTE: The order is important here because dropping the tx index also + // drops the address index since it relies on it. + if cfg.DropAddrIndex { + if err := indexers.DropAddrIndex(db); err != nil { + btcdLog.Errorf("%v", err) + return err + } + + return nil + } + if cfg.DropTxIndex { + if err := indexers.DropTxIndex(db); err != nil { + btcdLog.Errorf("%v", err) + return err + } + + return nil + } + // Create server and start it. 
server, err := newServer(cfg.Listeners, db, activeNetParams.Params) if err != nil { diff --git a/config.go b/config.go index 2654542c..e05d90ad 100644 --- a/config.go +++ b/config.go @@ -47,6 +47,8 @@ const ( defaultMaxOrphanTransactions = 1000 defaultMaxOrphanTxSize = 5000 defaultSigCacheMaxSize = 50000 + defaultTxIndex = false + defaultAddrIndex = false ) var ( @@ -128,9 +130,13 @@ type config struct { BlockMaxSize uint32 `long:"blockmaxsize" description:"Maximum block size in bytes to be used when creating a block"` BlockPrioritySize uint32 `long:"blockprioritysize" description:"Size in bytes for high-priority/low-fee transactions when creating a block"` GetWorkKeys []string `long:"getworkkey" description:"DEPRECATED -- Use the --miningaddr option instead"` - NoPeerBloomFilters bool `long:"nopeerbloomfilters" description:"Disable bloom filtering support."` - SigCacheMaxSize uint `long:"sigcachemaxsize" description:"The maximum number of entries in the signature verification cache."` + NoPeerBloomFilters bool `long:"nopeerbloomfilters" description:"Disable bloom filtering support"` + SigCacheMaxSize uint `long:"sigcachemaxsize" description:"The maximum number of entries in the signature verification cache"` BlocksOnly bool `long:"blocksonly" description:"Do not accept transactions from remote peers."` + TxIndex bool `long:"txindex" description:"Maintain a full hash-based transaction index which makes all transactions available via the getrawtransaction RPC"` + DropTxIndex bool `long:"droptxindex" description:"Deletes the hash-based transaction index from the database on start up and then exits."` + AddrIndex bool `long:"addrindex" description:"Maintain a full address-based transaction index which makes the searchrawtransactions RPC available"` + DropAddrIndex bool `long:"dropaddrindex" description:"Deletes the address-based transaction index from the database on start up and then exits."` onionlookup func(string) ([]net.IP, error) lookup func(string) ([]net.IP, 
error) oniondial func(string, string) (net.Conn, error) @@ -342,6 +348,8 @@ func loadConfig() (*config, []string, error) { MaxOrphanTxs: defaultMaxOrphanTransactions, SigCacheMaxSize: defaultSigCacheMaxSize, Generate: defaultGenerate, + TxIndex: defaultTxIndex, + AddrIndex: defaultAddrIndex, } // Service options which are only added on Windows. @@ -629,6 +637,38 @@ func loadConfig() (*config, []string, error) { cfg.BlockPrioritySize = minUint32(cfg.BlockPrioritySize, cfg.BlockMaxSize) cfg.BlockMinSize = minUint32(cfg.BlockMinSize, cfg.BlockMaxSize) + // --txindex and --droptxindex do not mix. + if cfg.TxIndex && cfg.DropTxIndex { + err := fmt.Errorf("%s: the --txindex and --droptxindex "+ + "options may not be activated at the same time", + funcName) + fmt.Fprintln(os.Stderr, err) + fmt.Fprintln(os.Stderr, usageMessage) + return nil, nil, err + } + + // --addrindex and --dropaddrindex do not mix. + if cfg.AddrIndex && cfg.DropAddrIndex { + err := fmt.Errorf("%s: the --addrindex and --dropaddrindex "+ + "options may not be activated at the same time", + funcName) + fmt.Fprintln(os.Stderr, err) + fmt.Fprintln(os.Stderr, usageMessage) + return nil, nil, err + } + + // --addrindex and --droptxindex do not mix. + if cfg.AddrIndex && cfg.DropTxIndex { + err := fmt.Errorf("%s: the --addrindex and --droptxindex "+ + "options may not be activated at the same time "+ + "because the address index relies on the transaction "+ + "index", + funcName) + fmt.Fprintln(os.Stderr, err) + fmt.Fprintln(os.Stderr, usageMessage) + return nil, nil, err + } + // Check getwork keys are valid and saved parsed versions. 
cfg.miningAddrs = make([]btcutil.Address, 0, len(cfg.GetWorkKeys)+ len(cfg.MiningAddrs)) diff --git a/database2/ffldb/db.go b/database2/ffldb/db.go index 1b3d59eb..1b79c921 100644 --- a/database2/ffldb/db.go +++ b/database2/ffldb/db.go @@ -1971,18 +1971,16 @@ func (db *db) Close() error { } db.closed = true + // NOTE: Since the above lock waits for all transactions to finish and + // prevents any new ones from being started, it is safe to flush the + // cache and clear all state without the individual locks. + // Close the database cache which will flush any existing entries to // disk and close the underlying leveldb database. Any error is saved // and returned at the end after the remaining cleanup since the // database will be marked closed even if this fails given there is no // good way for the caller to recover from a failure here anyways. - db.writeLock.Lock() closeErr := db.cache.Close() - db.writeLock.Unlock() - - // NOTE: Since the above lock waits for all transactions to finish and - // prevents any new ones from being started, it is safe to clear all - // state without the individual locks. // Close any open flat files that house the blocks. 
wc := db.store.writeCursor diff --git a/log.go b/log.go index 58279156..febbc340 100644 --- a/log.go +++ b/log.go @@ -12,6 +12,7 @@ import ( "github.com/btcsuite/btcd/addrmgr" "github.com/btcsuite/btcd/blockchain" + "github.com/btcsuite/btcd/blockchain/indexers" database "github.com/btcsuite/btcd/database2" "github.com/btcsuite/btcd/peer" "github.com/btcsuite/btcd/txscript" @@ -39,6 +40,7 @@ var ( btcdLog = btclog.Disabled chanLog = btclog.Disabled discLog = btclog.Disabled + indxLog = btclog.Disabled minrLog = btclog.Disabled peerLog = btclog.Disabled rpcsLog = btclog.Disabled @@ -56,6 +58,7 @@ var subsystemLoggers = map[string]btclog.Logger{ "BTCD": btcdLog, "CHAN": chanLog, "DISC": discLog, + "INDX": indxLog, "MINR": minrLog, "PEER": peerLog, "RPCS": rpcsLog, @@ -113,6 +116,10 @@ func useLogger(subsystemID string, logger btclog.Logger) { case "DISC": discLog = logger + case "INDX": + indxLog = logger + indexers.UseLogger(logger) + case "MINR": minrLog = logger diff --git a/mempool.go b/mempool.go index 2d5af096..6947424d 100644 --- a/mempool.go +++ b/mempool.go @@ -15,6 +15,7 @@ import ( "time" "github.com/btcsuite/btcd/blockchain" + "github.com/btcsuite/btcd/blockchain/indexers" "github.com/btcsuite/btcd/mining" "github.com/btcsuite/btcd/txscript" "github.com/btcsuite/btcd/wire" @@ -60,6 +61,11 @@ type mempoolConfig struct { // TimeSource defines the timesource to use. TimeSource blockchain.MedianTimeSource + + // AddrIndex defines the optional address index instance to use for + // indexing the unconfirmed transactions in the memory pool. + // This can be nil if the address index is not enabled. + AddrIndex *indexers.AddrIndex } // mempoolPolicy houses the policy (configuration parameters) which is used to @@ -324,16 +330,21 @@ func (mp *txMemPool) removeTransaction(tx *btcutil.Tx, removeRedeemers bool) { } } - // Remove the transaction and mark the referenced outpoints as unspent - // by the pool. + // Remove the transaction if needed. 
if txDesc, exists := mp.pool[*txHash]; exists { + // Remove unconfirmed address index entries associated with the + // transaction if enabled. + if mp.cfg.AddrIndex != nil { + mp.cfg.AddrIndex.RemoveUnconfirmedTx(txHash) + } + + // Mark the referenced outpoints as unspent by the pool. for _, txIn := range txDesc.Tx.MsgTx().TxIn { delete(mp.outpoints, txIn.PreviousOutPoint) } delete(mp.pool, *txHash) atomic.StoreInt64(&mp.lastUpdated, time.Now().Unix()) } - } // RemoveTransaction removes the passed transaction from the mempool. When the @@ -354,7 +365,7 @@ func (mp *txMemPool) RemoveTransaction(tx *btcutil.Tx, removeRedeemers bool) { // passed transaction from the memory pool. Removing those transactions then // leads to removing all transactions which rely on them, recursively. This is // necessary when a block is connected to the main chain because the block may -// contain transactions which were previously unknown to the memory pool +// contain transactions which were previously unknown to the memory pool. // // This function is safe for concurrent access. func (mp *txMemPool) RemoveDoubleSpends(tx *btcutil.Tx) { @@ -392,6 +403,12 @@ func (mp *txMemPool) addTransaction(utxoView *blockchain.UtxoViewpoint, tx *btcu mp.outpoints[txIn.PreviousOutPoint] = tx } atomic.StoreInt64(&mp.lastUpdated, time.Now().Unix()) + + // Add unconfirmed address index entries associated with the transaction + // if enabled. + if mp.cfg.AddrIndex != nil { + mp.cfg.AddrIndex.AddUnconfirmedTx(tx, utxoView) + } } // checkPoolDoubleSpend checks whether or not the passed transaction is diff --git a/rpcserver.go b/rpcserver.go index 2e2b11db..b68067c3 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -131,45 +131,46 @@ type commandHandler func(*rpcServer, interface{}, <-chan struct{}) (interface{}, // a dependency loop. 
var rpcHandlers map[string]commandHandler var rpcHandlersBeforeInit = map[string]commandHandler{ - "addnode": handleAddNode, - "createrawtransaction": handleCreateRawTransaction, - "debuglevel": handleDebugLevel, - "decoderawtransaction": handleDecodeRawTransaction, - "decodescript": handleDecodeScript, - "generate": handleGenerate, - "getaddednodeinfo": handleGetAddedNodeInfo, - "getbestblock": handleGetBestBlock, - "getbestblockhash": handleGetBestBlockHash, - "getblock": handleGetBlock, - "getblockcount": handleGetBlockCount, - "getblockhash": handleGetBlockHash, - "getblockheader": handleGetBlockHeader, - "getblocktemplate": handleGetBlockTemplate, - "getconnectioncount": handleGetConnectionCount, - "getcurrentnet": handleGetCurrentNet, - "getdifficulty": handleGetDifficulty, - "getgenerate": handleGetGenerate, - "gethashespersec": handleGetHashesPerSec, - "getinfo": handleGetInfo, - "getmempoolinfo": handleGetMempoolInfo, - "getmininginfo": handleGetMiningInfo, - "getnettotals": handleGetNetTotals, - "getnetworkhashps": handleGetNetworkHashPS, - "getpeerinfo": handleGetPeerInfo, - "getrawmempool": handleGetRawMempool, - "getrawtransaction": handleGetRawTransaction, - "gettxout": handleGetTxOut, - "getwork": handleGetWork, - "help": handleHelp, - "node": handleNode, - "ping": handlePing, - "sendrawtransaction": handleSendRawTransaction, - "setgenerate": handleSetGenerate, - "stop": handleStop, - "submitblock": handleSubmitBlock, - "validateaddress": handleValidateAddress, - "verifychain": handleVerifyChain, - "verifymessage": handleVerifyMessage, + "addnode": handleAddNode, + "createrawtransaction": handleCreateRawTransaction, + "debuglevel": handleDebugLevel, + "decoderawtransaction": handleDecodeRawTransaction, + "decodescript": handleDecodeScript, + "generate": handleGenerate, + "getaddednodeinfo": handleGetAddedNodeInfo, + "getbestblock": handleGetBestBlock, + "getbestblockhash": handleGetBestBlockHash, + "getblock": handleGetBlock, + "getblockcount": 
handleGetBlockCount, + "getblockhash": handleGetBlockHash, + "getblockheader": handleGetBlockHeader, + "getblocktemplate": handleGetBlockTemplate, + "getconnectioncount": handleGetConnectionCount, + "getcurrentnet": handleGetCurrentNet, + "getdifficulty": handleGetDifficulty, + "getgenerate": handleGetGenerate, + "gethashespersec": handleGetHashesPerSec, + "getinfo": handleGetInfo, + "getmempoolinfo": handleGetMempoolInfo, + "getmininginfo": handleGetMiningInfo, + "getnettotals": handleGetNetTotals, + "getnetworkhashps": handleGetNetworkHashPS, + "getpeerinfo": handleGetPeerInfo, + "getrawmempool": handleGetRawMempool, + "getrawtransaction": handleGetRawTransaction, + "gettxout": handleGetTxOut, + "getwork": handleGetWork, + "help": handleHelp, + "node": handleNode, + "ping": handlePing, + "searchrawtransactions": handleSearchRawTransactions, + "sendrawtransaction": handleSendRawTransaction, + "setgenerate": handleSetGenerate, + "stop": handleStop, + "submitblock": handleSubmitBlock, + "validateaddress": handleValidateAddress, + "verifychain": handleVerifyChain, + "verifymessage": handleVerifyMessage, } // list of commands that we recognize, but for which btcd has no support because @@ -222,12 +223,11 @@ var rpcAskWallet = map[string]struct{}{ // Commands that are currently unimplemented, but should ultimately be. var rpcUnimplemented = map[string]struct{}{ - "estimatefee": {}, - "estimatepriority": {}, - "getblockchaininfo": {}, - "getchaintips": {}, - "getnetworkinfo": {}, - "searchrawtransactions": {}, + "estimatefee": {}, + "estimatepriority": {}, + "getblockchaininfo": {}, + "getchaintips": {}, + "getnetworkinfo": {}, } // Commands that are available to a limited user @@ -301,6 +301,15 @@ func rpcDecodeHexError(gotHex string) *btcjson.RPCError { gotHex)) } +// rpcNoTxInfoError is a convenience function for returning a nicely formatted +// RPC error which indicates there is no information available for the provided +// transaction hash. 
+func rpcNoTxInfoError(txHash *wire.ShaHash) *btcjson.RPCError { + return btcjson.NewRPCError(btcjson.ErrRPCNoTxInfo, + fmt.Sprintf("No information available about transaction %v", + txHash)) +} + // workStateBlockInfo houses information about how to reconstruct a block given // its template and signature script. type workStateBlockInfo struct { @@ -731,8 +740,8 @@ func createTxRawResult(chainParams *chaincfg.Params, mtx *wire.MsgTx, txReply := &btcjson.TxRawResult{ Hex: mtxHex, Txid: txHash, - Vout: createVoutList(mtx, chainParams, nil), Vin: createVinList(mtx), + Vout: createVoutList(mtx, chainParams, nil), Version: mtx.Version, LockTime: mtx.LockTime, } @@ -2282,34 +2291,118 @@ func handleGetRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan str verbose = *c.Verbose != 0 } - // Try to fetch the transaction from the memory pool. + // Try to fetch the transaction from the memory pool and if that fails, + // try the block database. + var mtx *wire.MsgTx + var blkHash *wire.ShaHash + var blkHeight int32 tx, err := s.server.txMemPool.FetchTransaction(txHash) if err != nil { - // TODO(davec): Implement optional transaction index. - return nil, &btcjson.RPCError{ - Code: btcjson.ErrRPCNoTxInfo, - Message: "No information available about transaction", + txIndex := s.server.txIndex + if txIndex == nil { + return nil, &btcjson.RPCError{ + Code: btcjson.ErrRPCNoTxInfo, + Message: "The transaction index must be " + + "enabled to query the blockchain " + + "(specify --txindex)", + } } - } - // When the verbose flag isn't set, simply return the - // network-serialized transaction as a hex-encoded string. - if !verbose { - // Note that this is intentionally not directly - // returning because the first return value is a - // string and it would result in returning an empty - // string to the client instead of nothing (nil) in the - // case of an error. - mtxHex, err := messageToHex(tx.MsgTx()) + // Look up the location of the transaction. 
+ blockRegion, err := txIndex.TxBlockRegion(txHash) if err != nil { - return nil, err + context := "Failed to retrieve transaction location" + return nil, internalRPCError(err.Error(), context) } - return mtxHex, nil + if blockRegion == nil { + return nil, rpcNoTxInfoError(txHash) + } + + // Load the raw transaction bytes from the database. + var txBytes []byte + err = s.server.db.View(func(dbTx database.Tx) error { + var err error + txBytes, err = dbTx.FetchBlockRegion(blockRegion) + return err + }) + if err != nil { + return nil, rpcNoTxInfoError(txHash) + } + + // When the verbose flag isn't set, simply return the serialized + // transaction as a hex-encoded string. This is done here to + // avoid deserializing it only to reserialize it again later. + if !verbose { + return hex.EncodeToString(txBytes), nil + } + + // Grab the block height. + blkHash = blockRegion.Hash + blkHeight, err = s.chain.BlockHeightByHash(blkHash) + if err != nil { + context := "Failed to retrieve block height" + return nil, internalRPCError(err.Error(), context) + } + + // Deserialize the transaction + var msgTx wire.MsgTx + err = msgTx.Deserialize(bytes.NewReader(txBytes)) + if err != nil { + context := "Failed to deserialize transaction" + return nil, internalRPCError(err.Error(), context) + } + mtx = &msgTx + } else { + // When the verbose flag isn't set, simply return the + // network-serialized transaction as a hex-encoded string. + if !verbose { + // Note that this is intentionally not directly + // returning because the first return value is a + // string and it would result in returning an empty + // string to the client instead of nothing (nil) in the + // case of an error. + mtxHex, err := messageToHex(tx.MsgTx()) + if err != nil { + return nil, err + } + return mtxHex, nil + } + + mtx = tx.MsgTx() } // The verbose flag is set, so generate the JSON object and return it. 
- rawTxn, err := createTxRawResult(s.server.chainParams, tx.MsgTx(), - txHash.String(), nil, "", 0, 0) + var blkHeader *wire.BlockHeader + var blkHashStr string + var chainHeight int32 + if blkHash != nil { + // Load the raw header bytes. + var headerBytes []byte + err := s.server.db.View(func(dbTx database.Tx) error { + var err error + headerBytes, err = dbTx.FetchBlockHeader(blkHash) + return err + }) + if err != nil { + context := "Failed to fetch block header" + return nil, internalRPCError(err.Error(), context) + } + + // Deserialize the header. + var header wire.BlockHeader + err = header.Deserialize(bytes.NewReader(headerBytes)) + if err != nil { + context := "Failed to deserialize block header" + return nil, internalRPCError(err.Error(), context) + } + + blkHeader = &header + blkHashStr = blkHash.String() + chainHeight = s.chain.BestSnapshot().Height + } + + rawTxn, err := createTxRawResult(s.server.chainParams, mtx, + txHash.String(), blkHeader, blkHashStr, blkHeight, chainHeight) if err != nil { return nil, err } @@ -2378,10 +2471,7 @@ func handleGetTxOut(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (i if includeMempool && s.server.txMemPool.HaveTransaction(txHash) { tx, err := s.server.txMemPool.FetchTransaction(txHash) if err != nil { - return nil, &btcjson.RPCError{ - Code: btcjson.ErrRPCNoTxInfo, - Message: "No information available about transaction", - } + return nil, rpcNoTxInfoError(txHash) } mtx := tx.MsgTx() @@ -2410,10 +2500,7 @@ func handleGetTxOut(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (i } else { entry, err := s.chain.FetchUtxoEntry(txHash) if err != nil { - return nil, &btcjson.RPCError{ - Code: btcjson.ErrRPCNoTxInfo, - Message: "No information available about transaction", - } + return nil, rpcNoTxInfoError(txHash) } // To match the behavior of the reference client, return nil @@ -2835,6 +2922,500 @@ func handlePing(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (inter return nil, nil } +// 
retrievedTx represents a transaction that was either loaded from the +// transaction memory pool or from the database. When a transaction is loaded +// from the database, it is loaded with the raw serialized bytes while the +// mempool has the fully deserialized structure. This structure therefore will +// have one of the two fields set depending on where it was retrieved from. +// This is mainly done for efficiency to avoid extra serialization steps when +// possible. +type retrievedTx struct { + txBytes []byte + blkHash *wire.ShaHash // Only set when transaction is in a block. + tx *btcutil.Tx +} + +// fetchInputTxos fetches the outpoints from all transactions referenced by the +// inputs to the passed transaction by checking the transaction mempool first +// then the transaction index for those already mined into blocks. +func fetchInputTxos(s *rpcServer, tx *wire.MsgTx) (map[wire.OutPoint]wire.TxOut, error) { + mp := s.server.txMemPool + originOutputs := make(map[wire.OutPoint]wire.TxOut) + for txInIndex, txIn := range tx.TxIn { + // Attempt to fetch and use the referenced transaction from the + // memory pool. + origin := &txIn.PreviousOutPoint + originTx, err := mp.FetchTransaction(&origin.Hash) + if err == nil { + txOuts := originTx.MsgTx().TxOut + if origin.Index >= uint32(len(txOuts)) { + errStr := fmt.Sprintf("unable to find output "+ + "%v referenced from transaction %s:%d", + origin, tx.TxSha(), txInIndex) + return nil, internalRPCError(errStr, "") + } + + originOutputs[*origin] = *txOuts[origin.Index] + continue + } + + // Look up the location of the transaction. + blockRegion, err := s.server.txIndex.TxBlockRegion(&origin.Hash) + if err != nil { + context := "Failed to retrieve transaction location" + return nil, internalRPCError(err.Error(), context) + } + if blockRegion == nil { + return nil, rpcNoTxInfoError(&origin.Hash) + } + + // Load the raw transaction bytes from the database. 
+ var txBytes []byte + err = s.server.db.View(func(dbTx database.Tx) error { + var err error + txBytes, err = dbTx.FetchBlockRegion(blockRegion) + return err + }) + if err != nil { + return nil, rpcNoTxInfoError(&origin.Hash) + } + + // Deserialize the transaction + var msgTx wire.MsgTx + err = msgTx.Deserialize(bytes.NewReader(txBytes)) + if err != nil { + context := "Failed to deserialize transaction" + return nil, internalRPCError(err.Error(), context) + } + + // Add the referenced output to the map. + if origin.Index >= uint32(len(msgTx.TxOut)) { + errStr := fmt.Sprintf("unable to find output %v "+ + "referenced from transaction %s:%d", origin, + tx.TxSha(), txInIndex) + return nil, internalRPCError(errStr, "") + } + originOutputs[*origin] = *msgTx.TxOut[origin.Index] + } + + return originOutputs, nil +} + +// createVinListPrevOut returns a slice of JSON objects for the inputs of the +// passed transaction. +func createVinListPrevOut(s *rpcServer, mtx *wire.MsgTx, chainParams *chaincfg.Params, vinExtra bool, filterAddrMap map[string]struct{}) ([]btcjson.VinPrevOut, error) { + // Coinbase transactions only have a single txin by definition. + if blockchain.IsCoinBaseTx(mtx) { + // Only include the transaction if the filter map is empty + // because a coinbase input has no addresses and so would never + // match a non-empty filter. + if len(filterAddrMap) != 0 { + return nil, nil + } + + txIn := mtx.TxIn[0] + vinList := make([]btcjson.VinPrevOut, 1) + vinList[0].Coinbase = hex.EncodeToString(txIn.SignatureScript) + vinList[0].Sequence = txIn.Sequence + return vinList, nil + } + + // Use a dynamically sized list to accommodate the address filter. + vinList := make([]btcjson.VinPrevOut, 0, len(mtx.TxIn)) + + // Lookup all of the referenced transaction outputs needed to populate + // the previous output information if requested. 
+ var originOutputs map[wire.OutPoint]wire.TxOut + if vinExtra || len(filterAddrMap) > 0 { + var err error + originOutputs, err = fetchInputTxos(s, mtx) + if err != nil { + return nil, err + } + } + + for _, txIn := range mtx.TxIn { + // The disassembled string will contain [error] inline + // if the script doesn't fully parse, so ignore the + // error here. + disbuf, _ := txscript.DisasmString(txIn.SignatureScript) + + // Create the basic input entry without the additional optional + // previous output details which will be added later if + // requested and available. + prevOut := &txIn.PreviousOutPoint + vinEntry := btcjson.VinPrevOut{ + Txid: prevOut.Hash.String(), + Vout: prevOut.Index, + Sequence: txIn.Sequence, + ScriptSig: &btcjson.ScriptSig{ + Asm: disbuf, + Hex: hex.EncodeToString(txIn.SignatureScript), + }, + } + + // Add the entry to the list now if it already passed the filter + // since the previous output might not be available. + passesFilter := len(filterAddrMap) == 0 + if passesFilter { + vinList = append(vinList, vinEntry) + } + + // Only populate previous output information if requested and + // available. + if len(originOutputs) == 0 { + continue + } + originTxOut, ok := originOutputs[*prevOut] + if !ok { + continue + } + + // Ignore the error here since an error means the script + // couldn't parse and there is no additional information about + // it anyways. + _, addrs, _, _ := txscript.ExtractPkScriptAddrs( + originTxOut.PkScript, chainParams) + + // Encode the addresses while checking if the address passes the + // filter when needed. + encodedAddrs := make([]string, len(addrs)) + for j, addr := range addrs { + encodedAddr := addr.EncodeAddress() + encodedAddrs[j] = encodedAddr + + // No need to check the map again if the filter already + // passes. + if passesFilter { + continue + } + if _, exists := filterAddrMap[encodedAddr]; exists { + passesFilter = true + } + } + + // Ignore the entry if it doesn't pass the filter. 
+ if !passesFilter { + continue + } + + // Add entry to the list if it wasn't already done above. + if len(filterAddrMap) != 0 { + vinList = append(vinList, vinEntry) + } + + // Update the entry with previous output information if + // requested. + if vinExtra { + vinListEntry := &vinList[len(vinList)-1] + vinListEntry.PrevOut = &btcjson.PrevOut{ + Addresses: encodedAddrs, + Value: btcutil.Amount(originTxOut.Value).ToBTC(), + } + } + } + + return vinList, nil +} + +// fetchMempoolTxnsForAddress queries the address index for all unconfirmed +// transactions that involve the provided address. The results will be limited +// by the number to skip and the number requested. +func fetchMempoolTxnsForAddress(s *rpcServer, addr btcutil.Address, numToSkip, numRequested uint32) ([]*btcutil.Tx, uint32) { + // There are no entries to return when there are less available than the + // number being skipped. + mpTxns := s.server.addrIndex.UnconfirmedTxnsForAddress(addr) + numAvailable := uint32(len(mpTxns)) + if numToSkip > numAvailable { + return nil, numAvailable + } + + // Filter the available entries based on the number to skip and number + // requested. + rangeEnd := numToSkip + numRequested + if rangeEnd > numAvailable { + rangeEnd = numAvailable + } + return mpTxns[numToSkip:rangeEnd], numToSkip +} + +// handleSearchRawTransactions implements the searchrawtransactions command. +func handleSearchRawTransactions(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { + // Respond with an error if the address index is not enabled. + addrIndex := s.server.addrIndex + if addrIndex == nil { + return nil, &btcjson.RPCError{ + Code: btcjson.ErrRPCMisc, + Message: "Address index must be enabled (--addrindex)", + } + } + + // Override the flag for including extra previous output information in + // each input if needed. 
+ c := cmd.(*btcjson.SearchRawTransactionsCmd) + vinExtra := false + if c.VinExtra != nil { + vinExtra = *c.VinExtra != 0 + } + + // Including the extra previous output information requires the + // transaction index. Currently the address index relies on the + // transaction index, so this check is redundant, but it's better to be + // safe in case the address index is ever changed to not rely on it. + if vinExtra && s.server.txIndex == nil { + return nil, &btcjson.RPCError{ + Code: btcjson.ErrRPCMisc, + Message: "Transaction index must be enabled (--txindex)", + } + } + + // Attempt to decode the supplied address. + addr, err := btcutil.DecodeAddress(c.Address, s.server.chainParams) + if err != nil { + return nil, &btcjson.RPCError{ + Code: btcjson.ErrRPCInvalidAddressOrKey, + Message: "Invalid address or key: " + err.Error(), + } + } + + // Override the default number of requested entries if needed. Also, + // just return now if the number of requested entries is zero to avoid + // extra work. + numRequested := 100 + if c.Count != nil { + numRequested = *c.Count + if numRequested < 0 { + numRequested = 1 + } + } + if numRequested == 0 { + return nil, nil + } + + // Override the default number of entries to skip if needed. + var numToSkip int + if c.Skip != nil { + numToSkip = *c.Skip + if numToSkip < 0 { + numToSkip = 0 + } + } + + // Override the reverse flag if needed. + var reverse bool + if c.Reverse != nil { + reverse = *c.Reverse + } + + // Add transactions from mempool first if client asked for reverse + // order. Otherwise, they will be added last (as needed depending on + // the requested counts). + // + // NOTE: This code doesn't sort by dependency. This might be something + // to do in the future for the client's convenience, or leave it to the + // client. 
+ numSkipped := uint32(0) + addressTxns := make([]retrievedTx, 0, numRequested) + if reverse { + // Transactions in the mempool are not in a block header yet, + // so the block header field in the retrieved transaction struct + // is left nil. + mpTxns, mpSkipped := fetchMempoolTxnsForAddress(s, addr, + uint32(numToSkip), uint32(numRequested)) + numSkipped += mpSkipped + for _, tx := range mpTxns { + addressTxns = append(addressTxns, retrievedTx{tx: tx}) + } + } + + // Fetch transactions from the database in the desired order if more are + // needed. + if len(addressTxns) < numRequested { + err = s.server.db.View(func(dbTx database.Tx) error { + regions, dbSkipped, err := addrIndex.TxRegionsForAddress( + dbTx, addr, uint32(numToSkip)-numSkipped, + uint32(numRequested-len(addressTxns)), reverse) + if err != nil { + return err + } + + // Load the raw transaction bytes from the database. + serializedTxns, err := dbTx.FetchBlockRegions(regions) + if err != nil { + return err + } + + // Add the transaction and the hash of the block it is + // contained in to the list. Note that the transaction + // is left serialized here since the caller might have + // requested non-verbose output and hence there would be + // no point in deserializing it just to reserialize it + // later. + for i, serializedTx := range serializedTxns { + addressTxns = append(addressTxns, retrievedTx{ + txBytes: serializedTx, + blkHash: regions[i].Hash, + }) + } + numSkipped += dbSkipped + + return nil + }) + if err != nil { + context := "Failed to load address index entries" + return nil, internalRPCError(err.Error(), context) + } + + } + + // Add transactions from mempool last if client did not request reverse + // order and the number of results is still under the number requested. + if !reverse && len(addressTxns) < numRequested { + // Transactions in the mempool are not in a block header yet, + // so the block header field in the retrieved transaction struct + // is left nil. 
+ mpTxns, mpSkipped := fetchMempoolTxnsForAddress(s, addr, + uint32(numToSkip)-numSkipped, uint32(numRequested- + len(addressTxns))) + numSkipped += mpSkipped + for _, tx := range mpTxns { + addressTxns = append(addressTxns, retrievedTx{tx: tx}) + } + } + + // Address has never been used if neither source yielded any results. + if len(addressTxns) == 0 { + return nil, &btcjson.RPCError{ + Code: btcjson.ErrRPCNoTxInfo, + Message: "No information available about address", + } + } + + // Serialize all of the transactions to hex. + hexTxns := make([]string, len(addressTxns)) + for i := range addressTxns { + // Simply encode the raw bytes to hex when the retrieved + // transaction is already in serialized form. + rtx := &addressTxns[i] + if rtx.txBytes != nil { + hexTxns[i] = hex.EncodeToString(rtx.txBytes) + continue + } + + // Serialize the transaction first and convert to hex when the + // retrieved transaction is the deserialized structure. + hexTxns[i], err = messageToHex(rtx.tx.MsgTx()) + if err != nil { + return nil, err + } + } + + // When not in verbose mode, simply return a list of serialized txns. + if c.Verbose != nil && *c.Verbose == 0 { + return hexTxns, nil + } + + // Normalize the provided filter addresses (if any) to ensure there are + // no duplicates. + filterAddrMap := make(map[string]struct{}) + if c.FilterAddrs != nil && len(*c.FilterAddrs) > 0 { + for _, addr := range *c.FilterAddrs { + filterAddrMap[addr] = struct{}{} + } + } + + // The verbose flag is set, so generate the JSON object and return it. + best := s.chain.BestSnapshot() + chainParams := s.server.chainParams + srtList := make([]btcjson.SearchRawTransactionsResult, len(addressTxns)) + for i := range addressTxns { + // The deserialized transaction is needed, so deserialize the + // retrieved transaction if it's in serialized form (which will + // be the case when it was looked up from the database). + // Otherwise, use the existing deserialized transaction. 
+ rtx := &addressTxns[i] + var mtx *wire.MsgTx + if rtx.tx == nil { + // Deserialize the transaction. + mtx = new(wire.MsgTx) + err := mtx.Deserialize(bytes.NewReader(rtx.txBytes)) + if err != nil { + context := "Failed to deserialize transaction" + return nil, internalRPCError(err.Error(), + context) + } + } else { + mtx = rtx.tx.MsgTx() + } + + result := &srtList[i] + result.Hex = hexTxns[i] + result.Txid = mtx.TxSha().String() + result.Vin, err = createVinListPrevOut(s, mtx, chainParams, + vinExtra, filterAddrMap) + if err != nil { + return nil, err + } + result.Vout = createVoutList(mtx, chainParams, filterAddrMap) + result.Version = mtx.Version + result.LockTime = mtx.LockTime + + // Transactions grabbed from the mempool aren't yet in a block, + // so conditionally fetch block details here. This will be + // reflected in the final JSON output (mempool won't have + // confirmations or block information). + var blkHeader *wire.BlockHeader + var blkHashStr string + var blkHeight int32 + if blkHash := rtx.blkHash; blkHash != nil { + // Load the raw header bytes from the database. + var headerBytes []byte + err := s.server.db.View(func(dbTx database.Tx) error { + var err error + headerBytes, err = dbTx.FetchBlockHeader(blkHash) + return err + }) + if err != nil { + return nil, &btcjson.RPCError{ + Code: btcjson.ErrRPCBlockNotFound, + Message: "Block not found", + } + } + + // Deserialize the block header. + var header wire.BlockHeader + err = header.Deserialize(bytes.NewReader(headerBytes)) + if err != nil { + context := "Failed to deserialize block header" + return nil, internalRPCError(err.Error(), context) + } + + // Get the block height from chain. + height, err := s.chain.BlockHeightByHash(blkHash) + if err != nil { + context := "Failed to obtain block height" + return nil, internalRPCError(err.Error(), context) + } + + blkHeader = &header + blkHashStr = blkHash.String() + blkHeight = height + } + + // Add the block information to the result if there is any. 
+ if blkHeader != nil { + // This is not a typo, they are identical in Bitcoin + // Core as well. + result.Time = blkHeader.Timestamp.Unix() + result.Blocktime = blkHeader.Timestamp.Unix() + result.BlockHash = blkHashStr + result.Confirmations = uint64(1 + best.Height - blkHeight) + } + } + + return srtList, nil +} + // handleSendRawTransaction implements the sendrawtransaction command. func handleSendRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { c := cmd.(*btcjson.SendRawTransactionCmd) @@ -3239,8 +3820,7 @@ func (s *rpcServer) decrementClients() { // the second bool return value specifies whether the user can change the state // of the server (true) or whether the user is limited (false). The second is // always false if the first is. -func (s *rpcServer) checkAuth(r *http.Request, require bool) (bool, bool, - error) { +func (s *rpcServer) checkAuth(r *http.Request, require bool) (bool, bool, error) { authhdr := r.Header["Authorization"] if len(authhdr) <= 0 { if require { @@ -3355,8 +3935,7 @@ func createMarshalledReply(id, result interface{}, replyErr error) ([]byte, erro } // jsonRPCRead handles reading and responding to RPC messages. -func (s *rpcServer) jsonRPCRead(w http.ResponseWriter, r *http.Request, - isAdmin bool) { +func (s *rpcServer) jsonRPCRead(w http.ResponseWriter, r *http.Request, isAdmin bool) { if atomic.LoadInt32(&s.shutdown) != 0 { return } diff --git a/sample-btcd.conf b/sample-btcd.conf index 61ec0eac..da4e4b8c 100644 --- a/sample-btcd.conf +++ b/sample-btcd.conf @@ -249,6 +249,19 @@ ; dropaddrindex=0 +; ------------------------------------------------------------------------------ +; Optional Indexes +; ------------------------------------------------------------------------------ + +; Build and maintain a full hash-based transaction index which makes all +; transactions available via the getrawtransaction RPC. 
+; txindex=1 + +; Build and maintain a full address-based transaction index which makes the +; searchrawtransactions RPC available. +; addrindex=1 + + ; ------------------------------------------------------------------------------ ; Signature Verification Cache ; ------------------------------------------------------------------------------ diff --git a/server.go b/server.go index d0238e34..1b2889ed 100644 --- a/server.go +++ b/server.go @@ -22,6 +22,7 @@ import ( "github.com/btcsuite/btcd/addrmgr" "github.com/btcsuite/btcd/blockchain" + "github.com/btcsuite/btcd/blockchain/indexers" "github.com/btcsuite/btcd/chaincfg" database "github.com/btcsuite/btcd/database2" "github.com/btcsuite/btcd/mining" @@ -201,6 +202,13 @@ type server struct { db database.DB timeSource blockchain.MedianTimeSource services wire.ServiceFlag + + // The following fields are used for optional indexes. They will be nil + // if the associated index is not enabled. These fields are set during + // initial creation of the server and never changed afterwards, so they + // do not need to be protected for concurrent access. + txIndex *indexers.TxIndex + addrIndex *indexers.AddrIndex } // serverPeer extends the peer to maintain state shared by the server and @@ -2480,7 +2488,40 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param services: services, sigCache: txscript.NewSigCache(cfg.SigCacheMaxSize), } - bm, err := newBlockManager(&s) + + // Create the transaction and address indexes if needed. + // + // CAUTION: the txindex needs to be first in the indexes array because + // the addrindex uses data from the txindex during catchup. If the + // addrindex is run first, it may not have the transactions from the + // current block indexed. + var indexes []indexers.Indexer + if cfg.TxIndex || cfg.AddrIndex { + // Enable transaction index if address index is enabled since it + // requires it. 
+ if !cfg.TxIndex { + indxLog.Infof("Transaction index enabled because it " + + "is required by the address index") + cfg.TxIndex = true + } else { + indxLog.Info("Transaction index is enabled") + } + + s.txIndex = indexers.NewTxIndex(db) + indexes = append(indexes, s.txIndex) + } + if cfg.AddrIndex { + indxLog.Info("Address index is enabled") + s.addrIndex = indexers.NewAddrIndex(db, chainParams) + indexes = append(indexes, s.addrIndex) + } + + // Create an index manager if any of the optional indexes are enabled. + var indexManager blockchain.IndexManager + if len(indexes) > 0 { + indexManager = indexers.NewManager(db, indexes) + } + bm, err := newBlockManager(&s, indexManager) if err != nil { return nil, err } @@ -2500,6 +2541,7 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param RelayNtfnChan: s.relayNtfnChan, SigCache: s.sigCache, TimeSource: s.timeSource, + AddrIndex: s.addrIndex, } s.txMemPool = newTxMemPool(&txC)