From c1175dcabed50e6b9fb8a2ba6a5e8c43b85ac72a Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Tue, 14 Dec 2021 03:02:28 +0800 Subject: [PATCH 01/11] discovery: add verbose network messages related logs --- discovery/gossiper.go | 60 +++++++++++++++++++++++++++++++++++---- discovery/sync_manager.go | 4 +++ 2 files changed, 59 insertions(+), 5 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 89b72ca32..e96c1bfb9 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -757,6 +757,8 @@ func (d *deDupedAnnouncements) reset() { // and the set of senders is updated to reflect which node sent us this // message. func (d *deDupedAnnouncements) addMsg(message networkMsg) { + log.Tracef("Adding network message: %v to batch", message.msg.MsgType()) + // Depending on the message type (channel announcement, channel update, // or node announcement), the message is added to the corresponding map // in deDupedAnnouncements. Because each identifying key can have at @@ -806,6 +808,10 @@ func (d *deDupedAnnouncements) addMsg(message networkMsg) { // If we already had this message with a strictly newer // timestamp, then we'll just discard the message we got. if oldTimestamp > msg.Timestamp { + log.Debugf("Ignored outdated network message: "+ + "peer=%v, source=%x, msg=%s, ", message.peer, + message.source.SerializeCompressed(), + msg.MsgType()) return } @@ -1025,6 +1031,9 @@ func (d *AuthenticatedGossiper) networkHandler() { // sub-systems below us, then craft, sign, and broadcast a new // ChannelUpdate for the set of affected clients. case policyUpdate := <-d.chanPolicyUpdates: + log.Tracef("Received channel %d policy update requests", + len(policyUpdate.edgesToUpdate)) + // First, we'll now create new fully signed updates for // the affected channels and also update the underlying // graph with the new state. @@ -1044,6 +1053,13 @@ func (d *AuthenticatedGossiper) networkHandler() { announcements.AddMsgs(newChanUpdates...) case announcement := <-d.networkMsgs: + log.Tracef("Received network message: "+ + "peer=%v, source=%x, msg=%s, is_remote=%v", + announcement.peer, + announcement.source.SerializeCompressed(), + announcement.msg.MsgType(), + announcement.isRemote) + // We should only broadcast this message forward if it // originated from us or it wasn't received as part of // our initial historical sync. @@ -1057,6 +1073,11 @@ func (d *AuthenticatedGossiper) networkHandler() { emittedAnnouncements, _ := d.processNetworkAnnouncement( announcement, ) + log.Debugf("Processed network message %s, "+ + "returned len(announcements)=%v", + announcement.msg.MsgType(), + len(emittedAnnouncements)) + if emittedAnnouncements != nil { announcements.AddMsgs( emittedAnnouncements..., @@ -1111,6 +1132,13 @@ func (d *AuthenticatedGossiper) networkHandler() { announcement, ) + log.Tracef("Processed network message %s, "+ + "returned len(announcements)=%v, "+ + "allowDependents=%v", + announcement.msg.MsgType(), + len(emittedAnnouncements), + allowDependents) + // If this message had any dependencies, then // we can now signal them to continue. validationBarrier.SignalDependants( @@ -1589,6 +1617,10 @@ func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement, func (d *AuthenticatedGossiper) processNetworkAnnouncement( nMsg *networkMsg) ([]networkMsg, bool) { + log.Debugf("Processing network message: peer=%v, source=%x, msg=%s, "+ + "is_remote=%v", nMsg.peer, nMsg.source.SerializeCompressed(), + nMsg.msg.MsgType(), nMsg.isRemote) + isPremature := func(chanID lnwire.ShortChannelID, delta uint32) bool { // TODO(roasbeef) make height delta 6 // * or configurable @@ -1616,17 +1648,25 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // newer update for this node so we can skip validating // signatures if not required. if d.cfg.Router.IsStaleNode(msg.NodeID, timestamp) { + log.Debugf("Skipped processing stale node: %x", + msg.NodeID) nMsg.err <- nil return nil, true } if err := d.addNode(msg, schedulerOp...); err != nil { - if routing.IsError(err, routing.ErrOutdated, - routing.ErrIgnored) { + log.Debugf("Adding node: %x got error: %v", + msg.NodeID, err) - log.Debug(err) - } else if err != routing.ErrVBarrierShuttingDown { + switch { + case routing.IsError(err, routing.ErrOutdated, + routing.ErrIgnored): + + case err == routing.ErrVBarrierShuttingDown: + + default: log.Error(err) + } nMsg.err <- err @@ -1817,7 +1857,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( log.Debugf("Router rejected channel "+ "edge: %v", err) } else { - log.Tracef("Router rejected channel "+ + log.Debugf("Router rejected channel "+ "edge: %v", err) key := newRejectCacheKey( @@ -1943,6 +1983,12 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( if d.cfg.Router.IsStaleEdgePolicy( msg.ShortChannelID, timestamp, msg.ChannelFlags, ) { + + log.Debugf("Ignored stale edge policy: peer=%v, "+ + "source=%x, msg=%s, is_remote=%v", nMsg.peer, + nMsg.source.SerializeCompressed(), + nMsg.msg.MsgType(), nMsg.isRemote) + nMsg.err <- nil return nil, true } @@ -2166,6 +2212,10 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( chanInfo, msg.ChannelFlags, ) + log.Debugf("The message %v has no AuthProof, sending "+ + "the update to remote peer %x", + msg.MsgType(), remotePubKey) + // Now, we'll attempt to send the channel update message // reliably to the remote peer in the background, so // that we don't block if the peer happens to be offline diff --git a/discovery/sync_manager.go b/discovery/sync_manager.go index 68c277c6a..e5edb7744 100644 --- a/discovery/sync_manager.go +++ b/discovery/sync_manager.go @@ -492,6 +492,10 @@ func (m *SyncManager) createGossipSyncer(peer lnpeer.Peer) *GossipSyncer { // handle any sync transitions. s.setSyncState(chansSynced) s.setSyncType(PassiveSync) + + log.Debugf("Created new GossipSyncer[state=%s type=%s] for peer=%v", + s.syncState(), s.SyncType(), peer) + return s } From eb5e32f22167fadbe4cf8faa2fb80ee2419722d1 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Tue, 14 Dec 2021 03:11:05 +0800 Subject: [PATCH 02/11] chainntnfs: add more verbose logs for txnotifier --- chainntnfs/txnotifier.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/chainntnfs/txnotifier.go b/chainntnfs/txnotifier.go index 7a328e77c..b7fc638af 100644 --- a/chainntnfs/txnotifier.go +++ b/chainntnfs/txnotifier.go @@ -614,8 +614,8 @@ func (n *TxNotifier) RegisterConf(txid *chainhash.Hash, pkScript []byte, if err == nil { if hint > startHeight { Log.Debugf("Using height hint %d retrieved from cache "+ - "for %v instead of %d", hint, ntfn.ConfRequest, - startHeight) + "for %v instead of %d for conf subscription", + hint, ntfn.ConfRequest, startHeight) startHeight = hint } } else if err != ErrConfirmHintNotFound { @@ -1009,8 +1009,8 @@ func (n *TxNotifier) RegisterSpend(outpoint *wire.OutPoint, pkScript []byte, if err == nil { if hint > startHeight { Log.Debugf("Using height hint %d retrieved from cache "+ - "for %v instead of %d", hint, ntfn.SpendRequest, - startHeight) + "for %v instead of %d for spend subscription", + hint, ntfn.SpendRequest, startHeight) startHeight = hint } } else if err != ErrSpendHintNotFound { From 6f4a9d86613dda0b64b8884f5a3a4d7ad2dc4083 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Tue, 14 Dec 2021 03:12:00 +0800 Subject: [PATCH 03/11] funding: add String method for channelOpeningState The newly added String method can be helpful in reading logs. --- funding/manager.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/funding/manager.go b/funding/manager.go index 031b9e770..2a7b9bdb9 100644 --- a/funding/manager.go +++ b/funding/manager.go @@ -553,6 +553,19 @@ const ( addedToRouterGraph ) +func (c channelOpeningState) String() string { + switch c { + case markedOpen: + return "markedOpen" + case fundingLockedSent: + return "fundingLocked" + case addedToRouterGraph: + return "addedToRouterGraph" + default: + return "unknown" + } +} + // NewFundingManager creates and initializes a new instance of the // fundingManager. func NewFundingManager(cfg Config) (*Manager, error) { From 1c3cabee7f6a97ed078496a5dc7e4ba0f284618f Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Tue, 14 Dec 2021 03:15:04 +0800 Subject: [PATCH 04/11] funding: add explicit log to avoid confusion --- funding/manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/funding/manager.go b/funding/manager.go index 2a7b9bdb9..a17b33e96 100644 --- a/funding/manager.go +++ b/funding/manager.go @@ -2786,7 +2786,7 @@ func (f *Manager) annAfterSixConfs(completeChan *channeldb.OpenChannel, } log.Debugf("Channel with ChannelPoint(%v), short_chan_id=%v "+ - "announced", &fundingPoint, shortChanID) + "sent to gossiper", &fundingPoint, shortChanID) } return nil From 2250cb752bea24ff63619b3c1530eb96794f3d5a Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Tue, 14 Dec 2021 03:02:41 +0800 Subject: [PATCH 05/11] discovery: shorten mutex locking closure --- discovery/gossiper.go | 9 +++++++-- discovery/reliable_sender.go | 11 ++++++++--- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index e96c1bfb9..690b5a564 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -1822,8 +1822,10 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // making decisions based on this DB state, before it // writes to the DB. d.channelMtx.Lock(msg.ShortChannelID.ToUint64()) - defer d.channelMtx.Unlock(msg.ShortChannelID.ToUint64()) - if err := d.cfg.Router.AddEdge(edge, schedulerOp...); err != nil { + err := d.cfg.Router.AddEdge(edge, schedulerOp...) + if err != nil { + defer d.channelMtx.Unlock(msg.ShortChannelID.ToUint64()) + // If the edge was rejected due to already being known, // then it may be that case that this new message has a // fresh channel proof, so we'll check. @@ -1871,6 +1873,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( return nil, false } + // If err is nil, release the lock immediately. + d.channelMtx.Unlock(msg.ShortChannelID.ToUint64()) + // If we earlier received any ChannelUpdates for this channel, // we can now process them, as the channel is added to the // graph. diff --git a/discovery/reliable_sender.go b/discovery/reliable_sender.go index 882eeebee..b22a15ae1 100644 --- a/discovery/reliable_sender.go +++ b/discovery/reliable_sender.go @@ -131,10 +131,10 @@ spawnHandler: // spawnPeerMsgHandler spawns a peerHandler for the given peer if there isn't // one already active. The boolean returned signals whether there was already // one active or not. -func (s *reliableSender) spawnPeerHandler(peerPubKey [33]byte) (peerManager, bool) { - s.activePeersMtx.Lock() - defer s.activePeersMtx.Unlock() +func (s *reliableSender) spawnPeerHandler( + peerPubKey [33]byte) (peerManager, bool) { + s.activePeersMtx.Lock() msgHandler, ok := s.activePeers[peerPubKey] if !ok { msgHandler = peerManager{ @@ -142,7 +142,12 @@ func (s *reliableSender) spawnPeerHandler(peerPubKey [33]byte) (peerManager, boo done: make(chan struct{}), } s.activePeers[peerPubKey] = msgHandler + } + s.activePeersMtx.Unlock() + // If this is a newly initiated peerManager, we will create a + // peerHandler. + if !ok { s.wg.Add(1) go s.peerHandler(msgHandler, peerPubKey) } From c15c8a1f0be7f585387e94fe722d6ed17f357dec Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Sat, 27 Nov 2021 05:13:23 +0800 Subject: [PATCH 06/11] discovery: transit all inactive syncers when needed --- discovery/sync_manager.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/discovery/sync_manager.go b/discovery/sync_manager.go index e5edb7744..0535bac4b 100644 --- a/discovery/sync_manager.go +++ b/discovery/sync_manager.go @@ -398,6 +398,13 @@ func (m *SyncManager) syncerHandler() { continue } + // We may not even have enough inactive syncers to be + // transitted. In that case, we will transit all the + // inactive syncers. + if len(m.inactiveSyncers) < numActiveLeft { + numActiveLeft = len(m.inactiveSyncers) + } + log.Debugf("Attempting to transition %v passive "+ "GossipSyncers to active", numActiveLeft) From dd74486b592b728d5c4c83d928b583833c9227ed Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Mon, 13 Dec 2021 08:25:34 +0800 Subject: [PATCH 07/11] routing+discovery: uniform error codes in routing --- discovery/gossiper.go | 30 +++++++++++++++++------------- routing/errors.go | 9 +++++++++ routing/router.go | 12 +++++++++--- routing/validation_barrier.go | 18 ++++-------------- routing/validation_barrier_test.go | 8 ++++++-- 5 files changed, 45 insertions(+), 32 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 690b5a564..5efafde33 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -1113,8 +1113,11 @@ func (d *AuthenticatedGossiper) networkHandler() { announcement.msg, ) if err != nil { - if err != routing.ErrVBarrierShuttingDown && - err != routing.ErrParentValidationFailed { + if !routing.IsError( + err, + routing.ErrVBarrierShuttingDown, + routing.ErrParentValidationFailed, + ) { log.Warnf("unexpected error "+ "during validation "+ "barrier shutdown: %v", @@ -1658,15 +1661,13 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( log.Debugf("Adding node: %x got error: %v", msg.NodeID, err) - switch { - case routing.IsError(err, routing.ErrOutdated, - routing.ErrIgnored): - - case err == routing.ErrVBarrierShuttingDown: - - default: + if !routing.IsError( + err, + routing.ErrOutdated, + routing.ErrIgnored, + routing.ErrVBarrierShuttingDown, + ) { log.Error(err) - } nMsg.err <- err @@ -2188,11 +2189,14 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( } if err := d.cfg.Router.UpdateEdge(update, schedulerOp...); err != nil { - if routing.IsError(err, routing.ErrOutdated, - routing.ErrIgnored) { + if routing.IsError( + err, routing.ErrOutdated, + routing.ErrIgnored, + routing.ErrVBarrierShuttingDown, + ) { log.Debug(err) - } else if err != routing.ErrVBarrierShuttingDown { + } else { key := newRejectCacheKey( msg.ShortChannelID.ToUint64(), nMsg.peer.PubKey(), diff --git a/routing/errors.go b/routing/errors.go index a5c7fd3ca..2511757bc 100644 --- a/routing/errors.go +++ b/routing/errors.go @@ -28,6 +28,15 @@ const ( // ErrInvalidFundingOutput is returned if the channle funding output // fails validation. ErrInvalidFundingOutput + + // ErrVBarrierShuttingDown signals that the barrier has been requested + // to shutdown, and that the caller should not treat the wait condition + // as fulfilled. + ErrVBarrierShuttingDown + + // ErrParentValidationFailed signals that the validation of a + // dependent's parent failed, so the dependent must not be processed. + ErrParentValidationFailed ) // routerError is a structure that represent the error inside the routing package, diff --git a/routing/router.go b/routing/router.go index 5baf6eccc..36d098fc4 100644 --- a/routing/router.go +++ b/routing/router.go @@ -1062,13 +1062,19 @@ func (r *ChannelRouter) networkHandler() { update.msg, ) if err != nil { - switch err { - case ErrVBarrierShuttingDown: + switch { + case IsError( + err, ErrVBarrierShuttingDown, + ): update.err <- err - case ErrParentValidationFailed: + + case IsError( + err, ErrParentValidationFailed, + ): update.err <- newErrf( ErrIgnored, err.Error(), ) + default: log.Warnf("unexpected error "+ "during validation "+ diff --git a/routing/validation_barrier.go b/routing/validation_barrier.go index c8c5e7b34..17d378826 100644 --- a/routing/validation_barrier.go +++ b/routing/validation_barrier.go @@ -1,7 +1,6 @@ package routing import ( - "errors" "sync" "github.com/lightningnetwork/lnd/channeldb" @@ -9,17 +8,6 @@ import ( "github.com/lightningnetwork/lnd/routing/route" ) -var ( - // ErrVBarrierShuttingDown signals that the barrier has been requested - // to shutdown, and that the caller should not treat the wait condition - // as fulfilled. - ErrVBarrierShuttingDown = errors.New("validation barrier shutting down") - - // ErrParentValidationFailed signals that the validation of a - // dependent's parent failed, so the dependent must not be processed. - ErrParentValidationFailed = errors.New("parent validation failed") -) - // validationSignals contains two signals which allows the ValidationBarrier to // communicate back to the caller whether a dependent should be processed or not // based on whether its parent was successfully validated. Only one of these @@ -228,9 +216,11 @@ func (v *ValidationBarrier) WaitForDependants(job interface{}) error { if ok { select { case <-v.quit: - return ErrVBarrierShuttingDown + return newErrf(ErrVBarrierShuttingDown, + "validation barrier shutting down") case <-signals.deny: - return ErrParentValidationFailed + return newErrf(ErrParentValidationFailed, + "parent validation failed") case <-signals.allow: return nil } diff --git a/routing/validation_barrier_test.go b/routing/validation_barrier_test.go index 248c1dc20..2eda0120f 100644 --- a/routing/validation_barrier_test.go +++ b/routing/validation_barrier_test.go @@ -141,14 +141,18 @@ func TestValidationBarrierQuit(t *testing.T) { switch { // First half should return without failure. - case i < numTasks/4 && err != routing.ErrParentValidationFailed: + case i < numTasks/4 && !routing.IsError( + err, routing.ErrParentValidationFailed, + ): t.Fatalf("unexpected failure while waiting: %v", err) case i >= numTasks/4 && i < numTasks/2 && err != nil: t.Fatalf("unexpected failure while waiting: %v", err) // Last half should return the shutdown error. - case i >= numTasks/2 && err != routing.ErrVBarrierShuttingDown: + case i >= numTasks/2 && !routing.IsError( + err, routing.ErrVBarrierShuttingDown, + ): t.Fatalf("expected failure after quitting: want %v, "+ "got %v", routing.ErrVBarrierShuttingDown, err) } From 8d0cae5e187ccf00ea5d17881e17863fcfd766ae Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Tue, 14 Dec 2021 03:29:34 +0800 Subject: [PATCH 08/11] discovery: sync blocks in a dedicated goroutine This commit moves syncing blocks into a dedicated goroutine to avoid the race condition where several go channels are ready and the block height update is pushed after a network message is processed. --- discovery/gossiper.go | 57 ++++++++++++++++++++++++++++--------------- 1 file changed, 37 insertions(+), 20 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 5efafde33..57eb12302 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -514,12 +514,48 @@ func (d *AuthenticatedGossiper) start() error { d.syncMgr.Start() - d.wg.Add(1) + // Start receiving blocks in its dedicated goroutine. + d.wg.Add(2) + go d.syncBlockHeight() go d.networkHandler() return nil } +// syncBlockHeight syncs the best block height for the gossiper by reading +// blockEpochs. +// +// NOTE: must be run as a goroutine. +func (d *AuthenticatedGossiper) syncBlockHeight() { + defer d.wg.Done() + + for { + select { + // A new block has arrived, so we can re-process the previously + // premature announcements. + case newBlock, ok := <-d.blockEpochs.Epochs: + // If the channel has been closed, then this indicates + // the daemon is shutting down, so we exit ourselves. + if !ok { + return + } + + // Once a new block arrives, we update our running + // track of the height of the chain tip. + d.Lock() + blockHeight := uint32(newBlock.Height) + d.bestHeight = blockHeight + d.Unlock() + + log.Debugf("New block: height=%d, hash=%s", blockHeight, + newBlock.Hash) + + case <-d.quit: + return + } + } +} + // Stop signals any active goroutines for a graceful closure. func (d *AuthenticatedGossiper) Stop() error { d.stopped.Do(func() { @@ -1166,25 +1202,6 @@ func (d *AuthenticatedGossiper) networkHandler() { }() - // A new block has arrived, so we can re-process the previously - // premature announcements. - case newBlock, ok := <-d.blockEpochs.Epochs: - // If the channel has been closed, then this indicates - // the daemon is shutting down, so we exit ourselves. - if !ok { - return - } - - // Once a new block arrives, we update our running - // track of the height of the chain tip. - d.Lock() - blockHeight := uint32(newBlock.Height) - d.bestHeight = blockHeight - d.Unlock() - - log.Debugf("New block: height=%d, hash=%s", blockHeight, - newBlock.Hash) - // The trickle timer has ticked, which indicates we should // flush to the network the pending batch of new announcements // we've received since the last trickle tick. From 17938b08ac8f76e37e2d5436b63e724e428da9ac Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Fri, 3 Dec 2021 15:20:38 +0800 Subject: [PATCH 09/11] discovery: resend premature messages when new blocks arrive This commit adds a method to resend premature when new blocks arrive. Previously when a message has specified a block+delta in the future, we would ignore the message and never process it again, causing an open channel never being broadcast under fast blocks generation. This commit fixes it by saving the future messages and resending them once the required block height is reached. --- discovery/gossiper.go | 121 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 109 insertions(+), 12 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 57eb12302..c3a25fa07 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -360,6 +360,13 @@ type AuthenticatedGossiper struct { // networkHandler. networkMsgs chan *networkMsg + // futureMsgs is a list of premature network messages that have a block + // height specified in the future. We will save them and resend it to + // the chan networkMsgs once the block height has reached. The cached + // map format is, + // {blockHeight: [msg1, msg2, ...], ...} + futureMsgs *lru.Cache + // chanPolicyUpdates is a channel that requests to update the // forwarding policy of a set of channels is sent over. chanPolicyUpdates chan *chanPolicyUpdateRequest @@ -415,6 +422,7 @@ func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper selfKeyLoc: selfKeyDesc.KeyLocator, cfg: &cfg, networkMsgs: make(chan *networkMsg), + futureMsgs: lru.NewCache(maxPrematureUpdates), quit: make(chan struct{}), chanPolicyUpdates: make(chan *chanPolicyUpdateRequest), prematureChannelUpdates: lru.NewCache(maxPrematureUpdates), @@ -550,12 +558,46 @@ func (d *AuthenticatedGossiper) syncBlockHeight() { log.Debugf("New block: height=%d, hash=%s", blockHeight, newBlock.Hash) + // Resend future messages, if any. + d.resendFutureMessages(blockHeight) + case <-d.quit: return } } } +// resendFutureMessages takes a block height, resends all the future messages +// found at that height and deletes those messages found in the gossiper's +// futureMsgs. +func (d *AuthenticatedGossiper) resendFutureMessages(height uint32) { + result, err := d.futureMsgs.Get(height) + + // Return early if no messages found. + if err == cache.ErrElementNotFound { + return + } + + // The error must nil, we will log an error and exit. + if err != nil { + log.Errorf("Reading future messages got error: %v", err) + return + } + + msgs := result.(*cachedNetworkMsg).msgs + + log.Debugf("Resending %d network messages at height %d", + len(msgs), height) + + for _, msg := range msgs { + select { + case d.networkMsgs <- msg: + case <-d.quit: + msg.err <- ErrGossiperShuttingDown + } + } +} + // Stop signals any active goroutines for a graceful closure. func (d *AuthenticatedGossiper) Stop() error { d.stopped.Do(func() { @@ -1627,6 +1669,64 @@ func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement, return d.cfg.Router.AddNode(node, op...) } +// isPremature decides whether a given network message has a block height+delta +// value specified in the future. If so, the message will be added to the +// future message map and be processed when the block height as reached. +// +// NOTE: must be used inside a lock. +func (d *AuthenticatedGossiper) isPremature(chanID lnwire.ShortChannelID, + delta uint32, msg *networkMsg) bool { + // TODO(roasbeef) make height delta 6 + // * or configurable + + msgHeight := chanID.BlockHeight + delta + + // The message height is smaller or equal to our best known height, + // thus the message is mature. + if msgHeight <= d.bestHeight { + return false + } + + // Add the premature message to our future messages which will + // be resent once the block height has reached. + // + // Init an empty cached message and overwrite it if there are cached + // messages found. + cachedMsgs := &cachedNetworkMsg{ + msgs: make([]*networkMsg, 0), + } + + result, err := d.futureMsgs.Get(msgHeight) + // No error returned means we have old messages cached. + if err == nil { + cachedMsgs = result.(*cachedNetworkMsg) + } + + // Copy the networkMsgs since the old message's err chan will + // be consumed. + copied := &networkMsg{ + peer: msg.peer, + source: msg.source, + msg: msg.msg, + optionalMsgFields: msg.optionalMsgFields, + isRemote: msg.isRemote, + err: make(chan error, 1), + } + + // Add the network message. + cachedMsgs.msgs = append(cachedMsgs.msgs, copied) + _, err = d.futureMsgs.Put(msgHeight, cachedMsgs) + if err != nil { + log.Errorf("Adding future message got error: %v", err) + } + + log.Debugf("Network message: %v added to future messages for "+ + "msgHeight=%d, bestHeight=%d", msg.msg.MsgType(), + msgHeight, d.bestHeight) + + return true +} + // processNetworkAnnouncement processes a new network relate authenticated // channel or node announcement or announcements proofs. If the announcement // didn't affect the internal state due to either being out of date, invalid, @@ -1641,12 +1741,6 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( "is_remote=%v", nMsg.peer, nMsg.source.SerializeCompressed(), nMsg.msg.MsgType(), nMsg.isRemote) - isPremature := func(chanID lnwire.ShortChannelID, delta uint32) bool { - // TODO(roasbeef) make height delta 6 - // * or configurable - return chanID.BlockHeight+delta > d.bestHeight - } - // If this is a remote update, we set the scheduler option to lazily // add it to the graph. var schedulerOp []batch.SchedulerOption @@ -1745,8 +1839,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // If the advertised inclusionary block is beyond our knowledge // of the chain tip, then we'll ignore for it now. d.Lock() - if nMsg.isRemote && isPremature(msg.ShortChannelID, 0) { - log.Infof("Announcement for chan_id=(%v), is "+ + if nMsg.isRemote && d.isPremature(msg.ShortChannelID, 0, nMsg) { + log.Warnf("Announcement for chan_id=(%v), is "+ "premature: advertises height %v, only "+ "height %v is known", msg.ShortChannelID.ToUint64(), @@ -1987,8 +2081,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // of the chain tip, then we'll put the announcement in limbo // to be fully verified once we advance forward in the chain. d.Lock() - if nMsg.isRemote && isPremature(msg.ShortChannelID, 0) { - log.Infof("Update announcement for "+ + if nMsg.isRemote && d.isPremature(msg.ShortChannelID, 0, nMsg) { + log.Warnf("Update announcement for "+ "short_chan_id(%v), is premature: advertises "+ "height %v, only height %v is known", shortChanID, blockHeight, @@ -2293,8 +2387,11 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // registered in bitcoin blockchain. Therefore, we check if the // proof is premature. d.Lock() - if isPremature(msg.ShortChannelID, d.cfg.ProofMatureDelta) { - log.Infof("Premature proof announcement, current "+ + premature := d.isPremature( + msg.ShortChannelID, d.cfg.ProofMatureDelta, nMsg, + ) + if premature { + log.Warnf("Premature proof announcement, current "+ "block height lower than needed: %v < %v", d.bestHeight, needBlockHeight) d.Unlock() From ead414c68960c7e7211d130644af0ffa5636dba0 Mon Sep 17 00:00:00 2001 From: eugene Date: Wed, 24 Nov 2021 11:36:57 -0500 Subject: [PATCH 10/11] discovery: use source instead of peer for accurate rejectCache --- discovery/gossiper.go | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index c3a25fa07..7ec7166f9 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -309,6 +309,14 @@ func newRejectCacheKey(cid uint64, pub [33]byte) rejectCacheKey { return k } +// sourceToPub returns a serialized-compressed public key for use in the reject +// cache. +func sourceToPub(pk *btcec.PublicKey) [33]byte { + var pub [33]byte + copy(pub[:], pk.SerializeCompressed()) + return pub +} + // cachedReject is the empty value used to track the value for rejects. type cachedReject struct { } @@ -1167,7 +1175,8 @@ func (d *AuthenticatedGossiper) networkHandler() { // If this message was recently rejected, then we won't // attempt to re-process it. if announcement.isRemote && d.isRecentlyRejectedMsg( - announcement.msg, announcement.peer.PubKey(), + announcement.msg, + sourceToPub(announcement.source), ) { announcement.err <- fmt.Errorf("recently " + "rejected") @@ -1828,7 +1837,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( key := newRejectCacheKey( msg.ShortChannelID.ToUint64(), - nMsg.peer.PubKey(), + sourceToPub(nMsg.source), ) _, _ = d.recentRejects.Put(key, &cachedReject{}) @@ -1871,7 +1880,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( key := newRejectCacheKey( msg.ShortChannelID.ToUint64(), - nMsg.peer.PubKey(), + sourceToPub(nMsg.source), ) _, _ = d.recentRejects.Put(key, &cachedReject{}) @@ -1949,7 +1958,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( key := newRejectCacheKey( msg.ShortChannelID.ToUint64(), - nMsg.peer.PubKey(), + sourceToPub(nMsg.source), ) _, _ = d.recentRejects.Put(key, &cachedReject{}) @@ -1976,7 +1985,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( key := newRejectCacheKey( msg.ShortChannelID.ToUint64(), - nMsg.peer.PubKey(), + sourceToPub(nMsg.source), ) _, _ = d.recentRejects.Put(key, &cachedReject{}) } @@ -2066,7 +2075,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( key := newRejectCacheKey( msg.ShortChannelID.ToUint64(), - nMsg.peer.PubKey(), + sourceToPub(nMsg.source), ) _, _ = d.recentRejects.Put(key, &cachedReject{}) @@ -2198,7 +2207,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( key := newRejectCacheKey( msg.ShortChannelID.ToUint64(), - nMsg.peer.PubKey(), + sourceToPub(nMsg.source), ) _, _ = d.recentRejects.Put(key, &cachedReject{}) @@ -2310,7 +2319,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( } else { key := newRejectCacheKey( msg.ShortChannelID.ToUint64(), - nMsg.peer.PubKey(), + sourceToPub(nMsg.source), ) _, _ = d.recentRejects.Put(key, &cachedReject{}) From c6190230077d8b7b41c506ff9d5a00990f83ccb3 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Sat, 4 Dec 2021 16:25:39 +0800 Subject: [PATCH 11/11] docs: add release note for premature msg fix --- docs/release-notes/release-notes-0.15.0.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/release-notes/release-notes-0.15.0.md b/docs/release-notes/release-notes-0.15.0.md index 53421658e..be243fd37 100644 --- a/docs/release-notes/release-notes-0.15.0.md +++ b/docs/release-notes/release-notes-0.15.0.md @@ -18,6 +18,11 @@ due to the cancel signal is processed before the creation. It is now properly handled by moving creation before deletion. +* When the block height+delta specified by a network message is greater than + the gossiper's best height, it will be considered as premature and ignored. + [These premature messages are now saved into a cache and processed once the + height has reached.](https://github.com/lightningnetwork/lnd/pull/6054) + ## Misc * [An example systemd service file](https://github.com/lightningnetwork/lnd/pull/6033)