diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index 7cfc7bce8..5bc84d390 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -765,7 +765,7 @@ func createTestCtx(t *testing.T, startHeight uint32) (*testCtx, error) { peerChan chan<- lnpeer.Peer) { pk, _ := btcec.ParsePubKey(target[:]) - peerChan <- &mockPeer{pk, nil, nil} + peerChan <- &mockPeer{pk, nil, nil, atomic.Bool{}} }, NotifyWhenOffline: func(_ [33]byte) <-chan struct{} { c := make(chan struct{}) @@ -843,7 +843,7 @@ func TestProcessAnnouncement(t *testing.T) { } } - nodePeer := &mockPeer{remoteKeyPriv1.PubKey(), nil, nil} + nodePeer := &mockPeer{remoteKeyPriv1.PubKey(), nil, nil, atomic.Bool{}} // First, we'll craft a valid remote channel announcement and send it to // the gossiper so that it can be processed. @@ -953,7 +953,7 @@ func TestPrematureAnnouncement(t *testing.T) { _, err = createNodeAnnouncement(remoteKeyPriv1, timestamp) require.NoError(t, err, "can't create node announcement") - nodePeer := &mockPeer{remoteKeyPriv1.PubKey(), nil, nil} + nodePeer := &mockPeer{remoteKeyPriv1.PubKey(), nil, nil, atomic.Bool{}} // Pretending that we receive the valid channel announcement from // remote side, but block height of this announcement is greater than @@ -990,7 +990,9 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { pk, _ := btcec.ParsePubKey(target[:]) select { - case peerChan <- &mockPeer{pk, sentMsgs, ctx.gossiper.quit}: + case peerChan <- &mockPeer{ + pk, sentMsgs, ctx.gossiper.quit, atomic.Bool{}, + }: case <-ctx.gossiper.quit: } } @@ -1000,7 +1002,9 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:]) require.NoError(t, err, "unable to parse pubkey") - remotePeer := &mockPeer{remoteKey, sentMsgs, ctx.gossiper.quit} + remotePeer := &mockPeer{ + remoteKey, sentMsgs, ctx.gossiper.quit, atomic.Bool{}, + } // Recreate lightning network topology. Initialize router with channel // between two nodes. @@ -1162,7 +1166,9 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { pk, _ := btcec.ParsePubKey(target[:]) select { - case peerChan <- &mockPeer{pk, sentMsgs, ctx.gossiper.quit}: + case peerChan <- &mockPeer{ + pk, sentMsgs, ctx.gossiper.quit, atomic.Bool{}, + }: case <-ctx.gossiper.quit: } } @@ -1172,7 +1178,9 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:]) require.NoError(t, err, "unable to parse pubkey") - remotePeer := &mockPeer{remoteKey, sentMsgs, ctx.gossiper.quit} + remotePeer := &mockPeer{ + remoteKey, sentMsgs, ctx.gossiper.quit, atomic.Bool{}, + } // Pretending that we receive local channel announcement from funding // manager, thereby kick off the announcement exchange process, in @@ -1344,7 +1352,9 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { // Set up a channel to intercept the messages sent to the remote peer. sentToPeer := make(chan lnwire.Message, 1) - remotePeer := &mockPeer{remoteKey, sentToPeer, ctx.gossiper.quit} + remotePeer := &mockPeer{ + remoteKey, sentToPeer, ctx.gossiper.quit, atomic.Bool{}, + } // Since the reliable send to the remote peer of the local channel proof // requires a notification when the peer comes online, we'll capture the @@ -1578,7 +1588,9 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) { // Set up a channel we can use to inspect messages sent by the // gossiper to the remote peer. sentToPeer := make(chan lnwire.Message, 1) - remotePeer := &mockPeer{remoteKey, sentToPeer, ctx.gossiper.quit} + remotePeer := &mockPeer{ + remoteKey, sentToPeer, ctx.gossiper.quit, atomic.Bool{}, + } // Override NotifyWhenOnline to return the remote peer which we expect // meesages to be sent to. @@ -1772,7 +1784,7 @@ func TestDeDuplicatedAnnouncements(t *testing.T) { ca, err := createRemoteChannelAnnouncement(0) require.NoError(t, err, "can't create remote channel announcement") - nodePeer := &mockPeer{bitcoinKeyPub2, nil, nil} + nodePeer := &mockPeer{bitcoinKeyPub2, nil, nil, atomic.Bool{}} announcements.AddMsgs(networkMsg{ msg: ca, peer: nodePeer, @@ -2058,7 +2070,7 @@ func TestForwardPrivateNodeAnnouncement(t *testing.T) { // process it. remoteChanAnn, err := createRemoteChannelAnnouncement(startingHeight - 1) require.NoError(t, err, "unable to create remote channel announcement") - peer := &mockPeer{pubKey, nil, nil} + peer := &mockPeer{pubKey, nil, nil, atomic.Bool{}} select { case err := <-ctx.gossiper.ProcessRemoteAnnouncement(remoteChanAnn, peer): @@ -2373,7 +2385,9 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) { // Set up a channel that we can use to inspect the messages sent // directly from the gossiper. sentMsgs := make(chan lnwire.Message, 10) - remotePeer := &mockPeer{remoteKey, sentMsgs, ctx.gossiper.quit} + remotePeer := &mockPeer{ + remoteKey, sentMsgs, ctx.gossiper.quit, atomic.Bool{}, + } // Override NotifyWhenOnline to return the remote peer which we expect // messages to be sent to. @@ -2561,7 +2575,9 @@ func TestExtraDataChannelAnnouncementValidation(t *testing.T) { ctx, err := createTestCtx(t, 0) require.NoError(t, err, "can't create context") - remotePeer := &mockPeer{remoteKeyPriv1.PubKey(), nil, nil} + remotePeer := &mockPeer{ + remoteKeyPriv1.PubKey(), nil, nil, atomic.Bool{}, + } // We'll now create an announcement that contains an extra set of bytes // that we don't know of ourselves, but should still include in the @@ -2592,7 +2608,9 @@ func TestExtraDataChannelUpdateValidation(t *testing.T) { ctx, err := createTestCtx(t, 0) require.NoError(t, err, "can't create context") - remotePeer := &mockPeer{remoteKeyPriv1.PubKey(), nil, nil} + remotePeer := &mockPeer{ + remoteKeyPriv1.PubKey(), nil, nil, atomic.Bool{}, + } // In this scenario, we'll create two announcements, one regular // channel announcement, and another channel update announcement, that @@ -2643,7 +2661,9 @@ func TestExtraDataNodeAnnouncementValidation(t *testing.T) { ctx, err := createTestCtx(t, 0) require.NoError(t, err, "can't create context") - remotePeer := &mockPeer{remoteKeyPriv1.PubKey(), nil, nil} + remotePeer := &mockPeer{ + remoteKeyPriv1.PubKey(), nil, nil, atomic.Bool{}, + } timestamp := testTimestamp // We'll create a node announcement that includes a set of opaque data @@ -2716,7 +2736,7 @@ func TestRetransmit(t *testing.T) { remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:]) require.NoError(t, err, "unable to parse pubkey") - remotePeer := &mockPeer{remoteKey, nil, nil} + remotePeer := &mockPeer{remoteKey, nil, nil, atomic.Bool{}} // Process a local channel announcement, channel update and node // announcement. No messages should be broadcasted yet, since no proof @@ -2822,7 +2842,7 @@ func TestNodeAnnouncementNoChannels(t *testing.T) { remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:]) require.NoError(t, err, "unable to parse pubkey") - remotePeer := &mockPeer{remoteKey, nil, nil} + remotePeer := &mockPeer{remoteKey, nil, nil, atomic.Bool{}} // Process the remote node announcement. select { @@ -2906,7 +2926,7 @@ func TestOptionalFieldsChannelUpdateValidation(t *testing.T) { chanUpdateHeight := uint32(0) timestamp := uint32(123456) - nodePeer := &mockPeer{remoteKeyPriv1.PubKey(), nil, nil} + nodePeer := &mockPeer{remoteKeyPriv1.PubKey(), nil, nil, atomic.Bool{}} // In this scenario, we'll test whether the message flags field in a // channel update is properly handled. @@ -3013,7 +3033,9 @@ func TestSendChannelUpdateReliably(t *testing.T) { // Set up a channel we can use to inspect messages sent by the // gossiper to the remote peer. sentToPeer := make(chan lnwire.Message, 1) - remotePeer := &mockPeer{remoteKey, sentToPeer, ctx.gossiper.quit} + remotePeer := &mockPeer{ + remoteKey, sentToPeer, ctx.gossiper.quit, atomic.Bool{}, + } // Since we first wait to be notified of the peer before attempting to // send the message, we'll overwrite NotifyWhenOnline and @@ -3367,7 +3389,9 @@ func TestPropagateChanPolicyUpdate(t *testing.T) { remoteKey := remoteKeyPriv1.PubKey() sentMsgs := make(chan lnwire.Message, 10) - remotePeer := &mockPeer{remoteKey, sentMsgs, ctx.gossiper.quit} + remotePeer := &mockPeer{ + remoteKey, sentMsgs, ctx.gossiper.quit, atomic.Bool{}, + } // The forced code path for sending the private ChannelUpdate to the // remote peer will be hit, forcing it to request a notification that @@ -3715,7 +3739,9 @@ func TestBroadcastAnnsAfterGraphSynced(t *testing.T) { t.Helper() - nodePeer := &mockPeer{remoteKeyPriv1.PubKey(), nil, nil} + nodePeer := &mockPeer{ + remoteKeyPriv1.PubKey(), nil, nil, atomic.Bool{}, + } var errChan chan error if isRemote { errChan = ctx.gossiper.ProcessRemoteAnnouncement( @@ -3791,7 +3817,9 @@ func TestRateLimitChannelUpdates(t *testing.T) { batch, err := createRemoteAnnouncements(blockHeight) require.NoError(t, err) - nodePeer1 := &mockPeer{remoteKeyPriv1.PubKey(), nil, nil} + nodePeer1 := &mockPeer{ + remoteKeyPriv1.PubKey(), nil, nil, atomic.Bool{}, + } select { case err := <-ctx.gossiper.ProcessRemoteAnnouncement( batch.chanAnn, nodePeer1, @@ -3810,7 +3838,9 @@ func TestRateLimitChannelUpdates(t *testing.T) { t.Fatal("remote announcement not processed") } - nodePeer2 := &mockPeer{remoteKeyPriv2.PubKey(), nil, nil} + nodePeer2 := &mockPeer{ + remoteKeyPriv2.PubKey(), nil, nil, atomic.Bool{}, + } select { case err := <-ctx.gossiper.ProcessRemoteAnnouncement( batch.chanUpdAnn2, nodePeer2, @@ -3929,7 +3959,7 @@ func TestIgnoreOwnAnnouncement(t *testing.T) { remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:]) require.NoError(t, err, "unable to parse pubkey") - remotePeer := &mockPeer{remoteKey, nil, nil} + remotePeer := &mockPeer{remoteKey, nil, nil, atomic.Bool{}} // Try to let the remote peer tell us about the channel we are part of. select { @@ -4075,7 +4105,7 @@ func TestRejectCacheChannelAnn(t *testing.T) { remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:]) require.NoError(t, err, "unable to parse pubkey") - remotePeer := &mockPeer{remoteKey, nil, nil} + remotePeer := &mockPeer{remoteKey, nil, nil, atomic.Bool{}} // Before sending over the announcement, we'll modify it such that we // know it will always fail. diff --git a/discovery/mock_test.go b/discovery/mock_test.go index 4f5c5d4e4..f34a62514 100644 --- a/discovery/mock_test.go +++ b/discovery/mock_test.go @@ -4,6 +4,7 @@ import ( "errors" "net" "sync" + "sync/atomic" "github.com/btcsuite/btcd/btcec/v2" "github.com/btcsuite/btcd/wire" @@ -14,9 +15,10 @@ import ( // mockPeer implements the lnpeer.Peer interface and is used to test the // gossiper's interaction with peers. type mockPeer struct { - pk *btcec.PublicKey - sentMsgs chan lnwire.Message - quit chan struct{} + pk *btcec.PublicKey + sentMsgs chan lnwire.Message + quit chan struct{} + disconnected atomic.Bool } var _ lnpeer.Peer = (*mockPeer)(nil) @@ -74,6 +76,10 @@ func (p *mockPeer) RemovePendingChannel(_ lnwire.ChannelID) error { return nil } +func (p *mockPeer) Disconnect(err error) { + p.disconnected.Store(true) +} + // mockMessageStore is an in-memory implementation of the MessageStore interface // used for the gossiper's unit tests. type mockMessageStore struct { diff --git a/discovery/reliable_sender_test.go b/discovery/reliable_sender_test.go index d1e69b11f..19fdaa1ca 100644 --- a/discovery/reliable_sender_test.go +++ b/discovery/reliable_sender_test.go @@ -2,6 +2,7 @@ package discovery import ( "fmt" + "sync/atomic" "testing" "time" @@ -74,7 +75,7 @@ func TestReliableSenderFlow(t *testing.T) { // Create a mock peer to send the messages to. pubKey := randPubKey(t) msgsSent := make(chan lnwire.Message) - peer := &mockPeer{pubKey, msgsSent, reliableSender.quit} + peer := &mockPeer{pubKey, msgsSent, reliableSender.quit, atomic.Bool{}} // Override NotifyWhenOnline and NotifyWhenOffline to provide the // notification channels so that we can control when notifications get @@ -193,7 +194,7 @@ func TestReliableSenderStaleMessages(t *testing.T) { // Create a mock peer to send the messages to. pubKey := randPubKey(t) msgsSent := make(chan lnwire.Message) - peer := &mockPeer{pubKey, msgsSent, reliableSender.quit} + peer := &mockPeer{pubKey, msgsSent, reliableSender.quit, atomic.Bool{}} // Override NotifyWhenOnline to provide the notification channel so that // we can control when notifications get dispatched. diff --git a/funding/manager_test.go b/funding/manager_test.go index 9db175ec3..c4c8b4f36 100644 --- a/funding/manager_test.go +++ b/funding/manager_test.go @@ -283,6 +283,8 @@ type testNode struct { var _ lnpeer.Peer = (*testNode)(nil) +func (n *testNode) Disconnect(err error) {} + func (n *testNode) IdentityKey() *btcec.PublicKey { return n.addr.IdentityKey } diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index 8343d5a1c..60010d6ff 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -2072,6 +2072,8 @@ func (m *mockPeer) QuitSignal() <-chan struct{} { return m.quit } +func (m *mockPeer) Disconnect(err error) {} + var _ lnpeer.Peer = (*mockPeer)(nil) func (m *mockPeer) SendMessage(sync bool, msgs ...lnwire.Message) error { diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index 6a9628f94..c328bc533 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -684,6 +684,8 @@ func (s *mockServer) RemoteFeatures() *lnwire.FeatureVector { return nil } +func (s *mockServer) Disconnect(err error) {} + func (s *mockServer) Stop() error { if !atomic.CompareAndSwapInt32(&s.shutdown, 0, 1) { return nil diff --git a/lnpeer/mock_peer.go b/lnpeer/mock_peer.go index 908c9a0c9..7fed3e4a0 100644 --- a/lnpeer/mock_peer.go +++ b/lnpeer/mock_peer.go @@ -79,3 +79,5 @@ func (m *MockPeer) RemoteFeatures() *lnwire.FeatureVector { args := m.Called() return args.Get(0).(*lnwire.FeatureVector) } + +func (m *MockPeer) Disconnect(err error) {} diff --git a/lnpeer/peer.go b/lnpeer/peer.go index f7d2e971b..cb6bc9867 100644 --- a/lnpeer/peer.go +++ b/lnpeer/peer.go @@ -74,4 +74,7 @@ type Peer interface { // by the remote peer. This allows sub-systems that use this interface // to gate their behavior off the set of negotiated feature bits. RemoteFeatures() *lnwire.FeatureVector + + // Disconnect halts communication with the peer. + Disconnect(error) }