mirror of
https://github.com/btcsuite/btcd.git
synced 2024-11-19 01:40:07 +01:00
btcd: Simplify shutdown signal handling logic. (#733)
This rewrites the shutdown logic to simplify the shutdown signalling. All cleanup is now run from deferred functions in the main function and channels are used to signal shutdown either from OS signals or from other subsystems such as the RPC server and windows service controller. The RPC server has been modified to use a new channel for signalling shutdown that is exposed via the RequestedProcessShutdown function instead of directly calling Stop on the server as it previously did. Finally, it adds a few checks for early termination during the main start sequence so the process can be stopped without starting all the subsystems if desired. This is a backport of the equivalent logic from Decred with a few slight modifications. Credits go to @jrick.
This commit is contained in:
parent
a7b35d9f9e
commit
044a11c9fc
52
btcd.go
52
btcd.go
@ -19,8 +19,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
cfg *config
|
cfg *config
|
||||||
shutdownChannel = make(chan struct{})
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// winServiceMain is only invoked on Windows. It detects when btcd is running
|
// winServiceMain is only invoked on Windows. It detects when btcd is running
|
||||||
@ -42,6 +41,12 @@ func btcdMain(serverChan chan<- *server) error {
|
|||||||
cfg = tcfg
|
cfg = tcfg
|
||||||
defer backendLog.Flush()
|
defer backendLog.Flush()
|
||||||
|
|
||||||
|
// Get a channel that will be closed when a shutdown signal has been
|
||||||
|
// triggered either from an OS signal such as SIGINT (Ctrl+C) or from
|
||||||
|
// another subsystem such as the RPC server.
|
||||||
|
interruptedChan := interruptListener()
|
||||||
|
defer btcdLog.Info("Shutdown complete")
|
||||||
|
|
||||||
// Show version at startup.
|
// Show version at startup.
|
||||||
btcdLog.Infof("Version %s", version())
|
btcdLog.Infof("Version %s", version())
|
||||||
|
|
||||||
@ -75,19 +80,27 @@ func btcdMain(serverChan chan<- *server) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Return now if an interrupt signal was triggered.
|
||||||
|
if interruptRequested(interruptedChan) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Load the block database.
|
// Load the block database.
|
||||||
db, err := loadBlockDB()
|
db, err := loadBlockDB()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
btcdLog.Errorf("%v", err)
|
btcdLog.Errorf("%v", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer db.Close()
|
defer func() {
|
||||||
|
// Ensure the database is sync'd and closed on shutdown.
|
||||||
// Ensure the database is sync'd and closed on Ctrl+C.
|
|
||||||
addInterruptHandler(func() {
|
|
||||||
btcdLog.Infof("Gracefully shutting down the database...")
|
btcdLog.Infof("Gracefully shutting down the database...")
|
||||||
db.Close()
|
db.Close()
|
||||||
})
|
}()
|
||||||
|
|
||||||
|
// Return now if an interrupt signal was triggered.
|
||||||
|
if interruptRequested(interruptedChan) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Drop indexes and exit if requested.
|
// Drop indexes and exit if requested.
|
||||||
//
|
//
|
||||||
@ -118,32 +131,21 @@ func btcdMain(serverChan chan<- *server) error {
|
|||||||
cfg.Listeners, err)
|
cfg.Listeners, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
addInterruptHandler(func() {
|
defer func() {
|
||||||
btcdLog.Infof("Gracefully shutting down the server...")
|
btcdLog.Infof("Gracefully shutting down the server...")
|
||||||
server.Stop()
|
server.Stop()
|
||||||
server.WaitForShutdown()
|
server.WaitForShutdown()
|
||||||
})
|
srvrLog.Infof("Server shutdown complete")
|
||||||
|
}()
|
||||||
server.Start()
|
server.Start()
|
||||||
if serverChan != nil {
|
if serverChan != nil {
|
||||||
serverChan <- server
|
serverChan <- server
|
||||||
}
|
}
|
||||||
|
|
||||||
// Monitor for graceful server shutdown and signal the main goroutine
|
// Wait until the interrupt signal is received from an OS signal or
|
||||||
// when done. This is done in a separate goroutine rather than waiting
|
// shutdown is requested through one of the subsystems such as the RPC
|
||||||
// directly so the main goroutine can be signaled for shutdown by either
|
// server.
|
||||||
// a graceful shutdown or from the main interrupt handler. This is
|
<-interruptedChan
|
||||||
// necessary since the main goroutine must be kept running long enough
|
|
||||||
// for the interrupt handler goroutine to finish.
|
|
||||||
go func() {
|
|
||||||
server.WaitForShutdown()
|
|
||||||
srvrLog.Infof("Server shutdown complete")
|
|
||||||
shutdownChannel <- struct{}{}
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Wait for shutdown signal from either a graceful server stop or from
|
|
||||||
// the interrupt handler.
|
|
||||||
<-shutdownChannel
|
|
||||||
btcdLog.Info("Shutdown complete")
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
64
rpcserver.go
64
rpcserver.go
@ -3513,7 +3513,10 @@ func handleSetGenerate(s *rpcServer, cmd interface{}, closeChan <-chan struct{})
|
|||||||
|
|
||||||
// handleStop implements the stop command.
|
// handleStop implements the stop command.
|
||||||
func handleStop(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) {
|
func handleStop(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) {
|
||||||
s.server.Stop()
|
select {
|
||||||
|
case s.requestProcessShutdown <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
return "btcd stopping.", nil
|
return "btcd stopping.", nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -3683,23 +3686,24 @@ func handleVerifyMessage(s *rpcServer, cmd interface{}, closeChan <-chan struct{
|
|||||||
// rpcServer holds the items the rpc server may need to access (config,
|
// rpcServer holds the items the rpc server may need to access (config,
|
||||||
// shutdown, main server, etc.)
|
// shutdown, main server, etc.)
|
||||||
type rpcServer struct {
|
type rpcServer struct {
|
||||||
started int32
|
started int32
|
||||||
shutdown int32
|
shutdown int32
|
||||||
policy *mining.Policy
|
policy *mining.Policy
|
||||||
server *server
|
server *server
|
||||||
chain *blockchain.BlockChain
|
chain *blockchain.BlockChain
|
||||||
authsha [fastsha256.Size]byte
|
authsha [fastsha256.Size]byte
|
||||||
limitauthsha [fastsha256.Size]byte
|
limitauthsha [fastsha256.Size]byte
|
||||||
ntfnMgr *wsNotificationManager
|
ntfnMgr *wsNotificationManager
|
||||||
numClients int32
|
numClients int32
|
||||||
statusLines map[int]string
|
statusLines map[int]string
|
||||||
statusLock sync.RWMutex
|
statusLock sync.RWMutex
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
listeners []net.Listener
|
listeners []net.Listener
|
||||||
workState *workState
|
workState *workState
|
||||||
gbtWorkState *gbtWorkState
|
gbtWorkState *gbtWorkState
|
||||||
helpCacher *helpCacher
|
helpCacher *helpCacher
|
||||||
quit chan int
|
requestProcessShutdown chan struct{}
|
||||||
|
quit chan int
|
||||||
}
|
}
|
||||||
|
|
||||||
// httpStatusLine returns a response Status-Line (RFC 2616 Section 6.1)
|
// httpStatusLine returns a response Status-Line (RFC 2616 Section 6.1)
|
||||||
@ -3783,6 +3787,13 @@ func (s *rpcServer) Stop() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RequestedProcessShutdown returns a channel that is sent to when an authorized
|
||||||
|
// RPC client requests the process to shutdown. If the request can not be read
|
||||||
|
// immediately, it is dropped.
|
||||||
|
func (s *rpcServer) RequestedProcessShutdown() <-chan struct{} {
|
||||||
|
return s.requestProcessShutdown
|
||||||
|
}
|
||||||
|
|
||||||
// limitConnections responds with a 503 service unavailable and returns true if
|
// limitConnections responds with a 503 service unavailable and returns true if
|
||||||
// adding another client would exceed the maximum allow RPC clients.
|
// adding another client would exceed the maximum allow RPC clients.
|
||||||
//
|
//
|
||||||
@ -4164,14 +4175,15 @@ func genCertPair(certFile, keyFile string) error {
|
|||||||
// newRPCServer returns a new instance of the rpcServer struct.
|
// newRPCServer returns a new instance of the rpcServer struct.
|
||||||
func newRPCServer(listenAddrs []string, policy *mining.Policy, s *server) (*rpcServer, error) {
|
func newRPCServer(listenAddrs []string, policy *mining.Policy, s *server) (*rpcServer, error) {
|
||||||
rpc := rpcServer{
|
rpc := rpcServer{
|
||||||
policy: policy,
|
policy: policy,
|
||||||
server: s,
|
server: s,
|
||||||
chain: s.blockManager.chain,
|
chain: s.blockManager.chain,
|
||||||
statusLines: make(map[int]string),
|
statusLines: make(map[int]string),
|
||||||
workState: newWorkState(),
|
workState: newWorkState(),
|
||||||
gbtWorkState: newGbtWorkState(s.timeSource),
|
gbtWorkState: newGbtWorkState(s.timeSource),
|
||||||
helpCacher: newHelpCacher(),
|
helpCacher: newHelpCacher(),
|
||||||
quit: make(chan int),
|
requestProcessShutdown: make(chan struct{}),
|
||||||
|
quit: make(chan int),
|
||||||
}
|
}
|
||||||
if cfg.RPCUser != "" && cfg.RPCPass != "" {
|
if cfg.RPCUser != "" && cfg.RPCPass != "" {
|
||||||
login := cfg.RPCUser + ":" + cfg.RPCPass
|
login := cfg.RPCUser + ":" + cfg.RPCPass
|
||||||
|
@ -2555,6 +2555,12 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Signal process shutdown when the RPC server requests it.
|
||||||
|
go func() {
|
||||||
|
<-s.rpcServer.RequestedProcessShutdown()
|
||||||
|
shutdownRequestChannel <- struct{}{}
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
return &s, nil
|
return &s, nil
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
// Copyright (c) 2013-2014 The btcsuite developers
|
// Copyright (c) 2013-2016 The btcsuite developers
|
||||||
// Use of this source code is governed by an ISC
|
// Use of this source code is governed by an ISC
|
||||||
// license that can be found in the LICENSE file.
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
@ -85,18 +85,8 @@ loop:
|
|||||||
// more commands while pending.
|
// more commands while pending.
|
||||||
changes <- svc.Status{State: svc.StopPending}
|
changes <- svc.Status{State: svc.StopPending}
|
||||||
|
|
||||||
// Stop the main server gracefully when it is
|
// Signal the main function to exit.
|
||||||
// already setup or just break out and allow
|
shutdownRequestChannel <- struct{}{}
|
||||||
// the service to exit immediately if it's not
|
|
||||||
// setup yet. Note that calling Stop will cause
|
|
||||||
// btcdMain to exit in the goroutine above which
|
|
||||||
// will in turn send a signal (and a potential
|
|
||||||
// error) to doneChan.
|
|
||||||
if mainServer != nil {
|
|
||||||
mainServer.Stop()
|
|
||||||
} else {
|
|
||||||
break loop
|
|
||||||
}
|
|
||||||
|
|
||||||
default:
|
default:
|
||||||
elog.Error(1, fmt.Sprintf("Unexpected control "+
|
elog.Error(1, fmt.Sprintf("Unexpected control "+
|
||||||
|
110
signal.go
110
signal.go
@ -1,4 +1,4 @@
|
|||||||
// Copyright (c) 2013-2014 The btcsuite developers
|
// Copyright (c) 2013-2016 The btcsuite developers
|
||||||
// Use of this source code is governed by an ISC
|
// Use of this source code is governed by an ISC
|
||||||
// license that can be found in the LICENSE file.
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
@ -9,79 +9,63 @@ import (
|
|||||||
"os/signal"
|
"os/signal"
|
||||||
)
|
)
|
||||||
|
|
||||||
// interruptChannel is used to receive SIGINT (Ctrl+C) signals.
|
// shutdownRequestChannel is used to initiate shutdown from one of the
|
||||||
var interruptChannel chan os.Signal
|
// subsystems using the same code paths as when an interrupt signal is received.
|
||||||
|
var shutdownRequestChannel = make(chan struct{})
|
||||||
|
|
||||||
// addHandlerChannel is used to add an interrupt handler to the list of handlers
|
// interruptSignals defines the default signals to catch in order to do a proper
|
||||||
// to be invoked on SIGINT (Ctrl+C) signals.
|
// shutdown. This may be modified during init depending on the platform.
|
||||||
var addHandlerChannel = make(chan func())
|
var interruptSignals = []os.Signal{os.Interrupt}
|
||||||
|
|
||||||
// signals defines the default signals to catch in order to do a proper
|
// interruptListener listens for OS Signals such as SIGINT (Ctrl+C) and shutdown
|
||||||
// shutdown.
|
// requests from shutdownRequestChannel. It returns a channel that is closed
|
||||||
var signals = []os.Signal{os.Interrupt}
|
// when either signal is received.
|
||||||
|
func interruptListener() <-chan struct{} {
|
||||||
|
c := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
interruptChannel := make(chan os.Signal, 1)
|
||||||
|
signal.Notify(interruptChannel, interruptSignals...)
|
||||||
|
|
||||||
// mainInterruptHandler listens for SIGINT (Ctrl+C) signals on the
|
// Listen for initial shutdown signal and close the returned
|
||||||
// interruptChannel and invokes the registered interruptCallbacks accordingly.
|
// channel to notify the caller.
|
||||||
// It also listens for callback registration. It must be run as a goroutine.
|
|
||||||
func mainInterruptHandler() {
|
|
||||||
// interruptCallbacks is a list of callbacks to invoke when a
|
|
||||||
// SIGINT (Ctrl+C) is received.
|
|
||||||
var interruptCallbacks []func()
|
|
||||||
|
|
||||||
// isShutdown is a flag which is used to indicate whether or not
|
|
||||||
// the shutdown signal has already been received and hence any future
|
|
||||||
// attempts to add a new interrupt handler should invoke them
|
|
||||||
// immediately.
|
|
||||||
var isShutdown bool
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
select {
|
||||||
case sig := <-interruptChannel:
|
case sig := <-interruptChannel:
|
||||||
// Ignore more than one shutdown signal.
|
|
||||||
if isShutdown {
|
|
||||||
btcdLog.Infof("Received signal (%s). "+
|
|
||||||
"Already shutting down...", sig)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
isShutdown = true
|
|
||||||
btcdLog.Infof("Received signal (%s). Shutting down...",
|
btcdLog.Infof("Received signal (%s). Shutting down...",
|
||||||
sig)
|
sig)
|
||||||
|
|
||||||
// Run handlers in LIFO order.
|
case <-shutdownRequestChannel:
|
||||||
for i := range interruptCallbacks {
|
btcdLog.Info("Shutdown requested. Shutting down...")
|
||||||
idx := len(interruptCallbacks) - 1 - i
|
|
||||||
callback := interruptCallbacks[idx]
|
|
||||||
callback()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Signal the main goroutine to shutdown.
|
|
||||||
go func() {
|
|
||||||
shutdownChannel <- struct{}{}
|
|
||||||
}()
|
|
||||||
|
|
||||||
case handler := <-addHandlerChannel:
|
|
||||||
// The shutdown signal has already been received, so
|
|
||||||
// just invoke and new handlers immediately.
|
|
||||||
if isShutdown {
|
|
||||||
handler()
|
|
||||||
}
|
|
||||||
|
|
||||||
interruptCallbacks = append(interruptCallbacks, handler)
|
|
||||||
}
|
}
|
||||||
}
|
close(c)
|
||||||
|
|
||||||
|
// Listen for repeated signals and display a message so the user
|
||||||
|
// knows the shutdown is in progress and the process is not
|
||||||
|
// hung.
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case sig := <-interruptChannel:
|
||||||
|
btcdLog.Infof("Received signal (%s). Already "+
|
||||||
|
"shutting down...", sig)
|
||||||
|
|
||||||
|
case <-shutdownRequestChannel:
|
||||||
|
btcdLog.Info("Shutdown requested. Already " +
|
||||||
|
"shutting down...")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
// addInterruptHandler adds a handler to call when a SIGINT (Ctrl+C) is
|
// interruptRequested returns true when the channel returned by
|
||||||
// received.
|
// interruptListener was closed. This simplifies early shutdown slightly since
|
||||||
func addInterruptHandler(handler func()) {
|
// the caller can just use an if statement instead of a select.
|
||||||
// Create the channel and start the main interrupt handler which invokes
|
func interruptRequested(interrupted <-chan struct{}) bool {
|
||||||
// all other callbacks and exits if not already done.
|
select {
|
||||||
if interruptChannel == nil {
|
case <-interrupted:
|
||||||
interruptChannel = make(chan os.Signal, 1)
|
return true
|
||||||
signal.Notify(interruptChannel, signals...)
|
default:
|
||||||
go mainInterruptHandler()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
addHandlerChannel <- handler
|
return false
|
||||||
}
|
}
|
||||||
|
@ -12,5 +12,5 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
signals = []os.Signal{os.Interrupt, syscall.SIGTERM}
|
interruptSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user