diff --git a/docs/release-notes/release-notes-0.14.0.md b/docs/release-notes/release-notes-0.14.0.md
index 7b3bea00c..f2e2526c1 100644
--- a/docs/release-notes/release-notes-0.14.0.md
+++ b/docs/release-notes/release-notes-0.14.0.md
@@ -610,6 +610,11 @@ messages directly. There is no routing/path finding involved.
 * [Fix pathfinding crash when inbound policy is unknown](
   https://github.com/lightningnetwork/lnd/pull/5922)
 
+* [Stagger connection attempts to multi-address peers so that the remote peer
+  does not close an already established connection in favour of a later
+  connection attempt](
+  https://github.com/lightningnetwork/lnd/pull/5925)
+
 ## Documentation
 
 The [code contribution guidelines have been updated to mention the new
diff --git a/server.go b/server.go
index 354b90dba..3b108d250 100644
--- a/server.go
+++ b/server.go
@@ -95,6 +95,10 @@ const (
 	// value used for a particular peer will be chosen between 0s and this
 	// value.
 	maxInitReconnectDelay = 30
+
+	// multiAddrConnectionStagger is the delay to wait between connection
+	// attempts to a peer using each of its advertised addresses.
+	multiAddrConnectionStagger = 10 * time.Second
 )
 
 var (
@@ -2798,34 +2802,24 @@ func (s *server) establishPersistentConnections() error {
 				IdentityKey: nodeAddr.pubKey,
 				Address:     address,
 			}
-			srvrLog.Debugf("Attempting persistent connection to "+
-				"channel peer %v", lnAddr)
 
-			// Send the persistent connection request to the
-			// connection manager, saving the request itself so we
-			// can cancel/restart the process as needed.
-			connReq := &connmgr.ConnReq{
-				Addr:      lnAddr,
-				Permanent: true,
-			}
+			s.persistentPeerAddrs[pubStr] = append(
+				s.persistentPeerAddrs[pubStr], lnAddr)
+		}
 
-			s.persistentConnReqs[pubStr] = append(
-				s.persistentConnReqs[pubStr], connReq)
+		// We'll connect to the first 10 peers immediately, then
+		// randomly stagger any remaining connections if the
+		// stagger initial reconnect flag is set. This ensures
+		// that mobile nodes or nodes with a small number of
+		// channels obtain connectivity quickly, but larger
+		// nodes are able to disperse the costs of connecting to
+		// all peers at once.
+		if numOutboundConns < numInstantInitReconnect ||
+			!s.cfg.StaggerInitialReconnect {
 
-			// We'll connect to the first 10 peers immediately, then
-			// randomly stagger any remaining connections if the
-			// stagger initial reconnect flag is set. This ensures
-			// that mobile nodes or nodes with a small number of
-			// channels obtain connectivity quickly, but larger
-			// nodes are able to disperse the costs of connecting to
-			// all peers at once.
-			if numOutboundConns < numInstantInitReconnect ||
-				!s.cfg.StaggerInitialReconnect {
-
-				go s.connMgr.Connect(connReq)
-			} else {
-				go s.delayInitialReconnect(connReq)
-			}
+			go s.connectToPersistentPeer(pubStr)
+		} else {
+			go s.delayInitialReconnect(pubStr)
 		}
 
 		numOutboundConns++
@@ -2834,16 +2828,15 @@ func (s *server) establishPersistentConnections() error {
 	return nil
 }
 
-// delayInitialReconnect will attempt a reconnection using the passed connreq
-// after sampling a value for the delay between 0s and the
-// maxInitReconnectDelay.
+// delayInitialReconnect will attempt a reconnection to the given peer after
+// sampling a value for the delay between 0s and the maxInitReconnectDelay.
 //
 // NOTE: This method MUST be run as a goroutine.
-func (s *server) delayInitialReconnect(connReq *connmgr.ConnReq) {
+func (s *server) delayInitialReconnect(pubStr string) {
 	delay := time.Duration(prand.Intn(maxInitReconnectDelay)) * time.Second
 	select {
 	case <-time.After(delay):
-		s.connMgr.Connect(connReq)
+		s.connectToPersistentPeer(pubStr)
 	case <-s.quit:
 	}
 }
@@ -2858,6 +2851,7 @@ func (s *server) prunePersistentPeerConnection(compressedPubKey [33]byte) {
 	if perm, ok := s.persistentPeers[pubKeyStr]; ok && !perm {
 		delete(s.persistentPeers, pubKeyStr)
 		delete(s.persistentPeersBackoff, pubKeyStr)
+		delete(s.persistentPeerAddrs, pubKeyStr)
 		s.cancelConnReqs(pubKeyStr, nil)
 		s.mu.Unlock()
 
@@ -3813,19 +3807,48 @@ func (s *server) connectToPersistentPeer(pubKeyStr string) {
 
 	s.persistentConnReqs[pubKeyStr] = updatedConnReqs
 
+	cancelChan, ok := s.persistentRetryCancels[pubKeyStr]
+	if !ok {
+		cancelChan = make(chan struct{})
+		s.persistentRetryCancels[pubKeyStr] = cancelChan
+	}
+
 	// Any addresses left in addrMap are new ones that we have not made
 	// connection requests for. So create new connection requests for those.
-	for _, addr := range addrMap {
-		connReq := &connmgr.ConnReq{
-			Addr:      addr,
-			Permanent: true,
-		}
+	// If there is more than one address in the address map, stagger the
+	// creation of the connection requests for those.
+	go func() {
+		ticker := time.NewTicker(multiAddrConnectionStagger)
 
-		s.persistentConnReqs[pubKeyStr] = append(
-			s.persistentConnReqs[pubKeyStr], connReq,
-		)
-		go s.connMgr.Connect(connReq)
-	}
+		for _, addr := range addrMap {
+			// Send the persistent connection request to the
+			// connection manager, saving the request itself so we
+			// can cancel/restart the process as needed.
+			connReq := &connmgr.ConnReq{
+				Addr:      addr,
+				Permanent: true,
+			}
+
+			s.mu.Lock()
+			s.persistentConnReqs[pubKeyStr] = append(
+				s.persistentConnReqs[pubKeyStr], connReq,
+			)
+			s.mu.Unlock()
+
+			srvrLog.Debugf("Attempting persistent connection to "+
+				"channel peer %v", addr)
+
+			go s.connMgr.Connect(connReq)
+
+			select {
+			case <-s.quit:
+				return
+			case <-cancelChan:
+				return
+			case <-ticker.C:
+			}
+		}
+	}()
 }
 
 // removePeer removes the passed peer from the server's state of all active
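
For illustration, the staggered dialing behaviour added to connectToPersistentPeer can be reduced to the small, self-contained sketch below. This is not lnd code: connectStaggered, dialStagger and the example addresses are hypothetical stand-ins, and a plain net.DialTimeout replaces the connection manager; only the ticker-driven stagger and the cancel/quit select mirror the change above.

// Minimal sketch (assumed names, not part of lnd) of staggering connection
// attempts across a peer's advertised addresses.
package main

import (
	"fmt"
	"net"
	"time"
)

// dialStagger plays the role of multiAddrConnectionStagger above.
const dialStagger = 10 * time.Second

// connectStaggered dials each address in turn, pausing dialStagger between
// attempts so that a later attempt cannot race a connection that has already
// succeeded. Closing cancel (e.g. once one attempt succeeds) or quit stops
// the loop early.
func connectStaggered(addrs []string, cancel, quit <-chan struct{}) {
	ticker := time.NewTicker(dialStagger)
	defer ticker.Stop()

	for _, addr := range addrs {
		// Dial asynchronously, analogous to handing a ConnReq to the
		// connection manager in the change above.
		go func(a string) {
			conn, err := net.DialTimeout("tcp", a, 5*time.Second)
			if err != nil {
				fmt.Printf("dial %s failed: %v\n", a, err)
				return
			}
			fmt.Printf("connected to %s\n", a)
			conn.Close()
		}(addr)

		// Wait out the stagger interval, unless we are told to stop.
		select {
		case <-quit:
			return
		case <-cancel:
			return
		case <-ticker.C:
		}
	}
}

func main() {
	quit := make(chan struct{})
	cancel := make(chan struct{})

	// Two hypothetical addresses advertised by the same peer.
	connectStaggered(
		[]string{"203.0.113.1:9735", "[2001:db8::1]:9735"},
		cancel, quit,
	)
}

In the actual change, the role of cancel is played by the per-peer channel kept in s.persistentRetryCancels, which lets the server stop the remaining address attempts once they are no longer needed, while s.quit covers server shutdown.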