discovery: rate limit incoming channel updates

This change was largely motivated by an increase in high disk usage as a
result of channel update spam. With an in memory graph, this would've
gone mostly undetected except for the increased bandwidth usage, which
this doesn't aim to solve yet. To minimize the effects to disks, we
begin to rate limit channel updates in two ways. Keep alive updates,
those which only increase their timestamps to signal liveliness, are now
limited to one per lnd's rebroadcast interval (current default of 24H).
Non keep alive updates are now limited to one per block per direction.
This commit is contained in:
Wilmer Paulino 2020-11-19 17:31:58 -08:00
parent ef503bf14e
commit 791ba3eb50
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F
4 changed files with 252 additions and 7 deletions

View File

@ -2862,8 +2862,7 @@ func (c *ChannelEdgePolicy) SetSigBytes(sig []byte) {
// IsDisabled determines whether the edge has the disabled bit set.
func (c *ChannelEdgePolicy) IsDisabled() bool {
return c.ChannelFlags&lnwire.ChanUpdateDisabled ==
lnwire.ChanUpdateDisabled
return c.ChannelFlags.IsDisabled()
}
// ComputeFee computes the fee to forward an HTLC of `amt` milli-satoshis over

View File

@ -315,6 +315,13 @@ type AuthenticatedGossiper struct {
// network.
reliableSender *reliableSender
// heightForLastChanUpdate keeps track of the height at which we
// processed the latest channel update for a specific direction.
//
// NOTE: This map must be synchronized with the main
// AuthenticatedGossiper lock.
heightForLastChanUpdate map[uint64][2]uint32
sync.Mutex
}
@ -331,6 +338,7 @@ func New(cfg Config, selfKey *btcec.PublicKey) *AuthenticatedGossiper {
prematureChannelUpdates: make(map[uint64][]*networkMsg),
channelMtx: multimutex.NewMutex(),
recentRejects: make(map[uint64]struct{}),
heightForLastChanUpdate: make(map[uint64][2]uint32),
syncMgr: newSyncManager(&SyncManagerCfg{
ChainHash: cfg.ChainHash,
ChanSeries: cfg.ChanSeries,
@ -1847,7 +1855,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
// point and when we call UpdateEdge() later.
d.channelMtx.Lock(msg.ShortChannelID.ToUint64())
defer d.channelMtx.Unlock(msg.ShortChannelID.ToUint64())
chanInfo, _, _, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID)
chanInfo, edge1, edge2, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID)
switch err {
// No error, break.
case nil:
@ -1946,12 +1954,56 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
// The least-significant bit in the flag on the channel update
// announcement tells us "which" side of the channels directed
// edge is being updated.
var pubKey *btcec.PublicKey
switch {
case msg.ChannelFlags&lnwire.ChanUpdateDirection == 0:
var (
pubKey *btcec.PublicKey
edgeToUpdate *channeldb.ChannelEdgePolicy
)
direction := msg.ChannelFlags & lnwire.ChanUpdateDirection
switch direction {
case 0:
pubKey, _ = chanInfo.NodeKey1()
case msg.ChannelFlags&lnwire.ChanUpdateDirection == 1:
edgeToUpdate = edge1
case 1:
pubKey, _ = chanInfo.NodeKey2()
edgeToUpdate = edge2
}
// If we have a previous version of the edge being updated,
// we'll want to rate limit its updates to prevent spam
// throughout the network.
if nMsg.isRemote && edgeToUpdate != nil {
// If it's a keep-alive update, we'll only propagate one
// if it's been a day since the previous. This follows
// our own heuristic of sending keep-alive updates after
// the same duration (see retransmitStaleAnns).
timeSinceLastUpdate := timestamp.Sub(edgeToUpdate.LastUpdate)
if IsKeepAliveUpdate(msg, edgeToUpdate) {
if timeSinceLastUpdate < d.cfg.RebroadcastInterval {
log.Debugf("Ignoring keep alive update "+
"not within %v period for "+
"channel %v",
d.cfg.RebroadcastInterval,
shortChanID)
nMsg.err <- nil
return nil
}
} else {
// If it's not, we'll only allow a single update
// for this channel per block.
d.Lock()
lastUpdateHeight := d.heightForLastChanUpdate[shortChanID]
if lastUpdateHeight[direction] == d.bestHeight {
log.Debugf("Ignoring update for "+
"channel %v due to previous "+
"update occurring within the "+
"same block %v", shortChanID,
d.bestHeight)
d.Unlock()
nMsg.err <- nil
return nil
}
d.Unlock()
}
}
// Validate the channel announcement with the expected public key and
@ -1997,6 +2049,15 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
return nil
}
// With the edge successfully updated on disk, we'll note the
// current height so that we're able to rate limit any future
// updates for the same channel.
d.Lock()
lastUpdateHeight := d.heightForLastChanUpdate[shortChanID]
lastUpdateHeight[direction] = d.bestHeight
d.heightForLastChanUpdate[shortChanID] = lastUpdateHeight
d.Unlock()
// If this is a local ChannelUpdate without an AuthProof, it
// means it is an update to a channel that is not (yet)
// supposed to be announced to the greater network. However,
@ -2536,3 +2597,49 @@ func (d *AuthenticatedGossiper) updateChannel(info *channeldb.ChannelEdgeInfo,
func (d *AuthenticatedGossiper) SyncManager() *SyncManager {
return d.syncMgr
}
// IsKeepAliveUpdate determines whether this channel update is considered a
// keep-alive update based on the previous channel update processed for the same
// direction.
func IsKeepAliveUpdate(update *lnwire.ChannelUpdate,
prev *channeldb.ChannelEdgePolicy) bool {
// Both updates should be from the same direction.
if update.ChannelFlags&lnwire.ChanUpdateDirection !=
prev.ChannelFlags&lnwire.ChanUpdateDirection {
return false
}
// The timestamp should always increase for a keep-alive update.
timestamp := time.Unix(int64(update.Timestamp), 0)
if !timestamp.After(prev.LastUpdate) {
return false
}
// None of the remaining fields should change for a keep-alive update.
if update.ChannelFlags.IsDisabled() != prev.ChannelFlags.IsDisabled() {
return false
}
if lnwire.MilliSatoshi(update.BaseFee) != prev.FeeBaseMSat {
return false
}
if lnwire.MilliSatoshi(update.FeeRate) != prev.FeeProportionalMillionths {
return false
}
if update.TimeLockDelta != prev.TimeLockDelta {
return false
}
if update.HtlcMinimumMsat != prev.MinHTLC {
return false
}
if update.MessageFlags.HasMaxHtlc() && !prev.MessageFlags.HasMaxHtlc() {
return false
}
if update.HtlcMaximumMsat != prev.MaxHTLC {
return false
}
if !bytes.Equal(update.ExtraOpaqueData, prev.ExtraOpaqueData) {
return false
}
return true
}

View File

@ -31,6 +31,7 @@ import (
"github.com/lightningnetwork/lnd/routing"
"github.com/lightningnetwork/lnd/routing/route"
"github.com/lightningnetwork/lnd/ticker"
"github.com/stretchr/testify/require"
)
var (
@ -3942,3 +3943,136 @@ func TestBroadcastAnnsAfterGraphSynced(t *testing.T) {
}
assertBroadcast(chanAnn2, true, true)
}
// TestRateLimitChannelUpdates ensures that we properly rate limit incoming
// channel updates.
func TestRateLimitChannelUpdates(t *testing.T) {
t.Parallel()
// Create our test harness.
const blockHeight = 100
ctx, cleanup, err := createTestCtx(blockHeight)
if err != nil {
t.Fatalf("can't create context: %v", err)
}
defer cleanup()
ctx.gossiper.cfg.RebroadcastInterval = time.Hour
// The graph should start empty.
require.Empty(t, ctx.router.infos)
require.Empty(t, ctx.router.edges)
// We'll create a batch of signed announcements, including updates for
// both sides, for a channel and process them. They should all be
// forwarded as this is our first time learning about the channel.
batch, err := createAnnouncements(blockHeight)
require.NoError(t, err)
nodePeer1 := &mockPeer{nodeKeyPriv1.PubKey(), nil, nil}
select {
case err := <-ctx.gossiper.ProcessRemoteAnnouncement(
batch.remoteChanAnn, nodePeer1,
):
require.NoError(t, err)
case <-time.After(time.Second):
t.Fatal("remote announcement not processed")
}
select {
case err := <-ctx.gossiper.ProcessRemoteAnnouncement(
batch.chanUpdAnn1, nodePeer1,
):
require.NoError(t, err)
case <-time.After(time.Second):
t.Fatal("remote announcement not processed")
}
nodePeer2 := &mockPeer{nodeKeyPriv2.PubKey(), nil, nil}
select {
case err := <-ctx.gossiper.ProcessRemoteAnnouncement(
batch.chanUpdAnn2, nodePeer2,
):
require.NoError(t, err)
case <-time.After(time.Second):
t.Fatal("remote announcement not processed")
}
timeout := time.After(2 * trickleDelay)
for i := 0; i < 3; i++ {
select {
case <-ctx.broadcastedMessage:
case <-timeout:
t.Fatal("expected announcement to be broadcast")
}
}
shortChanID := batch.remoteChanAnn.ShortChannelID.ToUint64()
require.Contains(t, ctx.router.infos, shortChanID)
require.Contains(t, ctx.router.edges, shortChanID)
// We'll define a helper to assert whether updates should be rate
// limited or not depending on their contents.
assertRateLimit := func(update *lnwire.ChannelUpdate, peer lnpeer.Peer,
shouldRateLimit bool) {
t.Helper()
select {
case err := <-ctx.gossiper.ProcessRemoteAnnouncement(update, peer):
require.NoError(t, err)
case <-time.After(time.Second):
t.Fatal("remote announcement not processed")
}
select {
case <-ctx.broadcastedMessage:
if shouldRateLimit {
t.Fatal("unexpected channel update broadcast")
}
case <-time.After(2 * trickleDelay):
if !shouldRateLimit {
t.Fatal("expected channel update broadcast")
}
}
}
// We'll start with the keep alive case.
//
// We rate limit any keep alive updates that have not at least spanned
// our rebroadcast interval.
rateLimitKeepAliveUpdate := *batch.chanUpdAnn1
rateLimitKeepAliveUpdate.Timestamp++
require.NoError(t, signUpdate(nodeKeyPriv1, &rateLimitKeepAliveUpdate))
assertRateLimit(&rateLimitKeepAliveUpdate, nodePeer1, true)
keepAliveUpdate := *batch.chanUpdAnn1
keepAliveUpdate.Timestamp = uint32(
time.Unix(int64(batch.chanUpdAnn1.Timestamp), 0).
Add(ctx.gossiper.cfg.RebroadcastInterval).Unix(),
)
require.NoError(t, signUpdate(nodeKeyPriv1, &keepAliveUpdate))
assertRateLimit(&keepAliveUpdate, nodePeer1, false)
// Then, we'll move on to the non keep alive cases.
//
// Non keep alive updates are limited to one per block per direction.
// Since we've already processed updates for both sides, the new updates
// for both directions will not be broadcast until a new block arrives.
updateSameDirection := keepAliveUpdate
updateSameDirection.Timestamp++
updateSameDirection.BaseFee++
require.NoError(t, signUpdate(nodeKeyPriv1, &updateSameDirection))
assertRateLimit(&updateSameDirection, nodePeer1, true)
updateDiffDirection := *batch.chanUpdAnn2
updateDiffDirection.Timestamp++
updateDiffDirection.BaseFee++
require.NoError(t, signUpdate(nodeKeyPriv2, &updateDiffDirection))
assertRateLimit(&updateDiffDirection, nodePeer2, true)
// Notify a new block and reprocess the updates. They should no longer
// be rate limited.
ctx.notifier.notifyBlock(chainhash.Hash{}, blockHeight+1)
assertRateLimit(&updateSameDirection, nodePeer1, false)
assertRateLimit(&updateDiffDirection, nodePeer2, false)
}

View File

@ -47,6 +47,11 @@ const (
ChanUpdateDisabled
)
// IsDisabled determines whether the channel flags has the disabled bit set.
func (c ChanUpdateChanFlags) IsDisabled() bool {
return c&ChanUpdateDisabled == ChanUpdateDisabled
}
// String returns the bitfield flags as a string.
func (c ChanUpdateChanFlags) String() string {
return fmt.Sprintf("%08b", c)