Merge pull request #8938 from bhandras/etcd-leader-election-fixups

multi: check leader status with our health checker to correctly shut down LND if the network partitions
Olaoluwa Osuntokun 2024-08-01 11:26:29 -07:00 committed by GitHub
commit 6e9eb1d0f7
GPG Key ID: B5690EEEBB952194
18 changed files with 416 additions and 33 deletions

View File

@ -99,9 +99,27 @@ func (e *etcdLeaderElector) Leader(ctx context.Context) (string, error) {
return "", err
}
if resp == nil || len(resp.Kvs) == 0 {
return "", nil
}
return string(resp.Kvs[0].Value), nil
}
// IsLeader returns true if the caller is the leader.
func (e *etcdLeaderElector) IsLeader(ctx context.Context) (bool, error) {
resp, err := e.election.Leader(ctx)
if err != nil {
return false, err
}
if resp == nil || len(resp.Kvs) == 0 {
return false, nil
}
return string(resp.Kvs[0].Value) == e.id, nil
}
// Campaign will start a new leader election campaign. Campaign will block until
// the elector context is canceled or the caller is elected as the leader.
func (e *etcdLeaderElector) Campaign(ctx context.Context) error {
@ -110,6 +128,6 @@ func (e *etcdLeaderElector) Campaign(ctx context.Context) error {
// Resign resigns the leader role, allowing other election members to take
// its place.
func (e *etcdLeaderElector) Resign() error {
return e.election.Resign(context.Background())
func (e *etcdLeaderElector) Resign(ctx context.Context) error {
return e.election.Resign(ctx)
}
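
For illustration, a minimal sketch of how a caller could combine the new context-aware methods with a per-call timeout; only the IsLeader and Resign signatures come from this diff, while the helper and interface names below are hypothetical:

package example

import (
	"context"
	"fmt"
	"time"
)

// leaderChecker mirrors the context-aware subset of the elector API shown
// above; the interface name itself is hypothetical.
type leaderChecker interface {
	IsLeader(ctx context.Context) (bool, error)
	Resign(ctx context.Context) error
}

// checkLeaderOnce probes leadership with a bounded timeout so a partitioned
// etcd cluster cannot block the caller indefinitely.
func checkLeaderOnce(elector leaderChecker, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	isLeader, err := elector.IsLeader(ctx)
	if err != nil {
		return fmt.Errorf("unable to check leader status: %w", err)
	}
	if !isLeader {
		return fmt.Errorf("not the current leader")
	}

	return nil
}

This is the same bounded-call pattern the server-side leader health check uses further down in this diff.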

View File

@ -87,12 +87,12 @@ func TestEtcdElector(t *testing.T) {
tmp := <-ch
first, err := tmp.Leader(ctxb)
require.NoError(t, err)
require.NoError(t, tmp.Resign())
require.NoError(t, tmp.Resign(ctxb))
tmp = <-ch
second, err := tmp.Leader(ctxb)
require.NoError(t, err)
require.NoError(t, tmp.Resign())
require.NoError(t, tmp.Resign(ctxb))
require.Contains(t, []string{id1, id2}, first)
require.Contains(t, []string{id1, id2}, second)

View File

@ -19,8 +19,11 @@ type LeaderElector interface {
// Resign resigns from the leader role, allowing other election members
// to take on leadership.
Resign() error
Resign(ctx context.Context) error
// Leader returns the leader value for the current election.
Leader(ctx context.Context) (string, error)
// IsLeader returns true if the caller is the leader.
IsLeader(ctx context.Context) (bool, error)
}
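
As a rough sketch of the updated contract, the stub below satisfies the three methods shown in this hunk by always reporting itself as the leader. The full interface is larger (e.g. Campaign, shown in the etcd implementation above), and the stub itself is purely illustrative:

package example

import "context"

// singleNodeElector is a hypothetical stand-in that always considers itself
// the leader; it only demonstrates the shape of the context-aware methods.
type singleNodeElector struct {
	id string
}

// Resign is a no-op; the context parameter matches the new signature.
func (s *singleNodeElector) Resign(_ context.Context) error {
	return nil
}

// Leader reports this node's own ID since no real election takes place.
func (s *singleNodeElector) Leader(_ context.Context) (string, error) {
	return s.id, nil
}

// IsLeader always reports true in this single-node sketch.
func (s *singleNodeElector) IsLeader(_ context.Context) (bool, error) {
	return true, nil
}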

View File

@ -169,6 +169,17 @@ const (
defaultRSBackoff = time.Second * 30
defaultRSAttempts = 1
// Set defaults for a health check which ensures that the leader
// election is functioning correctly. Although this check is off by
// default (as etcd leader election is only used in a clustered setup),
// we still set the default values so that the health check can be
// easily enabled with sane defaults. Note that by default we only
// allow a single attempt for this check, as it is critical for the
// node's operation.
defaultLeaderCheckInterval = time.Minute
defaultLeaderCheckTimeout = time.Second * 5
defaultLeaderCheckBackoff = time.Second * 5
defaultLeaderCheckAttempts = 1
// defaultRemoteMaxHtlcs specifies the default limit for maximum
// concurrent HTLCs the remote party may add to commitment transactions.
// This value can be overridden with --default-remote-max-htlcs.
@ -672,6 +683,12 @@ func DefaultConfig() Config {
Attempts: defaultRSAttempts,
Backoff: defaultRSBackoff,
},
LeaderCheck: &lncfg.CheckConfig{
Interval: defaultLeaderCheckInterval,
Timeout: defaultLeaderCheckTimeout,
Attempts: defaultLeaderCheckAttempts,
Backoff: defaultLeaderCheckBackoff,
},
},
Gossip: &lncfg.Gossip{
MaxChannelUpdateBurst: discovery.DefaultMaxChannelUpdateBurst,
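
For a clustered deployment that wants to detect a partition faster than the one-minute default, the check interval can be shortened while keeping the other defaults. A hedged sketch using only the lncfg.CheckConfig fields visible in this hunk; the ten second interval mirrors the value the itest harness sets later in this diff:

package example

import (
	"time"

	"github.com/lightningnetwork/lnd/lncfg"
)

// fastLeaderCheck returns a leader health check configuration that probes
// leadership every ten seconds instead of once a minute.
func fastLeaderCheck() *lncfg.CheckConfig {
	return &lncfg.CheckConfig{
		Interval: 10 * time.Second,
		Timeout:  5 * time.Second,
		Backoff:  5 * time.Second,
		Attempts: 1,
	}
}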

View File

@ -150,6 +150,10 @@ commitment when the channel was force closed.
* [Fixed](https://github.com/lightningnetwork/lnd/pull/8854) pagination issues
in SQL invoicedb queries.
* [Check](https://github.com/lightningnetwork/lnd/pull/8938) leader status with
our health checker to correctly shut down LND if a network partition occurs
towards the etcd cluster.
## Code Health
* [Move graph building and

go.mod (6 changed lines)
View File

@ -207,6 +207,12 @@ replace google.golang.org/protobuf => github.com/lightninglabs/protobuf-go-hex-d
// Temporary replace until the next version of sqldb is tagged.
replace github.com/lightningnetwork/lnd/sqldb => ./sqldb
// Temporary replace until the next version of healthcheck is tagged.
replace github.com/lightningnetwork/lnd/healthcheck => ./healthcheck
// Temporary replace until the next version of kvdb is tagged.
replace github.com/lightningnetwork/lnd/kvdb => ./kvdb
// If you change this please also update .github/pull_request_template.md and
// docs/INSTALL.md.
go 1.21.4

go.sum (4 changed lines)
View File

@ -452,10 +452,6 @@ github.com/lightningnetwork/lnd/clock v1.1.1 h1:OfR3/zcJd2RhH0RU+zX/77c0ZiOnIMsD
github.com/lightningnetwork/lnd/clock v1.1.1/go.mod h1:mGnAhPyjYZQJmebS7aevElXKTFDuO+uNFFfMXK1W8xQ=
github.com/lightningnetwork/lnd/fn v1.2.0 h1:YTb2m8NN5ZiJAskHeBZAmR1AiPY8SXziIYPAX1VI/ZM=
github.com/lightningnetwork/lnd/fn v1.2.0/go.mod h1:SyFohpVrARPKH3XVAJZlXdVe+IwMYc4OMAvrDY32kw0=
github.com/lightningnetwork/lnd/healthcheck v1.2.4 h1:lLPLac+p/TllByxGSlkCwkJlkddqMP5UCoawCj3mgFQ=
github.com/lightningnetwork/lnd/healthcheck v1.2.4/go.mod h1:G7Tst2tVvWo7cx6mSBEToQC5L1XOGxzZTPB29g9Rv2I=
github.com/lightningnetwork/lnd/kvdb v1.4.8 h1:xH0a5Vi1yrcZ5BEeF2ba3vlKBRxrL9uYXlWTjOjbNTY=
github.com/lightningnetwork/lnd/kvdb v1.4.8/go.mod h1:J2diNABOoII9UrMnxXS5w7vZwP7CA1CStrl8MnIrb3A=
github.com/lightningnetwork/lnd/queue v1.1.1 h1:99ovBlpM9B0FRCGYJo6RSFDlt8/vOkQQZznVb18iNMI=
github.com/lightningnetwork/lnd/queue v1.1.1/go.mod h1:7A6nC1Qrm32FHuhx/mi1cieAiBZo5O6l8IBIoQxvkz4=
github.com/lightningnetwork/lnd/ticker v1.1.1 h1:J/b6N2hibFtC7JLV77ULQp++QLtCwT6ijJlbdiZFbSM=

View File

@ -234,14 +234,13 @@ func (o *Observation) monitor(shutdown shutdownFunc, quit chan struct{}) {
// the max attempts are reached. In that case we will
// stop the ticker and quit.
if o.retryCheck(quit, shutdown) {
log.Debugf("Health check: max attempts " +
"failed, monitor exiting")
o.Debugf("max attempts failed, monitor exiting")
return
}
// Exit if we receive the instruction to shutdown.
case <-quit:
log.Debug("Health check: monitor quit")
o.Debugf("monitor quit")
return
}
}
@ -270,7 +269,7 @@ func (o *Observation) retryCheck(quit chan struct{},
// so we'll invoke our success callback if defined and
// then exit.
if err == nil {
log.Debug("invoking success callback")
o.Debugf("invoking success callback")
// Invoke the success callback.
o.OnSuccess()
@ -283,7 +282,7 @@ func (o *Observation) retryCheck(quit chan struct{},
"%v", o, o.Timeout)
case <-quit:
log.Debug("Health check: monitor quit")
o.Debugf("monitor quit")
return false
}
@ -291,17 +290,18 @@ func (o *Observation) retryCheck(quit chan struct{},
// check has failed so we'll fire the on failure callback
// and request shutdown.
if count == o.Attempts {
log.Debug("invoking failure callback")
o.Debugf("invoking failure callback")
o.OnFailure()
shutdown("Health check: %v failed after %v "+
"calls", o, o.Attempts)
shutdown("Health check: %v failed after %v calls", o,
o.Attempts)
return true
}
log.Infof("Health check: %v, call: %v failed with: %v, "+
"backing off for: %v", o, count, err, o.Backoff)
o.Infof("failed with: %v, attempts: %v backing off for: %v",
err, count, o.Backoff)
// If we are still within the number of calls allowed for this
// check, we wait for our back off period to elapse, or exit if
@ -310,10 +310,22 @@ func (o *Observation) retryCheck(quit chan struct{},
case <-time.After(o.Backoff):
case <-quit:
log.Debug("Health check: monitor quit")
o.Debugf("monitor quit")
return false
}
}
return false
}
// Infof logs an info message for an observation prefixed with the health check
// name.
func (o *Observation) Infof(format string, params ...interface{}) {
log.Infof(fmt.Sprintf("Health check: %v ", o)+format, params...)
}
// Debugf logs a debug message for an observation prefixed with the health check
// name.
func (o *Observation) Debugf(format string, params ...interface{}) {
log.Debugf(fmt.Sprintf("Health check: %v ", o)+format, params...)
}
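
For context, these prefixed helpers are used for every observation the monitor runs. Below is a minimal sketch of registering a custom check through the NewObservation constructor, whose argument order matches the server.go hunk near the end of this diff; the disk-space check itself is hypothetical:

package example

import (
	"fmt"
	"time"

	"github.com/lightningnetwork/lnd/healthcheck"
)

// newDiskSpaceCheck builds a hypothetical observation; its retries and
// failures are logged with the "Health check: disk space" prefix by the
// Infof/Debugf helpers added above.
func newDiskSpaceCheck(minFree, free uint64) *healthcheck.Observation {
	return healthcheck.NewObservation(
		"disk space",
		func() error {
			if free < minFree {
				return fmt.Errorf("only %d bytes free", free)
			}

			return nil
		},
		time.Minute,    // interval between checks
		time.Second*10, // timeout per attempt
		time.Second*30, // backoff between failed attempts
		2,              // attempts before shutdown
	)
}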

View File

@ -233,6 +233,10 @@ var allTestCases = []*lntest.TestCase{
Name: "etcd failover",
TestFunc: testEtcdFailover,
},
{
Name: "leader health check",
TestFunc: testLeaderHealthCheck,
},
{
Name: "hold invoice force close",
TestFunc: testHoldInvoiceForceClose,

View File

@ -4,6 +4,11 @@
package itest
import (
"context"
"fmt"
"io"
"net"
"sync"
"testing"
"time"
@ -137,3 +142,200 @@ func testEtcdFailoverCase(ht *lntest.HarnessTest, kill bool) {
// process.
ht.Shutdown(carol2)
}
// Proxy is a simple TCP proxy that forwards all traffic between a local and a
// remote address. We use it to simulate a network partition in the leader
// health check test.
type Proxy struct {
listenAddr string
targetAddr string
cancel context.CancelFunc
wg sync.WaitGroup
stopped chan struct{}
}
// NewProxy creates a new Proxy instance for the given listen and target
// addresses.
func NewProxy(listenAddr, targetAddr string) *Proxy {
return &Proxy{
listenAddr: listenAddr,
targetAddr: targetAddr,
stopped: make(chan struct{}),
}
}
// Start starts the proxy. It listens on the listen address and forwards all
// traffic to the target address.
func (p *Proxy) Start(ctx context.Context, t *testing.T) {
listener, err := net.Listen("tcp", p.listenAddr)
require.NoError(t, err, "Failed to listen on %s", p.listenAddr)
t.Logf("Proxy is listening on %s", p.listenAddr)
proxyCtx, cancel := context.WithCancel(ctx)
p.cancel = cancel
p.wg.Add(1)
go func() {
defer func() {
close(p.stopped)
p.wg.Done()
}()
for {
select {
case <-proxyCtx.Done():
listener.Close()
return
default:
}
conn, err := listener.Accept()
if err != nil {
if proxyCtx.Err() != nil {
// Context is done, exit the loop
return
}
t.Logf("Proxy failed to accept connection: %v",
err)
continue
}
p.wg.Add(1)
go p.handleConnection(proxyCtx, t, conn)
}
}()
}
// handleConnection handles an accepted connection and forwards all traffic
// between the listener and target.
func (p *Proxy) handleConnection(ctx context.Context, t *testing.T,
conn net.Conn) {
targetConn, err := net.Dial("tcp", p.targetAddr)
require.NoError(t, err, "Failed to connect to target %s", p.targetAddr)
defer func() {
conn.Close()
targetConn.Close()
p.wg.Done()
}()
done := make(chan struct{})
p.wg.Add(2)
go func() {
defer p.wg.Done()
// Ignore the copy error due to the connection being closed.
_, _ = io.Copy(targetConn, conn)
}()
go func() {
defer p.wg.Done()
// Ignore the copy error due to the connection being closed.
_, _ = io.Copy(conn, targetConn)
close(done)
}()
select {
case <-ctx.Done():
case <-done:
}
}
// Stop stops the proxy and waits for all connections to be closed and all
// goroutines to be stopped.
func (p *Proxy) Stop(t *testing.T) {
require.NotNil(t, p.cancel, "Proxy is not started")
p.cancel()
p.wg.Wait()
<-p.stopped
t.Log("Proxy stopped", time.Now())
}
// testLeaderHealthCheck tests that a node is properly shut down when the leader
// health check fails.
func testLeaderHealthCheck(ht *lntest.HarnessTest) {
clientPort := port.NextAvailablePort()
// Let's start a test etcd instance that we'll redirect through a proxy.
etcdCfg, cleanup, err := kvdb.StartEtcdTestBackend(
ht.T.TempDir(), uint16(clientPort),
uint16(port.NextAvailablePort()), "",
)
require.NoError(ht, err, "Failed to start etcd instance")
// Use a 5 second leader election session TTL to make the test run fast.
const leaderSessionTTL = 5
// Create an election observer that we will use to monitor the leader
// election.
observer, err := cluster.MakeLeaderElector(
ht.Context(), cluster.EtcdLeaderElector, "observer",
lncfg.DefaultEtcdElectionPrefix, leaderSessionTTL, etcdCfg,
)
require.NoError(ht, err, "Cannot start election observer")
// Start a proxy that will forward all traffic to the etcd instance.
clientAddr := fmt.Sprintf("localhost:%d", clientPort)
proxyAddr := fmt.Sprintf("localhost:%d", port.NextAvailablePort())
ctx, cancel := context.WithCancel(ht.Context())
defer cancel()
proxy := NewProxy(proxyAddr, clientAddr)
proxy.Start(ctx, ht.T)
// Copy the etcd config so that we can modify the host to point to the
// proxy.
proxyEtcdCfg := *etcdCfg
// With the proxy in place, we can now configure the etcd client to
// connect to the proxy instead of the etcd instance.
proxyEtcdCfg.Host = "http://" + proxyAddr
defer cleanup()
// Start Carol-1 with cluster support and connect to etcd through the
// proxy.
password := []byte("the quick brown fox jumps the lazy dog")
stateless := false
cluster := true
carol, _, _ := ht.NewNodeWithSeedEtcd(
"Carol-1", &proxyEtcdCfg, password, stateless, cluster,
leaderSessionTTL,
)
// Make sure Carol-1 is indeed the leader.
assertLeader(ht, observer, "Carol-1")
// At this point Carol-1 is the elected leader, while Carol-2 will wait
// to become the leader when Carol-1 releases the lease. Note that for
// Carol-2 we don't use the proxy as we want to simulate a network
// partition only for Carol-1.
carol2 := ht.NewNodeEtcd(
"Carol-2", etcdCfg, password, cluster, leaderSessionTTL,
)
// Stop the proxy to simulate a network partition, which will make the
// leader health check fail and force Carol-1 to shut down.
proxy.Stop(ht.T)
// Wait for Carol-1 to stop. If the health check didn't work properly,
// this call would time out and trigger a test failure.
require.NoError(ht.T, carol.WaitForProcessExit())
// Now that Carol-1 is shut down we should fail over to Carol-2.
failoverTimeout := time.Duration(2*leaderSessionTTL) * time.Second
// Make sure that Carol-2 becomes the leader (reported by Carol-2).
err = carol2.WaitUntilLeader(failoverTimeout)
require.NoError(ht, err, "Waiting for Carol-2 to become the leader "+
"failed")
// Make sure Carol-2 is indeed the leader (reported by the observer).
assertLeader(ht, observer, "Carol-2")
}
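
The assertLeader helper referenced above is defined elsewhere in the itest package. A rough sketch of what such a check could look like, using only the Leader method from the elector interface shown earlier; the name and signature here are hypothetical:

package example

import (
	"context"
	"testing"

	"github.com/lightningnetwork/lnd/cluster"
	"github.com/stretchr/testify/require"
)

// assertLeaderSketch verifies that the observing elector reports the expected
// node as the current leader; a hypothetical stand-in for assertLeader.
func assertLeaderSketch(t *testing.T, observer cluster.LeaderElector,
	expected string) {

	leader, err := observer.Leader(context.Background())
	require.NoError(t, err, "unable to query leader")
	require.Equal(t, expected, leader, "unexpected leader")
}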

View File

@ -8,3 +8,7 @@ import "github.com/lightningnetwork/lnd/lntest"
// testEtcdFailover is an empty itest when LND is not compiled with etcd
// support.
func testEtcdFailover(ht *lntest.HarnessTest) {}
// testLeaderHealthCheck is an empty itest when LND is not compiled with etcd
// support.
func testLeaderHealthCheck(ht *lntest.HarnessTest) {}

View File

@ -8,12 +8,25 @@ import (
"fmt"
"math"
"strings"
"time"
"github.com/google/btree"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
v3 "go.etcd.io/etcd/client/v3"
)
const (
// rpcTimeout is the timeout for all RPC calls to etcd. It is set to 30
// seconds to avoid blocking the server for too long while still giving
// etcd reasonable time to respond. If any operation takes longer than 30
// seconds, that generally means there's a problem with the etcd server or
// the network resulting in degraded performance, in which case we want
// LND to fail fast. Due to the underlying gRPC implementation in etcd,
// calls without a timeout can hang indefinitely, even in the case of
// network partitions or other critical failures.
rpcTimeout = time.Second * 30
)
type CommitStats struct {
Rset int
Wset int
@ -609,8 +622,13 @@ func (s *stm) FetchRangePaginatedRaw(prefix string, limit int64,
key := prefix
for {
timeoutCtx, cancel := context.WithTimeout(
s.options.ctx, rpcTimeout,
)
defer cancel()
resp, err := s.client.Get(
s.options.ctx, key, append(opts, s.getOpts...)...,
timeoutCtx, key, append(opts, s.getOpts...)...,
)
if err != nil {
return DatabaseError{
@ -645,8 +663,12 @@ func (s *stm) FetchRangePaginatedRaw(prefix string, limit int64,
// We'll also cache the returned key/value in the read set.
func (s *stm) fetch(key string, opts ...v3.OpOption) ([]KV, error) {
s.callCount++
timeoutCtx, cancel := context.WithTimeout(s.options.ctx, rpcTimeout)
defer cancel()
resp, err := s.client.Get(
s.options.ctx, key, append(opts, s.getOpts...)...,
timeoutCtx, key, append(opts, s.getOpts...)...,
)
if err != nil {
return nil, DatabaseError{
@ -1049,7 +1071,10 @@ func (s *stm) Prefetch(keys []string, prefixes []string) {
[]v3.OpOption{v3.WithPrefix()}, s.getOpts...,
)
txn := s.client.Txn(s.options.ctx)
timeoutCtx, cancel := context.WithTimeout(s.options.ctx, rpcTimeout)
defer cancel()
txn := s.client.Txn(timeoutCtx)
ops := make([]v3.Op, 0, len(fetchKeys)+len(fetchPrefixes))
for _, key := range fetchKeys {
@ -1103,8 +1128,11 @@ func (s *stm) commit() (CommitStats, error) {
// Create the compare set.
cmps := append(rset, wset...)
// Create a transaction with the optional abort context.
txn := s.client.Txn(s.options.ctx)
timeoutCtx, cancel := context.WithTimeout(s.options.ctx, rpcTimeout)
defer cancel()
txn := s.client.Txn(timeoutCtx)
// If the compare set holds, try executing the puts.
txn = txn.If(cmps...)

View File

@ -33,7 +33,7 @@ func DefaultCluster() *Cluster {
return &Cluster{
LeaderElector: cluster.EtcdLeaderElector,
EtcdElectionPrefix: DefaultEtcdElectionPrefix,
LeaderSessionTTL: 60,
LeaderSessionTTL: 90,
ID: hostname,
}
}

View File

@ -34,6 +34,8 @@ type HealthCheckConfig struct {
TorConnection *CheckConfig `group:"torconnection" namespace:"torconnection"`
RemoteSigner *CheckConfig `group:"remotesigner" namespace:"remotesigner"`
LeaderCheck *CheckConfig `group:"leader" namespace:"leader"`
}
// Validate checks the values configured for our health checks.

lnd.go (26 changed lines)
View File

@ -24,6 +24,7 @@ import (
"github.com/lightningnetwork/lnd/build"
"github.com/lightningnetwork/lnd/chanacceptor"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/cluster"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/lncfg"
"github.com/lightningnetwork/lnd/lnrpc"
@ -56,6 +57,14 @@ const (
// admin macaroon unless the administrator explicitly allowed it. Thus
// there's no harm allowing group read.
adminMacaroonFilePermissions = 0640
// leaderResignTimeout is the timeout used when resigning from the
// leader role. This is kept short so LND can shut down quickly in case
// of a system failure or network partition making the cluster
// unresponsive. The cluster itself should ensure that the leader is not
// elected again until the previous leader has resigned or the leader
// election timeout has passed.
leaderResignTimeout = 5 * time.Second
)
// AdminAuthOptions returns a list of DialOptions that can be used to
@ -381,6 +390,7 @@ func Main(cfg *Config, lisCfg ListenerCfg, implCfg *ImplementationCfg,
// blocked until this instance is elected as the current leader or
// shutting down.
elected := false
var leaderElector cluster.LeaderElector
if cfg.Cluster.EnableLeaderElection {
electionCtx, cancelElection := context.WithCancel(ctx)
@ -392,7 +402,7 @@ func Main(cfg *Config, lisCfg ListenerCfg, implCfg *ImplementationCfg,
ltndLog.Infof("Using %v leader elector",
cfg.Cluster.LeaderElector)
leaderElector, err := cfg.Cluster.MakeLeaderElector(
leaderElector, err = cfg.Cluster.MakeLeaderElector(
electionCtx, cfg.DB,
)
if err != nil {
@ -407,7 +417,17 @@ func Main(cfg *Config, lisCfg ListenerCfg, implCfg *ImplementationCfg,
ltndLog.Infof("Attempting to resign from leader role "+
"(%v)", cfg.Cluster.ID)
if err := leaderElector.Resign(); err != nil {
// Ensure that we don't block the shutdown process if
// the leader resigning process takes too long. The
// cluster will ensure that the leader is not elected
// again until the previous leader has resigned or the
// leader election timeout has passed.
timeoutCtx, cancel := context.WithTimeout(
ctx, leaderResignTimeout,
)
defer cancel()
if err := leaderElector.Resign(timeoutCtx); err != nil {
ltndLog.Errorf("Leader elector failed to "+
"resign: %v", err)
}
@ -579,7 +599,7 @@ func Main(cfg *Config, lisCfg ListenerCfg, implCfg *ImplementationCfg,
server, err := newServer(
cfg, cfg.Listeners, dbs, activeChainControl, &idKeyDesc,
activeChainControl.Cfg.WalletUnlockParams.ChansToRestore,
multiAcceptor, torController, tlsManager,
multiAcceptor, torController, tlsManager, leaderElector,
)
if err != nil {
return mkErr("unable to create server: %v", err)

View File

@ -315,6 +315,9 @@ func ExtraArgsEtcd(etcdCfg *etcd.Config, name string, cluster bool,
leaderSessionTTL),
}
extraArgs = append(extraArgs, clusterArgs...)
extraArgs = append(
extraArgs, "--healthcheck.leader.interval=10s",
)
}
return extraArgs

View File

@ -1130,6 +1130,24 @@
; checks. This value must be >= 1m.
; healthcheck.remotesigner.interval=1m
; The number of times we should attempt to check the node's leader status
; before gracefully shutting down. Set this value to 0 to disable this health
; check.
; healthcheck.leader.attempts=1
; The amount of time after which the leader check times out due to an
; unanswered RPC call. This value must be >= 1s.
; healthcheck.leader.timeout=5s
; The amount of time we should back off between failed leader check attempts.
; This value must be >= 1s.
; healthcheck.leader.backoff=5s
; The amount of time we should wait between leader checks.
; This value must be >= 1m.
; healthcheck.leader.interval=1m
[signrpc]
@ -1537,7 +1555,7 @@
; The session TTL in seconds after which a new leader is elected if the old
; leader is shut down, crashed or becomes unreachable.
; cluster.leader-session-ttl=60
; cluster.leader-session-ttl=90
[rpcmiddleware]

View File

@ -36,6 +36,7 @@ import (
"github.com/lightningnetwork/lnd/channeldb/models"
"github.com/lightningnetwork/lnd/channelnotifier"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/cluster"
"github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/discovery"
"github.com/lightningnetwork/lnd/feature"
@ -484,8 +485,8 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
nodeKeyDesc *keychain.KeyDescriptor,
chansToRestore walletunlocker.ChannelsToRecover,
chanPredicate chanacceptor.ChannelAcceptor,
torController *tor.Controller, tlsManager *TLSManager) (*server,
error) {
torController *tor.Controller, tlsManager *TLSManager,
leaderElector cluster.LeaderElector) (*server, error) {
var (
err error
@ -1674,7 +1675,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
}
// Create liveness monitor.
s.createLivenessMonitor(cfg, cc)
s.createLivenessMonitor(cfg, cc, leaderElector)
// Create the connection manager which will be responsible for
// maintaining persistent outbound connections and also accepting new
@ -1721,7 +1722,9 @@ func (s *server) signAliasUpdate(u *lnwire.ChannelUpdate) (*ecdsa.Signature,
//
// If a health check has been disabled by setting attempts to 0, our monitor
// will not run it.
func (s *server) createLivenessMonitor(cfg *Config, cc *chainreg.ChainControl) {
func (s *server) createLivenessMonitor(cfg *Config, cc *chainreg.ChainControl,
leaderElector cluster.LeaderElector) {
chainBackendAttempts := cfg.HealthChecks.ChainCheck.Attempts
if cfg.Bitcoin.Node == "nochainbackend" {
srvrLog.Info("Disabling chain backend checks for " +
@ -1837,6 +1840,49 @@ func (s *server) createLivenessMonitor(cfg *Config, cc *chainreg.ChainControl) {
checks = append(checks, remoteSignerConnectionCheck)
}
// If we have a leader elector, we add a health check to ensure we are
// still the leader. During normal operation, we should always be the
// leader, but there are circumstances where this may change, such as
// when we lose network connectivity long enough for our lease to expire.
if leaderElector != nil {
leaderCheck := healthcheck.NewObservation(
"leader status",
func() error {
// Check if we are still the leader. We bound
// the call with the configured health check
// timeout so a partitioned etcd cluster cannot
// block this check indefinitely.
timeoutCtx, cancel := context.WithTimeout(
context.Background(),
cfg.HealthChecks.LeaderCheck.Timeout,
)
defer cancel()
leader, err := leaderElector.IsLeader(
timeoutCtx,
)
if err != nil {
return fmt.Errorf("unable to check if "+
"still leader: %v", err)
}
if !leader {
srvrLog.Debug("Not the current leader")
return fmt.Errorf("not the current " +
"leader")
}
return nil
},
cfg.HealthChecks.LeaderCheck.Interval,
cfg.HealthChecks.LeaderCheck.Timeout,
cfg.HealthChecks.LeaderCheck.Backoff,
cfg.HealthChecks.LeaderCheck.Attempts,
)
checks = append(checks, leaderCheck)
}
// If we have not disabled all of our health checks, we create a
// liveness monitor with our configured checks.
s.livenessMonitor = healthcheck.NewMonitor(