diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index e9393ff6d..ef29c4503 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -455,23 +455,19 @@ out: case chain.RelevantTx: tx := btcutil.NewTx(&item.TxRecord.MsgTx) - // If this is a mempool spend, we'll ask the - // mempool notifier to hanlde it. + // Init values. + isMempool := false + height := uint32(0) + + // Unwrap values. if item.Block == nil { - b.memNotifier.ProcessRelevantSpendTx(tx) - continue + isMempool = true + } else { + height = uint32(item.Block.Height) } - // Otherwise this is a confirmed spend, and - // we'll ask the tx notifier to handle it. - err := b.txNotifier.ProcessRelevantSpendTx( - tx, uint32(item.Block.Height), - ) - if err != nil { - chainntnfs.Log.Errorf("Unable to "+ - "process transaction %v: %v", - tx.Hash(), err) - } + // Handle the transaction. + b.handleRelevantTx(tx, isMempool, height) } case <-b.quit: @@ -480,6 +476,39 @@ out: } } +// handleRelevantTx handles a new transaction that has been seen either in a +// block or in the mempool. If in mempool, it will ask the mempool notifier to +// handle it. If in a block, it will ask the txNotifier to handle it, and +// cancel any relevant subscriptions made in the mempool. +func (b *BitcoindNotifier) handleRelevantTx(tx *btcutil.Tx, + mempool bool, height uint32) { + + // If this is a mempool spend, we'll ask the mempool notifier to hanlde + // it. + if mempool { + b.memNotifier.ProcessRelevantSpendTx(tx) + return + } + + // Otherwise this is a confirmed spend, and we'll ask the tx notifier + // to handle it. + err := b.txNotifier.ProcessRelevantSpendTx(tx, height) + if err != nil { + chainntnfs.Log.Errorf("Unable to process transaction %v: %v", + tx.Hash(), err) + + return + } + + // Once the tx is processed, we will ask the memNotifier to unsubscribe + // the input. + // + // NOTE(yy): we could build it into txNotifier.ProcessRelevantSpendTx, + // but choose to implement it here so we can easily decouple the two + // notifiers in the future. + b.memNotifier.UnsubsribeConfirmedSpentTx(tx) +} + // historicalConfDetails looks up whether a confirmation request (txid/output // script) has already been included in a block in the active chain and, if so, // returns details about said block. diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 32f776fe8..2c2619b2c 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -505,23 +505,19 @@ out: newSpend := item.(*txUpdate) tx := newSpend.tx - // If this is a mempool spend, we'll ask the mempool - // notifier to hanlde it. + // Init values. + isMempool := false + height := uint32(0) + + // Unwrap values. if newSpend.details == nil { - b.memNotifier.ProcessRelevantSpendTx(tx) - continue + isMempool = true + } else { + height = uint32(newSpend.details.Height) } - // Otherwise this is a confirmed spend, and we'll ask - // the tx notifier to handle it. - err := b.txNotifier.ProcessRelevantSpendTx( - tx, uint32(newSpend.details.Height), - ) - if err != nil { - chainntnfs.Log.Errorf("Unable to process "+ - "transaction %v: %v", - newSpend.tx.Hash(), err) - } + // Handle the transaction. + b.handleRelevantTx(tx, isMempool, height) case <-b.quit: break out @@ -529,6 +525,39 @@ out: } } +// handleRelevantTx handles a new transaction that has been seen either in a +// block or in the mempool. If in mempool, it will ask the mempool notifier to +// handle it. If in a block, it will ask the txNotifier to handle it, and +// cancel any relevant subscriptions made in the mempool. +func (b *BtcdNotifier) handleRelevantTx(tx *btcutil.Tx, + mempool bool, height uint32) { + + // If this is a mempool spend, we'll ask the mempool notifier to hanlde + // it. + if mempool { + b.memNotifier.ProcessRelevantSpendTx(tx) + return + } + + // Otherwise this is a confirmed spend, and we'll ask the tx notifier + // to handle it. + err := b.txNotifier.ProcessRelevantSpendTx(tx, height) + if err != nil { + chainntnfs.Log.Errorf("Unable to process transaction %v: %v", + tx.Hash(), err) + + return + } + + // Once the tx is processed, we will ask the memNotifier to unsubscribe + // the input. + // + // NOTE(yy): we could build it into txNotifier.ProcessRelevantSpendTx, + // but choose to implement it here so we can easily decouple the two + // notifiers in the future. + b.memNotifier.UnsubsribeConfirmedSpentTx(tx) +} + // historicalConfDetails looks up whether a confirmation request (txid/output // script) has already been included in a block in the active chain and, if so, // returns details about said block. diff --git a/chainntnfs/mempool.go b/chainntnfs/mempool.go index d613e829a..59ce28f7b 100644 --- a/chainntnfs/mempool.go +++ b/chainntnfs/mempool.go @@ -140,6 +140,23 @@ func (m *MempoolNotifier) UnsubscribeEvent(sub *MempoolSpendEvent) { clients.Delete(sub.id) } +// UnsubsribeConfirmedSpentTx takes a transaction and removes the subscriptions +// identified using its inputs. +func (m *MempoolNotifier) UnsubsribeConfirmedSpentTx(tx *btcutil.Tx) { + Log.Tracef("Unsubscribe confirmed tx %s", tx.Hash()) + + // Get the spent inputs of interest. + spentInputs := m.findRelevantInputs(tx) + + // Unsubscribe the subscribers. + for outpoint := range spentInputs { + m.UnsubscribeInput(outpoint) + } + + Log.Tracef("Finished unsubscribing confirmed tx %s, found %d inputs", + tx.Hash(), len(spentInputs)) +} + // ProcessRelevantSpendTx takes a transaction and checks whether it spends any // of the subscribed inputs. If so, spend notifications are sent to the // relevant subscribers. diff --git a/chainntnfs/mempool_test.go b/chainntnfs/mempool_test.go index 6802ac427..c5ee25d81 100644 --- a/chainntnfs/mempool_test.go +++ b/chainntnfs/mempool_test.go @@ -342,3 +342,46 @@ func TestMempoolNotifySpentCancel(t *testing.T) { // Expected } } + +// TestMempoolUnsubscribeConfirmedSpentTx tests that the subscriptions for a +// confirmed tx are removed when calling the method. +func TestMempoolUnsubscribeConfirmedSpentTx(t *testing.T) { + t.Parallel() + + // Create a new mempool notifier instance. + notifier := NewMempoolNotifier() + + // Create two inputs and subscribe to them. + input1 := wire.OutPoint{Hash: [32]byte{1}, Index: 0} + input2 := wire.OutPoint{Hash: [32]byte{2}, Index: 0} + + // sub1 and sub2 are subscribed to the same input. + notifier.SubscribeInput(input1) + notifier.SubscribeInput(input1) + + // sub3 is subscribed to a different input. + sub3 := notifier.SubscribeInput(input2) + + // Create a transaction that spends input1. + msgTx := &wire.MsgTx{ + TxIn: []*wire.TxIn{ + {PreviousOutPoint: input1}, + }, + } + tx := btcutil.NewTx(msgTx) + + // Unsubscribe the relevant transaction. + notifier.UnsubsribeConfirmedSpentTx(tx) + + // Verify that the sub1 and sub2 are removed from the notifier. + _, loaded := notifier.subscribedInputs.Load(input1) + require.False(t, loaded) + + // Verify that the sub3 is not affected. + subs, loaded := notifier.subscribedInputs.Load(input2) + require.True(t, loaded) + + // sub3 should still be found. + _, loaded = subs.Load(sub3.id) + require.True(t, loaded) +}