diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go
index a34fe135b..fc5864de2 100644
--- a/discovery/gossiper_test.go
+++ b/discovery/gossiper_test.go
@@ -87,7 +87,8 @@ type mockGraphSource struct {
 	zombies       map[uint64][][33]byte
 	chansToReject map[uint64]struct{}
 
-	callCount map[string]int
+	callCount           map[string]int
+	pauseGetChannelByID chan chan struct{}
 }
 
 func newMockRouter(t *testing.T, height uint32) *mockGraphSource {
@@ -98,9 +99,10 @@ func newMockRouter(t *testing.T, height uint32) *mockGraphSource {
 		edges: make(
 			map[uint64][]models.ChannelEdgePolicy,
 		),
-		zombies:       make(map[uint64][][33]byte),
-		chansToReject: make(map[uint64]struct{}),
-		callCount:     make(map[string]int),
+		zombies:             make(map[uint64][][33]byte),
+		chansToReject:       make(map[uint64]struct{}),
+		callCount:           make(map[string]int),
+		pauseGetChannelByID: make(chan chan struct{}, 1),
 	}
 }
 
@@ -267,6 +269,18 @@ func (r *mockGraphSource) GetChannelByID(chanID lnwire.ShortChannelID) (
 	*models.ChannelEdgePolicy,
 	*models.ChannelEdgePolicy, error) {
 
+	select {
+	// Check if a pause request channel has been loaded. If one has, then
+	// we wait for it to be closed before continuing.
+	case pauseChan := <-r.pauseGetChannelByID:
+		select {
+		case <-pauseChan:
+		case <-time.After(time.Second * 30):
+			r.t.Fatal("timeout waiting for pause channel")
+		}
+	default:
+	}
+
 	r.mu.Lock()
 	defer func() {
 		r.callCount["GetChannelByID"]++
@@ -4041,6 +4055,187 @@ func TestBroadcastAnnsAfterGraphSynced(t *testing.T) {
 	assertBroadcast(chanAnn2, true, true)
 }
 
+// TestRateLimitDeDup demonstrates a bug that currently exists in the handling
+// of channel updates. It shows that if two identical channel updates are
+// received in quick succession, then both of them might be counted towards
+// the rate limit, even though only one of them should be.
+//
+// NOTE: this will be fixed in an upcoming commit.
+func TestRateLimitDeDup(t *testing.T) {
+	t.Parallel()
+
+	// Create our test harness.
+	const blockHeight = 100
+	ctx, err := createTestCtx(t, blockHeight, false)
+	require.NoError(t, err, "can't create context")
+	ctx.gossiper.cfg.RebroadcastInterval = time.Hour
+
+	// We set the burst to 2 here. The very first update should not count
+	// towards this _and_ any duplicates should also not count towards it.
+	ctx.gossiper.cfg.MaxChannelUpdateBurst = 2
+	ctx.gossiper.cfg.ChannelUpdateInterval = time.Minute
+
+	// The graph should start empty.
+	require.Empty(t, ctx.router.infos)
+	require.Empty(t, ctx.router.edges)
+
+	// We'll create a batch of signed announcements, including updates for
+	// both sides, for a channel and process them. They should all be
+	// forwarded as this is our first time learning about the channel.
+	batch, err := ctx.createRemoteAnnouncements(blockHeight)
+	require.NoError(t, err)
+
+	nodePeer1 := &mockPeer{
+		remoteKeyPriv1.PubKey(), nil, nil, atomic.Bool{},
+	}
+	select {
+	case err := <-ctx.gossiper.ProcessRemoteAnnouncement(
+		batch.chanAnn, nodePeer1,
+	):
+		require.NoError(t, err)
+	case <-time.After(time.Second):
+		t.Fatal("remote announcement not processed")
+	}
+
+	select {
+	case err := <-ctx.gossiper.ProcessRemoteAnnouncement(
+		batch.chanUpdAnn1, nodePeer1,
+	):
+		require.NoError(t, err)
+	case <-time.After(time.Second):
+		t.Fatal("remote announcement not processed")
+	}
+
+	nodePeer2 := &mockPeer{
+		remoteKeyPriv2.PubKey(), nil, nil, atomic.Bool{},
+	}
+	select {
+	case err := <-ctx.gossiper.ProcessRemoteAnnouncement(
+		batch.chanUpdAnn2, nodePeer2,
+	):
+		require.NoError(t, err)
+	case <-time.After(time.Second):
+		t.Fatal("remote announcement not processed")
+	}
+
+	timeout := time.After(2 * trickleDelay)
+	for i := 0; i < 3; i++ {
+		select {
+		case <-ctx.broadcastedMessage:
+		case <-timeout:
+			t.Fatal("expected announcement to be broadcast")
+		}
+	}
+
+	shortChanID := batch.chanAnn.ShortChannelID.ToUint64()
+	require.Contains(t, ctx.router.infos, shortChanID)
+	require.Contains(t, ctx.router.edges, shortChanID)
+
+	// Before we send any more updates, we want to let our test harness
+	// hang during GetChannelByID so that we can ensure that two threads
+	// are waiting for the chan.
+	pause := make(chan struct{})
+	ctx.router.pauseGetChannelByID <- pause
+
+	// Take note of how many times IsStaleEdgePolicy has been called. It
+	// should be 2 since we have processed two channel updates.
+	staleCallCount := ctx.router.getCallCount("IsStaleEdgePolicy")
+	require.Equal(t, 2, staleCallCount)
+
+	// The same is expected for the UpdateEdge call.
+	updateEdgeCount := ctx.router.getCallCount("UpdateEdge")
+	require.Equal(t, 2, updateEdgeCount)
+
+	// Ok, now we will send the same channel update twice in quick
+	// succession. We wait for both to have hit the IsStaleEdgePolicy
+	// check before we un-pause the GetChannelByID call.
+	update := *batch.chanUpdAnn1
+
+	// Make sure it is not seen as a keep-alive update by updating some
+	// params along with the timestamp. Then sign the update.
+	update.Timestamp++
+	update.BaseFee++
+	require.NoError(t, signUpdate(remoteKeyPriv1, &update))
+
+	// We'll send the same update twice in quick succession.
+	go func() {
+		ctx.gossiper.ProcessRemoteAnnouncement(&update, nodePeer1)
+	}()
+	go func() {
+		ctx.gossiper.ProcessRemoteAnnouncement(&update, nodePeer1)
+	}()
+
+	// We know that both are being processed once the count for
+	// IsStaleEdgePolicy has increased by 2.
+	err = wait.NoError(func() error {
+		count := ctx.router.getCallCount("IsStaleEdgePolicy")
+
+		if count != 4 {
+			return fmt.Errorf("expected 4 calls to "+
+				"IsStaleEdgePolicy, got %v", count)
+		}
+
+		return nil
+	}, time.Second*5)
+	require.NoError(t, err)
+
+	// Now we can un-pause the thread that grabbed the mutex first.
+	close(pause)
+
+	// Currently, both updates make it to UpdateEdge.
+	err = wait.NoError(func() error {
+		count := ctx.router.getCallCount("UpdateEdge")
+
+		if count != 4 {
+			return fmt.Errorf("expected 4 calls to UpdateEdge, "+
+				"got %v", count)
+		}
+
+		return nil
+	}, time.Second*5)
+	require.NoError(t, err)
+
+	// We'll define a helper to assert whether updates should be rate
+	// limited or not depending on their contents.
+	assertRateLimit := func(shouldRateLimit bool) {
+		t.Helper()
+
+		select {
+		case <-ctx.broadcastedMessage:
+			if shouldRateLimit {
+				t.Fatal("unexpected channel update broadcast")
+			}
+		case <-time.After(2 * trickleDelay):
+			if !shouldRateLimit {
+				t.Fatal("expected channel update broadcast")
+			}
+		}
+	}
+
+	// Show that the last update was broadcast.
+	assertRateLimit(false)
+
+	// We should be allowed to send another update now since only one of
+	// the above duplicates should count towards the rate limit. However,
+	// this is currently not the case, and so we will be rate limited
+	// early. This will be fixed in an upcoming commit.
+	update.Timestamp++
+	update.BaseFee++
+	require.NoError(t, signUpdate(remoteKeyPriv1, &update))
+
+	select {
+	case err := <-ctx.gossiper.ProcessRemoteAnnouncement(
+		&update, nodePeer1,
+	):
+		require.NoError(t, err)
+	case <-time.After(time.Second):
+		t.Fatal("remote announcement not processed")
+	}
+
+	assertRateLimit(true)
+}
+
 // TestRateLimitChannelUpdates ensures that we properly rate limit incoming
 // channel updates.
 func TestRateLimitChannelUpdates(t *testing.T) {
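Side note on the NOTE in the test's doc comment: the de-duplication itself lands in a later commit and is not part of this diff. Purely as an illustrative sketch of the general idea (all names below are hypothetical and do not come from lnd), duplicates can be dropped before they reach the rate limiter by tracking which updates are already in flight, keyed by channel, direction and timestamp:

package sketch

import "sync"

// updateID identifies a channel update by channel, direction and timestamp.
type updateID struct {
	scid      uint64
	direction uint8
	timestamp uint32
}

// inFlightUpdates remembers updates that are currently being processed so
// that an identical copy arriving concurrently can be dropped before it is
// counted against the per-channel rate limit.
type inFlightUpdates struct {
	mu   sync.Mutex
	seen map[updateID]struct{}
}

func newInFlightUpdates() *inFlightUpdates {
	return &inFlightUpdates{seen: make(map[updateID]struct{})}
}

// begin returns false if an identical update is already being processed.
// Otherwise it records the update and returns true, and the caller must call
// done once the update has been fully handled.
func (f *inFlightUpdates) begin(id updateID) bool {
	f.mu.Lock()
	defer f.mu.Unlock()

	if _, ok := f.seen[id]; ok {
		return false
	}
	f.seen[id] = struct{}{}

	return true
}

// done removes the update from the in-flight set.
func (f *inFlightUpdates) done(id updateID) {
	f.mu.Lock()
	defer f.mu.Unlock()

	delete(f.seen, id)
}

With something along these lines in the gossiper, the test above would presumably see only three UpdateEdge calls for the duplicated update rather than four, and the final update would no longer be rate limited early.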