itest: refactor testForwardInterceptorBasic

This commit is contained in:
yyforyongyu 2022-08-11 05:59:10 +08:00
parent 97a7638c50
commit dd7e59237b
No known key found for this signature in database
GPG key ID: 9BCD95C4FF296868
4 changed files with 121 additions and 372 deletions

View file

@ -413,4 +413,8 @@ var allTestCasesTemp = []*lntemp.TestCase{
Name: "forward interceptor dedup htlcs", Name: "forward interceptor dedup htlcs",
TestFunc: testForwardInterceptorDedupHtlc, TestFunc: testForwardInterceptorDedupHtlc,
}, },
{
Name: "forward interceptor",
TestFunc: testForwardInterceptorBasic,
},
} }

View file

@ -1,21 +1,16 @@
package itest package itest
import ( import (
"context"
"encoding/hex"
"fmt" "fmt"
"strings" "strings"
"sync"
"time" "time"
"github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/chainreg" "github.com/lightningnetwork/lnd/chainreg"
"github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnrpc/routerrpc" "github.com/lightningnetwork/lnd/lnrpc/routerrpc"
"github.com/lightningnetwork/lnd/lntemp" "github.com/lightningnetwork/lnd/lntemp"
"github.com/lightningnetwork/lnd/lntemp/node" "github.com/lightningnetwork/lnd/lntemp/node"
"github.com/lightningnetwork/lnd/lntest"
"github.com/lightningnetwork/lnd/lntest/wait" "github.com/lightningnetwork/lnd/lntest/wait"
"github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/routing/route" "github.com/lightningnetwork/lnd/routing/route"
@ -190,410 +185,161 @@ func testForwardInterceptorDedupHtlc(ht *lntemp.HarnessTest) {
// 3. Intercepted held htlcs result in no payment (invoice is not settled). // 3. Intercepted held htlcs result in no payment (invoice is not settled).
// 4. When Interceptor disconnects it resumes all held htlcs, which result in // 4. When Interceptor disconnects it resumes all held htlcs, which result in
// valid payment (invoice is settled). // valid payment (invoice is settled).
func testForwardInterceptorBasic(net *lntest.NetworkHarness, t *harnessTest) { func testForwardInterceptorBasic(ht *lntemp.HarnessTest) {
// Initialize the test context with 3 connected nodes. ts := newInterceptorTestScenario(ht)
alice := net.NewNode(t.t, "alice", nil)
defer shutdownAndAssert(net, t, alice)
bob := net.NewNode(t.t, "bob", nil) alice, bob, carol := ts.alice, ts.bob, ts.carol
defer shutdownAndAssert(net, t, bob)
carol := net.NewNode(t.t, "carol", nil)
defer shutdownAndAssert(net, t, carol)
testContext := newInterceptorTestContext(t, net, alice, bob, carol)
const (
chanAmt = btcutil.Amount(300000)
)
// Open and wait for channels. // Open and wait for channels.
testContext.openChannel(testContext.alice, testContext.bob, chanAmt) const chanAmt = btcutil.Amount(300000)
testContext.openChannel(testContext.bob, testContext.carol, chanAmt) p := lntemp.OpenChannelParams{Amt: chanAmt}
defer testContext.closeChannels() reqs := []*lntemp.OpenChannelRequest{
testContext.waitForChannels() {Local: alice, Remote: bob, Param: p},
{Local: bob, Remote: carol, Param: p},
}
resp := ht.OpenMultiChannelsAsync(reqs)
cpAB, cpBC := resp[0], resp[1]
// Make sure Alice is aware of channel Bob=>Carol.
ht.AssertTopologyChannelOpen(alice, cpBC)
// Connect the interceptor. // Connect the interceptor.
ctxb := context.Background() interceptor, cancelInterceptor := bob.RPC.HtlcInterceptor()
ctxt, cancelInterceptor := context.WithTimeout(ctxb, defaultTimeout)
interceptor, err := testContext.bob.RouterClient.HtlcInterceptor(ctxt)
require.NoError(t.t, err, "failed to create HtlcInterceptor")
// Prepare the test cases. // Prepare the test cases.
testCases := testContext.prepareTestCases() testCases := ts.prepareTestCases()
// A channel for the interceptor go routine to send the requested packets. // For each test case make sure we initiate a payment from Alice to
interceptedChan := make(chan *routerrpc.ForwardHtlcInterceptRequest, // Carol routed through Bob. For each payment we also test its final
len(testCases)) // status according to the interceptorAction specified in the test
// case.
// Run the interceptor loop in its own go routine. done := make(chan struct{})
var wg sync.WaitGroup
wg.Add(1)
go func() { go func() {
defer wg.Done() // Signal that all the payments have been sent.
for { defer close(done)
request, err := interceptor.Recv()
if err != nil {
// If it is just the error result of the context cancellation
// the we exit silently.
status, ok := status.FromError(err)
if ok && status.Code() == codes.Canceled {
return
}
// Otherwise it an unexpected error, we fail the test.
require.NoError(t.t, err, "unexpected error in interceptor.Recv()")
return
}
interceptedChan <- request
}
}()
// For each test case make sure we initiate a payment from Alice to Carol
// routed through Bob. For each payment we also test its final status
// according to the interceptorAction specified in the test case.
wg.Add(1)
go func() {
defer wg.Done()
for _, tc := range testCases { for _, tc := range testCases {
attempt, err := testContext.sendAliceToCarolPayment( attempt := ts.sendPaymentAndAssertAction(tc)
context.Background(), tc.invoice.ValueMsat, ts.assertAction(tc, attempt)
tc.invoice.RHash, tc.payAddr,
)
if t.t.Failed() {
return
}
if err != nil {
require.NoError(t.t, err, "failed to send payment")
}
switch tc.interceptorAction {
// For 'fail' interceptor action we make sure the payment failed.
case routerrpc.ResolveHoldForwardAction_FAIL:
require.Equal(t.t, lnrpc.HTLCAttempt_FAILED,
attempt.Status, "expected payment to fail")
// Assert that we get a temporary channel
// failure which has a channel update.
require.NotNil(t.t, attempt.Failure)
require.NotNil(t.t, attempt.Failure.ChannelUpdate)
require.Equal(t.t,
lnrpc.Failure_TEMPORARY_CHANNEL_FAILURE,
attempt.Failure.Code)
// For settle and resume we make sure the payment is successful.
case routerrpc.ResolveHoldForwardAction_SETTLE:
fallthrough
case routerrpc.ResolveHoldForwardAction_RESUME:
require.Equal(t.t, lnrpc.HTLCAttempt_SUCCEEDED,
attempt.Status, "expected payment to succeed")
}
} }
}() }()
// We make sure here the interceptor has processed all packets before we // We make sure here the interceptor has processed all packets before
// check the payment statuses. // we check the payment statuses.
for i := 0; i < len(testCases); i++ { for _, tc := range testCases {
select { request := ht.ReceiveHtlcInterceptor(interceptor)
case request := <-interceptedChan:
// Assert sanity of informational packet data.
require.NotZero(t.t, request.OutgoingRequestedChanId)
require.NotZero(t.t, request.IncomingExpiry)
require.NotZero(t.t, request.IncomingAmountMsat)
require.Less( // Assert sanity of informational packet data.
t.t, require.NotZero(ht, request.OutgoingRequestedChanId)
request.OutgoingExpiry, request.IncomingExpiry, require.NotZero(ht, request.IncomingExpiry)
) require.NotZero(ht, request.IncomingAmountMsat)
require.Less(
t.t,
request.OutgoingAmountMsat,
request.IncomingAmountMsat,
)
value, ok := request.CustomRecords[customTestKey] require.Less(ht, request.OutgoingExpiry,
require.True(t.t, ok, "expected custom record") request.IncomingExpiry)
require.Equal(t.t, customTestValue, value) require.Less(ht, request.OutgoingAmountMsat,
request.IncomingAmountMsat)
testCase := testCases[i] value, ok := request.CustomRecords[customTestKey]
require.True(ht, ok, "expected custom record")
require.Equal(ht, customTestValue, value)
// For held packets we ignore, keeping them in hold status. // For held packets we ignore, keeping them in hold status.
if testCase.shouldHold { if tc.shouldHold {
continue continue
}
// For all other packets we resolve according to the test case.
_ = interceptor.Send(&routerrpc.ForwardHtlcInterceptResponse{
IncomingCircuitKey: request.IncomingCircuitKey,
Action: testCase.interceptorAction,
Preimage: testCase.invoice.RPreimage,
})
case <-time.After(defaultTimeout):
t.Fatalf("response from interceptor was not received %v", i)
} }
// For all other packets we resolve according to the test case.
err := interceptor.Send(&routerrpc.ForwardHtlcInterceptResponse{
IncomingCircuitKey: request.IncomingCircuitKey,
Action: tc.interceptorAction,
Preimage: tc.invoice.RPreimage,
})
require.NoError(ht, err, "failed to send request")
} }
// At this point we are left with the held packets, we want to make sure // At this point we are left with the held packets, we want to make
// each one of them has a corresponding 'in-flight' payment at // sure each one of them has a corresponding 'in-flight' payment at
// Alice's node. // Alice's node.
payments, err := testContext.alice.ListPayments(context.Background(),
&lnrpc.ListPaymentsRequest{IncludeIncomplete: true})
require.NoError(t.t, err, "failed to fetch payment")
for _, testCase := range testCases { for _, testCase := range testCases {
if testCase.shouldHold { if !testCase.shouldHold {
hashStr := hex.EncodeToString(testCase.invoice.RHash) continue
var foundPayment *lnrpc.Payment
expectedAmt := testCase.invoice.ValueMsat
for _, p := range payments.Payments {
if p.PaymentHash == hashStr {
foundPayment = p
break
}
}
require.NotNil(t.t, foundPayment, fmt.Sprintf("expected "+
"to find pending payment for held htlc %v",
hashStr))
require.Equal(t.t, lnrpc.Payment_IN_FLIGHT,
foundPayment.Status, "expected payment to be "+
"in flight")
require.Equal(t.t, expectedAmt, foundPayment.ValueMsat,
"incorrect in flight amount")
} }
var preimage lntypes.Preimage
copy(preimage[:], testCase.invoice.RPreimage)
payment := ht.AssertPaymentStatus(
alice, preimage, lnrpc.Payment_IN_FLIGHT,
)
expectedAmt := testCase.invoice.ValueMsat
require.Equal(ht, expectedAmt, payment.ValueMsat,
"incorrect in flight amount")
} }
// Disconnect interceptor should cause resume held packets. // Cancel the context, which will disconnect the above interceptor.
// After that we wait for all go routines to finish, including the one
// that tests the payment final status for the held payment.
cancelInterceptor() cancelInterceptor()
wg.Wait()
// Disconnect interceptor should cause resume held packets. After that
// we wait for all go routines to finish, including the one that tests
// the payment final status for the held payment.
select {
case <-done:
case <-time.After(defaultTimeout):
require.Fail(ht, "timeout waiting for sending payment")
}
// Verify that we don't get notified about already completed HTLCs // Verify that we don't get notified about already completed HTLCs
// We do that by restarting alice, the sender the HTLCs. Under // We do that by restarting alice, the sender the HTLCs. Under
// https://github.com/lightningnetwork/lnd/issues/5115 // https://github.com/lightningnetwork/lnd/issues/5115
// this should cause all HTLCs settled or failed by the interceptor to renotify. // this should cause all HTLCs settled or failed by the interceptor to
restartAlice, err := net.SuspendNode(alice) // renotify.
require.NoError(t.t, err, "failed to suspend alice") restartAlice := ht.SuspendNode(alice)
require.NoError(ht, restartAlice(), "failed to restart alice")
ctxt, cancelInterceptor = context.WithTimeout(ctxb, defaultTimeout) // Make sure the channel is active from Bob's PoV.
defer cancelInterceptor() ht.AssertChannelExists(bob, cpAB)
interceptor, err = testContext.bob.RouterClient.HtlcInterceptor(ctxt)
require.NoError(t.t, err, "failed to create HtlcInterceptor")
err = restartAlice() // Create a new interceptor as the old one has quit.
require.NoError(t.t, err, "failed to restart alice") interceptor, cancelInterceptor = bob.RPC.HtlcInterceptor()
done = make(chan struct{})
go func() { go func() {
request, err := interceptor.Recv() defer close(done)
if err != nil {
// If it is just the error result of the context cancellation _, err := interceptor.Recv()
// the we exit silently. require.Error(ht, err, "expected an error from interceptor")
status, ok := status.FromError(err)
if ok && status.Code() == codes.Canceled { status, ok := status.FromError(err)
return switch {
} // If it is just the error result of the context cancellation
// Otherwise it an unexpected error, we fail the test. // the we exit silently.
require.NoError( case ok && status.Code() == codes.Canceled:
t.t, err, "unexpected error in interceptor.Recv()", fallthrough
)
// When the test ends, during the node's shutdown it will close
// the connection.
case strings.Contains(err.Error(), "closed network connection"):
fallthrough
case strings.Contains(err.Error(), "EOF"):
return return
} }
require.Nil(t.t, request, "no more intercepts should arrive") // Otherwise we receive an unexpected error.
require.Failf(ht, "iinterceptor", "unexpected err: %v", err)
}() }()
err = wait.Predicate(func() bool { // Cancel the context, which will disconnect the above interceptor.
channels, err := bob.ListChannels(ctxt, &lnrpc.ListChannelsRequest{ cancelInterceptor()
ActiveOnly: true, Peer: alice.PubKey[:], select {
}) case <-done:
return err == nil && len(channels.Channels) > 0 case <-time.After(defaultTimeout):
}, defaultTimeout) require.Fail(ht, "timeout waiting for interceptor error")
require.NoError(t.t, err, "alice <> bob channel didn't re-activate")
}
// interceptorTestContext is a helper struct to hold the test context and
// provide the needed functionality.
type interceptorTestContext struct {
t *harnessTest
net *lntest.NetworkHarness
// Keep a list of all our active channels.
networkChans []*lnrpc.ChannelPoint
closeChannelFuncs []func()
alice, bob, carol *lntest.HarnessNode
nodes []*lntest.HarnessNode
}
func newInterceptorTestContext(t *harnessTest,
net *lntest.NetworkHarness,
alice, bob, carol *lntest.HarnessNode) *interceptorTestContext {
// Connect nodes
nodes := []*lntest.HarnessNode{alice, bob, carol}
for i := 0; i < len(nodes); i++ {
for j := i + 1; j < len(nodes); j++ {
net.EnsureConnected(t.t, nodes[i], nodes[j])
}
} }
ctx := interceptorTestContext{ // Finally, close channels.
t: t, ht.CloseChannel(alice, cpAB)
net: net, ht.CloseChannel(bob, cpBC)
alice: alice,
bob: bob,
carol: carol,
nodes: nodes,
}
return &ctx
}
// prepareTestCases prepares 4 tests:
// 1. failed htlc.
// 2. resumed htlc.
// 3. settling htlc externally.
// 4. held htlc that is resumed later.
func (c *interceptorTestContext) prepareTestCases() []*interceptorTestCase {
cases := []*interceptorTestCase{
{amountMsat: 1000, shouldHold: false,
interceptorAction: routerrpc.ResolveHoldForwardAction_FAIL},
{amountMsat: 1000, shouldHold: false,
interceptorAction: routerrpc.ResolveHoldForwardAction_RESUME},
{amountMsat: 1000, shouldHold: false,
interceptorAction: routerrpc.ResolveHoldForwardAction_SETTLE},
{amountMsat: 1000, shouldHold: true,
interceptorAction: routerrpc.ResolveHoldForwardAction_RESUME},
}
for _, t := range cases {
addResponse, err := c.carol.AddInvoice(context.Background(), &lnrpc.Invoice{
ValueMsat: t.amountMsat,
})
require.NoError(c.t.t, err, "unable to add invoice")
invoice, err := c.carol.LookupInvoice(context.Background(), &lnrpc.PaymentHash{
RHashStr: hex.EncodeToString(addResponse.RHash),
})
require.NoError(c.t.t, err, "unable to find invoice")
// We'll need to also decode the returned invoice so we can
// grab the payment address which is now required for ALL
// payments.
payReq, err := c.carol.DecodePayReq(context.Background(), &lnrpc.PayReqString{
PayReq: invoice.PaymentRequest,
})
require.NoError(c.t.t, err, "unable to decode invoice")
t.invoice = invoice
t.payAddr = payReq.PaymentAddr
}
return cases
}
func (c *interceptorTestContext) openChannel(from, to *lntest.HarnessNode,
chanSize btcutil.Amount) {
c.net.SendCoins(c.t.t, btcutil.SatoshiPerBitcoin, from)
chanPoint := openChannelAndAssert(
c.t, c.net, from, to,
lntest.OpenChannelParams{
Amt: chanSize,
},
)
c.closeChannelFuncs = append(c.closeChannelFuncs, func() {
closeChannelAndAssert(c.t, c.net, from, chanPoint, false)
})
c.networkChans = append(c.networkChans, chanPoint)
}
func (c *interceptorTestContext) closeChannels() {
for _, f := range c.closeChannelFuncs {
f()
}
}
func (c *interceptorTestContext) waitForChannels() {
// Wait for all nodes to have seen all channels.
for _, chanPoint := range c.networkChans {
for _, node := range c.nodes {
txid, err := lnrpc.GetChanPointFundingTxid(chanPoint)
require.NoError(c.t.t, err, "unable to get txid")
point := wire.OutPoint{
Hash: *txid,
Index: chanPoint.OutputIndex,
}
err = node.WaitForNetworkChannelOpen(chanPoint)
require.NoError(c.t.t, err, fmt.Sprintf("(%d): timeout "+
"waiting for channel(%s) open", node.NodeID,
point))
}
}
}
// sendAliceToCarolPayment sends a payment from alice to carol and make an
// attempt to pay. The lnrpc.HTLCAttempt is returned.
func (c *interceptorTestContext) sendAliceToCarolPayment(ctx context.Context,
amtMsat int64,
paymentHash, paymentAddr []byte) (*lnrpc.HTLCAttempt, error) {
// Build a route from alice to carol.
route, err := c.buildRoute(
ctx, amtMsat, []*lntest.HarnessNode{c.bob, c.carol},
paymentAddr,
)
if err != nil {
return nil, err
}
sendReq := &routerrpc.SendToRouteRequest{
PaymentHash: paymentHash,
Route: route,
}
// Send a custom record to the forwarding node.
route.Hops[0].CustomRecords = map[uint64][]byte{
customTestKey: customTestValue,
}
// Send the payment.
return c.alice.RouterClient.SendToRouteV2(ctx, sendReq)
}
// buildRoute is a helper function to build a route with given hops.
func (c *interceptorTestContext) buildRoute(ctx context.Context, amtMsat int64,
hops []*lntest.HarnessNode, payAddr []byte) (*lnrpc.Route, error) {
rpcHops := make([][]byte, 0, len(hops))
for _, hop := range hops {
k := hop.PubKeyStr
pubkey, err := route.NewVertexFromStr(k)
if err != nil {
return nil, fmt.Errorf("error parsing %v: %v",
k, err)
}
rpcHops = append(rpcHops, pubkey[:])
}
req := &routerrpc.BuildRouteRequest{
AmtMsat: amtMsat,
FinalCltvDelta: chainreg.DefaultBitcoinTimeLockDelta,
HopPubkeys: rpcHops,
PaymentAddr: payAddr,
}
routeResp, err := c.alice.RouterClient.BuildRoute(ctx, req)
if err != nil {
return nil, err
}
return routeResp.Route, nil
} }
// interceptorTestScenario is a helper struct to hold the test context and // interceptorTestScenario is a helper struct to hold the test context and

View file

@ -188,7 +188,10 @@ func newMppTestScenario(ht *lntemp.HarnessTest) *mppTestScenario {
// Create a five-node context consisting of Alice, Bob and three new // Create a five-node context consisting of Alice, Bob and three new
// nodes. // nodes.
carol := ht.NewNode("carol", []string{"--maxpendingchannels=2"}) carol := ht.NewNode("carol", []string{
"--maxpendingchannels=2",
"--accept-amp",
})
dave := ht.NewNode("dave", nil) dave := ht.NewNode("dave", nil)
eve := ht.NewNode("eve", nil) eve := ht.NewNode("eve", nil)

View file

@ -40,10 +40,6 @@ var allTestCases = []*testCase{
name: "sign psbt", name: "sign psbt",
test: testSignPsbt, test: testSignPsbt,
}, },
{
name: "forward interceptor",
test: testForwardInterceptorBasic,
},
{ {
name: "wallet import account", name: "wallet import account",
test: testWalletImportAccount, test: testWalletImportAccount,