From aeaa009e92b61d1d934813066a976c3393a2cca7 Mon Sep 17 00:00:00 2001 From: eugene Date: Tue, 10 Aug 2021 17:00:51 -0400 Subject: [PATCH] peer+chancloser: tryLinkShutdown during cooperative close process Adds a new Brontide struct method tryLinkShutdown that attempts to fetch the target link and calls ShutdownIfChannelClean on it. This allows the coop close process to guarantee atomicity of the underlying channel state. Also removes the UnregisterChannel method from the chancloser's config as the link is shut down before the chancloser is created. --- lnwallet/chancloser/chancloser.go | 11 ---- peer/brontide.go | 97 ++++++++++++++++++++++--------- peer/brontide_test.go | 43 +++++++++++--- peer/test_utils.go | 8 ++- 4 files changed, 110 insertions(+), 49 deletions(-) diff --git a/lnwallet/chancloser/chancloser.go b/lnwallet/chancloser/chancloser.go index 0fe643bcf..2eaa83288 100644 --- a/lnwallet/chancloser/chancloser.go +++ b/lnwallet/chancloser/chancloser.go @@ -79,11 +79,6 @@ type ChanCloseCfg struct { // Channel is the channel that should be closed. Channel *lnwallet.LightningChannel - // UnregisterChannel is a function closure that allows the ChanCloser to - // unregister a channel. Once this has been done, no further HTLC's should - // be routed through the channel. - UnregisterChannel func(lnwire.ChannelID) - // BroadcastTx broadcasts the passed transaction to the network. BroadcastTx func(*wire.MsgTx, string) error @@ -212,8 +207,6 @@ func (c *ChanCloser) initChanShutdown() (*lnwire.Shutdown, error) { // closing script. shutdown := lnwire.NewShutdown(c.cid, c.localDeliveryScript) - // TODO(roasbeef): err if channel has htlc's? - // Before closing, we'll attempt to send a disable update for the channel. // We do so before closing the channel as otherwise the current edge policy // won't be retrievable from the graph. @@ -222,10 +215,6 @@ func (c *ChanCloser) initChanShutdown() (*lnwire.Shutdown, error) { c.chanPoint, err) } - // Before returning the shutdown message, we'll unregister the channel to - // ensure that it isn't seen as usable within the system. - c.cfg.UnregisterChannel(c.cid) - // Before continuing, mark the channel as cooperatively closed with a nil // txn. Even though we haven't negotiated the final txn, this guarantees // that our listchannels rpc will be externally consistent, and reflect diff --git a/peer/brontide.go b/peer/brontide.go index 695a09a87..c3c3dd754 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -1184,11 +1184,9 @@ func waitUntilLinkActive(p *Brontide, // The link may already be active by this point, and we may have missed the // ActiveLinkEvent. Check if the link exists. - links, _ := p.cfg.Switch.GetLinksByInterface(p.cfg.PubKeyBytes) - for _, link := range links { - if link.ChanID() == cid { - return link - } + link := p.fetchLinkFromKeyAndCid(cid) + if link != nil { + return link } // If the link is nil, we must wait for it to be active. @@ -1216,16 +1214,7 @@ func waitUntilLinkActive(p *Brontide, // The link shouldn't be nil as we received an // ActiveLinkEvent. If it is nil, we return nil and the // calling function should catch it. - links, _ = p.cfg.Switch.GetLinksByInterface( - p.cfg.PubKeyBytes, - ) - for _, link := range links { - if link.ChanID() == cid { - return link - } - } - - return nil + return p.fetchLinkFromKeyAndCid(cid) case <-p.quit: return nil @@ -2408,12 +2397,11 @@ func (p *Brontide) fetchActiveChanCloser(chanID lnwire.ChannelID) ( // cooperative channel closure. chanCloser, ok := p.activeChanCloses[chanID] if !ok { - // If we need to create a chan closer for the first time, then - // we'll check to ensure that the channel is even in the proper - // state to allow a co-op channel closure. - if len(channel.ActiveHtlcs()) != 0 { - return nil, fmt.Errorf("cannot co-op close " + - "channel w/ active htlcs") + // Optimistically try a link shutdown, erroring out if it + // failed. + if err := p.tryLinkShutdown(chanID); err != nil { + peerLog.Errorf("failed link shutdown: %v", err) + return nil, err } // We'll create a valid closing state machine in order to @@ -2453,9 +2441,8 @@ func (p *Brontide) fetchActiveChanCloser(chanID lnwire.ChannelID) ( chanCloser = chancloser.NewChanCloser( chancloser.ChanCloseCfg{ - Channel: channel, - UnregisterChannel: p.cfg.Switch.RemoveLink, - BroadcastTx: p.cfg.Wallet.PublishTransaction, + Channel: channel, + BroadcastTx: p.cfg.Wallet.PublishTransaction, DisableChannel: func(chanPoint wire.OutPoint) error { return p.cfg.ChanStatusMgr.RequestDisable(chanPoint, false) }, @@ -2569,11 +2556,18 @@ func (p *Brontide) handleLocalCloseReq(req *htlcswitch.ChanClose) { return } + // Optimistically try a link shutdown, erroring out if it + // failed. + if err := p.tryLinkShutdown(chanID); err != nil { + peerLog.Errorf("failed link shutdown: %v", err) + req.Err <- err + return + } + chanCloser := chancloser.NewChanCloser( chancloser.ChanCloseCfg{ - Channel: channel, - UnregisterChannel: p.cfg.Switch.RemoveLink, - BroadcastTx: p.cfg.Wallet.PublishTransaction, + Channel: channel, + BroadcastTx: p.cfg.Wallet.PublishTransaction, DisableChannel: func(chanPoint wire.OutPoint) error { return p.cfg.ChanStatusMgr.RequestDisable(chanPoint, false) }, @@ -2700,6 +2694,55 @@ func (p *Brontide) handleLinkFailure(failure linkFailureReport) { } } +// tryLinkShutdown attempts to fetch a target link from the switch, calls +// ShutdownIfChannelClean to optimistically trigger a link shutdown, and +// removes the link from the switch. It returns an error if any step failed. +func (p *Brontide) tryLinkShutdown(cid lnwire.ChannelID) error { + // Fetch the appropriate link and call ShutdownIfChannelClean to ensure + // no other updates can occur. + chanLink := p.fetchLinkFromKeyAndCid(cid) + + // If the link happens to be nil, return ErrChannelNotFound so we can + // ignore the close message. + if chanLink == nil { + return ErrChannelNotFound + } + + // Else, the link exists, so attempt to trigger shutdown. If this + // fails, we'll send an error message to the remote peer. + if err := chanLink.ShutdownIfChannelClean(); err != nil { + return err + } + + // Next, we remove the link from the switch to shut down all of the + // link's goroutines and remove it from the switch's internal maps. We + // don't call WipeChannel as the channel must still be in the + // activeChannels map to process coop close messages. + p.cfg.Switch.RemoveLink(cid) + + return nil +} + +// fetchLinkFromKeyAndCid fetches a link from the switch via the remote's +// public key and the channel id. +func (p *Brontide) fetchLinkFromKeyAndCid( + cid lnwire.ChannelID) htlcswitch.ChannelUpdateHandler { + + var chanLink htlcswitch.ChannelUpdateHandler + + // We don't need to check the error here, and can instead just loop + // over the slice and return nil. + links, _ := p.cfg.Switch.GetLinksByInterface(p.cfg.PubKeyBytes) + for _, link := range links { + if link.ChanID() == cid { + chanLink = link + break + } + } + + return chanLink +} + // finalizeChanClosure performs the final clean up steps once the cooperative // closure transaction has been fully broadcast. The finalized closing state // machine should be passed in. Once the transaction has been sufficiently diff --git a/peer/brontide_test.go b/peer/brontide_test.go index 588090878..01a5f0c89 100644 --- a/peer/brontide_test.go +++ b/peer/brontide_test.go @@ -39,8 +39,10 @@ func TestPeerChannelClosureAcceptFeeResponder(t *testing.T) { } broadcastTxChan := make(chan *wire.MsgTx) + mockSwitch := &mockMessageSwitch{} + alicePeer, bobChan, cleanUp, err := createTestPeer( - notifier, broadcastTxChan, noUpdate, nil, + notifier, broadcastTxChan, noUpdate, mockSwitch, ) if err != nil { t.Fatalf("unable to create test channels: %v", err) @@ -49,6 +51,9 @@ func TestPeerChannelClosureAcceptFeeResponder(t *testing.T) { chanID := lnwire.NewChanIDFromOutPoint(bobChan.ChannelPoint()) + mockLink := newMockUpdateHandler(chanID) + mockSwitch.links = append(mockSwitch.links, mockLink) + // We send a shutdown request to Alice. She will now be the responding // node in this shutdown procedure. We first expect Alice to answer // this shutdown request with a Shutdown message. @@ -142,14 +147,20 @@ func TestPeerChannelClosureAcceptFeeInitiator(t *testing.T) { } broadcastTxChan := make(chan *wire.MsgTx) + mockSwitch := &mockMessageSwitch{} + alicePeer, bobChan, cleanUp, err := createTestPeer( - notifier, broadcastTxChan, noUpdate, nil, + notifier, broadcastTxChan, noUpdate, mockSwitch, ) if err != nil { t.Fatalf("unable to create test channels: %v", err) } defer cleanUp() + chanID := lnwire.NewChanIDFromOutPoint(bobChan.ChannelPoint()) + mockLink := newMockUpdateHandler(chanID) + mockSwitch.links = append(mockSwitch.links, mockLink) + // We make Alice send a shutdown request. updateChan := make(chan interface{}, 1) errChan := make(chan error, 1) @@ -179,7 +190,6 @@ func TestPeerChannelClosureAcceptFeeInitiator(t *testing.T) { aliceDeliveryScript := shutdownMsg.Address // Bob will respond with his own Shutdown message. - chanID := shutdownMsg.ChannelID alicePeer.chanCloseMsgs <- &closeMsg{ cid: chanID, msg: lnwire.NewShutdown(chanID, @@ -264,8 +274,10 @@ func TestPeerChannelClosureFeeNegotiationsResponder(t *testing.T) { } broadcastTxChan := make(chan *wire.MsgTx) + mockSwitch := &mockMessageSwitch{} + alicePeer, bobChan, cleanUp, err := createTestPeer( - notifier, broadcastTxChan, noUpdate, nil, + notifier, broadcastTxChan, noUpdate, mockSwitch, ) if err != nil { t.Fatalf("unable to create test channels: %v", err) @@ -274,6 +286,9 @@ func TestPeerChannelClosureFeeNegotiationsResponder(t *testing.T) { chanID := lnwire.NewChanIDFromOutPoint(bobChan.ChannelPoint()) + mockLink := newMockUpdateHandler(chanID) + mockSwitch.links = append(mockSwitch.links, mockLink) + // Bob sends a shutdown request to Alice. She will now be the responding // node in this shutdown procedure. We first expect Alice to answer this // Shutdown request with a Shutdown message. @@ -458,14 +473,20 @@ func TestPeerChannelClosureFeeNegotiationsInitiator(t *testing.T) { } broadcastTxChan := make(chan *wire.MsgTx) + mockSwitch := &mockMessageSwitch{} + alicePeer, bobChan, cleanUp, err := createTestPeer( - notifier, broadcastTxChan, noUpdate, nil, + notifier, broadcastTxChan, noUpdate, mockSwitch, ) if err != nil { t.Fatalf("unable to create test channels: %v", err) } defer cleanUp() + chanID := lnwire.NewChanIDFromOutPoint(bobChan.ChannelPoint()) + mockLink := newMockUpdateHandler(chanID) + mockSwitch.links = append(mockSwitch.links, mockLink) + // We make the initiator send a shutdown request. updateChan := make(chan interface{}, 1) errChan := make(chan error, 1) @@ -496,7 +517,6 @@ func TestPeerChannelClosureFeeNegotiationsInitiator(t *testing.T) { aliceDeliveryScript := shutdownMsg.Address // Bob will answer the Shutdown message with his own Shutdown. - chanID := lnwire.NewChanIDFromOutPoint(bobChan.ChannelPoint()) respShutdown := lnwire.NewShutdown(chanID, dummyDeliveryScript) alicePeer.chanCloseMsgs <- &closeMsg{ cid: chanID, @@ -793,20 +813,27 @@ func TestCustomShutdownScript(t *testing.T) { } broadcastTxChan := make(chan *wire.MsgTx) + mockSwitch := &mockMessageSwitch{} + // Open a channel. alicePeer, bobChan, cleanUp, err := createTestPeer( - notifier, broadcastTxChan, test.update, nil, + notifier, broadcastTxChan, test.update, + mockSwitch, ) if err != nil { t.Fatalf("unable to create test channels: %v", err) } defer cleanUp() + chanPoint := bobChan.ChannelPoint() + chanID := lnwire.NewChanIDFromOutPoint(chanPoint) + mockLink := newMockUpdateHandler(chanID) + mockSwitch.links = append(mockSwitch.links, mockLink) + // Request initiator to cooperatively close the channel, with // a specified delivery address. updateChan := make(chan interface{}, 1) errChan := make(chan error, 1) - chanPoint := bobChan.ChannelPoint() closeCommand := htlcswitch.ChanClose{ CloseType: htlcswitch.CloseRegular, ChanPoint: chanPoint, diff --git a/peer/test_utils.go b/peer/test_utils.go index 1d63a67bf..3ce1cbe03 100644 --- a/peer/test_utils.go +++ b/peer/test_utils.go @@ -380,7 +380,9 @@ func createTestPeer(notifier chainntnfs.ChainNotifier, // mockMessageSwitch is a mock implementation of the messageSwitch interface // used for testing without relying on a *htlcswitch.Switch in unit tests. -type mockMessageSwitch struct{} +type mockMessageSwitch struct { + links []htlcswitch.ChannelUpdateHandler +} // BestHeight currently returns a dummy value. func (m *mockMessageSwitch) BestHeight() uint32 { @@ -402,11 +404,11 @@ func (m *mockMessageSwitch) CreateAndAddLink(cfg htlcswitch.ChannelLinkConfig, return nil } -// GetLinksByInterface currently returns dummy values. +// GetLinksByInterface returns the active links. func (m *mockMessageSwitch) GetLinksByInterface(pub [33]byte) ( []htlcswitch.ChannelUpdateHandler, error) { - return nil, nil + return m.links, nil } // mockUpdateHandler is a mock implementation of the ChannelUpdateHandler