diff --git a/lnd_test.go b/lnd_test.go index 07025a611..194f692cb 100644 --- a/lnd_test.go +++ b/lnd_test.go @@ -5694,15 +5694,137 @@ func testInvoiceSubscriptions(net *lntest.NetworkHarness, t *harnessTest) { closeChannelAndAssert(ctxt, t, net, net.Alice, chanPoint, false) } -// testBasicChannelCreation test multiple channel opening and closing. -func testBasicChannelCreation(net *lntest.NetworkHarness, t *harnessTest) { - ctxb := context.Background() +// channelSubscription houses the proxied update and error chans for a node's +// channel subscriptions. +type channelSubscription struct { + updateChan chan *lnrpc.ChannelEventUpdate + errChan chan error + quit chan struct{} +} +// subscribeChannelNotifications subscribes to channel updates and launches a +// goroutine that forwards these to the returned channel. +func subscribeChannelNotifications(ctxb context.Context, t *harnessTest, + node *lntest.HarnessNode) channelSubscription { + + // We'll first start by establishing a notification client which will + // send us notifications upon channels becoming active, inactive or + // closed. + req := &lnrpc.ChannelEventSubscription{} + ctx, cancelFunc := context.WithCancel(ctxb) + + chanUpdateClient, err := node.SubscribeChannelEvents(ctx, req) + if err != nil { + t.Fatalf("unable to create channel update client: %v", err) + } + + // We'll launch a goroutine that will be responsible for proxying all + // notifications recv'd from the client into the channel below. + errChan := make(chan error, 1) + quit := make(chan struct{}) + chanUpdates := make(chan *lnrpc.ChannelEventUpdate, 20) + go func() { + defer cancelFunc() + for { + select { + case <-quit: + return + default: + chanUpdate, err := chanUpdateClient.Recv() + select { + case <-quit: + return + default: + } + + if err == io.EOF { + return + } else if err != nil { + select { + case errChan <- err: + case <-quit: + } + return + } + + select { + case chanUpdates <- chanUpdate: + case <-quit: + return + } + } + } + }() + + return channelSubscription{ + updateChan: chanUpdates, + errChan: errChan, + quit: quit, + } +} + +// verifyCloseUpdate is used to verify that a closed channel update is of the +// expected type. +func verifyCloseUpdate(chanUpdate *lnrpc.ChannelEventUpdate, + force bool, forceType lnrpc.ChannelCloseSummary_ClosureType) error { + + // We should receive one inactive and one closed notification + // for each channel. + switch update := chanUpdate.Channel.(type) { + case *lnrpc.ChannelEventUpdate_InactiveChannel: + if chanUpdate.Type != lnrpc.ChannelEventUpdate_INACTIVE_CHANNEL { + return fmt.Errorf("update type mismatch: expected %v, got %v", + lnrpc.ChannelEventUpdate_INACTIVE_CHANNEL, + chanUpdate.Type) + } + case *lnrpc.ChannelEventUpdate_ClosedChannel: + if chanUpdate.Type != + lnrpc.ChannelEventUpdate_CLOSED_CHANNEL { + return fmt.Errorf("update type mismatch: expected %v, got %v", + lnrpc.ChannelEventUpdate_CLOSED_CHANNEL, + chanUpdate.Type) + } + + switch force { + case true: + if update.ClosedChannel.CloseType != forceType { + return fmt.Errorf("channel closure type mismatch: "+ + "expected %v, got %v", + forceType, + update.ClosedChannel.CloseType) + } + case false: + if update.ClosedChannel.CloseType != + lnrpc.ChannelCloseSummary_COOPERATIVE_CLOSE { + return fmt.Errorf("channel closure type "+ + "mismatch: expected %v, got %v", + lnrpc.ChannelCloseSummary_COOPERATIVE_CLOSE, + update.ClosedChannel.CloseType) + } + } + default: + return fmt.Errorf("channel update channel of wrong type, "+ + "expected closed channel, got %T", + update) + } + + return nil +} + +// testBasicChannelCreationAndUpdates tests multiple channel opening and closing, +// and ensures that if a node is subscribed to channel updates they will be +// received correctly for both cooperative and force closed channels. +func testBasicChannelCreationAndUpdates(net *lntest.NetworkHarness, t *harnessTest) { + ctxb := context.Background() const ( numChannels = 2 amount = maxBtcFundingAmount ) + // Let Bob subscribe to channel notifications. + bobChanSub := subscribeChannelNotifications(ctxb, t, net.Bob) + defer close(bobChanSub.quit) + // Open the channel between Alice and Bob, asserting that the // channel has been properly open on-chain. chanPoints := make([]*lnrpc.ChannelPoint, numChannels) @@ -5716,11 +5838,89 @@ func testBasicChannelCreation(net *lntest.NetworkHarness, t *harnessTest) { ) } - // Close the channel between Alice and Bob, asserting that the - // channel has been properly closed on-chain. - for _, chanPoint := range chanPoints { - ctxt, _ := context.WithTimeout(ctxb, channelCloseTimeout) - closeChannelAndAssert(ctxt, t, net, net.Alice, chanPoint, false) + // Since each of the channels just became open, Bob should we receive an + // open and an active notification for each channel. + var numChannelUpds int + for numChannelUpds < 2*numChannels { + select { + case update := <-bobChanSub.updateChan: + switch update.Type { + case lnrpc.ChannelEventUpdate_ACTIVE_CHANNEL: + case lnrpc.ChannelEventUpdate_OPEN_CHANNEL: + default: + t.Fatalf("update type mismatch: expected open or active "+ + "channel notification, got: %v", update.Type) + } + numChannelUpds++ + case <-time.After(time.Second * 10): + t.Fatalf("timeout waiting for channel notifications, "+ + "only received %d/%d chanupds", numChannelUpds, + numChannels) + } + } + + // Subscribe Alice to channel updates so we can test that both remote + // and local force close notifications are received correctly. + aliceChanSub := subscribeChannelNotifications(ctxb, t, net.Alice) + defer close(aliceChanSub.quit) + + // Close the channel between Alice and Bob, asserting that the channel + // has been properly closed on-chain. + for i, chanPoint := range chanPoints { + ctx, _ := context.WithTimeout(context.Background(), defaultTimeout) + + // Force close half of the channels. + force := i%2 == 0 + closeChannelAndAssert(ctx, t, net, net.Alice, chanPoint, force) + if force { + cleanupForceClose(t, net, net.Alice, chanPoint) + } + } + + // verifyCloseUpdatesReceived is used to verify that Alice and Bob + // receive the correct channel updates in order. + verifyCloseUpdatesReceived := func(sub channelSubscription, + forceType lnrpc.ChannelCloseSummary_ClosureType) error { + + // Ensure one inactive and one closed notification is received for each + // closed channel. + numChannelUpds := 0 + for numChannelUpds < 2*numChannels { + // Every other channel should be force closed. + force := (numChannelUpds/2)%2 == 0 + + select { + case chanUpdate := <-sub.updateChan: + err := verifyCloseUpdate(chanUpdate, force, forceType) + if err != nil { + return err + } + + numChannelUpds++ + case err := <-sub.errChan: + return err + case <-time.After(time.Second * 10): + return fmt.Errorf("timeout waiting for channel "+ + "notifications, only received %d/%d "+ + "chanupds", numChannelUpds, 2*numChannels) + } + } + + return nil + } + + // Verify Bob receives all closed channel notifications. He should + // receive a remote force close notification for force closed channels. + if err := verifyCloseUpdatesReceived(bobChanSub, + lnrpc.ChannelCloseSummary_REMOTE_FORCE_CLOSE); err != nil { + t.Fatalf("errored verifying close updates: %v", err) + } + + // Verify Alice receives all closed channel notifications. She should + // receive a remote force close notification for force closed channels. + if err := verifyCloseUpdatesReceived(aliceChanSub, + lnrpc.ChannelCloseSummary_LOCAL_FORCE_CLOSE); err != nil { + t.Fatalf("errored verifying close updates: %v", err) } } @@ -12943,8 +13143,8 @@ var testsCases = []*testCase{ test: testMultiHopOverPrivateChannels, }, { - name: "multiple channel creation", - test: testBasicChannelCreation, + name: "multiple channel creation and update subscription", + test: testBasicChannelCreationAndUpdates, }, { name: "invoice update subscription",