From 2c8d1c878ecce74eb502134d546ab50972ac3cea Mon Sep 17 00:00:00 2001 From: Andras Banki-Horvath Date: Thu, 25 Jul 2024 18:13:20 +0200 Subject: [PATCH] kvdb: make etcd calls timeout to ensure liveness Previously our RPC calls to etcd would hang even in the case of properly set dial timeouts and even if there was a network partition. To ensure liveness we need to make sure that calls fail correctly in case of system failure. To fix this we add a default timeout of 30 seconds to each etcd RPC call. --- kvdb/etcd/stm.go | 36 ++++++++++++++++++++++++++++++++---- 1 file changed, 32 insertions(+), 4 deletions(-) diff --git a/kvdb/etcd/stm.go b/kvdb/etcd/stm.go index 5b3a9a977..3c933d5c4 100644 --- a/kvdb/etcd/stm.go +++ b/kvdb/etcd/stm.go @@ -8,12 +8,25 @@ import ( "fmt" "math" "strings" + "time" "github.com/google/btree" pb "go.etcd.io/etcd/api/v3/etcdserverpb" v3 "go.etcd.io/etcd/client/v3" ) +const ( + // rpcTimeout is the timeout for all RPC calls to etcd. It is set to 30 + // seconds to avoid blocking the server for too long but give reasonable + // time for etcd to respond. If any operations would take longer than 30 + // seconds that generally means there's a problem with the etcd server + // or the network resulting in degraded performance in which case we + // want LND to fail fast. Due to the underlying gRPC implementation in + // etcd calls without a timeout can hang indefinitely even in the case + // of network partitions or other critical failures. + rpcTimeout = time.Second * 30 +) + type CommitStats struct { Rset int Wset int @@ -609,8 +622,13 @@ func (s *stm) FetchRangePaginatedRaw(prefix string, limit int64, key := prefix for { + timeoutCtx, cancel := context.WithTimeout( + s.options.ctx, rpcTimeout, + ) + defer cancel() + resp, err := s.client.Get( - s.options.ctx, key, append(opts, s.getOpts...)..., + timeoutCtx, key, append(opts, s.getOpts...)..., ) if err != nil { return DatabaseError{ @@ -645,8 +663,12 @@ func (s *stm) FetchRangePaginatedRaw(prefix string, limit int64, // We'll also cache the returned key/value in the read set. func (s *stm) fetch(key string, opts ...v3.OpOption) ([]KV, error) { s.callCount++ + + timeoutCtx, cancel := context.WithTimeout(s.options.ctx, rpcTimeout) + defer cancel() + resp, err := s.client.Get( - s.options.ctx, key, append(opts, s.getOpts...)..., + timeoutCtx, key, append(opts, s.getOpts...)..., ) if err != nil { return nil, DatabaseError{ @@ -1049,7 +1071,10 @@ func (s *stm) Prefetch(keys []string, prefixes []string) { []v3.OpOption{v3.WithPrefix()}, s.getOpts..., ) - txn := s.client.Txn(s.options.ctx) + timeoutCtx, cancel := context.WithTimeout(s.options.ctx, rpcTimeout) + defer cancel() + + txn := s.client.Txn(timeoutCtx) ops := make([]v3.Op, 0, len(fetchKeys)+len(fetchPrefixes)) for _, key := range fetchKeys { @@ -1103,8 +1128,11 @@ func (s *stm) commit() (CommitStats, error) { // Create the compare set. cmps := append(rset, wset...) + // Create a transaction with the optional abort context. - txn := s.client.Txn(s.options.ctx) + timeoutCtx, cancel := context.WithTimeout(s.options.ctx, rpcTimeout) + defer cancel() + txn := s.client.Txn(timeoutCtx) // If the compare set holds, try executing the puts. txn = txn.If(cmps...)