Merge pull request #8167 from ProofOfKeags/bugfix/htlc-flush-shutdown

Bugfix/htlc flush shutdown
This commit is contained in:
Olaoluwa Osuntokun 2024-01-23 18:49:47 -08:00 committed by GitHub
commit 1caca81ba1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
25 changed files with 4378 additions and 3672 deletions

View File

@ -921,6 +921,10 @@ func executeChannelClose(ctxc context.Context, client lnrpc.LightningClient,
}
switch update := resp.Update.(type) {
case *lnrpc.CloseStatusUpdate_CloseInstant:
if req.NoWait {
return nil
}
case *lnrpc.CloseStatusUpdate_ClosePending:
closingHash := update.ClosePending.Txid
txid, err := chainhash.NewHash(closingHash)

View File

@ -68,6 +68,11 @@
* [Properly handle un-acked updates for exhausted watchtower
sessions](https://github.com/lightningnetwork/lnd/pull/8233)
* [Allow `shutdown`s while HTLCs are in-flight](https://github.com/lightningnetwork/lnd/pull/8167).
This change fixes an issue where we would force-close channels when receiving
a `shutdown` message if there were currently HTLCs on the channel. After this
change, the shutdown procedure should be compliant with BOLT2 requirements.
# New Features
## Functional Enhancements
@ -126,6 +131,12 @@
file](https://github.com/lightningnetwork/lnd/pull/8188). The corresponding
`lncli getdebuginfo` command was also added.
* Add a [new flag](https://github.com/lightningnetwork/lnd/pull/8167) to the
`CloseChannel` RPC method that instructs the client to not wait for the
closing transaction to be negotiated. This should be used if you don't care
about the txid and don't want the calling code to block while the channel
drains the active HTLCs.
## lncli Additions
* Deprecate `bumpclosefee` for `bumpforceclosefee` to accommodate for the fact

2
go.mod
View File

@ -39,7 +39,7 @@ require (
github.com/lightningnetwork/lightning-onion v1.2.1-0.20230823005744-06182b1d7d2f
github.com/lightningnetwork/lnd/cert v1.2.2
github.com/lightningnetwork/lnd/clock v1.1.1
github.com/lightningnetwork/lnd/fn v1.0.0
github.com/lightningnetwork/lnd/fn v1.0.1
github.com/lightningnetwork/lnd/healthcheck v1.2.3
github.com/lightningnetwork/lnd/kvdb v1.4.4
github.com/lightningnetwork/lnd/queue v1.1.1

4
go.sum
View File

@ -449,8 +449,8 @@ github.com/lightningnetwork/lnd/cert v1.2.2/go.mod h1:jQmFn/Ez4zhDgq2hnYSw8r35bq
github.com/lightningnetwork/lnd/clock v1.0.1/go.mod h1:KnQudQ6w0IAMZi1SgvecLZQZ43ra2vpDNj7H/aasemg=
github.com/lightningnetwork/lnd/clock v1.1.1 h1:OfR3/zcJd2RhH0RU+zX/77c0ZiOnIMsDIBjgjWdZgA0=
github.com/lightningnetwork/lnd/clock v1.1.1/go.mod h1:mGnAhPyjYZQJmebS7aevElXKTFDuO+uNFFfMXK1W8xQ=
github.com/lightningnetwork/lnd/fn v1.0.0 h1:I5VG9AD63mOQ89RMQEu7HRI1r68wn8yz539LoylUIKM=
github.com/lightningnetwork/lnd/fn v1.0.0/go.mod h1:XV+0vBXSnh3aUjskJUv58TOpsveiXQ+ac8rEnXZDGFc=
github.com/lightningnetwork/lnd/fn v1.0.1 h1:4nAxKpGKgk4/xRQKxvim3BW0QM34S4BH6QghWZVjsko=
github.com/lightningnetwork/lnd/fn v1.0.1/go.mod h1:XV+0vBXSnh3aUjskJUv58TOpsveiXQ+ac8rEnXZDGFc=
github.com/lightningnetwork/lnd/healthcheck v1.2.3 h1:oqhOOy8WmIEa6RBkYKC0mmYZkhl8T2kGD97n9jpML8o=
github.com/lightningnetwork/lnd/healthcheck v1.2.3/go.mod h1:eDxH3dEwV9DeBW/6inrmlVh1qBOFV0AI14EEPnGt9gc=
github.com/lightningnetwork/lnd/kvdb v1.4.4 h1:bCv63rVCvzqj1BkagN/EWTov6NDDgYEG/t0z2HepRMk=

View File

@ -134,12 +134,55 @@ type ChannelUpdateHandler interface {
// parameter.
MayAddOutgoingHtlc(lnwire.MilliSatoshi) error
// ShutdownIfChannelClean shuts the link down if the channel state is
// clean. This can be used with dynamic commitment negotiation or coop
// close negotiation which require a clean channel state.
ShutdownIfChannelClean() error
// EnableAdds sets the ChannelUpdateHandler state to allow
// UpdateAddHtlc's in the specified direction. It returns an error if
// the state already allowed those adds.
EnableAdds(direction LinkDirection) error
// DiableAdds sets the ChannelUpdateHandler state to allow
// UpdateAddHtlc's in the specified direction. It returns an error if
// the state already disallowed those adds.
DisableAdds(direction LinkDirection) error
// IsFlushing returns true when UpdateAddHtlc's are disabled in the
// direction of the argument.
IsFlushing(direction LinkDirection) bool
// OnFlushedOnce adds a hook that will be called the next time the
// channel state reaches zero htlcs. This hook will only ever be called
// once. If the channel state already has zero htlcs, then this will be
// called immediately.
OnFlushedOnce(func())
// OnCommitOnce adds a hook that will be called the next time a
// CommitSig message is sent in the argument's LinkDirection. This hook
// will only ever be called once. If no CommitSig is owed in the
// argument's LinkDirection, then we will call this hook immediately.
OnCommitOnce(LinkDirection, func())
}
// CommitHookID is a value that is used to uniquely identify hooks in the
// ChannelUpdateHandler's commitment update lifecycle. You should never need to
// construct one of these by hand, nor should you try.
type CommitHookID uint64
// FlushHookID is a value that is used to uniquely identify hooks in the
// ChannelUpdateHandler's flush lifecycle. You should never need to construct
// one of these by hand, nor should you try.
type FlushHookID uint64
// LinkDirection is used to query and change any link state on a per-direction
// basis.
type LinkDirection bool
const (
// Incoming is the direction from the remote peer to our node.
Incoming LinkDirection = false
// Outgoing is the direction from our node to the remote peer.
Outgoing LinkDirection = true
)
// ChannelLink is an interface which represents the subsystem for managing the
// incoming htlc requests, applying the changes to the channel, and also
// propagating/forwarding it to htlc switch.

View File

@ -273,13 +273,6 @@ type ChannelLinkConfig struct {
GetAliases func(base lnwire.ShortChannelID) []lnwire.ShortChannelID
}
// shutdownReq contains an error channel that will be used by the channelLink
// to send an error if shutdown failed. If shutdown succeeded, the channel will
// be closed.
type shutdownReq struct {
err chan error
}
// channelLink is the service which drives a channel's commitment update
// state-machine. In the event that an HTLC needs to be propagated to another
// link, the forward handler from config is used which sends HTLC to the
@ -340,10 +333,6 @@ type channelLink struct {
// by the HTLC switch.
downstream chan *htlcPacket
// shutdownRequest is a channel that the channelLink will listen on to
// service shutdown requests from ShutdownIfChannelClean calls.
shutdownRequest chan *shutdownReq
// updateFeeTimer is the timer responsible for updating the link's
// commitment fee every time it fires.
updateFeeTimer *time.Timer
@ -367,10 +356,80 @@ type channelLink struct {
// log is a link-specific logging instance.
log btclog.Logger
// isOutgoingAddBlocked tracks whether the channelLink can send an
// UpdateAddHTLC.
isOutgoingAddBlocked atomic.Bool
// isIncomingAddBlocked tracks whether the channelLink can receive an
// UpdateAddHTLC.
isIncomingAddBlocked atomic.Bool
// flushHooks is a hookMap that is triggered when we reach a channel
// state with no live HTLCs.
flushHooks hookMap
// outgoingCommitHooks is a hookMap that is triggered after we send our
// next CommitSig.
outgoingCommitHooks hookMap
// incomingCommitHooks is a hookMap that is triggered after we receive
// our next CommitSig.
incomingCommitHooks hookMap
wg sync.WaitGroup
quit chan struct{}
}
// hookMap is a data structure that is used to track the hooks that need to be
// called in various parts of the channelLink's lifecycle.
//
// WARNING: NOT thread-safe.
type hookMap struct {
// allocIdx keeps track of the next id we haven't yet allocated.
allocIdx atomic.Uint64
// transient is a map of hooks that are only called the next time invoke
// is called. These hooks are deleted during invoke.
transient map[uint64]func()
// newTransients is a channel that we use to accept new hooks into the
// hookMap.
newTransients chan func()
}
// newHookMap initializes a new empty hookMap.
func newHookMap() hookMap {
return hookMap{
allocIdx: atomic.Uint64{},
transient: make(map[uint64]func()),
newTransients: make(chan func()),
}
}
// alloc allocates space in the hook map for the supplied hook, the second
// argument determines whether it goes into the transient or persistent part
// of the hookMap.
func (m *hookMap) alloc(hook func()) uint64 {
// We assume we never overflow a uint64. Seems OK.
hookID := m.allocIdx.Add(1)
if hookID == 0 {
panic("hookMap allocIdx overflow")
}
m.transient[hookID] = hook
return hookID
}
// invoke is used on a hook map to call all the registered hooks and then clear
// out the transient hooks so they are not called again.
func (m *hookMap) invoke() {
for _, hook := range m.transient {
hook()
}
m.transient = make(map[uint64]func())
}
// hodlHtlc contains htlc data that is required for resolution.
type hodlHtlc struct {
pd *lnwallet.PaymentDescriptor
@ -385,14 +444,16 @@ func NewChannelLink(cfg ChannelLinkConfig,
logPrefix := fmt.Sprintf("ChannelLink(%v):", channel.ChannelPoint())
return &channelLink{
cfg: cfg,
channel: channel,
shortChanID: channel.ShortChanID(),
shutdownRequest: make(chan *shutdownReq),
hodlMap: make(map[models.CircuitKey]hodlHtlc),
hodlQueue: queue.NewConcurrentQueue(10),
log: build.NewPrefixLog(logPrefix, log),
quit: make(chan struct{}),
cfg: cfg,
channel: channel,
shortChanID: channel.ShortChanID(),
hodlMap: make(map[models.CircuitKey]hodlHtlc),
hodlQueue: queue.NewConcurrentQueue(10),
log: build.NewPrefixLog(logPrefix, log),
flushHooks: newHookMap(),
outgoingCommitHooks: newHookMap(),
incomingCommitHooks: newHookMap(),
quit: make(chan struct{}),
}
}
@ -538,15 +599,101 @@ func (l *channelLink) WaitForShutdown() {
// EligibleToForward returns a bool indicating if the channel is able to
// actively accept requests to forward HTLC's. We're able to forward HTLC's if
// we know the remote party's next revocation point. Otherwise, we can't
// initiate new channel state. We also require that the short channel ID not be
// the all-zero source ID, meaning that the channel has had its ID finalized.
// we are eligible to update AND the channel isn't currently flushing the
// outgoing half of the channel.
func (l *channelLink) EligibleToForward() bool {
return l.EligibleToUpdate() &&
!l.IsFlushing(Outgoing)
}
// EligibleToUpdate returns a bool indicating if the channel is able to update
// channel state. We're able to update channel state if we know the remote
// party's next revocation point. Otherwise, we can't initiate new channel
// state. We also require that the short channel ID not be the all-zero source
// ID, meaning that the channel has had its ID finalized.
func (l *channelLink) EligibleToUpdate() bool {
return l.channel.RemoteNextRevocation() != nil &&
l.ShortChanID() != hop.Source &&
l.isReestablished()
}
// EnableAdds sets the ChannelUpdateHandler state to allow UpdateAddHtlc's in
// the specified direction. It returns an error if the state already allowed
// those adds.
func (l *channelLink) EnableAdds(linkDirection LinkDirection) error {
if linkDirection == Outgoing {
if !l.isOutgoingAddBlocked.Swap(false) {
return errors.New("outgoing adds already enabled")
}
}
if linkDirection == Incoming {
if !l.isIncomingAddBlocked.Swap(false) {
return errors.New("incoming adds already enabled")
}
}
return nil
}
// DiableAdds sets the ChannelUpdateHandler state to allow UpdateAddHtlc's in
// the specified direction. It returns an error if the state already disallowed
// those adds.
func (l *channelLink) DisableAdds(linkDirection LinkDirection) error {
if linkDirection == Outgoing {
if l.isOutgoingAddBlocked.Swap(true) {
return errors.New("outgoing adds already disabled")
}
}
if linkDirection == Incoming {
if l.isIncomingAddBlocked.Swap(true) {
return errors.New("incoming adds already disabled")
}
}
return nil
}
// IsFlushing returns true when UpdateAddHtlc's are disabled in the direction of
// the argument.
func (l *channelLink) IsFlushing(linkDirection LinkDirection) bool {
if linkDirection == Outgoing {
return l.isOutgoingAddBlocked.Load()
}
return l.isIncomingAddBlocked.Load()
}
// OnFlushedOnce adds a hook that will be called the next time the channel
// state reaches zero htlcs. This hook will only ever be called once. If the
// channel state already has zero htlcs, then this will be called immediately.
func (l *channelLink) OnFlushedOnce(hook func()) {
select {
case l.flushHooks.newTransients <- hook:
case <-l.quit:
}
}
// OnCommitOnce adds a hook that will be called the next time a CommitSig
// message is sent in the argument's LinkDirection. This hook will only ever be
// called once. If no CommitSig is owed in the argument's LinkDirection, then
// we will call this hook be run immediately.
func (l *channelLink) OnCommitOnce(direction LinkDirection, hook func()) {
var queue chan func()
if direction == Outgoing {
queue = l.outgoingCommitHooks.newTransients
} else {
queue = l.incomingCommitHooks.newTransients
}
select {
case queue <- hook:
case <-l.quit:
}
}
// isReestablished returns true if the link has successfully completed the
// channel reestablishment dance.
func (l *channelLink) isReestablished() bool {
@ -1135,6 +1282,33 @@ func (l *channelLink) htlcManager() {
}
select {
// We have a new hook that needs to be run when we reach a clean
// channel state.
case hook := <-l.flushHooks.newTransients:
if l.channel.IsChannelClean() {
hook()
} else {
l.flushHooks.alloc(hook)
}
// We have a new hook that needs to be run when we have
// committed all of our updates.
case hook := <-l.outgoingCommitHooks.newTransients:
if !l.channel.OweCommitment() {
hook()
} else {
l.outgoingCommitHooks.alloc(hook)
}
// We have a new hook that needs to be run when our peer has
// committed all of their updates.
case hook := <-l.incomingCommitHooks.newTransients:
if !l.channel.NeedCommitment() {
hook()
} else {
l.incomingCommitHooks.alloc(hook)
}
// Our update fee timer has fired, so we'll check the network
// fee to see if we should adjust our commitment fee.
case <-l.updateFeeTimer.C:
@ -1260,24 +1434,6 @@ func (l *channelLink) htlcManager() {
)
}
case req := <-l.shutdownRequest:
// If the channel is clean, we send nil on the err chan
// and return to prevent the htlcManager goroutine from
// processing any more updates. The full link shutdown
// will be triggered by RemoveLink in the peer.
if l.channel.IsChannelClean() {
req.err <- nil
return
}
l.log.Infof("Channel is in an unclean state " +
"(lingering updates), graceful shutdown of " +
"channel link not possible")
// Otherwise, the channel has lingering updates, send
// an error and continue.
req.err <- ErrLinkFailedShutdown
case <-l.quit:
return
}
@ -1409,6 +1565,17 @@ func (l *channelLink) handleDownstreamUpdateAdd(pkt *htlcPacket) error {
return errors.New("not an UpdateAddHTLC packet")
}
// If we are flushing the link in the outgoing direction we can't add
// new htlcs to the link and we need to bounce it
if l.IsFlushing(Outgoing) {
l.mailBox.FailAdd(pkt)
return NewDetailedLinkError(
&lnwire.FailPermanentChannelFailure{},
OutgoingFailureLinkNotEligible,
)
}
// If hodl.AddOutgoing mode is active, we exit early to simulate
// arbitrary delays between the switch adding an ADD to the
// mailbox, and the HTLC being added to the commitment state.
@ -1727,6 +1894,39 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
switch msg := msg.(type) {
case *lnwire.UpdateAddHTLC:
if l.IsFlushing(Incoming) {
// This is forbidden by the protocol specification.
// The best chance we have to deal with this is to drop
// the connection. This should roll back the channel
// state to the last CommitSig. If the remote has
// already sent a CommitSig we haven't received yet,
// channel state will be re-synchronized with a
// ChannelReestablish message upon reconnection and the
// protocol state that caused us to flush the link will
// be rolled back. In the event that there was some
// non-deterministic behavior in the remote that caused
// them to violate the protocol, we have a decent shot
// at correcting it this way, since reconnecting will
// put us in the cleanest possible state to try again.
//
// In addition to the above, it is possible for us to
// hit this case in situations where we improperly
// handle message ordering due to concurrency choices.
// An issue has been filed to address this here:
// https://github.com/lightningnetwork/lnd/issues/8393
l.fail(
LinkFailureError{
code: ErrInvalidUpdate,
FailureAction: LinkFailureDisconnect,
PermanentFailure: false,
Warning: true,
},
"received add while link is flushing",
)
return
}
// We just received an add request from an upstream peer, so we
// add it to our state machine, then add the HTLC to our
// "settle" list in the event that we know the preimage.
@ -1976,6 +2176,13 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
)
return
}
// As soon as we are ready to send our next revocation, we can
// invoke the incoming commit hooks.
l.RWMutex.Lock()
l.incomingCommitHooks.invoke()
l.RWMutex.Unlock()
l.cfg.Peer.SendMessage(false, nextRevocation)
// Notify the incoming htlcs of which the resolutions were
@ -2013,19 +2220,26 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
default:
}
// If both commitment chains are fully synced from our PoV,
// then we don't need to reply with a signature as both sides
// already have a commitment with the latest accepted.
if !l.channel.OweCommitment() {
return
// If the remote party initiated the state transition,
// we'll reply with a signature to provide them with their
// version of the latest commitment. Otherwise, both commitment
// chains are fully synced from our PoV, then we don't need to
// reply with a signature as both sides already have a
// commitment with the latest accepted.
if l.channel.OweCommitment() {
if !l.updateCommitTxOrFail() {
return
}
}
// Otherwise, the remote party initiated the state transition,
// so we'll reply with a signature to provide them with their
// version of the latest commitment.
if !l.updateCommitTxOrFail() {
return
// Now that we have finished processing the incoming CommitSig
// and sent out our RevokeAndAck, we invoke the flushHooks if
// the channel state is clean.
l.RWMutex.Lock()
if l.channel.IsChannelClean() {
l.flushHooks.invoke()
}
l.RWMutex.Unlock()
case *lnwire.RevokeAndAck:
// We've received a revocation from the remote chain, if valid,
@ -2108,6 +2322,14 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
}
}
// Now that we have finished processing the RevokeAndAck, we
// can invoke the flushHooks if the channel state is clean.
l.RWMutex.Lock()
if l.channel.IsChannelClean() {
l.flushHooks.invoke()
}
l.RWMutex.Unlock()
case *lnwire.UpdateFee:
// We received fee update from peer. If we are the initiator we
// will fail the channel, if not we will apply the update.
@ -2316,6 +2538,12 @@ func (l *channelLink) updateCommitTx() error {
}
l.cfg.Peer.SendMessage(false, commitSig)
// Now that we have sent out a new CommitSig, we invoke the outgoing set
// of commit hooks.
l.RWMutex.Lock()
l.outgoingCommitHooks.invoke()
l.RWMutex.Unlock()
return nil
}
@ -2759,29 +2987,9 @@ func (l *channelLink) HandleChannelUpdate(message lnwire.Message) {
default:
}
l.mailBox.AddMessage(message)
}
// ShutdownIfChannelClean triggers a link shutdown if the channel is in a clean
// state and errors if the channel has lingering updates.
//
// NOTE: Part of the ChannelUpdateHandler interface.
func (l *channelLink) ShutdownIfChannelClean() error {
errChan := make(chan error, 1)
select {
case l.shutdownRequest <- &shutdownReq{
err: errChan,
}:
case <-l.quit:
return ErrLinkShuttingDown
}
select {
case err := <-errChan:
return err
case <-l.quit:
return ErrLinkShuttingDown
err := l.mailBox.AddMessage(message)
if err != nil {
l.log.Errorf("failed to add Message to mailbox: %v", err)
}
}
@ -2792,7 +3000,7 @@ func (l *channelLink) updateChannelFee(feePerKw chainfee.SatPerKWeight) error {
// We skip sending the UpdateFee message if the channel is not
// currently eligible to forward messages.
if !l.EligibleToForward() {
if !l.EligibleToUpdate() {
l.log.Debugf("skipping fee update for inactive channel")
return nil
}

View File

@ -8,6 +8,7 @@ import (
"encoding/binary"
"fmt"
"io"
prand "math/rand"
"net"
"reflect"
"runtime"
@ -6665,94 +6666,6 @@ func TestPendingCommitTicker(t *testing.T) {
}
}
// TestShutdownIfChannelClean tests that a link will exit the htlcManager loop
// if and only if the underlying channel state is clean.
func TestShutdownIfChannelClean(t *testing.T) {
t.Parallel()
const chanAmt = btcutil.SatoshiPerBitcoin * 5
const chanReserve = btcutil.SatoshiPerBitcoin * 1
aliceLink, bobChannel, batchTicker, start, _, err :=
newSingleLinkTestHarness(t, chanAmt, chanReserve)
require.NoError(t, err)
var (
coreLink = aliceLink.(*channelLink)
aliceMsgs = coreLink.cfg.Peer.(*mockPeer).sentMsgs
)
shutdownAssert := func(expectedErr error) {
err = aliceLink.ShutdownIfChannelClean()
if expectedErr != nil {
require.Error(t, err, expectedErr)
} else {
require.NoError(t, err)
}
}
err = start()
require.NoError(t, err)
ctx := linkTestContext{
t: t,
aliceLink: aliceLink,
bobChannel: bobChannel,
aliceMsgs: aliceMsgs,
}
// First send an HTLC from Bob to Alice and assert that the link can't
// be shutdown while the update is outstanding.
htlc := generateHtlc(t, coreLink, 0)
// <---add-----
ctx.sendHtlcBobToAlice(htlc)
// <---sig-----
ctx.sendCommitSigBobToAlice(1)
// ----rev---->
ctx.receiveRevAndAckAliceToBob()
shutdownAssert(ErrLinkFailedShutdown)
// ----sig---->
ctx.receiveCommitSigAliceToBob(1)
shutdownAssert(ErrLinkFailedShutdown)
// <---rev-----
ctx.sendRevAndAckBobToAlice()
shutdownAssert(ErrLinkFailedShutdown)
// ---settle-->
ctx.receiveSettleAliceToBob()
shutdownAssert(ErrLinkFailedShutdown)
// ----sig---->
ctx.receiveCommitSigAliceToBob(0)
shutdownAssert(ErrLinkFailedShutdown)
// <---rev-----
ctx.sendRevAndAckBobToAlice()
shutdownAssert(ErrLinkFailedShutdown)
// There is currently no controllable breakpoint between Alice
// receiving the CommitSig and her sending out the RevokeAndAck. As
// soon as the RevokeAndAck is generated, the channel becomes clean.
// This can happen right after the CommitSig is received, so there is
// no shutdown assertion here.
// <---sig-----
ctx.sendCommitSigBobToAlice(0)
// ----rev---->
ctx.receiveRevAndAckAliceToBob()
shutdownAssert(nil)
// Now that the link has exited the htlcManager loop, attempt to
// trigger the batch ticker. It should not be possible.
select {
case batchTicker <- time.Now():
t.Fatalf("expected batch ticker to be inactive")
case <-time.After(5 * time.Second):
}
}
// TestPipelineSettle tests that a link should only pipeline a settle if the
// related add is fully locked-in meaning it is on both sides' commitment txns.
func TestPipelineSettle(t *testing.T) {
@ -7041,3 +6954,207 @@ func TestChannelLinkShortFailureRelay(t *testing.T) {
default:
}
}
// TestLinkFlushApiDirectionIsolation tests whether the calls to EnableAdds and
// DisableAdds are correctly isolated based off of direction (Incoming and
// Outgoing). This means that the state of the Outgoing flush should always be
// unaffected by calls to EnableAdds/DisableAdds on the Incoming direction and
// vice-versa.
func TestLinkFlushApiDirectionIsolation(t *testing.T) {
aliceLink, _, _, _, _, _ :=
newSingleLinkTestHarness(
t, 5*btcutil.SatoshiPerBitcoin,
1*btcutil.SatoshiPerBitcoin,
)
for i := 0; i < 10; i++ {
if prand.Uint64()%2 == 0 {
//nolint:errcheck
aliceLink.EnableAdds(Outgoing)
require.False(t, aliceLink.IsFlushing(Outgoing))
} else {
//nolint:errcheck
aliceLink.DisableAdds(Outgoing)
require.True(t, aliceLink.IsFlushing(Outgoing))
}
require.False(t, aliceLink.IsFlushing(Incoming))
}
//nolint:errcheck
aliceLink.EnableAdds(Outgoing)
for i := 0; i < 10; i++ {
if prand.Uint64()%2 == 0 {
//nolint:errcheck
aliceLink.EnableAdds(Incoming)
require.False(t, aliceLink.IsFlushing(Incoming))
} else {
//nolint:errcheck
aliceLink.DisableAdds(Incoming)
require.True(t, aliceLink.IsFlushing(Incoming))
}
require.False(t, aliceLink.IsFlushing(Outgoing))
}
}
// TestLinkFlushApiGateStateIdempotence tests whether successive calls to
// EnableAdds or DisableAdds (without the other one in between) result in both
// no state change in the flush state AND that the second call results in an
// error (to inform the caller that the call was unnecessary in case it implies
// a bug in their logic).
func TestLinkFlushApiGateStateIdempotence(t *testing.T) {
aliceLink, _, _, _, _, _ :=
newSingleLinkTestHarness(
t, 5*btcutil.SatoshiPerBitcoin,
1*btcutil.SatoshiPerBitcoin,
)
for _, dir := range []LinkDirection{Incoming, Outgoing} {
require.Nil(t, aliceLink.DisableAdds(dir))
require.True(t, aliceLink.IsFlushing(dir))
require.NotNil(t, aliceLink.DisableAdds(dir))
require.True(t, aliceLink.IsFlushing(dir))
require.Nil(t, aliceLink.EnableAdds(dir))
require.False(t, aliceLink.IsFlushing(dir))
require.NotNil(t, aliceLink.EnableAdds(dir))
require.False(t, aliceLink.IsFlushing(dir))
}
}
func TestLinkOutgoingCommitHooksCalled(t *testing.T) {
aliceLink, bobChannel, batchTicker, start, _, err :=
newSingleLinkTestHarness(
t, 5*btcutil.SatoshiPerBitcoin,
btcutil.SatoshiPerBitcoin,
)
require.NoError(t, err)
require.NoError(t, start(), "could not start link")
hookCalled := make(chan struct{})
//nolint:forcetypeassert
aliceMsgs := aliceLink.(*channelLink).cfg.Peer.(*mockPeer).sentMsgs
ctx := linkTestContext{
t: t,
aliceLink: aliceLink,
bobChannel: bobChannel,
aliceMsgs: aliceMsgs,
}
// Set up a pending HTLC on the link.
//nolint:forcetypeassert
htlc := generateHtlc(t, aliceLink.(*channelLink), 0)
ctx.sendHtlcAliceToBob(0, htlc)
ctx.receiveHtlcAliceToBob()
aliceLink.OnCommitOnce(Outgoing, func() {
close(hookCalled)
})
select {
case <-hookCalled:
t.Fatal("hook called prematurely")
case <-time.NewTimer(time.Second).C:
}
batchTicker <- time.Now()
// Send a second tick just to ensure the hook isn't called more than
// once.
batchTicker <- time.Now()
select {
case <-hookCalled:
case <-time.NewTimer(time.Second).C:
t.Fatal("hook not called")
}
}
func TestLinkFlushHooksCalled(t *testing.T) {
aliceLink, bobChannel, _, start, _, err :=
newSingleLinkTestHarness(
t, 5*btcutil.SatoshiPerBitcoin,
btcutil.SatoshiPerBitcoin,
)
require.NoError(t, err)
require.NoError(t, start(), "could not start link")
//nolint:forcetypeassert
aliceMsgs := aliceLink.(*channelLink).cfg.Peer.(*mockPeer).sentMsgs
ctx := linkTestContext{
t: t,
aliceLink: aliceLink,
bobChannel: bobChannel,
aliceMsgs: aliceMsgs,
}
hookCalled := make(chan struct{})
assertHookCalled := func(shouldBeCalled bool) {
select {
case <-hookCalled:
require.True(
t, shouldBeCalled, "hook called prematurely",
)
case <-time.NewTimer(time.Millisecond).C:
require.False(t, shouldBeCalled, "hook not called")
}
}
//nolint:forcetypeassert
htlc := generateHtlc(t, aliceLink.(*channelLink), 0)
// A <- add -- B
ctx.sendHtlcBobToAlice(htlc)
// A <- sig -- B
ctx.sendCommitSigBobToAlice(1)
// A -- rev -> B
ctx.receiveRevAndAckAliceToBob()
// Register flush hook
aliceLink.OnFlushedOnce(func() {
close(hookCalled)
})
// Channel is not clean, hook should not be called
assertHookCalled(false)
// A -- sig -> B
ctx.receiveCommitSigAliceToBob(1)
assertHookCalled(false)
// A <- rev -- B
ctx.sendRevAndAckBobToAlice()
assertHookCalled(false)
// A -- set -> B
ctx.receiveSettleAliceToBob()
assertHookCalled(false)
// A -- sig -> B
ctx.receiveCommitSigAliceToBob(0)
assertHookCalled(false)
// A <- rev -- B
ctx.sendRevAndAckBobToAlice()
assertHookCalled(false)
// A <- sig -- B
ctx.sendCommitSigBobToAlice(0)
// since there is no pause point between alice receiving CommitSig and
// sending RevokeAndAck, we don't assert the hook hasn't been called
// here.
// A -- rev -> B
ctx.receiveRevAndAckAliceToBob()
assertHookCalled(true)
}

View File

@ -899,7 +899,6 @@ func (f *mockChannelLink) ChannelPoint() *wire.OutPoint { return
func (f *mockChannelLink) Stop() {}
func (f *mockChannelLink) EligibleToForward() bool { return f.eligible }
func (f *mockChannelLink) MayAddOutgoingHtlc(lnwire.MilliSatoshi) error { return nil }
func (f *mockChannelLink) ShutdownIfChannelClean() error { return nil }
func (f *mockChannelLink) setLiveShortChanID(sid lnwire.ShortChannelID) { f.shortChanID = sid }
func (f *mockChannelLink) IsUnadvertised() bool { return f.unadvertised }
func (f *mockChannelLink) UpdateShortChanID() (lnwire.ShortChannelID, error) {
@ -907,6 +906,25 @@ func (f *mockChannelLink) UpdateShortChanID() (lnwire.ShortChannelID, error) {
return f.shortChanID, nil
}
func (f *mockChannelLink) EnableAdds(linkDirection LinkDirection) error {
// TODO(proofofkeags): Implement
return nil
}
func (f *mockChannelLink) DisableAdds(linkDirection LinkDirection) error {
// TODO(proofofkeags): Implement
return nil
}
func (f *mockChannelLink) IsFlushing(linkDirection LinkDirection) bool {
// TODO(proofofkeags): Implement
return false
}
func (f *mockChannelLink) OnFlushedOnce(func()) {
// TODO(proofofkeags): Implement
}
func (f *mockChannelLink) OnCommitOnce(LinkDirection, func()) {
// TODO(proofofkeags): Implement
}
var _ ChannelLink = (*mockChannelLink)(nil)
func newDB() (*channeldb.DB, func(), error) {

View File

@ -566,4 +566,8 @@ var allTestCases = []*lntest.TestCase{
Name: "fail funding flow psbt",
TestFunc: testPsbtChanFundingFailFlow,
},
{
Name: "coop close with htlcs",
TestFunc: testCoopCloseWithHtlcs,
},
}

View File

@ -0,0 +1,103 @@
package itest
import (
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnrpc/invoicesrpc"
"github.com/lightningnetwork/lnd/lnrpc/routerrpc"
"github.com/lightningnetwork/lnd/lntest"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/stretchr/testify/require"
)
// testCoopCloseWithHtlcs tests whether or not we can successfully issue a coop
// close request whilt there are still active htlcs on the link. Here we will
// set up an HODL invoice to suspend settlement. Then we will attempt to close
// the channel which should appear as a noop for the time being. Then we will
// have the receiver settle the invoice and observe that the channel gets torn
// down after settlement.
func testCoopCloseWithHtlcs(ht *lntest.HarnessTest) {
alice, bob := ht.Alice, ht.Bob
// Here we set up a channel between Alice and Bob, beginning with a
// balance on Bob's side.
chanPoint := ht.OpenChannel(bob, alice, lntest.OpenChannelParams{
Amt: btcutil.Amount(1000000),
})
// Wait for Bob to understand that the channel is ready to use.
ht.AssertTopologyChannelOpen(bob, chanPoint)
// Here we set things up so that Alice generates a HODL invoice so we
// can test whether the shutdown is deferred until the settlement of
// that invoice.
payAmt := btcutil.Amount(4)
var preimage lntypes.Preimage
copy(preimage[:], ht.Random32Bytes())
payHash := preimage.Hash()
invoiceReq := &invoicesrpc.AddHoldInvoiceRequest{
Memo: "testing close",
Value: int64(payAmt),
Hash: payHash[:],
}
resp := alice.RPC.AddHoldInvoice(invoiceReq)
invoiceStream := alice.RPC.SubscribeSingleInvoice(payHash[:])
// Here we wait for the invoice to be open and payable.
ht.AssertInvoiceState(invoiceStream, lnrpc.Invoice_OPEN)
// Now that the invoice is ready to be paid, let's have Bob open an
// HTLC for it.
req := &routerrpc.SendPaymentRequest{
PaymentRequest: resp.PaymentRequest,
TimeoutSeconds: 60,
FeeLimitSat: 1000000,
}
ht.SendPaymentAndAssertStatus(bob, req, lnrpc.Payment_IN_FLIGHT)
ht.AssertNumActiveHtlcs(bob, 1)
// Assert at this point that the HTLC is open but not yet settled.
ht.AssertInvoiceState(invoiceStream, lnrpc.Invoice_ACCEPTED)
// Have alice attempt to close the channel.
closeClient := alice.RPC.CloseChannel(&lnrpc.CloseChannelRequest{
ChannelPoint: chanPoint,
NoWait: true,
})
ht.AssertChannelInactive(bob, chanPoint)
// Now that the channel is inactive we can be certain that the deferred
// closure is set up. Let's settle the invoice.
alice.RPC.SettleInvoice(preimage[:])
// Pull the instant update off the wire to clear the path for the
// close pending update.
_, err := closeClient.Recv()
require.NoError(ht, err)
// Wait for the next channel closure update. Now that we have settled
// the only HTLC this should be imminent.
update, err := closeClient.Recv()
require.NoError(ht, err)
// This next update should be a GetClosePending as it should be the
// negotiation of the coop close tx.
closePending := update.GetClosePending()
require.NotNil(ht, closePending)
// Convert the txid we get from the PendingUpdate to a Hash so we can
// wait for it to be mined.
var closeTxid chainhash.Hash
require.NoError(
ht, closeTxid.SetBytes(closePending.Txid),
"invalid closing txid",
)
// Wait for the close tx to be in the Mempool.
ht.Miner.AssertTxInMempool(&closeTxid)
// Wait for it to get mined and finish tearing down.
ht.AssertStreamChannelCoopClosed(alice, chanPoint, false, closeClient)
}

View File

@ -309,15 +309,6 @@ func (m *mppTestScenario) closeChannels() {
return
}
// TODO(yy): remove the sleep once the following bug is fixed. When the
// payment is reported as settled by Alice, it's expected the
// commitment dance is finished and all subsequent states have been
// updated. Yet we'd receive the error `cannot co-op close channel with
// active htlcs` or `link failed to shutdown` if we close the channel.
// We need to investigate the order of settling the payments and
// updating commitments to understand and fix .
time.Sleep(5 * time.Second)
// Close all channels without mining the closing transactions.
m.ht.CloseChannelAssertPending(m.alice, m.channelPoints[0], false)
m.ht.CloseChannelAssertPending(m.alice, m.channelPoints[1], false)

View File

@ -312,15 +312,6 @@ func runPsbtChanFunding(ht *lntest.HarnessTest, carol, dave *node.HarnessNode,
resp := dave.RPC.AddInvoice(invoice)
ht.CompletePaymentRequests(carol, []string{resp.PaymentRequest})
// TODO(yy): remove the sleep once the following bug is fixed. When the
// payment is reported as settled by Carol, it's expected the
// commitment dance is finished and all subsequent states have been
// updated. Yet we'd receive the error `cannot co-op close channel with
// active htlcs` or `link failed to shutdown` if we close the channel.
// We need to investigate the order of settling the payments and
// updating commitments to understand and fix .
time.Sleep(2 * time.Second)
// To conclude, we'll close the newly created channel between Carol and
// Dave. This function will also block until the channel is closed and
// will additionally assert the relevant channel closing post
@ -496,15 +487,6 @@ func runPsbtChanFundingExternal(ht *lntest.HarnessTest, carol,
resp := dave.RPC.AddInvoice(invoice)
ht.CompletePaymentRequests(carol, []string{resp.PaymentRequest})
// TODO(yy): remove the sleep once the following bug is fixed. When the
// payment is reported as settled by Carol, it's expected the
// commitment dance is finished and all subsequent states have been
// updated. Yet we'd receive the error `cannot co-op close channel with
// active htlcs` or `link failed to shutdown` if we close the channel.
// We need to investigate the order of settling the payments and
// updating commitments to understand and fix .
time.Sleep(2 * time.Second)
// To conclude, we'll close the newly created channel between Carol and
// Dave. This function will also block until the channels are closed and
// will additionally assert the relevant channel closing post
@ -656,15 +638,6 @@ func runPsbtChanFundingSingleStep(ht *lntest.HarnessTest, carol,
resp := dave.RPC.AddInvoice(invoice)
ht.CompletePaymentRequests(carol, []string{resp.PaymentRequest})
// TODO(yy): remove the sleep once the following bug is fixed. When the
// payment is reported as settled by Carol, it's expected the
// commitment dance is finished and all subsequent states have been
// updated. Yet we'd receive the error `cannot co-op close channel with
// active htlcs` or `link failed to shutdown` if we close the channel.
// We need to investigate the order of settling the payments and
// updating commitments to understand and fix .
time.Sleep(2 * time.Second)
// To conclude, we'll close the newly created channel between Carol and
// Dave. This function will also block until the channel is closed and
// will additionally assert the relevant channel closing post

View File

@ -1323,15 +1323,6 @@ func testRouteFeeCutoff(ht *lntest.HarnessTest) {
}
testFeeCutoff(feeLimitFixed)
// TODO(yy): remove the sleep once the following bug is fixed. When the
// payment is reported as settled by Carol, it's expected the
// commitment dance is finished and all subsequent states have been
// updated. Yet we'd receive the error `cannot co-op close channel with
// active htlcs` or `link failed to shutdown` if we close the channel.
// We need to investigate the order of settling the payments and
// updating commitments to understand and fix .
time.Sleep(2 * time.Second)
// Once we're done, close the channels and shut down the nodes created
// throughout this test.
ht.CloseChannel(alice, chanPointAliceBob)

View File

@ -426,15 +426,6 @@ func fundChanAndCloseFromImportedAccount(ht *lntest.HarnessTest, srcNode,
ht.CompletePaymentRequests(srcNode, []string{resp.PaymentRequest})
// TODO(yy): remove the sleep once the following bug is fixed. When the
// payment is reported as settled by srcNode, it's expected the
// commitment dance is finished and all subsequent states have been
// updated. Yet we'd receive the error `cannot co-op close channel with
// active htlcs` or `link failed to shutdown` if we close the channel.
// We need to investigate the order of settling the payments and
// updating commitments to understand and fix .
time.Sleep(2 * time.Second)
// Now that we've confirmed the opened channel works, we'll close it.
ht.CloseChannel(srcNode, chanPoint)

View File

@ -865,15 +865,6 @@ func testOptionScidUpgrade(ht *lntest.HarnessTest) {
daveInvoice2 := dave.RPC.AddInvoice(daveParams)
ht.CompletePaymentRequests(bob, []string{daveInvoice2.PaymentRequest})
// TODO(yy): remove the sleep once the following bug is fixed. When
// the payment is reported as settled by Bob, it's expected the
// commitment dance is finished and all subsequent states have been
// updated. Yet we'd receive the error `cannot co-op close channel with
// active htlcs` or `link failed to shutdown` if we close the channel.
// We need to investigate the order of settling the payments and
// updating commitments to understand and fix.
time.Sleep(2 * time.Second)
// Close standby node's channels.
ht.CloseChannel(bob, fundingPoint2)
}

File diff suppressed because it is too large Load Diff

View File

@ -2046,12 +2046,18 @@ message CloseChannelRequest {
//
// NOTE: This field is only respected if we're the initiator of the channel.
uint64 max_fee_per_vbyte = 7;
// If true, then the rpc call will not block while it awaits a closing txid.
// Consequently this RPC call will not return a closing txid if this value
// is set.
bool no_wait = 8;
}
message CloseStatusUpdate {
oneof update {
PendingUpdate close_pending = 1;
ChannelCloseUpdate chan_close = 3;
InstantUpdate close_instant = 4;
}
}
@ -2060,6 +2066,9 @@ message PendingUpdate {
uint32 output_index = 2;
}
message InstantUpdate {
}
message ReadyForPsbtFunding {
/*
The P2WSH address of the channel funding multisig address that the below

View File

@ -866,6 +866,13 @@
"required": false,
"type": "string",
"format": "uint64"
},
{
"name": "no_wait",
"description": "If true, then the rpc call will not block while it awaits a closing txid.\nConsequently this RPC call will not return a closing txid if this value\nis set.",
"in": "query",
"required": false,
"type": "boolean"
}
],
"tags": [
@ -4367,6 +4374,9 @@
},
"chan_close": {
"$ref": "#/definitions/lnrpcChannelCloseUpdate"
},
"close_instant": {
"$ref": "#/definitions/lnrpcInstantUpdate"
}
}
},
@ -5236,6 +5246,9 @@
],
"default": "INITIATOR_UNKNOWN"
},
"lnrpcInstantUpdate": {
"type": "object"
},
"lnrpcInterceptFeedback": {
"type": "object",
"properties": {

View File

@ -1207,6 +1207,7 @@ func (h *HarnessTest) CloseChannelAssertPending(hn *node.HarnessNode,
closeReq := &lnrpc.CloseChannelRequest{
ChannelPoint: cp,
Force: force,
NoWait: true,
}
var (
@ -1218,28 +1219,15 @@ func (h *HarnessTest) CloseChannelAssertPending(hn *node.HarnessNode,
// Consume the "channel close" update in order to wait for the closing
// transaction to be broadcast, then wait for the closing tx to be seen
// within the network.
//
// TODO(yy): remove the wait once the following bug is fixed.
// - https://github.com/lightningnetwork/lnd/issues/6039
// We may receive the error `cannot co-op close channel with active
// htlcs` or `link failed to shutdown` if we close the channel. We need
// to investigate the order of settling the payments and updating
// commitments to properly fix it.
err = wait.NoError(func() error {
stream = hn.RPC.CloseChannel(closeReq)
event, err = h.ReceiveCloseChannelUpdate(stream)
if err != nil {
h.Logf("Test: %s, close channel got error: %v",
h.manager.currentTestCase, err)
stream = hn.RPC.CloseChannel(closeReq)
_, err = h.ReceiveCloseChannelUpdate(stream)
require.NoError(h, err, "close channel update got error: %v", err)
// NoError predicates every 200ms, which is too
// frequent for closing channels. We sleep here to
// avoid trying it too much.
time.Sleep(2 * time.Second)
}
return err
}, wait.ChannelCloseTimeout)
event, err = h.ReceiveCloseChannelUpdate(stream)
if err != nil {
h.Logf("Test: %s, close channel got error: %v",
h.manager.currentTestCase, err)
}
require.NoError(h, err, "retry closing channel failed")
pendingClose, ok := event.Update.(*lnrpc.CloseStatusUpdate_ClosePending)

View File

@ -11,6 +11,7 @@ import (
"github.com/btcsuite/btcd/wire"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/labels"
@ -70,6 +71,11 @@ const (
// phase.
closeShutdownInitiated
// closeAwaitingFlush is the state that's transitioned to once both
// Shutdown messages have been exchanged but we are waiting for the
// HTLCs to clear out of the channel.
closeAwaitingFlush
// closeFeeNegotiation is the third, and most persistent state. Both
// parties enter this state after they've sent and received a shutdown
// message. During this phase, both sides will send monotonically
@ -197,6 +203,11 @@ type ChanCloser struct {
// locallyInitiated is true if we initiated the channel close.
locallyInitiated bool
// cachedClosingSigned is a cached copy of a received ClosingSigned that
// we use to handle a specific race condition caused by the independent
// message processing queues.
cachedClosingSigned fn.Option[lnwire.ClosingSigned]
}
// calcCoopCloseFee computes an "ideal" absolute co-op close fee given the
@ -339,16 +350,6 @@ func (c *ChanCloser) initChanShutdown() (*lnwire.Shutdown, error) {
c.chanPoint, err)
}
// Before continuing, mark the channel as cooperatively closed with a
// nil txn. Even though we haven't negotiated the final txn, this
// guarantees that our listchannels rpc will be externally consistent,
// and reflect that the channel is being shutdown by the time the
// closing request returns.
err := c.cfg.Channel.MarkCoopBroadcasted(nil, c.locallyInitiated)
if err != nil {
return nil, err
}
chancloserLog.Infof("ChannelPoint(%v): sending shutdown message",
c.chanPoint)
@ -470,28 +471,22 @@ func validateShutdownScript(disconnect func() error, upfrontScript,
return nil
}
// ProcessCloseMsg attempts to process the next message in the closing series.
// This method will update the state accordingly and return two primary values:
// the next set of messages to be sent, and a bool indicating if the fee
// negotiation process has completed. If the second value is true, then this
// means the ChanCloser can be garbage collected.
func (c *ChanCloser) ProcessCloseMsg(msg lnwire.Message) ([]lnwire.Message,
bool, error) {
// ReceiveShutdown takes a raw Shutdown message and uses it to try and advance
// the ChanCloser state machine, failing if it is coming in at an invalid time.
// If appropriate, it will also generate a Shutdown message of its own to send
// out to the peer. It is possible for this method to return None when no error
// occurred.
func (c *ChanCloser) ReceiveShutdown(
msg lnwire.Shutdown,
) (fn.Option[lnwire.Shutdown], error) {
noShutdown := fn.None[lnwire.Shutdown]()
switch c.state {
// If we're in the close idle state, and we're receiving a channel closure
// related message, then this indicates that we're on the receiving side of
// an initiated channel closure.
// If we're in the close idle state, and we're receiving a channel
// closure related message, then this indicates that we're on the
// receiving side of an initiated channel closure.
case closeIdle:
// First, we'll assert that we have a channel shutdown message,
// as otherwise, this is an attempted invalid state transition.
shutdownMsg, ok := msg.(*lnwire.Shutdown)
if !ok {
return nil, false, fmt.Errorf("expected "+
"lnwire.Shutdown, instead have %v",
spew.Sdump(msg))
}
// As we're the responder to this shutdown (the other party
// wants to close), we'll check if this is a frozen channel or
// not. If the channel is frozen and we were not also the
@ -499,12 +494,13 @@ func (c *ChanCloser) ProcessCloseMsg(msg lnwire.Message) ([]lnwire.Message,
// attempt.
chanInitiator := c.cfg.Channel.IsInitiator()
if !chanInitiator {
absoluteThawHeight, err := c.cfg.Channel.AbsoluteThawHeight()
absoluteThawHeight, err :=
c.cfg.Channel.AbsoluteThawHeight()
if err != nil {
return nil, false, err
return noShutdown, err
}
if c.negotiationHeight < absoluteThawHeight {
return nil, false, fmt.Errorf("initiator "+
return noShutdown, fmt.Errorf("initiator "+
"attempting to co-op close frozen "+
"ChannelPoint(%v) (current_height=%v, "+
"thaw_height=%v)", c.chanPoint,
@ -512,157 +508,195 @@ func (c *ChanCloser) ProcessCloseMsg(msg lnwire.Message) ([]lnwire.Message,
}
}
// If the remote node opened the channel with option upfront shutdown
// script, check that the script they provided matches.
// If the remote node opened the channel with option upfront
// shutdown script, check that the script they provided matches.
if err := validateShutdownScript(
c.cfg.Disconnect, c.cfg.Channel.RemoteUpfrontShutdownScript(),
shutdownMsg.Address, c.cfg.ChainParams,
c.cfg.Disconnect,
c.cfg.Channel.RemoteUpfrontShutdownScript(),
msg.Address, c.cfg.ChainParams,
); err != nil {
return nil, false, err
return noShutdown, err
}
// Once we have checked that the other party has not violated option
// upfront shutdown we set their preference for delivery address. We'll
// use this when we craft the closure transaction.
c.remoteDeliveryScript = shutdownMsg.Address
// Once we have checked that the other party has not violated
// option upfront shutdown we set their preference for delivery
// address. We'll use this when we craft the closure
// transaction.
c.remoteDeliveryScript = msg.Address
// Now that we know their desired delivery script, we can
// compute what our max/ideal fee will be.
c.initFeeBaseline()
// We'll generate a shutdown message of our own to send across the
// wire.
// We'll generate a shutdown message of our own to send across
// the wire.
localShutdown, err := c.initChanShutdown()
if err != nil {
return nil, false, err
return noShutdown, err
}
// If this is a taproot channel, then we'll want to stash the
// remote nonces so we can properly create a new musig
// session for signing.
if c.cfg.Channel.ChanType().IsTaproot() {
if shutdownMsg.ShutdownNonce == nil {
return nil, false, fmt.Errorf("shutdown " +
if msg.ShutdownNonce == nil {
return noShutdown, fmt.Errorf("shutdown " +
"nonce not populated")
}
c.cfg.MusigSession.InitRemoteNonce(&musig2.Nonces{
PubNonce: *shutdownMsg.ShutdownNonce,
PubNonce: *msg.ShutdownNonce,
})
}
chancloserLog.Infof("ChannelPoint(%v): responding to shutdown",
c.chanPoint)
msgsToSend := make([]lnwire.Message, 0, 2)
msgsToSend = append(msgsToSend, localShutdown)
// After the other party receives this message, we'll actually
// start the final stage of the closure process: fee
// negotiation. So we'll update our internal state to reflect
// this, so we can handle the next message sent.
c.state = closeAwaitingFlush
// After the other party receives this message, we'll actually start
// the final stage of the closure process: fee negotiation. So we'll
// update our internal state to reflect this, so we can handle the next
// message sent.
c.state = closeFeeNegotiation
return fn.Some(*localShutdown), err
// We'll also craft our initial close proposal in order to keep the
// negotiation moving, but only if we're the negotiator.
if chanInitiator {
closeSigned, err := c.proposeCloseSigned(c.idealFeeSat)
if err != nil {
return nil, false, fmt.Errorf("unable to sign "+
"new co op close offer: %w", err)
}
msgsToSend = append(msgsToSend, closeSigned)
}
// We'll return both sets of messages to send to the remote party to
// kick off the fee negotiation process.
return msgsToSend, false, nil
// If we just initiated a channel shutdown, and we receive a new message,
// then this indicates the other party is ready to shutdown as well. In
// this state we'll send our first signature.
case closeShutdownInitiated:
// First, we'll assert that we have a channel shutdown message.
// Otherwise, this is an attempted invalid state transition.
shutdownMsg, ok := msg.(*lnwire.Shutdown)
if !ok {
return nil, false, fmt.Errorf("expected lnwire.Shutdown, instead "+
"have %v", spew.Sdump(msg))
}
// If the remote node opened the channel with option upfront shutdown
// script, check that the script they provided matches.
// If the remote node opened the channel with option upfront
// shutdown script, check that the script they provided matches.
if err := validateShutdownScript(
c.cfg.Disconnect,
c.cfg.Channel.RemoteUpfrontShutdownScript(), shutdownMsg.Address,
c.cfg.ChainParams,
c.cfg.Channel.RemoteUpfrontShutdownScript(),
msg.Address, c.cfg.ChainParams,
); err != nil {
return nil, false, err
return noShutdown, err
}
// Now that we know this is a valid shutdown message and address, we'll
// record their preferred delivery closing script.
c.remoteDeliveryScript = shutdownMsg.Address
// Now that we know this is a valid shutdown message and
// address, we'll record their preferred delivery closing
// script.
c.remoteDeliveryScript = msg.Address
// At this point, we can now start the fee negotiation state, by
// constructing and sending our initial signature for what we think the
// closing transaction should look like.
c.state = closeFeeNegotiation
// Now that we know their desired delivery script, we can
// compute what our max/ideal fee will be.
c.initFeeBaseline()
// constructing and sending our initial signature for what we
// think the closing transaction should look like.
c.state = closeAwaitingFlush
// If this is a taproot channel, then we'll want to stash the
// local+remote nonces so we can properly create a new musig
// session for signing.
if c.cfg.Channel.ChanType().IsTaproot() {
if shutdownMsg.ShutdownNonce == nil {
return nil, false, fmt.Errorf("shutdown " +
if msg.ShutdownNonce == nil {
return noShutdown, fmt.Errorf("shutdown " +
"nonce not populated")
}
c.cfg.MusigSession.InitRemoteNonce(&musig2.Nonces{
PubNonce: *shutdownMsg.ShutdownNonce,
PubNonce: *msg.ShutdownNonce,
})
}
chancloserLog.Infof("ChannelPoint(%v): shutdown response received, "+
"entering fee negotiation", c.chanPoint)
chancloserLog.Infof("ChannelPoint(%v): shutdown response "+
"received, entering fee negotiation", c.chanPoint)
// Starting with our ideal fee rate, we'll create an initial closing
// proposal, but only if we're the initiator, as otherwise, the other
// party will send their initial proposal first.
if c.cfg.Channel.IsInitiator() {
closeSigned, err := c.proposeCloseSigned(c.idealFeeSat)
if err != nil {
return nil, false, fmt.Errorf("unable to sign "+
"new co op close offer: %w", err)
}
return noShutdown, nil
return []lnwire.Message{closeSigned}, false, nil
default:
// Otherwise we are not in a state where we can accept this
// message.
return noShutdown, ErrInvalidState
}
}
// BeginNegotiation should be called when we have definitively reached a clean
// channel state and are ready to cooperatively arrive at a closing transaction.
// If it is our responsibility to kick off the negotiation, this method will
// generate a ClosingSigned message. If it is the remote's responsibility, then
// it will not. In either case it will transition the ChanCloser state machine
// to the negotiation phase wherein ClosingSigned messages are exchanged until
// a mutually agreeable result is achieved.
func (c *ChanCloser) BeginNegotiation() (
fn.Option[lnwire.ClosingSigned], error,
) {
noClosingSigned := fn.None[lnwire.ClosingSigned]()
switch c.state {
case closeAwaitingFlush:
// Now that we know their desired delivery script, we can
// compute what our max/ideal fee will be.
c.initFeeBaseline()
// Before continuing, mark the channel as cooperatively closed
// with a nil txn. Even though we haven't negotiated the final
// txn, this guarantees that our listchannels rpc will be
// externally consistent, and reflect that the channel is being
// shutdown by the time the closing request returns.
err := c.cfg.Channel.MarkCoopBroadcasted(
nil, c.locallyInitiated,
)
if err != nil {
return noClosingSigned, err
}
return nil, false, nil
// At this point, we can now start the fee negotiation state, by
// constructing and sending our initial signature for what we
// think the closing transaction should look like.
c.state = closeFeeNegotiation
if !c.cfg.Channel.IsInitiator() {
// By default this means we do nothing, but we do want
// to check if we have a cached remote offer to process.
// If we do, we'll process it here.
res := noClosingSigned
err = nil
c.cachedClosingSigned.WhenSome(
func(cs lnwire.ClosingSigned) {
res, err = c.ReceiveClosingSigned(cs)
},
)
return res, err
}
// We'll craft our initial close proposal in order to keep the
// negotiation moving, but only if we're the initiator.
closingSigned, err := c.proposeCloseSigned(c.idealFeeSat)
if err != nil {
return noClosingSigned,
fmt.Errorf("unable to sign new co op "+
"close offer: %w", err)
}
return fn.Some(*closingSigned), nil
default:
return noClosingSigned, ErrInvalidState
}
}
// ReceiveClosingSigned is a method that should be called whenever we receive a
// ClosingSigned message from the wire. It may or may not return a ClosingSigned
// of our own to send back to the remote.
//
//nolint:funlen
func (c *ChanCloser) ReceiveClosingSigned(
msg lnwire.ClosingSigned,
) (fn.Option[lnwire.ClosingSigned], error) {
noClosing := fn.None[lnwire.ClosingSigned]()
switch c.state {
case closeAwaitingFlush:
// If we hit this case it either means there's a protocol
// violation or that our chanCloser received the remote offer
// before the link finished processing the channel flush.
c.cachedClosingSigned = fn.Some(msg)
return fn.None[lnwire.ClosingSigned](), nil
// If we're receiving a message while we're in the fee negotiation phase,
// then this indicates the remote party is responding to a close signed
// message we sent, or kicking off the process with their own.
case closeFeeNegotiation:
// First, we'll assert that we're actually getting a ClosingSigned
// message, otherwise an invalid state transition was attempted.
closeSignedMsg, ok := msg.(*lnwire.ClosingSigned)
if !ok {
return nil, false, fmt.Errorf("expected lnwire.ClosingSigned, "+
"instead have %v", spew.Sdump(msg))
}
// If this is a taproot channel, then it MUST have a partial
// signature set at this point.
isTaproot := c.cfg.Channel.ChanType().IsTaproot()
if isTaproot && closeSignedMsg.PartialSig == nil {
return nil, false, fmt.Errorf("partial sig not set " +
"for taproot chan")
if isTaproot && msg.PartialSig == nil {
return noClosing,
fmt.Errorf("partial sig not set " +
"for taproot chan")
}
isInitiator := c.cfg.Channel.IsInitiator()
@ -671,7 +705,7 @@ func (c *ChanCloser) ProcessCloseMsg(msg lnwire.Message) ([]lnwire.Message,
// during the negotiations. If it doesn't match any of our
// prior offers, then we'll attempt to ratchet the fee closer
// to our ideal fee.
remoteProposedFee := closeSignedMsg.FeeSatoshis
remoteProposedFee := msg.FeeSatoshis
_, feeMatchesOffer := c.priorFeeOffers[remoteProposedFee]
switch {
@ -691,7 +725,7 @@ func (c *ChanCloser) ProcessCloseMsg(msg lnwire.Message) ([]lnwire.Message,
// to Alice the final signature.
_, err := c.proposeCloseSigned(remoteProposedFee)
if err != nil {
return nil, false, fmt.Errorf("unable to sign "+
return noClosing, fmt.Errorf("unable to sign "+
"new co op close offer: %w", err)
}
@ -699,10 +733,11 @@ func (c *ChanCloser) ProcessCloseMsg(msg lnwire.Message) ([]lnwire.Message,
// signature for a taproot channel, then we'll ensure that the
// fee rate matches up exactly.
case isTaproot && isInitiator && !feeMatchesOffer:
return nil, false, fmt.Errorf("fee rate for "+
"taproot channels was not accepted: "+
"sent %v, got %v",
c.idealFeeSat, remoteProposedFee)
return noClosing,
fmt.Errorf("fee rate for "+
"taproot channels was not accepted: "+
"sent %v, got %v",
c.idealFeeSat, remoteProposedFee)
// If we're the initiator of the taproot channel, and we had
// our fee echo'd back, then it's all good, and we can proceed
@ -717,37 +752,41 @@ func (c *ChanCloser) ProcessCloseMsg(msg lnwire.Message) ([]lnwire.Message,
// We'll now attempt to ratchet towards a fee deemed
// acceptable by both parties, factoring in our ideal
// fee rate, and the last proposed fee by both sides.
feeProposal := calcCompromiseFee(c.chanPoint, c.idealFeeSat,
c.lastFeeProposal, remoteProposedFee,
proposal := calcCompromiseFee(
c.chanPoint, c.idealFeeSat, c.lastFeeProposal,
remoteProposedFee,
)
if c.cfg.Channel.IsInitiator() && feeProposal > c.maxFee {
return nil, false, fmt.Errorf("%w: %v > %v",
ErrProposalExceedsMaxFee, feeProposal,
c.maxFee)
if c.cfg.Channel.IsInitiator() && proposal > c.maxFee {
return noClosing, fmt.Errorf(
"%w: %v > %v",
ErrProposalExceedsMaxFee,
proposal, c.maxFee)
}
// With our new fee proposal calculated, we'll craft a
// new close signed signature to send to the other
// party so we can continue the fee negotiation
// process.
closeSigned, err := c.proposeCloseSigned(feeProposal)
closeSigned, err := c.proposeCloseSigned(proposal)
if err != nil {
return nil, false, fmt.Errorf("unable to sign "+
return noClosing, fmt.Errorf("unable to sign "+
"new co op close offer: %w", err)
}
// If the compromise fee doesn't match what the peer
// proposed, then we'll return this latest close signed
// message so we can continue negotiation.
if feeProposal != remoteProposedFee {
chancloserLog.Debugf("ChannelPoint(%v): close tx fee "+
"disagreement, continuing negotiation", c.chanPoint)
return []lnwire.Message{closeSigned}, false, nil
if proposal != remoteProposedFee {
chancloserLog.Debugf("ChannelPoint(%v): close "+
"tx fee disagreement, continuing "+
"negotiation", c.chanPoint)
return fn.Some(*closeSigned), nil
}
}
chancloserLog.Infof("ChannelPoint(%v) fee of %v accepted, ending "+
"negotiation", c.chanPoint, remoteProposedFee)
chancloserLog.Infof("ChannelPoint(%v) fee of %v accepted, "+
"ending negotiation", c.chanPoint, remoteProposedFee)
// Otherwise, we've agreed on a fee for the closing
// transaction! We'll craft the final closing transaction so we
@ -760,21 +799,22 @@ func (c *ChanCloser) ProcessCloseMsg(msg lnwire.Message) ([]lnwire.Message,
matchingSig := c.priorFeeOffers[remoteProposedFee]
if c.cfg.Channel.ChanType().IsTaproot() {
muSession := c.cfg.MusigSession
localSig, remoteSig, closeOpts, err = muSession.CombineClosingOpts( //nolint:lll
*matchingSig.PartialSig,
*closeSignedMsg.PartialSig,
)
localSig, remoteSig, closeOpts, err =
muSession.CombineClosingOpts(
*matchingSig.PartialSig,
*msg.PartialSig,
)
if err != nil {
return nil, false, err
return noClosing, err
}
} else {
localSig, err = matchingSig.Signature.ToSignature()
if err != nil {
return nil, false, err
return noClosing, err
}
remoteSig, err = closeSignedMsg.Signature.ToSignature()
remoteSig, err = msg.Signature.ToSignature()
if err != nil {
return nil, false, err
return noClosing, err
}
}
@ -783,7 +823,7 @@ func (c *ChanCloser) ProcessCloseMsg(msg lnwire.Message) ([]lnwire.Message,
c.remoteDeliveryScript, remoteProposedFee, closeOpts...,
)
if err != nil {
return nil, false, err
return noClosing, err
}
c.closingTx = closeTx
@ -794,7 +834,7 @@ func (c *ChanCloser) ProcessCloseMsg(msg lnwire.Message) ([]lnwire.Message,
closeTx, c.locallyInitiated,
)
if err != nil {
return nil, false, err
return noClosing, err
}
// With the closing transaction crafted, we'll now broadcast it
@ -812,7 +852,7 @@ func (c *ChanCloser) ProcessCloseMsg(msg lnwire.Message) ([]lnwire.Message,
)
if err := c.cfg.BroadcastTx(closeTx, closeLabel); err != nil {
return nil, false, err
return noClosing, err
}
// Finally, we'll transition to the closeFinished state, and
@ -822,24 +862,20 @@ func (c *ChanCloser) ProcessCloseMsg(msg lnwire.Message) ([]lnwire.Message,
// negotiation.
c.state = closeFinished
matchingOffer := c.priorFeeOffers[remoteProposedFee]
return []lnwire.Message{matchingOffer}, true, nil
return fn.Some(*matchingOffer), nil
// If we received a message while in the closeFinished state, then this
// should only be the remote party echoing the last ClosingSigned message
// that we agreed on.
// should only be the remote party echoing the last ClosingSigned
// message that we agreed on.
case closeFinished:
if _, ok := msg.(*lnwire.ClosingSigned); !ok {
return nil, false, fmt.Errorf("expected lnwire.ClosingSigned, "+
"instead have %v", spew.Sdump(msg))
}
// There's no more to do as both sides should have already broadcast
// the closing transaction at this state.
return nil, true, nil
// There's no more to do as both sides should have already
// broadcast the closing transaction at this state.
return noClosing, nil
// Otherwise, we're in an unknown state, and can't proceed.
default:
return nil, false, ErrInvalidState
return noClosing, ErrInvalidState
}
}

View File

@ -192,7 +192,7 @@ func (m *mockChannel) CompleteCooperativeClose(localSig,
proposedFee btcutil.Amount,
_ ...lnwallet.ChanCloseOpt) (*wire.MsgTx, btcutil.Amount, error) {
return nil, 0, nil
return &wire.MsgTx{}, 0, nil
}
func (m *mockChannel) LocalBalanceDust() bool {
@ -396,7 +396,7 @@ func TestMaxFeeBailOut(t *testing.T) {
FeeSatoshis: absoluteFee * 2,
}
_, _, err := chanCloser.ProcessCloseMsg(closeMsg)
_, err := chanCloser.ReceiveClosingSigned(*closeMsg)
switch isInitiator {
// If we're the initiator, then we expect an error at
@ -472,13 +472,6 @@ func TestParseUpfrontShutdownAddress(t *testing.T) {
}
}
func assertType[T any](t *testing.T, typ any) T {
value, ok := typ.(T)
require.True(t, ok)
return value
}
// TestTaprootFastClose tests that we are able to properly execute a fast close
// (skip negotiation) for taproot channels.
func TestTaprootFastClose(t *testing.T) {
@ -535,27 +528,39 @@ func TestTaprootFastClose(t *testing.T) {
// Bob will then process this message. As he's the responder, he should
// only send the shutdown message back to Alice.
bobMsgs, closeFinished, err := bobCloser.ProcessCloseMsg(msg)
oShutdown, err := bobCloser.ReceiveShutdown(*msg)
require.NoError(t, err)
require.False(t, closeFinished)
require.Len(t, bobMsgs, 1)
require.IsType(t, &lnwire.Shutdown{}, bobMsgs[0])
oClosingSigned, err := bobCloser.BeginNegotiation()
require.NoError(t, err)
tx, _ := bobCloser.ClosingTx()
require.Nil(t, tx)
require.True(t, oShutdown.IsSome())
require.True(t, oClosingSigned.IsNone())
bobShutdown := oShutdown.UnsafeFromSome()
// Alice should process the shutdown message, and create a closing
// signed of her own.
aliceMsgs, closeFinished, err := aliceCloser.ProcessCloseMsg(bobMsgs[0])
oShutdown, err = aliceCloser.ReceiveShutdown(bobShutdown)
require.NoError(t, err)
require.False(t, closeFinished)
require.Len(t, aliceMsgs, 1)
require.IsType(t, &lnwire.ClosingSigned{}, aliceMsgs[0])
oClosingSigned, err = aliceCloser.BeginNegotiation()
require.NoError(t, err)
tx, _ = aliceCloser.ClosingTx()
require.Nil(t, tx)
require.True(t, oShutdown.IsNone())
require.True(t, oClosingSigned.IsSome())
aliceClosingSigned := oClosingSigned.UnsafeFromSome()
// Next, Bob will process the closing signed message, and send back a
// new one that should match exactly the offer Alice sent.
bobMsgs, closeFinished, err = bobCloser.ProcessCloseMsg(aliceMsgs[0])
oClosingSigned, err = bobCloser.ReceiveClosingSigned(aliceClosingSigned)
require.NoError(t, err)
require.True(t, closeFinished)
require.Len(t, aliceMsgs, 1)
require.IsType(t, &lnwire.ClosingSigned{}, bobMsgs[0])
tx, _ = bobCloser.ClosingTx()
require.NotNil(t, tx)
require.True(t, oClosingSigned.IsSome())
bobClosingSigned := oClosingSigned.UnsafeFromSome()
// At this point, Bob has accepted the offer, so he can broadcast the
// closing transaction, and considers the channel closed.
@ -563,38 +568,41 @@ func TestTaprootFastClose(t *testing.T) {
require.NoError(t, err)
// Bob's fee proposal should exactly match Alice's initial fee.
aliceOffer := assertType[*lnwire.ClosingSigned](t, aliceMsgs[0])
bobOffer := assertType[*lnwire.ClosingSigned](t, bobMsgs[0])
require.Equal(t, aliceOffer.FeeSatoshis, bobOffer.FeeSatoshis)
require.Equal(
t, aliceClosingSigned.FeeSatoshis, bobClosingSigned.FeeSatoshis,
)
// If we modify Bob's offer, and try to have Alice process it, then she
// should reject it.
ogOffer := bobOffer.FeeSatoshis
bobOffer.FeeSatoshis /= 2
ogOffer := bobClosingSigned.FeeSatoshis
bobClosingSigned.FeeSatoshis /= 2
_, _, err = aliceCloser.ProcessCloseMsg(bobOffer)
_, err = aliceCloser.ReceiveClosingSigned(bobClosingSigned)
require.Error(t, err)
require.Contains(t, err.Error(), "was not accepted")
// We'll now restore the original offer before passing it on to Alice.
bobOffer.FeeSatoshis = ogOffer
bobClosingSigned.FeeSatoshis = ogOffer
// If we use the original offer, then Alice should accept this message,
// and finalize the shutdown process. We expect a message here as Alice
// will echo back the final message.
aliceMsgs, closeFinished, err = aliceCloser.ProcessCloseMsg(bobMsgs[0])
oClosingSigned, err = aliceCloser.ReceiveClosingSigned(bobClosingSigned)
require.NoError(t, err)
require.True(t, closeFinished)
require.Len(t, aliceMsgs, 1)
require.IsType(t, &lnwire.ClosingSigned{}, aliceMsgs[0])
tx, _ = aliceCloser.ClosingTx()
require.NotNil(t, tx)
require.True(t, oClosingSigned.IsSome())
aliceClosingSigned = oClosingSigned.UnsafeFromSome()
// Alice should now also broadcast her closing transaction.
_, err = lnutils.RecvOrTimeout(broadcastSignal, time.Second*1)
require.NoError(t, err)
// Finally, Bob will process Alice's echo message, and conclude.
bobMsgs, closeFinished, err = bobCloser.ProcessCloseMsg(aliceMsgs[0])
oClosingSigned, err = bobCloser.ReceiveClosingSigned(aliceClosingSigned)
require.NoError(t, err)
require.True(t, closeFinished)
require.Len(t, bobMsgs, 0)
tx, _ = bobCloser.ClosingTx()
require.NotNil(t, tx)
require.True(t, oClosingSigned.IsNone())
}

View File

@ -5466,6 +5466,17 @@ func (lc *LightningChannel) OweCommitment() bool {
return lc.oweCommitment(true)
}
// NeedCommitment returns a boolean value reflecting whether we are waiting on
// a commitment signature because there are outstanding remote updates and/or
// updates in the remote commit tx that aren't reflected in the local commit tx
// yet.
func (lc *LightningChannel) NeedCommitment() bool {
lc.RLock()
defer lc.RUnlock()
return lc.oweCommitment(false)
}
// oweCommitment is the internal version of OweCommitment. This function expects
// to be executed with a lock held.
func (lc *LightningChannel) oweCommitment(local bool) bool {

View File

@ -2575,12 +2575,6 @@ func (p *Brontide) fetchActiveChanCloser(chanID lnwire.ChannelID) (
return nil, ErrChannelNotFound
}
// Optimistically try a link shutdown, erroring out if it failed.
if err := p.tryLinkShutdown(chanID); err != nil {
p.log.Errorf("failed link shutdown: %v", err)
return nil, err
}
// We'll create a valid closing state machine in order to respond to
// the initiated cooperative channel closure. First, we set the
// delivery script that our funds will be paid out to. If an upfront
@ -2957,17 +2951,6 @@ func (p *Brontide) handleLocalCloseReq(req *htlcswitch.ChanClose) {
}
}
// Optimistically try a link shutdown, erroring out if it
// failed.
if err := p.tryLinkShutdown(chanID); err != nil {
p.log.Errorf("failed link shutdown: %v", err)
req.Err <- fmt.Errorf("failed handling co-op closing "+
"request with (try force closing "+
"it instead): %w", err)
return
}
chanCloser, err := p.createChanCloser(
channel, deliveryScript, req.TargetFeePerKw, req, true,
)
@ -2994,7 +2977,28 @@ func (p *Brontide) handleLocalCloseReq(req *htlcswitch.ChanClose) {
return
}
p.queueMsg(shutdownMsg, nil)
link := p.fetchLinkFromKeyAndCid(chanID)
if link == nil {
// If the link is nil then it means it was already
// removed from the switch or it never existed in the
// first place. The latter case is handled at the
// beginning of this function, so in the case where it
// has already been removed, we can skip adding the
// commit hook to queue a Shutdown message.
p.log.Warnf("link not found during attempted closure: "+
"%v", chanID)
return
}
link.OnCommitOnce(htlcswitch.Outgoing, func() {
err := link.DisableAdds(htlcswitch.Outgoing)
if err != nil {
p.log.Warnf("outgoing link adds already "+
"disabled: %v", link.ChanID())
}
p.queueMsg(shutdownMsg, nil)
})
// A type of CloseBreach indicates that the counterparty has breached
// the channel therefore we need to clean up our local state.
@ -3104,35 +3108,6 @@ func (p *Brontide) handleLinkFailure(failure linkFailureReport) {
}
}
// tryLinkShutdown attempts to fetch a target link from the switch, calls
// ShutdownIfChannelClean to optimistically trigger a link shutdown, and
// removes the link from the switch. It returns an error if any step failed.
func (p *Brontide) tryLinkShutdown(cid lnwire.ChannelID) error {
// Fetch the appropriate link and call ShutdownIfChannelClean to ensure
// no other updates can occur.
chanLink := p.fetchLinkFromKeyAndCid(cid)
// If the link happens to be nil, return ErrChannelNotFound so we can
// ignore the close message.
if chanLink == nil {
return ErrChannelNotFound
}
// Else, the link exists, so attempt to trigger shutdown. If this
// fails, we'll send an error message to the remote peer.
if err := chanLink.ShutdownIfChannelClean(); err != nil {
return err
}
// Next, we remove the link from the switch to shut down all of the
// link's goroutines and remove it from the switch's internal maps. We
// don't call WipeChannel as the channel must still be in the
// activeChannels map to process coop close messages.
p.cfg.Switch.RemoveLink(cid)
return nil
}
// fetchLinkFromKeyAndCid fetches a link from the switch via the remote's
// public key and the channel id.
func (p *Brontide) fetchLinkFromKeyAndCid(
@ -3602,6 +3577,8 @@ func (p *Brontide) StartTime() time.Time {
// message is received from the remote peer. We'll use this message to advance
// the chan closer state machine.
func (p *Brontide) handleCloseMsg(msg *closeMsg) {
link := p.fetchLinkFromKeyAndCid(msg.cid)
// We'll now fetch the matching closing state machine in order to continue,
// or finalize the channel closure process.
chanCloser, err := p.fetchActiveChanCloser(msg.cid)
@ -3621,13 +3598,8 @@ func (p *Brontide) handleCloseMsg(msg *closeMsg) {
return
}
// Next, we'll process the next message using the target state machine.
// We'll either continue negotiation, or halt.
msgs, closeFin, err := chanCloser.ProcessCloseMsg(
msg.msg,
)
if err != nil {
err := fmt.Errorf("unable to process close msg: %v", err)
handleErr := func(err error) {
err = fmt.Errorf("unable to process close msg: %w", err)
p.log.Error(err)
// As the negotiations failed, we'll reset the channel state machine to
@ -3638,18 +3610,93 @@ func (p *Brontide) handleCloseMsg(msg *closeMsg) {
chanCloser.CloseRequest().Err <- err
}
delete(p.activeChanCloses, msg.cid)
return
p.Disconnect(err)
}
// Queue any messages to the remote peer that need to be sent as a part of
// this latest round of negotiations.
for _, msg := range msgs {
p.queueMsg(msg, nil)
// Next, we'll process the next message using the target state machine.
// We'll either continue negotiation, or halt.
switch typed := msg.msg.(type) {
case *lnwire.Shutdown:
// Disable incoming adds immediately.
if link != nil {
err := link.DisableAdds(htlcswitch.Incoming)
if err != nil {
p.log.Warnf("incoming link adds already "+
"disabled: %v", link.ChanID())
}
}
oShutdown, err := chanCloser.ReceiveShutdown(*typed)
if err != nil {
handleErr(err)
return
}
oShutdown.WhenSome(func(msg lnwire.Shutdown) {
// if the link is nil it means we can immediately queue
// the Shutdown message since we don't have to wait for
// commitment transaction synchronization.
if link == nil {
p.queueMsg(typed, nil)
return
}
// When we have a Shutdown to send, we defer it till the
// next time we send a CommitSig to remain spec
// compliant.
link.OnCommitOnce(htlcswitch.Outgoing, func() {
err := link.DisableAdds(htlcswitch.Outgoing)
if err != nil {
p.log.Warn(err.Error())
}
p.queueMsg(&msg, nil)
})
})
beginNegotiation := func() {
oClosingSigned, err := chanCloser.BeginNegotiation()
if err != nil {
handleErr(err)
return
}
oClosingSigned.WhenSome(func(msg lnwire.ClosingSigned) {
p.queueMsg(&msg, nil)
})
}
if link == nil {
beginNegotiation()
} else {
// Now we register a flush hook to advance the
// ChanCloser and possibly send out a ClosingSigned
// when the link finishes draining.
link.OnFlushedOnce(func() {
// Remove link in goroutine to prevent deadlock.
go p.cfg.Switch.RemoveLink(msg.cid)
beginNegotiation()
})
}
case *lnwire.ClosingSigned:
oClosingSigned, err := chanCloser.ReceiveClosingSigned(*typed)
if err != nil {
handleErr(err)
return
}
oClosingSigned.WhenSome(func(msg lnwire.ClosingSigned) {
p.queueMsg(&msg, nil)
})
default:
panic("impossible closeMsg type")
}
// If we haven't finished close negotiations, then we'll continue as we
// can't yet finalize the closure.
if !closeFin {
if _, err := chanCloser.ClosingTx(); err != nil {
return
}

View File

@ -4,9 +4,11 @@ import (
"bytes"
crand "crypto/rand"
"encoding/binary"
"fmt"
"io"
"math/rand"
"net"
"sync/atomic"
"testing"
"time"
@ -456,7 +458,9 @@ func (m *mockMessageSwitch) GetLinksByInterface(pub [33]byte) (
// mockUpdateHandler is a mock implementation of the ChannelUpdateHandler
// interface. It is used in mockMessageSwitch's GetLinksByInterface method.
type mockUpdateHandler struct {
cid lnwire.ChannelID
cid lnwire.ChannelID
isOutgoingAddBlocked atomic.Bool
isIncomingAddBlocked atomic.Bool
}
// newMockUpdateHandler creates a new mockUpdateHandler.
@ -481,9 +485,6 @@ func (m *mockUpdateHandler) EligibleToForward() bool { return false }
// MayAddOutgoingHtlc currently returns nil.
func (m *mockUpdateHandler) MayAddOutgoingHtlc(lnwire.MilliSatoshi) error { return nil }
// ShutdownIfChannelClean currently returns nil.
func (m *mockUpdateHandler) ShutdownIfChannelClean() error { return nil }
type mockMessageConn struct {
t *testing.T
@ -509,6 +510,57 @@ type mockMessageConn struct {
readRaceDetectingCounter int
}
func (m *mockUpdateHandler) EnableAdds(dir htlcswitch.LinkDirection) error {
switch dir {
case htlcswitch.Outgoing:
if !m.isOutgoingAddBlocked.Swap(false) {
return fmt.Errorf("%v adds already enabled", dir)
}
case htlcswitch.Incoming:
if !m.isIncomingAddBlocked.Swap(false) {
return fmt.Errorf("%v adds already enabled", dir)
}
}
return nil
}
func (m *mockUpdateHandler) DisableAdds(dir htlcswitch.LinkDirection) error {
switch dir {
case htlcswitch.Outgoing:
if m.isOutgoingAddBlocked.Swap(true) {
return fmt.Errorf("%v adds already disabled", dir)
}
case htlcswitch.Incoming:
if m.isIncomingAddBlocked.Swap(true) {
return fmt.Errorf("%v adds already disabled", dir)
}
}
return nil
}
func (m *mockUpdateHandler) IsFlushing(dir htlcswitch.LinkDirection) bool {
switch dir {
case htlcswitch.Outgoing:
return m.isOutgoingAddBlocked.Load()
case htlcswitch.Incoming:
return m.isIncomingAddBlocked.Load()
}
return false
}
func (m *mockUpdateHandler) OnFlushedOnce(hook func()) {
hook()
}
func (m *mockUpdateHandler) OnCommitOnce(
_ htlcswitch.LinkDirection, hook func(),
) {
hook()
}
func newMockConn(t *testing.T, expectedMessages int) *mockMessageConn {
return &mockMessageConn{
t: t,

View File

@ -2643,10 +2643,10 @@ func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest,
rpcsLog.Debugf("Target sat/kw for closing transaction: %v",
int64(feeRate))
// Before we attempt the cooperative channel closure, we'll
// examine the channel to ensure that it doesn't have a
// lingering HTLC.
if len(channel.ActiveHtlcs()) != 0 {
// If the user hasn't specified NoWait, then before we attempt
// to close the channel we ensure there are no active HTLCs on
// the link.
if !in.NoWait && len(channel.ActiveHtlcs()) != 0 {
return fmt.Errorf("cannot co-op close channel " +
"with active htlcs")
}
@ -2690,6 +2690,19 @@ func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest,
maxFee, deliveryScript,
)
}
// If the user doesn't want to wait for the txid to come back then we
// will send an empty update to kick off the stream.
if in.NoWait {
rpcsLog.Trace("[closechannel] sending instant update")
if err := updateStream.Send(
&lnrpc.CloseStatusUpdate{
Update: &lnrpc.CloseStatusUpdate_CloseInstant{},
},
); err != nil {
return err
}
}
out:
for {
select {