routerrpc: add TrackPayments

Add method to track all payments rather than tracking a payment by payment hash.
This commit is contained in:
Jesse de Wit 2022-03-15 12:12:16 +01:00
parent e65f020348
commit 275f085e6d
No known key found for this signature in database
GPG key ID: 78A9DCCE385AE6B4
9 changed files with 1211 additions and 684 deletions

File diff suppressed because it is too large Load diff

View file

@ -101,6 +101,34 @@ func request_Router_TrackPaymentV2_0(ctx context.Context, marshaler runtime.Mars
}
var (
filter_Router_TrackPayments_0 = &utilities.DoubleArray{Encoding: map[string]int{}, Base: []int(nil), Check: []int(nil)}
)
func request_Router_TrackPayments_0(ctx context.Context, marshaler runtime.Marshaler, client RouterClient, req *http.Request, pathParams map[string]string) (Router_TrackPaymentsClient, runtime.ServerMetadata, error) {
var protoReq TrackPaymentsRequest
var metadata runtime.ServerMetadata
if err := req.ParseForm(); err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
if err := runtime.PopulateQueryParameters(&protoReq, req.Form, filter_Router_TrackPayments_0); err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
stream, err := client.TrackPayments(ctx, &protoReq)
if err != nil {
return nil, metadata, err
}
header, err := stream.Header()
if err != nil {
return nil, metadata, err
}
metadata.HeaderMD = header
return stream, metadata, nil
}
func request_Router_EstimateRouteFee_0(ctx context.Context, marshaler runtime.Marshaler, client RouterClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq RouteFeeRequest
var metadata runtime.ServerMetadata
@ -556,6 +584,13 @@ func RegisterRouterHandlerServer(ctx context.Context, mux *runtime.ServeMux, ser
return
})
mux.Handle("GET", pattern_Router_TrackPayments_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
err := status.Error(codes.Unimplemented, "streaming calls are not yet supported in the in-process transport")
_, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
})
mux.Handle("POST", pattern_Router_EstimateRouteFee_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
@ -881,6 +916,26 @@ func RegisterRouterHandlerClient(ctx context.Context, mux *runtime.ServeMux, cli
})
mux.Handle("GET", pattern_Router_TrackPayments_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
rctx, err := runtime.AnnotateContext(ctx, mux, req, "/routerrpc.Router/TrackPayments", runtime.WithHTTPPathPattern("/v2/router/payments"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Router_TrackPayments_0(rctx, inboundMarshaler, client, req, pathParams)
ctx = runtime.NewServerMetadataContext(ctx, md)
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
forward_Router_TrackPayments_0(ctx, mux, outboundMarshaler, w, req, func() (proto.Message, error) { return resp.Recv() }, mux.GetForwardResponseOptions()...)
})
mux.Handle("POST", pattern_Router_EstimateRouteFee_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
@ -1129,6 +1184,8 @@ var (
pattern_Router_TrackPaymentV2_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 1, 0, 4, 1, 5, 3}, []string{"v2", "router", "track", "payment_hash"}, ""))
pattern_Router_TrackPayments_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"v2", "router", "payments"}, ""))
pattern_Router_EstimateRouteFee_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 2, 3}, []string{"v2", "router", "route", "estimatefee"}, ""))
pattern_Router_SendToRouteV2_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 2, 3}, []string{"v2", "router", "route", "send"}, ""))
@ -1159,6 +1216,8 @@ var (
forward_Router_TrackPaymentV2_0 = runtime.ForwardResponseStream
forward_Router_TrackPayments_0 = runtime.ForwardResponseStream
forward_Router_EstimateRouteFee_0 = runtime.ForwardResponseMessage
forward_Router_SendToRouteV2_0 = runtime.ForwardResponseMessage

View file

@ -107,6 +107,48 @@ func RegisterRouterJSONCallbacks(registry map[string]func(ctx context.Context,
}()
}
registry["routerrpc.Router.TrackPayments"] = func(ctx context.Context,
conn *grpc.ClientConn, reqJSON string, callback func(string, error)) {
req := &TrackPaymentsRequest{}
err := marshaler.Unmarshal([]byte(reqJSON), req)
if err != nil {
callback("", err)
return
}
client := NewRouterClient(conn)
stream, err := client.TrackPayments(ctx, req)
if err != nil {
callback("", err)
return
}
go func() {
for {
select {
case <-stream.Context().Done():
callback("", stream.Context().Err())
return
default:
}
resp, err := stream.Recv()
if err != nil {
callback("", err)
return
}
respBytes, err := marshaler.Marshal(resp)
if err != nil {
callback("", err)
return
}
callback(string(respBytes), nil)
}
}()
}
registry["routerrpc.Router.EstimateRouteFee"] = func(ctx context.Context,
conn *grpc.ClientConn, reqJSON string, callback func(string, error)) {

View file

@ -22,6 +22,16 @@ service Router {
*/
rpc TrackPaymentV2 (TrackPaymentRequest) returns (stream lnrpc.Payment);
/*
TrackPayments returns an update stream for every payment that is not in a
terminal state. Note that if payments are in-flight while starting a new
subscription, the start of the payment stream could produce out-of-order
and/or duplicate events. In order to get updates for every in-flight
payment attempt make sure to subscribe to this method before initiating any
payments.
*/
rpc TrackPayments (TrackPaymentsRequest) returns (stream lnrpc.Payment);
/*
EstimateRouteFee allows callers to obtain a lower bound w.r.t how much it
may cost to send an HTLC to the target end destination.
@ -303,6 +313,14 @@ message TrackPaymentRequest {
bool no_inflight_updates = 2;
}
message TrackPaymentsRequest {
/*
If set, only the final payment updates are streamed back. Intermediate
updates that show which htlcs are still in flight are suppressed.
*/
bool no_inflight_updates = 1;
}
message RouteFeeRequest {
/*
The destination once wishes to obtain a routing fee quote to.

View file

@ -250,6 +250,47 @@
]
}
},
"/v2/router/payments": {
"get": {
"summary": "TrackPayments returns an update stream for every payment that is not in a\nterminal state. Note that if payments are in-flight while starting a new\nsubscription, the start of the payment stream could produce out-of-order\nand/or duplicate events. In order to get updates for every in-flight\npayment attempt make sure to subscribe to this method before initiating any\npayments.",
"operationId": "Router_TrackPayments",
"responses": {
"200": {
"description": "A successful response.(streaming responses)",
"schema": {
"type": "object",
"properties": {
"result": {
"$ref": "#/definitions/lnrpcPayment"
},
"error": {
"$ref": "#/definitions/rpcStatus"
}
},
"title": "Stream result of lnrpcPayment"
}
},
"default": {
"description": "An unexpected error response.",
"schema": {
"$ref": "#/definitions/rpcStatus"
}
}
},
"parameters": [
{
"name": "no_inflight_updates",
"description": "If set, only the final payment updates are streamed back. Intermediate\nupdates that show which htlcs are still in flight are suppressed.",
"in": "query",
"required": false,
"type": "boolean"
}
],
"tags": [
"Router"
]
}
},
"/v2/router/route": {
"post": {
"summary": "BuildRoute builds a fully specified route based on a list of hop public\nkeys. It retrieves the relevant channel policies from the graph in order to\ncalculate the correct fees and time locks.",

View file

@ -8,6 +8,8 @@ http:
body: "*"
- selector: routerrpc.Router.TrackPaymentV2
get: "/v2/router/track/{payment_hash}"
- selector: routerrpc.Router.TrackPayments
get: "/v2/router/payments"
- selector: routerrpc.Router.EstimateRouteFee
post: "/v2/router/route/estimatefee"
body: "*"

View file

@ -26,6 +26,13 @@ type RouterClient interface {
// TrackPaymentV2 returns an update stream for the payment identified by the
// payment hash.
TrackPaymentV2(ctx context.Context, in *TrackPaymentRequest, opts ...grpc.CallOption) (Router_TrackPaymentV2Client, error)
// TrackPayments returns an update stream for every payment that is not in a
// terminal state. Note that if payments are in-flight while starting a new
// subscription, the start of the payment stream could produce out-of-order
// and/or duplicate events. In order to get updates for every in-flight
// payment attempt make sure to subscribe to this method before initiating any
// payments.
TrackPayments(ctx context.Context, in *TrackPaymentsRequest, opts ...grpc.CallOption) (Router_TrackPaymentsClient, error)
// EstimateRouteFee allows callers to obtain a lower bound w.r.t how much it
// may cost to send an HTLC to the target end destination.
EstimateRouteFee(ctx context.Context, in *RouteFeeRequest, opts ...grpc.CallOption) (*RouteFeeResponse, error)
@ -165,6 +172,38 @@ func (x *routerTrackPaymentV2Client) Recv() (*lnrpc.Payment, error) {
return m, nil
}
func (c *routerClient) TrackPayments(ctx context.Context, in *TrackPaymentsRequest, opts ...grpc.CallOption) (Router_TrackPaymentsClient, error) {
stream, err := c.cc.NewStream(ctx, &Router_ServiceDesc.Streams[2], "/routerrpc.Router/TrackPayments", opts...)
if err != nil {
return nil, err
}
x := &routerTrackPaymentsClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type Router_TrackPaymentsClient interface {
Recv() (*lnrpc.Payment, error)
grpc.ClientStream
}
type routerTrackPaymentsClient struct {
grpc.ClientStream
}
func (x *routerTrackPaymentsClient) Recv() (*lnrpc.Payment, error) {
m := new(lnrpc.Payment)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *routerClient) EstimateRouteFee(ctx context.Context, in *RouteFeeRequest, opts ...grpc.CallOption) (*RouteFeeResponse, error) {
out := new(RouteFeeResponse)
err := c.cc.Invoke(ctx, "/routerrpc.Router/EstimateRouteFee", in, out, opts...)
@ -257,7 +296,7 @@ func (c *routerClient) BuildRoute(ctx context.Context, in *BuildRouteRequest, op
}
func (c *routerClient) SubscribeHtlcEvents(ctx context.Context, in *SubscribeHtlcEventsRequest, opts ...grpc.CallOption) (Router_SubscribeHtlcEventsClient, error) {
stream, err := c.cc.NewStream(ctx, &Router_ServiceDesc.Streams[2], "/routerrpc.Router/SubscribeHtlcEvents", opts...)
stream, err := c.cc.NewStream(ctx, &Router_ServiceDesc.Streams[3], "/routerrpc.Router/SubscribeHtlcEvents", opts...)
if err != nil {
return nil, err
}
@ -290,7 +329,7 @@ func (x *routerSubscribeHtlcEventsClient) Recv() (*HtlcEvent, error) {
// Deprecated: Do not use.
func (c *routerClient) SendPayment(ctx context.Context, in *SendPaymentRequest, opts ...grpc.CallOption) (Router_SendPaymentClient, error) {
stream, err := c.cc.NewStream(ctx, &Router_ServiceDesc.Streams[3], "/routerrpc.Router/SendPayment", opts...)
stream, err := c.cc.NewStream(ctx, &Router_ServiceDesc.Streams[4], "/routerrpc.Router/SendPayment", opts...)
if err != nil {
return nil, err
}
@ -323,7 +362,7 @@ func (x *routerSendPaymentClient) Recv() (*PaymentStatus, error) {
// Deprecated: Do not use.
func (c *routerClient) TrackPayment(ctx context.Context, in *TrackPaymentRequest, opts ...grpc.CallOption) (Router_TrackPaymentClient, error) {
stream, err := c.cc.NewStream(ctx, &Router_ServiceDesc.Streams[4], "/routerrpc.Router/TrackPayment", opts...)
stream, err := c.cc.NewStream(ctx, &Router_ServiceDesc.Streams[5], "/routerrpc.Router/TrackPayment", opts...)
if err != nil {
return nil, err
}
@ -355,7 +394,7 @@ func (x *routerTrackPaymentClient) Recv() (*PaymentStatus, error) {
}
func (c *routerClient) HtlcInterceptor(ctx context.Context, opts ...grpc.CallOption) (Router_HtlcInterceptorClient, error) {
stream, err := c.cc.NewStream(ctx, &Router_ServiceDesc.Streams[5], "/routerrpc.Router/HtlcInterceptor", opts...)
stream, err := c.cc.NewStream(ctx, &Router_ServiceDesc.Streams[6], "/routerrpc.Router/HtlcInterceptor", opts...)
if err != nil {
return nil, err
}
@ -405,6 +444,13 @@ type RouterServer interface {
// TrackPaymentV2 returns an update stream for the payment identified by the
// payment hash.
TrackPaymentV2(*TrackPaymentRequest, Router_TrackPaymentV2Server) error
// TrackPayments returns an update stream for every payment that is not in a
// terminal state. Note that if payments are in-flight while starting a new
// subscription, the start of the payment stream could produce out-of-order
// and/or duplicate events. In order to get updates for every in-flight
// payment attempt make sure to subscribe to this method before initiating any
// payments.
TrackPayments(*TrackPaymentsRequest, Router_TrackPaymentsServer) error
// EstimateRouteFee allows callers to obtain a lower bound w.r.t how much it
// may cost to send an HTLC to the target end destination.
EstimateRouteFee(context.Context, *RouteFeeRequest) (*RouteFeeResponse, error)
@ -483,6 +529,9 @@ func (UnimplementedRouterServer) SendPaymentV2(*SendPaymentRequest, Router_SendP
func (UnimplementedRouterServer) TrackPaymentV2(*TrackPaymentRequest, Router_TrackPaymentV2Server) error {
return status.Errorf(codes.Unimplemented, "method TrackPaymentV2 not implemented")
}
func (UnimplementedRouterServer) TrackPayments(*TrackPaymentsRequest, Router_TrackPaymentsServer) error {
return status.Errorf(codes.Unimplemented, "method TrackPayments not implemented")
}
func (UnimplementedRouterServer) EstimateRouteFee(context.Context, *RouteFeeRequest) (*RouteFeeResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method EstimateRouteFee not implemented")
}
@ -583,6 +632,27 @@ func (x *routerTrackPaymentV2Server) Send(m *lnrpc.Payment) error {
return x.ServerStream.SendMsg(m)
}
func _Router_TrackPayments_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(TrackPaymentsRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(RouterServer).TrackPayments(m, &routerTrackPaymentsServer{stream})
}
type Router_TrackPaymentsServer interface {
Send(*lnrpc.Payment) error
grpc.ServerStream
}
type routerTrackPaymentsServer struct {
grpc.ServerStream
}
func (x *routerTrackPaymentsServer) Send(m *lnrpc.Payment) error {
return x.ServerStream.SendMsg(m)
}
func _Router_EstimateRouteFee_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RouteFeeRequest)
if err := dec(in); err != nil {
@ -933,6 +1003,11 @@ var Router_ServiceDesc = grpc.ServiceDesc{
Handler: _Router_TrackPaymentV2_Handler,
ServerStreams: true,
},
{
StreamName: "TrackPayments",
Handler: _Router_TrackPayments_Handler,
ServerStreams: true,
},
{
StreamName: "SubscribeHtlcEvents",
Handler: _Router_SubscribeHtlcEvents_Handler,

View file

@ -72,6 +72,10 @@ var (
Entity: "offchain",
Action: "read",
}},
"/routerrpc.Router/TrackPayments": {{
Entity: "offchain",
Action: "read",
}},
"/routerrpc.Router/EstimateRouteFee": {{
Entity: "offchain",
Action: "read",
@ -737,19 +741,62 @@ func (s *Server) trackPayment(identifier lntypes.Hash,
router := s.cfg.RouterBackend
// Subscribe to the outcome of this payment.
subscription, err := router.Tower.SubscribePayment(
identifier,
)
subscription, err := router.Tower.SubscribePayment(identifier)
switch {
case err == channeldb.ErrPaymentNotInitiated:
return status.Error(codes.NotFound, err.Error())
case err != nil:
return err
}
// Stream updates to the client.
err = s.trackPaymentStream(
stream.Context(), subscription, noInflightUpdates, stream.Send,
)
if errors.Is(err, context.Canceled) {
log.Debugf("Payment stream %v canceled", identifier)
}
return err
}
// TrackPayments returns a stream of payment state updates.
func (s *Server) TrackPayments(request *TrackPaymentsRequest,
stream Router_TrackPaymentsServer) error {
log.Debug("TrackPayments called")
router := s.cfg.RouterBackend
// Subscribe to payments.
subscription, err := router.Tower.SubscribeAllPayments()
if err != nil {
return err
}
// Stream updates to the client.
err = s.trackPaymentStream(
stream.Context(), subscription, request.NoInflightUpdates,
stream.Send,
)
if errors.Is(err, context.Canceled) {
log.Debugf("TrackPayments payment stream canceled.")
}
return err
}
// trackPaymentStream streams payment updates to the client.
func (s *Server) trackPaymentStream(context context.Context,
subscription *routing.ControlTowerSubscriber, noInflightUpdates bool,
send func(*lnrpc.Payment) error) error {
defer subscription.Close()
// Stream updates back to the client. The first update is always the
// current state of the payment.
// Stream updates back to the client.
for {
select {
case item, ok := <-subscription.Updates:
@ -766,13 +813,15 @@ func (s *Server) trackPayment(identifier lntypes.Hash,
continue
}
rpcPayment, err := router.MarshallPayment(result)
rpcPayment, err := s.cfg.RouterBackend.MarshallPayment(
result,
)
if err != nil {
return err
}
// Send event to the client.
err = stream.Send(rpcPayment)
err = send(rpcPayment)
if err != nil {
return err
}
@ -780,9 +829,8 @@ func (s *Server) trackPayment(identifier lntypes.Hash,
case <-s.quit:
return errServerShuttingDown
case <-stream.Context().Done():
log.Debugf("Payment status stream %v canceled", identifier)
return stream.Context().Err()
case <-context.Done():
return context.Err()
}
}
}

View file

@ -0,0 +1,169 @@
package routerrpc
import (
"context"
"testing"
"time"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/queue"
"github.com/lightningnetwork/lnd/routing"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)
func makeStreamMock() *StreamMock {
return &StreamMock{
ctx: context.Background(),
sentFromServer: make(chan *lnrpc.Payment, 10),
}
}
type StreamMock struct {
grpc.ServerStream
ctx context.Context
sentFromServer chan *lnrpc.Payment
}
func makeControlTowerMock() *ControlTowerMock {
towerMock := &ControlTowerMock{
queue: queue.NewConcurrentQueue(20),
}
towerMock.queue.Start()
return towerMock
}
type ControlTowerMock struct {
queue *queue.ConcurrentQueue
routing.ControlTower
}
func (t *ControlTowerMock) SubscribeAllPayments() (
*routing.ControlTowerSubscriber, error) {
return &routing.ControlTowerSubscriber{
Updates: t.queue.ChanOut(),
}, nil
}
func (m *StreamMock) Context() context.Context {
return m.ctx
}
func (m *StreamMock) Send(p *lnrpc.Payment) error {
m.sentFromServer <- p
return nil
}
// TestTrackPaymentsInflightUpdate tests whether all updates from the control
// tower are propagated to the client.
func TestTrackPaymentsInflightUpdates(t *testing.T) {
// Setup mocks and request.
request := &TrackPaymentsRequest{
NoInflightUpdates: false,
}
towerMock := makeControlTowerMock()
stream := makeStreamMock()
server := &Server{
cfg: &Config{
RouterBackend: &RouterBackend{
Tower: towerMock,
},
},
}
// Listen to payment updates in a goroutine.
go func() {
err := server.TrackPayments(request, stream)
require.NoError(t, err)
}()
// Enqueue some payment updates on the mock.
towerMock.queue.ChanIn() <- &channeldb.MPPayment{
Info: &channeldb.PaymentCreationInfo{},
Status: channeldb.StatusInFlight,
}
towerMock.queue.ChanIn() <- &channeldb.MPPayment{
Info: &channeldb.PaymentCreationInfo{},
Status: channeldb.StatusSucceeded,
}
// Wait until there's 2 updates or the deadline is exceeded.
deadline := time.Now().Add(1 * time.Second)
for {
if len(stream.sentFromServer) == 2 {
break
}
if time.Now().After(deadline) {
require.FailNow(t, "deadline exceeded.")
}
}
// Both updates should be sent to the client.
require.Len(t, stream.sentFromServer, 2)
// The updates should be in the right order.
payment := <-stream.sentFromServer
require.Equal(t, lnrpc.Payment_IN_FLIGHT, payment.Status)
payment = <-stream.sentFromServer
require.Equal(t, lnrpc.Payment_SUCCEEDED, payment.Status)
}
// TestTrackPaymentsInflightUpdate tests whether only final updates from the
// control tower are propagated to the client when noInflightUpdates = true.
func TestTrackPaymentsNoInflightUpdates(t *testing.T) {
// Setup mocks and request.
request := &TrackPaymentsRequest{
NoInflightUpdates: true,
}
towerMock := &ControlTowerMock{
queue: queue.NewConcurrentQueue(20),
}
towerMock.queue.Start()
stream := makeStreamMock()
server := &Server{
cfg: &Config{
RouterBackend: &RouterBackend{
Tower: towerMock,
},
},
}
// Listen to payment updates in a goroutine.
go func() {
err := server.TrackPayments(request, stream)
require.NoError(t, err)
}()
// Enqueue some payment updates on the mock.
towerMock.queue.ChanIn() <- &channeldb.MPPayment{
Info: &channeldb.PaymentCreationInfo{},
Status: channeldb.StatusInFlight,
}
towerMock.queue.ChanIn() <- &channeldb.MPPayment{
Info: &channeldb.PaymentCreationInfo{},
Status: channeldb.StatusSucceeded,
}
// Wait until there's 1 update or the deadline is exceeded.
deadline := time.Now().Add(1 * time.Second)
for {
if len(stream.sentFromServer) == 1 {
break
}
if time.Now().After(deadline) {
require.FailNow(t, "deadline exceeded.")
}
}
// Only 1 update should be sent to the client.
require.Len(t, stream.sentFromServer, 1)
// Only the final states should be sent to the client.
payment := <-stream.sentFromServer
require.Equal(t, lnrpc.Payment_SUCCEEDED, payment.Status)
}