diff --git a/invoices/invoiceregistry.go b/invoices/invoiceregistry.go index a2ce60550..a72555fcf 100644 --- a/invoices/invoiceregistry.go +++ b/invoices/invoiceregistry.go @@ -108,15 +108,15 @@ type InvoiceRegistry struct { // cfg contains the registry's configuration parameters. cfg *RegistryConfig - notificationClients map[uint32]*InvoiceSubscription + notificationClients map[uint32]*InvoiceSubscription + + // TODO(yy): use map[lntypes.Hash]*SingleInvoiceSubscription for better + // performance. singleNotificationClients map[uint32]*SingleInvoiceSubscription - newSubscriptions chan *InvoiceSubscription - subscriptionCancels chan uint32 - - // invoiceEvents is a single channel over which both invoice updates and - // new single invoice subscriptions are carried. - invoiceEvents chan interface{} + // invoiceEvents is a single channel over which invoice updates are + // carried. + invoiceEvents chan *invoiceEvent // subscriptions is a map from a circuit key to a list of subscribers. // It is used for efficient notification of links. @@ -147,9 +147,7 @@ func NewRegistry(cdb *channeldb.DB, expiryWatcher *InvoiceExpiryWatcher, cdb: cdb, notificationClients: make(map[uint32]*InvoiceSubscription), singleNotificationClients: make(map[uint32]*SingleInvoiceSubscription), - newSubscriptions: make(chan *InvoiceSubscription), - subscriptionCancels: make(chan uint32), - invoiceEvents: make(chan interface{}, 100), + invoiceEvents: make(chan *invoiceEvent, 100), hodlSubscriptions: make(map[channeldb.CircuitKey]map[chan<- interface{}]struct{}), hodlReverseSubscriptions: make(map[chan<- interface{}]map[channeldb.CircuitKey]struct{}), cfg: cfg, @@ -301,61 +299,18 @@ func (i *InvoiceRegistry) invoiceEventLoop() { } select { - // A new invoice subscription for all invoices has just arrived! - // We'll query for any backlog notifications, then add it to the - // set of clients. - case newClient := <-i.newSubscriptions: - log.Infof("New invoice subscription "+ - "client: id=%v", newClient.id) - - // With the backlog notifications delivered (if any), - // we'll add this to our active subscriptions and - // continue. - i.notificationClients[newClient.id] = newClient - - // A client no longer wishes to receive invoice notifications. - // So we'll remove them from the set of active clients. - case clientID := <-i.subscriptionCancels: - log.Infof("Cancelling invoice subscription for "+ - "client=%v", clientID) - - delete(i.notificationClients, clientID) - delete(i.singleNotificationClients, clientID) - - // An invoice event has come in. This can either be an update to - // an invoice or a new single invoice subscriber. Both type of - // events are passed in via the same channel, to make sure that - // subscribers get a consistent view of the event sequence. + // A sub-systems has just modified the invoice state, so we'll + // dispatch notifications to all registered clients. case event := <-i.invoiceEvents: - switch e := event.(type) { + // For backwards compatibility, do not notify all + // invoice subscribers of cancel and accept events. + state := event.invoice.State + if state != channeldb.ContractCanceled && + state != channeldb.ContractAccepted { - // A sub-systems has just modified the invoice state, so - // we'll dispatch notifications to all registered - // clients. - case *invoiceEvent: - // For backwards compatibility, do not notify - // all invoice subscribers of cancel and accept - // events. - state := e.invoice.State - if state != channeldb.ContractCanceled && - state != channeldb.ContractAccepted { - - i.dispatchToClients(e) - } - i.dispatchToSingleClients(e) - - // A new single invoice subscription has arrived. Add it - // to the set of clients. It is important to do this in - // sequence with any other invoice events, because an - // initial invoice update has already been sent out to - // the subscriber. - case *SingleInvoiceSubscription: - log.Infof("New single invoice subscription "+ - "client: id=%v, ref=%v", e.id, - e.invoiceRef) - - i.singleNotificationClients[e.id] = e + i.dispatchToClients(event) } + i.dispatchToSingleClients(event) // A new htlc came in for auto-release. case event := <-i.htlcAutoReleaseChan: @@ -386,27 +341,36 @@ func (i *InvoiceRegistry) invoiceEventLoop() { } } -// dispatchToSingleClients passes the supplied event to all notification clients -// that subscribed to all the invoice this event applies to. +// dispatchToSingleClients passes the supplied event to all notification +// clients that subscribed to all the invoice this event applies to. func (i *InvoiceRegistry) dispatchToSingleClients(event *invoiceEvent) { // Dispatch to single invoice subscribers. - for _, client := range i.singleNotificationClients { + clients := i.copySingleClients() + for _, client := range clients { payHash := client.invoiceRef.PayHash() + if payHash == nil || *payHash != event.hash { continue } + if atomic.LoadUint32(&client.canceled) == 1 { + log.Errorf("Client(id=%v) has stopped, skipped "+ + "notification for event(pay_hash=%v)", + client.id, payHash) + continue + } client.notify(event) } } // dispatchToClients passes the supplied event to all notification clients that -// subscribed to all invoices. Add and settle indices are used to make sure that -// clients don't receive duplicate or unwanted events. +// subscribed to all invoices. Add and settle indices are used to make sure +// that clients don't receive duplicate or unwanted events. func (i *InvoiceRegistry) dispatchToClients(event *invoiceEvent) { invoice := event.invoice - for clientID, client := range i.notificationClients { + clients := i.copyClients() + for clientID, client := range clients { // Before we dispatch this event, we'll check // to ensure that this client hasn't already // received this notification in order to @@ -568,6 +532,9 @@ func (i *InvoiceRegistry) deliverSingleBacklogEvents( return err } + log.Debugf("Client(id=%v) delivered single backlog event: payHash=%v", + client.id, payHash) + return nil } @@ -1406,10 +1373,6 @@ func (i *InvoiceRegistry) notifyClients(hash lntypes.Hash, type invoiceSubscriptionKit struct { id uint32 // nolint:structcheck - // subscriptionCancels is a chan mounted to InvoiceRegistry that - // signals the current subscription has been cancled. - subscriptionCancels chan uint32 - // quit is a chan mouted to InvoiceRegistry that signals a shutdown. quit chan struct{} @@ -1471,11 +1434,6 @@ func (i *invoiceSubscriptionKit) Cancel() { return } - select { - case i.subscriptionCancels <- i.id: - case <-i.quit: - } - i.ntfnQueue.Stop() close(i.cancelChan) @@ -1506,10 +1464,9 @@ func (i *InvoiceRegistry) SubscribeNotifications( addIndex: addIndex, settleIndex: settleIndex, invoiceSubscriptionKit: invoiceSubscriptionKit{ - subscriptionCancels: i.subscriptionCancels, - quit: i.quit, - ntfnQueue: queue.NewConcurrentQueue(20), - cancelChan: make(chan struct{}), + quit: i.quit, + ntfnQueue: queue.NewConcurrentQueue(20), + cancelChan: make(chan struct{}), }, } client.ntfnQueue.Start() @@ -1525,6 +1482,7 @@ func (i *InvoiceRegistry) SubscribeNotifications( i.wg.Add(1) go func() { defer i.wg.Done() + defer i.deleteClient(client.id) for { select { @@ -1576,9 +1534,6 @@ func (i *InvoiceRegistry) SubscribeNotifications( } }() - i.Lock() - defer i.Unlock() - // Query the database to see if based on the provided addIndex and // settledIndex we need to deliver any backlog notifications. err := i.deliverBacklogEvents(client) @@ -1586,11 +1541,13 @@ func (i *InvoiceRegistry) SubscribeNotifications( return nil, err } - select { - case i.newSubscriptions <- client: - case <-i.quit: - return nil, ErrShuttingDown - } + log.Infof("New invoice subscription client: id=%v", client.id) + + i.Lock() + // With the backlog notifications delivered (if any), we'll add this to + // our active subscriptions. + i.notificationClients[client.id] = client + i.Unlock() return client, nil } @@ -1603,10 +1560,9 @@ func (i *InvoiceRegistry) SubscribeSingleInvoice( client := &SingleInvoiceSubscription{ Updates: make(chan *channeldb.Invoice), invoiceSubscriptionKit: invoiceSubscriptionKit{ - subscriptionCancels: i.subscriptionCancels, - quit: i.quit, - ntfnQueue: queue.NewConcurrentQueue(20), - cancelChan: make(chan struct{}), + quit: i.quit, + ntfnQueue: queue.NewConcurrentQueue(20), + cancelChan: make(chan struct{}), }, invoiceRef: channeldb.InvoiceRefByHash(hash), } @@ -1623,6 +1579,7 @@ func (i *InvoiceRegistry) SubscribeSingleInvoice( i.wg.Add(1) go func() { defer i.wg.Done() + defer i.deleteClient(client.id) for { select { @@ -1651,22 +1608,17 @@ func (i *InvoiceRegistry) SubscribeSingleInvoice( } }() - // Within the lock, we both query the invoice state and pass the client - // subscription to the invoiceEvents channel. This is to make sure that - // the client receives a consistent stream of events. - i.Lock() - defer i.Unlock() - err := i.deliverSingleBacklogEvents(client) if err != nil { return nil, err } - select { - case i.invoiceEvents <- client: - case <-i.quit: - return nil, ErrShuttingDown - } + log.Infof("New single invoice subscription client: id=%v, ref=%v", + client.id, client.invoiceRef) + + i.Lock() + i.singleNotificationClients[client.id] = client + i.Unlock() return client, nil } @@ -1731,3 +1683,40 @@ func (i *InvoiceRegistry) HodlUnsubscribeAll(subscriber chan<- interface{}) { delete(i.hodlReverseSubscriptions, subscriber) } + +// copySingleClients copies i.SingleInvoiceSubscription inside a lock. This is +// useful when we need to iterate the map to send notifications. +func (i *InvoiceRegistry) copySingleClients() map[uint32]*SingleInvoiceSubscription { + i.RLock() + defer i.RUnlock() + + clients := make(map[uint32]*SingleInvoiceSubscription) + for k, v := range i.singleNotificationClients { + clients[k] = v + } + return clients +} + +// copyClients copies i.notificationClients inside a lock. This is useful when +// we need to iterate the map to send notifications. +func (i *InvoiceRegistry) copyClients() map[uint32]*InvoiceSubscription { + i.RLock() + defer i.RUnlock() + + clients := make(map[uint32]*InvoiceSubscription) + for k, v := range i.notificationClients { + clients[k] = v + } + return clients +} + +// deleteClient removes a client by its ID inside a lock. Noop if the client is +// not found. +func (i *InvoiceRegistry) deleteClient(clientID uint32) { + i.Lock() + defer i.Unlock() + + log.Infof("Cancelling invoice subscription for client=%v", clientID) + delete(i.notificationClients, clientID) + delete(i.singleNotificationClients, clientID) +} diff --git a/lnrpc/invoicesrpc/invoices_server.go b/lnrpc/invoicesrpc/invoices_server.go index 102abb0c9..8081e011b 100644 --- a/lnrpc/invoicesrpc/invoices_server.go +++ b/lnrpc/invoicesrpc/invoices_server.go @@ -244,6 +244,8 @@ func (s *Server) SubscribeSingleInvoice(req *SubscribeSingleInvoiceRequest, } defer invoiceClient.Cancel() + log.Debugf("Created new single invoice(pay_hash=%v) subscription", hash) + for { select { case newInvoice := <-invoiceClient.Updates: @@ -265,7 +267,9 @@ func (s *Server) SubscribeSingleInvoice(req *SubscribeSingleInvoiceRequest, } case <-updateStream.Context().Done(): - return updateStream.Context().Err() + return fmt.Errorf("subscription for "+ + "invoice(pay_hash=%v): %w", hash, + updateStream.Context().Err()) case <-s.quit: return nil