discovery: resend premature messages when new blocks arrive

This commit adds a method to resend premature when new blocks arrive.
Previously when a message has specified a block+delta in the future, we
would ignore the message and never process it again, causing an open
channel never being broadcast under fast blocks generation. This commit
fixes it by saving the future messages and resending them once the
required block height is reached.
This commit is contained in:
yyforyongyu 2021-12-03 15:20:38 +08:00
parent 8d0cae5e18
commit 17938b08ac
No known key found for this signature in database
GPG Key ID: 9BCD95C4FF296868

View File

@ -360,6 +360,13 @@ type AuthenticatedGossiper struct {
// networkHandler. // networkHandler.
networkMsgs chan *networkMsg networkMsgs chan *networkMsg
// futureMsgs is a list of premature network messages that have a block
// height specified in the future. We will save them and resend it to
// the chan networkMsgs once the block height has reached. The cached
// map format is,
// {blockHeight: [msg1, msg2, ...], ...}
futureMsgs *lru.Cache
// chanPolicyUpdates is a channel that requests to update the // chanPolicyUpdates is a channel that requests to update the
// forwarding policy of a set of channels is sent over. // forwarding policy of a set of channels is sent over.
chanPolicyUpdates chan *chanPolicyUpdateRequest chanPolicyUpdates chan *chanPolicyUpdateRequest
@ -415,6 +422,7 @@ func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper
selfKeyLoc: selfKeyDesc.KeyLocator, selfKeyLoc: selfKeyDesc.KeyLocator,
cfg: &cfg, cfg: &cfg,
networkMsgs: make(chan *networkMsg), networkMsgs: make(chan *networkMsg),
futureMsgs: lru.NewCache(maxPrematureUpdates),
quit: make(chan struct{}), quit: make(chan struct{}),
chanPolicyUpdates: make(chan *chanPolicyUpdateRequest), chanPolicyUpdates: make(chan *chanPolicyUpdateRequest),
prematureChannelUpdates: lru.NewCache(maxPrematureUpdates), prematureChannelUpdates: lru.NewCache(maxPrematureUpdates),
@ -550,12 +558,46 @@ func (d *AuthenticatedGossiper) syncBlockHeight() {
log.Debugf("New block: height=%d, hash=%s", blockHeight, log.Debugf("New block: height=%d, hash=%s", blockHeight,
newBlock.Hash) newBlock.Hash)
// Resend future messages, if any.
d.resendFutureMessages(blockHeight)
case <-d.quit: case <-d.quit:
return return
} }
} }
} }
// resendFutureMessages takes a block height, resends all the future messages
// found at that height and deletes those messages found in the gossiper's
// futureMsgs.
func (d *AuthenticatedGossiper) resendFutureMessages(height uint32) {
result, err := d.futureMsgs.Get(height)
// Return early if no messages found.
if err == cache.ErrElementNotFound {
return
}
// The error must nil, we will log an error and exit.
if err != nil {
log.Errorf("Reading future messages got error: %v", err)
return
}
msgs := result.(*cachedNetworkMsg).msgs
log.Debugf("Resending %d network messages at height %d",
len(msgs), height)
for _, msg := range msgs {
select {
case d.networkMsgs <- msg:
case <-d.quit:
msg.err <- ErrGossiperShuttingDown
}
}
}
// Stop signals any active goroutines for a graceful closure. // Stop signals any active goroutines for a graceful closure.
func (d *AuthenticatedGossiper) Stop() error { func (d *AuthenticatedGossiper) Stop() error {
d.stopped.Do(func() { d.stopped.Do(func() {
@ -1627,6 +1669,64 @@ func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement,
return d.cfg.Router.AddNode(node, op...) return d.cfg.Router.AddNode(node, op...)
} }
// isPremature decides whether a given network message has a block height+delta
// value specified in the future. If so, the message will be added to the
// future message map and be processed when the block height as reached.
//
// NOTE: must be used inside a lock.
func (d *AuthenticatedGossiper) isPremature(chanID lnwire.ShortChannelID,
delta uint32, msg *networkMsg) bool {
// TODO(roasbeef) make height delta 6
// * or configurable
msgHeight := chanID.BlockHeight + delta
// The message height is smaller or equal to our best known height,
// thus the message is mature.
if msgHeight <= d.bestHeight {
return false
}
// Add the premature message to our future messages which will
// be resent once the block height has reached.
//
// Init an empty cached message and overwrite it if there are cached
// messages found.
cachedMsgs := &cachedNetworkMsg{
msgs: make([]*networkMsg, 0),
}
result, err := d.futureMsgs.Get(msgHeight)
// No error returned means we have old messages cached.
if err == nil {
cachedMsgs = result.(*cachedNetworkMsg)
}
// Copy the networkMsgs since the old message's err chan will
// be consumed.
copied := &networkMsg{
peer: msg.peer,
source: msg.source,
msg: msg.msg,
optionalMsgFields: msg.optionalMsgFields,
isRemote: msg.isRemote,
err: make(chan error, 1),
}
// Add the network message.
cachedMsgs.msgs = append(cachedMsgs.msgs, copied)
_, err = d.futureMsgs.Put(msgHeight, cachedMsgs)
if err != nil {
log.Errorf("Adding future message got error: %v", err)
}
log.Debugf("Network message: %v added to future messages for "+
"msgHeight=%d, bestHeight=%d", msg.msg.MsgType(),
msgHeight, d.bestHeight)
return true
}
// processNetworkAnnouncement processes a new network relate authenticated // processNetworkAnnouncement processes a new network relate authenticated
// channel or node announcement or announcements proofs. If the announcement // channel or node announcement or announcements proofs. If the announcement
// didn't affect the internal state due to either being out of date, invalid, // didn't affect the internal state due to either being out of date, invalid,
@ -1641,12 +1741,6 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
"is_remote=%v", nMsg.peer, nMsg.source.SerializeCompressed(), "is_remote=%v", nMsg.peer, nMsg.source.SerializeCompressed(),
nMsg.msg.MsgType(), nMsg.isRemote) nMsg.msg.MsgType(), nMsg.isRemote)
isPremature := func(chanID lnwire.ShortChannelID, delta uint32) bool {
// TODO(roasbeef) make height delta 6
// * or configurable
return chanID.BlockHeight+delta > d.bestHeight
}
// If this is a remote update, we set the scheduler option to lazily // If this is a remote update, we set the scheduler option to lazily
// add it to the graph. // add it to the graph.
var schedulerOp []batch.SchedulerOption var schedulerOp []batch.SchedulerOption
@ -1745,8 +1839,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
// If the advertised inclusionary block is beyond our knowledge // If the advertised inclusionary block is beyond our knowledge
// of the chain tip, then we'll ignore for it now. // of the chain tip, then we'll ignore for it now.
d.Lock() d.Lock()
if nMsg.isRemote && isPremature(msg.ShortChannelID, 0) { if nMsg.isRemote && d.isPremature(msg.ShortChannelID, 0, nMsg) {
log.Infof("Announcement for chan_id=(%v), is "+ log.Warnf("Announcement for chan_id=(%v), is "+
"premature: advertises height %v, only "+ "premature: advertises height %v, only "+
"height %v is known", "height %v is known",
msg.ShortChannelID.ToUint64(), msg.ShortChannelID.ToUint64(),
@ -1987,8 +2081,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
// of the chain tip, then we'll put the announcement in limbo // of the chain tip, then we'll put the announcement in limbo
// to be fully verified once we advance forward in the chain. // to be fully verified once we advance forward in the chain.
d.Lock() d.Lock()
if nMsg.isRemote && isPremature(msg.ShortChannelID, 0) { if nMsg.isRemote && d.isPremature(msg.ShortChannelID, 0, nMsg) {
log.Infof("Update announcement for "+ log.Warnf("Update announcement for "+
"short_chan_id(%v), is premature: advertises "+ "short_chan_id(%v), is premature: advertises "+
"height %v, only height %v is known", "height %v, only height %v is known",
shortChanID, blockHeight, shortChanID, blockHeight,
@ -2293,8 +2387,11 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
// registered in bitcoin blockchain. Therefore, we check if the // registered in bitcoin blockchain. Therefore, we check if the
// proof is premature. // proof is premature.
d.Lock() d.Lock()
if isPremature(msg.ShortChannelID, d.cfg.ProofMatureDelta) { premature := d.isPremature(
log.Infof("Premature proof announcement, current "+ msg.ShortChannelID, d.cfg.ProofMatureDelta, nMsg,
)
if premature {
log.Warnf("Premature proof announcement, current "+
"block height lower than needed: %v < %v", "block height lower than needed: %v < %v",
d.bestHeight, needBlockHeight) d.bestHeight, needBlockHeight)
d.Unlock() d.Unlock()