lntemp+itest: refactor testInvoiceSubscriptions

This commit is contained in:
yyforyongyu 2022-08-04 08:37:28 +08:00
parent d260ac1ab5
commit 4e821a6e3d
No known key found for this signature in database
GPG key ID: 9BCD95C4FF296868
5 changed files with 122 additions and 167 deletions

View file

@ -1442,3 +1442,36 @@ func (h *HarnessTest) OpenMultiChannelsAsync(
return channelPoints return channelPoints
} }
// ReceiveInvoiceUpdate waits until a message is received on the subscribe
// invoice stream or the timeout is reached.
func (h *HarnessTest) ReceiveInvoiceUpdate(
stream rpc.InvoiceUpdateClient) *lnrpc.Invoice {
chanMsg := make(chan *lnrpc.Invoice)
errChan := make(chan error)
go func() {
// Consume one message. This will block until the message is
// received.
resp, err := stream.Recv()
if err != nil {
errChan <- err
return
}
chanMsg <- resp
}()
select {
case <-time.After(DefaultTimeout):
require.Fail(h, "timeout", "timeout receiving invoice update")
case err := <-errChan:
require.Failf(h, "err from stream",
"received err from stream: %v", err)
case updateMsg := <-chanMsg:
return updateMsg
}
return nil
}

View file

@ -494,3 +494,26 @@ func (h *HarnessRPC) UpdateChannelPolicy(
return resp return resp
} }
type InvoiceUpdateClient lnrpc.Lightning_SubscribeInvoicesClient
// SubscribeInvoices creates a subscription client for invoice events and
// asserts its creation.
//
// NOTE: make sure to subscribe an invoice as early as possible as it takes
// some time for the lnd to create the subscription client. If an invoice is
// added right after the subscription, it may be missed. However, if AddIndex
// or SettleIndex is used in the request, it will be fine as a backlog will
// always be sent.
func (h *HarnessRPC) SubscribeInvoices(
req *lnrpc.InvoiceSubscription) InvoiceUpdateClient {
// SubscribeInvoices needs to have the context alive for the
// entire test case as the returned client will be used for send and
// receive events stream. Thus we use runCtx here instead of a timeout
// context.
client, err := h.LN.SubscribeInvoices(h.runCtx, req)
require.NoError(h, err, "unable to create invoice subscription client")
return client
}

View file

@ -187,4 +187,8 @@ var allTestCasesTemp = []*lntemp.TestCase{
Name: "immediate payment after channel opened", Name: "immediate payment after channel opened",
TestFunc: testPaymentFollowingChannelOpen, TestFunc: testPaymentFollowingChannelOpen,
}, },
{
Name: "invoice update subscription",
TestFunc: testInvoiceSubscriptions,
},
} }

View file

@ -511,182 +511,86 @@ func testBidirectionalAsyncPayments(net *lntest.NetworkHarness, t *harnessTest)
closeChannelAndAssert(t, net, net.Alice, chanPoint, false) closeChannelAndAssert(t, net, net.Alice, chanPoint, false)
} }
func testInvoiceSubscriptions(net *lntest.NetworkHarness, t *harnessTest) { func testInvoiceSubscriptions(ht *lntemp.HarnessTest) {
ctxb := context.Background()
const chanAmt = btcutil.Amount(500000) const chanAmt = btcutil.Amount(500000)
// Open a channel with 500k satoshis between Alice and Bob with Alice alice, bob := ht.Alice, ht.Bob
// being the sole funder of the channel.
chanPoint := openChannelAndAssert(
t, net, net.Alice, net.Bob,
lntest.OpenChannelParams{
Amt: chanAmt,
},
)
// Next create a new invoice for Bob requesting 1k satoshis.
// TODO(roasbeef): make global list of invoices for each node to re-use
// and avoid collisions
const paymentAmt = 1000
invoice := &lnrpc.Invoice{
Memo: "testing",
RPreimage: makeFakePayHash(t),
Value: paymentAmt,
}
ctxt, _ := context.WithTimeout(ctxb, defaultTimeout)
invoiceResp, err := net.Bob.AddInvoice(ctxt, invoice)
if err != nil {
t.Fatalf("unable to add invoice: %v", err)
}
lastAddIndex := invoiceResp.AddIndex
// Create a new invoice subscription client for Bob, the notification // Create a new invoice subscription client for Bob, the notification
// should be dispatched shortly below. // should be dispatched shortly below.
req := &lnrpc.InvoiceSubscription{} req := &lnrpc.InvoiceSubscription{}
ctx, cancelInvoiceSubscription := context.WithCancel(ctxb) bobInvoiceSubscription := bob.RPC.SubscribeInvoices(req)
bobInvoiceSubscription, err := net.Bob.SubscribeInvoices(ctx, req)
if err != nil { // Open a channel with 500k satoshis between Alice and Bob with Alice
t.Fatalf("unable to subscribe to bob's invoice updates: %v", err) // being the sole funder of the channel.
chanPoint := ht.OpenChannel(
alice, bob, lntemp.OpenChannelParams{Amt: chanAmt},
)
// Next create a new invoice for Bob requesting 1k satoshis.
const paymentAmt = 1000
invoice := &lnrpc.Invoice{
Memo: "testing",
RPreimage: ht.Random32Bytes(),
Value: paymentAmt,
} }
invoiceResp := bob.RPC.AddInvoice(invoice)
lastAddIndex := invoiceResp.AddIndex
var settleIndex uint64 // With the above invoice added, we should receive an update event.
quit := make(chan struct{}) invoiceUpdate := ht.ReceiveInvoiceUpdate(bobInvoiceSubscription)
updateSent := make(chan struct{}) require.NotEqual(ht, lnrpc.Invoice_SETTLED, invoiceUpdate.State,
go func() { "invoice should not be settled")
invoiceUpdate, err := bobInvoiceSubscription.Recv()
select {
case <-quit:
// Received cancellation
return
default:
}
if err != nil {
t.Fatalf("unable to recv invoice update: %v", err)
}
// The invoice update should exactly match the invoice created
// above, but should now be settled and have SettleDate
if !invoiceUpdate.Settled { // nolint:staticcheck
t.Fatalf("invoice not settled but should be")
}
if invoiceUpdate.SettleDate == 0 {
t.Fatalf("invoice should have non zero settle date, but doesn't")
}
if !bytes.Equal(invoiceUpdate.RPreimage, invoice.RPreimage) {
t.Fatalf("payment preimages don't match: expected %v, got %v",
invoice.RPreimage, invoiceUpdate.RPreimage)
}
if invoiceUpdate.SettleIndex == 0 {
t.Fatalf("invoice should have settle index")
}
settleIndex = invoiceUpdate.SettleIndex
close(updateSent)
}()
// Wait for the channel to be recognized by both Alice and Bob before
// continuing the rest of the test.
err = net.Alice.WaitForNetworkChannelOpen(chanPoint)
if err != nil {
// TODO(roasbeef): will need to make num blocks to advertise a
// node param
close(quit)
t.Fatalf("channel not seen by alice before timeout: %v", err)
}
// With the assertion above set up, send a payment from Alice to Bob // With the assertion above set up, send a payment from Alice to Bob
// which should finalize and settle the invoice. // which should finalize and settle the invoice.
sendReq := &routerrpc.SendPaymentRequest{ ht.CompletePaymentRequests(alice, []string{invoiceResp.PaymentRequest})
PaymentRequest: invoiceResp.PaymentRequest,
TimeoutSeconds: 60,
FeeLimitMsat: noFeeLimitMsat,
}
ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
stream, err := net.Alice.RouterClient.SendPaymentV2(ctxt, sendReq)
if err != nil {
close(quit)
t.Fatalf("unable to send payment: %v", err)
}
result, err := getPaymentResult(stream)
if err != nil {
close(quit)
t.Fatalf("cannot get payment result: %v", err)
}
if result.Status != lnrpc.Payment_SUCCEEDED {
close(quit)
t.Fatalf("error when attempting recv: %v", result.Status)
}
select { // The invoice update should exactly match the invoice created
case <-time.After(time.Second * 10): // above, but should now be settled and have SettleDate
close(quit) invoiceUpdate = ht.ReceiveInvoiceUpdate(bobInvoiceSubscription)
t.Fatalf("update not sent after 10 seconds") require.Equal(ht, lnrpc.Invoice_SETTLED, invoiceUpdate.State,
case <-updateSent: // Fall through on success "invoice not settled but should be")
} require.NotZero(ht, invoiceUpdate.SettleDate,
"invoice should have non zero settle date, but doesn't")
// With the base case working, we'll now cancel Bob's current require.Equal(ht, invoice.RPreimage, invoiceUpdate.RPreimage,
// subscription in order to exercise the backlog fill behavior. "payment preimages don't match")
cancelInvoiceSubscription() require.NotZero(ht, invoiceUpdate.SettleIndex,
"invoice should have settle index")
settleIndex := invoiceUpdate.SettleIndex
// We'll now add 3 more invoices to Bob's invoice registry. // We'll now add 3 more invoices to Bob's invoice registry.
const numInvoices = 3 const numInvoices = 3
payReqs, _, newInvoices, err := createPayReqs( payReqs, _, newInvoices := ht.CreatePayReqs(
net.Bob, paymentAmt, numInvoices, bob, paymentAmt, numInvoices,
) )
if err != nil {
t.Fatalf("unable to create pay reqs: %v", err)
}
// Now that the set of invoices has been added, we'll re-register for // Now that the set of invoices has been added, we'll re-register for
// streaming invoice notifications for Bob, this time specifying the // streaming invoice notifications for Bob, this time specifying the
// add invoice of the last prior invoice. // add invoice of the last prior invoice.
req = &lnrpc.InvoiceSubscription{ req = &lnrpc.InvoiceSubscription{AddIndex: lastAddIndex}
AddIndex: lastAddIndex, bobInvoiceSubscription = bob.RPC.SubscribeInvoices(req)
}
ctx, cancelInvoiceSubscription = context.WithCancel(ctxb)
bobInvoiceSubscription, err = net.Bob.SubscribeInvoices(ctx, req)
if err != nil {
t.Fatalf("unable to subscribe to bob's invoice updates: %v", err)
}
// Since we specified a value of the prior add index above, we should // Since we specified a value of the prior add index above, we should
// now immediately get the invoices we just added as we should get the // now immediately get the invoices we just added as we should get the
// backlog of notifications. // backlog of notifications.
for i := 0; i < numInvoices; i++ { for i := 0; i < numInvoices; i++ {
invoiceUpdate, err := bobInvoiceSubscription.Recv() invoiceUpdate := ht.ReceiveInvoiceUpdate(bobInvoiceSubscription)
if err != nil {
t.Fatalf("unable to receive subscription")
}
// We should now get the ith invoice we added, as they should // We should now get the ith invoice we added, as they should
// be returned in order. // be returned in order.
if invoiceUpdate.Settled { // nolint:staticcheck require.NotEqual(ht, lnrpc.Invoice_SETTLED, invoiceUpdate.State,
t.Fatalf("should have only received add events") "should have only received add events")
}
originalInvoice := newInvoices[i] originalInvoice := newInvoices[i]
rHash := sha256.Sum256(originalInvoice.RPreimage) rHash := sha256.Sum256(originalInvoice.RPreimage)
if !bytes.Equal(invoiceUpdate.RHash, rHash[:]) { require.Equal(ht, rHash[:], invoiceUpdate.RHash,
t.Fatalf("invoices have mismatched payment hashes: "+ "invoices have mismatched payment hashes")
"expected %x, got %x", rHash[:],
invoiceUpdate.RHash)
}
} }
cancelInvoiceSubscription()
// We'll now have Bob settle out the remainder of these invoices so we // We'll now have Bob settle out the remainder of these invoices so we
// can test that all settled invoices are properly notified. // can test that all settled invoices are properly notified.
err = completePaymentRequests( ht.CompletePaymentRequests(alice, payReqs)
net.Alice, net.Alice.RouterClient, payReqs, true,
)
if err != nil {
t.Fatalf("unable to send payment: %v", err)
}
// With the set of invoices paid, we'll now cancel the old // With the set of invoices paid, we'll now cancel the old
// subscription, and create a new one for Bob, this time using the // subscription, and create a new one for Bob, this time using the
@ -694,13 +598,7 @@ func testInvoiceSubscriptions(net *lntest.NetworkHarness, t *harnessTest) {
req = &lnrpc.InvoiceSubscription{ req = &lnrpc.InvoiceSubscription{
SettleIndex: settleIndex, SettleIndex: settleIndex,
} }
ctx, cancelInvoiceSubscription = context.WithCancel(ctxb) bobInvoiceSubscription = bob.RPC.SubscribeInvoices(req)
bobInvoiceSubscription, err = net.Bob.SubscribeInvoices(ctx, req)
if err != nil {
t.Fatalf("unable to subscribe to bob's invoice updates: %v", err)
}
defer cancelInvoiceSubscription()
// As we specified the index of the past settle index, we should now // As we specified the index of the past settle index, we should now
// receive notifications for the three HTLCs that we just settled. As // receive notifications for the three HTLCs that we just settled. As
@ -712,30 +610,31 @@ func testInvoiceSubscriptions(net *lntest.NetworkHarness, t *harnessTest) {
settledInvoices[rHash] = struct{}{} settledInvoices[rHash] = struct{}{}
} }
for i := 0; i < numInvoices; i++ { for i := 0; i < numInvoices; i++ {
invoiceUpdate, err := bobInvoiceSubscription.Recv() invoiceUpdate := ht.ReceiveInvoiceUpdate(bobInvoiceSubscription)
if err != nil {
t.Fatalf("unable to receive subscription")
}
// We should now get the ith invoice we added, as they should // We should now get the ith invoice we added, as they should
// be returned in order. // be returned in order.
if !invoiceUpdate.Settled { // nolint:staticcheck require.Equal(ht, lnrpc.Invoice_SETTLED, invoiceUpdate.State,
t.Fatalf("should have only received settle events") "should have only received settle events")
}
var rHash [32]byte var rHash [32]byte
copy(rHash[:], invoiceUpdate.RHash) copy(rHash[:], invoiceUpdate.RHash)
if _, ok := settledInvoices[rHash]; !ok { require.Contains(ht, settledInvoices, rHash,
t.Fatalf("unknown invoice settled: %x", rHash) "unknown invoice settled")
}
delete(settledInvoices, rHash) delete(settledInvoices, rHash)
} }
// At this point, all the invoices should be fully settled. // At this point, all the invoices should be fully settled.
if len(settledInvoices) != 0 { require.Empty(ht, settledInvoices, "not all invoices settled")
t.Fatalf("not all invoices settled")
}
closeChannelAndAssert(t, net, net.Alice, chanPoint, false) // TODO(yy): remove the sleep once the following bug is fixed.
// When the invoice is reported settled, the commitment dance is not
// yet finished, which can cause an error when closing the channel,
// saying there's active HTLCs. We need to investigate this issue and
// reverse the order to, first finish the commitment dance, then report
// the invoice as settled.
time.Sleep(2 * time.Second)
ht.CloseChannel(alice, chanPoint)
} }

View file

@ -56,10 +56,6 @@ var allTestCases = []*testCase{
name: "multiple channel creation and update subscription", name: "multiple channel creation and update subscription",
test: testBasicChannelCreationAndUpdates, test: testBasicChannelCreationAndUpdates,
}, },
{
name: "invoice update subscription",
test: testInvoiceSubscriptions,
},
{ {
name: "multi-hop htlc error propagation", name: "multi-hop htlc error propagation",
test: testHtlcErrorPropagation, test: testHtlcErrorPropagation,