mirror of
https://github.com/lightningnetwork/lnd.git
synced 2024-11-19 01:43:16 +01:00
discovery: revamp recent rejects map
Similar to the prior commit, in this commit, we move to using a basic LRU cache to store the set of prior rejected messages.
This commit is contained in:
parent
8627b5d128
commit
5a8255550b
@ -44,6 +44,11 @@ const (
|
||||
// maxPrematureUpdates tracks the max amount of premature channel
|
||||
// updates that we'll hold onto.
|
||||
maxPrematureUpdates = 100
|
||||
|
||||
// maxRejectedUpdates tracks the max amount of rejected channel updates
|
||||
// we'll maintain. This is the global size across all peers. We'll
|
||||
// allocate ~3 MB max to the cache.
|
||||
maxRejectedUpdates = 10_000
|
||||
)
|
||||
|
||||
var (
|
||||
@ -287,6 +292,33 @@ func (c *cachedNetworkMsg) Size() (uint64, error) {
|
||||
return uint64(len(c.msgs)), nil
|
||||
}
|
||||
|
||||
// rejectCacheKey is the cache key that we'll use to track announcements we've
|
||||
// recently rejected.
|
||||
type rejectCacheKey struct {
|
||||
pubkey [33]byte
|
||||
chanID uint64
|
||||
}
|
||||
|
||||
// newRejectCacheKey returns a new cache key for the reject cache.
|
||||
func newRejectCacheKey(cid uint64, pub [33]byte) rejectCacheKey {
|
||||
k := rejectCacheKey{
|
||||
chanID: cid,
|
||||
pubkey: pub,
|
||||
}
|
||||
|
||||
return k
|
||||
}
|
||||
|
||||
// cachedReject is the empty value used to track the value for rejects.
|
||||
type cachedReject struct {
|
||||
}
|
||||
|
||||
// Size returns the "size" of an entry. We return 1 as we just want to limit
|
||||
// the total size.
|
||||
func (c *cachedReject) Size() (uint64, error) {
|
||||
return 1, nil
|
||||
}
|
||||
|
||||
// AuthenticatedGossiper is a subsystem which is responsible for receiving
|
||||
// announcements, validating them and applying the changes to router, syncing
|
||||
// lightning network with newly connected nodes, broadcasting announcements
|
||||
@ -345,8 +377,7 @@ type AuthenticatedGossiper struct {
|
||||
// consistent between when the DB is first read until it's written.
|
||||
channelMtx *multimutex.Mutex
|
||||
|
||||
rejectMtx sync.RWMutex
|
||||
recentRejects map[uint64]struct{}
|
||||
recentRejects *lru.Cache
|
||||
|
||||
// syncMgr is a subsystem responsible for managing the gossip syncers
|
||||
// for peers currently connected. When a new peer is connected, the
|
||||
@ -388,7 +419,7 @@ func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper
|
||||
chanPolicyUpdates: make(chan *chanPolicyUpdateRequest),
|
||||
prematureChannelUpdates: lru.NewCache(maxPrematureUpdates),
|
||||
channelMtx: multimutex.NewMutex(),
|
||||
recentRejects: make(map[uint64]struct{}),
|
||||
recentRejects: lru.NewCache(maxRejectedUpdates),
|
||||
chanUpdateRateLimiter: make(map[uint64][2]*rate.Limiter),
|
||||
}
|
||||
|
||||
@ -1036,7 +1067,9 @@ func (d *AuthenticatedGossiper) networkHandler() {
|
||||
|
||||
// If this message was recently rejected, then we won't
|
||||
// attempt to re-process it.
|
||||
if d.isRecentlyRejectedMsg(announcement.msg) {
|
||||
if announcement.isRemote && d.isRecentlyRejectedMsg(
|
||||
announcement.msg, announcement.peer.PubKey(),
|
||||
) {
|
||||
announcement.err <- fmt.Errorf("recently " +
|
||||
"rejected")
|
||||
continue
|
||||
@ -1202,22 +1235,23 @@ func (d *AuthenticatedGossiper) PruneSyncState(peer route.Vertex) {
|
||||
|
||||
// isRecentlyRejectedMsg returns true if we recently rejected a message, and
|
||||
// false otherwise, This avoids expensive reprocessing of the message.
|
||||
func (d *AuthenticatedGossiper) isRecentlyRejectedMsg(msg lnwire.Message) bool {
|
||||
d.rejectMtx.RLock()
|
||||
defer d.rejectMtx.RUnlock()
|
||||
func (d *AuthenticatedGossiper) isRecentlyRejectedMsg(msg lnwire.Message,
|
||||
peerPub [33]byte) bool {
|
||||
|
||||
var scid uint64
|
||||
switch m := msg.(type) {
|
||||
case *lnwire.ChannelUpdate:
|
||||
_, ok := d.recentRejects[m.ShortChannelID.ToUint64()]
|
||||
return ok
|
||||
scid = m.ShortChannelID.ToUint64()
|
||||
|
||||
case *lnwire.ChannelAnnouncement:
|
||||
_, ok := d.recentRejects[m.ShortChannelID.ToUint64()]
|
||||
return ok
|
||||
scid = m.ShortChannelID.ToUint64()
|
||||
|
||||
default:
|
||||
return false
|
||||
}
|
||||
|
||||
_, err := d.recentRejects.Get(newRejectCacheKey(scid, peerPub))
|
||||
return err != cache.ErrElementNotFound
|
||||
}
|
||||
|
||||
// retransmitStaleAnns examines all outgoing channels that the source node is
|
||||
@ -1640,9 +1674,11 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
|
||||
d.cfg.ChainHash)
|
||||
log.Errorf(err.Error())
|
||||
|
||||
d.rejectMtx.Lock()
|
||||
d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
|
||||
d.rejectMtx.Unlock()
|
||||
key := newRejectCacheKey(
|
||||
msg.ShortChannelID.ToUint64(),
|
||||
nMsg.peer.PubKey(),
|
||||
)
|
||||
_, _ = d.recentRejects.Put(key, &cachedReject{})
|
||||
|
||||
nMsg.err <- err
|
||||
return nil, false
|
||||
@ -1680,9 +1716,12 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
|
||||
if err := routing.ValidateChannelAnn(msg); err != nil {
|
||||
err := fmt.Errorf("unable to validate "+
|
||||
"announcement: %v", err)
|
||||
d.rejectMtx.Lock()
|
||||
d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
|
||||
d.rejectMtx.Unlock()
|
||||
|
||||
key := newRejectCacheKey(
|
||||
msg.ShortChannelID.ToUint64(),
|
||||
nMsg.peer.PubKey(),
|
||||
)
|
||||
_, _ = d.recentRejects.Put(key, &cachedReject{})
|
||||
|
||||
log.Error(err)
|
||||
nMsg.err <- err
|
||||
@ -1753,9 +1792,13 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
|
||||
// see if we get any new announcements.
|
||||
anns, rErr := d.processRejectedEdge(msg, proof)
|
||||
if rErr != nil {
|
||||
d.rejectMtx.Lock()
|
||||
d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
|
||||
d.rejectMtx.Unlock()
|
||||
|
||||
key := newRejectCacheKey(
|
||||
msg.ShortChannelID.ToUint64(),
|
||||
nMsg.peer.PubKey(),
|
||||
)
|
||||
_, _ = d.recentRejects.Put(key, &cachedReject{})
|
||||
|
||||
nMsg.err <- rErr
|
||||
return nil, false
|
||||
}
|
||||
@ -1777,9 +1820,11 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
|
||||
log.Tracef("Router rejected channel "+
|
||||
"edge: %v", err)
|
||||
|
||||
d.rejectMtx.Lock()
|
||||
d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
|
||||
d.rejectMtx.Unlock()
|
||||
key := newRejectCacheKey(
|
||||
msg.ShortChannelID.ToUint64(),
|
||||
nMsg.peer.PubKey(),
|
||||
)
|
||||
_, _ = d.recentRejects.Put(key, &cachedReject{})
|
||||
}
|
||||
|
||||
nMsg.err <- err
|
||||
@ -1862,9 +1907,11 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
|
||||
d.cfg.ChainHash)
|
||||
log.Errorf(err.Error())
|
||||
|
||||
d.rejectMtx.Lock()
|
||||
d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
|
||||
d.rejectMtx.Unlock()
|
||||
key := newRejectCacheKey(
|
||||
msg.ShortChannelID.ToUint64(),
|
||||
nMsg.peer.PubKey(),
|
||||
)
|
||||
_, _ = d.recentRejects.Put(key, &cachedReject{})
|
||||
|
||||
nMsg.err <- err
|
||||
return nil, false
|
||||
@ -1952,21 +1999,23 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
|
||||
shortChanID,
|
||||
)
|
||||
switch {
|
||||
// Nothing in the cache yeyt, we can just directly
|
||||
// Nothing in the cache yet, we can just directly
|
||||
// insert this element.
|
||||
case err == cache.ErrElementNotFound:
|
||||
d.prematureChannelUpdates.Put(shortChanID, &cachedNetworkMsg{
|
||||
msgs: []*networkMsg{nMsg},
|
||||
})
|
||||
_, _ = d.prematureChannelUpdates.Put(
|
||||
shortChanID, &cachedNetworkMsg{
|
||||
msgs: []*networkMsg{nMsg},
|
||||
})
|
||||
|
||||
// There's already something in the cache, so we'll
|
||||
// combine the set of messagesa into a single value.
|
||||
// combine the set of messages into a single value.
|
||||
default:
|
||||
msgs := earlyMsgs.(*cachedNetworkMsg).msgs
|
||||
msgs = append(msgs, nMsg)
|
||||
d.prematureChannelUpdates.Put(shortChanID, &cachedNetworkMsg{
|
||||
msgs: msgs,
|
||||
})
|
||||
_, _ = d.prematureChannelUpdates.Put(
|
||||
shortChanID, &cachedNetworkMsg{
|
||||
msgs: msgs,
|
||||
})
|
||||
}
|
||||
|
||||
log.Debugf("Got ChannelUpdate for edge not found in "+
|
||||
@ -1984,9 +2033,12 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
|
||||
log.Error(err)
|
||||
nMsg.err <- err
|
||||
|
||||
d.rejectMtx.Lock()
|
||||
d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
|
||||
d.rejectMtx.Unlock()
|
||||
key := newRejectCacheKey(
|
||||
msg.ShortChannelID.ToUint64(),
|
||||
nMsg.peer.PubKey(),
|
||||
)
|
||||
_, _ = d.recentRejects.Put(key, &cachedReject{})
|
||||
|
||||
return nil, false
|
||||
}
|
||||
|
||||
@ -2089,9 +2141,13 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
|
||||
routing.ErrIgnored) {
|
||||
log.Debug(err)
|
||||
} else if err != routing.ErrVBarrierShuttingDown {
|
||||
d.rejectMtx.Lock()
|
||||
d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
|
||||
d.rejectMtx.Unlock()
|
||||
|
||||
key := newRejectCacheKey(
|
||||
msg.ShortChannelID.ToUint64(),
|
||||
nMsg.peer.PubKey(),
|
||||
)
|
||||
_, _ = d.recentRejects.Put(key, &cachedReject{})
|
||||
|
||||
log.Error(err)
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user