discovery: add additional gossiper level reject cache

In this commit, we’ll add a new reject cache to ensure that we don’t
attempt to re-process any announcements already rejected by the
ChannelRouter.
This commit is contained in:
Olaoluwa Osuntokun 2018-01-30 20:41:27 -08:00
parent e578cea375
commit bf05e47780
No known key found for this signature in database
GPG Key ID: 964EA263DD637C21

View File

@ -193,6 +193,9 @@ type AuthenticatedGossiper struct {
// consistent between when the DB is first read until it's written. // consistent between when the DB is first read until it's written.
channelMtx *multimutex.Mutex channelMtx *multimutex.Mutex
rejectMtx sync.RWMutex
recentRejects map[uint64]struct{}
sync.Mutex sync.Mutex
} }
@ -214,6 +217,7 @@ func New(cfg Config, selfKey *btcec.PublicKey) (*AuthenticatedGossiper, error) {
prematureChannelUpdates: make(map[uint64][]*networkMsg), prematureChannelUpdates: make(map[uint64][]*networkMsg),
waitingProofs: storage, waitingProofs: storage,
channelMtx: multimutex.NewMutex(), channelMtx: multimutex.NewMutex(),
recentRejects: make(map[uint64]struct{}),
}, nil }, nil
} }
@ -890,6 +894,13 @@ func (d *AuthenticatedGossiper) networkHandler() {
continue continue
} }
// If this message was recently rejected, then we won't
// attempt to re-process it.
if d.isRecentlyRejectedMsg(announcement.msg) {
announcement.err <- fmt.Errorf("recently rejected")
continue
}
// We'll set up any dependent, and wait until a free // We'll set up any dependent, and wait until a free
// slot for this job opens up, this allow us to not // slot for this job opens up, this allow us to not
// have thousands of goroutines active. // have thousands of goroutines active.
@ -1014,6 +1025,26 @@ func (d *AuthenticatedGossiper) networkHandler() {
} }
} }
// isRecentlyRejectedMsg returns true if we recently rejected a message, and
// false otherwise, This avoids expensive reprocessing of the message.
func (d *AuthenticatedGossiper) isRecentlyRejectedMsg(msg lnwire.Message) bool {
d.rejectMtx.RLock()
defer d.rejectMtx.RUnlock()
switch m := msg.(type) {
case *lnwire.ChannelUpdate:
_, ok := d.recentRejects[m.ShortChannelID.ToUint64()]
return ok
case *lnwire.ChannelAnnouncement:
_, ok := d.recentRejects[m.ShortChannelID.ToUint64()]
return ok
default:
return false
}
}
// retransmitStaleChannels examines all outgoing channels that the source node // retransmitStaleChannels examines all outgoing channels that the source node
// is known to maintain to check to see if any of them are "stale". A channel // is known to maintain to check to see if any of them are "stale". A channel
// is stale iff, the last timestamp of its rebroadcast is older then // is stale iff, the last timestamp of its rebroadcast is older then
@ -1325,6 +1356,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
log.Error("Ignoring ChannelAnnouncement from "+ log.Error("Ignoring ChannelAnnouncement from "+
"chain=%v, gossiper on chain=%v", msg.ChainHash, "chain=%v, gossiper on chain=%v", msg.ChainHash,
d.cfg.ChainHash) d.cfg.ChainHash)
d.rejectMtx.Lock()
d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
d.rejectMtx.Unlock()
return nil return nil
} }
@ -1356,6 +1390,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
if err := ValidateChannelAnn(msg); err != nil { if err := ValidateChannelAnn(msg); err != nil {
err := errors.Errorf("unable to validate "+ err := errors.Errorf("unable to validate "+
"announcement: %v", err) "announcement: %v", err)
d.rejectMtx.Lock()
d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
d.rejectMtx.Unlock()
log.Error(err) log.Error(err)
nMsg.err <- err nMsg.err <- err
@ -1416,6 +1453,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
// see if we get any new announcements. // see if we get any new announcements.
anns, rErr := d.processRejectedEdge(msg, proof) anns, rErr := d.processRejectedEdge(msg, proof)
if rErr != nil { if rErr != nil {
d.rejectMtx.Lock()
d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
d.rejectMtx.Unlock()
nMsg.err <- rErr nMsg.err <- rErr
return nil return nil
} }
@ -1517,6 +1557,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
log.Error("Ignoring ChannelUpdate from "+ log.Error("Ignoring ChannelUpdate from "+
"chain=%v, gossiper on chain=%v", msg.ChainHash, "chain=%v, gossiper on chain=%v", msg.ChainHash,
d.cfg.ChainHash) d.cfg.ChainHash)
d.rejectMtx.Lock()
d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
d.rejectMtx.Unlock()
return nil return nil
} }
@ -1590,6 +1633,10 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
shortChanID, err) shortChanID, err)
log.Error(err) log.Error(err)
nMsg.err <- err nMsg.err <- err
d.rejectMtx.Lock()
d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
d.rejectMtx.Unlock()
return nil return nil
} }
} }
@ -1633,6 +1680,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
if routing.IsError(err, routing.ErrOutdated, routing.ErrIgnored) { if routing.IsError(err, routing.ErrOutdated, routing.ErrIgnored) {
log.Debug(err) log.Debug(err)
} else { } else {
d.rejectMtx.Lock()
d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
d.rejectMtx.Unlock()
log.Error(err) log.Error(err)
} }
@ -1808,7 +1858,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
chanInfo.AuthProof, chanInfo, e1, e2, chanInfo.AuthProof, chanInfo, e1, e2,
) )
if err != nil { if err != nil {
log.Error("unable to gen ann: %v", err) log.Errorf("unable to gen ann: %v", err)
return return
} }
err = d.cfg.SendToPeer(nMsg.peer, chanAnn) err = d.cfg.SendToPeer(nMsg.peer, chanAnn)