From 3eb7f54a6d635fb3236d251eabc56767348b52ad Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Thu, 16 Mar 2023 12:32:42 +0800 Subject: [PATCH] peer: add method `handleLinkUpdateMsg` to handle channel update msgs --- peer/brontide.go | 40 ++++++++++++++++++++++++++-------------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/peer/brontide.go b/peer/brontide.go index d430f41b0..21c00ff25 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -1657,20 +1657,7 @@ out: if isLinkUpdate { // If this is a channel update, then we need to feed it // into the channel's in-order message stream. - chanStream, ok := p.activeMsgStreams[targetChan] - if !ok { - // If a stream hasn't yet been created, then - // we'll do so, add it to the map, and finally - // start it. - chanStream = newChanMsgStream(p, targetChan) - p.activeMsgStreams[targetChan] = chanStream - chanStream.Start() - defer chanStream.Stop() - } - - // With the stream obtained, add the message to the - // stream so we can continue processing message. - chanStream.AddMsg(nextMsg) + p.sendLinkUpdateMsg(targetChan, nextMsg) } idleTimer.Reset(idleTimeout) @@ -3883,3 +3870,28 @@ func (p *Brontide) handleNewPendingChannel(req *newChannelMsg) { p.activeChannels.Store(chanID, nil) p.addedChannels.Store(chanID, struct{}{}) } + +// sendLinkUpdateMsg sends a message that updates the channel to the +// channel's message stream. +func (p *Brontide) sendLinkUpdateMsg(cid lnwire.ChannelID, msg lnwire.Message) { + p.log.Tracef("Sending link update msg=%v", msg.MsgType()) + + chanStream, ok := p.activeMsgStreams[cid] + if !ok { + // If a stream hasn't yet been created, then we'll do so, add + // it to the map, and finally start it. + chanStream = newChanMsgStream(p, cid) + p.activeMsgStreams[cid] = chanStream + chanStream.Start() + + // Stop the stream when quit. + go func() { + <-p.quit + chanStream.Stop() + }() + } + + // With the stream obtained, add the message to the stream so we can + // continue processing message. + chanStream.AddMsg(msg) +}