From 318efa4c335ceda1efc6dd3e68fa3b43e7d08e06 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Thu, 6 Apr 2023 03:39:08 +0800 Subject: [PATCH] chainntnfs: allow multiple subscriber to the same input This commit changes the `subscribedInputs` to store a map of subscribers so multiple subscribers are allowed to receive events from the same outpoint. --- chainntnfs/mempool.go | 91 ++++++++++++++++++++++++++++++++----------- 1 file changed, 69 insertions(+), 22 deletions(-) diff --git a/chainntnfs/mempool.go b/chainntnfs/mempool.go index 9ef209bc0..132dce744 100644 --- a/chainntnfs/mempool.go +++ b/chainntnfs/mempool.go @@ -2,6 +2,7 @@ package chainntnfs import ( "sync" + "sync/atomic" "github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcd/wire" @@ -19,7 +20,11 @@ type MempoolNotifier struct { // subscribedInputs stores the inputs that we want to watch their // spending event for. - subscribedInputs lnutils.SyncMap[wire.OutPoint, *MempoolSpendEvent] + subscribedInputs *lnutils.SyncMap[wire.OutPoint, + *lnutils.SyncMap[uint64, *MempoolSpendEvent]] + + // sCounter is used to generate unique subscription IDs. + sCounter atomic.Uint64 // quit is closed when the notifier is torn down. quit chan struct{} @@ -64,9 +69,10 @@ func (m *MempoolSpendEvent) Cancel() { // notifier. func NewMempoolNotifier() *MempoolNotifier { return &MempoolNotifier{ - subscribedInputs: lnutils.SyncMap[ - wire.OutPoint, *MempoolSpendEvent, - ]{}, + subscribedInputs: &lnutils.SyncMap[ + wire.OutPoint, *lnutils.SyncMap[ + uint64, *MempoolSpendEvent, + ]]{}, quit: make(chan struct{}), } } @@ -77,9 +83,20 @@ func (m *MempoolNotifier) SubscribeInput( outpoint wire.OutPoint) *MempoolSpendEvent { Log.Debugf("Subscribing mempool event for input %s", outpoint) + + // Get the current subscribers for this input or create a new one. + clients := &lnutils.SyncMap[uint64, *MempoolSpendEvent]{} + clients, _ = m.subscribedInputs.LoadOrStore(outpoint, clients) + + // Create a new subscription. sub := newMempoolSpendEvent() - m.subscribedInputs.Store(outpoint, sub) + // Add the subscriber with a unique id. + subscriptionID := m.sCounter.Add(1) + clients.Store(subscriptionID, sub) + + // Update the subscribed inputs. + m.subscribedInputs.Store(outpoint, clients) return sub } @@ -148,24 +165,16 @@ func (m *MempoolNotifier) findRelevantInputs(tx *btcutil.Tx) inputsWithTx { // notifySpent iterates all the spentInputs and notifies the subscribers about // the spent details. func (m *MempoolNotifier) notifySpent(spentInputs inputsWithTx) { - // notify is a helper closure that constructs a spend detail and sends - // it to the subscribers. + // notifySingle sends a notification to a single subscriber about the + // spending event. // // NOTE: must be used inside a goroutine. - notify := func(detail *SpendDetail, op wire.OutPoint) { + notifySingle := func(id uint64, sub *MempoolSpendEvent, + op wire.OutPoint, detail *SpendDetail) { + defer m.wg.Done() - txid := detail.SpendingTx.TxHash() - Log.Debugf("Notifying the spend of %s in tx %s", op, txid) - - // Load the subscriber. - sub, loaded := m.subscribedInputs.Load(op) - if !loaded { - // TODO(yy): this is possible if the subscriber cancels - // the subscription? - Log.Errorf("Sub not found for %s", op) - return - } + Log.Debugf("Notifying client %d", id) // Send the spend details to the subscriber. select { @@ -176,8 +185,18 @@ func (m *MempoolNotifier) notifySpent(spentInputs inputsWithTx) { Log.Debugf("Subscription canceled, skipped notifying "+ "mempool spent for input %s", op) - // Delete the subscriber. - m.subscribedInputs.Delete(op) + // Find all the subscribers for this outpoint. + clients, loaded := m.subscribedInputs.Load(op) + if !loaded { + Log.Errorf("Client %d not found", id) + return + } + + // Delete the specific subscriber. + clients.Delete(id) + + // Update the subscribers map. + m.subscribedInputs.Store(op, clients) case <-m.quit: Log.Debugf("Mempool notifier quit, skipped notifying "+ @@ -185,9 +204,37 @@ func (m *MempoolNotifier) notifySpent(spentInputs inputsWithTx) { } } + // notifyAll is a helper closure that constructs a spend detail and + // sends it to all the subscribers of that particular input. + // + // NOTE: must be used inside a goroutine. + notifyAll := func(detail *SpendDetail, op wire.OutPoint) { + defer m.wg.Done() + + txid := detail.SpendingTx.TxHash() + Log.Debugf("Notifying the spend of %s in tx %s", op, txid) + + // Load the subscriber. + subs, loaded := m.subscribedInputs.Load(op) + if !loaded { + Log.Errorf("Sub not found for %s", op) + return + } + + // Iterate all the subscribers for this input and notify them. + subs.ForEach(func(id uint64, sub *MempoolSpendEvent) error { + m.wg.Add(1) + go notifySingle(id, sub, op, detail) + + return nil + }) + } + // Iterate the spent inputs to notify the subscribers concurrently. for op, tx := range spentInputs { + op, tx := op, tx + m.wg.Add(1) - go notify(tx, op) + go notifyAll(tx, op) } }