chainntnfs: cache spend hints within the different chain notifiers

In this commit, we extend the different ChainNotifier implementations to
cache height hints for our spend events. Each outpoint we've requested a
spend notification for will have its initial height hint cached. We then
increment this height hint at every new block for unspent outpoints.
This allows us to retrieve the *exact* height at which the outpoint has
been spent. By doing this, we optimize the different ChainNotifier
implementations since they will no longer have to rescan forward (and
possibly fetch blocks in the neutrino/pruned node case) from the initial
height hint.
This commit is contained in:
Wilmer Paulino 2018-08-14 17:55:29 -07:00
parent 7e872566c4
commit 94beabf34b
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F
3 changed files with 167 additions and 30 deletions

View File

@ -587,9 +587,6 @@ func (b *BitcoindNotifier) handleBlockConnected(block chainntnfs.BlockEpoch) err
return fmt.Errorf("unable to get block: %v", err)
}
chainntnfs.Log.Infof("New block: height=%v, sha=%v",
block.Height, block.Hash)
txns := btcutil.NewBlock(rawBlock).Transactions()
err = b.txConfNotifier.ConnectTip(
block.Hash, uint32(block.Height), txns)
@ -597,6 +594,9 @@ func (b *BitcoindNotifier) handleBlockConnected(block chainntnfs.BlockEpoch) err
return fmt.Errorf("unable to connect tip: %v", err)
}
chainntnfs.Log.Infof("New block: height=%v, sha=%v", block.Height,
block.Hash)
// We want to set the best block before dispatching notifications so
// if any subscribers make queries based on their received block epoch,
// our state is fully updated in time.
@ -604,6 +604,26 @@ func (b *BitcoindNotifier) handleBlockConnected(block chainntnfs.BlockEpoch) err
b.notifyBlockEpochs(block.Height, block.Hash)
// Finally, we'll update the spend height hint for all of our watched
// outpoints that have not been spent yet. This is safe to do as we do
// not watch already spent outpoints for spend notifications.
ops := make([]wire.OutPoint, 0, len(b.spendNotifications))
for op := range b.spendNotifications {
ops = append(ops, op)
}
if len(ops) > 0 {
err := b.spendHintCache.CommitSpendHint(
uint32(block.Height), ops...,
)
if err != nil {
// The error is not fatal, so we should not return an
// error to the caller.
chainntnfs.Log.Errorf("Unable to update spend hint to "+
"%d for %v: %v", block.Height, ops, err)
}
}
return nil
}
@ -662,6 +682,18 @@ type spendCancel struct {
func (b *BitcoindNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
pkScript []byte, heightHint uint32) (*chainntnfs.SpendEvent, error) {
// Before proceeding to register the notification, we'll query our
// height hint cache to determine whether a better one exists.
if hint, err := b.spendHintCache.QuerySpendHint(*outpoint); err == nil {
if hint > heightHint {
chainntnfs.Log.Debugf("Using height hint %d retrieved "+
"from cache for %v", hint, outpoint)
heightHint = hint
}
}
// Construct a notification request for the outpoint and send it to the
// main event loop.
ntfn := &spendNotification{
targetOutpoint: outpoint,
spendChan: make(chan *chainntnfs.SpendDetail, 1),
@ -688,7 +720,20 @@ func (b *BitcoindNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
return nil, err
}
if txOut == nil {
// If the output is unspent, then we'll write it to the cache with the
// given height hint. This allows us to increase the height hint as the
// chain extends and the output remains unspent.
if txOut != nil {
err := b.spendHintCache.CommitSpendHint(heightHint, *outpoint)
if err != nil {
// The error is not fatal, so we should not return an
// error to the caller.
chainntnfs.Log.Error("Unable to update spend hint to "+
"%d for %v: %v", heightHint, *outpoint, err)
}
} else {
// Otherwise, we'll determine when the output was spent.
//
// First, we'll attempt to retrieve the transaction's block hash
// using the backend's transaction index.
tx, err := b.chainConn.GetRawTransactionVerbose(&outpoint.Hash)
@ -718,7 +763,9 @@ func (b *BitcoindNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
int64(heightHint),
)
if err != nil {
return nil, err
return nil, fmt.Errorf("unable to retrieve "+
"hash for block with height %d: %v",
heightHint, err)
}
}
@ -810,11 +857,13 @@ func (b *BitcoindNotifier) dispatchSpendDetailsManually(op wire.OutPoint,
blockHash, err := b.chainConn.GetBlockHash(int64(height))
if err != nil {
return err
return fmt.Errorf("unable to retrieve hash for block "+
"with height %d: %v", height, err)
}
block, err := b.chainConn.GetBlock(blockHash)
if err != nil {
return err
return fmt.Errorf("unable to retrieve block with hash "+
"%v: %v", blockHash, err)
}
for _, tx := range block.Transactions {

View File

@ -662,23 +662,23 @@ func (b *BtcdNotifier) handleBlockConnected(epoch chainntnfs.BlockEpoch) error {
return fmt.Errorf("unable to get block: %v", err)
}
chainntnfs.Log.Infof("New block: height=%v, sha=%v",
epoch.Height, epoch.Hash)
txns := btcutil.NewBlock(rawBlock).Transactions()
newBlock := &filteredBlock{
hash: *epoch.Hash,
height: uint32(epoch.Height),
txns: txns,
txns: btcutil.NewBlock(rawBlock).Transactions(),
connect: true,
}
err = b.txConfNotifier.ConnectTip(&newBlock.hash, newBlock.height,
newBlock.txns)
err = b.txConfNotifier.ConnectTip(
&newBlock.hash, newBlock.height, newBlock.txns,
)
if err != nil {
return fmt.Errorf("unable to connect tip: %v", err)
}
chainntnfs.Log.Infof("New block: height=%v, sha=%v", epoch.Height,
epoch.Hash)
// We want to set the best block before dispatching notifications
// so if any subscribers make queries based on their received
// block epoch, our state is fully updated in time.
@ -687,8 +687,8 @@ func (b *BtcdNotifier) handleBlockConnected(epoch chainntnfs.BlockEpoch) error {
// Next we'll notify any subscribed clients of the block.
b.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash)
// Finally, we'll scan over the list of relevant transactions and
// possibly dispatch notifications for confirmations and spends.
// Scan over the list of relevant transactions and possibly dispatch
// notifications for spends.
for _, tx := range newBlock.txns {
mtx := tx.MsgTx()
txSha := mtx.TxHash()
@ -714,8 +714,10 @@ func (b *BtcdNotifier) handleBlockConnected(epoch chainntnfs.BlockEpoch) error {
}
for _, ntfn := range clients {
chainntnfs.Log.Infof("Dispatching spend notification for "+
"outpoint=%v", ntfn.targetOutpoint)
chainntnfs.Log.Infof("Dispatching spend "+
"notification for outpoint=%v",
ntfn.targetOutpoint)
ntfn.spendChan <- spendDetails
// Close spendChan to ensure that any calls to
@ -729,6 +731,26 @@ func (b *BtcdNotifier) handleBlockConnected(epoch chainntnfs.BlockEpoch) error {
}
}
// Finally, we'll update the spend height hint for all of our watched
// outpoints that have not been spent yet. This is safe to do as we do
// not watch already spent outpoints for spend notifications.
ops := make([]wire.OutPoint, 0, len(b.spendNotifications))
for op := range b.spendNotifications {
ops = append(ops, op)
}
if len(ops) > 0 {
err := b.spendHintCache.CommitSpendHint(
uint32(epoch.Height), ops...,
)
if err != nil {
// The error is not fatal, so we should not return an
// error to the caller.
chainntnfs.Log.Errorf("Unable to update spend hint to "+
"%d for %v: %v", epoch.Height, ops, err)
}
}
return nil
}
@ -787,6 +809,18 @@ type spendCancel struct {
func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
pkScript []byte, heightHint uint32) (*chainntnfs.SpendEvent, error) {
// Before proceeding to register the notification, we'll query our
// height hint cache to determine whether a better one exists.
if hint, err := b.spendHintCache.QuerySpendHint(*outpoint); err == nil {
if hint > heightHint {
chainntnfs.Log.Debugf("Using height hint %d retrieved "+
"from cache for %v", hint, outpoint)
heightHint = hint
}
}
// Construct a notification request for the outpoint and send it to the
// main event loop.
ntfn := &spendNotification{
targetOutpoint: outpoint,
spendChan: make(chan *chainntnfs.SpendDetail, 1),
@ -815,7 +849,20 @@ func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
return nil, err
}
if txOut == nil {
// If the output is unspent, then we'll write it to the cache with the
// given height hint. This allows us to increase the height hint as the
// chain extends and the output remains unspent.
if txOut != nil {
err := b.spendHintCache.CommitSpendHint(heightHint, *outpoint)
if err != nil {
// The error is not fatal, so we should not return an
// error to the caller.
chainntnfs.Log.Error("Unable to update spend hint to "+
"%d for %v: %v", heightHint, *outpoint, err)
}
} else {
// Otherwise, we'll determine when the output was spent.
//
// First, we'll attempt to retrieve the transaction's block hash
// using the backend's transaction index.
tx, err := b.chainConn.GetRawTransactionVerbose(&outpoint.Hash)

View File

@ -603,22 +603,23 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error {
// First process the block for our internal state. A new block has
// been connected to the main chain. Send out any N confirmation
// notifications which may have been triggered by this new block.
err := n.txConfNotifier.ConnectTip(&newBlock.hash, newBlock.height,
newBlock.txns)
err := n.txConfNotifier.ConnectTip(
&newBlock.hash, newBlock.height, newBlock.txns,
)
if err != nil {
return fmt.Errorf("unable to connect tip: %v", err)
}
chainntnfs.Log.Infof("New block: height=%v, sha=%v",
newBlock.height, newBlock.hash)
chainntnfs.Log.Infof("New block: height=%v, sha=%v", newBlock.height,
newBlock.hash)
n.bestHeight = newBlock.height
// Next, notify any subscribed clients of the block.
n.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash)
// Finally, we'll scan over the list of relevant transactions and
// possibly dispatch notifications for confirmations and spends.
// Scan over the list of relevant transactions and possibly dispatch
// notifications for spends.
for _, tx := range newBlock.txns {
mtx := tx.MsgTx()
txSha := mtx.TxHash()
@ -661,6 +662,24 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error {
}
}
// Finally, we'll update the spend height hint for all of our watched
// outpoints that have not been spent yet. This is safe to do as we do
// not watch already spent outpoints for spend notifications.
ops := make([]wire.OutPoint, 0, len(n.spendNotifications))
for op := range n.spendNotifications {
ops = append(ops, op)
}
if len(ops) > 0 {
err := n.spendHintCache.CommitSpendHint(newBlock.height, ops...)
if err != nil {
// The error is not fatal, so we should not return an
// error to the caller.
chainntnfs.Log.Errorf("Unable to update spend hint to "+
"%d for %v: %v", newBlock.height, ops, err)
}
}
return nil
}
@ -740,15 +759,26 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
currentHeight := n.bestHeight
n.heightMtx.RUnlock()
chainntnfs.Log.Infof("New spend notification for outpoint=%v, "+
"height_hint=%v", outpoint, heightHint)
// Before proceeding to register the notification, we'll query our
// height hint cache to determine whether a better one exists.
if hint, err := n.spendHintCache.QuerySpendHint(*outpoint); err == nil {
if hint > heightHint {
chainntnfs.Log.Debugf("Using height hint %d retrieved "+
"from cache for %v", hint, outpoint)
heightHint = hint
}
}
// Construct a notification request for the outpoint. We'll defer
// sending it to the main event loop until after we've guaranteed that
// the outpoint has not been spent.
ntfn := &spendNotification{
targetOutpoint: outpoint,
spendChan: make(chan *chainntnfs.SpendDetail, 1),
spendID: atomic.AddUint64(&n.spendClientCounter, 1),
heightHint: heightHint,
}
spendEvent := &chainntnfs.SpendEvent{
Spend: ntfn.spendChan,
Cancel: func() {
@ -760,8 +790,9 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
// Submit spend cancellation to notification dispatcher.
select {
case n.notificationCancels <- cancel:
// Cancellation is being handled, drain the spend chan until it is
// closed before yielding to the caller.
// Cancellation is being handled, drain the
// spend chan until it is closed before yielding
// to the caller.
for {
select {
case _, ok := <-ntfn.spendChan:
@ -851,6 +882,16 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
return nil, ErrChainNotifierShuttingDown
}
// Finally, we'll add a spent hint with the current height to the cache
// in order to better keep track of when this outpoint is spent.
err = n.spendHintCache.CommitSpendHint(currentHeight, *outpoint)
if err != nil {
// The error is not fatal, so we should not return an error to
// the caller.
chainntnfs.Log.Errorf("Unable to update spend hint to %d for "+
"%v: %v", currentHeight, outpoint, err)
}
return spendEvent, nil
}