Merge pull request #8963 from Roasbeef/always-send-chan-upd

peer: always send channel update on reconnect
This commit is contained in:
Olaoluwa Osuntokun 2024-08-07 18:50:34 -07:00 committed by GitHub
commit 51f3577ef9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 131 additions and 2 deletions

View File

@ -57,6 +57,9 @@ commitment when the channel was force closed.
[here](https://github.com/lightningnetwork/lnd/issues/8146) for a summary of
the issue.
* We'll now always send [channel updates to our remote peer for open
channels](https://github.com/lightningnetwork/lnd/pull/8963).
# New Features
## Functional Enhancements
## RPC Additions
@ -247,6 +250,7 @@ commitment when the channel was force closed.
* Elle Mouton
* Eugene Siegel
* Matheus Degiovani
* Olaoluwa Osuntokun
* Oliver Gugger
* Slyghtning
* Yong Yu

View File

@ -787,7 +787,9 @@ func (p *Brontide) Start() error {
//
// TODO(wilmer): Remove this once we're able to query for node
// announcements through their timestamps.
p.wg.Add(2)
go p.maybeSendNodeAnn(activeChans)
go p.maybeSendChannelUpdates()
return nil
}
@ -1218,6 +1220,8 @@ func (p *Brontide) addLink(chanPoint *wire.OutPoint,
// maybeSendNodeAnn sends our node announcement to the remote peer if at least
// one confirmed public channel exists with them.
func (p *Brontide) maybeSendNodeAnn(channels []*channeldb.OpenChannel) {
defer p.wg.Done()
hasConfirmedPublicChan := false
for _, channel := range channels {
if channel.IsPending {
@ -1245,6 +1249,72 @@ func (p *Brontide) maybeSendNodeAnn(channels []*channeldb.OpenChannel) {
}
}
// maybeSendChannelUpdates sends our channel updates to the remote peer if we
// have any active channels with them.
func (p *Brontide) maybeSendChannelUpdates() {
defer p.wg.Done()
// If we don't have any active channels, then we can exit early.
if p.activeChannels.Len() == 0 {
return
}
maybeSendUpd := func(cid lnwire.ChannelID,
lnChan *lnwallet.LightningChannel) error {
// Nil channels are pending, so we'll skip them.
if lnChan == nil {
return nil
}
dbChan := lnChan.State()
scid := func() lnwire.ShortChannelID {
switch {
// Otherwise if it's a zero conf channel and confirmed,
// then we need to use the "real" scid.
case dbChan.IsZeroConf() && dbChan.ZeroConfConfirmed():
return dbChan.ZeroConfRealScid()
// Otherwise, we can use the normal scid.
default:
return dbChan.ShortChanID()
}
}()
// Now that we know the channel is in a good state, we'll try
// to fetch the update to send to the remote peer. If the
// channel is pending, and not a zero conf channel, we'll get
// an error here which we'll ignore.
chanUpd, err := p.cfg.FetchLastChanUpdate(scid)
if err != nil {
p.log.Debugf("Unable to fetch channel update for "+
"ChannelPoint(%v), scid=%v: %v",
dbChan.FundingOutpoint, dbChan.ShortChanID, err)
return nil
}
p.log.Debugf("Sending channel update for ChannelPoint(%v), "+
"scid=%v", dbChan.FundingOutpoint, dbChan.ShortChanID)
// We'll send it as a normal message instead of using the lazy
// queue to prioritize transmission of the fresh update.
if err := p.SendMessage(false, chanUpd); err != nil {
err := fmt.Errorf("unable to send channel update for "+
"ChannelPoint(%v), scid=%v: %w",
dbChan.FundingOutpoint, dbChan.ShortChanID(),
err)
p.log.Errorf(err.Error())
return err
}
return nil
}
p.activeChannels.ForEach(maybeSendUpd)
}
// WaitForDisconnect waits until the peer has disconnected. A peer may be
// disconnected if the local or remote side terminates the connection, or an
// irrecoverable protocol error has been encountered. This method will only

View File

@ -1100,6 +1100,51 @@ func TestUpdateNextRevocation(t *testing.T) {
// `lnwallet.LightningWallet` once it's interfaced.
}
func assertMsgSent(t *testing.T, conn *mockMessageConn,
msgType lnwire.MessageType) {
t.Helper()
require := require.New(t)
rawMsg, err := fn.RecvOrTimeout(conn.writtenMessages, timeout)
require.NoError(err)
msgReader := bytes.NewReader(rawMsg)
msg, err := lnwire.ReadMessage(msgReader, 0)
require.NoError(err)
require.Equal(msgType, msg.MsgType())
}
// TestAlwaysSendChannelUpdate tests that each time we connect to the peer if
// an active channel, we always send the latest channel update.
func TestAlwaysSendChannelUpdate(t *testing.T) {
require := require.New(t)
var channel *channeldb.OpenChannel
channelIntercept := func(a, b *channeldb.OpenChannel) {
channel = a
}
harness, err := createTestPeerWithChannel(t, channelIntercept)
require.NoError(err, "unable to create test channels")
// Avoid the need to mock the channel graph by marking the channel
// borked. Borked channels still get a reestablish message sent on
// reconnect, while skipping channel graph checks and link creation.
require.NoError(channel.MarkBorked())
// Start the peer, which'll trigger the normal init and start up logic.
startPeerDone := startPeer(t, harness.mockConn, harness.peer)
_, err = fn.RecvOrTimeout(startPeerDone, 2*timeout)
require.NoError(err)
// Assert that we eventually send a channel update.
assertMsgSent(t, harness.mockConn, lnwire.MsgChannelReestablish)
assertMsgSent(t, harness.mockConn, lnwire.MsgChannelUpdate)
}
// TODO(yy): add test for `addActiveChannel` and `handleNewActiveChannel` once
// we have interfaced `lnwallet.LightningChannel` and
// `*contractcourt.ChainArbitrator`.

View File

@ -341,6 +341,7 @@ func createTestPeerWithChannel(t *testing.T, updateChan func(a,
notifier: notifier,
publishTx: publishTx,
mockSwitch: mockSwitch,
mockConn: params.mockConn,
}, nil
}
@ -493,10 +494,14 @@ func (m *mockMessageConn) Flush() (int, error) {
// the bytes sent into the mock's writtenMessages channel.
func (m *mockMessageConn) WriteMessage(msg []byte) error {
m.writeRaceDetectingCounter++
msgCopy := make([]byte, len(msg))
copy(msgCopy, msg)
select {
case m.writtenMessages <- msg:
case m.writtenMessages <- msgCopy:
case <-time.After(timeout):
m.t.Fatalf("timeout sending message: %v", msg)
m.t.Fatalf("timeout sending message: %v", msgCopy)
}
return nil
@ -713,6 +718,11 @@ func createTestPeer(t *testing.T) *peerTestCtx {
return nil
},
PongBuf: make([]byte, lnwire.MaxPongBytes),
FetchLastChanUpdate: func(chanID lnwire.ShortChannelID,
) (*lnwire.ChannelUpdate, error) {
return &lnwire.ChannelUpdate{}, nil
},
}
alicePeer := NewBrontide(*cfg)