server: adds truncated exponential backoff + rand for retry

This commit adds a truncated exponential backoff policy to the peer
termination watcher to avoid getting stuck in tight connection loops
with failing peers. The maximum backoff is now capped at one minute,
and each backoff is randomized so that two instances using
the same algorithm have some hope of desynchronizing.
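As a rough, self-contained sketch (not part of this change) of the schedule such a policy produces: the toy program below doubles the delay, truncates it at the one-minute cap, and perturbs it by up to roughly 1/20 in either direction. It uses math/rand for brevity, whereas the change below uses crypto/rand, and the names nextBackoff, prev, and maxBackoff are made up for illustration.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// nextBackoff doubles the previous delay, truncates it at maxBackoff, and
// adds a random offset of up to +/- 1/20 of the truncated value so that two
// nodes running the same policy drift apart. Illustrative only; the real
// logic is computeNextBackoff in the diff below.
func nextBackoff(prev, maxBackoff time.Duration) time.Duration {
	next := 2 * prev
	if next > maxBackoff {
		next = maxBackoff
	}
	margin := next / 10
	wiggle := time.Duration(rand.Int63n(int64(margin) + 1))
	return next + wiggle - margin/2
}

func main() {
	backoff := time.Second
	for attempt := 1; attempt <= 8; attempt++ {
		backoff = nextBackoff(backoff, time.Minute)
		fmt.Printf("attempt %d: wait %v\n", attempt, backoff)
	}
}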
Conner Fromknecht 2018-02-01 00:48:38 -08:00
parent 7bbcbc6fea
commit a03e95410b


@@ -2,10 +2,12 @@ package main
import (
"bytes"
"crypto/rand"
"crypto/sha256"
"encoding/hex"
"fmt"
"image/color"
"math/big"
"net"
"strconv"
"sync"
@@ -42,6 +44,14 @@ var (
// ErrServerShuttingDown indicates that the server is in the process of
// gracefully exiting.
ErrServerShuttingDown = errors.New("server is shutting down")
// defaultBackoff is the starting point for exponential backoff for
// reconnecting to persistent peers.
defaultBackoff = time.Second
// maximumBackoff is the largest backoff we will permit when
// reattempting connections to persistent peers.
maximumBackoff = time.Minute
)
// server is the main server of the Lightning Network Daemon. The server houses
@@ -73,8 +83,9 @@ type server struct {
peerConnectedListeners map[string][]chan<- struct{}
persistentPeers map[string]struct{}
persistentConnReqs map[string][]*connmgr.ConnReq
persistentPeers map[string]struct{}
persistentPeersBackoff map[string]time.Duration
persistentConnReqs map[string][]*connmgr.ConnReq
// ignorePeerTermination tracks peers for which the server has initiated
// a disconnect. Adding a peer to this map causes the peer termination
@@ -157,9 +168,10 @@ func newServer(listenAddrs []string, chanDB *channeldb.DB, cc *chainControl,
sphinx.NewRouter(privKey, activeNetParams.Params)),
lightningID: sha256.Sum256(serializedPubKey),
persistentPeers: make(map[string]struct{}),
persistentConnReqs: make(map[string][]*connmgr.ConnReq),
ignorePeerTermination: make(map[*peer]struct{}),
persistentPeers: make(map[string]struct{}),
persistentPeersBackoff: make(map[string]time.Duration),
persistentConnReqs: make(map[string][]*connmgr.ConnReq),
ignorePeerTermination: make(map[*peer]struct{}),
peersByID: make(map[int32]*peer),
peersByPub: make(map[string]*peer),
@@ -663,6 +675,9 @@ func (s *server) peerBootstrapper(numTargetPeers uint32,
targetPub := string(conn.RemotePub().SerializeCompressed())
s.mu.Lock()
s.persistentPeers[targetPub] = struct{}{}
if _, ok := s.persistentPeersBackoff[targetPub]; !ok {
s.persistentPeersBackoff[targetPub] = defaultBackoff
}
s.mu.Unlock()
s.OutboundPeerConnected(nil, conn)
@@ -775,6 +790,9 @@ func (s *server) peerBootstrapper(numTargetPeers uint32,
targetPub := string(conn.RemotePub().SerializeCompressed())
s.mu.Lock()
s.persistentPeers[targetPub] = struct{}{}
if _, ok := s.persistentPeersBackoff[targetPub]; !ok {
s.persistentPeersBackoff[targetPub] = defaultBackoff
}
s.mu.Unlock()
s.OutboundPeerConnected(nil, conn)
@@ -941,6 +959,9 @@ func (s *server) establishPersistentConnections() error {
// Add this peer to the set of peers we should maintain a
// persistent connection with.
s.persistentPeers[pubStr] = struct{}{}
if _, ok := s.persistentPeersBackoff[pubStr]; !ok {
s.persistentPeersBackoff[pubStr] = defaultBackoff
}
for _, address := range nodeAddr.addresses {
// Create a wrapper address which couples the IP and
@@ -1261,24 +1282,39 @@ func (s *server) peerTerminationWatcher(p *peer) {
return
}
srvrLog.Debugf("Attempting to re-establish persistent "+
"connection to peer %v", p)
// If so, then we'll attempt to re-establish a persistent
// connection to the peer.
// Otherwise, we'll launch a new connection request in order to
// attempt to maintain a persistent connection with this peer.
// TODO(roasbeef): look up latest info for peer in database
connReq := &connmgr.ConnReq{
Addr: p.addr,
Permanent: true,
}
// Otherwise, we'll launch a new connection requests in order
// to attempt to maintain a persistent connection with this
// peer.
s.persistentConnReqs[pubStr] = append(
s.persistentConnReqs[pubStr], connReq)
go s.connMgr.Connect(connReq)
// Compute the subsequent backoff duration.
currBackoff := s.persistentPeersBackoff[pubStr]
nextBackoff := computeNextBackoff(currBackoff)
s.persistentPeersBackoff[pubStr] = nextBackoff
// We choose not to wait group this goroutine since the Connect
// call can stall for arbitrarily long if we shut down while an
// outbound connection attempt is being made.
go func() {
srvrLog.Debugf("Scheduling connection re-establishment to "+
"persistent peer %v in %s", p, nextBackoff)
select {
case <-time.After(nextBackoff):
case <-s.quit:
return
}
srvrLog.Debugf("Attempting to re-establish persistent "+
"connection to peer %v", p)
s.connMgr.Connect(connReq)
}()
}
}
@@ -1675,6 +1711,9 @@ func (s *server) ConnectToPeer(addr *lnwire.NetAddress, perm bool) error {
}
s.persistentPeers[targetPub] = struct{}{}
if _, ok := s.persistentPeersBackoff[targetPub]; !ok {
s.persistentPeersBackoff[targetPub] = defaultBackoff
}
s.persistentConnReqs[targetPub] = append(
s.persistentConnReqs[targetPub], connReq)
s.mu.Unlock()
@@ -1727,6 +1766,7 @@ func (s *server) DisconnectPeer(pubKey *btcec.PublicKey) error {
// them from this map so we don't attempt to re-connect after we
// disconnect.
delete(s.persistentPeers, pubStr)
delete(s.persistentPeersBackoff, pubStr)
// Remove the current peer from the server's internal state and signal
// that the peer termination watcher does not need to execute for this
@@ -1850,3 +1890,31 @@ func parseHexColor(colorStr string) (color.RGBA, error) {
return color.RGBA{R: colorBytes[0], G: colorBytes[1], B: colorBytes[2]}, nil
}
// computeNextBackoff uses a truncated exponential backoff to compute the next
// backoff using the value of the existing backoff. The returned duration is
// randomized in either direction by 1/20 to prevent tight loops from
// stabilizing.
func computeNextBackoff(currBackoff time.Duration) time.Duration {
// Double the current backoff, truncating if it exceeds our maximum.
nextBackoff := 2 * currBackoff
if nextBackoff > maximumBackoff {
nextBackoff = maximumBackoff
}
// Using 1/10 of our duration as a margin, compute a random offset to
// avoid the nodes entering connection cycles.
margin := nextBackoff / 10
var wiggle big.Int
wiggle.SetUint64(uint64(margin))
if _, err := rand.Int(rand.Reader, &wiggle); err != nil {
// Randomizing is not mission critical, so we'll just return the
// truncated backoff without the random offset.
return nextBackoff
}
// Otherwise add in our wiggle, but subtract out half of the margin so
// that the backoff can be tweaked by 1/20 in either direction.
return nextBackoff + (time.Duration(wiggle.Uint64()) - margin/2)
}
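Under this policy, with defaultBackoff of one second and maximumBackoff of one minute, successive calls to computeNextBackoff yield roughly 2s, 4s, 8s, 16s, 32s, and then hold near the one-minute cap, with each value nudged by up to about one twentieth of its size in either direction.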