diff --git a/cluster/etcd_elector.go b/cluster/etcd_elector.go
index 25b11af25..7e6c74d53 100644
--- a/cluster/etcd_elector.go
+++ b/cluster/etcd_elector.go
@@ -99,9 +99,27 @@ func (e *etcdLeaderElector) Leader(ctx context.Context) (string, error) {
 		return "", err
 	}
 
+	if resp == nil || len(resp.Kvs) == 0 {
+		return "", nil
+	}
+
 	return string(resp.Kvs[0].Value), nil
 }
 
+// IsLeader returns true if the caller is the leader.
+func (e *etcdLeaderElector) IsLeader(ctx context.Context) (bool, error) {
+	resp, err := e.election.Leader(ctx)
+	if err != nil {
+		return false, err
+	}
+
+	if resp == nil || len(resp.Kvs) == 0 {
+		return false, nil
+	}
+
+	return string(resp.Kvs[0].Value) == e.id, nil
+}
+
 // Campaign will start a new leader election campaign. Campaign will block until
 // the elector context is canceled or the caller is elected as the leader.
 func (e *etcdLeaderElector) Campaign(ctx context.Context) error {
@@ -110,6 +128,6 @@ func (e *etcdLeaderElector) Campaign(ctx context.Context) error {
 
 // Resign resigns the leader role allowing other election members to take
 // the place.
-func (e *etcdLeaderElector) Resign() error {
-	return e.election.Resign(context.Background())
+func (e *etcdLeaderElector) Resign(ctx context.Context) error {
+	return e.election.Resign(ctx)
 }
diff --git a/cluster/etcd_elector_test.go b/cluster/etcd_elector_test.go
index 999a3d791..f7fcebec3 100644
--- a/cluster/etcd_elector_test.go
+++ b/cluster/etcd_elector_test.go
@@ -87,12 +87,12 @@ func TestEtcdElector(t *testing.T) {
 	tmp := <-ch
 	first, err := tmp.Leader(ctxb)
 	require.NoError(t, err)
-	require.NoError(t, tmp.Resign())
+	require.NoError(t, tmp.Resign(ctxb))
 
 	tmp = <-ch
 	second, err := tmp.Leader(ctxb)
 	require.NoError(t, err)
-	require.NoError(t, tmp.Resign())
+	require.NoError(t, tmp.Resign(ctxb))
 
 	require.Contains(t, []string{id1, id2}, first)
 	require.Contains(t, []string{id1, id2}, second)
diff --git a/cluster/interface.go b/cluster/interface.go
index 8a317095e..098c061a6 100644
--- a/cluster/interface.go
+++ b/cluster/interface.go
@@ -19,8 +19,11 @@ type LeaderElector interface {
 
 	// Resign resigns from the leader role, allowing other election members
 	// to take on leadership.
-	Resign() error
+	Resign(ctx context.Context) error
 
 	// Leader returns the leader value for the current election.
 	Leader(ctx context.Context) (string, error)
+
+	// IsLeader returns true if the caller is the leader.
+	IsLeader(ctx context.Context) (bool, error)
 }
diff --git a/config.go b/config.go
index 792c286ac..34d84720c 100644
--- a/config.go
+++ b/config.go
@@ -169,6 +169,17 @@ const (
 	defaultRSBackoff  = time.Second * 30
 	defaultRSAttempts = 1
 
+	// Set defaults for a health check that ensures the leader election
+	// is functioning correctly. Although this check is off by default
+	// (as etcd leader election is only used in a clustered setup), we
+	// still set the default values so that the health check can easily
+	// be enabled with sane defaults. Note that we allow only a single
+	// attempt, since a node that has lost leadership must shut down.
+	defaultLeaderCheckInterval = time.Minute
+	defaultLeaderCheckTimeout  = time.Second * 5
+	defaultLeaderCheckBackoff  = time.Second * 5
+	defaultLeaderCheckAttempts = 1
+
 	// defaultRemoteMaxHtlcs specifies the default limit for maximum
 	// concurrent HTLCs the remote party may add to commitment transactions.
 	// This value can be overridden with --default-remote-max-htlcs.
@@ -672,6 +683,12 @@ func DefaultConfig() Config {
 				Attempts: defaultRSAttempts,
 				Backoff:  defaultRSBackoff,
 			},
+			LeaderCheck: &lncfg.CheckConfig{
+				Interval: defaultLeaderCheckInterval,
+				Timeout:  defaultLeaderCheckTimeout,
+				Attempts: defaultLeaderCheckAttempts,
+				Backoff:  defaultLeaderCheckBackoff,
+			},
 		},
 		Gossip: &lncfg.Gossip{
 			MaxChannelUpdateBurst: discovery.DefaultMaxChannelUpdateBurst,
diff --git a/lncfg/healthcheck.go b/lncfg/healthcheck.go
index 92c415474..9a1403687 100644
--- a/lncfg/healthcheck.go
+++ b/lncfg/healthcheck.go
@@ -34,6 +34,8 @@ type HealthCheckConfig struct {
 	TorConnection *CheckConfig `group:"torconnection" namespace:"torconnection"`
 
 	RemoteSigner *CheckConfig `group:"remotesigner" namespace:"remotesigner"`
+
+	LeaderCheck *CheckConfig `group:"leader" namespace:"leader"`
 }
 
 // Validate checks the values configured for our health checks.
diff --git a/lnd.go b/lnd.go
index 0a85e1bf3..bc74c9b96 100644
--- a/lnd.go
+++ b/lnd.go
@@ -24,6 +24,7 @@ import (
 	"github.com/lightningnetwork/lnd/build"
 	"github.com/lightningnetwork/lnd/chanacceptor"
 	"github.com/lightningnetwork/lnd/channeldb"
+	"github.com/lightningnetwork/lnd/cluster"
 	"github.com/lightningnetwork/lnd/keychain"
 	"github.com/lightningnetwork/lnd/lncfg"
 	"github.com/lightningnetwork/lnd/lnrpc"
@@ -56,6 +57,14 @@ const (
 	// admin macaroon unless the administrator explicitly allowed it. Thus
 	// there's no harm allowing group read.
 	adminMacaroonFilePermissions = 0640
+
+	// leaderResignTimeout is the timeout used when resigning from the
+	// leader role. This is kept short so LND can shut down quickly in case
+	// of a system failure or network partition making the cluster
+	// unresponsive. The cluster itself should ensure that the leader is not
+	// elected again until the previous leader has resigned or the leader
+	// election timeout has passed.
+	leaderResignTimeout = 5 * time.Second
 )
 
 // AdminAuthOptions returns a list of DialOptions that can be used to
@@ -381,6 +390,7 @@ func Main(cfg *Config, lisCfg ListenerCfg, implCfg *ImplementationCfg,
 	// blocked until this instance is elected as the current leader or
 	// shutting down.
 	elected := false
+	var leaderElector cluster.LeaderElector
 
 	if cfg.Cluster.EnableLeaderElection {
 		electionCtx, cancelElection := context.WithCancel(ctx)
@@ -392,7 +402,7 @@ func Main(cfg *Config, lisCfg ListenerCfg, implCfg *ImplementationCfg,
 		ltndLog.Infof("Using %v leader elector",
 			cfg.Cluster.LeaderElector)
 
-		leaderElector, err := cfg.Cluster.MakeLeaderElector(
+		leaderElector, err = cfg.Cluster.MakeLeaderElector(
 			electionCtx, cfg.DB,
 		)
 		if err != nil {
@@ -407,7 +417,17 @@ func Main(cfg *Config, lisCfg ListenerCfg, implCfg *ImplementationCfg,
 			ltndLog.Infof("Attempting to resign from leader role "+
 				"(%v)", cfg.Cluster.ID)
 
-			if err := leaderElector.Resign(); err != nil {
+			// Ensure that we don't block the shutdown process if
+			// the leader resigning process takes too long. The
+			// cluster will ensure that the leader is not elected
+			// again until the previous leader has resigned or the
+			// leader election timeout has passed.
+			timeoutCtx, cancel := context.WithTimeout(
+				ctx, leaderResignTimeout,
+			)
+			defer cancel()
+
+			if err := leaderElector.Resign(timeoutCtx); err != nil {
 				ltndLog.Errorf("Leader elector failed to "+
 					"resign: %v", err)
 			}
@@ -579,7 +599,7 @@ func Main(cfg *Config, lisCfg ListenerCfg, implCfg *ImplementationCfg,
 	server, err := newServer(
 		cfg, cfg.Listeners, dbs, activeChainControl, &idKeyDesc,
 		activeChainControl.Cfg.WalletUnlockParams.ChansToRestore,
-		multiAcceptor, torController, tlsManager,
+		multiAcceptor, torController, tlsManager, leaderElector,
 	)
 	if err != nil {
 		return mkErr("unable to create server: %v", err)
diff --git a/lntest/node/config.go b/lntest/node/config.go
index de1ea92ec..f0e3f7059 100644
--- a/lntest/node/config.go
+++ b/lntest/node/config.go
@@ -315,6 +315,9 @@ func ExtraArgsEtcd(etcdCfg *etcd.Config, name string, cluster bool,
 				leaderSessionTTL),
 		}
 		extraArgs = append(extraArgs, clusterArgs...)
+		extraArgs = append(
+			extraArgs, "--healthcheck.leader.interval=10s",
+		)
 	}
 
 	return extraArgs
diff --git a/server.go b/server.go
index 5d52cd399..ae05637bc 100644
--- a/server.go
+++ b/server.go
@@ -36,6 +36,7 @@ import (
 	"github.com/lightningnetwork/lnd/channeldb/models"
 	"github.com/lightningnetwork/lnd/channelnotifier"
 	"github.com/lightningnetwork/lnd/clock"
+	"github.com/lightningnetwork/lnd/cluster"
 	"github.com/lightningnetwork/lnd/contractcourt"
 	"github.com/lightningnetwork/lnd/discovery"
 	"github.com/lightningnetwork/lnd/feature"
@@ -484,8 +485,8 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
 	nodeKeyDesc *keychain.KeyDescriptor,
 	chansToRestore walletunlocker.ChannelsToRecover,
 	chanPredicate chanacceptor.ChannelAcceptor,
-	torController *tor.Controller, tlsManager *TLSManager) (*server,
-	error) {
+	torController *tor.Controller, tlsManager *TLSManager,
+	leaderElector cluster.LeaderElector) (*server, error) {
 
 	var (
 		err error
@@ -1674,7 +1675,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
 	}
 
 	// Create liveness monitor.
-	s.createLivenessMonitor(cfg, cc)
+	s.createLivenessMonitor(cfg, cc, leaderElector)
 
 	// Create the connection manager which will be responsible for
 	// maintaining persistent outbound connections and also accepting new
@@ -1721,7 +1722,9 @@ func (s *server) signAliasUpdate(u *lnwire.ChannelUpdate) (*ecdsa.Signature,
 //
 // If a health check has been disabled by setting attempts to 0, our monitor
 // will not run it.
-func (s *server) createLivenessMonitor(cfg *Config, cc *chainreg.ChainControl) {
+func (s *server) createLivenessMonitor(cfg *Config, cc *chainreg.ChainControl,
+	leaderElector cluster.LeaderElector) {
+
 	chainBackendAttempts := cfg.HealthChecks.ChainCheck.Attempts
 	if cfg.Bitcoin.Node == "nochainbackend" {
 		srvrLog.Info("Disabling chain backend checks for " +
@@ -1837,6 +1840,49 @@ func (s *server) createLivenessMonitor(cfg *Config, cc *chainreg.ChainControl) {
 		checks = append(checks, remoteSignerConnectionCheck)
 	}
 
+	// If we have a leader elector, we add a health check to ensure we
+	// are still the leader. During normal operation we should always be
+	// the leader, but this can change, for example when we lose network
+	// connectivity for long enough that our lease expires.
+	if leaderElector != nil {
+		leaderCheck := healthcheck.NewObservation(
+			"leader status",
+			func() error {
+				// Check if we are still the leader. The
+				// observer enforces its own timeout, but we
+				// also bound the etcd call so it cannot
+				// linger past the configured timeout.
+				timeoutCtx, cancel := context.WithTimeout(
+					context.Background(),
+					cfg.HealthChecks.LeaderCheck.Timeout,
+				)
+				defer cancel()
+
+				leader, err := leaderElector.IsLeader(
+					timeoutCtx,
+				)
+				if err != nil {
+					return fmt.Errorf("unable to check if "+
+						"still leader: %v", err)
+				}
+
+				if !leader {
+					srvrLog.Debug("Not the current leader")
+					return fmt.Errorf("not the current " +
+						"leader")
+				}
+
+				return nil
+			},
+			cfg.HealthChecks.LeaderCheck.Interval,
+			cfg.HealthChecks.LeaderCheck.Timeout,
+			cfg.HealthChecks.LeaderCheck.Backoff,
+			cfg.HealthChecks.LeaderCheck.Attempts,
+		)
+
+		checks = append(checks, leaderCheck)
+	}
+
 	// If we have not disabled all of our health checks, we create a
 	// liveness monitor with our configured checks.
 	s.livenessMonitor = healthcheck.NewMonitor(
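With the `group:"leader" namespace:"leader"` tags on the new `LeaderCheck` field, the check is driven by `healthcheck.leader.*` options, the same form the itest change above already uses (`--healthcheck.leader.interval=10s`). As a sketch of how an operator might enable and tune the check in `lnd.conf` (values are the defaults introduced by this change; the `[healthcheck]` section name follows the pattern of lnd's other health check options and is an assumption here):

```
[healthcheck]
; Only meaningful when etcd leader election is enabled. attempts=1 means a
; single failed check shuts the node down rather than retrying.
healthcheck.leader.interval=1m
healthcheck.leader.timeout=5s
healthcheck.leader.backoff=5s
healthcheck.leader.attempts=1
```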
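For context, `IsLeader` and the context-aware `Resign` are thin wrappers over etcd's `concurrency` package: the elector holds a session lease, and losing connectivity for longer than the session TTL lets another member win the election, which is exactly the condition the new health check detects. A minimal, self-contained sketch of that underlying flow, assuming a reachable etcd endpoint at `localhost:2379` and a hypothetical member id (illustrative only, not lnd code):

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// The session holds the lease backing our election key. If we lose
	// connectivity for longer than the TTL, the lease expires and another
	// member can take over leadership.
	session, err := concurrency.NewSession(cli, concurrency.WithTTL(60))
	if err != nil {
		log.Fatal(err)
	}
	defer session.Close()

	const id = "lnd-node-1"
	election := concurrency.NewElection(session, "/lnd/leader/")
	ctx := context.Background()

	// Campaign blocks until we are elected or ctx is canceled.
	if err := election.Campaign(ctx, id); err != nil {
		log.Fatal(err)
	}

	// IsLeader boils down to comparing the current leader value against
	// our own id, as the new elector method does.
	resp, err := election.Leader(ctx)
	if err != nil {
		log.Fatal(err)
	}
	isLeader := len(resp.Kvs) > 0 && string(resp.Kvs[0].Value) == id
	fmt.Println("still leader:", isLeader)

	// Resign with a bounded context, mirroring leaderResignTimeout.
	resignCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()
	if err := election.Resign(resignCtx); err != nil {
		log.Printf("resign failed: %v", err)
	}
}
```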
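Finally, because `LeaderElector` gained `IsLeader` and `Resign` now takes a context, every implementation of the interface has to change, not just the etcd one. A hypothetical in-memory implementation (assuming the interface also declares `Campaign`, as the etcd elector suggests) shows the minimal contract and is the kind of stub a unit test might use:

```go
package cluster_test

import (
	"context"

	"github.com/lightningnetwork/lnd/cluster"
)

// Compile-time check that mockElector satisfies the updated interface.
var _ cluster.LeaderElector = (*mockElector)(nil)

// mockElector is a hypothetical in-memory LeaderElector; it is not part of
// this diff.
type mockElector struct {
	id     string
	leader string
}

// Campaign immediately claims leadership; a real implementation blocks
// until elected or until ctx is canceled.
func (m *mockElector) Campaign(ctx context.Context) error {
	m.leader = m.id
	return nil
}

// Resign gives up leadership, honoring an already-canceled or expired
// context the way the context-aware etcd Resign would.
func (m *mockElector) Resign(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	m.leader = ""
	return nil
}

// Leader returns the current leader value, or "" when there is none,
// matching the empty-response behavior added to the etcd elector above.
func (m *mockElector) Leader(ctx context.Context) (string, error) {
	return m.leader, nil
}

// IsLeader reports whether this member currently holds leadership.
func (m *mockElector) IsLeader(ctx context.Context) (bool, error) {
	return m.leader == m.id, nil
}
```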