Merge pull request #6419 from Crypt-iQ/shutdown_coop_restart

peer: allow restart during cooperative close
Olaoluwa Osuntokun 2022-04-28 17:59:06 -07:00 committed by GitHub
commit de6fcf15b6
3 changed files with 210 additions and 101 deletions


@@ -108,6 +108,9 @@ then watch it on chain. Taproot script spends are also supported through the
* [Fixed incorrect PSBT de-serialization for transactions with no
inputs](https://github.com/lightningnetwork/lnd/pull/6428).
* [Fixed a spec-compliance issue where lnd was not allowing cooperative
close to continue after a peer disconnect](https://github.com/lightningnetwork/lnd/pull/6419).
## Routing
* [Add a new `time_pref` parameter to the QueryRoutes and SendPayment APIs](https://github.com/lightningnetwork/lnd/pull/6024) that


@@ -483,6 +483,10 @@ func (c *ChanCloser) ProcessCloseMsg(msg lnwire.Message) ([]lnwire.Message,
feeProposal := calcCompromiseFee(c.chanPoint, c.idealFeeSat,
c.lastFeeProposal, remoteProposedFee,
)
if feeProposal > c.idealFeeSat*3 {
return nil, false, fmt.Errorf("couldn't find" +
" compromise fee")
}
// With our new fee proposal calculated, we'll craft a new close
// signed signature to send to the other party so we can continue
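
The hunk above adds a hard cap to fee negotiation: if the computed compromise fee exceeds three times our ideal fee, the ChanCloser aborts rather than signing off on an arbitrarily large fee. Below is a minimal, standalone sketch of that cap, assuming plain int64 satoshi amounts instead of the ChanCloser's own fee fields; the helper and main function are illustrative only and not part of the PR.

package main

import "fmt"

// checkCompromiseFee restates the new guard from ProcessCloseMsg: reject any
// negotiated compromise fee larger than three times our ideal fee.
// Illustrative sketch only, not code from the diff.
func checkCompromiseFee(idealFeeSat, feeProposal int64) error {
	if feeProposal > idealFeeSat*3 {
		return fmt.Errorf("couldn't find compromise fee")
	}
	return nil
}

func main() {
	// With an ideal fee of 1,000 sat, a 5,000 sat compromise is rejected
	// while a 2,500 sat compromise is still acceptable.
	fmt.Println(checkCompromiseFee(1000, 5000)) // couldn't find compromise fee
	fmt.Println(checkCompromiseFee(1000, 2500)) // <nil>
}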


@@ -60,11 +60,6 @@ const (
// message.
handshakeTimeout = 15 * time.Second
// outgoingQueueLen is the buffer size of the channel which houses
// messages to be sent across the wire, requested by objects outside
// this struct.
outgoingQueueLen = 50
// ErrorBufferSize is the number of historic peer errors that we store.
ErrorBufferSize = 10
)
@@ -348,7 +343,7 @@ type Config struct {
// several helper goroutines to handle events such as HTLC timeouts, new
// funding workflow, and detecting an uncooperative closure of any active
// channels.
// TODO(roasbeef): proper reconnection logic
// TODO(roasbeef): proper reconnection logic.
type Brontide struct {
// MUST be used atomically.
started int32
@@ -702,6 +697,31 @@ func (p *Brontide) loadActiveChannels(chans []*channeldb.OpenChannel) (
}
msgs = append(msgs, chanSync)
// Check if this channel needs to have the cooperative
// close process restarted. If so, we'll need to send
// the Shutdown message that is returned.
if dbChan.HasChanStatus(
channeldb.ChanStatusCoopBroadcasted,
) {
shutdownMsg, err := p.restartCoopClose(lnChan)
if err != nil {
peerLog.Errorf("Unable to restart "+
"coop close for channel: %v",
err)
continue
}
if shutdownMsg == nil {
continue
}
// Append the message to the set of messages to
// send.
msgs = append(msgs, shutdownMsg)
}
continue
}
@@ -1031,7 +1051,7 @@ func (p *Brontide) readNextMessage() (lnwire.Message, error) {
// delivered via closure to a receiver. These messages MUST be in order due to
// the nature of the lightning channel commitment and gossiper state machines.
// TODO(conner): use stream handler interface to abstract out stream
// state/logging
// state/logging.
type msgStream struct {
streamShutdown int32 // To be used atomically.
@@ -2408,6 +2428,15 @@ func (p *Brontide) reenableActiveChannels() {
func (p *Brontide) fetchActiveChanCloser(chanID lnwire.ChannelID) (
*chancloser.ChanCloser, error) {
chanCloser, found := p.activeChanCloses[chanID]
if found {
// An entry will only be found if the closer has already been
// created for a non-pending channel or for a channel that had
// previously started the shutdown process but the connection
// was restarted.
return chanCloser, nil
}
// First, we'll ensure that we actually know of the target channel. If
// not, we'll ignore this message.
p.activeChanMtx.RLock()
@@ -2420,74 +2449,51 @@ func (p *Brontide) fetchActiveChanCloser(chanID lnwire.ChannelID) (
return nil, ErrChannelNotFound
}
// We'll attempt to look up the matching state machine, if we can't
// find one then this means that the remote party is initiating a
// cooperative channel closure.
chanCloser, ok := p.activeChanCloses[chanID]
if !ok {
// Optimistically try a link shutdown, erroring out if it
// failed.
if err := p.tryLinkShutdown(chanID); err != nil {
peerLog.Errorf("failed link shutdown: %v", err)
return nil, err
}
// We'll create a valid closing state machine in order to
// respond to the initiated cooperative channel closure. First,
// we set the delivery script that our funds will be paid out
// to. If an upfront shutdown script was set, we will use it.
// Otherwise, we get a fresh delivery script.
//
// TODO: Expose option to allow upfront shutdown script from
// watch-only accounts.
deliveryScript := channel.LocalUpfrontShutdownScript()
if len(deliveryScript) == 0 {
var err error
deliveryScript, err = p.genDeliveryScript()
if err != nil {
peerLog.Errorf("unable to gen delivery script: %v", err)
return nil, fmt.Errorf("close addr unavailable")
}
}
// In order to begin fee negotiations, we'll first compute our
// target ideal fee-per-kw.
feePerKw, err := p.cfg.FeeEstimator.EstimateFeePerKW(
p.cfg.CoopCloseTargetConfs,
)
if err != nil {
peerLog.Errorf("unable to query fee estimator: %v", err)
return nil, fmt.Errorf("unable to estimate fee")
}
_, startingHeight, err := p.cfg.ChainIO.GetBestBlock()
if err != nil {
peerLog.Errorf("unable to obtain best block: %v", err)
return nil, fmt.Errorf("cannot obtain best block")
}
chanCloser = chancloser.NewChanCloser(
chancloser.ChanCloseCfg{
Channel: channel,
BroadcastTx: p.cfg.Wallet.PublishTransaction,
DisableChannel: func(chanPoint wire.OutPoint) error {
return p.cfg.ChanStatusMgr.RequestDisable(chanPoint, false)
},
Disconnect: func() error {
return p.cfg.DisconnectPeer(p.IdentityKey())
},
Quit: p.quit,
},
deliveryScript,
feePerKw,
uint32(startingHeight),
nil,
false,
)
p.activeChanCloses[chanID] = chanCloser
// Optimistically try a link shutdown, erroring out if it failed.
if err := p.tryLinkShutdown(chanID); err != nil {
peerLog.Errorf("failed link shutdown: %v", err)
return nil, err
}
// We'll create a valid closing state machine in order to respond to
// the initiated cooperative channel closure. First, we set the
// delivery script that our funds will be paid out to. If an upfront
// shutdown script was set, we will use it. Otherwise, we get a fresh
// delivery script.
//
// TODO: Expose option to allow upfront shutdown script from watch-only
// accounts.
deliveryScript := channel.LocalUpfrontShutdownScript()
if len(deliveryScript) == 0 {
var err error
deliveryScript, err = p.genDeliveryScript()
if err != nil {
peerLog.Errorf("unable to gen delivery script: %v",
err)
return nil, fmt.Errorf("close addr unavailable")
}
}
// In order to begin fee negotiations, we'll first compute our target
// ideal fee-per-kw.
feePerKw, err := p.cfg.FeeEstimator.EstimateFeePerKW(
p.cfg.CoopCloseTargetConfs,
)
if err != nil {
peerLog.Errorf("unable to query fee estimator: %v", err)
return nil, fmt.Errorf("unable to estimate fee")
}
chanCloser, err = p.createChanCloser(
channel, deliveryScript, feePerKw, nil, false,
)
if err != nil {
peerLog.Errorf("unable to create chan closer: %v", err)
return nil, fmt.Errorf("unable to create chan closer")
}
p.activeChanCloses[chanID] = chanCloser
return chanCloser, nil
}
@@ -2522,6 +2528,118 @@ func chooseDeliveryScript(upfront,
return upfront, nil
}
// restartCoopClose checks whether we need to restart the cooperative close
// process for a given channel.
func (p *Brontide) restartCoopClose(lnChan *lnwallet.LightningChannel) (
*lnwire.Shutdown, error) {
// If this channel has status ChanStatusCoopBroadcasted and does not
// have a closing transaction, then the cooperative close process was
// started but never finished. We'll re-create the chanCloser state
// machine and resend Shutdown. BOLT#2 requires that we retransmit
// Shutdown exactly, but doing so would mean persisting the RPC
// provided close script. Instead use the LocalUpfrontShutdownScript
// or generate a script.
c := lnChan.State()
_, err := c.BroadcastedCooperative()
if err != nil && err != channeldb.ErrNoCloseTx {
// An error other than ErrNoCloseTx was encountered.
return nil, err
} else if err == nil {
// This channel has already completed the coop close
// negotiation.
return nil, nil
}
// As mentioned above, we don't re-create the delivery script.
deliveryScript := c.LocalShutdownScript
if len(deliveryScript) == 0 {
var err error
deliveryScript, err = p.genDeliveryScript()
if err != nil {
peerLog.Errorf("unable to gen delivery script: %v",
err)
return nil, fmt.Errorf("close addr unavailable")
}
}
// Compute an ideal fee.
feePerKw, err := p.cfg.FeeEstimator.EstimateFeePerKW(
p.cfg.CoopCloseTargetConfs,
)
if err != nil {
peerLog.Errorf("unable to query fee estimator: %v", err)
return nil, fmt.Errorf("unable to estimate fee")
}
// Determine whether we or the peer are the initiator of the coop
// close attempt by looking at the channel's status.
locallyInitiated := c.HasChanStatus(
channeldb.ChanStatusLocalCloseInitiator,
)
chanCloser, err := p.createChanCloser(
lnChan, deliveryScript, feePerKw, nil, locallyInitiated,
)
if err != nil {
peerLog.Errorf("unable to create chan closer: %v", err)
return nil, fmt.Errorf("unable to create chan closer")
}
// This does not need a mutex even though it is in a different
// goroutine since this is done before the channelManager goroutine is
// created.
chanID := lnwire.NewChanIDFromOutPoint(&c.FundingOutpoint)
p.activeChanCloses[chanID] = chanCloser
// Create the Shutdown message.
shutdownMsg, err := chanCloser.ShutdownChan()
if err != nil {
peerLog.Errorf("unable to create shutdown message: %v", err)
delete(p.activeChanCloses, chanID)
return nil, err
}
return shutdownMsg, nil
}
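// ---------------------------------------------------------------------------
// Illustrative aside (not part of this PR): the decision restartCoopClose
// encodes can be read as a single predicate. Shutdown is only resent when the
// channel carries ChanStatusCoopBroadcasted (checked by the caller in
// loadActiveChannels) and BroadcastedCooperative returned ErrNoCloseTx, i.e.
// the close was started but a closing transaction was never recorded. The
// hypothetical helper below restates that predicate; it is not code from the
// diff.
func shouldRestartCoopClose(coopBroadcasted bool, closeTxErr error) bool {
	// A nil error means a close tx already exists, so negotiation already
	// completed and nothing needs to be resent.
	return coopBroadcasted && closeTxErr == channeldb.ErrNoCloseTx
}
// ---------------------------------------------------------------------------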
// createChanCloser constructs a ChanCloser from the passed parameters and is
// used to de-duplicate code.
func (p *Brontide) createChanCloser(channel *lnwallet.LightningChannel,
deliveryScript lnwire.DeliveryAddress, fee chainfee.SatPerKWeight,
req *htlcswitch.ChanClose,
locallyInitiated bool) (*chancloser.ChanCloser, error) {
_, startingHeight, err := p.cfg.ChainIO.GetBestBlock()
if err != nil {
peerLog.Errorf("unable to obtain best block: %v", err)
return nil, fmt.Errorf("cannot obtain best block")
}
chanCloser := chancloser.NewChanCloser(
chancloser.ChanCloseCfg{
Channel: channel,
BroadcastTx: p.cfg.Wallet.PublishTransaction,
DisableChannel: func(op wire.OutPoint) error {
return p.cfg.ChanStatusMgr.RequestDisable(
op, false,
)
},
Disconnect: func() error {
return p.cfg.DisconnectPeer(p.IdentityKey())
},
Quit: p.quit,
},
deliveryScript,
fee,
uint32(startingHeight),
req,
locallyInitiated,
)
return chanCloser, nil
}
// handleLocalCloseReq kicks-off the workflow to execute a cooperative or
// forced unilateral closure of the channel initiated by a local subsystem.
func (p *Brontide) handleLocalCloseReq(req *htlcswitch.ChanClose) {
@@ -2542,7 +2660,6 @@ func (p *Brontide) handleLocalCloseReq(req *htlcswitch.ChanClose) {
}
switch req.CloseType {
// A type of CloseRegular indicates that the user has opted to close
// out this channel on-chain, so we execute the cooperative channel
// closure workflow.
@@ -2575,15 +2692,6 @@ func (p *Brontide) handleLocalCloseReq(req *htlcswitch.ChanClose) {
}
}
// Next, we'll create a new channel closer state machine to
// handle the close negotiation.
_, startingHeight, err := p.cfg.ChainIO.GetBestBlock()
if err != nil {
peerLog.Errorf(err.Error())
req.Err <- err
return
}
// Optimistically try a link shutdown, erroring out if it
// failed.
if err := p.tryLinkShutdown(chanID); err != nil {
@@ -2592,24 +2700,15 @@ func (p *Brontide) handleLocalCloseReq(req *htlcswitch.ChanClose) {
return
}
chanCloser := chancloser.NewChanCloser(
chancloser.ChanCloseCfg{
Channel: channel,
BroadcastTx: p.cfg.Wallet.PublishTransaction,
DisableChannel: func(chanPoint wire.OutPoint) error {
return p.cfg.ChanStatusMgr.RequestDisable(chanPoint, false)
},
Disconnect: func() error {
return p.cfg.DisconnectPeer(p.IdentityKey())
},
Quit: p.quit,
},
deliveryScript,
req.TargetFeePerKw,
uint32(startingHeight),
req,
true,
chanCloser, err := p.createChanCloser(
channel, deliveryScript, req.TargetFeePerKw, req, true,
)
if err != nil {
peerLog.Errorf(err.Error())
req.Err <- err
return
}
p.activeChanCloses[chanID] = chanCloser
// Finally, we'll initiate the channel shutdown within the
@@ -2783,6 +2882,10 @@ func (p *Brontide) finalizeChanClosure(chanCloser *chancloser.ChanCloser) {
chanPoint := chanCloser.Channel().ChannelPoint()
p.WipeChannel(chanPoint)
// Also clear the activeChanCloses map of this channel.
cid := lnwire.NewChanIDFromOutPoint(chanPoint)
delete(p.activeChanCloses, cid)
// Next, we'll launch a goroutine which will request to be notified by
// the ChainNotifier once the closure transaction obtains a single
// confirmation.
@@ -2816,7 +2919,6 @@ func (p *Brontide) finalizeChanClosure(chanCloser *chancloser.ChanCloser) {
go WaitForChanToClose(chanCloser.NegotiationHeight(), notifier, errChan,
chanPoint, &closingTxid, closingTx.TxOut[0].PkScript, func() {
// Respond to the local subsystem which requested the
// channel closure.
if closeReq != nil {