cluster: configurable session TTL for the leader elector

This commit is contained in:
Andras Banki-Horvath 2022-03-16 20:09:55 +01:00
parent 57840bba36
commit 8eca46f142
No known key found for this signature in database
GPG key ID: 80E5375C094198D8
4 changed files with 24 additions and 10 deletions

View file

@ -35,7 +35,7 @@ type etcdLeaderElector struct {
// newEtcdLeaderElector constructs a new etcdLeaderElector.
func newEtcdLeaderElector(ctx context.Context, id, electionPrefix string,
cfg *etcd.Config) (*etcdLeaderElector, error) {
leaderSessionTTL int, cfg *etcd.Config) (*etcdLeaderElector, error) {
clientCfg := clientv3.Config{
Context: ctx,
@ -72,7 +72,9 @@ func newEtcdLeaderElector(ctx context.Context, id, electionPrefix string,
cli.Lease = namespace.NewLease(cli.Lease, cfg.Namespace)
log.Infof("Applied namespace to leader elector: %v", cfg.Namespace)
session, err := concurrency.NewSession(cli)
session, err := concurrency.NewSession(
cli, concurrency.WithTTL(leaderSessionTTL),
)
if err != nil {
log.Errorf("Unable to start new leader election session: %v",
err)

View file

@ -15,9 +15,9 @@ import (
func makeEtcdElector(ctx context.Context, args ...interface{}) (LeaderElector,
error) {
if len(args) != 3 {
if len(args) != 4 {
return nil, fmt.Errorf("invalid number of arguments to "+
"cluster.makeEtcdElector(): expected 3, got %v",
"cluster.makeEtcdElector(): expected 4, got %v",
len(args))
}
@ -33,13 +33,21 @@ func makeEtcdElector(ctx context.Context, args ...interface{}) (LeaderElector,
"cluster.makeEtcdElector(), expected: string")
}
etcdCfg, ok := args[2].(*etcd.Config)
leaderSessionTTL, ok := args[2].(int)
if !ok {
return nil, fmt.Errorf("invalid argument (2) to " +
"cluster.makeEtcdElector(), expected: int")
}
etcdCfg, ok := args[3].(*etcd.Config)
if !ok {
return nil, fmt.Errorf("invalid argument (3) to " +
"cluster.makeEtcdElector(), expected: *etcd.Config")
}
return newEtcdLeaderElector(ctx, id, electionPrefix, etcdCfg)
return newEtcdLeaderElector(
ctx, id, electionPrefix, leaderSessionTTL, etcdCfg,
)
}
func init() {

View file

@ -57,15 +57,16 @@ func TestEtcdElector(t *testing.T) {
election = "/election/"
id1 = "e1"
id2 = "e2"
ttl = 5
)
e1, err := newEtcdLeaderElector(
ctx, id1, election, etcdCfg,
ctx, id1, election, ttl, etcdCfg,
)
require.NoError(t, err)
e2, err := newEtcdLeaderElector(
ctx, id2, election, etcdCfg,
ctx, id2, election, ttl, etcdCfg,
)
require.NoError(t, err)

View file

@ -20,9 +20,11 @@ type Cluster struct {
LeaderElector string `long:"leader-elector" choice:"etcd" description:"Leader elector to use. Valid values: \"etcd\"."`
EtcdElectionPrefix string `long:"etcd-election-prefix" description:"Election key prefix when using etcd leader elector. Defaults to \"/leader/\"."`
EtcdElectionPrefix string `long:"etcd-election-prefix" description:"Election key prefix when using etcd leader elector."`
ID string `long:"id" description:"Identifier for this node inside the cluster (used in leader election). Defaults to the hostname."`
LeaderSessionTTL int `long:"leader-session-ttl" description:"The TTL in seconds to use for the leader election session."`
}
// DefaultCluster creates and returns a new default DB config.
@ -31,6 +33,7 @@ func DefaultCluster() *Cluster {
return &Cluster{
LeaderElector: cluster.EtcdLeaderElector,
EtcdElectionPrefix: DefaultEtcdElectionPrefix,
LeaderSessionTTL: 60,
ID: hostname,
}
}
@ -43,7 +46,7 @@ func (c *Cluster) MakeLeaderElector(electionCtx context.Context, db *DB) (
if c.LeaderElector == cluster.EtcdLeaderElector {
return cluster.MakeLeaderElector(
electionCtx, c.LeaderElector, c.ID,
c.EtcdElectionPrefix, db.Etcd,
c.EtcdElectionPrefix, c.LeaderSessionTTL, db.Etcd,
)
}