From ae2c37e043225c12bcc69790201d3d3dcd28db62 Mon Sep 17 00:00:00 2001 From: Roei Erez Date: Thu, 30 Apr 2020 12:54:33 +0300 Subject: [PATCH] Ensure chain notifier is started before accessed. The use case comes from the RPC layer that is ready before the chain notifier which is used in the sub server. --- chainntnfs/bitcoindnotify/bitcoind.go | 10 ++++++++++ chainntnfs/btcdnotify/btcd.go | 10 ++++++++++ chainntnfs/interface.go | 3 +++ chainntnfs/neutrinonotify/neutrino.go | 10 ++++++++++ contractcourt/chain_watcher_test.go | 4 ++++ discovery/gossiper_test.go | 4 ++++ fundingmanager_test.go | 4 ++++ htlcswitch/mock.go | 4 ++++ lnrpc/chainrpc/chainnotifier_server.go | 17 +++++++++++++++++ mock.go | 4 ++++ sweep/test_utils.go | 5 +++++ 11 files changed, 75 insertions(+) diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index bb96a7ec5..b39f78a43 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -35,6 +35,7 @@ type BitcoindNotifier struct { epochClientCounter uint64 // To be used atomically. start sync.Once + active int32 // To be used atomically. stopped int32 // To be used atomically. chainConn *chain.BitcoindClient @@ -130,6 +131,11 @@ func (b *BitcoindNotifier) Stop() error { return nil } +// Started returns true if this instance has been started, and false otherwise. +func (b *BitcoindNotifier) Started() bool { + return atomic.LoadInt32(&b.active) != 0 +} + func (b *BitcoindNotifier) startNotifier() error { // Connect to bitcoind, and register for notifications on connected, // and disconnected blocks. @@ -158,6 +164,10 @@ func (b *BitcoindNotifier) startNotifier() error { b.wg.Add(1) go b.notificationDispatcher() + // Set the active flag now that we've completed the full + // startup. + atomic.StoreInt32(&b.active, 1) + return nil } diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 1ce8c6305..d23a30599 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -54,6 +54,7 @@ type BtcdNotifier struct { epochClientCounter uint64 // To be used atomically. start sync.Once + active int32 // To be used atomically. stopped int32 // To be used atomically. chainConn *rpcclient.Client @@ -141,6 +142,11 @@ func (b *BtcdNotifier) Start() error { return startErr } +// Started returns true if this instance has been started, and false otherwise. +func (b *BtcdNotifier) Started() bool { + return atomic.LoadInt32(&b.active) != 0 +} + // Stop shutsdown the BtcdNotifier. func (b *BtcdNotifier) Stop() error { // Already shutting down? @@ -212,6 +218,10 @@ func (b *BtcdNotifier) startNotifier() error { b.wg.Add(1) go b.notificationDispatcher() + // Set the active flag now that we've completed the full + // startup. + atomic.StoreInt32(&b.active, 1) + return nil } diff --git a/chainntnfs/interface.go b/chainntnfs/interface.go index ace86e777..c224181c8 100644 --- a/chainntnfs/interface.go +++ b/chainntnfs/interface.go @@ -139,6 +139,9 @@ type ChainNotifier interface { // ready, and able to receive notification registrations from clients. Start() error + // Started returns true if this instance has been started, and false otherwise. + Started() bool + // Stops the concrete ChainNotifier. Once stopped, the ChainNotifier // should disallow any future requests from potential clients. // Additionally, all pending client notifications will be canceled diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index be1e8f806..567fa520f 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -40,6 +40,7 @@ type NeutrinoNotifier struct { epochClientCounter uint64 // To be used atomically. start sync.Once + active int32 // To be used atomically. stopped int32 // To be used atomically. bestBlockMtx sync.RWMutex @@ -144,6 +145,11 @@ func (n *NeutrinoNotifier) Stop() error { return nil } +// Started returns true if this instance has been started, and false otherwise. +func (n *NeutrinoNotifier) Started() bool { + return atomic.LoadInt32(&n.active) != 0 +} + func (n *NeutrinoNotifier) startNotifier() error { // Start our concurrent queues before starting the rescan, to ensure // onFilteredBlockConnected and onRelavantTx callbacks won't be @@ -200,6 +206,10 @@ func (n *NeutrinoNotifier) startNotifier() error { n.wg.Add(1) go n.notificationDispatcher() + // Set the active flag now that we've completed the full + // startup. + atomic.StoreInt32(&n.active, 1) + return nil } diff --git a/contractcourt/chain_watcher_test.go b/contractcourt/chain_watcher_test.go index 6dc47f530..62c768717 100644 --- a/contractcourt/chain_watcher_test.go +++ b/contractcourt/chain_watcher_test.go @@ -43,6 +43,10 @@ func (m *mockNotifier) Start() error { return nil } +func (m *mockNotifier) Started() bool { + return true +} + func (m *mockNotifier) Stop() error { return nil } diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index 75d3cef67..5281d4aa0 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -453,6 +453,10 @@ func (m *mockNotifier) Start() error { return nil } +func (m *mockNotifier) Started() bool { + return true +} + func (m *mockNotifier) Stop() error { return nil } diff --git a/fundingmanager_test.go b/fundingmanager_test.go index b222bf94b..0fa5831f1 100644 --- a/fundingmanager_test.go +++ b/fundingmanager_test.go @@ -131,6 +131,10 @@ func (m *mockNotifier) Start() error { return nil } +func (m *mockNotifier) Started() bool { + return true +} + func (m *mockNotifier) Stop() error { return nil } diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index e9a2a1efa..0ce5adf61 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -931,6 +931,10 @@ func (m *mockNotifier) Start() error { return nil } +func (m *mockNotifier) Started() bool { + return true +} + func (m *mockNotifier) Stop() error { return nil } diff --git a/lnrpc/chainrpc/chainnotifier_server.go b/lnrpc/chainrpc/chainnotifier_server.go index 2d662c8e7..261f76051 100644 --- a/lnrpc/chainrpc/chainnotifier_server.go +++ b/lnrpc/chainrpc/chainnotifier_server.go @@ -63,6 +63,11 @@ var ( // has been shut down. ErrChainNotifierServerShuttingDown = errors.New("chain notifier RPC " + "subserver shutting down") + + // ErrChainNotifierServerNotActive indicates that the chain notifier hasn't + // finished the startup process. + ErrChainNotifierServerNotActive = errors.New("chain notifier RPC is" + + "still in the process of starting") ) // fileExists reports whether the named file or directory exists. @@ -196,6 +201,10 @@ func (s *Server) RegisterWithRootServer(grpcServer *grpc.Server) error { func (s *Server) RegisterConfirmationsNtfn(in *ConfRequest, confStream ChainNotifier_RegisterConfirmationsNtfnServer) error { + if !s.cfg.ChainNotifier.Started() { + return ErrChainNotifierServerNotActive + } + // We'll start by reconstructing the RPC request into what the // underlying ChainNotifier expects. var txid chainhash.Hash @@ -292,6 +301,10 @@ func (s *Server) RegisterConfirmationsNtfn(in *ConfRequest, func (s *Server) RegisterSpendNtfn(in *SpendRequest, spendStream ChainNotifier_RegisterSpendNtfnServer) error { + if !s.cfg.ChainNotifier.Started() { + return ErrChainNotifierServerNotActive + } + // We'll start by reconstructing the RPC request into what the // underlying ChainNotifier expects. var op *wire.OutPoint @@ -399,6 +412,10 @@ func (s *Server) RegisterSpendNtfn(in *SpendRequest, func (s *Server) RegisterBlockEpochNtfn(in *BlockEpoch, epochStream ChainNotifier_RegisterBlockEpochNtfnServer) error { + if !s.cfg.ChainNotifier.Started() { + return ErrChainNotifierServerNotActive + } + // We'll start by reconstructing the RPC request into what the // underlying ChainNotifier expects. var hash chainhash.Hash diff --git a/mock.go b/mock.go index 0b71a6dc3..34e466d9e 100644 --- a/mock.go +++ b/mock.go @@ -112,6 +112,10 @@ func (m *mockNotfier) Start() error { return nil } +func (m *mockNotfier) Started() bool { + return true +} + func (m *mockNotfier) Stop() error { return nil } diff --git a/sweep/test_utils.go b/sweep/test_utils.go index 7c28710be..5b83d730e 100644 --- a/sweep/test_utils.go +++ b/sweep/test_utils.go @@ -200,6 +200,11 @@ func (m *MockNotifier) Start() error { return nil } +// Started checks if started +func (m *MockNotifier) Started() bool { + return true +} + // Stop the notifier. func (m *MockNotifier) Stop() error { return nil