peer: add method handleLinkUpdateMsg to handle channel update msgs

This commit is contained in:
yyforyongyu 2023-03-16 12:32:42 +08:00
parent e46bd8e177
commit 3eb7f54a6d
No known key found for this signature in database
GPG Key ID: 9BCD95C4FF296868

View File

@ -1657,20 +1657,7 @@ out:
if isLinkUpdate {
// If this is a channel update, then we need to feed it
// into the channel's in-order message stream.
chanStream, ok := p.activeMsgStreams[targetChan]
if !ok {
// If a stream hasn't yet been created, then
// we'll do so, add it to the map, and finally
// start it.
chanStream = newChanMsgStream(p, targetChan)
p.activeMsgStreams[targetChan] = chanStream
chanStream.Start()
defer chanStream.Stop()
}
// With the stream obtained, add the message to the
// stream so we can continue processing message.
chanStream.AddMsg(nextMsg)
p.sendLinkUpdateMsg(targetChan, nextMsg)
}
idleTimer.Reset(idleTimeout)
@ -3883,3 +3870,28 @@ func (p *Brontide) handleNewPendingChannel(req *newChannelMsg) {
p.activeChannels.Store(chanID, nil)
p.addedChannels.Store(chanID, struct{}{})
}
// sendLinkUpdateMsg sends a message that updates the channel to the
// channel's message stream.
func (p *Brontide) sendLinkUpdateMsg(cid lnwire.ChannelID, msg lnwire.Message) {
p.log.Tracef("Sending link update msg=%v", msg.MsgType())
chanStream, ok := p.activeMsgStreams[cid]
if !ok {
// If a stream hasn't yet been created, then we'll do so, add
// it to the map, and finally start it.
chanStream = newChanMsgStream(p, cid)
p.activeMsgStreams[cid] = chanStream
chanStream.Start()
// Stop the stream when quit.
go func() {
<-p.quit
chanStream.Stop()
}()
}
// With the stream obtained, add the message to the stream so we can
// continue processing message.
chanStream.AddMsg(msg)
}