lntemp+itest: refactor testChannelBackupUpdates

yyforyongyu 2022-08-04 09:05:48 +08:00
parent 4e821a6e3d
commit 40a5f94ef7
4 changed files with 81 additions and 40 deletions


@@ -2,6 +2,7 @@ package rpc
 import (
     "context"
+    "strings"

     "github.com/lightningnetwork/lnd/lnrpc"
     "github.com/stretchr/testify/require"
@@ -132,6 +133,29 @@ func (h *HarnessRPC) PendingChannels() *lnrpc.PendingChannelsResponse {
     pendingChansRequest := &lnrpc.PendingChannelsRequest{}
     resp, err := h.LN.PendingChannels(ctxt, pendingChansRequest)

+    // TODO(yy): We may get a `unable to find arbitrator` error from the
+    // rpc point, due to a timing issue in rpcserver,
+    // 1. `r.server.chanStateDB.FetchClosedChannels` fetches
+    //    the pending force close channel.
+    // 2. `r.arbitratorPopulateForceCloseResp` relies on the
+    //    channel arbitrator to get the report, and,
+    // 3. the arbitrator may be deleted due to the force close
+    //    channel being resolved.
+    // Somewhere along the line is missing a lock to keep the data
+    // consistent.
+    //
+    // Return if there's no error.
+    if err == nil {
+        return resp
+    }
+
+    // Otherwise, give it a second shot if it's the arbitrator error.
+    if strings.Contains(err.Error(), "unable to find arbitrator") {
+        resp, err = h.LN.PendingChannels(ctxt, pendingChansRequest)
+    }
+
+    // It's very unlikely we'd get the arbitrator not found error again.
     h.NoError(err, "PendingChannels")

     return resp
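
For context, a caller would typically hit this helper right after a force close, when the channel arbitrator may already have been cleaned up. A minimal usage sketch under the lntemp harness follows; the test function, node setup and assertion are illustrative only (they assume the usual itest imports such as lntemp and require) and are not code from this commit.

// Hypothetical itest showing how the hardened helper is consumed; the
// function name, node names and assertion are illustrative only.
func testPendingChannelsExample(ht *lntemp.HarnessTest) {
    carol := ht.NewNode("carol", nil)

    // ... open a channel from Carol and force close it here ...

    // PendingChannels now retries once on the transient "unable to find
    // arbitrator" error before asserting via h.NoError.
    pending := carol.RPC.PendingChannels()
    require.Len(ht, pending.PendingForceClosingChannels, 1,
        "expected one pending force close")
}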
@@ -517,3 +541,30 @@ func (h *HarnessRPC) SubscribeInvoices(
     return client
 }
+
+type BackupSubscriber lnrpc.Lightning_SubscribeChannelBackupsClient
+
+// SubscribeChannelBackups creates a client to listen to channel backup stream.
+func (h *HarnessRPC) SubscribeChannelBackups() BackupSubscriber {
+    // Use runCtx here instead of timeout context to keep the stream client
+    // alive.
+    backupStream, err := h.LN.SubscribeChannelBackups(
+        h.runCtx, &lnrpc.ChannelBackupSubscription{},
+    )
+    require.NoErrorf(h, err, "unable to create backup stream")
+
+    return backupStream
+}
+
+// VerifyChanBackup makes a RPC call to node's VerifyChanBackup and asserts.
+func (h *HarnessRPC) VerifyChanBackup(
+    ss *lnrpc.ChanBackupSnapshot) *lnrpc.VerifyChanBackupResponse {
+
+    ctxt, cancel := context.WithTimeout(h.runCtx, DefaultTimeout)
+    defer cancel()
+
+    resp, err := h.LN.VerifyChanBackup(ctxt, ss)
+    require.NoErrorf(h, err, "unable to verify backup")
+
+    return resp
+}
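
The refactored test later in this commit consumes this stream through a small proxy goroutine ("We'll use this goroutine to proxy any updates to a channel we can easily use below"), whose body sits outside the shown diff context. A minimal sketch of that pattern, assuming a quit channel and helper/channel names that are illustrative rather than taken from the commit:

// proxyBackupUpdates forwards snapshots from the backup stream into plain
// channels a test can select on. Illustrative sketch only; this helper does
// not exist in the commit.
func proxyBackupUpdates(stream lnrpc.Lightning_SubscribeChannelBackupsClient,
    quit chan struct{}) (chan *lnrpc.ChanBackupSnapshot, chan error) {

    backupUpdates := make(chan *lnrpc.ChanBackupSnapshot, 2)
    streamErr := make(chan error, 1)

    go func() {
        for {
            snapshot, err := stream.Recv()
            if err != nil {
                // Deliver the error unless the test is already shutting
                // down, then stop reading.
                select {
                case streamErr <- err:
                case <-quit:
                }
                return
            }

            select {
            case backupUpdates <- snapshot:
            case <-quit:
                return
            }
        }
    }()

    return backupUpdates, streamErr
}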


@@ -191,4 +191,8 @@ var allTestCasesTemp = []*lntemp.TestCase{
         Name:     "invoice update subscription",
         TestFunc: testInvoiceSubscriptions,
     },
+    {
+        Name:     "streaming channel backup update",
+        TestFunc: testChannelBackupUpdates,
+    },
 }


@@ -782,12 +782,12 @@ func runChanRestoreScenarioForceClose(ht *lntemp.HarnessTest, zeroConf bool) {
 // testChannelBackupUpdates tests that both the streaming channel update RPC,
 // and the on-disk channel.backup are updated each time a channel is
 // opened/closed.
-func testChannelBackupUpdates(net *lntest.NetworkHarness, t *harnessTest) {
-    ctxb := context.Background()
+func testChannelBackupUpdates(ht *lntemp.HarnessTest) {
+    alice := ht.Alice

     // First, we'll make a temp directory that we'll use to store our
     // backup file, so we can check in on it during the test easily.
-    backupDir := t.t.TempDir()
+    backupDir := ht.T.TempDir()

     // First, we'll create a new node, Carol. We'll also create a temporary
     // file that Carol will use to store her channel backups.
@@ -795,17 +795,11 @@ func testChannelBackupUpdates(net *lntest.NetworkHarness, t *harnessTest) {
         backupDir, chanbackup.DefaultBackupFileName,
     )
     carolArgs := fmt.Sprintf("--backupfilepath=%v", backupFilePath)
-    carol := net.NewNode(t.t, "carol", []string{carolArgs})
-    defer shutdownAndAssert(net, t, carol)
+    carol := ht.NewNode("carol", []string{carolArgs})

     // Next, we'll register for streaming notifications for changes to the
     // backup file.
-    backupStream, err := carol.SubscribeChannelBackups(
-        ctxb, &lnrpc.ChannelBackupSubscription{},
-    )
-    if err != nil {
-        t.Fatalf("unable to create backup stream: %v", err)
-    }
+    backupStream := carol.RPC.SubscribeChannelBackups()

     // We'll use this goroutine to proxy any updates to a channel we can
     // easily use below.
@@ -838,18 +832,16 @@ func testChannelBackupUpdates(net *lntest.NetworkHarness, t *harnessTest) {
     // With Carol up, we'll now connect her to Alice, and open a channel
     // between them.
-    net.ConnectNodes(t.t, carol, net.Alice)
+    ht.ConnectNodes(carol, alice)

     // Next, we'll open two channels between Alice and Carol back to back.
     var chanPoints []*lnrpc.ChannelPoint
     numChans := 2
     chanAmt := btcutil.Amount(1000000)
     for i := 0; i < numChans; i++ {
-        chanPoint := openChannelAndAssert(
-            t, net, net.Alice, carol,
-            lntest.OpenChannelParams{Amt: chanAmt},
+        chanPoint := ht.OpenChannel(
+            alice, carol, lntemp.OpenChannelParams{Amt: chanAmt},
         )
         chanPoints = append(chanPoints, chanPoint)
     }
@@ -860,12 +852,14 @@ func testChannelBackupUpdates(net *lntest.NetworkHarness, t *harnessTest) {
     for i := 0; i < numNtfns; i++ {
         select {
         case err := <-streamErr:
-            t.Fatalf("error with backup stream: %v", err)
+            require.Failf(ht, "stream err",
+                "error with backup stream: %v", err)

         case currentBackup = <-backupUpdates:

         case <-time.After(time.Second * 5):
-            t.Fatalf("didn't receive channel backup "+
+            require.Failf(ht, "timeout", "didn't "+
+                "receive channel backup "+
                 "notification %v", i+1)
         }
     }
@@ -885,32 +879,29 @@ func testChannelBackupUpdates(net *lntest.NetworkHarness, t *harnessTest) {
             // nonce, we can't compare them directly, so instead
             // we'll compare the length which is a proxy for the
             // number of channels that the multi-backup contains.
-            rawBackup := currentBackup.MultiChanBackup.MultiChanBackup
-            if len(rawBackup) != len(packedBackup) {
+            backup := currentBackup.MultiChanBackup.MultiChanBackup
+            if len(backup) != len(packedBackup) {
                 return fmt.Errorf("backup files don't match: "+
-                    "expected %x got %x", rawBackup, packedBackup)
+                    "expected %x got %x", backup,
+                    packedBackup)
             }

             // Additionally, we'll assert that both backups up
             // returned are valid.
-            for i, backup := range [][]byte{rawBackup, packedBackup} {
+            for _, backup := range [][]byte{backup, packedBackup} {
                 snapshot := &lnrpc.ChanBackupSnapshot{
                     MultiChanBackup: &lnrpc.MultiChanBackup{
                         MultiChanBackup: backup,
                     },
                 }
-                _, err := carol.VerifyChanBackup(ctxb, snapshot)
-                if err != nil {
-                    return fmt.Errorf("unable to verify "+
-                        "backup #%d: %v", i, err)
-                }
+
+                carol.RPC.VerifyChanBackup(snapshot)
             }

             return nil
         }, defaultTimeout)
-        if err != nil {
-            t.Fatalf("backup state invalid: %v", err)
-        }
+        require.NoError(ht, err, "timeout while checking "+
+            "backup state: %v", err)
     }

     // As these two channels were just opened, we should've got two times
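
The closure above runs inside a polling helper: the trailing `}, defaultTimeout)` and the error assertion suggest lnd's lntest/wait.NoError, though the wrapper itself sits outside the shown diff context. A minimal sketch under that assumption; reading packedBackup from Carol's on-disk channel.backup inside the closure is likewise an assumption for illustration, not text from the commit:

// Illustrative sketch of the polling wrapper assumed to surround the
// backup-file check; backupFilePath and defaultTimeout come from the test.
err := wait.NoError(func() error {
    // Read the latest multi-channel backup Carol wrote to disk.
    packedBackup, err := os.ReadFile(backupFilePath)
    if err != nil {
        return fmt.Errorf("unable to read backup file: %w", err)
    }

    // ... compare packedBackup against the streamed snapshot and verify
    // both backups, as in the closure shown in the diff ...
    _ = packedBackup

    return nil
}, defaultTimeout)
require.NoError(ht, err, "timeout while checking backup state")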
@@ -931,11 +922,11 @@ func testChannelBackupUpdates(net *lntest.NetworkHarness, t *harnessTest) {
         chanPoint := chanPoints[i]

-        closeChannelAndAssert(t, net, net.Alice, chanPoint, forceClose)
-
         // If we force closed the channel, then we'll mine enough
         // blocks to ensure all outputs have been swept.
         if forceClose {
+            ht.ForceCloseChannel(alice, chanPoint)
+
             // A local force closed channel will trigger a
             // notification once the commitment TX confirms on
             // chain. But that won't remove the channel from the
@@ -943,13 +934,12 @@ func testChannelBackupUpdates(net *lntest.NetworkHarness, t *harnessTest) {
             // locked contract was fully resolved on chain.
             assertBackupNtfns(1)

-            cleanupForceClose(t, net, net.Alice, chanPoint)
-
-            // Now that the channel's been fully resolved, we expect
-            // another notification.
+            // Now that the channel's been fully resolved, we
+            // expect another notification.
             assertBackupNtfns(1)
             assertBackupFileState()
         } else {
+            ht.CloseChannel(alice, chanPoint)
+
             // We should get a single notification after closing,
             // and the on-disk state should match this latest
             // notifications.


@@ -126,10 +126,6 @@ var allTestCases = []*testCase{
         name: "route fee cutoff",
         test: testRouteFeeCutoff,
     },
-    {
-        name: "streaming channel backup update",
-        test: testChannelBackupUpdates,
-    },
     {
         name: "export channel backup",
         test: testExportChannelBackup,