Merge pull request #5925 from ellemouton/staggerConnToMultipleAddresses

server: stagger connReqs to multi-address peers
This commit is contained in:
Olaoluwa Osuntokun 2021-11-04 17:48:49 -07:00 committed by GitHub
commit 1ab5cc3ae8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 68 additions and 40 deletions

View file

@ -610,6 +610,11 @@ messages directly. There is no routing/path finding involved.
* [Fix pathfinding crash when inbound policy is unknown]( * [Fix pathfinding crash when inbound policy is unknown](
https://github.com/lightningnetwork/lnd/pull/5922) https://github.com/lightningnetwork/lnd/pull/5922)
* [Stagger connection attempts to multi-address peers to ensure that a
  successful connection is not closed in favour of a later attempt to
  another of the peer's addresses](
  https://github.com/lightningnetwork/lnd/pull/5925)
## Documentation ## Documentation
The [code contribution guidelines have been updated to mention the new The [code contribution guidelines have been updated to mention the new

View file

@ -95,6 +95,10 @@ const (
// value used or a particular peer will be chosen between 0s and this // value used or a particular peer will be chosen between 0s and this
// value. // value.
maxInitReconnectDelay = 30 maxInitReconnectDelay = 30
// multiAddrConnectionStagger is the delay to wait between connection
// attempts to a peer using each of its advertised addresses.
multiAddrConnectionStagger = 10 * time.Second
) )
var ( var (
@ -2798,20 +2802,11 @@ func (s *server) establishPersistentConnections() error {
IdentityKey: nodeAddr.pubKey, IdentityKey: nodeAddr.pubKey,
Address: address, Address: address,
} }
srvrLog.Debugf("Attempting persistent connection to "+
"channel peer %v", lnAddr)
// Send the persistent connection request to the s.persistentPeerAddrs[pubStr] = append(
// connection manager, saving the request itself so we s.persistentPeerAddrs[pubStr], lnAddr)
// can cancel/restart the process as needed.
connReq := &connmgr.ConnReq{
Addr: lnAddr,
Permanent: true,
} }
s.persistentConnReqs[pubStr] = append(
s.persistentConnReqs[pubStr], connReq)
// We'll connect to the first 10 peers immediately, then // We'll connect to the first 10 peers immediately, then
// randomly stagger any remaining connections if the // randomly stagger any remaining connections if the
// stagger initial reconnect flag is set. This ensures // stagger initial reconnect flag is set. This ensures
@ -2822,10 +2817,9 @@ func (s *server) establishPersistentConnections() error {
if numOutboundConns < numInstantInitReconnect || if numOutboundConns < numInstantInitReconnect ||
!s.cfg.StaggerInitialReconnect { !s.cfg.StaggerInitialReconnect {
go s.connMgr.Connect(connReq) go s.connectToPersistentPeer(pubStr)
} else { } else {
go s.delayInitialReconnect(connReq) go s.delayInitialReconnect(pubStr)
}
} }
numOutboundConns++ numOutboundConns++
@ -2834,16 +2828,15 @@ func (s *server) establishPersistentConnections() error {
return nil return nil
} }
// delayInitialReconnect will attempt a reconnection using the passed connreq // delayInitialReconnect will attempt a reconnection to the given peer after
// after sampling a value for the delay between 0s and the // sampling a value for the delay between 0s and the maxInitReconnectDelay.
// maxInitReconnectDelay.
// //
// NOTE: This method MUST be run as a goroutine. // NOTE: This method MUST be run as a goroutine.
func (s *server) delayInitialReconnect(connReq *connmgr.ConnReq) { func (s *server) delayInitialReconnect(pubStr string) {
delay := time.Duration(prand.Intn(maxInitReconnectDelay)) * time.Second delay := time.Duration(prand.Intn(maxInitReconnectDelay)) * time.Second
select { select {
case <-time.After(delay): case <-time.After(delay):
s.connMgr.Connect(connReq) s.connectToPersistentPeer(pubStr)
case <-s.quit: case <-s.quit:
} }
} }
@ -2858,6 +2851,7 @@ func (s *server) prunePersistentPeerConnection(compressedPubKey [33]byte) {
if perm, ok := s.persistentPeers[pubKeyStr]; ok && !perm { if perm, ok := s.persistentPeers[pubKeyStr]; ok && !perm {
delete(s.persistentPeers, pubKeyStr) delete(s.persistentPeers, pubKeyStr)
delete(s.persistentPeersBackoff, pubKeyStr) delete(s.persistentPeersBackoff, pubKeyStr)
delete(s.persistentPeerAddrs, pubKeyStr)
s.cancelConnReqs(pubKeyStr, nil) s.cancelConnReqs(pubKeyStr, nil)
s.mu.Unlock() s.mu.Unlock()
@ -3813,20 +3807,49 @@ func (s *server) connectToPersistentPeer(pubKeyStr string) {
s.persistentConnReqs[pubKeyStr] = updatedConnReqs s.persistentConnReqs[pubKeyStr] = updatedConnReqs
cancelChan, ok := s.persistentRetryCancels[pubKeyStr]
if !ok {
cancelChan = make(chan struct{})
s.persistentRetryCancels[pubKeyStr] = cancelChan
}
// Any addresses left in addrMap are new ones that we have not made // Any addresses left in addrMap are new ones that we have not made
// connection requests for. So create new connection requests for those. // connection requests for. So create new connection requests for those.
// If there is more than one address in the address map, stagger the
// creation of the connection requests for those.
go func() {
ticker := time.NewTicker(multiAddrConnectionStagger)
for _, addr := range addrMap { for _, addr := range addrMap {
// Send the persistent connection request to the
// connection manager, saving the request itself so we
// can cancel/restart the process as needed.
connReq := &connmgr.ConnReq{ connReq := &connmgr.ConnReq{
Addr: addr, Addr: addr,
Permanent: true, Permanent: true,
} }
s.mu.Lock()
s.persistentConnReqs[pubKeyStr] = append( s.persistentConnReqs[pubKeyStr] = append(
s.persistentConnReqs[pubKeyStr], connReq, s.persistentConnReqs[pubKeyStr], connReq,
) )
s.mu.Unlock()
srvrLog.Debugf("Attempting persistent connection to "+
"channel peer %v", addr)
go s.connMgr.Connect(connReq) go s.connMgr.Connect(connReq)
select {
case <-s.quit:
return
case <-cancelChan:
return
case <-ticker.C:
} }
} }
}()
}
// removePeer removes the passed peer from the server's state of all active // removePeer removes the passed peer from the server's state of all active
// peers. // peers.