diff --git a/peer/brontide.go b/peer/brontide.go
index 38c60e357..aeae14144 100644
--- a/peer/brontide.go
+++ b/peer/brontide.go
@@ -39,6 +39,7 @@ import (
     "github.com/lightningnetwork/lnd/netann"
     "github.com/lightningnetwork/lnd/pool"
     "github.com/lightningnetwork/lnd/queue"
+    "github.com/lightningnetwork/lnd/subscribe"
     "github.com/lightningnetwork/lnd/ticker"
     "github.com/lightningnetwork/lnd/watchtower/wtclient"
 )
@@ -592,6 +593,18 @@ func (p *Brontide) Start() error {
     p.log.Debugf("Loaded %v active channels from database",
         len(activeChans))
 
+    // Subscribe to channel events before loading channels so we won't
+    // miss any. This subscription is used to listen for active channel
+    // events while reenabling channels. Once the reenabling process is
+    // finished, the subscription will be canceled.
+    //
+    // NOTE: ChannelNotifier must be started before subscribing to
+    // events, otherwise we'd panic here.
+    sub, err := p.cfg.ChannelNotifier.SubscribeChannelEvents()
+    if err != nil {
+        return fmt.Errorf("SubscribeChannelEvents failed: %w", err)
+    }
+
     msgs, err := p.loadActiveChannels(activeChans)
     if err != nil {
         return fmt.Errorf("unable to load channels: %v", err)
@@ -603,7 +616,7 @@ func (p *Brontide) Start() error {
     go p.queueHandler()
     go p.writeHandler()
     go p.readHandler()
-    go p.channelManager()
+    go p.channelManager(sub)
     go p.pingHandler()
 
     // Signal to any external processes that the peer is now active.
@@ -2311,7 +2324,7 @@ func (p *Brontide) genDeliveryScript() ([]byte, error) {
 // channels maintained with the remote peer.
 //
 // NOTE: This method MUST be run as a goroutine.
-func (p *Brontide) channelManager() {
+func (p *Brontide) channelManager(client *subscribe.Client) {
     defer p.wg.Done()
 
     // reenableTimeout will fire once after the configured channel status
@@ -2472,7 +2485,7 @@ out:
         // TODO(conner): consolidate reenables timers inside chan status
         // manager
         case <-reenableTimeout:
-            p.reenableActiveChannels()
+            p.reenableActiveChannels(client)
 
             // Since this channel will never fire again during the
             // lifecycle of the peer, we nil the channel to mark it
@@ -2482,6 +2495,11 @@ out:
             // select will ignore this case entirely.
             reenableTimeout = nil
 
+            // Once the reenabling is attempted, we also cancel the
+            // channel event subscription to free up the overflow
+            // queue used in the channel notifier.
+            client.Cancel()
+
         case <-p.quit:
             // As, we've been signalled to exit, we'll reset all
             // our active channel back to their default state.
@@ -2505,24 +2523,55 @@ out:
 // peer, and reenables each public, non-pending channel. This is done at the
 // gossip level by broadcasting a new ChannelUpdate with the disabled bit unset.
 // No message will be sent if the channel is already enabled.
-func (p *Brontide) reenableActiveChannels() {
+func (p *Brontide) reenableActiveChannels(client *subscribe.Client) {
     // First, filter all known channels with this peer for ones that are
     // both public and not pending.
     activePublicChans := p.filterChannelsToEnable()
 
+    // Create a map to hold channels that need to be retried.
+    retryChans := make(map[wire.OutPoint]struct{}, len(activePublicChans))
+
     // For each of the public, non-pending channels, set the channel
     // disabled bit to false and send out a new ChannelUpdate. If this
     // channel is already active, the update won't be sent.
     for _, chanPoint := range activePublicChans {
         err := p.cfg.ChanStatusMgr.RequestEnable(chanPoint, false)
-        if err == netann.ErrEnableManuallyDisabledChan {
-            p.log.Debugf("Channel(%v) was manually disabled, ignoring "+
-                "automatic enable request", chanPoint)
-        } else if err != nil {
-            p.log.Errorf("Unable to enable channel %v: %v",
-                chanPoint, err)
+
+        switch {
+        // No error occurred, continue to request the next channel.
+        case err == nil:
+            continue
+
+        // A manually disabled channel cannot be auto-enabled, so we do
+        // nothing and proceed to the next channel.
+        case errors.Is(err, netann.ErrEnableManuallyDisabledChan):
+            p.log.Debugf("Channel(%v) was manually disabled, "+
+                "ignoring automatic enable request", chanPoint)
+
+            continue
+
+        // If the channel is reported as inactive, we will give it
+        // another chance. When handling the request, ChanStatusManager
+        // will check whether the link is active or not. One of the
+        // conditions is whether the link has been marked as
+        // reestablished, which happens inside a goroutine (htlcManager)
+        // after the link is started. We may therefore get a false
+        // negative saying the link is not active because that goroutine
+        // hasn't reached the line that marks the reestablishment. Thus
+        // we give it a second chance to send the request.
+        case errors.Is(err, netann.ErrEnableInactiveChan):
+            p.log.Warnf("Channel(%v) cannot be enabled as "+
+                "ChanStatusManager reported inactive, retrying", chanPoint)
+
+            // Add the channel to the retry map.
+            retryChans[chanPoint] = struct{}{}
         }
     }
+
+    // Retry the channels if we have any.
+    if len(retryChans) != 0 {
+        p.retryRequestEnable(retryChans, client)
+    }
 }
 
 // fetchActiveChanCloser attempts to fetch the active chan closer state machine
@@ -2639,6 +2688,100 @@ func (p *Brontide) filterChannelsToEnable() []wire.OutPoint {
     return activePublicChans
 }
 
+// retryRequestEnable takes a map of channel outpoints and a channel event
+// client. It listens for channel events and removes a channel from the map
+// when a matching event arrives. Upon receiving an active channel event, it
+// will send the enabling request again.
+func (p *Brontide) retryRequestEnable(activeChans map[wire.OutPoint]struct{},
+    client *subscribe.Client) {
+
+    p.log.Debugf("Retry enabling %v channels", len(activeChans))
+
+    // retryEnable is a helper closure that sends an enable request and
+    // removes the channel from the map if it's matched.
+    retryEnable := func(chanPoint wire.OutPoint) error {
+        // If this is an active channel event, check whether it's in
+        // our targeted channels map.
+        _, found := activeChans[chanPoint]
+
+        // If this channel is irrelevant, return nil so the loop can
+        // jump to the next iteration.
+        if !found {
+            return nil
+        }
+
+        // Otherwise we've just received an active signal for a channel
+        // that previously failed to be enabled, so we send the request
+        // again.
+        //
+        // We only give the channel one more shot, so we delete it from
+        // our map first to keep it from being attempted again.
+        delete(activeChans, chanPoint)
+
+        // Send the request.
+        err := p.cfg.ChanStatusMgr.RequestEnable(chanPoint, false)
+        if err != nil {
+            return fmt.Errorf("request enabling channel %v "+
+                "failed: %w", chanPoint, err)
+        }
+
+        return nil
+    }
+
+    for {
+        // If activeChans is empty, we're done processing all the
+        // channels.
+        if len(activeChans) == 0 {
+            p.log.Debug("Finished retry enabling channels")
+            return
+        }
+
+        select {
+        // A new event has been sent by the ChannelNotifier. We now
+        // check whether it's an active or inactive channel event.
+        case e := <-client.Updates():
+            // If this is an active channel event, try enabling the
+            // channel, then jump to the next iteration.
+            active, ok := e.(channelnotifier.ActiveChannelEvent)
+            if ok {
+                chanPoint := *active.ChannelPoint
+
+                // If we received an error for this particular
+                // channel, we log an error and won't quit as
+                // we still want to retry other channels.
+                if err := retryEnable(chanPoint); err != nil {
+                    p.log.Errorf("Retry failed: %v", err)
+                }
+
+                continue
+            }
+
+            // Otherwise check for an inactive link event, and jump to
+            // the next iteration if it's not one.
+            inactive, ok := e.(channelnotifier.InactiveLinkEvent)
+            if !ok {
+                continue
+            }
+
+            // We found an inactive link event. If this is one of our
+            // targeted channels, remove it from our map.
+            chanPoint := *inactive.ChannelPoint
+            _, found := activeChans[chanPoint]
+            if !found {
+                continue
+            }
+
+            delete(activeChans, chanPoint)
+            p.log.Warnf("Re-enable channel %v failed, received "+
+                "inactive link event", chanPoint)
+
+        case <-p.quit:
+            p.log.Debugf("Peer shutdown during retry enabling")
+            return
+        }
+    }
+}
+
 // chooseDeliveryScript takes two optionally set shutdown scripts and returns
 // a suitable script to close out to. This may be nil if neither script is
 // set. If both scripts are set, this function will error if they do not match.
diff --git a/peer/brontide_test.go b/peer/brontide_test.go
index 44e63267b..cf71e0b92 100644
--- a/peer/brontide_test.go
+++ b/peer/brontide_test.go
@@ -12,6 +12,7 @@ import (
     "github.com/btcsuite/btcd/wire"
     "github.com/lightningnetwork/lnd/chainntnfs"
     "github.com/lightningnetwork/lnd/channeldb"
+    "github.com/lightningnetwork/lnd/channelnotifier"
     "github.com/lightningnetwork/lnd/contractcourt"
     "github.com/lightningnetwork/lnd/htlcswitch"
     "github.com/lightningnetwork/lnd/lntest/mock"
@@ -1035,6 +1036,14 @@ func TestPeerCustomMessage(t *testing.T) {
         ConfChan:  make(chan *chainntnfs.TxConfirmation),
     }
 
+    // TODO(yy): change ChannelNotifier to be an interface.
+    channelNotifier := channelnotifier.New(dbAlice.ChannelStateDB())
+    require.NoError(t, channelNotifier.Start())
+    t.Cleanup(func() {
+        require.NoError(t, channelNotifier.Stop(),
+            "stop channel notifier failed")
+    })
+
     alicePeer := NewBrontide(Config{
         PubKeyBytes: remoteKey,
         ChannelDB:   dbAlice.ChannelStateDB(),
@@ -1057,7 +1066,8 @@ func TestPeerCustomMessage(t *testing.T) {
             }
             return nil
         },
-        PongBuf: make([]byte, lnwire.MaxPongBytes),
+        PongBuf:         make([]byte, lnwire.MaxPongBytes),
+        ChannelNotifier: channelNotifier,
     })
 
     // Set up the init sequence.
diff --git a/peer/test_utils.go b/peer/test_utils.go
index 41f6d2c0f..223ee119f 100644
--- a/peer/test_utils.go
+++ b/peer/test_utils.go
@@ -16,6 +16,7 @@ import (
     "github.com/btcsuite/btcd/wire"
     "github.com/lightningnetwork/lnd/chainntnfs"
     "github.com/lightningnetwork/lnd/channeldb"
+    "github.com/lightningnetwork/lnd/channelnotifier"
     "github.com/lightningnetwork/lnd/htlcswitch"
     "github.com/lightningnetwork/lnd/input"
     "github.com/lightningnetwork/lnd/keychain"
@@ -377,6 +378,14 @@ func createTestPeer(t *testing.T, notifier chainntnfs.ChainNotifier,
         return nil, nil, err
     }
 
+    // TODO(yy): change ChannelNotifier to be an interface.
+    channelNotifier := channelnotifier.New(dbAlice.ChannelStateDB())
+    require.NoError(t, channelNotifier.Start())
+    t.Cleanup(func() {
+        require.NoError(t, channelNotifier.Stop(),
+            "stop channel notifier failed")
+    })
+
     cfg := &Config{
         Addr:        cfgAddr,
         PubKeyBytes: pubKey,
@@ -392,6 +401,7 @@ func createTestPeer(t *testing.T, notifier chainntnfs.ChainNotifier,
         ChanStatusMgr:   chanStatusMgr,
         Features:        lnwire.NewFeatureVector(nil, lnwire.Features),
         DisconnectPeer:  func(b *btcec.PublicKey) error { return nil },
+        ChannelNotifier: channelNotifier,
     }
 
     alicePeer := NewBrontide(*cfg)
@@ -400,8 +410,11 @@ func createTestPeer(t *testing.T, notifier chainntnfs.ChainNotifier,
     chanID := lnwire.NewChanIDFromOutPoint(channelAlice.ChannelPoint())
     alicePeer.activeChannels[chanID] = channelAlice
 
+    sub, err := cfg.ChannelNotifier.SubscribeChannelEvents()
+    require.NoError(t, err)
+
     alicePeer.wg.Add(1)
-    go alicePeer.channelManager()
+    go alicePeer.channelManager(sub)
 
     return alicePeer, channelBob, nil
 }
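
The retry flow added in retryRequestEnable reduces to a small event-driven loop: keep a set of outpoints whose enable request failed, consume the subscription's update stream, retry once on an active-channel event, give up on an inactive-link event, and stop when the set is empty or the peer quits. The following is a minimal, self-contained sketch of that shape; activeEvent, inactiveEvent, the string outpoints, and the enable callback are hypothetical stand-ins for channelnotifier.ActiveChannelEvent, channelnotifier.InactiveLinkEvent, wire.OutPoint, and ChanStatusMgr.RequestEnable, not lnd's actual APIs.

package main

import (
    "fmt"
    "time"
)

// activeEvent and inactiveEvent are hypothetical stand-ins for the real
// channel notifier events; a plain string stands in for wire.OutPoint.
type activeEvent struct{ op string }
type inactiveEvent struct{ op string }

// retryEnable drains events until every pending outpoint has either been
// retried (on an active event) or given up on (on an inactive event), or
// the quit channel fires. This mirrors the shape of retryRequestEnable.
func retryEnable(pending map[string]struct{}, updates <-chan interface{},
    quit <-chan struct{}, enable func(string) error) {

    for len(pending) > 0 {
        select {
        case e := <-updates:
            switch event := e.(type) {
            case activeEvent:
                // Ignore events for channels we aren't retrying.
                if _, ok := pending[event.op]; !ok {
                    continue
                }

                // Single-shot retry: remove the channel first so it
                // cannot be attempted a second time.
                delete(pending, event.op)
                if err := enable(event.op); err != nil {
                    fmt.Println("retry failed:", err)
                }

            case inactiveEvent:
                // The link went down again; stop waiting on it.
                delete(pending, event.op)
            }

        case <-quit:
            return
        }
    }
}

func main() {
    updates := make(chan interface{}, 1)
    quit := make(chan struct{})
    pending := map[string]struct{}{"chan-1:0": {}}

    // Simulate the link becoming active shortly after the first enable
    // attempt failed, which is the race the patch works around.
    go func() {
        time.Sleep(10 * time.Millisecond)
        updates <- activeEvent{op: "chan-1:0"}
    }()

    retryEnable(pending, updates, quit, func(op string) error {
        fmt.Println("re-enabled", op)
        return nil
    })
}

Deleting the outpoint before retrying is what guarantees each channel gets at most one extra attempt, which is the same single-shot policy the patch enforces in retryRequestEnable.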