From ccb86a9d1dd5840cc42ec9f04001620000e1edb2 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Wed, 29 Mar 2023 19:24:07 +0800 Subject: [PATCH] peer: update and use SyncMap in Brontide This commit applies methods `ForEach` and `Len` to `Brontide`. --- peer/brontide.go | 136 ++++++++++++++++++++++----------------------- peer/test_utils.go | 2 +- 2 files changed, 66 insertions(+), 72 deletions(-) diff --git a/peer/brontide.go b/peer/brontide.go index 7866dbe3c..d048e27e1 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -32,6 +32,7 @@ import ( "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/invoices" "github.com/lightningnetwork/lnd/lnpeer" + "github.com/lightningnetwork/lnd/lnutils" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet/chainfee" "github.com/lightningnetwork/lnd/lnwallet/chancloser" @@ -403,10 +404,6 @@ type Brontide struct { // objects to queue messages to be sent out on the wire. outgoingQueue chan outgoingMsg - // activeChanMtx protects access to the activeChannels and - // addedChannels maps. - activeChanMtx sync.RWMutex - // activeChannels is a map which stores the state machines of all // active channels. Channels are indexed into the map by the txid of // the funding transaction which opened the channel. @@ -417,13 +414,14 @@ type Brontide struct { // see if this is a pending channel or not. The tradeoff here is either // having two maps everywhere (one for pending, one for confirmed chans) // or having an extra nil-check per access. - activeChannels map[lnwire.ChannelID]*lnwallet.LightningChannel + activeChannels *lnutils.SyncMap[ + lnwire.ChannelID, *lnwallet.LightningChannel] // addedChannels tracks any new channels opened during this peer's // lifecycle. We use this to filter out these new channels when the time // comes to request a reenable for active channels, since they will have // waited a shorter duration. - addedChannels map[lnwire.ChannelID]struct{} + addedChannels *lnutils.SyncMap[lnwire.ChannelID, struct{}] // newChannels is used by the fundingManager to send fully opened // channels to the source peer which handled the funding workflow. @@ -487,13 +485,15 @@ func NewBrontide(cfg Config) *Brontide { logPrefix := fmt.Sprintf("Peer(%x):", cfg.PubKeyBytes) p := &Brontide{ - cfg: cfg, - activeSignal: make(chan struct{}), - sendQueue: make(chan outgoingMsg), - outgoingQueue: make(chan outgoingMsg), - addedChannels: make(map[lnwire.ChannelID]struct{}), - activeChannels: make(map[lnwire.ChannelID]*lnwallet.LightningChannel), - newChannels: make(chan *newChannelMsg, 1), + cfg: cfg, + activeSignal: make(chan struct{}), + sendQueue: make(chan outgoingMsg), + outgoingQueue: make(chan outgoingMsg), + addedChannels: &lnutils.SyncMap[lnwire.ChannelID, struct{}]{}, + activeChannels: &lnutils.SyncMap[ + lnwire.ChannelID, *lnwallet.LightningChannel, + ]{}, + newChannels: make(chan *newChannelMsg, 1), activeMsgStreams: make(map[lnwire.ChannelID]*msgStream), activeChanCloses: make(map[lnwire.ChannelID]*chancloser.ChanCloser), @@ -884,9 +884,7 @@ func (p *Brontide) loadActiveChannels(chans []*channeldb.OpenChannel) ( // here would just be extra work as we'll tear them down when creating // + adding the final link. if lnChan.IsPending() { - p.activeChanMtx.Lock() - p.activeChannels[chanID] = nil - p.activeChanMtx.Unlock() + p.activeChannels.Store(chanID, nil) continue } @@ -908,9 +906,7 @@ func (p *Brontide) loadActiveChannels(chans []*channeldb.OpenChannel) ( "switch: %v", chanPoint, err) } - p.activeChanMtx.Lock() - p.activeChannels[chanID] = lnChan - p.activeChanMtx.Unlock() + p.activeChannels.Store(chanID, lnChan) } return msgs, nil @@ -1667,9 +1663,7 @@ func (p *Brontide) handleCustomMessage(msg *lnwire.Custom) error { // isActiveChannel returns true if the provided channel id is active, otherwise // returns false. func (p *Brontide) isActiveChannel(chanID lnwire.ChannelID) bool { - p.activeChanMtx.RLock() - _, ok := p.activeChannels[chanID] - p.activeChanMtx.RUnlock() + _, ok := p.activeChannels.Load(chanID) return ok } @@ -1680,17 +1674,20 @@ func (p *Brontide) isActiveChannel(chanID lnwire.ChannelID) bool { func (p *Brontide) storeError(err error) { var haveChannels bool - p.activeChanMtx.RLock() - for _, channel := range p.activeChannels { + p.activeChannels.Range(func(_ lnwire.ChannelID, + channel *lnwallet.LightningChannel) bool { + // Pending channels will be nil in the activeChannels map. if channel == nil { - continue + // Return true to continue the iteration. + return true } haveChannels = true - break - } - p.activeChanMtx.RUnlock() + + // Return false to break the iteration. + return false + }) // If we do not have any active channels with the peer, we do not store // errors as a dos mitigation. @@ -2286,25 +2283,30 @@ func (p *Brontide) queue(priority bool, msg lnwire.Message, // ChannelSnapshots returns a slice of channel snapshots detailing all // currently active channels maintained with the remote peer. func (p *Brontide) ChannelSnapshots() []*channeldb.ChannelSnapshot { - p.activeChanMtx.RLock() - defer p.activeChanMtx.RUnlock() + snapshots := make( + []*channeldb.ChannelSnapshot, 0, p.activeChannels.Len(), + ) - snapshots := make([]*channeldb.ChannelSnapshot, 0, len(p.activeChannels)) - for _, activeChan := range p.activeChannels { - // If the activeChan is nil, then we skip it as the channel is pending. + p.activeChannels.ForEach(func(_ lnwire.ChannelID, + activeChan *lnwallet.LightningChannel) error { + + // If the activeChan is nil, then we skip it as the channel is + // pending. if activeChan == nil { - continue + return nil } // We'll only return a snapshot for channels that are // *immediately* available for routing payments over. if activeChan.RemoteNextRevocation() == nil { - continue + return nil } snapshot := activeChan.StateSnapshot() snapshots = append(snapshots, snapshot) - } + + return nil + }) return snapshots } @@ -2358,13 +2360,11 @@ out: // Only update RemoteNextRevocation if the channel is in the // activeChannels map and if we added the link to the switch. // Only active channels will be added to the switch. - p.activeChanMtx.Lock() - currentChan, ok := p.activeChannels[chanID] + currentChan, ok := p.activeChannels.Load(chanID) if ok && currentChan != nil { p.log.Infof("Already have ChannelPoint(%v), "+ "ignoring.", chanPoint) - p.activeChanMtx.Unlock() close(newChanReq.err) // If we're being sent a new channel, and our @@ -2397,7 +2397,6 @@ out: p.cfg.Signer, newChan, p.cfg.SigPool, ) if err != nil { - p.activeChanMtx.Unlock() err := fmt.Errorf("unable to create "+ "LightningChannel: %v", err) p.log.Errorf(err.Error()) @@ -2408,9 +2407,8 @@ out: // This refreshes the activeChannels entry if the link was not in // the switch, also populates for new entries. - p.activeChannels[chanID] = lnChan - p.addedChannels[chanID] = struct{}{} - p.activeChanMtx.Unlock() + p.activeChannels.Store(chanID, lnChan) + p.addedChannels.Store(chanID, struct{}{}) p.log.Infof("New channel active ChannelPoint(%v) "+ "with peer", chanPoint) @@ -2525,16 +2523,19 @@ out: case <-p.quit: // As, we've been signalled to exit, we'll reset all // our active channel back to their default state. - p.activeChanMtx.Lock() - for _, channel := range p.activeChannels { - // If the channel is nil, continue as it's a pending channel. - if channel == nil { - continue + p.activeChannels.ForEach(func(_ lnwire.ChannelID, + lc *lnwallet.LightningChannel) error { + + // Exit if the channel is nil as it's a pending + // channel. + if lc == nil { + return nil } - channel.ResetState() - } - p.activeChanMtx.Unlock() + lc.ResetState() + + return nil + }) break out } @@ -2624,9 +2625,7 @@ func (p *Brontide) fetchActiveChanCloser(chanID lnwire.ChannelID) ( // First, we'll ensure that we actually know of the target channel. If // not, we'll ignore this message. - p.activeChanMtx.RLock() - channel, ok := p.activeChannels[chanID] - p.activeChanMtx.RUnlock() + channel, ok := p.activeChannels.Load(chanID) // If the channel isn't in the map or the channel is nil, return // ErrChannelNotFound as the channel is pending. @@ -2688,19 +2687,18 @@ func (p *Brontide) fetchActiveChanCloser(chanID lnwire.ChannelID) ( func (p *Brontide) filterChannelsToEnable() []wire.OutPoint { var activePublicChans []wire.OutPoint - p.activeChanMtx.RLock() - defer p.activeChanMtx.RUnlock() + p.activeChannels.Range(func(chanID lnwire.ChannelID, + lnChan *lnwallet.LightningChannel) bool { - for chanID, lnChan := range p.activeChannels { // If the lnChan is nil, continue as this is a pending channel. if lnChan == nil { - continue + return true } dbChan := lnChan.State() isPublic := dbChan.ChannelFlags&lnwire.FFAnnounceChannel != 0 if !isPublic || dbChan.IsPending { - continue + return true } // We'll also skip any channels added during this peer's @@ -2708,14 +2706,16 @@ func (p *Brontide) filterChannelsToEnable() []wire.OutPoint { // first announcement will be enabled, and the chan status // manager will begin monitoring them passively since they exist // in the database. - if _, ok := p.addedChannels[chanID]; ok { - continue + if _, ok := p.addedChannels.Load(chanID); ok { + return true } activePublicChans = append( activePublicChans, dbChan.FundingOutpoint, ) - } + + return true + }) return activePublicChans } @@ -2969,9 +2969,7 @@ func (p *Brontide) createChanCloser(channel *lnwallet.LightningChannel, func (p *Brontide) handleLocalCloseReq(req *htlcswitch.ChanClose) { chanID := lnwire.NewChanIDFromOutPoint(req.ChanPoint) - p.activeChanMtx.RLock() - channel, ok := p.activeChannels[chanID] - p.activeChanMtx.RUnlock() + channel, ok := p.activeChannels.Load(chanID) // Though this function can't be called for pending channels, we still // check whether channel is nil for safety. @@ -3081,9 +3079,7 @@ func (p *Brontide) handleLinkFailure(failure linkFailureReport) { // Retrieve the channel from the map of active channels. We do this to // have access to it even after WipeChannel remove it from the map. chanID := lnwire.NewChanIDFromOutPoint(&failure.chanPoint) - p.activeChanMtx.Lock() - lnChan := p.activeChannels[chanID] - p.activeChanMtx.Unlock() + lnChan, _ := p.activeChannels.Load(chanID) // We begin by wiping the link, which will remove it from the switch, // such that it won't be attempted used for any more updates. @@ -3305,9 +3301,7 @@ func WaitForChanToClose(bestHeight uint32, notifier chainntnfs.ChainNotifier, func (p *Brontide) WipeChannel(chanPoint *wire.OutPoint) { chanID := lnwire.NewChanIDFromOutPoint(chanPoint) - p.activeChanMtx.Lock() - delete(p.activeChannels, chanID) - p.activeChanMtx.Unlock() + p.activeChannels.Delete(chanID) // Instruct the HtlcSwitch to close this link as the channel is no // longer active. diff --git a/peer/test_utils.go b/peer/test_utils.go index a997411df..d6c16b076 100644 --- a/peer/test_utils.go +++ b/peer/test_utils.go @@ -408,7 +408,7 @@ func createTestPeer(t *testing.T, notifier chainntnfs.ChainNotifier, alicePeer.remoteFeatures = lnwire.NewFeatureVector(nil, lnwire.Features) chanID := lnwire.NewChanIDFromOutPoint(channelAlice.ChannelPoint()) - alicePeer.activeChannels[chanID] = channelAlice + alicePeer.activeChannels.Store(chanID, channelAlice) alicePeer.wg.Add(1) go alicePeer.channelManager()