From cd14c52ecd066ac3c434fc5b38915b06867a1449 Mon Sep 17 00:00:00 2001 From: Jonathan Harvey-Buschel Date: Thu, 17 Oct 2024 13:38:31 +0200 Subject: [PATCH] htlcswitch: pass quit chans as unidirectional This is a requirement for replacing the quit channel with a Context. The Done() channel of a Context is always recv-only, so all users of that channel must not expect a bidirectional channel. --- htlcswitch/interceptable_switch.go | 6 ++--- htlcswitch/link.go | 2 +- htlcswitch/link_test.go | 40 ++++++++++++++++++------------ htlcswitch/mailbox.go | 4 +-- htlcswitch/mailbox_test.go | 4 +-- htlcswitch/switch.go | 4 +-- htlcswitch/test_utils.go | 18 ++++++++------ 7 files changed, 45 insertions(+), 33 deletions(-) diff --git a/htlcswitch/interceptable_switch.go b/htlcswitch/interceptable_switch.go index e06163d48..71302bf07 100644 --- a/htlcswitch/interceptable_switch.go +++ b/htlcswitch/interceptable_switch.go @@ -95,7 +95,7 @@ type InterceptableSwitch struct { type interceptedPackets struct { packets []*htlcPacket - linkQuit chan struct{} + linkQuit <-chan struct{} isReplay bool } @@ -465,8 +465,8 @@ func (s *InterceptableSwitch) Resolve(res *FwdResolution) error { // interceptor. If the interceptor signals the resume action, the htlcs are // forwarded to the switch. The link's quit signal should be provided to allow // cancellation of forwarding during link shutdown. -func (s *InterceptableSwitch) ForwardPackets(linkQuit chan struct{}, isReplay bool, - packets ...*htlcPacket) error { +func (s *InterceptableSwitch) ForwardPackets(linkQuit <-chan struct{}, + isReplay bool, packets ...*htlcPacket) error { // Synchronize with the main event loop. This should be light in the // case where there is no interceptor. diff --git a/htlcswitch/link.go b/htlcswitch/link.go index e7766bc81..d395a94b4 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -101,7 +101,7 @@ type ChannelLinkConfig struct { // switch. The function returns and error in case it fails to send one or // more packets. The link's quit signal should be provided to allow // cancellation of forwarding during link shutdown. - ForwardPackets func(chan struct{}, bool, ...*htlcPacket) error + ForwardPackets func(<-chan struct{}, bool, ...*htlcPacket) error // DecodeHopIterators facilitates batched decoding of HTLC Sphinx onion // blobs, which are then used to inform how to forward an HTLC. diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index 018601a04..e473f9111 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -2197,17 +2197,21 @@ func newSingleLinkTestHarness(t *testing.T, chanAmt, return nil } + forwardPackets := func(linkQuit <-chan struct{}, _ bool, + packets ...*htlcPacket) error { + + return aliceSwitch.ForwardPackets(linkQuit, packets...) + } + // Instantiate with a long interval, so that we can precisely control // the firing via force feeding. bticker := ticker.NewForce(time.Hour) aliceCfg := ChannelLinkConfig{ - FwrdingPolicy: globalPolicy, - Peer: alicePeer, - BestHeight: aliceSwitch.BestHeight, - Circuits: aliceSwitch.CircuitModifier(), - ForwardPackets: func(linkQuit chan struct{}, _ bool, packets ...*htlcPacket) error { - return aliceSwitch.ForwardPackets(linkQuit, packets...) - }, + FwrdingPolicy: globalPolicy, + Peer: alicePeer, + BestHeight: aliceSwitch.BestHeight, + Circuits: aliceSwitch.CircuitModifier(), + ForwardPackets: forwardPackets, DecodeHopIterators: decoder.DecodeHopIterators, ExtractErrorEncrypter: func(*btcec.PublicKey) ( hop.ErrorEncrypter, lnwire.FailCode) { @@ -4867,17 +4871,21 @@ func (h *persistentLinkHarness) restartLink( return nil } + forwardPackets := func(linkQuit <-chan struct{}, _ bool, + packets ...*htlcPacket) error { + + return h.hSwitch.ForwardPackets(linkQuit, packets...) + } + // Instantiate with a long interval, so that we can precisely control // the firing via force feeding. bticker := ticker.NewForce(time.Hour) aliceCfg := ChannelLinkConfig{ - FwrdingPolicy: globalPolicy, - Peer: alicePeer, - BestHeight: h.hSwitch.BestHeight, - Circuits: h.hSwitch.CircuitModifier(), - ForwardPackets: func(linkQuit chan struct{}, _ bool, packets ...*htlcPacket) error { - return h.hSwitch.ForwardPackets(linkQuit, packets...) - }, + FwrdingPolicy: globalPolicy, + Peer: alicePeer, + BestHeight: h.hSwitch.BestHeight, + Circuits: h.hSwitch.CircuitModifier(), + ForwardPackets: forwardPackets, DecodeHopIterators: decoder.DecodeHopIterators, ExtractErrorEncrypter: func(*btcec.PublicKey) ( hop.ErrorEncrypter, lnwire.FailCode) { @@ -7037,7 +7045,7 @@ func TestPipelineSettle(t *testing.T) { // erroneously forwarded. If the forwardChan is closed before the last // step, then the test will fail. forwardChan := make(chan struct{}) - fwdPkts := func(c chan struct{}, _ bool, hp ...*htlcPacket) error { + fwdPkts := func(c <-chan struct{}, _ bool, hp ...*htlcPacket) error { close(forwardChan) return nil } @@ -7223,7 +7231,7 @@ func TestChannelLinkShortFailureRelay(t *testing.T) { aliceMsgs := mockPeer.sentMsgs switchChan := make(chan *htlcPacket) - coreLink.cfg.ForwardPackets = func(linkQuit chan struct{}, _ bool, + coreLink.cfg.ForwardPackets = func(linkQuit <-chan struct{}, _ bool, packets ...*htlcPacket) error { for _, p := range packets { diff --git a/htlcswitch/mailbox.go b/htlcswitch/mailbox.go index 9b82f8912..b283825dd 100644 --- a/htlcswitch/mailbox.go +++ b/htlcswitch/mailbox.go @@ -95,7 +95,7 @@ type mailBoxConfig struct { // forwardPackets send a varidic number of htlcPackets to the switch to // be routed. A quit channel should be provided so that the call can // properly exit during shutdown. - forwardPackets func(chan struct{}, ...*htlcPacket) error + forwardPackets func(<-chan struct{}, ...*htlcPacket) error // clock is a time source for the mailbox. clock clock.Clock @@ -804,7 +804,7 @@ type mailOrchConfig struct { // forwardPackets send a varidic number of htlcPackets to the switch to // be routed. A quit channel should be provided so that the call can // properly exit during shutdown. - forwardPackets func(chan struct{}, ...*htlcPacket) error + forwardPackets func(<-chan struct{}, ...*htlcPacket) error // clock is a time source for the generated mailboxes. clock clock.Clock diff --git a/htlcswitch/mailbox_test.go b/htlcswitch/mailbox_test.go index dfb0fb616..57a581c4b 100644 --- a/htlcswitch/mailbox_test.go +++ b/htlcswitch/mailbox_test.go @@ -250,7 +250,7 @@ func newMailboxContext(t *testing.T, startTime time.Time, return ctx } -func (c *mailboxContext) forward(_ chan struct{}, +func (c *mailboxContext) forward(_ <-chan struct{}, pkts ...*htlcPacket) error { for _, pkt := range pkts { @@ -706,7 +706,7 @@ func TestMailOrchestrator(t *testing.T) { // First, we'll create a new instance of our orchestrator. mo := newMailOrchestrator(&mailOrchConfig{ failMailboxUpdate: failMailboxUpdate, - forwardPackets: func(_ chan struct{}, + forwardPackets: func(_ <-chan struct{}, pkts ...*htlcPacket) error { return nil diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index 4ff2746a7..10e4d37bf 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -672,7 +672,7 @@ func (s *Switch) IsForwardedHTLC(chanID lnwire.ShortChannelID, // given to forward them through the router. The sending link's quit channel is // used to prevent deadlocks when the switch stops a link in the midst of // forwarding. -func (s *Switch) ForwardPackets(linkQuit chan struct{}, +func (s *Switch) ForwardPackets(linkQuit <-chan struct{}, packets ...*htlcPacket) error { var ( @@ -850,7 +850,7 @@ func (s *Switch) logFwdErrs(num *int, wg *sync.WaitGroup, fwdChan chan error) { // receive a shutdown requuest. This method does not wait for a response from // the htlcForwarder before returning. func (s *Switch) routeAsync(packet *htlcPacket, errChan chan error, - linkQuit chan struct{}) error { + linkQuit <-chan struct{}) error { command := &plexPacket{ pkt: packet, diff --git a/htlcswitch/test_utils.go b/htlcswitch/test_utils.go index 450d5a19d..5ef83c8f1 100644 --- a/htlcswitch/test_utils.go +++ b/htlcswitch/test_utils.go @@ -1144,15 +1144,19 @@ func (h *hopNetwork) createChannelLink(server, peer *mockServer, return nil } + forwardPackets := func(linkQuit <-chan struct{}, _ bool, + packets ...*htlcPacket) error { + + return server.htlcSwitch.ForwardPackets(linkQuit, packets...) + } + link := NewChannelLink( ChannelLinkConfig{ - BestHeight: server.htlcSwitch.BestHeight, - FwrdingPolicy: h.globalPolicy, - Peer: peer, - Circuits: server.htlcSwitch.CircuitModifier(), - ForwardPackets: func(linkQuit chan struct{}, _ bool, packets ...*htlcPacket) error { - return server.htlcSwitch.ForwardPackets(linkQuit, packets...) - }, + BestHeight: server.htlcSwitch.BestHeight, + FwrdingPolicy: h.globalPolicy, + Peer: peer, + Circuits: server.htlcSwitch.CircuitModifier(), + ForwardPackets: forwardPackets, DecodeHopIterators: decoder.DecodeHopIterators, ExtractErrorEncrypter: func(*btcec.PublicKey) ( hop.ErrorEncrypter, lnwire.FailCode) {