chainntnfs: allow multiple subscriber to the same input

This commit changes the `subscribedInputs` to store a map of subscribers
so multiple subscribers are allowed to receive events from the same
outpoint.
This commit is contained in:
yyforyongyu 2023-04-06 03:39:08 +08:00
parent 81ee3b9fbe
commit 5743dd9601
No known key found for this signature in database
GPG Key ID: 9BCD95C4FF296868

View File

@ -2,6 +2,7 @@ package chainntnfs
import (
"sync"
"sync/atomic"
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/wire"
@ -19,7 +20,11 @@ type MempoolNotifier struct {
// subscribedInputs stores the inputs that we want to watch their
// spending event for.
subscribedInputs lnutils.SyncMap[wire.OutPoint, *MempoolSpendEvent]
subscribedInputs *lnutils.SyncMap[wire.OutPoint,
*lnutils.SyncMap[uint64, *MempoolSpendEvent]]
// sCounter is used to generate unique subscription IDs.
sCounter atomic.Uint64
// quit is closed when the notifier is torn down.
quit chan struct{}
@ -64,9 +69,10 @@ func (m *MempoolSpendEvent) Cancel() {
// notifier.
func NewMempoolNotifier() *MempoolNotifier {
return &MempoolNotifier{
subscribedInputs: lnutils.SyncMap[
wire.OutPoint, *MempoolSpendEvent,
]{},
subscribedInputs: &lnutils.SyncMap[
wire.OutPoint, *lnutils.SyncMap[
uint64, *MempoolSpendEvent,
]]{},
quit: make(chan struct{}),
}
}
@ -77,9 +83,20 @@ func (m *MempoolNotifier) SubscribeInput(
outpoint wire.OutPoint) *MempoolSpendEvent {
Log.Debugf("Subscribing mempool event for input %s", outpoint)
// Get the current subscribers for this input or create a new one.
clients := &lnutils.SyncMap[uint64, *MempoolSpendEvent]{}
clients, _ = m.subscribedInputs.LoadOrStore(outpoint, clients)
// Create a new subscription.
sub := newMempoolSpendEvent()
m.subscribedInputs.Store(outpoint, sub)
// Add the subscriber with a unique id.
subscriptionID := m.sCounter.Add(1)
clients.Store(subscriptionID, sub)
// Update the subscribed inputs.
m.subscribedInputs.Store(outpoint, clients)
return sub
}
@ -148,24 +165,16 @@ func (m *MempoolNotifier) findRelevantInputs(tx *btcutil.Tx) inputsWithTx {
// notifySpent iterates all the spentInputs and notifies the subscribers about
// the spent details.
func (m *MempoolNotifier) notifySpent(spentInputs inputsWithTx) {
// notify is a helper closure that constructs a spend detail and sends
// it to the subscribers.
// notifySingle sends a notification to a single subscriber about the
// spending event.
//
// NOTE: must be used inside a goroutine.
notify := func(detail *SpendDetail, op wire.OutPoint) {
notifySingle := func(id uint64, sub *MempoolSpendEvent,
op wire.OutPoint, detail *SpendDetail) {
defer m.wg.Done()
txid := detail.SpendingTx.TxHash()
Log.Debugf("Notifying the spend of %s in tx %s", op, txid)
// Load the subscriber.
sub, loaded := m.subscribedInputs.Load(op)
if !loaded {
// TODO(yy): this is possible if the subscriber cancels
// the subscription?
Log.Errorf("Sub not found for %s", op)
return
}
Log.Debugf("Notifying client %d", id)
// Send the spend details to the subscriber.
select {
@ -176,8 +185,18 @@ func (m *MempoolNotifier) notifySpent(spentInputs inputsWithTx) {
Log.Debugf("Subscription canceled, skipped notifying "+
"mempool spent for input %s", op)
// Delete the subscriber.
m.subscribedInputs.Delete(op)
// Find all the subscribers for this outpoint.
clients, loaded := m.subscribedInputs.Load(op)
if !loaded {
Log.Errorf("Client %d not found", id)
return
}
// Delete the specific subscriber.
clients.Delete(id)
// Update the subscribers map.
m.subscribedInputs.Store(op, clients)
case <-m.quit:
Log.Debugf("Mempool notifier quit, skipped notifying "+
@ -185,9 +204,37 @@ func (m *MempoolNotifier) notifySpent(spentInputs inputsWithTx) {
}
}
// notifyAll is a helper closure that constructs a spend detail and
// sends it to all the subscribers of that particular input.
//
// NOTE: must be used inside a goroutine.
notifyAll := func(detail *SpendDetail, op wire.OutPoint) {
defer m.wg.Done()
txid := detail.SpendingTx.TxHash()
Log.Debugf("Notifying the spend of %s in tx %s", op, txid)
// Load the subscriber.
subs, loaded := m.subscribedInputs.Load(op)
if !loaded {
Log.Errorf("Sub not found for %s", op)
return
}
// Iterate all the subscribers for this input and notify them.
subs.ForEach(func(id uint64, sub *MempoolSpendEvent) error {
m.wg.Add(1)
go notifySingle(id, sub, op, detail)
return nil
})
}
// Iterate the spent inputs to notify the subscribers concurrently.
for op, tx := range spentInputs {
op, tx := op, tx
m.wg.Add(1)
go notify(tx, op)
go notifyAll(tx, op)
}
}