peer: update and use SyncMap in Brontide

This commit applies methods `ForEach` and `Len` to `Brontide`.
This commit is contained in:
yyforyongyu 2023-03-29 19:24:07 +08:00
parent a66d42c682
commit ccb86a9d1d
No known key found for this signature in database
GPG key ID: 9BCD95C4FF296868
2 changed files with 66 additions and 72 deletions

View file

@ -32,6 +32,7 @@ import (
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/invoices"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnutils"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
"github.com/lightningnetwork/lnd/lnwallet/chancloser"
@ -403,10 +404,6 @@ type Brontide struct {
// objects to queue messages to be sent out on the wire.
outgoingQueue chan outgoingMsg
// activeChanMtx protects access to the activeChannels and
// addedChannels maps.
activeChanMtx sync.RWMutex
// activeChannels is a map which stores the state machines of all
// active channels. Channels are indexed into the map by the txid of
// the funding transaction which opened the channel.
@ -417,13 +414,14 @@ type Brontide struct {
// see if this is a pending channel or not. The tradeoff here is either
// having two maps everywhere (one for pending, one for confirmed chans)
// or having an extra nil-check per access.
activeChannels map[lnwire.ChannelID]*lnwallet.LightningChannel
activeChannels *lnutils.SyncMap[
lnwire.ChannelID, *lnwallet.LightningChannel]
// addedChannels tracks any new channels opened during this peer's
// lifecycle. We use this to filter out these new channels when the time
// comes to request a reenable for active channels, since they will have
// waited a shorter duration.
addedChannels map[lnwire.ChannelID]struct{}
addedChannels *lnutils.SyncMap[lnwire.ChannelID, struct{}]
// newChannels is used by the fundingManager to send fully opened
// channels to the source peer which handled the funding workflow.
@ -487,13 +485,15 @@ func NewBrontide(cfg Config) *Brontide {
logPrefix := fmt.Sprintf("Peer(%x):", cfg.PubKeyBytes)
p := &Brontide{
cfg: cfg,
activeSignal: make(chan struct{}),
sendQueue: make(chan outgoingMsg),
outgoingQueue: make(chan outgoingMsg),
addedChannels: make(map[lnwire.ChannelID]struct{}),
activeChannels: make(map[lnwire.ChannelID]*lnwallet.LightningChannel),
newChannels: make(chan *newChannelMsg, 1),
cfg: cfg,
activeSignal: make(chan struct{}),
sendQueue: make(chan outgoingMsg),
outgoingQueue: make(chan outgoingMsg),
addedChannels: &lnutils.SyncMap[lnwire.ChannelID, struct{}]{},
activeChannels: &lnutils.SyncMap[
lnwire.ChannelID, *lnwallet.LightningChannel,
]{},
newChannels: make(chan *newChannelMsg, 1),
activeMsgStreams: make(map[lnwire.ChannelID]*msgStream),
activeChanCloses: make(map[lnwire.ChannelID]*chancloser.ChanCloser),
@ -884,9 +884,7 @@ func (p *Brontide) loadActiveChannels(chans []*channeldb.OpenChannel) (
// here would just be extra work as we'll tear them down when creating
// + adding the final link.
if lnChan.IsPending() {
p.activeChanMtx.Lock()
p.activeChannels[chanID] = nil
p.activeChanMtx.Unlock()
p.activeChannels.Store(chanID, nil)
continue
}
@ -908,9 +906,7 @@ func (p *Brontide) loadActiveChannels(chans []*channeldb.OpenChannel) (
"switch: %v", chanPoint, err)
}
p.activeChanMtx.Lock()
p.activeChannels[chanID] = lnChan
p.activeChanMtx.Unlock()
p.activeChannels.Store(chanID, lnChan)
}
return msgs, nil
@ -1667,9 +1663,7 @@ func (p *Brontide) handleCustomMessage(msg *lnwire.Custom) error {
// isActiveChannel returns true if the provided channel id is active, otherwise
// returns false.
func (p *Brontide) isActiveChannel(chanID lnwire.ChannelID) bool {
p.activeChanMtx.RLock()
_, ok := p.activeChannels[chanID]
p.activeChanMtx.RUnlock()
_, ok := p.activeChannels.Load(chanID)
return ok
}
@ -1680,17 +1674,20 @@ func (p *Brontide) isActiveChannel(chanID lnwire.ChannelID) bool {
func (p *Brontide) storeError(err error) {
var haveChannels bool
p.activeChanMtx.RLock()
for _, channel := range p.activeChannels {
p.activeChannels.Range(func(_ lnwire.ChannelID,
channel *lnwallet.LightningChannel) bool {
// Pending channels will be nil in the activeChannels map.
if channel == nil {
continue
// Return true to continue the iteration.
return true
}
haveChannels = true
break
}
p.activeChanMtx.RUnlock()
// Return false to break the iteration.
return false
})
// If we do not have any active channels with the peer, we do not store
// errors as a dos mitigation.
@ -2286,25 +2283,30 @@ func (p *Brontide) queue(priority bool, msg lnwire.Message,
// ChannelSnapshots returns a slice of channel snapshots detailing all
// currently active channels maintained with the remote peer.
func (p *Brontide) ChannelSnapshots() []*channeldb.ChannelSnapshot {
p.activeChanMtx.RLock()
defer p.activeChanMtx.RUnlock()
snapshots := make(
[]*channeldb.ChannelSnapshot, 0, p.activeChannels.Len(),
)
snapshots := make([]*channeldb.ChannelSnapshot, 0, len(p.activeChannels))
for _, activeChan := range p.activeChannels {
// If the activeChan is nil, then we skip it as the channel is pending.
p.activeChannels.ForEach(func(_ lnwire.ChannelID,
activeChan *lnwallet.LightningChannel) error {
// If the activeChan is nil, then we skip it as the channel is
// pending.
if activeChan == nil {
continue
return nil
}
// We'll only return a snapshot for channels that are
// *immediately* available for routing payments over.
if activeChan.RemoteNextRevocation() == nil {
continue
return nil
}
snapshot := activeChan.StateSnapshot()
snapshots = append(snapshots, snapshot)
}
return nil
})
return snapshots
}
@ -2358,13 +2360,11 @@ out:
// Only update RemoteNextRevocation if the channel is in the
// activeChannels map and if we added the link to the switch.
// Only active channels will be added to the switch.
p.activeChanMtx.Lock()
currentChan, ok := p.activeChannels[chanID]
currentChan, ok := p.activeChannels.Load(chanID)
if ok && currentChan != nil {
p.log.Infof("Already have ChannelPoint(%v), "+
"ignoring.", chanPoint)
p.activeChanMtx.Unlock()
close(newChanReq.err)
// If we're being sent a new channel, and our
@ -2397,7 +2397,6 @@ out:
p.cfg.Signer, newChan, p.cfg.SigPool,
)
if err != nil {
p.activeChanMtx.Unlock()
err := fmt.Errorf("unable to create "+
"LightningChannel: %v", err)
p.log.Errorf(err.Error())
@ -2408,9 +2407,8 @@ out:
// This refreshes the activeChannels entry if the link was not in
// the switch, also populates for new entries.
p.activeChannels[chanID] = lnChan
p.addedChannels[chanID] = struct{}{}
p.activeChanMtx.Unlock()
p.activeChannels.Store(chanID, lnChan)
p.addedChannels.Store(chanID, struct{}{})
p.log.Infof("New channel active ChannelPoint(%v) "+
"with peer", chanPoint)
@ -2525,16 +2523,19 @@ out:
case <-p.quit:
// As, we've been signalled to exit, we'll reset all
// our active channel back to their default state.
p.activeChanMtx.Lock()
for _, channel := range p.activeChannels {
// If the channel is nil, continue as it's a pending channel.
if channel == nil {
continue
p.activeChannels.ForEach(func(_ lnwire.ChannelID,
lc *lnwallet.LightningChannel) error {
// Exit if the channel is nil as it's a pending
// channel.
if lc == nil {
return nil
}
channel.ResetState()
}
p.activeChanMtx.Unlock()
lc.ResetState()
return nil
})
break out
}
@ -2624,9 +2625,7 @@ func (p *Brontide) fetchActiveChanCloser(chanID lnwire.ChannelID) (
// First, we'll ensure that we actually know of the target channel. If
// not, we'll ignore this message.
p.activeChanMtx.RLock()
channel, ok := p.activeChannels[chanID]
p.activeChanMtx.RUnlock()
channel, ok := p.activeChannels.Load(chanID)
// If the channel isn't in the map or the channel is nil, return
// ErrChannelNotFound as the channel is pending.
@ -2688,19 +2687,18 @@ func (p *Brontide) fetchActiveChanCloser(chanID lnwire.ChannelID) (
func (p *Brontide) filterChannelsToEnable() []wire.OutPoint {
var activePublicChans []wire.OutPoint
p.activeChanMtx.RLock()
defer p.activeChanMtx.RUnlock()
p.activeChannels.Range(func(chanID lnwire.ChannelID,
lnChan *lnwallet.LightningChannel) bool {
for chanID, lnChan := range p.activeChannels {
// If the lnChan is nil, continue as this is a pending channel.
if lnChan == nil {
continue
return true
}
dbChan := lnChan.State()
isPublic := dbChan.ChannelFlags&lnwire.FFAnnounceChannel != 0
if !isPublic || dbChan.IsPending {
continue
return true
}
// We'll also skip any channels added during this peer's
@ -2708,14 +2706,16 @@ func (p *Brontide) filterChannelsToEnable() []wire.OutPoint {
// first announcement will be enabled, and the chan status
// manager will begin monitoring them passively since they exist
// in the database.
if _, ok := p.addedChannels[chanID]; ok {
continue
if _, ok := p.addedChannels.Load(chanID); ok {
return true
}
activePublicChans = append(
activePublicChans, dbChan.FundingOutpoint,
)
}
return true
})
return activePublicChans
}
@ -2969,9 +2969,7 @@ func (p *Brontide) createChanCloser(channel *lnwallet.LightningChannel,
func (p *Brontide) handleLocalCloseReq(req *htlcswitch.ChanClose) {
chanID := lnwire.NewChanIDFromOutPoint(req.ChanPoint)
p.activeChanMtx.RLock()
channel, ok := p.activeChannels[chanID]
p.activeChanMtx.RUnlock()
channel, ok := p.activeChannels.Load(chanID)
// Though this function can't be called for pending channels, we still
// check whether channel is nil for safety.
@ -3081,9 +3079,7 @@ func (p *Brontide) handleLinkFailure(failure linkFailureReport) {
// Retrieve the channel from the map of active channels. We do this to
// have access to it even after WipeChannel remove it from the map.
chanID := lnwire.NewChanIDFromOutPoint(&failure.chanPoint)
p.activeChanMtx.Lock()
lnChan := p.activeChannels[chanID]
p.activeChanMtx.Unlock()
lnChan, _ := p.activeChannels.Load(chanID)
// We begin by wiping the link, which will remove it from the switch,
// such that it won't be attempted used for any more updates.
@ -3305,9 +3301,7 @@ func WaitForChanToClose(bestHeight uint32, notifier chainntnfs.ChainNotifier,
func (p *Brontide) WipeChannel(chanPoint *wire.OutPoint) {
chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
p.activeChanMtx.Lock()
delete(p.activeChannels, chanID)
p.activeChanMtx.Unlock()
p.activeChannels.Delete(chanID)
// Instruct the HtlcSwitch to close this link as the channel is no
// longer active.

View file

@ -408,7 +408,7 @@ func createTestPeer(t *testing.T, notifier chainntnfs.ChainNotifier,
alicePeer.remoteFeatures = lnwire.NewFeatureVector(nil, lnwire.Features)
chanID := lnwire.NewChanIDFromOutPoint(channelAlice.ChannelPoint())
alicePeer.activeChannels[chanID] = channelAlice
alicePeer.activeChannels.Store(chanID, channelAlice)
alicePeer.wg.Add(1)
go alicePeer.channelManager()