chainntnfs: remove subscriptions when the relevant tx is confirmed

This commit removes the subscribed inputs from mempool notifier when the
relevant transaction is confirmed.
This commit is contained in:
yyforyongyu 2023-04-07 04:35:27 +08:00 committed by Olaoluwa Osuntokun
parent 1fa269425e
commit f9b1250ecf
4 changed files with 146 additions and 28 deletions

View File

@ -455,23 +455,19 @@ out:
case chain.RelevantTx:
tx := btcutil.NewTx(&item.TxRecord.MsgTx)
// If this is a mempool spend, we'll ask the
// mempool notifier to hanlde it.
// Init values.
isMempool := false
height := uint32(0)
// Unwrap values.
if item.Block == nil {
b.memNotifier.ProcessRelevantSpendTx(tx)
continue
isMempool = true
} else {
height = uint32(item.Block.Height)
}
// Otherwise this is a confirmed spend, and
// we'll ask the tx notifier to handle it.
err := b.txNotifier.ProcessRelevantSpendTx(
tx, uint32(item.Block.Height),
)
if err != nil {
chainntnfs.Log.Errorf("Unable to "+
"process transaction %v: %v",
tx.Hash(), err)
}
// Handle the transaction.
b.handleRelevantTx(tx, isMempool, height)
}
case <-b.quit:
@ -480,6 +476,39 @@ out:
}
}
// handleRelevantTx handles a new transaction that has been seen either in a
// block or in the mempool. If in mempool, it will ask the mempool notifier to
// handle it. If in a block, it will ask the txNotifier to handle it, and
// cancel any relevant subscriptions made in the mempool.
func (b *BitcoindNotifier) handleRelevantTx(tx *btcutil.Tx,
mempool bool, height uint32) {
// If this is a mempool spend, we'll ask the mempool notifier to hanlde
// it.
if mempool {
b.memNotifier.ProcessRelevantSpendTx(tx)
return
}
// Otherwise this is a confirmed spend, and we'll ask the tx notifier
// to handle it.
err := b.txNotifier.ProcessRelevantSpendTx(tx, height)
if err != nil {
chainntnfs.Log.Errorf("Unable to process transaction %v: %v",
tx.Hash(), err)
return
}
// Once the tx is processed, we will ask the memNotifier to unsubscribe
// the input.
//
// NOTE(yy): we could build it into txNotifier.ProcessRelevantSpendTx,
// but choose to implement it here so we can easily decouple the two
// notifiers in the future.
b.memNotifier.UnsubsribeConfirmedSpentTx(tx)
}
// historicalConfDetails looks up whether a confirmation request (txid/output
// script) has already been included in a block in the active chain and, if so,
// returns details about said block.

View File

@ -505,23 +505,19 @@ out:
newSpend := item.(*txUpdate)
tx := newSpend.tx
// If this is a mempool spend, we'll ask the mempool
// notifier to hanlde it.
// Init values.
isMempool := false
height := uint32(0)
// Unwrap values.
if newSpend.details == nil {
b.memNotifier.ProcessRelevantSpendTx(tx)
continue
isMempool = true
} else {
height = uint32(newSpend.details.Height)
}
// Otherwise this is a confirmed spend, and we'll ask
// the tx notifier to handle it.
err := b.txNotifier.ProcessRelevantSpendTx(
tx, uint32(newSpend.details.Height),
)
if err != nil {
chainntnfs.Log.Errorf("Unable to process "+
"transaction %v: %v",
newSpend.tx.Hash(), err)
}
// Handle the transaction.
b.handleRelevantTx(tx, isMempool, height)
case <-b.quit:
break out
@ -529,6 +525,39 @@ out:
}
}
// handleRelevantTx handles a new transaction that has been seen either in a
// block or in the mempool. If in mempool, it will ask the mempool notifier to
// handle it. If in a block, it will ask the txNotifier to handle it, and
// cancel any relevant subscriptions made in the mempool.
func (b *BtcdNotifier) handleRelevantTx(tx *btcutil.Tx,
mempool bool, height uint32) {
// If this is a mempool spend, we'll ask the mempool notifier to hanlde
// it.
if mempool {
b.memNotifier.ProcessRelevantSpendTx(tx)
return
}
// Otherwise this is a confirmed spend, and we'll ask the tx notifier
// to handle it.
err := b.txNotifier.ProcessRelevantSpendTx(tx, height)
if err != nil {
chainntnfs.Log.Errorf("Unable to process transaction %v: %v",
tx.Hash(), err)
return
}
// Once the tx is processed, we will ask the memNotifier to unsubscribe
// the input.
//
// NOTE(yy): we could build it into txNotifier.ProcessRelevantSpendTx,
// but choose to implement it here so we can easily decouple the two
// notifiers in the future.
b.memNotifier.UnsubsribeConfirmedSpentTx(tx)
}
// historicalConfDetails looks up whether a confirmation request (txid/output
// script) has already been included in a block in the active chain and, if so,
// returns details about said block.

View File

@ -140,6 +140,23 @@ func (m *MempoolNotifier) UnsubscribeEvent(sub *MempoolSpendEvent) {
clients.Delete(sub.id)
}
// UnsubsribeConfirmedSpentTx takes a transaction and removes the subscriptions
// identified using its inputs.
func (m *MempoolNotifier) UnsubsribeConfirmedSpentTx(tx *btcutil.Tx) {
Log.Tracef("Unsubscribe confirmed tx %s", tx.Hash())
// Get the spent inputs of interest.
spentInputs := m.findRelevantInputs(tx)
// Unsubscribe the subscribers.
for outpoint := range spentInputs {
m.UnsubscribeInput(outpoint)
}
Log.Tracef("Finished unsubscribing confirmed tx %s, found %d inputs",
tx.Hash(), len(spentInputs))
}
// ProcessRelevantSpendTx takes a transaction and checks whether it spends any
// of the subscribed inputs. If so, spend notifications are sent to the
// relevant subscribers.

View File

@ -342,3 +342,46 @@ func TestMempoolNotifySpentCancel(t *testing.T) {
// Expected
}
}
// TestMempoolUnsubscribeConfirmedSpentTx tests that the subscriptions for a
// confirmed tx are removed when calling the method.
func TestMempoolUnsubscribeConfirmedSpentTx(t *testing.T) {
t.Parallel()
// Create a new mempool notifier instance.
notifier := NewMempoolNotifier()
// Create two inputs and subscribe to them.
input1 := wire.OutPoint{Hash: [32]byte{1}, Index: 0}
input2 := wire.OutPoint{Hash: [32]byte{2}, Index: 0}
// sub1 and sub2 are subscribed to the same input.
notifier.SubscribeInput(input1)
notifier.SubscribeInput(input1)
// sub3 is subscribed to a different input.
sub3 := notifier.SubscribeInput(input2)
// Create a transaction that spends input1.
msgTx := &wire.MsgTx{
TxIn: []*wire.TxIn{
{PreviousOutPoint: input1},
},
}
tx := btcutil.NewTx(msgTx)
// Unsubscribe the relevant transaction.
notifier.UnsubsribeConfirmedSpentTx(tx)
// Verify that the sub1 and sub2 are removed from the notifier.
_, loaded := notifier.subscribedInputs.Load(input1)
require.False(t, loaded)
// Verify that the sub3 is not affected.
subs, loaded := notifier.subscribedInputs.Load(input2)
require.True(t, loaded)
// sub3 should still be found.
_, loaded = subs.Load(sub3.id)
require.True(t, loaded)
}