Merge pull request #6342 from bhandras/leader_election_ttl

cluster: configurable session TTL for the leader elector
This commit is contained in:
Olaoluwa Osuntokun 2022-03-22 18:47:17 -07:00 committed by GitHub
commit 42ca9b171e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 55 additions and 27 deletions

View File

@ -35,7 +35,7 @@ type etcdLeaderElector struct {
// newEtcdLeaderElector constructs a new etcdLeaderElector.
func newEtcdLeaderElector(ctx context.Context, id, electionPrefix string,
cfg *etcd.Config) (*etcdLeaderElector, error) {
leaderSessionTTL int, cfg *etcd.Config) (*etcdLeaderElector, error) {
clientCfg := clientv3.Config{
Context: ctx,
@ -72,7 +72,9 @@ func newEtcdLeaderElector(ctx context.Context, id, electionPrefix string,
cli.Lease = namespace.NewLease(cli.Lease, cfg.Namespace)
log.Infof("Applied namespace to leader elector: %v", cfg.Namespace)
session, err := concurrency.NewSession(cli)
session, err := concurrency.NewSession(
cli, concurrency.WithTTL(leaderSessionTTL),
)
if err != nil {
log.Errorf("Unable to start new leader election session: %v",
err)

View File

@ -15,9 +15,9 @@ import (
func makeEtcdElector(ctx context.Context, args ...interface{}) (LeaderElector,
error) {
if len(args) != 3 {
if len(args) != 4 {
return nil, fmt.Errorf("invalid number of arguments to "+
"cluster.makeEtcdElector(): expected 3, got %v",
"cluster.makeEtcdElector(): expected 4, got %v",
len(args))
}
@ -33,13 +33,21 @@ func makeEtcdElector(ctx context.Context, args ...interface{}) (LeaderElector,
"cluster.makeEtcdElector(), expected: string")
}
etcdCfg, ok := args[2].(*etcd.Config)
leaderSessionTTL, ok := args[2].(int)
if !ok {
return nil, fmt.Errorf("invalid argument (2) to " +
"cluster.makeEtcdElector(), expected: int")
}
etcdCfg, ok := args[3].(*etcd.Config)
if !ok {
return nil, fmt.Errorf("invalid argument (3) to " +
"cluster.makeEtcdElector(), expected: *etcd.Config")
}
return newEtcdLeaderElector(ctx, id, electionPrefix, etcdCfg)
return newEtcdLeaderElector(
ctx, id, electionPrefix, leaderSessionTTL, etcdCfg,
)
}
func init() {

View File

@ -57,15 +57,16 @@ func TestEtcdElector(t *testing.T) {
election = "/election/"
id1 = "e1"
id2 = "e2"
ttl = 5
)
e1, err := newEtcdLeaderElector(
ctx, id1, election, etcdCfg,
ctx, id1, election, ttl, etcdCfg,
)
require.NoError(t, err)
e2, err := newEtcdLeaderElector(
ctx, id2, election, etcdCfg,
ctx, id2, election, ttl, etcdCfg,
)
require.NoError(t, err)

View File

@ -98,6 +98,9 @@
modules were integrated into `lnd` as a preparation for basic Taproot
support](https://github.com/lightningnetwork/lnd/pull/6285).
* [Make etcd leader election session
TTL](https://github.com/lightningnetwork/lnd/pull/6342) configurable.
## RPC Server
* [Add value to the field

View File

@ -20,9 +20,11 @@ type Cluster struct {
LeaderElector string `long:"leader-elector" choice:"etcd" description:"Leader elector to use. Valid values: \"etcd\"."`
EtcdElectionPrefix string `long:"etcd-election-prefix" description:"Election key prefix when using etcd leader elector. Defaults to \"/leader/\"."`
EtcdElectionPrefix string `long:"etcd-election-prefix" description:"Election key prefix when using etcd leader elector."`
ID string `long:"id" description:"Identifier for this node inside the cluster (used in leader election). Defaults to the hostname."`
LeaderSessionTTL int `long:"leader-session-ttl" description:"The TTL in seconds to use for the leader election session."`
}
// DefaultCluster creates and returns a new default DB config.
@ -31,6 +33,7 @@ func DefaultCluster() *Cluster {
return &Cluster{
LeaderElector: cluster.EtcdLeaderElector,
EtcdElectionPrefix: DefaultEtcdElectionPrefix,
LeaderSessionTTL: 60,
ID: hostname,
}
}
@ -43,7 +46,7 @@ func (c *Cluster) MakeLeaderElector(electionCtx context.Context, db *DB) (
if c.LeaderElector == cluster.EtcdLeaderElector {
return cluster.MakeLeaderElector(
electionCtx, c.LeaderElector, c.ID,
c.EtcdElectionPrefix, db.Etcd,
c.EtcdElectionPrefix, c.LeaderSessionTTL, db.Etcd,
)
}

View File

@ -275,7 +275,9 @@ func (n *NetworkHarness) Stop() {
// extraArgsEtcd returns extra args for configuring LND to use an external etcd
// database (for remote channel DB and wallet DB).
func extraArgsEtcd(etcdCfg *etcd.Config, name string, cluster bool) []string {
func extraArgsEtcd(etcdCfg *etcd.Config, name string, cluster bool,
leaderSessionTTL int) []string {
extraArgs := []string{
"--db.backend=etcd",
fmt.Sprintf("--db.etcd.host=%v", etcdCfg.Host),
@ -289,10 +291,13 @@ func extraArgsEtcd(etcdCfg *etcd.Config, name string, cluster bool) []string {
}
if cluster {
extraArgs = append(extraArgs, "--cluster.enable-leader-election")
extraArgs = append(
extraArgs, fmt.Sprintf("--cluster.id=%v", name),
)
clusterArgs := []string{
"--cluster.enable-leader-election",
fmt.Sprintf("--cluster.id=%v", name),
fmt.Sprintf("--cluster.leader-session-ttl=%v",
leaderSessionTTL),
}
extraArgs = append(extraArgs, clusterArgs...)
}
return extraArgs
@ -302,13 +307,13 @@ func extraArgsEtcd(etcdCfg *etcd.Config, name string, cluster bool) []string {
// etcd database as its (remote) channel and wallet DB. The passed cluster
// flag indicates that we'd like the node to join the cluster leader election.
func (n *NetworkHarness) NewNodeWithSeedEtcd(name string, etcdCfg *etcd.Config,
password []byte, entropy []byte, statelessInit, cluster bool) (
*HarnessNode, []string, []byte, error) {
password []byte, entropy []byte, statelessInit, cluster bool,
leaderSessionTTL int) (*HarnessNode, []string, []byte, error) {
// We don't want to use the embedded etcd instance.
const dbBackend = BackendBbolt
extraArgs := extraArgsEtcd(etcdCfg, name, cluster)
extraArgs := extraArgsEtcd(etcdCfg, name, cluster, leaderSessionTTL)
return n.newNodeWithSeed(
name, extraArgs, password, entropy, statelessInit, dbBackend,
)
@ -320,12 +325,13 @@ func (n *NetworkHarness) NewNodeWithSeedEtcd(name string, etcdCfg *etcd.Config,
// If the wait flag is false then we won't wait until RPC is available (this is
// useful when the node is not expected to become the leader right away).
func (n *NetworkHarness) NewNodeEtcd(name string, etcdCfg *etcd.Config,
password []byte, cluster, wait bool) (*HarnessNode, error) {
password []byte, cluster, wait bool, leaderSessionTTL int) (
*HarnessNode, error) {
// We don't want to use the embedded etcd instance.
const dbBackend = BackendBbolt
extraArgs := extraArgsEtcd(etcdCfg, name, cluster)
extraArgs := extraArgsEtcd(etcdCfg, name, cluster, leaderSessionTTL)
return n.newNode(name, extraArgs, true, password, dbBackend, wait)
}

View File

@ -81,12 +81,15 @@ func testEtcdFailoverCase(net *lntest.NetworkHarness, ht *harnessTest,
}
defer cleanup()
// Make leader election session TTL 5 sec to make the test run fast.
const leaderSessionTTL = 5
observer, err := cluster.MakeLeaderElector(
ctxb, cluster.EtcdLeaderElector, "observer",
lncfg.DefaultEtcdElectionPrefix, etcdCfg,
lncfg.DefaultEtcdElectionPrefix, leaderSessionTTL, etcdCfg,
)
if err != nil {
ht.Fatalf("Cannot start election observer")
ht.Fatalf("Cannot start election observer: %v", err)
}
password := []byte("the quick brown fox jumps the lazy dog")
@ -96,6 +99,7 @@ func testEtcdFailoverCase(net *lntest.NetworkHarness, ht *harnessTest,
carol1, _, _, err := net.NewNodeWithSeedEtcd(
"Carol-1", etcdCfg, password, entropy[:], stateless, cluster,
leaderSessionTTL,
)
if err != nil {
ht.Fatalf("unable to start Carol-1: %v", err)
@ -122,7 +126,7 @@ func testEtcdFailoverCase(net *lntest.NetworkHarness, ht *harnessTest,
// At this point Carol-1 is the elected leader, while Carol-2 will wait
// to become the leader when Carol-1 stops.
carol2, err := net.NewNodeEtcd(
"Carol-2", etcdCfg, password, cluster, false,
"Carol-2", etcdCfg, password, cluster, false, leaderSessionTTL,
)
if err != nil {
ht.Fatalf("Unable to start Carol-2: %v", err)
@ -145,17 +149,14 @@ func testEtcdFailoverCase(net *lntest.NetworkHarness, ht *harnessTest,
)
// Shut down or kill Carol-1 and wait for Carol-2 to become the leader.
var failoverTimeout time.Duration
failoverTimeout := time.Duration(2*leaderSessionTTL) * time.Second
if kill {
err = net.KillNode(carol1)
if err != nil {
ht.Fatalf("Can't kill Carol-1: %v", err)
}
failoverTimeout = 2 * time.Minute
} else {
shutdownAndAssert(net, ht, carol1)
failoverTimeout = 30 * time.Second
}
err = carol2.WaitUntilLeader(failoverTimeout)

View File

@ -1248,6 +1248,10 @@ litecoin.node=ltcd
; Defaults to the hostname.
; cluster.id=example.com
; The session TTL in seconds after which a new leader is elected if the old
; leader is shut down, crashed or becomes unreachable. Defaults to 60 seconds.
; cluster.leader-session-ttl=60
[rpcmiddleware]
; Enable the RPC middleware interceptor functionality.