Merge pull request #6082 from yyforyongyu/6000-peer-conn

peer: fix competing connections to the same peer
This commit is contained in:
Oliver Gugger 2022-01-03 13:28:45 +01:00 committed by GitHub
commit 9d6701be2b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 100 additions and 39 deletions

View file

@ -158,6 +158,7 @@ func (c *ChannelGraphBootstrapper) SampleNodeAddrs(numAddrs uint32,
// We'll merge the ignore map with our currently selected map in order
// to ensure we don't return any duplicate nodes.
for n := range ignore {
log.Tracef("Ignored node %x for bootstrapping", n)
c.tried[n] = struct{}{}
}

View file

@ -1,13 +1,5 @@
# Release Notes
## Bug Fixes
* [Return the nearest known fee rate when a given conf target cannot be found
from Web API fee estimator.](https://github.com/lightningnetwork/lnd/pull/6062)
* [We now _always_ set a channel type if the other party signals the feature
bit](https://github.com/lightningnetwork/lnd/pull/6075).
## Wallet
* A bug that prevented opening anchor-based channels from external wallets when
@ -31,6 +23,12 @@
## Bug fixes
* [Return the nearest known fee rate when a given conf target cannot be found
from Web API fee estimator.](https://github.com/lightningnetwork/lnd/pull/6062)
* [We now _always_ set a channel type if the other party signals the feature
bit](https://github.com/lightningnetwork/lnd/pull/6075).
* [Add json flag to
trackpayment](https://github.com/lightningnetwork/lnd/pull/6060)
* [Clarify invalid config timeout
@ -47,6 +45,12 @@
* [Fix Postgres context cancellation](https://github.com/lightningnetwork/lnd/pull/6108)
* A conflict was found in connecting peers, where the peer bootstrapping
process and persistent connection could compete connection for a peer that
led to an already made connection being lost. [This is now fixed so that
bootstrapping will always ignore the peers chosen by the persistent
connection.](https://github.com/lightningnetwork/lnd/pull/6082)
## RPC Server
* [ChanStatusFlags is now

View file

@ -1744,7 +1744,7 @@ func (lc *LightningChannel) restoreCommitState(
}
lc.localCommitChain.addCommitment(localCommit)
lc.log.Debugf("starting local commitment: %v",
lc.log.Tracef("starting local commitment: %v",
newLogClosure(func() string {
return spew.Sdump(lc.localCommitChain.tail())
}),
@ -1760,7 +1760,7 @@ func (lc *LightningChannel) restoreCommitState(
}
lc.remoteCommitChain.addCommitment(remoteCommit)
lc.log.Debugf("starting remote commitment: %v",
lc.log.Tracef("starting remote commitment: %v",
newLogClosure(func() string {
return spew.Sdump(lc.remoteCommitChain.tail())
}),

View file

@ -1442,7 +1442,7 @@ func (l *LightningWallet) handleContributionMsg(req *addContributionMsg) {
)
}
walletLog.Debugf("Funding tx for ChannelPoint(%v) "+
walletLog.Tracef("Funding tx for ChannelPoint(%v) "+
"generated: %v", chanPoint, spew.Sdump(fundingTx))
}
@ -1589,9 +1589,9 @@ func (l *LightningWallet) handleChanPointReady(req *continueContributionMsg) {
txsort.InPlaceSort(ourCommitTx)
txsort.InPlaceSort(theirCommitTx)
walletLog.Debugf("Local commit tx for ChannelPoint(%v): %v",
walletLog.Tracef("Local commit tx for ChannelPoint(%v): %v",
chanPoint, spew.Sdump(ourCommitTx))
walletLog.Debugf("Remote commit tx for ChannelPoint(%v): %v",
walletLog.Tracef("Remote commit tx for ChannelPoint(%v): %v",
chanPoint, spew.Sdump(theirCommitTx))
// Record newly available information within the open channel state.

View file

@ -479,7 +479,8 @@ func (p *Brontide) Start() error {
return nil
}
peerLog.Tracef("Peer %v starting", p)
peerLog.Tracef("Peer %v starting with conn[%v->%v]", p,
p.cfg.Conn.LocalAddr(), p.cfg.Conn.RemoteAddr())
// Fetch and then load all the active channels we have with this remote
// peer from the database.

View file

@ -521,3 +521,11 @@ func (m *mockMessageConn) ReadNextHeader() (uint32, error) {
func (m *mockMessageConn) ReadNextBody(buf []byte) ([]byte, error) {
return m.curReadMessage, nil
}
func (m *mockMessageConn) RemoteAddr() net.Addr {
return nil
}
func (m *mockMessageConn) LocalAddr() net.Addr {
return nil
}

View file

@ -125,9 +125,8 @@ func (r *ChannelRouter) notifyTopologyChange(topologyDiff *TopologyChange) {
return
}
log.Debugf("Sending topology notification to %v clients %v",
numClients,
newLogClosure(func() string {
log.Tracef("Sending topology notification to %v clients %v",
numClients, newLogClosure(func() string {
return spew.Sdump(topologyDiff)
}),
)

View file

@ -199,6 +199,10 @@ type server struct {
peerConnectedListeners map[string][]chan<- lnpeer.Peer
peerDisconnectedListeners map[string][]chan<- struct{}
// TODO(yy): the Brontide.Start doesn't know this value, which means it
// will continue to send messages even if there are no active channels
// and the value below is false. Once it's pruned, all its connections
// will be closed, thus the Brontide.Start will return an error.
persistentPeers map[string]bool
persistentPeersBackoff map[string]time.Duration
persistentPeerAddrs map[string][]*lnwire.NetAddress
@ -2303,6 +2307,39 @@ func initNetworkBootstrappers(s *server) ([]discovery.NetworkPeerBootstrapper, e
return bootStrappers, nil
}
// createBootstrapIgnorePeers creates a map of peers that the bootstrap process
// needs to ignore, which is made of three parts,
// - the node itself needs to be skipped as it doesn't make sense to connect
// to itself.
// - the peers that already have connections with, as in s.peersByPub.
// - the peers that we are attempting to connect, as in s.persistentPeers.
func (s *server) createBootstrapIgnorePeers() map[autopilot.NodeID]struct{} {
s.mu.RLock()
defer s.mu.RUnlock()
ignore := make(map[autopilot.NodeID]struct{})
// We should ignore ourselves from bootstrapping.
selfKey := autopilot.NewNodeID(s.identityECDH.PubKey())
ignore[selfKey] = struct{}{}
// Ignore all connected peers.
for _, peer := range s.peersByPub {
nID := autopilot.NewNodeID(peer.IdentityKey())
ignore[nID] = struct{}{}
}
// Ignore all persistent peers as they have a dedicated reconnecting
// process.
for pubKeyStr := range s.persistentPeers {
var nID autopilot.NodeID
copy(nID[:], []byte(pubKeyStr))
ignore[nID] = struct{}{}
}
return ignore
}
// peerBootstrapper is a goroutine which is tasked with attempting to establish
// and maintain a target minimum number of outbound connections. With this
// invariant, we ensure that our node is connected to a diverse set of peers
@ -2313,13 +2350,12 @@ func (s *server) peerBootstrapper(numTargetPeers uint32,
defer s.wg.Done()
// ignore is a set used to keep track of peers already retrieved from
// our bootstrappers in order to avoid duplicates.
ignore := make(map[autopilot.NodeID]struct{})
// Before we continue, init the ignore peers map.
ignoreList := s.createBootstrapIgnorePeers()
// We'll start off by aggressively attempting connections to peers in
// order to be a part of the network as soon as possible.
s.initialPeerBootstrap(ignore, numTargetPeers, bootstrappers)
s.initialPeerBootstrap(ignoreList, numTargetPeers, bootstrappers)
// Once done, we'll attempt to maintain our target minimum number of
// peers.
@ -2391,13 +2427,10 @@ func (s *server) peerBootstrapper(numTargetPeers uint32,
// With the number of peers we need calculated, we'll
// query the network bootstrappers to sample a set of
// random addrs for us.
s.mu.RLock()
ignoreList := make(map[autopilot.NodeID]struct{})
for _, peer := range s.peersByPub {
nID := autopilot.NewNodeID(peer.IdentityKey())
ignoreList[nID] = struct{}{}
}
s.mu.RUnlock()
//
// Before we continue, get a copy of the ignore peers
// map.
ignoreList = s.createBootstrapIgnorePeers()
peerAddrs, err := discovery.MultiSourceBootstrap(
ignoreList, numNeeded*2, bootstrappers...,
@ -2450,7 +2483,11 @@ const bootstrapBackOffCeiling = time.Minute * 5
// until the target number of peers has been reached. This ensures that nodes
// receive an up to date network view as soon as possible.
func (s *server) initialPeerBootstrap(ignore map[autopilot.NodeID]struct{},
numTargetPeers uint32, bootstrappers []discovery.NetworkPeerBootstrapper) {
numTargetPeers uint32,
bootstrappers []discovery.NetworkPeerBootstrapper) {
srvrLog.Debugf("Init bootstrap with targetPeers=%v, bootstrappers=%v, "+
"ignore=%v", numTargetPeers, len(bootstrappers), len(ignore))
// We'll start off by waiting 2 seconds between failed attempts, then
// double each time we fail until we hit the bootstrapBackOffCeiling.
@ -2775,6 +2812,9 @@ func (s *server) establishPersistentConnections() error {
return err
}
srvrLog.Debugf("Establishing %v persistent connections on start",
len(nodeAddrsMap))
// Acquire and hold server lock until all persistent connection requests
// have been recorded and sent to the connection manager.
s.mu.Lock()
@ -3093,10 +3133,10 @@ func (s *server) InboundPeerConnected(conn net.Conn) {
// If we already have an outbound connection to this peer, then ignore
// this new connection.
if _, ok := s.outboundPeers[pubStr]; ok {
srvrLog.Debugf("Already have outbound connection for %x, "+
"ignoring inbound connection",
nodePub.SerializeCompressed())
if p, ok := s.outboundPeers[pubStr]; ok {
srvrLog.Debugf("Already have outbound connection for %v, "+
"ignoring inbound connection from local=%v, remote=%v",
p, conn.LocalAddr(), conn.RemoteAddr())
conn.Close()
return
@ -3105,8 +3145,9 @@ func (s *server) InboundPeerConnected(conn net.Conn) {
// If we already have a valid connection that is scheduled to take
// precedence once the prior peer has finished disconnecting, we'll
// ignore this connection.
if _, ok := s.scheduledPeerConnection[pubStr]; ok {
srvrLog.Debugf("Ignoring connection, peer already scheduled")
if p, ok := s.scheduledPeerConnection[pubStr]; ok {
srvrLog.Debugf("Ignoring connection from %v, peer %v already "+
"scheduled", conn.RemoteAddr(), p)
conn.Close()
return
}
@ -3179,10 +3220,10 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
// If we already have an inbound connection to this peer, then ignore
// this new connection.
if _, ok := s.inboundPeers[pubStr]; ok {
srvrLog.Debugf("Already have inbound connection for %x, "+
"ignoring outbound connection",
nodePub.SerializeCompressed())
if p, ok := s.inboundPeers[pubStr]; ok {
srvrLog.Debugf("Already have inbound connection for %v, "+
"ignoring outbound connection from local=%v, remote=%v",
p, conn.LocalAddr(), conn.RemoteAddr())
if connReq != nil {
s.connMgr.Remove(connReq.ID())
@ -3303,6 +3344,8 @@ func (s *server) cancelConnReqs(pubStr string, skip *uint64) {
}
for _, connReq := range connReqs {
srvrLog.Tracef("Canceling %s:", connReqs)
// Atomically capture the current request identifier.
connID := connReq.ID()
@ -3998,6 +4041,9 @@ func (s *server) connectToPeer(addr *lnwire.NetAddress,
close(errChan)
srvrLog.Tracef("Brontide dialer made local=%v, remote=%v",
conn.LocalAddr(), conn.RemoteAddr())
s.OutboundPeerConnected(nil, conn)
}
@ -4272,5 +4318,7 @@ func shouldPeerBootstrap(cfg *Config) bool {
isRegtest := (cfg.Bitcoin.RegTest || cfg.Litecoin.RegTest)
isDevNetwork := isSimnet || isSignet || isRegtest
// TODO(yy): remove the check on simnet/regtest such that the itest is
// covering the bootstrapping process.
return !cfg.NoNetBootstrap && !isDevNetwork
}