peer: track pending channels in Brontide

This commit adds a new channel `newPendingChannel` and its dedicated
handler `handleNewPendingChannel` to keep track of pending open
channels. This should not affect the original handling of new active
channels, except `addedChannels` is now updated in
`handleNewPendingChannel` such that this new pending channel won't be
reestablished in link.
This commit is contained in:
yyforyongyu 2023-03-16 21:24:04 +08:00
parent dfe95d74ea
commit f39c568c94
No known key found for this signature in database
GPG key ID: 9BCD95C4FF296868

View file

@ -88,8 +88,13 @@ type outgoingMsg struct {
// the receiver of the request to report when the channel creation process has
// completed.
type newChannelMsg struct {
// channel is used when the pending channel becomes active.
channel *channeldb.OpenChannel
err chan error
// channelID is used when there's a new pending channel.
channelID lnwire.ChannelID
err chan error
}
type customMsg struct {
@ -427,6 +432,10 @@ type Brontide struct {
// channels to the source peer which handled the funding workflow.
newActiveChannel chan *newChannelMsg
// newPendingChannel is used by the fundingManager to send pending open
// channels to the source peer which handled the funding workflow.
newPendingChannel chan *newChannelMsg
// activeMsgStreams is a map from channel id to the channel streams that
// proxy messages to individual, active links.
activeMsgStreams map[lnwire.ChannelID]*msgStream
@ -493,7 +502,8 @@ func NewBrontide(cfg Config) *Brontide {
activeChannels: &lnutils.SyncMap[
lnwire.ChannelID, *lnwallet.LightningChannel,
]{},
newActiveChannel: make(chan *newChannelMsg, 1),
newActiveChannel: make(chan *newChannelMsg, 1),
newPendingChannel: make(chan *newChannelMsg, 1),
activeMsgStreams: make(map[lnwire.ChannelID]*msgStream),
activeChanCloses: make(map[lnwire.ChannelID]*chancloser.ChanCloser),
@ -2382,6 +2392,14 @@ func (p *Brontide) channelManager() {
out:
for {
select {
// A new pending channel has arrived which means we are about
// to complete a funding workflow and is waiting for the final
// `ChannelReady` messages to be exchanged. We will add this
// channel to the `activeChannels` with a nil value to indicate
// this is a pending channel.
case req := <-p.newPendingChannel:
p.handleNewPendingChannel(req)
// A new channel has arrived which means we've just completed a
// funding workflow. We'll initialize the necessary local
// state, and notify the htlc switch of a new link.
@ -3484,6 +3502,44 @@ func (p *Brontide) AddNewChannel(channel *channeldb.OpenChannel,
}
}
// AddPendingChannel adds a pending open channel to the peer. The channel
// should fail to be added if the cancel channel is closed.
//
// NOTE: Part of the lnpeer.Peer interface.
func (p *Brontide) AddPendingChannel(cid lnwire.ChannelID,
cancel <-chan struct{}) error {
errChan := make(chan error, 1)
newChanMsg := &newChannelMsg{
channelID: cid,
err: errChan,
}
select {
case p.newPendingChannel <- newChanMsg:
case <-cancel:
return errors.New("canceled adding pending channel")
case <-p.quit:
return lnpeer.ErrPeerExiting
}
// We pause here to wait for the peer to recognize the new pending
// channel before we close the channel barrier corresponding to the
// channel.
select {
case err := <-errChan:
return err
case <-cancel:
return errors.New("canceled adding pending channel")
case <-p.quit:
return lnpeer.ErrPeerExiting
}
}
// StartTime returns the time at which the connection was established if the
// peer started successfully, and zero otherwise.
func (p *Brontide) StartTime() time.Time {
@ -3718,7 +3774,6 @@ func (p *Brontide) addActiveChannel(c *channeldb.OpenChannel) error {
// Store the channel in the activeChannels map.
p.activeChannels.Store(chanID, lnChan)
p.addedChannels.Store(chanID, struct{}{})
p.log.Infof("New channel active ChannelPoint(%v) with peer", chanPoint)
@ -3793,3 +3848,38 @@ func (p *Brontide) handleNewActiveChannel(req *newChannelMsg) {
// Close the err chan if everything went fine.
close(req.err)
}
// handleNewPendingChannel takes a `newChannelMsg` request and add it to
// `activeChannels` map with nil value. This pending channel will be saved as
// it may become active in the future. Once active, the funding manager will
// send it again via `AddNewChannel`, and we'd handle the link creation there.
func (p *Brontide) handleNewPendingChannel(req *newChannelMsg) {
defer close(req.err)
chanID := req.channelID
// If we already have this channel, something is wrong with the funding
// flow as it will only be marked as active after `ChannelReady` is
// handled. In this case, we will do nothing but log an error, just in
// case this is a legit channel.
if p.isActiveChannel(chanID) {
p.log.Errorf("Channel(%v) is already active, ignoring "+
"pending channel request", chanID)
return
}
// The channel has already been added, we will do nothing and return.
if p.isPendingChannel(chanID) {
p.log.Infof("Channel(%v) is already added, ignoring "+
"pending channel request", chanID)
return
}
// This is a new channel, we now add it to the map `activeChannels`
// with nil value and mark it as a newly added channel in
// `addedChannels`.
p.activeChannels.Store(chanID, nil)
p.addedChannels.Store(chanID, struct{}{})
}