mirror of
https://github.com/lightningnetwork/lnd.git
synced 2024-11-19 01:43:16 +01:00
invoices: refactor invoice subscriptions to avoid blocking
This commit fixes a potential blocking when notifying invoice updates. When a new subscription client is created followed by an immediate cancel, it's likely the client will be removed from the registry's map(noop) and then added to its map again. This subscription will then be kept in registry until lnd is restarted. Another more serious issue is when multiple subscriptions are made for the same invoice, when the above case happens, other subscriptions may never send invoice updates because a previous client has a stopped notification queue that blocks following notifications.
This commit is contained in:
parent
3230cc4822
commit
df2ecd6bc5
@ -108,15 +108,15 @@ type InvoiceRegistry struct {
|
||||
// cfg contains the registry's configuration parameters.
|
||||
cfg *RegistryConfig
|
||||
|
||||
notificationClients map[uint32]*InvoiceSubscription
|
||||
notificationClients map[uint32]*InvoiceSubscription
|
||||
|
||||
// TODO(yy): use map[lntypes.Hash]*SingleInvoiceSubscription for better
|
||||
// performance.
|
||||
singleNotificationClients map[uint32]*SingleInvoiceSubscription
|
||||
|
||||
newSubscriptions chan *InvoiceSubscription
|
||||
subscriptionCancels chan uint32
|
||||
|
||||
// invoiceEvents is a single channel over which both invoice updates and
|
||||
// new single invoice subscriptions are carried.
|
||||
invoiceEvents chan interface{}
|
||||
// invoiceEvents is a single channel over which invoice updates are
|
||||
// carried.
|
||||
invoiceEvents chan *invoiceEvent
|
||||
|
||||
// subscriptions is a map from a circuit key to a list of subscribers.
|
||||
// It is used for efficient notification of links.
|
||||
@ -147,9 +147,7 @@ func NewRegistry(cdb *channeldb.DB, expiryWatcher *InvoiceExpiryWatcher,
|
||||
cdb: cdb,
|
||||
notificationClients: make(map[uint32]*InvoiceSubscription),
|
||||
singleNotificationClients: make(map[uint32]*SingleInvoiceSubscription),
|
||||
newSubscriptions: make(chan *InvoiceSubscription),
|
||||
subscriptionCancels: make(chan uint32),
|
||||
invoiceEvents: make(chan interface{}, 100),
|
||||
invoiceEvents: make(chan *invoiceEvent, 100),
|
||||
hodlSubscriptions: make(map[channeldb.CircuitKey]map[chan<- interface{}]struct{}),
|
||||
hodlReverseSubscriptions: make(map[chan<- interface{}]map[channeldb.CircuitKey]struct{}),
|
||||
cfg: cfg,
|
||||
@ -301,61 +299,18 @@ func (i *InvoiceRegistry) invoiceEventLoop() {
|
||||
}
|
||||
|
||||
select {
|
||||
// A new invoice subscription for all invoices has just arrived!
|
||||
// We'll query for any backlog notifications, then add it to the
|
||||
// set of clients.
|
||||
case newClient := <-i.newSubscriptions:
|
||||
log.Infof("New invoice subscription "+
|
||||
"client: id=%v", newClient.id)
|
||||
|
||||
// With the backlog notifications delivered (if any),
|
||||
// we'll add this to our active subscriptions and
|
||||
// continue.
|
||||
i.notificationClients[newClient.id] = newClient
|
||||
|
||||
// A client no longer wishes to receive invoice notifications.
|
||||
// So we'll remove them from the set of active clients.
|
||||
case clientID := <-i.subscriptionCancels:
|
||||
log.Infof("Cancelling invoice subscription for "+
|
||||
"client=%v", clientID)
|
||||
|
||||
delete(i.notificationClients, clientID)
|
||||
delete(i.singleNotificationClients, clientID)
|
||||
|
||||
// An invoice event has come in. This can either be an update to
|
||||
// an invoice or a new single invoice subscriber. Both type of
|
||||
// events are passed in via the same channel, to make sure that
|
||||
// subscribers get a consistent view of the event sequence.
|
||||
// A sub-systems has just modified the invoice state, so we'll
|
||||
// dispatch notifications to all registered clients.
|
||||
case event := <-i.invoiceEvents:
|
||||
switch e := event.(type) {
|
||||
// For backwards compatibility, do not notify all
|
||||
// invoice subscribers of cancel and accept events.
|
||||
state := event.invoice.State
|
||||
if state != channeldb.ContractCanceled &&
|
||||
state != channeldb.ContractAccepted {
|
||||
|
||||
// A sub-systems has just modified the invoice state, so
|
||||
// we'll dispatch notifications to all registered
|
||||
// clients.
|
||||
case *invoiceEvent:
|
||||
// For backwards compatibility, do not notify
|
||||
// all invoice subscribers of cancel and accept
|
||||
// events.
|
||||
state := e.invoice.State
|
||||
if state != channeldb.ContractCanceled &&
|
||||
state != channeldb.ContractAccepted {
|
||||
|
||||
i.dispatchToClients(e)
|
||||
}
|
||||
i.dispatchToSingleClients(e)
|
||||
|
||||
// A new single invoice subscription has arrived. Add it
|
||||
// to the set of clients. It is important to do this in
|
||||
// sequence with any other invoice events, because an
|
||||
// initial invoice update has already been sent out to
|
||||
// the subscriber.
|
||||
case *SingleInvoiceSubscription:
|
||||
log.Infof("New single invoice subscription "+
|
||||
"client: id=%v, ref=%v", e.id,
|
||||
e.invoiceRef)
|
||||
|
||||
i.singleNotificationClients[e.id] = e
|
||||
i.dispatchToClients(event)
|
||||
}
|
||||
i.dispatchToSingleClients(event)
|
||||
|
||||
// A new htlc came in for auto-release.
|
||||
case event := <-i.htlcAutoReleaseChan:
|
||||
@ -386,27 +341,36 @@ func (i *InvoiceRegistry) invoiceEventLoop() {
|
||||
}
|
||||
}
|
||||
|
||||
// dispatchToSingleClients passes the supplied event to all notification clients
|
||||
// that subscribed to all the invoice this event applies to.
|
||||
// dispatchToSingleClients passes the supplied event to all notification
|
||||
// clients that subscribed to all the invoice this event applies to.
|
||||
func (i *InvoiceRegistry) dispatchToSingleClients(event *invoiceEvent) {
|
||||
// Dispatch to single invoice subscribers.
|
||||
for _, client := range i.singleNotificationClients {
|
||||
clients := i.copySingleClients()
|
||||
for _, client := range clients {
|
||||
payHash := client.invoiceRef.PayHash()
|
||||
|
||||
if payHash == nil || *payHash != event.hash {
|
||||
continue
|
||||
}
|
||||
|
||||
if atomic.LoadUint32(&client.canceled) == 1 {
|
||||
log.Errorf("Client(id=%v) has stopped, skipped "+
|
||||
"notification for event(pay_hash=%v)",
|
||||
client.id, payHash)
|
||||
continue
|
||||
}
|
||||
client.notify(event)
|
||||
}
|
||||
}
|
||||
|
||||
// dispatchToClients passes the supplied event to all notification clients that
|
||||
// subscribed to all invoices. Add and settle indices are used to make sure that
|
||||
// clients don't receive duplicate or unwanted events.
|
||||
// subscribed to all invoices. Add and settle indices are used to make sure
|
||||
// that clients don't receive duplicate or unwanted events.
|
||||
func (i *InvoiceRegistry) dispatchToClients(event *invoiceEvent) {
|
||||
invoice := event.invoice
|
||||
|
||||
for clientID, client := range i.notificationClients {
|
||||
clients := i.copyClients()
|
||||
for clientID, client := range clients {
|
||||
// Before we dispatch this event, we'll check
|
||||
// to ensure that this client hasn't already
|
||||
// received this notification in order to
|
||||
@ -568,6 +532,9 @@ func (i *InvoiceRegistry) deliverSingleBacklogEvents(
|
||||
return err
|
||||
}
|
||||
|
||||
log.Debugf("Client(id=%v) delivered single backlog event: payHash=%v",
|
||||
client.id, payHash)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -1406,10 +1373,6 @@ func (i *InvoiceRegistry) notifyClients(hash lntypes.Hash,
|
||||
type invoiceSubscriptionKit struct {
|
||||
id uint32 // nolint:structcheck
|
||||
|
||||
// subscriptionCancels is a chan mounted to InvoiceRegistry that
|
||||
// signals the current subscription has been cancled.
|
||||
subscriptionCancels chan uint32
|
||||
|
||||
// quit is a chan mouted to InvoiceRegistry that signals a shutdown.
|
||||
quit chan struct{}
|
||||
|
||||
@ -1471,11 +1434,6 @@ func (i *invoiceSubscriptionKit) Cancel() {
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case i.subscriptionCancels <- i.id:
|
||||
case <-i.quit:
|
||||
}
|
||||
|
||||
i.ntfnQueue.Stop()
|
||||
close(i.cancelChan)
|
||||
|
||||
@ -1506,10 +1464,9 @@ func (i *InvoiceRegistry) SubscribeNotifications(
|
||||
addIndex: addIndex,
|
||||
settleIndex: settleIndex,
|
||||
invoiceSubscriptionKit: invoiceSubscriptionKit{
|
||||
subscriptionCancels: i.subscriptionCancels,
|
||||
quit: i.quit,
|
||||
ntfnQueue: queue.NewConcurrentQueue(20),
|
||||
cancelChan: make(chan struct{}),
|
||||
quit: i.quit,
|
||||
ntfnQueue: queue.NewConcurrentQueue(20),
|
||||
cancelChan: make(chan struct{}),
|
||||
},
|
||||
}
|
||||
client.ntfnQueue.Start()
|
||||
@ -1525,6 +1482,7 @@ func (i *InvoiceRegistry) SubscribeNotifications(
|
||||
i.wg.Add(1)
|
||||
go func() {
|
||||
defer i.wg.Done()
|
||||
defer i.deleteClient(client.id)
|
||||
|
||||
for {
|
||||
select {
|
||||
@ -1576,9 +1534,6 @@ func (i *InvoiceRegistry) SubscribeNotifications(
|
||||
}
|
||||
}()
|
||||
|
||||
i.Lock()
|
||||
defer i.Unlock()
|
||||
|
||||
// Query the database to see if based on the provided addIndex and
|
||||
// settledIndex we need to deliver any backlog notifications.
|
||||
err := i.deliverBacklogEvents(client)
|
||||
@ -1586,11 +1541,13 @@ func (i *InvoiceRegistry) SubscribeNotifications(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
select {
|
||||
case i.newSubscriptions <- client:
|
||||
case <-i.quit:
|
||||
return nil, ErrShuttingDown
|
||||
}
|
||||
log.Infof("New invoice subscription client: id=%v", client.id)
|
||||
|
||||
i.Lock()
|
||||
// With the backlog notifications delivered (if any), we'll add this to
|
||||
// our active subscriptions.
|
||||
i.notificationClients[client.id] = client
|
||||
i.Unlock()
|
||||
|
||||
return client, nil
|
||||
}
|
||||
@ -1603,10 +1560,9 @@ func (i *InvoiceRegistry) SubscribeSingleInvoice(
|
||||
client := &SingleInvoiceSubscription{
|
||||
Updates: make(chan *channeldb.Invoice),
|
||||
invoiceSubscriptionKit: invoiceSubscriptionKit{
|
||||
subscriptionCancels: i.subscriptionCancels,
|
||||
quit: i.quit,
|
||||
ntfnQueue: queue.NewConcurrentQueue(20),
|
||||
cancelChan: make(chan struct{}),
|
||||
quit: i.quit,
|
||||
ntfnQueue: queue.NewConcurrentQueue(20),
|
||||
cancelChan: make(chan struct{}),
|
||||
},
|
||||
invoiceRef: channeldb.InvoiceRefByHash(hash),
|
||||
}
|
||||
@ -1623,6 +1579,7 @@ func (i *InvoiceRegistry) SubscribeSingleInvoice(
|
||||
i.wg.Add(1)
|
||||
go func() {
|
||||
defer i.wg.Done()
|
||||
defer i.deleteClient(client.id)
|
||||
|
||||
for {
|
||||
select {
|
||||
@ -1651,22 +1608,17 @@ func (i *InvoiceRegistry) SubscribeSingleInvoice(
|
||||
}
|
||||
}()
|
||||
|
||||
// Within the lock, we both query the invoice state and pass the client
|
||||
// subscription to the invoiceEvents channel. This is to make sure that
|
||||
// the client receives a consistent stream of events.
|
||||
i.Lock()
|
||||
defer i.Unlock()
|
||||
|
||||
err := i.deliverSingleBacklogEvents(client)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
select {
|
||||
case i.invoiceEvents <- client:
|
||||
case <-i.quit:
|
||||
return nil, ErrShuttingDown
|
||||
}
|
||||
log.Infof("New single invoice subscription client: id=%v, ref=%v",
|
||||
client.id, client.invoiceRef)
|
||||
|
||||
i.Lock()
|
||||
i.singleNotificationClients[client.id] = client
|
||||
i.Unlock()
|
||||
|
||||
return client, nil
|
||||
}
|
||||
@ -1731,3 +1683,40 @@ func (i *InvoiceRegistry) HodlUnsubscribeAll(subscriber chan<- interface{}) {
|
||||
|
||||
delete(i.hodlReverseSubscriptions, subscriber)
|
||||
}
|
||||
|
||||
// copySingleClients copies i.SingleInvoiceSubscription inside a lock. This is
|
||||
// useful when we need to iterate the map to send notifications.
|
||||
func (i *InvoiceRegistry) copySingleClients() map[uint32]*SingleInvoiceSubscription {
|
||||
i.RLock()
|
||||
defer i.RUnlock()
|
||||
|
||||
clients := make(map[uint32]*SingleInvoiceSubscription)
|
||||
for k, v := range i.singleNotificationClients {
|
||||
clients[k] = v
|
||||
}
|
||||
return clients
|
||||
}
|
||||
|
||||
// copyClients copies i.notificationClients inside a lock. This is useful when
|
||||
// we need to iterate the map to send notifications.
|
||||
func (i *InvoiceRegistry) copyClients() map[uint32]*InvoiceSubscription {
|
||||
i.RLock()
|
||||
defer i.RUnlock()
|
||||
|
||||
clients := make(map[uint32]*InvoiceSubscription)
|
||||
for k, v := range i.notificationClients {
|
||||
clients[k] = v
|
||||
}
|
||||
return clients
|
||||
}
|
||||
|
||||
// deleteClient removes a client by its ID inside a lock. Noop if the client is
|
||||
// not found.
|
||||
func (i *InvoiceRegistry) deleteClient(clientID uint32) {
|
||||
i.Lock()
|
||||
defer i.Unlock()
|
||||
|
||||
log.Infof("Cancelling invoice subscription for client=%v", clientID)
|
||||
delete(i.notificationClients, clientID)
|
||||
delete(i.singleNotificationClients, clientID)
|
||||
}
|
||||
|
@ -244,6 +244,8 @@ func (s *Server) SubscribeSingleInvoice(req *SubscribeSingleInvoiceRequest,
|
||||
}
|
||||
defer invoiceClient.Cancel()
|
||||
|
||||
log.Debugf("Created new single invoice(pay_hash=%v) subscription", hash)
|
||||
|
||||
for {
|
||||
select {
|
||||
case newInvoice := <-invoiceClient.Updates:
|
||||
@ -265,7 +267,9 @@ func (s *Server) SubscribeSingleInvoice(req *SubscribeSingleInvoiceRequest,
|
||||
}
|
||||
|
||||
case <-updateStream.Context().Done():
|
||||
return updateStream.Context().Err()
|
||||
return fmt.Errorf("subscription for "+
|
||||
"invoice(pay_hash=%v): %w", hash,
|
||||
updateStream.Context().Err())
|
||||
|
||||
case <-s.quit:
|
||||
return nil
|
||||
|
Loading…
Reference in New Issue
Block a user