Merge pull request #8497 from ziggie1984/shutdown-bugfix

routing: shutdown chanrouter correctly.
Olaoluwa Osuntokun 2024-08-01 16:48:50 -07:00 committed by GitHub
commit 4a3c4e4ba7
GPG Key ID: B5690EEEBB952194
14 changed files with 230 additions and 83 deletions


@ -151,7 +151,12 @@ func (b *BitcoindNotifier) Stop() error {
close(epochClient.epochChan)
}
b.txNotifier.TearDown()
// The txNotifier is only initialized in the start method therefore we
// need to make sure we don't access a nil pointer here.
if b.txNotifier != nil {
b.txNotifier.TearDown()
}
// Stop the mempool notifier.
b.memNotifier.TearDown()


@ -152,7 +152,12 @@ func (n *NeutrinoNotifier) Stop() error {
close(epochClient.epochChan)
}
n.txNotifier.TearDown()
// The txNotifier is only initialized in the start method therefore we
// need to make sure we don't access a nil pointer here.
if n.txNotifier != nil {
n.txNotifier.TearDown()
}
return nil
}
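
Both notifier changes above apply the same defensive pattern: a resource that is only created in Start gets a nil check in Stop, because Stop may now run even though Start never completed (or was never called). The following is a minimal, self-contained sketch of that pattern; the notifier and txWatcher types are hypothetical stand-ins, not lnd code.

package main

import "fmt"

// txWatcher stands in for a resource that only exists after Start.
type txWatcher struct{}

func (w *txWatcher) TearDown() { fmt.Println("watcher torn down") }

type notifier struct {
	watcher *txWatcher
}

// Start creates the watcher; it may fail or never be called at all.
func (n *notifier) Start() error {
	n.watcher = &txWatcher{}
	return nil
}

// Stop must tolerate running without a successful Start, hence the nil check.
func (n *notifier) Stop() error {
	if n.watcher != nil {
		n.watcher.TearDown()
	}
	return nil
}

func main() {
	n := &notifier{}
	_ = n.Stop() // Stop before Start: the nil check avoids a panic.
	_ = n.Start()
	_ = n.Stop() // normal teardown
}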


@ -12,7 +12,9 @@ package chanfitness
import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/btcsuite/btcd/wire"
@ -48,6 +50,9 @@ var (
// ChannelEventStore maintains a set of event logs for the node's channels to
// provide insight into the performance and health of channels.
type ChannelEventStore struct {
started atomic.Bool
stopped atomic.Bool
cfg *Config
// peers tracks all of our currently monitored peers and their channels.
@ -142,7 +147,11 @@ func NewChannelEventStore(config *Config) *ChannelEventStore {
// information from the store. If this function fails, it cancels its existing
// subscriptions and returns an error.
func (c *ChannelEventStore) Start() error {
log.Info("ChannelEventStore starting")
log.Info("ChannelEventStore starting...")
if c.started.Swap(true) {
return fmt.Errorf("ChannelEventStore started more than once")
}
// Create a subscription to channel events.
channelClient, err := c.cfg.SubscribeChannelEvents()
@ -198,13 +207,18 @@ func (c *ChannelEventStore) Start() error {
cancel: cancel,
})
log.Debug("ChannelEventStore started")
return nil
}
// Stop terminates all goroutines started by the event store.
func (c *ChannelEventStore) Stop() {
func (c *ChannelEventStore) Stop() error {
log.Info("ChannelEventStore shutting down...")
defer log.Debug("ChannelEventStore shutdown complete")
if c.stopped.Swap(true) {
return fmt.Errorf("ChannelEventStore stopped more than once")
}
// Stop the consume goroutine.
close(c.quit)
@ -212,7 +226,17 @@ func (c *ChannelEventStore) Stop() {
// Stop the ticker after the goroutine reading from it has exited, to
// avoid a race.
c.cfg.FlapCountTicker.Stop()
var err error
if c.cfg.FlapCountTicker == nil {
err = fmt.Errorf("ChannelEventStore FlapCountTicker not " +
"initialized")
} else {
c.cfg.FlapCountTicker.Stop()
}
log.Debugf("ChannelEventStore shutdown complete")
return err
}
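
The new started and stopped flags, used here and in several other subsystems in this PR, make Start and Stop idempotent: a second call reports an error instead of re-running setup or teardown. A minimal sketch of the guard, assuming a hypothetical subsystem type:

package main

import (
	"fmt"
	"sync/atomic"
)

type subsystem struct {
	started atomic.Bool
	stopped atomic.Bool
	quit    chan struct{}
}

func (s *subsystem) Start() error {
	// Swap returns the previous value, so a second call reports an error
	// instead of starting the goroutines twice.
	if s.started.Swap(true) {
		return fmt.Errorf("subsystem started more than once")
	}
	s.quit = make(chan struct{})
	return nil
}

func (s *subsystem) Stop() error {
	if s.stopped.Swap(true) {
		return fmt.Errorf("subsystem stopped more than once")
	}
	// quit may be nil if Start never ran; guard before closing.
	if s.quit != nil {
		close(s.quit)
	}
	return nil
}

func main() {
	s := &subsystem{}
	fmt.Println(s.Start()) // <nil>
	fmt.Println(s.Start()) // error: started more than once
	fmt.Println(s.Stop())  // <nil>
	fmt.Println(s.Stop())  // error: stopped more than once
}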
// addChannel checks whether we are already tracking a channel's peer, creates a


@ -753,7 +753,12 @@ func (d *AuthenticatedGossiper) stop() {
log.Debug("Authenticated Gossiper is stopping")
defer log.Debug("Authenticated Gossiper stopped")
d.blockEpochs.Cancel()
// `blockEpochs` is only initialized in the start routine so we make
// sure we don't panic here in the case where the `Stop` method is
// called when the `Start` method does not complete.
if d.blockEpochs != nil {
d.blockEpochs.Cancel()
}
d.syncMgr.Stop()


@ -35,6 +35,10 @@
* [Fixed a bug](https://github.com/lightningnetwork/lnd/pull/8896) that caused
LND to use a default fee rate for the batch channel opening flow.
* [Fixed](https://github.com/lightningnetwork/lnd/pull/8497) a case where LND
would not shut down properly when interrupted via e.g. SIGTERM. Moreover, LND
now shuts down correctly in case one subsystem fails to start up.
* The fee limit for payments [was made
compatible](https://github.com/lightningnetwork/lnd/pull/8941) with inbound


@ -300,6 +300,8 @@ func (b *Builder) Start() error {
b.wg.Add(1)
go b.networkHandler()
log.Debug("Builder started")
return nil
}
@ -312,7 +314,6 @@ func (b *Builder) Stop() error {
}
log.Info("Builder shutting down...")
defer log.Debug("Builder shutdown complete")
// Our filtered chain view could've only been started if
// AssumeChannelValid isn't present.
@ -325,6 +326,8 @@ func (b *Builder) Stop() error {
close(b.quit)
b.wg.Wait()
log.Debug("Builder shutdown complete")
return nil
}


@ -4,6 +4,7 @@ import (
"crypto/sha256"
"fmt"
"sync"
"sync/atomic"
"github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/chainntnfs"
@ -33,6 +34,9 @@ var (
// Settle - routes UpdateFulfillHTLC to the originating link.
// Fail - routes UpdateFailHTLC to the originating link.
type InterceptableSwitch struct {
started atomic.Bool
stopped atomic.Bool
// htlcSwitch is the underlying switch
htlcSwitch *Switch
@ -201,6 +205,12 @@ func (s *InterceptableSwitch) SetInterceptor(
}
func (s *InterceptableSwitch) Start() error {
log.Info("InterceptableSwitch starting...")
if s.started.Swap(true) {
return fmt.Errorf("InterceptableSwitch started more than once")
}
blockEpochStream, err := s.notifier.RegisterBlockEpochNtfn(nil)
if err != nil {
return err
@ -217,14 +227,28 @@ func (s *InterceptableSwitch) Start() error {
}
}()
log.Debug("InterceptableSwitch started")
return nil
}
func (s *InterceptableSwitch) Stop() error {
log.Info("InterceptableSwitch shutting down...")
if s.stopped.Swap(true) {
return fmt.Errorf("InterceptableSwitch stopped more than once")
}
close(s.quit)
s.wg.Wait()
s.blockEpochStream.Cancel()
// We need to check whether the start routine ran and initialized the
// `blockEpochStream`.
if s.blockEpochStream != nil {
s.blockEpochStream.Cancel()
}
log.Debug("InterceptableSwitch shutdown complete")
return nil
}


@ -572,14 +572,18 @@ func (l *channelLink) Stop() {
}
// Ensure the channel for the timer is drained.
if !l.updateFeeTimer.Stop() {
select {
case <-l.updateFeeTimer.C:
default:
if l.updateFeeTimer != nil {
if !l.updateFeeTimer.Stop() {
select {
case <-l.updateFeeTimer.C:
default:
}
}
}
l.hodlQueue.Stop()
if l.hodlQueue != nil {
l.hodlQueue.Stop()
}
close(l.quit)
l.wg.Wait()
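
The timer handling above is the standard time.Timer stop-and-drain idiom, now wrapped in a nil check because the timer only exists after Start has run. A standalone sketch of the idiom (plain Go, not lnd-specific):

package main

import "time"

func stopAndDrain(t *time.Timer) {
	if t == nil {
		return // Stop may run before Start ever created the timer.
	}
	// Stop reports false if the timer already fired; in that case a value
	// may still be sitting in the channel, so drain it without blocking to
	// avoid a stale tick being read later.
	if !t.Stop() {
		select {
		case <-t.C:
		default:
		}
	}
}

func main() {
	var uninitialized *time.Timer
	stopAndDrain(uninitialized) // safe no-op

	t := time.NewTimer(time.Millisecond)
	time.Sleep(5 * time.Millisecond) // let it fire
	stopAndDrain(t)                  // drains the pending tick
}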


@ -101,6 +101,9 @@ func (r *htlcReleaseEvent) Less(other queue.PriorityQueueItem) bool {
// created by the daemon. The registry is a thin wrapper around a map in order
// to ensure that all updates/reads are thread safe.
type InvoiceRegistry struct {
started atomic.Bool
stopped atomic.Bool
sync.RWMutex
nextClientID uint32 // must be used atomically
@ -213,42 +216,66 @@ func (i *InvoiceRegistry) scanInvoicesOnStart(ctx context.Context) error {
// Start starts the registry and all goroutines it needs to carry out its task.
func (i *InvoiceRegistry) Start() error {
// Start InvoiceExpiryWatcher and prepopulate it with existing active
// invoices.
err := i.expiryWatcher.Start(func(hash lntypes.Hash, force bool) error {
return i.cancelInvoiceImpl(context.Background(), hash, force)
})
var err error
log.Info("InvoiceRegistry starting...")
if i.started.Swap(true) {
return fmt.Errorf("InvoiceRegistry started more than once")
}
// Start InvoiceExpiryWatcher and prepopulate it with existing
// active invoices.
err = i.expiryWatcher.Start(
func(hash lntypes.Hash, force bool) error {
return i.cancelInvoiceImpl(
context.Background(), hash, force,
)
})
if err != nil {
return err
}
log.Info("InvoiceRegistry starting")
i.wg.Add(1)
go i.invoiceEventLoop()
// Now scan all pending and removable invoices to the expiry watcher or
// delete them.
// Now scan all pending and removable invoices to the expiry
// watcher or delete them.
err = i.scanInvoicesOnStart(context.Background())
if err != nil {
_ = i.Stop()
return err
}
return nil
log.Debug("InvoiceRegistry started")
return err
}
// Stop signals the registry for a graceful shutdown.
func (i *InvoiceRegistry) Stop() error {
log.Info("InvoiceRegistry shutting down...")
if i.stopped.Swap(true) {
return fmt.Errorf("InvoiceRegistry stopped more than once")
}
log.Info("InvoiceRegistry shutting down...")
defer log.Debug("InvoiceRegistry shutdown complete")
i.expiryWatcher.Stop()
var err error
if i.expiryWatcher == nil {
err = fmt.Errorf("InvoiceRegistry expiryWatcher is not " +
"initialized")
} else {
i.expiryWatcher.Stop()
}
close(i.quit)
i.wg.Wait()
return nil
log.Debug("InvoiceRegistry shutdown complete")
return err
}
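
Note the shape of the rewritten Stop: it records the error from the missing expiry watcher but still closes the quit channel and waits for the goroutines, so one failed component does not abort the rest of the shutdown. A compact sketch of that shape, with a hypothetical registry type:

package main

import (
	"fmt"
	"sync"
)

type registry struct {
	watcher interface{ Stop() } // only set if Start ran successfully
	quit    chan struct{}
	wg      sync.WaitGroup
}

// Stop collects errors but still finishes the rest of the shutdown.
func (r *registry) Stop() error {
	var err error

	if r.watcher == nil {
		err = fmt.Errorf("registry watcher is not initialized")
	} else {
		r.watcher.Stop()
	}

	// The quit channel and wait group are handled regardless of the error
	// above, so the goroutines always get released.
	close(r.quit)
	r.wg.Wait()

	return err
}

func main() {
	r := &registry{quit: make(chan struct{})}
	fmt.Println(r.Stop())
}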
// invoiceEvent represents a new event that has modified an invoice on disk.

lnd.go

@ -694,11 +694,33 @@ func Main(cfg *Config, lisCfg ListenerCfg, implCfg *ImplementationCfg,
bestHeight)
// With all the relevant chains initialized, we can finally start the
// server itself.
if err := server.Start(); err != nil {
// server itself. We start the server in an asynchronous goroutine so
// that we are able to interrupt and shut down the daemon gracefully in
// case the startup of the subservers does not behave as expected.
errChan := make(chan error)
go func() {
errChan <- server.Start()
}()
defer func() {
err := server.Stop()
if err != nil {
ltndLog.Warnf("Stopping the server including all "+
"its subsystems failed with %v", err)
}
}()
select {
case err := <-errChan:
if err == nil {
break
}
return mkErr("unable to start server: %v", err)
case <-interceptor.ShutdownChannel():
return nil
}
defer server.Stop()
// We transition the server state to Active, as the server is up.
interceptorChain.SetServerActive()
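
The change above runs server.Start in its own goroutine so that a shutdown signal arriving during a slow or failing startup is still honored, with the deferred server.Stop cleaning up whatever did start. A minimal sketch of the select pattern; startServer and the shutdown channel are hypothetical stand-ins for lnd's server and signal interceptor:

package main

import (
	"fmt"
	"time"
)

// startServer stands in for server.Start: it may block for a while or fail.
func startServer() error {
	time.Sleep(50 * time.Millisecond)
	return nil
}

// run mirrors the shape of the change: start asynchronously, then wait for
// either the startup result or a shutdown request.
func run(shutdown <-chan struct{}) error {
	errChan := make(chan error, 1)
	go func() {
		errChan <- startServer()
	}()

	select {
	case err := <-errChan:
		if err != nil {
			return fmt.Errorf("unable to start server: %w", err)
		}
		// Startup succeeded; the daemon would continue from here.
		return nil
	case <-shutdown:
		// Interrupted mid-startup: return so the deferred Stop in the
		// real code can tear down whatever already started.
		return nil
	}
}

func main() {
	shutdown := make(chan struct{})
	fmt.Println(run(shutdown))
}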


@ -884,7 +884,9 @@ func (w *WebAPIEstimator) Stop() error {
return nil
}
w.updateFeeTicker.Stop()
if w.updateFeeTicker != nil {
w.updateFeeTicker.Stop()
}
close(w.quit)
w.wg.Wait()

server.go

@ -1923,6 +1923,8 @@ func (c cleaner) run() {
// Start starts the main daemon server, all requested listeners, and any helper
// goroutines.
// NOTE: This function is safe for concurrent access.
//
//nolint:funlen
func (s *server) Start() error {
var startErr error
@ -1932,26 +1934,26 @@ func (s *server) Start() error {
cleanup := cleaner{}
s.start.Do(func() {
cleanup = cleanup.add(s.customMessageServer.Stop)
if err := s.customMessageServer.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.customMessageServer.Stop)
if s.hostAnn != nil {
cleanup = cleanup.add(s.hostAnn.Stop)
if err := s.hostAnn.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.hostAnn.Stop)
}
if s.livenessMonitor != nil {
cleanup = cleanup.add(s.livenessMonitor.Stop)
if err := s.livenessMonitor.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.livenessMonitor.Stop)
}
// Start the notification server. This is used so channel
@ -1959,167 +1961,163 @@ func (s *server) Start() error {
// transaction reaches a sufficient number of confirmations, or
// when the input for the funding transaction is spent in an
// attempt at an uncooperative close by the counterparty.
cleanup = cleanup.add(s.sigPool.Stop)
if err := s.sigPool.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.sigPool.Stop)
cleanup = cleanup.add(s.writePool.Stop)
if err := s.writePool.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.writePool.Stop)
cleanup = cleanup.add(s.readPool.Stop)
if err := s.readPool.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.readPool.Stop)
cleanup = cleanup.add(s.cc.ChainNotifier.Stop)
if err := s.cc.ChainNotifier.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.cc.ChainNotifier.Stop)
cleanup = cleanup.add(s.cc.BestBlockTracker.Stop)
if err := s.cc.BestBlockTracker.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.cc.BestBlockTracker.Stop)
cleanup = cleanup.add(s.channelNotifier.Stop)
if err := s.channelNotifier.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.channelNotifier.Stop)
cleanup = cleanup.add(func() error {
return s.peerNotifier.Stop()
})
if err := s.peerNotifier.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(func() error {
return s.peerNotifier.Stop()
})
cleanup = cleanup.add(s.htlcNotifier.Stop)
if err := s.htlcNotifier.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.htlcNotifier.Stop)
if s.towerClientMgr != nil {
cleanup = cleanup.add(s.towerClientMgr.Stop)
if err := s.towerClientMgr.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.towerClientMgr.Stop)
}
cleanup = cleanup.add(s.txPublisher.Stop)
if err := s.txPublisher.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(func() error {
s.txPublisher.Stop()
return nil
})
cleanup = cleanup.add(s.sweeper.Stop)
if err := s.sweeper.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.sweeper.Stop)
cleanup = cleanup.add(s.utxoNursery.Stop)
if err := s.utxoNursery.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.utxoNursery.Stop)
cleanup = cleanup.add(s.breachArbitrator.Stop)
if err := s.breachArbitrator.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.breachArbitrator.Stop)
cleanup = cleanup.add(s.fundingMgr.Stop)
if err := s.fundingMgr.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.fundingMgr.Stop)
// htlcSwitch must be started before chainArb since the latter
// relies on htlcSwitch to deliver resolution message upon
// start.
cleanup = cleanup.add(s.htlcSwitch.Stop)
if err := s.htlcSwitch.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.htlcSwitch.Stop)
cleanup = cleanup.add(s.interceptableSwitch.Stop)
if err := s.interceptableSwitch.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.interceptableSwitch.Stop)
cleanup = cleanup.add(s.chainArb.Stop)
if err := s.chainArb.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.chainArb.Stop)
if err := s.authGossiper.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.authGossiper.Stop)
cleanup = cleanup.add(s.graphBuilder.Stop)
if err := s.graphBuilder.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.graphBuilder.Stop)
cleanup = cleanup.add(s.chanRouter.Stop)
if err := s.chanRouter.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.chanRouter.Stop)
// The authGossiper depends on the chanRouter and therefore
// should be started after it.
cleanup = cleanup.add(s.authGossiper.Stop)
if err := s.authGossiper.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.invoices.Stop)
if err := s.invoices.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.invoices.Stop)
cleanup = cleanup.add(s.sphinx.Stop)
if err := s.sphinx.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.sphinx.Stop)
cleanup = cleanup.add(s.chanStatusMgr.Stop)
if err := s.chanStatusMgr.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.chanStatusMgr.Stop)
cleanup = cleanup.add(s.chanEventStore.Stop)
if err := s.chanEventStore.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(func() error {
s.chanEventStore.Stop()
return nil
})
s.missionControl.RunStoreTicker()
cleanup.add(func() error {
s.missionControl.StopStoreTicker()
return nil
})
s.missionControl.RunStoreTicker()
// Before we start the connMgr, we'll check to see if we have
// any backups to recover. We do this now as we want to ensure
@ -2153,18 +2151,21 @@ func (s *server) Start() error {
}
}
// chanSubSwapper must be started after the `channelNotifier`
// because it depends on channel events as a synchronization
// point.
cleanup = cleanup.add(s.chanSubSwapper.Stop)
if err := s.chanSubSwapper.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.chanSubSwapper.Stop)
if s.torController != nil {
cleanup = cleanup.add(s.torController.Stop)
if err := s.createNewHiddenService(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.torController.Stop)
}
if s.natTraversal != nil {
@ -2173,11 +2174,11 @@ func (s *server) Start() error {
}
// Start connmgr last to prevent connections before init.
s.connMgr.Start()
cleanup = cleanup.add(func() error {
s.connMgr.Stop()
return nil
})
s.connMgr.Start()
// If peers are specified as a config option, we'll add those
// peers first.
@ -2364,9 +2365,9 @@ func (s *server) Stop() error {
if err := s.sweeper.Stop(); err != nil {
srvrLog.Warnf("failed to stop sweeper: %v", err)
}
s.txPublisher.Stop()
if err := s.txPublisher.Stop(); err != nil {
srvrLog.Warnf("failed to stop txPublisher: %v", err)
}
if err := s.channelNotifier.Stop(); err != nil {
srvrLog.Warnf("failed to stop channelNotifier: %v", err)
}
@ -2386,7 +2387,10 @@ func (s *server) Stop() error {
srvrLog.Warnf("Unable to stop BestBlockTracker: %v",
err)
}
s.chanEventStore.Stop()
if err := s.chanEventStore.Stop(); err != nil {
srvrLog.Warnf("Unable to stop ChannelEventStore: %v",
err)
}
s.missionControl.StopStoreTicker()
// Disconnect from each active peers to ensure that
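
The server.go reordering registers each subsystem's Stop with the cleaner before calling its Start, so a Start that fails partway, or a shutdown triggered during startup, still tears the subsystem down; this is also why the Stop methods elsewhere in the PR need their nil checks. A minimal sketch of that ordering, with a hypothetical cleaner and subsystem:

package main

import "fmt"

// cleaner accumulates stop callbacks to run if startup aborts.
type cleaner []func() error

func (c cleaner) add(f func() error) cleaner { return append(c, f) }

// run executes the callbacks in reverse order, mirroring startup order.
func (c cleaner) run() {
	for i := len(c) - 1; i >= 0; i-- {
		if err := c[i](); err != nil {
			fmt.Println("cleanup error:", err)
		}
	}
}

type subsystem struct{ name string }

func (s *subsystem) Start() error {
	if s.name == "flaky" {
		return fmt.Errorf("%s failed to start", s.name)
	}
	fmt.Println(s.name, "started")
	return nil
}

func (s *subsystem) Stop() error {
	fmt.Println(s.name, "stopped")
	return nil
}

func main() {
	cleanup := cleaner{}
	subs := []*subsystem{{name: "notifier"}, {name: "flaky"}, {name: "switch"}}

	for _, sub := range subs {
		// Register Stop before Start, so even a partially completed
		// Start is cleaned up when the loop aborts below.
		cleanup = cleanup.add(sub.Stop)
		if err := sub.Start(); err != nil {
			fmt.Println("startup aborted:", err)
			cleanup.run()
			return
		}
	}
}

Running the sketch starts "notifier", aborts on "flaky", and then stops "flaky" and "notifier" in reverse order, which is why each Stop must tolerate an incomplete Start.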


@ -261,6 +261,9 @@ type TxPublisherConfig struct {
// until the tx is confirmed or the fee rate reaches the maximum fee rate
// specified by the caller.
type TxPublisher struct {
started atomic.Bool
stopped atomic.Bool
wg sync.WaitGroup
// cfg specifies the configuration of the TxPublisher.
@ -666,7 +669,10 @@ type monitorRecord struct {
// off the monitor loop.
func (t *TxPublisher) Start() error {
log.Info("TxPublisher starting...")
defer log.Debugf("TxPublisher started")
if t.started.Swap(true) {
return fmt.Errorf("TxPublisher started more than once")
}
blockEvent, err := t.cfg.Notifier.RegisterBlockEpochNtfn(nil)
if err != nil {
@ -676,17 +682,25 @@ func (t *TxPublisher) Start() error {
t.wg.Add(1)
go t.monitor(blockEvent)
log.Debugf("TxPublisher started")
return nil
}
// Stop stops the publisher and waits for the monitor loop to exit.
func (t *TxPublisher) Stop() {
func (t *TxPublisher) Stop() error {
log.Info("TxPublisher stopping...")
defer log.Debugf("TxPublisher stopped")
if t.stopped.Swap(true) {
return fmt.Errorf("TxPublisher stopped more than once")
}
close(t.quit)
t.wg.Wait()
log.Debug("TxPublisher stopped")
return nil
}
// monitor is the main loop driven by new blocks. Whenever a new block arrives,


@ -186,6 +186,10 @@ func (c *Controller) Stop() error {
// Reset service ID.
c.activeServiceID = ""
if c.conn == nil {
return fmt.Errorf("no connection available to the tor server")
}
return c.conn.Close()
}