rpcserver+lnrpc: make Subscribe RPCs context aware

This commit makes all the Subscribe RCP's context aware so that they
stop executing when the request context is cancelled.
This commit is contained in:
whythat 2020-07-18 23:41:11 +03:00 committed by Elle Mouton
parent 8f940a5ea3
commit 1adeb41a9d
3 changed files with 29 additions and 0 deletions

View File

@ -253,6 +253,9 @@ func (s *Server) SubscribeSingleInvoice(req *SubscribeSingleInvoiceRequest,
return nil
}
case <-updateStream.Context().Done():
return updateStream.Context().Err()
case <-s.quit:
return nil
}

View File

@ -269,3 +269,7 @@
<time> [ERR] RPCS: WS: error writing message: websocket: close sent
<time> [ERR] RPCS: [/routerrpc.Router/XImportMissionControl]: pair: <hex> -> <hex>: invalid failure: msat: <amt> and sat: 0.0000002 BTC values not equal
<time> [ERR] BTCN: utxo scan failed: neutrino shutting down
<time> [ERR] RPCS: [/lnrpc.Lightning/SubscribeChannelGraph]: context canceled
<time> [ERR] RPCS: [/lnrpc.Lightning/SubscribeInvoices]: context canceled
<time> [ERR] RPCS: [/lnrpc.Lightning/SubscribeChannelGraph]: context deadline exceeded
<time> [ERR] RPCS: [/invoicesrpc.Invoices/SubscribeSingleInvoice]: context canceled

View File

@ -2756,6 +2756,10 @@ func (r *rpcServer) SubscribePeerEvents(req *lnrpc.PeerEventSubscription,
if err := eventStream.Send(event); err != nil {
return err
}
case <-eventStream.Context().Done():
return eventStream.Context().Err()
case <-r.quit:
return nil
}
@ -4029,6 +4033,10 @@ func (r *rpcServer) SubscribeChannelEvents(req *lnrpc.ChannelEventSubscription,
if err := updateStream.Send(update); err != nil {
return err
}
case <-updateStream.Context().Done():
return updateStream.Context().Err()
case <-r.quit:
return nil
}
@ -4946,6 +4954,9 @@ func (r *rpcServer) SubscribeInvoices(req *lnrpc.InvoiceSubscription,
return err
}
case <-updateStream.Context().Done():
return updateStream.Context().Err()
case <-r.quit:
return nil
}
@ -5003,6 +5014,9 @@ func (r *rpcServer) SubscribeTransactions(req *lnrpc.GetTransactionsRequest,
return err
}
case <-updateStream.Context().Done():
return updateStream.Context().Err()
case <-r.quit:
return nil
}
@ -5524,6 +5538,11 @@ func (r *rpcServer) SubscribeChannelGraph(req *lnrpc.GraphTopologySubscription,
return err
}
// The context was cancelled so we report a cancellation error
// and exit immediately.
case <-updateStream.Context().Done():
return updateStream.Context().Err()
// The server is quitting, so we'll exit immediately. Returning
// nil will close the clients read end of the stream.
case <-r.quit:
@ -6446,6 +6465,9 @@ func (r *rpcServer) SubscribeChannelBackups(req *lnrpc.ChannelBackupSubscription
return err
}
case <-updateStream.Context().Done():
return updateStream.Context().Err()
case <-r.quit:
return nil
}