Mirror of https://github.com/lightningnetwork/lnd.git
Merge pull request #5538 from ellemouton/refreshPeerIP
multi: refresh peer IP during reconnect
Commit af9b620854
7 changed files with 544 additions and 111 deletions
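
The change, in brief: instead of only redialing the last address it used, `lnd` now tracks the full set of addresses a persistent peer currently advertises via NodeAnnouncements and refreshes its pending connection requests whenever that set changes. The snippet below is a minimal, self-contained sketch of that idea for orientation only; `addrBook`, `nodeUpdate`, and every other name in it are illustrative and are not lnd APIs — the actual implementation lives in the `server.go` hunks further down (`updatePersistentPeerAddrs` and `connectToPersistentPeer`).

```go
package main

import (
	"fmt"
	"sync"
)

// nodeUpdate is a stand-in for a gossip NodeAnnouncement update: the
// announcing node's key plus its currently advertised addresses.
type nodeUpdate struct {
	pubKey string
	addrs  []string
}

// addrBook captures the core idea of the change: for every persistent peer,
// keep the latest advertised address list so a reconnect can try all of the
// current addresses instead of only the stale one we last dialed.
type addrBook struct {
	mu              sync.Mutex
	persistentPeers map[string]bool
	peerAddrs       map[string][]string
}

// applyUpdate refreshes the stored addresses when a persistent peer
// announces a new set.
func (b *addrBook) applyUpdate(u nodeUpdate) {
	b.mu.Lock()
	defer b.mu.Unlock()

	// Only track peers we want to stay connected to.
	if !b.persistentPeers[u.pubKey] {
		return
	}
	b.peerAddrs[u.pubKey] = append([]string(nil), u.addrs...)
}

// reconnectTargets returns the addresses a reconnect should cycle through.
func (b *addrBook) reconnectTargets(pubKey string) []string {
	b.mu.Lock()
	defer b.mu.Unlock()
	return append([]string(nil), b.peerAddrs[pubKey]...)
}

func main() {
	book := &addrBook{
		persistentPeers: map[string]bool{"dave": true},
		peerAddrs:       make(map[string][]string),
	}

	// Dave first advertises one address, then announces a different one.
	book.applyUpdate(nodeUpdate{pubKey: "dave", addrs: []string{"127.0.0.1:9735"}})
	book.applyUpdate(nodeUpdate{pubKey: "dave", addrs: []string{"127.0.0.1:9999"}})

	// A reconnect now dials the refreshed address, not the stale one.
	fmt.Println(book.reconnectTargets("dave")) // [127.0.0.1:9999]
}
```
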
@@ -391,7 +391,7 @@ you.
 ## Bug Fixes

 * A bug has been fixed that would cause `lnd` to [try to bootstrap using the
-  currnet DNS seeds when in SigNet
+  current DNS seeds when in SigNet
   mode](https://github.com/lightningnetwork/lnd/pull/5564).

 * [A validation check for sane `CltvLimit` and `FinalCltvDelta` has been added
@@ -430,6 +430,10 @@ you.
   result in transactions being rebroadcast even after they had been confirmed.
   [Lnd is updated to use the version of Neutrino containing this
   fix](https://github.com/lightningnetwork/lnd/pull/5807).

+* A bug has been fixed that would result in nodes not [reconnecting to their
+  persistent outbound peers if the peer's IP
+  address changed](https://github.com/lightningnetwork/lnd/pull/5538).
+
 * [Use the change output index when validating the reserved wallet balance for
   SendCoins/SendMany calls](https://github.com/lightningnetwork/lnd/pull/5665)
@@ -444,6 +448,7 @@ change](https://github.com/lightningnetwork/lnd/pull/5613).
 * Alyssa Hertig
 * Andras Banki-Horvath
 * de6df1re
+* Elle Mouton
 * ErikEk
 * Eugene Siegel
 * Harsha Goli
@@ -346,10 +346,10 @@ func (n *NetworkHarness) NewNodeEtcd(name string, etcdCfg *etcd.Config,
 // current instance of the network harness. The created node is running, but
 // not yet connected to other nodes within the network.
 func (n *NetworkHarness) NewNode(t *testing.T,
-    name string, extraArgs []string) *HarnessNode {
+    name string, extraArgs []string, opts ...NodeOption) *HarnessNode {

     node, err := n.newNode(
-        name, extraArgs, false, nil, n.dbBackend, true,
+        name, extraArgs, false, nil, n.dbBackend, true, opts...,
     )
     require.NoErrorf(t, err, "unable to create new node for %s", name)
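
The `opts ...NodeOption` parameter added to `NewNode` above follows Go's functional-options pattern. Below is a minimal sketch, assuming only that a node config exposes a `P2PPort` field (which the new `withP2PPort` helper in the test further down does set); `applyOptions` and the pared-down `NodeConfig` are hypothetical, shown only to illustrate how variadic options are usually consumed, and are not the harness's actual code.

```go
package lntestsketch

// NodeConfig is a pared-down stand-in for the harness node configuration;
// the real lntest.NodeConfig has many more fields.
type NodeConfig struct {
	P2PPort int
}

// NodeOption mutates a NodeConfig before the node is started.
type NodeOption func(*NodeConfig)

// applyOptions runs each option against the config in order (hypothetical
// helper, for illustration only).
func applyOptions(cfg *NodeConfig, opts ...NodeOption) {
	for _, opt := range opts {
		opt(cfg)
	}
}

// withP2PPort mirrors the helper defined in the new itest below: it returns
// an option that pins the node's P2P listening port.
func withP2PPort(port int) NodeOption {
	return func(cfg *NodeConfig) {
		cfg.P2PPort = port
	}
}
```
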
@@ -670,13 +670,31 @@ func (n *NetworkHarness) EnsureConnected(t *testing.T, a, b *HarnessNode) {
     )
 }

-// ConnectNodes establishes an encrypted+authenticated p2p connection from node
+// ConnectNodes attempts to create a connection between nodes a and b.
+func (n *NetworkHarness) ConnectNodes(t *testing.T, a, b *HarnessNode) {
+    n.connectNodes(t, a, b, false)
+}
+
+// ConnectNodesPerm attempts to connect nodes a and b and sets node b as
+// a peer that node a should persistently attempt to reconnect to if they
+// become disconnected.
+func (n *NetworkHarness) ConnectNodesPerm(t *testing.T,
+    a, b *HarnessNode) {
+
+    n.connectNodes(t, a, b, true)
+}
+
+// connectNodes establishes an encrypted+authenticated p2p connection from node
 // a towards node b. The function will return a non-nil error if the connection
-// was unable to be established.
+// was unable to be established. If the perm parameter is set to true then
+// node a will persistently attempt to reconnect to node b if they get
+// disconnected.
 //
 // NOTE: This function may block for up to 15-seconds as it will not return
 // until the new connection is detected as being known to both nodes.
-func (n *NetworkHarness) ConnectNodes(t *testing.T, a, b *HarnessNode) {
+func (n *NetworkHarness) connectNodes(t *testing.T, a, b *HarnessNode,
+    perm bool) {
+
     ctxb := context.Background()
     ctx, cancel := context.WithTimeout(ctxb, DefaultTimeout)
     defer cancel()
@@ -692,6 +710,7 @@ func (n *NetworkHarness) ConnectNodes(t *testing.T, a, b *HarnessNode) {
             Pubkey: bobInfo.IdentityPubkey,
             Host:   b.Cfg.P2PAddr(),
         },
+        Perm: perm,
     }

     err = n.connect(ctx, req, a)
@@ -471,43 +471,102 @@ func assertNumOpenChannelsPending(t *harnessTest,
     require.NoError(t.t, err)
 }

-// assertNumConnections asserts number current connections between two peers.
-func assertNumConnections(t *harnessTest, alice, bob *lntest.HarnessNode,
-    expected int) {
+// checkPeerInPeersList returns true if Bob appears in Alice's peer list.
+func checkPeerInPeersList(ctx context.Context, alice,
+    bob *lntest.HarnessNode) (bool, error) {
+
+    peers, err := alice.ListPeers(ctx, &lnrpc.ListPeersRequest{})
+    if err != nil {
+        return false, fmt.Errorf(
+            "error listing %s's node (%v) peers: %v",
+            alice.Name(), alice.NodeID, err,
+        )
+    }
+
+    for _, peer := range peers.Peers {
+        if peer.PubKey == bob.PubKeyStr {
+            return true, nil
+        }
+    }
+
+    return false, nil
+}
+
+// assertConnected asserts that two peers are connected.
+func assertConnected(t *harnessTest, alice, bob *lntest.HarnessNode) {
     ctxb := context.Background()
-    ctxt, _ := context.WithTimeout(ctxb, defaultTimeout)
+    ctxt, cancel := context.WithTimeout(ctxb, defaultTimeout)
+    defer cancel()

     err := wait.NoError(func() error {
-        aNumPeers, err := alice.ListPeers(
-            ctxt, &lnrpc.ListPeersRequest{},
-        )
+        bobIsAlicePeer, err := checkPeerInPeersList(ctxt, alice, bob)
         if err != nil {
+            return err
+        }
+
+        if !bobIsAlicePeer {
             return fmt.Errorf(
-                "unable to fetch %s's node (%v) list peers %v",
-                alice.Name(), alice.NodeID, err,
+                "expected %s and %s to be connected "+
+                    "but %s is not in %s's peer list",
+                alice.Name(), bob.Name(),
+                bob.Name(), alice.Name(),
             )
         }

-        bNumPeers, err := bob.ListPeers(ctxt, &lnrpc.ListPeersRequest{})
+        aliceIsBobPeer, err := checkPeerInPeersList(ctxt, bob, alice)
         if err != nil {
+            return err
+        }
+
+        if !aliceIsBobPeer {
             return fmt.Errorf(
-                "unable to fetch %s's node (%v) list peers %v",
-                bob.Name(), bob.NodeID, err,
+                "expected %s and %s to be connected "+
+                    "but %s is not in %s's peer list",
+                alice.Name(), bob.Name(),
+                alice.Name(), bob.Name(),
            )
         }

-        if len(aNumPeers.Peers) != expected {
+        return nil

     }, defaultTimeout)
     require.NoError(t.t, err)
 }

+// assertNotConnected asserts that two peers are not connected.
+func assertNotConnected(t *harnessTest, alice, bob *lntest.HarnessNode) {
+    ctxb := context.Background()
+    ctxt, cancel := context.WithTimeout(ctxb, defaultTimeout)
+    defer cancel()
+
+    err := wait.NoError(func() error {
+        bobIsAlicePeer, err := checkPeerInPeersList(ctxt, alice, bob)
+        if err != nil {
+            return err
+        }
+
+        if bobIsAlicePeer {
             return fmt.Errorf(
-                "number of peers connected to %s is "+
-                    "incorrect: expected %v, got %v",
-                alice.Name(), expected, len(aNumPeers.Peers),
+                "expected %s and %s not to be "+
+                    "connected but %s is in %s's "+
+                    "peer list",
+                alice.Name(), bob.Name(),
+                bob.Name(), alice.Name(),
             )
         }
-        if len(bNumPeers.Peers) != expected {
+
+        aliceIsBobPeer, err := checkPeerInPeersList(ctxt, bob, alice)
+        if err != nil {
+            return err
+        }
+
+        if aliceIsBobPeer {
             return fmt.Errorf(
-                "number of peers connected to %s is "+
-                    "incorrect: expected %v, got %v",
-                bob.Name(), expected, len(bNumPeers.Peers),
+                "expected %s and %s not to be "+
+                    "connected but %s is in %s's "+
+                    "peer list",
+                alice.Name(), bob.Name(),
+                alice.Name(), bob.Name(),
             )
         }

@@ -49,7 +49,7 @@ func testDisconnectingTargetPeer(net *lntest.NetworkHarness, t *harnessTest) {
     net.ConnectNodes(t.t, alice, bob)

     // Check existing connection.
-    assertNumConnections(t, alice, bob, 1)
+    assertConnected(t, alice, bob)

     // Give Alice some coins so she can fund a channel.
     net.SendCoins(t.t, btcutil.SatoshiPerBitcoin, alice)
@@ -82,7 +82,7 @@ func testDisconnectingTargetPeer(net *lntest.NetworkHarness, t *harnessTest) {
     time.Sleep(time.Millisecond * 300)

     // Assert that the connection was torn down.
-    assertNumConnections(t, alice, bob, 0)
+    assertNotConnected(t, alice, bob)

     fundingTxID, err := chainhash.NewHash(pendingUpdate.Txid)
     if err != nil {
@@ -128,7 +128,7 @@ func testDisconnectingTargetPeer(net *lntest.NetworkHarness, t *harnessTest) {
     }

     // Check existing connection.
-    assertNumConnections(t, alice, bob, 0)
+    assertNotConnected(t, alice, bob)

     // Reconnect both nodes before force closing the channel.
     net.ConnectNodes(t.t, alice, bob)
@@ -152,14 +152,14 @@ func testDisconnectingTargetPeer(net *lntest.NetworkHarness, t *harnessTest) {
             err)
     }

-    // Check zero peer connections.
-    assertNumConnections(t, alice, bob, 0)
+    // Check that the nodes not connected.
+    assertNotConnected(t, alice, bob)

     // Finally, re-connect both nodes.
     net.ConnectNodes(t.t, alice, bob)

     // Check existing connection.
-    assertNumConnections(t, alice, net.Bob, 1)
+    assertConnected(t, alice, bob)

     // Cleanup by mining the force close and sweep transaction.
     cleanupForceClose(t, net, alice, chanPoint)
@@ -60,6 +60,173 @@ func testNetworkConnectionTimeout(net *lntest.NetworkHarness, t *harnessTest) {
     assertTimeoutError(ctxt, t, dave, req)
 }

+// testReconnectAfterIPChange verifies that if a persistent inbound node changes
+// its listening address then it's peer will still be able to reconnect to it.
+func testReconnectAfterIPChange(net *lntest.NetworkHarness, t *harnessTest) {
+    // In this test, the following network will be set up. A single
+    // dash line represents a peer connection and a double dash line
+    // represents a channel.
+    // Charlie will create a connection to Dave so that Dave is the inbound
+    // peer. This will be made a persistent connection for Charlie so that
+    // Charlie will attempt to reconnect to Dave if Dave restarts.
+    // A channel will be opened between Dave and Alice to ensure that any
+    // NodeAnnouncements that Dave sends will reach Alice.
+    // The connection between Alice and Charlie ensures that Charlie
+    // receives all of Dave's NodeAnnouncements.
+    // The desired behaviour is that if Dave changes his P2P IP address then
+    // Charlie should still be able to reconnect to him.
+    //
+    //    /------- Charlie <-----\
+    //    |                      |
+    //    v                      |
+    //   Dave <===============> Alice
+
+    // The first thing we will test is the case where Dave advertises two
+    // external IP addresses and then switches from the first one listed
+    // to the second one listed. The desired behaviour is that Charlie will
+    // attempt both of Dave's advertised addresses when attempting to
+    // reconnect.
+
+    // Create a new node, Charlie.
+    charlie := net.NewNode(t.t, "Charlie", nil)
+    defer shutdownAndAssert(net, t, charlie)
+
+    // We derive two ports for Dave, and we initialise his node with
+    // these ports advertised as `--externalip` arguments.
+    ip1 := lntest.NextAvailablePort()
+    ip2 := lntest.NextAvailablePort()
+
+    advertisedAddrs := []string{
+        fmt.Sprintf("127.0.0.1:%d", ip1),
+        fmt.Sprintf("127.0.0.1:%d", ip2),
+    }
+
+    var daveArgs []string
+    for _, addr := range advertisedAddrs {
+        daveArgs = append(daveArgs, "--externalip="+addr)
+    }
+
+    // withP2PPort is a helper closure used to set the P2P port that a node
+    // should use.
+    var withP2PPort = func(port int) lntest.NodeOption {
+        return func(cfg *lntest.NodeConfig) {
+            cfg.P2PPort = port
+        }
+    }
+
+    // Create a new node, Dave, and ensure that his initial P2P port is
+    // ip1 derived above.
+    dave := net.NewNode(t.t, "Dave", daveArgs, withP2PPort(ip1))
+    defer shutdownAndAssert(net, t, dave)
+
+    // Subscribe to graph notifications from Charlie so that we can tell
+    // when he receives Dave's NodeAnnouncements.
+    ctxb := context.Background()
+    charlieSub := subscribeGraphNotifications(ctxb, t, charlie)
+    defer close(charlieSub.quit)
+
+    // Connect Alice to Dave and Charlie.
+    net.ConnectNodes(t.t, net.Alice, dave)
+    net.ConnectNodes(t.t, net.Alice, charlie)
+
+    // We'll then go ahead and open a channel between Alice and Dave. This
+    // ensures that Charlie receives the node announcement from Alice as
+    // part of the announcement broadcast.
+    chanPoint := openChannelAndAssert(
+        t, net, net.Alice, dave, lntest.OpenChannelParams{
+            Amt: 1000000,
+        },
+    )
+    defer closeChannelAndAssert(t, net, net.Alice, chanPoint, false)
+
+    // waitForNodeAnnouncement is a closure used to wait on the given graph
+    // subscription for a node announcement from a node with the given
+    // public key. It also waits for the node announcement that advertises
+    // a particular set of addresses.
+    waitForNodeAnnouncement := func(graphSub graphSubscription,
+        nodePubKey string, addrs []string) {
+
+        for {
+            select {
+            case graphUpdate := <-graphSub.updateChan:
+            nextUpdate:
+                for _, update := range graphUpdate.NodeUpdates {
+                    if update.IdentityKey != nodePubKey {
+                        continue
+                    }
+
+                    addrMap := make(map[string]bool)
+                    for _, addr := range update.NodeAddresses {
+                        addrMap[addr.GetAddr()] = true
+                    }
+
+                    for _, addr := range addrs {
+                        if !addrMap[addr] {
+                            continue nextUpdate
+                        }
+                    }
+
+                    return
+                }
+
+            case err := <-graphSub.errChan:
+                t.Fatalf("unable to recv graph update: %v", err)
+
+            case <-time.After(defaultTimeout):
+                t.Fatalf("did not receive node ann update")
+            }
+        }
+    }
+
+    // Wait for Charlie to receive Dave's initial NodeAnnouncement.
+    waitForNodeAnnouncement(charlieSub, dave.PubKeyStr, advertisedAddrs)
+
+    // Now create a persistent connection between Charlie and Bob with no
+    // channels. Charlie is the outbound node and Bob is the inbound node.
+    net.ConnectNodesPerm(t.t, charlie, dave)
+
+    // Assert that Dave and Charlie are connected
+    assertConnected(t, dave, charlie)
+
+    // Change Dave's P2P port to the second IP address that he advertised
+    // and restart his node.
+    dave.Cfg.P2PPort = ip2
+    err := net.RestartNode(dave, nil)
+    require.NoError(t.t, err)
+
+    // assert that Dave and Charlie reconnect successfully after Dave
+    // changes to his second advertised address.
+    assertConnected(t, dave, charlie)
+
+    // Next we test the case where Dave changes his listening address to one
+    // that was not listed in his original advertised addresses. The desired
+    // behaviour is that Charlie will update his connection requests to Dave
+    // when he receives the Node Announcement from Dave with his updated
+    // address.
+
+    // Change Dave's listening port and restart.
+    dave.Cfg.P2PPort = lntest.NextAvailablePort()
+    dave.Cfg.ExtraArgs = []string{
+        fmt.Sprintf(
+            "--externalip=127.0.0.1:%d", dave.Cfg.P2PPort,
+        ),
+    }
+    err = net.RestartNode(dave, nil)
+    require.NoError(t.t, err)
+
+    // Show that Charlie does receive Dave's new listening address in
+    // a Node Announcement.
+    waitForNodeAnnouncement(
+        charlieSub, dave.PubKeyStr,
+        []string{fmt.Sprintf("127.0.0.1:%d", dave.Cfg.P2PPort)},
+    )
+
+    // assert that Dave and Charlie do reconnect after Dave changes his P2P
+    // address to one not listed in Dave's original advertised list of
+    // addresses.
+    assertConnected(t, dave, charlie)
+}
+
 // assertTimeoutError asserts that a connection timeout error is raised. A
 // context with a default timeout is used to make the request. If our customized
 // connection timeout is less than the default, we won't see the request context
@@ -40,6 +40,10 @@ var allTestCases = []*testCase{
         name: "disconnecting target peer",
         test: testDisconnectingTargetPeer,
     },
+    {
+        name: "reconnect after ip change",
+        test: testReconnectAfterIPChange,
+    },
     {
         name: "graph topology notifications",
         test: testGraphTopologyNotifications,
server.go
@@ -194,6 +194,7 @@ type server struct {

     persistentPeers        map[string]bool
     persistentPeersBackoff map[string]time.Duration
+    persistentPeerAddrs    map[string][]*lnwire.NetAddress
     persistentConnReqs     map[string][]*connmgr.ConnReq
     persistentRetryCancels map[string]chan struct{}

@@ -308,6 +309,89 @@ type server struct {
     wg sync.WaitGroup
 }

+// updatePersistentPeerAddrs subscribes to topology changes and stores
+// advertised addresses for any NodeAnnouncements from our persisted peers.
+func (s *server) updatePersistentPeerAddrs() error {
+    graphSub, err := s.chanRouter.SubscribeTopology()
+    if err != nil {
+        return err
+    }
+
+    s.wg.Add(1)
+    go func() {
+        defer func() {
+            graphSub.Cancel()
+            s.wg.Done()
+        }()
+
+        for {
+            select {
+
+            case <-s.quit:
+                return
+
+            case topChange, ok := <-graphSub.TopologyChanges:
+                // If the router is shutting down, then we will
+                // as well.
+                if !ok {
+                    return
+                }
+
+                for _, update := range topChange.NodeUpdates {
+                    pubKeyStr := string(
+                        update.IdentityKey.
+                            SerializeCompressed(),
+                    )
+
+                    // We only care about updates from
+                    // our persistentPeers.
+                    s.mu.RLock()
+                    _, ok := s.persistentPeers[pubKeyStr]
+                    s.mu.RUnlock()
+                    if !ok {
+                        continue
+                    }
+
+                    addrs := make([]*lnwire.NetAddress, 0,
+                        len(update.Addresses))
+
+                    for _, addr := range update.Addresses {
+                        addrs = append(addrs,
+                            &lnwire.NetAddress{
+                                IdentityKey: update.IdentityKey,
+                                Address:     addr,
+                                ChainNet:    s.cfg.ActiveNetParams.Net,
+                            },
+                        )
+                    }
+
+                    s.mu.Lock()
+
+                    // Update the stored addresses for this
+                    // to peer to reflect the new set.
+                    s.persistentPeerAddrs[pubKeyStr] = addrs
+
+                    // If there are no outstanding
+                    // connection requests for this peer
+                    // then our work is done since we are
+                    // not currently trying to connect to
+                    // them.
+                    if len(s.persistentConnReqs[pubKeyStr]) == 0 {
+                        s.mu.Unlock()
+                        continue
+                    }
+
+                    s.mu.Unlock()
+
+                    s.connectToPersistentPeer(pubKeyStr)
+                }
+            }
+        }
+    }()
+
+    return nil
+}
+
 // parseAddr parses an address from its string format to a net.Addr.
 func parseAddr(address string, netCfg tor.Net) (net.Addr, error) {
     var (
@@ -465,6 +549,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
         persistentPeers:        make(map[string]bool),
         persistentPeersBackoff: make(map[string]time.Duration),
         persistentConnReqs:     make(map[string][]*connmgr.ConnReq),
+        persistentPeerAddrs:    make(map[string][]*lnwire.NetAddress),
         persistentRetryCancels: make(map[string]chan struct{}),
         peerErrors:             make(map[string]*queue.CircularBuffer),
         ignorePeerTermination:  make(map[*peer.Brontide]struct{}),
@@ -1740,6 +1825,13 @@ func (s *server) Start() error {
             return nil
         })

+        // Subscribe to NodeAnnouncements that advertise new addresses
+        // our persistent peers.
+        if err := s.updatePersistentPeerAddrs(); err != nil {
+            startErr = err
+            return
+        }
+
         // With all the relevant sub-systems started, we'll now attempt
         // to establish persistent connections to our direct channel
         // collaborators within the network. Before doing so however,
@@ -3517,94 +3609,181 @@ func (s *server) peerTerminationWatcher(p *peer.Brontide, ready chan struct{}) {
     s.removePeer(p)

     // Next, check to see if this is a persistent peer or not.
-    _, ok := s.persistentPeers[pubStr]
-    if ok {
-        // We'll only need to re-launch a connection request if one
-        // isn't already currently pending.
-        if _, ok := s.persistentConnReqs[pubStr]; ok {
+    if _, ok := s.persistentPeers[pubStr]; !ok {
         return
     }

+    // Get the last address that we used to connect to the peer.
+    addrs := []net.Addr{
+        p.NetAddress().Address,
+    }
+
+    // We'll ensure that we locate all the peers advertised addresses for
+    // reconnection purposes.
+    advertisedAddrs, err := s.fetchNodeAdvertisedAddrs(pubKey)
+    switch {
+    // We found advertised addresses, so use them.
+    case err == nil:
+        addrs = advertisedAddrs
+
+    // The peer doesn't have an advertised address.
+    case err == errNoAdvertisedAddr:
+        // If it is an outbound peer then we fall back to the existing
+        // peer address.
+        if !p.Inbound() {
+            break
+        }
+
+        // Fall back to the existing peer address if
+        // we're not accepting connections over Tor.
+        if s.torController == nil {
+            break
+        }
+
+        // If we are, the peer's address won't be known
+        // to us (we'll see a private address, which is
+        // the address used by our onion service to dial
+        // to lnd), so we don't have enough information
+        // to attempt a reconnect.
+        srvrLog.Debugf("Ignoring reconnection attempt "+
+            "to inbound peer %v without "+
+            "advertised address", p)
+        return
+
+    // We came across an error retrieving an advertised
+    // address, log it, and fall back to the existing peer
+    // address.
+    default:
+        srvrLog.Errorf("Unable to retrieve advertised "+
+            "address for node %x: %v", p.PubKey(),
+            err)
+    }
+
+    // Make an easy lookup map so that we can check if an address
+    // is already in the address list that we have stored for this peer.
+    existingAddrs := make(map[string]bool)
+    for _, addr := range s.persistentPeerAddrs[pubStr] {
+        existingAddrs[addr.String()] = true
+    }
+
+    // Add any missing addresses for this peer to persistentPeerAddr.
+    for _, addr := range addrs {
+        if existingAddrs[addr.String()] {
+            continue
+        }
+
+        s.persistentPeerAddrs[pubStr] = append(
+            s.persistentPeerAddrs[pubStr],
+            &lnwire.NetAddress{
+                IdentityKey: p.IdentityKey(),
+                Address:     addr,
+                ChainNet:    p.NetAddress().ChainNet,
+            },
+        )
+    }
+
+    // Record the computed backoff in the backoff map.
+    backoff := s.nextPeerBackoff(pubStr, p.StartTime())
+    s.persistentPeersBackoff[pubStr] = backoff
+
+    // Initialize a retry canceller for this peer if one does not
+    // exist.
+    cancelChan, ok := s.persistentRetryCancels[pubStr]
+    if !ok {
+        cancelChan = make(chan struct{})
+        s.persistentRetryCancels[pubStr] = cancelChan
+    }
+
+    // We choose not to wait group this go routine since the Connect
+    // call can stall for arbitrarily long if we shutdown while an
+    // outbound connection attempt is being made.
+    go func() {
+        srvrLog.Debugf("Scheduling connection re-establishment to "+
+            "persistent peer %v in %s", p.IdentityKey(), backoff)
+
+        select {
+        case <-time.After(backoff):
+        case <-cancelChan:
+            return
+        case <-s.quit:
+            return
+        }
+
-        // We'll ensure that we locate an advertised address to use
-        // within the peer's address for reconnection purposes.
-        //
-        // TODO(roasbeef): use them all?
-        if p.Inbound() {
-            advertisedAddr, err := s.fetchNodeAdvertisedAddr(pubKey)
-            switch {
-            // We found an advertised address, so use it.
-            case err == nil:
-                p.SetAddress(advertisedAddr)
+        srvrLog.Debugf("Attempting to re-establish persistent "+
+            "connection to peer %v", p.IdentityKey())

-            // The peer doesn't have an advertised address.
-            case err == errNoAdvertisedAddr:
-                // Fall back to the existing peer address if
-                // we're not accepting connections over Tor.
-                if s.torController == nil {
-                    break
-                }
+        s.connectToPersistentPeer(pubStr)
+    }()
+}
-                // If we are, the peer's address won't be known
-                // to us (we'll see a private address, which is
-                // the address used by our onion service to dial
-                // to lnd), so we don't have enough information
-                // to attempt a reconnect.
-                srvrLog.Debugf("Ignoring reconnection attempt "+
-                    "to inbound peer %v without "+
-                    "advertised address", p)
-                return
+// connectToPersistentPeer uses all the stored addresses for a peer to attempt
+// to connect to the peer. It creates connection requests if there are
+// currently none for a given address and it removes old connection requests
+// if the associated address is no longer in the latest address list for the
+// peer.
+func (s *server) connectToPersistentPeer(pubKeyStr string) {
+    s.mu.Lock()
+    defer s.mu.Unlock()
+
-            // We came across an error retrieving an advertised
-            // address, log it, and fall back to the existing peer
-            // address.
-            default:
-                srvrLog.Errorf("Unable to retrieve advertised "+
-                    "address for node %x: %v", p.PubKey(),
-                    err)
-            }
+    // Create an easy lookup map of the addresses we have stored for the
+    // peer. We will remove entries from this map if we have existing
+    // connection requests for the associated address and then any leftover
+    // entries will indicate which addresses we should create new
+    // connection requests for.
+    addrMap := make(map[string]*lnwire.NetAddress)
+    for _, addr := range s.persistentPeerAddrs[pubKeyStr] {
+        addrMap[addr.String()] = addr
+    }
+
+    // Go through each of the existing connection requests and
+    // check if they correspond to the latest set of addresses. If
+    // there is a connection requests that does not use one of the latest
+    // advertised addresses then remove that connection request.
+    var updatedConnReqs []*connmgr.ConnReq
+    for _, connReq := range s.persistentConnReqs[pubKeyStr] {
+        lnAddr := connReq.Addr.(*lnwire.NetAddress).Address.String()
+
+        switch _, ok := addrMap[lnAddr]; ok {
+        // If the existing connection request is using one of the
+        // latest advertised addresses for the peer then we add it to
+        // updatedConnReqs and remove the associated address from
+        // addrMap so that we don't recreate this connReq later on.
+        case true:
+            updatedConnReqs = append(
+                updatedConnReqs, connReq,
+            )
+            delete(addrMap, lnAddr)
+
+        // If the existing connection request is using an address that
+        // is not one of the latest advertised addresses for the peer
+        // then we remove the connecting request from the connection
+        // manager.
+        case false:
+            srvrLog.Info(
+                "Removing conn req:", connReq.Addr.String(),
+            )
+            s.connMgr.Remove(connReq.ID())
+        }
+    }
+
+    s.persistentConnReqs[pubKeyStr] = updatedConnReqs
+
+    // Any addresses left in addrMap are new ones that we have not made
+    // connection requests for. So create new connection requests for those.
+    for _, addr := range s.persistentPeerAddrs[pubKeyStr] {
+        if _, ok := addrMap[addr.String()]; !ok {
+            continue
+        }
+
         // Otherwise, we'll launch a new connection request in order to
         // attempt to maintain a persistent connection with this peer.
         connReq := &connmgr.ConnReq{
-            Addr:      p.NetAddress(),
+            Addr:      addr,
             Permanent: true,
         }
-        s.persistentConnReqs[pubStr] = append(
-            s.persistentConnReqs[pubStr], connReq)

-        // Record the computed backoff in the backoff map.
-        backoff := s.nextPeerBackoff(pubStr, p.StartTime())
-        s.persistentPeersBackoff[pubStr] = backoff
-
-        // Initialize a retry canceller for this peer if one does not
-        // exist.
-        cancelChan, ok := s.persistentRetryCancels[pubStr]
-        if !ok {
-            cancelChan = make(chan struct{})
-            s.persistentRetryCancels[pubStr] = cancelChan
-        }
-
-        // We choose not to wait group this go routine since the Connect
-        // call can stall for arbitrarily long if we shutdown while an
-        // outbound connection attempt is being made.
-        go func() {
-            srvrLog.Debugf("Scheduling connection re-establishment to "+
-                "persistent peer %v in %s", p, backoff)
-
-            select {
-            case <-time.After(backoff):
-            case <-cancelChan:
-                return
-            case <-s.quit:
-                return
-            }
-
-            srvrLog.Debugf("Attempting to re-establish persistent "+
-                "connection to peer %v", p)
-
-            s.connMgr.Connect(connReq)
-        }()
+        s.persistentConnReqs[pubKeyStr] = append(
+            s.persistentConnReqs[pubKeyStr], connReq,
+        )
+        go s.connMgr.Connect(connReq)
     }
 }
@@ -3922,8 +4101,8 @@ func computeNextBackoff(currBackoff, maxBackoff time.Duration) time.Duration {
 // advertised address of a node, but they don't have one.
 var errNoAdvertisedAddr = errors.New("no advertised address found")

-// fetchNodeAdvertisedAddr attempts to fetch an advertised address of a node.
-func (s *server) fetchNodeAdvertisedAddr(pub *btcec.PublicKey) (net.Addr, error) {
+// fetchNodeAdvertisedAddrs attempts to fetch the advertised addresses of a node.
+func (s *server) fetchNodeAdvertisedAddrs(pub *btcec.PublicKey) ([]net.Addr, error) {
     vertex, err := route.NewVertexFromBytes(pub.SerializeCompressed())
     if err != nil {
         return nil, err
@@ -3938,7 +4117,7 @@ func (s *server) fetchNodeAdvertisedAddr(pub *btcec.PublicKey) (net.Addr, error)
         return nil, errNoAdvertisedAddr
     }

-    return node.Addresses[0], nil
+    return node.Addresses, nil
 }

 // fetchLastChanUpdate returns a function which is able to retrieve our latest