From 9acad37f57ebdce4e15b4432a5f96a27c511667e Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Wed, 31 Jul 2024 18:03:57 -0700 Subject: [PATCH] peer: always send channel update on reconnect We have existing logic to attempt to reliably send a channel update to the remote peer. In the wild, we've seen this fail, as it's possible for the peer to disconnect right as we send the update. In this commit, we implement a simple fix which is just to send the chan update each time we connect to the remote party. Fixes https://github.com/lightningnetwork/lnd/issues/6870. --- docs/release-notes/release-notes-0.18.3.md | 4 ++ peer/brontide.go | 70 ++++++++++++++++++++++ peer/brontide_test.go | 45 ++++++++++++++ peer/test_utils.go | 14 ++++- 4 files changed, 131 insertions(+), 2 deletions(-) diff --git a/docs/release-notes/release-notes-0.18.3.md b/docs/release-notes/release-notes-0.18.3.md index a1a3209c0..e83836367 100644 --- a/docs/release-notes/release-notes-0.18.3.md +++ b/docs/release-notes/release-notes-0.18.3.md @@ -57,6 +57,9 @@ commitment when the channel was force closed. [here](https://github.com/lightningnetwork/lnd/issues/8146) for a summary of the issue. +* We'll now always send [channel updates to our remote peer for open + channels](https://github.com/lightningnetwork/lnd/pull/8963) on reconnect. + # New Features ## Functional Enhancements ## RPC Additions @@ -243,6 +246,7 @@ commitment when the channel was force closed. * Elle Mouton * Eugene Siegel * Matheus Degiovani +* Olaoluwa Osuntokun * Oliver Gugger * Slyghtning * Yong Yu diff --git a/peer/brontide.go b/peer/brontide.go index 5176959e2..920a1bcca 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -787,7 +787,9 @@ func (p *Brontide) Start() error { // // TODO(wilmer): Remove this once we're able to query for node // announcements through their timestamps. 
+ p.wg.Add(2) go p.maybeSendNodeAnn(activeChans) + go p.maybeSendChannelUpdates() return nil } @@ -1218,6 +1220,8 @@ func (p *Brontide) addLink(chanPoint *wire.OutPoint, // maybeSendNodeAnn sends our node announcement to the remote peer if at least // one confirmed public channel exists with them. func (p *Brontide) maybeSendNodeAnn(channels []*channeldb.OpenChannel) { + defer p.wg.Done() + hasConfirmedPublicChan := false for _, channel := range channels { if channel.IsPending { @@ -1245,6 +1249,72 @@ func (p *Brontide) maybeSendNodeAnn(channels []*channeldb.OpenChannel) { } } +// maybeSendChannelUpdates sends our channel updates to the remote peer if we +// have any active channels with them. +func (p *Brontide) maybeSendChannelUpdates() { + defer p.wg.Done() + + // If we don't have any active channels, then we can exit early. + if p.activeChannels.Len() == 0 { + return + } + + maybeSendUpd := func(cid lnwire.ChannelID, + lnChan *lnwallet.LightningChannel) error { + + // Nil channels are pending, so we'll skip them. + if lnChan == nil { + return nil + } + + dbChan := lnChan.State() + scid := func() lnwire.ShortChannelID { + switch { + // If it's a zero conf channel and confirmed, + // then we need to use the "real" scid. + case dbChan.IsZeroConf() && dbChan.ZeroConfConfirmed(): + return dbChan.ZeroConfRealScid() + + // Otherwise, we can use the normal scid. + default: + return dbChan.ShortChanID() + } + }() + + // Now that we know the channel is in a good state, we'll try + // to fetch the update to send to the remote peer. If the + // channel is pending, and not a zero conf channel, we'll get + // an error here which we'll ignore. 
+ chanUpd, err := p.cfg.FetchLastChanUpdate(scid) + if err != nil { + p.log.Debugf("Unable to fetch channel update for "+ + "ChannelPoint(%v), scid=%v: %v", + dbChan.FundingOutpoint, scid, err) + + return nil + } + + p.log.Debugf("Sending channel update for ChannelPoint(%v), "+ + "scid=%v", dbChan.FundingOutpoint, scid) + + // We'll send it as a normal message instead of using the lazy + // queue to prioritize transmission of the fresh update. + if err := p.SendMessage(false, chanUpd); err != nil { + err := fmt.Errorf("unable to send channel update for "+ + "ChannelPoint(%v), scid=%v: %w", + dbChan.FundingOutpoint, scid, + err) + p.log.Errorf("%v", err) + + return err + } + + return nil + } + + p.activeChannels.ForEach(maybeSendUpd) +} + // WaitForDisconnect waits until the peer has disconnected. A peer may be // disconnected if the local or remote side terminates the connection, or an // irrecoverable protocol error has been encountered. This method will only diff --git a/peer/brontide_test.go b/peer/brontide_test.go index d2f3fc7bc..c3d1bee48 100644 --- a/peer/brontide_test.go +++ b/peer/brontide_test.go @@ -1100,6 +1100,51 @@ func TestUpdateNextRevocation(t *testing.T) { // `lnwallet.LightningWallet` once it's interfaced. } +func assertMsgSent(t *testing.T, conn *mockMessageConn, + msgType lnwire.MessageType) { + + t.Helper() + + require := require.New(t) + + rawMsg, err := fn.RecvOrTimeout(conn.writtenMessages, timeout) + require.NoError(err) + + msgReader := bytes.NewReader(rawMsg) + msg, err := lnwire.ReadMessage(msgReader, 0) + require.NoError(err) + + require.Equal(msgType, msg.MsgType()) +} + +// TestAlwaysSendChannelUpdate tests that each time we connect to a peer with +// an active channel, we always send the latest channel update. 
+func TestAlwaysSendChannelUpdate(t *testing.T) { + require := require.New(t) + + var channel *channeldb.OpenChannel + channelIntercept := func(a, b *channeldb.OpenChannel) { + channel = a + } + + harness, err := createTestPeerWithChannel(t, channelIntercept) + require.NoError(err, "unable to create test channels") + + // Avoid the need to mock the channel graph by marking the channel + // borked. Borked channels still get a reestablish message sent on + // reconnect, while skipping channel graph checks and link creation. + require.NoError(channel.MarkBorked()) + + // Start the peer, which'll trigger the normal init and start up logic. + startPeerDone := startPeer(t, harness.mockConn, harness.peer) + _, err = fn.RecvOrTimeout(startPeerDone, 2*timeout) + require.NoError(err) + + // Assert that we send the reestablish, then the channel update. + assertMsgSent(t, harness.mockConn, lnwire.MsgChannelReestablish) + assertMsgSent(t, harness.mockConn, lnwire.MsgChannelUpdate) +} + // TODO(yy): add test for `addActiveChannel` and `handleNewActiveChannel` once // we have interfaced `lnwallet.LightningChannel` and // `*contractcourt.ChainArbitrator`. diff --git a/peer/test_utils.go b/peer/test_utils.go index 8d1355b18..e0ae29be8 100644 --- a/peer/test_utils.go +++ b/peer/test_utils.go @@ -341,6 +341,7 @@ func createTestPeerWithChannel(t *testing.T, updateChan func(a, notifier: notifier, publishTx: publishTx, mockSwitch: mockSwitch, + mockConn: params.mockConn, }, nil } @@ -493,10 +494,14 @@ func (m *mockMessageConn) Flush() (int, error) { // the bytes sent into the mock's writtenMessages channel. 
func (m *mockMessageConn) WriteMessage(msg []byte) error { m.writeRaceDetectingCounter++ + + msgCopy := make([]byte, len(msg)) + copy(msgCopy, msg) + select { - case m.writtenMessages <- msg: + case m.writtenMessages <- msgCopy: case <-time.After(timeout): - m.t.Fatalf("timeout sending message: %v", msg) + m.t.Fatalf("timeout sending message: %v", msgCopy) } return nil @@ -713,6 +718,11 @@ func createTestPeer(t *testing.T) *peerTestCtx { return nil }, PongBuf: make([]byte, lnwire.MaxPongBytes), + FetchLastChanUpdate: func(chanID lnwire.ShortChannelID, + ) (*lnwire.ChannelUpdate, error) { + + return &lnwire.ChannelUpdate{}, nil + }, } alicePeer := NewBrontide(*cfg)