Merge pull request #7794 from yyforyongyu/prepare-channel-ready-fix

brontide: refactor peer to prepare channel ready fix
Olaoluwa Osuntokun 2023-07-04 12:39:59 -07:00 committed by GitHub
commit 86e7b4e1e0
3 changed files with 290 additions and 262 deletions
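
The common thread of the refactor: each mutex-plus-map pair in the funding Manager and in Brontide is replaced by lnutils.SyncMap, a type-parameterized wrapper around Go's sync.Map. As a rough sketch of the shape such a wrapper takes (method names match the calls in the hunks below; the real lnutils implementation may differ in details):

package lnutils

import "sync"

// SyncMap wraps sync.Map with type parameters, replacing the raw
// type's interface{} assertions with compile-time type safety.
type SyncMap[K comparable, V any] struct {
	sync.Map
}

// Store puts an item in the map.
func (m *SyncMap[K, V]) Store(key K, value V) {
	m.Map.Store(key, value)
}

// Load queries an item from the map, reporting whether it was found.
func (m *SyncMap[K, V]) Load(key K) (V, bool) {
	result, ok := m.Map.Load(key)
	if !ok {
		var zero V
		return zero, false
	}

	return result.(V), true
}

// LoadAndDelete deletes an item, returning its previous value if the
// key was present.
func (m *SyncMap[K, V]) LoadAndDelete(key K) (V, bool) {
	result, loaded := m.Map.LoadAndDelete(key)
	if !loaded {
		var zero V
		return zero, false
	}

	return result.(V), true
}

// Delete removes an item from the map.
func (m *SyncMap[K, V]) Delete(key K) {
	m.Map.Delete(key)
}

Because the embedded sync.Map's zero value is ready to use, the constructor hunks below can simply take the address of an empty composite literal.
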


@@ -27,6 +27,7 @@ import (
"github.com/lightningnetwork/lnd/labels"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnutils"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
"github.com/lightningnetwork/lnd/lnwallet/chanfunding"
@@ -583,14 +584,11 @@ type Manager struct {
// be signalled once the channel is fully open. This barrier acts as a
// synchronization point for any incoming/outgoing HTLCs before the
// channel has been fully opened.
barrierMtx sync.RWMutex
newChanBarriers map[lnwire.ChannelID]chan struct{}
newChanBarriers *lnutils.SyncMap[lnwire.ChannelID, chan struct{}]
localDiscoveryMtx sync.Mutex
localDiscoverySignals map[lnwire.ChannelID]chan struct{}
localDiscoverySignals *lnutils.SyncMap[lnwire.ChannelID, chan struct{}]
handleChannelReadyMtx sync.RWMutex
handleChannelReadyBarriers map[lnwire.ChannelID]struct{}
handleChannelReadyBarriers *lnutils.SyncMap[lnwire.ChannelID, struct{}]
quit chan struct{}
wg sync.WaitGroup
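
The barrier fields above rely on the close-to-broadcast idiom: the chan struct{} is never sent on, only closed, which releases every blocked receiver at once. An illustrative helper showing how a waiter consumes such a barrier (this helper is not part of the diff; it assumes the lnutils and lnwire imports above plus the standard errors package):

func waitForChanBarrier(
	barriers *lnutils.SyncMap[lnwire.ChannelID, chan struct{}],
	chanID lnwire.ChannelID, quit chan struct{}) error {

	// A missing entry means the channel is already fully open and
	// nothing blocks the HTLC.
	barrier, ok := barriers.Load(chanID)
	if !ok {
		return nil
	}

	select {
	case <-barrier:
		// The funding flow ran close(barrier): a closed channel
		// releases every blocked receiver at once, so no send is
		// ever needed.
		return nil

	case <-quit:
		return errors.New("shutting down")
	}
}
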
@@ -644,21 +642,21 @@ func NewFundingManager(cfg Config) (*Manager, error) {
signedReservations: make(
map[lnwire.ChannelID][32]byte,
),
newChanBarriers: make(
map[lnwire.ChannelID]chan struct{},
),
newChanBarriers: &lnutils.SyncMap[
lnwire.ChannelID, chan struct{},
]{},
fundingMsgs: make(
chan *fundingMsg, msgBufferSize,
),
fundingRequests: make(
chan *InitFundingMsg, msgBufferSize,
),
localDiscoverySignals: make(
map[lnwire.ChannelID]chan struct{},
),
handleChannelReadyBarriers: make(
map[lnwire.ChannelID]struct{},
),
localDiscoverySignals: &lnutils.SyncMap[
lnwire.ChannelID, chan struct{},
]{},
handleChannelReadyBarriers: &lnutils.SyncMap[
lnwire.ChannelID, struct{},
]{},
quit: make(chan struct{}),
}, nil
}
@@ -694,17 +692,14 @@ func (f *Manager) start() error {
// re-initialize the channel barriers, and republish the
// funding transaction if we're the initiator.
if channel.IsPending {
f.barrierMtx.Lock()
log.Tracef("Loading pending ChannelPoint(%v), "+
"creating chan barrier",
channel.FundingOutpoint)
f.newChanBarriers[chanID] = make(chan struct{})
f.barrierMtx.Unlock()
f.localDiscoveryMtx.Lock()
f.localDiscoverySignals[chanID] = make(chan struct{})
f.localDiscoveryMtx.Unlock()
f.newChanBarriers.Store(chanID, make(chan struct{}))
f.localDiscoverySignals.Store(
chanID, make(chan struct{}),
)
// Rebroadcast the funding transaction for any pending
// channel that we initiated. No error will be returned
@@ -1247,12 +1242,9 @@ func (f *Manager) advancePendingChannelState(
// Find and close the discoverySignal for this channel such
// that ChannelReady messages will be processed.
chanID := lnwire.NewChanIDFromOutPoint(
&channel.FundingOutpoint,
)
f.localDiscoveryMtx.Lock()
defer f.localDiscoveryMtx.Unlock()
if discoverySignal, ok := f.localDiscoverySignals[chanID]; ok {
chanID := lnwire.NewChanIDFromOutPoint(&channel.FundingOutpoint)
discoverySignal, ok := f.localDiscoverySignals.Load(chanID)
if ok {
close(discoverySignal)
}
@@ -2158,11 +2150,9 @@ func (f *Manager) continueFundingAccept(resCtx *reservationWithCtx,
// properly synchronize with the writeHandler goroutine, we add a new
// channel to the barriers map which will be closed once the channel is
// fully open.
f.barrierMtx.Lock()
channelID := lnwire.NewChanIDFromOutPoint(outPoint)
log.Debugf("Creating chan barrier for ChanID(%v)", channelID)
f.newChanBarriers[channelID] = make(chan struct{})
f.barrierMtx.Unlock()
f.newChanBarriers.Store(channelID, make(chan struct{}))
// The next message that advances the funding flow will reference the
// channel via its permanent channel ID, so we'll set up this mapping
@@ -2279,11 +2269,9 @@ func (f *Manager) handleFundingCreated(peer lnpeer.Peer,
// properly synchronize with the writeHandler goroutine, we add a new
// channel to the barriers map which will be closed once the channel is
// fully open.
f.barrierMtx.Lock()
channelID := lnwire.NewChanIDFromOutPoint(&fundingOut)
log.Debugf("Creating chan barrier for ChanID(%v)", channelID)
f.newChanBarriers[channelID] = make(chan struct{})
f.barrierMtx.Unlock()
f.newChanBarriers.Store(channelID, make(chan struct{}))
log.Infof("sending FundingSigned for pending_id(%x) over "+
"ChannelPoint(%v)", pendingChanID[:], fundingOut)
@@ -2329,9 +2317,7 @@ func (f *Manager) handleFundingCreated(peer lnpeer.Peer,
// Create an entry in the local discovery map so we can ensure that we
// process the channel confirmation fully before we receive a
// channel_ready message.
f.localDiscoveryMtx.Lock()
f.localDiscoverySignals[channelID] = make(chan struct{})
f.localDiscoveryMtx.Unlock()
f.localDiscoverySignals.Store(channelID, make(chan struct{}))
// Inform the ChannelNotifier that the channel has entered
// pending open state.
@@ -2395,9 +2381,7 @@ func (f *Manager) handleFundingSigned(peer lnpeer.Peer,
// channel_ready message.
fundingPoint := resCtx.reservation.FundingOutpoint()
permChanID := lnwire.NewChanIDFromOutPoint(fundingPoint)
f.localDiscoveryMtx.Lock()
f.localDiscoverySignals[permChanID] = make(chan struct{})
f.localDiscoveryMtx.Unlock()
f.localDiscoverySignals.Store(permChanID, make(chan struct{}))
// We have to store the forwardingPolicy before the reservation context
// is deleted. The policy will then be read and applied in
@@ -2914,11 +2898,9 @@ func (f *Manager) handleFundingConfirmation(
// goroutine that the channel now is marked as open in the database
// and that it is acceptable to process channel_ready messages
// from the peer.
f.localDiscoveryMtx.Lock()
if discoverySignal, ok := f.localDiscoverySignals[chanID]; ok {
if discoverySignal, ok := f.localDiscoverySignals.Load(chanID); ok {
close(discoverySignal)
}
f.localDiscoveryMtx.Unlock()
return nil
}
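
Read together, the localDiscoverySignals hunks form a three-step handshake that orders channel_ready processing after local funding confirmation. Condensed here for readability (quit-channel handling elided; step 3 appears in the handleChannelReady hunks below):

// 1. funding_created/funding_signed: register the signal so that any
//    early channel_ready can be held back.
f.localDiscoverySignals.Store(chanID, make(chan struct{}))

// 2. Funding confirmed and the channel marked open in the database:
//    wake every waiter.
if discoverySignal, ok := f.localDiscoverySignals.Load(chanID); ok {
	close(discoverySignal)
}

// 3. handleChannelReady: block until step 2 has run, then clean up.
if discoverySignal, ok := f.localDiscoverySignals.Load(chanID); ok {
	<-discoverySignal
	f.localDiscoverySignals.Delete(chanID)
}
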
@@ -3456,32 +3438,25 @@ func (f *Manager) handleChannelReady(peer lnpeer.Peer,
"peer %x", msg.ChanID,
peer.IdentityKey().SerializeCompressed())
// We now load or create a new channel barrier for this channel.
_, loaded := f.handleChannelReadyBarriers.LoadOrStore(
msg.ChanID, struct{}{},
)
// If we are currently in the process of handling a channel_ready
// message for this channel, ignore.
f.handleChannelReadyMtx.Lock()
_, ok := f.handleChannelReadyBarriers[msg.ChanID]
if ok {
if loaded {
log.Infof("Already handling channelReady for "+
"ChannelID(%v), ignoring.", msg.ChanID)
f.handleChannelReadyMtx.Unlock()
return
}
// If not already handling channelReady for this channel, set up
// barrier, and move on.
f.handleChannelReadyBarriers[msg.ChanID] = struct{}{}
f.handleChannelReadyMtx.Unlock()
defer func() {
f.handleChannelReadyMtx.Lock()
delete(f.handleChannelReadyBarriers, msg.ChanID)
f.handleChannelReadyMtx.Unlock()
}()
f.localDiscoveryMtx.Lock()
localDiscoverySignal, ok := f.localDiscoverySignals[msg.ChanID]
f.localDiscoveryMtx.Unlock()
// If not already handling channelReady for this channel, then the
// `LoadOrStore` has set up a barrier, and it will be removed once this
// function exits.
defer f.handleChannelReadyBarriers.Delete(msg.ChanID)
localDiscoverySignal, ok := f.localDiscoverySignals.Load(msg.ChanID)
if ok {
// Before we proceed with processing the channel_ready
// message, we'll wait for the local waitForFundingConfirmation
@@ -3497,9 +3472,7 @@ func (f *Manager) handleChannelReady(peer lnpeer.Peer,
// With the signal received, we can now safely delete the entry
// from the map.
f.localDiscoveryMtx.Lock()
delete(f.localDiscoverySignals, msg.ChanID)
f.localDiscoveryMtx.Unlock()
f.localDiscoverySignals.Delete(msg.ChanID)
}
// First, we'll attempt to locate the channel whose funding workflow is
@@ -3617,15 +3590,12 @@ func (f *Manager) handleChannelReady(peer lnpeer.Peer,
// Close the active channel barrier signaling the readHandler
// that commitment related modifications to this channel can
// now proceed.
f.barrierMtx.Lock()
chanBarrier, ok := f.newChanBarriers[chanID]
chanBarrier, ok := f.newChanBarriers.LoadAndDelete(chanID)
if ok {
log.Tracef("Closing chan barrier for ChanID(%v)",
chanID)
close(chanBarrier)
delete(f.newChanBarriers, chanID)
}
f.barrierMtx.Unlock()
}()
if err := peer.AddNewChannel(channel, f.quit); err != nil {
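
The handleChannelReadyBarriers change above is the core of the preparation: LoadOrStore collapses the old check-then-insert critical section into one atomic step, giving single-flight semantics per channel ID. The same guard, extracted into a generic illustrative helper (not part of the diff):

// tryAcquire marks id as in-flight, returning false if another
// goroutine already holds it. Callers that win must release the
// guard, typically via defer, once they finish.
func tryAcquire[K comparable](
	inflight *lnutils.SyncMap[K, struct{}], id K) bool {

	// LoadOrStore inserts struct{}{} only if id is absent and
	// reports whether a value was already present; absent means
	// this caller won the race.
	_, loaded := inflight.LoadOrStore(id, struct{}{})
	return !loaded
}

func release[K comparable](inflight *lnutils.SyncMap[K, struct{}], id K) {
	inflight.Delete(id)
}

handleChannelReady follows exactly this shape: bail out when loaded is true, otherwise defer the Delete so the barrier lifts when the handler returns.
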


@@ -32,6 +32,7 @@ import (
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/invoices"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnutils"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
"github.com/lightningnetwork/lnd/lnwallet/chancloser"
@@ -403,10 +404,6 @@ type Brontide struct {
// objects to queue messages to be sent out on the wire.
outgoingQueue chan outgoingMsg
// activeChanMtx protects access to the activeChannels and
// addedChannels maps.
activeChanMtx sync.RWMutex
// activeChannels is a map which stores the state machines of all
// active channels. Channels are indexed into the map by the txid of
// the funding transaction which opened the channel.
@@ -417,17 +414,18 @@ type Brontide struct {
// see if this is a pending channel or not. The tradeoff here is either
// having two maps everywhere (one for pending, one for confirmed chans)
// or having an extra nil-check per access.
activeChannels map[lnwire.ChannelID]*lnwallet.LightningChannel
activeChannels *lnutils.SyncMap[
lnwire.ChannelID, *lnwallet.LightningChannel]
// addedChannels tracks any new channels opened during this peer's
// lifecycle. We use this to filter out these new channels when the time
// comes to request a reenable for active channels, since they will have
// waited a shorter duration.
addedChannels map[lnwire.ChannelID]struct{}
addedChannels *lnutils.SyncMap[lnwire.ChannelID, struct{}]
// newChannels is used by the fundingManager to send fully opened
// newActiveChannel is used by the fundingManager to send fully opened
// channels to the source peer which handled the funding workflow.
newChannels chan *newChannelMsg
newActiveChannel chan *newChannelMsg
// activeMsgStreams is a map from channel id to the channel streams that
// proxy messages to individual, active links.
@@ -487,13 +485,15 @@ func NewBrontide(cfg Config) *Brontide {
logPrefix := fmt.Sprintf("Peer(%x):", cfg.PubKeyBytes)
p := &Brontide{
cfg: cfg,
activeSignal: make(chan struct{}),
sendQueue: make(chan outgoingMsg),
outgoingQueue: make(chan outgoingMsg),
addedChannels: make(map[lnwire.ChannelID]struct{}),
activeChannels: make(map[lnwire.ChannelID]*lnwallet.LightningChannel),
newChannels: make(chan *newChannelMsg, 1),
cfg: cfg,
activeSignal: make(chan struct{}),
sendQueue: make(chan outgoingMsg),
outgoingQueue: make(chan outgoingMsg),
addedChannels: &lnutils.SyncMap[lnwire.ChannelID, struct{}]{},
activeChannels: &lnutils.SyncMap[
lnwire.ChannelID, *lnwallet.LightningChannel,
]{},
newActiveChannel: make(chan *newChannelMsg, 1),
activeMsgStreams: make(map[lnwire.ChannelID]*msgStream),
activeChanCloses: make(map[lnwire.ChannelID]*chancloser.ChanCloser),
@@ -884,9 +884,7 @@ func (p *Brontide) loadActiveChannels(chans []*channeldb.OpenChannel) (
// here would just be extra work as we'll tear them down when creating
// + adding the final link.
if lnChan.IsPending() {
p.activeChanMtx.Lock()
p.activeChannels[chanID] = nil
p.activeChanMtx.Unlock()
p.activeChannels.Store(chanID, nil)
continue
}
@@ -908,9 +906,7 @@ func (p *Brontide) loadActiveChannels(chans []*channeldb.OpenChannel) (
"switch: %v", chanPoint, err)
}
p.activeChanMtx.Lock()
p.activeChannels[chanID] = lnChan
p.activeChanMtx.Unlock()
p.activeChannels.Store(chanID, lnChan)
}
return msgs, nil
@@ -1576,7 +1572,8 @@ out:
case *lnwire.ChannelReestablish:
targetChan = msg.ChanID
isLinkUpdate = p.isActiveChannel(targetChan)
isLinkUpdate = p.isActiveChannel(targetChan) ||
p.isPendingChannel(targetChan)
// If we failed to find the link in question, and the
// message received was a channel sync message, then
@@ -1664,13 +1661,50 @@ func (p *Brontide) handleCustomMessage(msg *lnwire.Custom) error {
return p.cfg.HandleCustomMessage(p.PubKey(), msg)
}
// isLoadedFromDisk returns true if the provided channel ID is loaded from
// disk.
//
// NOTE: only returns true for pending channels.
func (p *Brontide) isLoadedFromDisk(chanID lnwire.ChannelID) bool {
// If this is a newly added channel, no need to reestablish.
_, added := p.addedChannels.Load(chanID)
if added {
return false
}
// Return false if the channel is unknown.
channel, ok := p.activeChannels.Load(chanID)
if !ok {
return false
}
// During startup, we will use a nil value to mark a pending channel
// that's loaded from disk.
return channel == nil
}
// isActiveChannel returns true if the provided channel id is active, otherwise
// returns false.
func (p *Brontide) isActiveChannel(chanID lnwire.ChannelID) bool {
p.activeChanMtx.RLock()
_, ok := p.activeChannels[chanID]
p.activeChanMtx.RUnlock()
return ok
// The channel would be nil if,
// - the channel doesn't exist, or,
// - the channel exists, but is pending. In this case, we don't
// consider this channel active.
channel, _ := p.activeChannels.Load(chanID)
return channel != nil
}
// isPendingChannel returns true if the provided channel ID is pending, and
// returns false if the channel is active or unknown.
func (p *Brontide) isPendingChannel(chanID lnwire.ChannelID) bool {
// Return false if the channel is unknown.
channel, ok := p.activeChannels.Load(chanID)
if !ok {
return false
}
return channel == nil
}
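
These three predicates read a tri-state out of one map: key absent means unknown, value nil means pending (loaded from disk), and a non-nil value means active. An illustrative truth table (the IDs here are hypothetical):

p.activeChannels.Store(pendingID, nil)   // pending channel from disk
p.activeChannels.Store(activeID, lnChan) // fully active channel

p.isActiveChannel(activeID)   // true:  entry exists and is non-nil
p.isActiveChannel(pendingID)  // false: entry exists but is nil
p.isPendingChannel(pendingID) // true:  nil entry
p.isPendingChannel(unknownID) // false: no entry at all
p.isLoadedFromDisk(pendingID) // true, unless chanID is in addedChannels
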
// storeError stores an error in our peer's buffer of recent errors with the
@@ -1680,17 +1714,20 @@ func (p *Brontide) isActiveChannel(chanID lnwire.ChannelID) bool {
func (p *Brontide) storeError(err error) {
var haveChannels bool
p.activeChanMtx.RLock()
for _, channel := range p.activeChannels {
p.activeChannels.Range(func(_ lnwire.ChannelID,
channel *lnwallet.LightningChannel) bool {
// Pending channels will be nil in the activeChannels map.
if channel == nil {
continue
// Return true to continue the iteration.
return true
}
haveChannels = true
break
}
p.activeChanMtx.RUnlock()
// Return false to break the iteration.
return false
})
// If we do not have any active channels with the peer, we do not store
// errors as a dos mitigation.
@@ -2286,25 +2323,30 @@ func (p *Brontide) queue(priority bool, msg lnwire.Message,
// ChannelSnapshots returns a slice of channel snapshots detailing all
// currently active channels maintained with the remote peer.
func (p *Brontide) ChannelSnapshots() []*channeldb.ChannelSnapshot {
p.activeChanMtx.RLock()
defer p.activeChanMtx.RUnlock()
snapshots := make(
[]*channeldb.ChannelSnapshot, 0, p.activeChannels.Len(),
)
snapshots := make([]*channeldb.ChannelSnapshot, 0, len(p.activeChannels))
for _, activeChan := range p.activeChannels {
// If the activeChan is nil, then we skip it as the channel is pending.
p.activeChannels.ForEach(func(_ lnwire.ChannelID,
activeChan *lnwallet.LightningChannel) error {
// If the activeChan is nil, then we skip it as the channel is
// pending.
if activeChan == nil {
continue
return nil
}
// We'll only return a snapshot for channels that are
// *immediately* available for routing payments over.
if activeChan.RemoteNextRevocation() == nil {
continue
return nil
}
snapshot := activeChan.StateSnapshot()
snapshots = append(snapshots, snapshot)
}
return nil
})
return snapshots
}
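
Two iteration styles appear in this file: Range keeps sync.Map's convention of a bool return (false stops the walk), while ForEach, as called here, takes a visitor returning an error, presumably stopping at the first non-nil error. Side by side (illustrative; m stands for one of the SyncMap fields above):

// Range: return true to continue, false to stop the walk early.
m.Range(func(id lnwire.ChannelID, c *lnwallet.LightningChannel) bool {
	// Keep walking past pending (nil) entries; stop at the first
	// active channel.
	return c == nil
})

// ForEach: return nil to continue; a non-nil error aborts the walk.
m.ForEach(func(id lnwire.ChannelID, c *lnwallet.LightningChannel) error {
	return nil
})
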
@@ -2350,130 +2392,8 @@ out:
// A new channel has arrived which means we've just completed a
// funding workflow. We'll initialize the necessary local
// state, and notify the htlc switch of a new link.
case newChanReq := <-p.newChannels:
newChan := newChanReq.channel
chanPoint := &newChan.FundingOutpoint
chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
// Only update RemoteNextRevocation if the channel is in the
// activeChannels map and if we added the link to the switch.
// Only active channels will be added to the switch.
p.activeChanMtx.Lock()
currentChan, ok := p.activeChannels[chanID]
if ok && currentChan != nil {
p.log.Infof("Already have ChannelPoint(%v), "+
"ignoring.", chanPoint)
p.activeChanMtx.Unlock()
close(newChanReq.err)
// If we're being sent a new channel, and our
// existing channel doesn't have the next
// revocation, then we need to update the
// current existing channel.
if currentChan.RemoteNextRevocation() != nil {
continue
}
p.log.Infof("Processing retransmitted "+
"ChannelReady for ChannelPoint(%v)",
chanPoint)
nextRevoke := newChan.RemoteNextRevocation
err := currentChan.InitNextRevocation(nextRevoke)
if err != nil {
p.log.Errorf("unable to init chan "+
"revocation: %v", err)
continue
}
continue
}
// If not already active, we'll add this channel to the
// set of active channels, so we can look it up later
// easily according to its channel ID.
lnChan, err := lnwallet.NewLightningChannel(
p.cfg.Signer, newChan, p.cfg.SigPool,
)
if err != nil {
p.activeChanMtx.Unlock()
err := fmt.Errorf("unable to create "+
"LightningChannel: %v", err)
p.log.Errorf(err.Error())
newChanReq.err <- err
continue
}
// This refreshes the activeChannels entry if the link was not in
// the switch, also populates for new entries.
p.activeChannels[chanID] = lnChan
p.addedChannels[chanID] = struct{}{}
p.activeChanMtx.Unlock()
p.log.Infof("New channel active ChannelPoint(%v) "+
"with peer", chanPoint)
// Next, we'll assemble a ChannelLink along with the
// necessary items it needs to function.
//
// TODO(roasbeef): panic on below?
chainEvents, err := p.cfg.ChainArb.SubscribeChannelEvents(
*chanPoint,
)
if err != nil {
err := fmt.Errorf("unable to subscribe to "+
"chain events: %v", err)
p.log.Errorf(err.Error())
newChanReq.err <- err
continue
}
// We'll query the localChanCfg of the new channel to determine the
// minimum HTLC value that can be forwarded. For the maximum HTLC
// value that can be forwarded and fees we'll use the default
// values, as they currently are always set to the default values
// at initial channel creation. Note that the maximum HTLC value
// defaults to the cap on the total value of outstanding HTLCs.
//
// TODO(guggero): We should instead pass in the current
// forwarding policy from the funding manager to avoid
// needing us to update the link once the channel is
// announced to the network with custom user values.
fwdMinHtlc := lnChan.FwdMinHtlc()
defaultPolicy := p.cfg.RoutingPolicy
forwardingPolicy := &htlcswitch.ForwardingPolicy{
MinHTLCOut: fwdMinHtlc,
MaxHTLC: newChan.LocalChanCfg.MaxPendingAmount,
BaseFee: defaultPolicy.BaseFee,
FeeRate: defaultPolicy.FeeRate,
TimeLockDelta: defaultPolicy.TimeLockDelta,
}
// If we've reached this point, there are two possible scenarios.
// If the channel was in the active channels map as nil, then it
// was loaded from disk and we need to send reestablish. Else,
// it was not loaded from disk and we don't need to send
// reestablish as this is a fresh channel.
shouldReestablish := ok
// Create the link and add it to the switch.
err = p.addLink(
chanPoint, lnChan, forwardingPolicy,
chainEvents, shouldReestablish,
)
if err != nil {
err := fmt.Errorf("can't register new channel "+
"link(%v) with peer", chanPoint)
p.log.Errorf(err.Error())
newChanReq.err <- err
continue
}
close(newChanReq.err)
case req := <-p.newActiveChannel:
p.handleNewActiveChannel(req)
// We've just received a local request to close an active
channel. It will either kick off a cooperative channel
@@ -2525,16 +2445,19 @@ out:
case <-p.quit:
// As, we've been signalled to exit, we'll reset all
// our active channel back to their default state.
p.activeChanMtx.Lock()
for _, channel := range p.activeChannels {
// If the channel is nil, continue as it's a pending channel.
if channel == nil {
continue
p.activeChannels.ForEach(func(_ lnwire.ChannelID,
lc *lnwallet.LightningChannel) error {
// Exit if the channel is nil as it's a pending
// channel.
if lc == nil {
return nil
}
channel.ResetState()
}
p.activeChanMtx.Unlock()
lc.ResetState()
return nil
})
break out
}
@@ -2624,9 +2547,7 @@ func (p *Brontide) fetchActiveChanCloser(chanID lnwire.ChannelID) (
// First, we'll ensure that we actually know of the target channel. If
// not, we'll ignore this message.
p.activeChanMtx.RLock()
channel, ok := p.activeChannels[chanID]
p.activeChanMtx.RUnlock()
channel, ok := p.activeChannels.Load(chanID)
// If the channel isn't in the map or the channel is nil, return
// ErrChannelNotFound as the channel is pending.
@ -2688,19 +2609,18 @@ func (p *Brontide) fetchActiveChanCloser(chanID lnwire.ChannelID) (
func (p *Brontide) filterChannelsToEnable() []wire.OutPoint {
var activePublicChans []wire.OutPoint
p.activeChanMtx.RLock()
defer p.activeChanMtx.RUnlock()
p.activeChannels.Range(func(chanID lnwire.ChannelID,
lnChan *lnwallet.LightningChannel) bool {
for chanID, lnChan := range p.activeChannels {
// If the lnChan is nil, continue as this is a pending channel.
if lnChan == nil {
continue
return true
}
dbChan := lnChan.State()
isPublic := dbChan.ChannelFlags&lnwire.FFAnnounceChannel != 0
if !isPublic || dbChan.IsPending {
continue
return true
}
// We'll also skip any channels added during this peer's
@@ -2708,14 +2628,16 @@ func (p *Brontide) filterChannelsToEnable() []wire.OutPoint {
// first announcement will be enabled, and the chan status
// manager will begin monitoring them passively since they exist
// in the database.
if _, ok := p.addedChannels[chanID]; ok {
continue
if _, ok := p.addedChannels.Load(chanID); ok {
return true
}
activePublicChans = append(
activePublicChans, dbChan.FundingOutpoint,
)
}
return true
})
return activePublicChans
}
@@ -2969,9 +2891,7 @@ func (p *Brontide) createChanCloser(channel *lnwallet.LightningChannel,
func (p *Brontide) handleLocalCloseReq(req *htlcswitch.ChanClose) {
chanID := lnwire.NewChanIDFromOutPoint(req.ChanPoint)
p.activeChanMtx.RLock()
channel, ok := p.activeChannels[chanID]
p.activeChanMtx.RUnlock()
channel, ok := p.activeChannels.Load(chanID)
// Though this function can't be called for pending channels, we still
// check whether channel is nil for safety.
@@ -3081,9 +3001,7 @@ func (p *Brontide) handleLinkFailure(failure linkFailureReport) {
// Retrieve the channel from the map of active channels. We do this to
// have access to it even after WipeChannel removes it from the map.
chanID := lnwire.NewChanIDFromOutPoint(&failure.chanPoint)
p.activeChanMtx.Lock()
lnChan := p.activeChannels[chanID]
p.activeChanMtx.Unlock()
lnChan, _ := p.activeChannels.Load(chanID)
// We begin by wiping the link, which will remove it from the switch,
// such that it won't be used for any more updates.
@@ -3305,9 +3223,7 @@ func WaitForChanToClose(bestHeight uint32, notifier chainntnfs.ChainNotifier,
func (p *Brontide) WipeChannel(chanPoint *wire.OutPoint) {
chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
p.activeChanMtx.Lock()
delete(p.activeChannels, chanID)
p.activeChanMtx.Unlock()
p.activeChannels.Delete(chanID)
// Instruct the HtlcSwitch to close this link as the channel is no
// longer active.
@@ -3558,7 +3474,7 @@ func (p *Brontide) AddNewChannel(channel *channeldb.OpenChannel,
}
select {
case p.newChannels <- newChanMsg:
case p.newActiveChannel <- newChanMsg:
case <-cancel:
return errors.New("canceled adding new channel")
case <-p.quit:
@@ -3742,3 +3658,145 @@ func (p *Brontide) attachChannelEventSubscription() error {
return nil
}
// updateNextRevocation updates the existing channel's next revocation if it's
// nil.
func (p *Brontide) updateNextRevocation(c *channeldb.OpenChannel) {
chanPoint := &c.FundingOutpoint
chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
// Read the current channel.
currentChan, loaded := p.activeChannels.Load(chanID)
// currentChan should exist, but we perform a check anyway to avoid nil
// pointer dereference.
if !loaded {
p.log.Errorf("missing active channel with chanID=%v", chanID)
return
}
// currentChan should not be nil, but we perform a check anyway to
// avoid nil pointer dereference.
if currentChan == nil {
p.log.Errorf("found nil active channel with chanID=%v", chanID)
return
}
// If we're being sent a new channel, and our existing channel doesn't
// have the next revocation, then we need to update the current
// existing channel.
if currentChan.RemoteNextRevocation() != nil {
return
}
p.log.Infof("Processing retransmitted ChannelReady for "+
"ChannelPoint(%v)", chanPoint)
nextRevoke := c.RemoteNextRevocation
err := currentChan.InitNextRevocation(nextRevoke)
if err != nil {
p.log.Errorf("unable to init chan revocation: %v", err)
}
}
// addActiveChannel adds a new active channel to the `activeChannels` map. It
// takes a `channeldb.OpenChannel`, creates a `lnwallet.LightningChannel` from
// it and assembles it with a channel link.
func (p *Brontide) addActiveChannel(c *channeldb.OpenChannel) error {
chanPoint := &c.FundingOutpoint
chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
// If not already active, we'll add this channel to the set of active
// channels, so we can look it up later easily according to its channel
// ID.
lnChan, err := lnwallet.NewLightningChannel(
p.cfg.Signer, c, p.cfg.SigPool,
)
if err != nil {
return fmt.Errorf("unable to create LightningChannel: %w", err)
}
// If we've reached this point, there are two possible scenarios. If
// the channel was in the active channels map as nil, then it was
// loaded from disk and we need to send reestablish. Else, it was not
// loaded from disk and we don't need to send reestablish as this is a
// fresh channel.
shouldReestablish := p.isLoadedFromDisk(chanID)
// Store the channel in the activeChannels map.
p.activeChannels.Store(chanID, lnChan)
p.addedChannels.Store(chanID, struct{}{})
p.log.Infof("New channel active ChannelPoint(%v) with peer", chanPoint)
// Next, we'll assemble a ChannelLink along with the necessary items it
// needs to function.
chainEvents, err := p.cfg.ChainArb.SubscribeChannelEvents(*chanPoint)
if err != nil {
return fmt.Errorf("unable to subscribe to chain events: %w",
err)
}
// We'll query the localChanCfg of the new channel to determine the
// minimum HTLC value that can be forwarded. For the maximum HTLC value
// that can be forwarded and fees we'll use the default values, as they
// currently are always set to the default values at initial channel
// creation. Note that the maximum HTLC value defaults to the cap on
// the total value of outstanding HTLCs.
fwdMinHtlc := lnChan.FwdMinHtlc()
defaultPolicy := p.cfg.RoutingPolicy
forwardingPolicy := &htlcswitch.ForwardingPolicy{
MinHTLCOut: fwdMinHtlc,
MaxHTLC: c.LocalChanCfg.MaxPendingAmount,
BaseFee: defaultPolicy.BaseFee,
FeeRate: defaultPolicy.FeeRate,
TimeLockDelta: defaultPolicy.TimeLockDelta,
}
// Create the link and add it to the switch.
err = p.addLink(
chanPoint, lnChan, forwardingPolicy,
chainEvents, shouldReestablish,
)
if err != nil {
return fmt.Errorf("can't register new channel link(%v) with "+
"peer", chanPoint)
}
return nil
}
// handleNewActiveChannel handles a `newChannelMsg` request. Depending on
// whether we know this channel ID or not, we'll either add it to the
// `activeChannels` map or init the next revocation for it.
func (p *Brontide) handleNewActiveChannel(req *newChannelMsg) {
newChan := req.channel
chanPoint := &newChan.FundingOutpoint
chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
// Only update RemoteNextRevocation if the channel is in the
// activeChannels map and if we added the link to the switch. Only
// active channels will be added to the switch.
if p.isActiveChannel(chanID) {
p.log.Infof("Already have ChannelPoint(%v), ignoring",
chanPoint)
// Handle it and close the err chan on the request.
close(req.err)
p.updateNextRevocation(newChan)
return
}
// This is a new channel, we now add it to the map.
if err := p.addActiveChannel(req.channel); err != nil {
// Log and send back the error to the request.
p.log.Errorf(err.Error())
req.err <- err
return
}
// Close the err chan if everything went fine.
close(req.err)
}
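
The new path preserves the request/response convention between the funding manager and the peer: the sender blocks until handleNewActiveChannel either closes the request's error channel (a receive on a closed channel yields nil, signalling success) or sends the failure on it. Condensed from AddNewChannel above, with the cancel case trimmed:

newChanMsg := &newChannelMsg{
	channel: channel,
	err:     make(chan error, 1),
}

// Hand the request to channelManager, which dispatches it to
// handleNewActiveChannel.
select {
case p.newActiveChannel <- newChanMsg:
case <-p.quit:
	return lnpeer.ErrPeerExiting
}

// Wait for the handler's verdict: nil from the closed channel on
// success, or the wrapped failure otherwise.
select {
case err := <-newChanMsg.err:
	return err
case <-p.quit:
	return lnpeer.ErrPeerExiting
}
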


@@ -408,7 +408,7 @@ func createTestPeer(t *testing.T, notifier chainntnfs.ChainNotifier,
alicePeer.remoteFeatures = lnwire.NewFeatureVector(nil, lnwire.Features)
chanID := lnwire.NewChanIDFromOutPoint(channelAlice.ChannelPoint())
alicePeer.activeChannels[chanID] = channelAlice
alicePeer.activeChannels.Store(chanID, channelAlice)
alicePeer.wg.Add(1)
go alicePeer.channelManager()