mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-01-19 05:45:21 +01:00
8e0534f756
This commit extends our healtcheck with an optional leader check. This is to ensure that given network partition or other cluster wide failure we act as soon as possible to avoid a split-brain situation where a new leader is elected but we still hold onto our etcd client.
134 lines
3.3 KiB
Go
134 lines
3.3 KiB
Go
//go:build kvdb_etcd
|
|
// +build kvdb_etcd
|
|
|
|
package cluster
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"github.com/lightningnetwork/lnd/kvdb/etcd"
|
|
"go.etcd.io/etcd/client/pkg/v3/transport"
|
|
clientv3 "go.etcd.io/etcd/client/v3"
|
|
"go.etcd.io/etcd/client/v3/concurrency"
|
|
"go.etcd.io/etcd/client/v3/namespace"
|
|
)
|
|
|
|
const (
|
|
// etcdConnectionTimeout is the timeout until successful connection to
|
|
// the etcd instance.
|
|
etcdConnectionTimeout = 10 * time.Second
|
|
)
|
|
|
|
// Enforce that etcdLeaderElector implements the LeaderElector interface.
|
|
var _ LeaderElector = (*etcdLeaderElector)(nil)
|
|
|
|
// etcdLeaderElector is an implementation of LeaderElector using etcd as the
|
|
// election governor.
|
|
type etcdLeaderElector struct {
|
|
id string
|
|
ctx context.Context
|
|
cli *clientv3.Client
|
|
session *concurrency.Session
|
|
election *concurrency.Election
|
|
}
|
|
|
|
// newEtcdLeaderElector constructs a new etcdLeaderElector.
|
|
func newEtcdLeaderElector(ctx context.Context, id, electionPrefix string,
|
|
leaderSessionTTL int, cfg *etcd.Config) (*etcdLeaderElector, error) {
|
|
|
|
clientCfg := clientv3.Config{
|
|
Context: ctx,
|
|
Endpoints: []string{cfg.Host},
|
|
DialTimeout: etcdConnectionTimeout,
|
|
Username: cfg.User,
|
|
Password: cfg.Pass,
|
|
}
|
|
|
|
if !cfg.DisableTLS {
|
|
tlsInfo := transport.TLSInfo{
|
|
CertFile: cfg.CertFile,
|
|
KeyFile: cfg.KeyFile,
|
|
InsecureSkipVerify: cfg.InsecureSkipVerify,
|
|
}
|
|
|
|
tlsConfig, err := tlsInfo.ClientConfig()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
clientCfg.TLS = tlsConfig
|
|
}
|
|
|
|
cli, err := clientv3.New(clientCfg)
|
|
if err != nil {
|
|
log.Errorf("Unable to connect to etcd: %v", err)
|
|
return nil, err
|
|
}
|
|
|
|
// Apply the namespace.
|
|
cli.KV = namespace.NewKV(cli.KV, cfg.Namespace)
|
|
cli.Watcher = namespace.NewWatcher(cli.Watcher, cfg.Namespace)
|
|
cli.Lease = namespace.NewLease(cli.Lease, cfg.Namespace)
|
|
log.Infof("Applied namespace to leader elector: %v", cfg.Namespace)
|
|
|
|
session, err := concurrency.NewSession(
|
|
cli, concurrency.WithTTL(leaderSessionTTL),
|
|
)
|
|
if err != nil {
|
|
log.Errorf("Unable to start new leader election session: %v",
|
|
err)
|
|
return nil, err
|
|
}
|
|
|
|
return &etcdLeaderElector{
|
|
id: id,
|
|
ctx: ctx,
|
|
cli: cli,
|
|
session: session,
|
|
election: concurrency.NewElection(
|
|
session, electionPrefix,
|
|
),
|
|
}, nil
|
|
}
|
|
|
|
// Leader returns the leader value for the current election.
|
|
func (e *etcdLeaderElector) Leader(ctx context.Context) (string, error) {
|
|
resp, err := e.election.Leader(ctx)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
if resp == nil || len(resp.Kvs) == 0 {
|
|
return "", nil
|
|
}
|
|
|
|
return string(resp.Kvs[0].Value), nil
|
|
}
|
|
|
|
// IsLeader returns true if the caller is the leader.
|
|
func (e *etcdLeaderElector) IsLeader(ctx context.Context) (bool, error) {
|
|
resp, err := e.election.Leader(ctx)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
if resp == nil || len(resp.Kvs) == 0 {
|
|
return false, nil
|
|
}
|
|
|
|
return string(resp.Kvs[0].Value) == e.id, nil
|
|
}
|
|
|
|
// Campaign will start a new leader election campaign. Campaign will block until
|
|
// the elector context is canceled or the caller is elected as the leader.
|
|
func (e *etcdLeaderElector) Campaign(ctx context.Context) error {
|
|
return e.election.Campaign(ctx, e.id)
|
|
}
|
|
|
|
// Resign resigns the leader role allowing other election members to take
|
|
// the place.
|
|
func (e *etcdLeaderElector) Resign(ctx context.Context) error {
|
|
return e.election.Resign(ctx)
|
|
}
|