lnd: start blockbeatDispatcher and register consumers

This commit is contained in:
yyforyongyu 2024-10-17 10:14:00 +08:00
parent 16a8b623b3
commit 8fc9154506
No known key found for this signature in database
GPG Key ID: 9BCD95C4FF296868
2 changed files with 45 additions and 0 deletions

2
log.go
View File

@ -9,6 +9,7 @@ import (
sphinx "github.com/lightningnetwork/lightning-onion"
"github.com/lightningnetwork/lnd/autopilot"
"github.com/lightningnetwork/lnd/build"
"github.com/lightningnetwork/lnd/chainio"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/chainreg"
"github.com/lightningnetwork/lnd/chanacceptor"
@ -198,6 +199,7 @@ func SetupLoggers(root *build.SubLoggerManager, interceptor signal.Interceptor)
root, blindedpath.Subsystem, interceptor, blindedpath.UseLogger,
)
AddV1SubLogger(root, graphdb.Subsystem, interceptor, graphdb.UseLogger)
AddSubLogger(root, chainio.Subsystem, interceptor, chainio.UseLogger)
}
// AddSubLogger is a helper method to conveniently create and register the

View File

@ -357,6 +357,10 @@ type server struct {
// txPublisher is a publisher with fee-bumping capability.
txPublisher *sweep.TxPublisher
// blockbeatDispatcher is a block dispatcher that notifies subscribers
// of new blocks.
blockbeatDispatcher *chainio.BlockbeatDispatcher
quit chan struct{}
wg sync.WaitGroup
@ -624,6 +628,9 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
readPool: readPool,
chansToRestore: chansToRestore,
blockbeatDispatcher: chainio.NewBlockbeatDispatcher(
cc.ChainNotifier,
),
channelNotifier: channelnotifier.New(
dbs.ChanStateDB.ChannelStateDB(),
),
@ -1825,6 +1832,9 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
}
s.connMgr = cmgr
// Finally, register the subsystems in blockbeat.
s.registerBlockConsumers()
return s, nil
}
@ -1857,6 +1867,25 @@ func (s *server) UpdateRoutingConfig(cfg *routing.MissionControlConfig) {
routerCfg.MaxMcHistory = cfg.MaxMcHistory
}
// registerBlockConsumers registers the subsystems that consume block events.
// By calling `RegisterQueue`, a list of subsystems are registered in the
// blockbeat for block notifications. When a new block arrives, the subsystems
// in the same queue are notified sequentially, and different queues are
// notified concurrently.
//
// NOTE: To put a subsystem in a different queue, create a slice and pass it to
// a new `RegisterQueue` call.
func (s *server) registerBlockConsumers() {
// In this queue, when a new block arrives, it will be received and
// processed in this order: chainArb -> sweeper -> txPublisher.
consumers := []chainio.Consumer{
s.chainArb,
s.sweeper,
s.txPublisher,
}
s.blockbeatDispatcher.RegisterQueue(consumers)
}
// signAliasUpdate takes a ChannelUpdate and returns the signature. This is
// used for option_scid_alias channels where the ChannelUpdate to be sent back
// may differ from what is on disk.
@ -2494,6 +2523,17 @@ func (s *server) Start() error {
srvrLog.Infof("Auto peer bootstrapping is disabled")
}
// Start the blockbeat after all other subsystems have been
// started so they are ready to receive new blocks.
cleanup = cleanup.add(func() error {
s.blockbeatDispatcher.Stop()
return nil
})
if err := s.blockbeatDispatcher.Start(); err != nil {
startErr = err
return
}
// Set the active flag now that we've completed the full
// startup.
atomic.StoreInt32(&s.active, 1)
@ -2518,6 +2558,9 @@ func (s *server) Stop() error {
// Shutdown connMgr first to prevent conns during shutdown.
s.connMgr.Stop()
// Stop dispatching blocks to other systems immediately.
s.blockbeatDispatcher.Stop()
// Shutdown the wallet, funding manager, and the rpc server.
if err := s.chanStatusMgr.Stop(); err != nil {
srvrLog.Warnf("failed to stop chanStatusMgr: %v", err)