discovery: demonstrate channel update rate limiting bug

This commit adds a test to demonstrate that if we receive two
identical updates (which can happen if we get the same update from two
peers in quick succession), then our rate-limiting logic is triggered
early, since both updates may be counted towards the rate limit. This
will be fixed in an upcoming commit.
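To make the failure mode concrete: the gossiper allows a small burst of non-keep-alive updates per channel direction per interval, so a duplicate copy that gets counted silently consumes part of that burst. The sketch below is purely illustrative of that effect; it uses golang.org/x/time/rate with a burst of 2 and a one-minute refill to mirror the MaxChannelUpdateBurst and ChannelUpdateInterval values used by the new test, and is not a reproduction of the gossiper's actual code path.

package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// A token bucket with a burst of 2 that refills one token per minute.
	limiter := rate.NewLimiter(rate.Every(time.Minute), 2)

	// The same update arrives from two peers in quick succession. If both
	// copies are counted, they drain the entire burst.
	fmt.Println(limiter.Allow()) // true: first copy of the update
	fmt.Println(limiter.Allow()) // true: identical duplicate, also counted

	// A genuinely new update is now rejected, even though only one
	// distinct update has been processed so far.
	fmt.Println(limiter.Allow()) // false
}

With proper de-duplication, only one of the two identical copies would count towards the limit, leaving a token available for the genuinely new update.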
Elle Mouton 2025-03-03 14:06:36 +02:00
parent 78413f5b84
commit d00c724678


@@ -87,7 +87,8 @@ type mockGraphSource struct {
 	zombies       map[uint64][][33]byte
 	chansToReject map[uint64]struct{}
 
-	callCount map[string]int
+	callCount           map[string]int
+	pauseGetChannelByID chan chan struct{}
 }
 
 func newMockRouter(t *testing.T, height uint32) *mockGraphSource {
@@ -98,9 +99,10 @@ func newMockRouter(t *testing.T, height uint32) *mockGraphSource {
 		edges: make(
 			map[uint64][]models.ChannelEdgePolicy,
 		),
-		zombies:       make(map[uint64][][33]byte),
-		chansToReject: make(map[uint64]struct{}),
-		callCount:     make(map[string]int),
+		zombies:             make(map[uint64][][33]byte),
+		chansToReject:       make(map[uint64]struct{}),
+		callCount:           make(map[string]int),
+		pauseGetChannelByID: make(chan chan struct{}, 1),
 	}
 }
@@ -267,6 +269,18 @@ func (r *mockGraphSource) GetChannelByID(chanID lnwire.ShortChannelID) (
 	*models.ChannelEdgePolicy,
 	*models.ChannelEdgePolicy, error) {
 
+	select {
+	// Check if a pause request channel has been loaded. If one has, then we
+	// wait for it to be closed before continuing.
+	case pauseChan := <-r.pauseGetChannelByID:
+		select {
+		case <-pauseChan:
+		case <-time.After(time.Second * 30):
+			r.t.Fatal("timeout waiting for pause channel")
+		}
+	default:
+	}
+
 	r.mu.Lock()
 	defer func() {
 		r.callCount["GetChannelByID"]++
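For readers unfamiliar with the pause hook added above: it is a buffered channel of channels, where the test loads a "pause request" and the next GetChannelByID call blocks until that request channel is closed. A standalone sketch of the same pattern follows, with hypothetical names (worker, pauseHook, doWork) that are not part of lnd; it is illustrative only, not part of this change.

package main

import (
	"fmt"
	"time"
)

// worker exposes a pause hook so a test can hold doWork at a known point.
type worker struct {
	pauseHook chan chan struct{}
}

func (w *worker) doWork() {
	// If a pause request has been loaded, block until it is released.
	select {
	case pauseChan := <-w.pauseHook:
		<-pauseChan
	default:
	}

	fmt.Println("doWork finished")
}

func main() {
	w := &worker{pauseHook: make(chan chan struct{}, 1)}

	// Load a pause request, then start the worker. It blocks inside
	// doWork until the request channel is closed.
	pause := make(chan struct{})
	w.pauseHook <- pause

	go w.doWork()

	time.Sleep(100 * time.Millisecond)
	close(pause)

	// Give the goroutine time to finish before main exits.
	time.Sleep(100 * time.Millisecond)
}

Because the hook is buffered with capacity 1, loading a pause request never blocks the test, and only the first caller that receives the request is held.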
@@ -4041,6 +4055,187 @@ func TestBroadcastAnnsAfterGraphSynced(t *testing.T) {
 	assertBroadcast(chanAnn2, true, true)
 }
 
+// TestRateLimitDeDup demonstrates a bug that currently exists in the handling
+// of channel updates. It shows that if two identical channel updates are
+// received in quick succession, then both of them might be counted towards the
+// rate limit, even though only one of them should be.
+//
+// NOTE: this will be fixed in an upcoming commit.
+func TestRateLimitDeDup(t *testing.T) {
+	t.Parallel()
+
+	// Create our test harness.
+	const blockHeight = 100
+	ctx, err := createTestCtx(t, blockHeight, false)
+	require.NoError(t, err, "can't create context")
+	ctx.gossiper.cfg.RebroadcastInterval = time.Hour
+
+	// We set the burst to 2 here. The very first update should not count
+	// towards this _and_ any duplicates should also not count towards it.
+	ctx.gossiper.cfg.MaxChannelUpdateBurst = 2
+	ctx.gossiper.cfg.ChannelUpdateInterval = time.Minute
+
+	// The graph should start empty.
+	require.Empty(t, ctx.router.infos)
+	require.Empty(t, ctx.router.edges)
+
+	// We'll create a batch of signed announcements, including updates for
+	// both sides, for a channel and process them. They should all be
+	// forwarded as this is our first time learning about the channel.
+	batch, err := ctx.createRemoteAnnouncements(blockHeight)
+	require.NoError(t, err)
+
+	nodePeer1 := &mockPeer{
+		remoteKeyPriv1.PubKey(), nil, nil, atomic.Bool{},
+	}
+	select {
+	case err := <-ctx.gossiper.ProcessRemoteAnnouncement(
+		batch.chanAnn, nodePeer1,
+	):
+		require.NoError(t, err)
+	case <-time.After(time.Second):
+		t.Fatal("remote announcement not processed")
+	}
+
+	select {
+	case err := <-ctx.gossiper.ProcessRemoteAnnouncement(
+		batch.chanUpdAnn1, nodePeer1,
+	):
+		require.NoError(t, err)
+	case <-time.After(time.Second):
+		t.Fatal("remote announcement not processed")
+	}
+
+	nodePeer2 := &mockPeer{
+		remoteKeyPriv2.PubKey(), nil, nil, atomic.Bool{},
+	}
+	select {
+	case err := <-ctx.gossiper.ProcessRemoteAnnouncement(
+		batch.chanUpdAnn2, nodePeer2,
+	):
+		require.NoError(t, err)
+	case <-time.After(time.Second):
+		t.Fatal("remote announcement not processed")
+	}
+
+	timeout := time.After(2 * trickleDelay)
+	for i := 0; i < 3; i++ {
+		select {
+		case <-ctx.broadcastedMessage:
+		case <-timeout:
+			t.Fatal("expected announcement to be broadcast")
+		}
+	}
+
+	shortChanID := batch.chanAnn.ShortChannelID.ToUint64()
+	require.Contains(t, ctx.router.infos, shortChanID)
+	require.Contains(t, ctx.router.edges, shortChanID)
+
+	// Before we send any more updates, we want to let our test harness
+	// hang during GetChannelByID so that we can ensure that two threads
+	// are waiting for the chan.
+	pause := make(chan struct{})
+	ctx.router.pauseGetChannelByID <- pause
+
+	// Take note of how many times IsStaleEdgePolicy has been called.
+	// It should be 2 since we have processed two channel updates.
+	staleCallCount := ctx.router.getCallCount("IsStaleEdgePolicy")
+	require.Equal(t, 2, staleCallCount)
+
+	// The same is expected for the UpdateEdge call.
+	updateEdgeCount := ctx.router.getCallCount("UpdateEdge")
+	require.Equal(t, 2, updateEdgeCount)
+
+	// Ok, now we will send the same channel update twice in quick
+	// succession. We wait for both to have hit the IsStaleEdgePolicy check
+	// before we un-pause the GetChannelByID call.
+	update := *batch.chanUpdAnn1
+
+	// Make sure it is not seen as a keep-alive update by updating some
+	// params along with the timestamp. Then sign the update.
+	update.Timestamp++
+	update.BaseFee++
+	require.NoError(t, signUpdate(remoteKeyPriv1, &update))
+
+	// We'll send the same update twice in quick succession.
+	go func() {
+		ctx.gossiper.ProcessRemoteAnnouncement(&update, nodePeer1)
+	}()
+	go func() {
+		ctx.gossiper.ProcessRemoteAnnouncement(&update, nodePeer1)
+	}()
+
+	// We know that both are being processed once the count for
+	// IsStaleEdgePolicy has increased by 2.
+	err = wait.NoError(func() error {
+		count := ctx.router.getCallCount("IsStaleEdgePolicy")
+		if count != 4 {
+			return fmt.Errorf("expected 4 calls to "+
+				"IsStaleEdgePolicy, got %v", count)
+		}
+
+		return nil
+	}, time.Second*5)
+	require.NoError(t, err)
+
+	// Now we can un-pause the thread that grabbed the mutex first.
+	close(pause)
+
+	// Currently, both updates make it to UpdateEdge.
+	err = wait.NoError(func() error {
+		count := ctx.router.getCallCount("UpdateEdge")
+		if count != 4 {
+			return fmt.Errorf("expected 4 calls to UpdateEdge, "+
+				"got %v", count)
+		}
+
+		return nil
+	}, time.Second*5)
+	require.NoError(t, err)
+
+	// We'll define a helper to assert whether updates should be rate
+	// limited or not depending on their contents.
+	assertRateLimit := func(shouldRateLimit bool) {
+		t.Helper()
+
+		select {
+		case <-ctx.broadcastedMessage:
+			if shouldRateLimit {
+				t.Fatal("unexpected channel update broadcast")
+			}
+		case <-time.After(2 * trickleDelay):
+			if !shouldRateLimit {
+				t.Fatal("expected channel update broadcast")
+			}
+		}
+	}
+
+	// Show that the last update was broadcast.
+	assertRateLimit(false)
+
+	// We should be allowed to send another update now since only one of
+	// the above duplicates should count towards the rate limit.
+	// However, this is currently not the case, and so we will be rate
+	// limited early. This will be fixed in an upcoming commit.
+	update.Timestamp++
+	update.BaseFee++
+	require.NoError(t, signUpdate(remoteKeyPriv1, &update))
+
+	select {
+	case err := <-ctx.gossiper.ProcessRemoteAnnouncement(
+		&update, nodePeer1,
+	):
+		require.NoError(t, err)
+	case <-time.After(time.Second):
+		t.Fatal("remote announcement not processed")
+	}
+
+	assertRateLimit(true)
+}
+
 // TestRateLimitChannelUpdates ensures that we properly rate limit incoming
 // channel updates.
 func TestRateLimitChannelUpdates(t *testing.T) {