htlcswitch+go.mod: use updated fn.ContextGuard

This commit updates the fn dep to the version containing the updates to
the ContextGuard implementation. Only the htlcswitch link uses the guard
at the moment, so it is updated to make use of the new implementation.
Elle Mouton 2024-12-10 14:12:14 +02:00
parent 77848c402d
commit 950194a2da
7 changed files with 95 additions and 78 deletions
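
For orientation, here is a minimal sketch of the guard lifecycle that the replacements below follow. The `worker` type is hypothetical; the guard calls (`fn.NewContextGuard`, `WgAdd`, `WgDone`, `WgWait`, `Quit`, `Done`, `Create`) are the ones this diff introduces.

```go
package main

import (
	"context"

	"github.com/lightningnetwork/lnd/fn/v2"
)

// worker is a hypothetical stand-in for channelLink: the guard bundles the
// WaitGroup and quit channel that the old code kept as separate fields.
type worker struct {
	cg *fn.ContextGuard
}

func (w *worker) start() {
	w.cg.WgAdd(1) // replaces l.Wg.Add(1)
	go w.run(context.TODO())
}

func (w *worker) run(ctx context.Context) {
	defer w.cg.WgDone() // replaces l.Wg.Done()

	// Create derives a context that is cancelled once the guard quits,
	// replacing the old WithCtxQuitNoTimeout helper.
	ctx, cancel := w.cg.Create(ctx)
	defer cancel()

	select {
	case <-ctx.Done():
	case <-w.cg.Done(): // replaces <-l.Quit
	}
}

func (w *worker) stop() {
	w.cg.Quit()   // replaces close(l.Quit)
	w.cg.WgWait() // replaces l.Wg.Wait()
}

func main() {
	w := &worker{cg: fn.NewContextGuard()}
	w.start()
	w.stop()
}
```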

go.mod (2 changed lines)

@@ -36,7 +36,7 @@ require (
     github.com/lightningnetwork/lightning-onion v1.2.1-0.20240712235311-98bd56499dfb
     github.com/lightningnetwork/lnd/cert v1.2.2
     github.com/lightningnetwork/lnd/clock v1.1.1
-    github.com/lightningnetwork/lnd/fn/v2 v2.0.4
+    github.com/lightningnetwork/lnd/fn/v2 v2.0.8
     github.com/lightningnetwork/lnd/healthcheck v1.2.6
     github.com/lightningnetwork/lnd/kvdb v1.4.12
     github.com/lightningnetwork/lnd/queue v1.1.1

go.sum (4 changed lines)

@@ -456,8 +456,8 @@ github.com/lightningnetwork/lnd/cert v1.2.2 h1:71YK6hogeJtxSxw2teq3eGeuy4rHGKcFf
 github.com/lightningnetwork/lnd/cert v1.2.2/go.mod h1:jQmFn/Ez4zhDgq2hnYSw8r35bqGVxViXhX6Cd7HXM6U=
 github.com/lightningnetwork/lnd/clock v1.1.1 h1:OfR3/zcJd2RhH0RU+zX/77c0ZiOnIMsDIBjgjWdZgA0=
 github.com/lightningnetwork/lnd/clock v1.1.1/go.mod h1:mGnAhPyjYZQJmebS7aevElXKTFDuO+uNFFfMXK1W8xQ=
-github.com/lightningnetwork/lnd/fn/v2 v2.0.4 h1:DiC/AEa7DhnY4qOEQBISu1cp+1+51LjbVDzNLVBwNjI=
-github.com/lightningnetwork/lnd/fn/v2 v2.0.4/go.mod h1:TOzwrhjB/Azw1V7aa8t21ufcQmdsQOQMDtxVOQWNl8s=
+github.com/lightningnetwork/lnd/fn/v2 v2.0.8 h1:r2SLz7gZYQPVc3IZhU82M66guz3Zk2oY+Rlj9QN5S3g=
+github.com/lightningnetwork/lnd/fn/v2 v2.0.8/go.mod h1:TOzwrhjB/Azw1V7aa8t21ufcQmdsQOQMDtxVOQWNl8s=
 github.com/lightningnetwork/lnd/healthcheck v1.2.6 h1:1sWhqr93GdkWy4+6U7JxBfcyZIE78MhIHTJZfPx7qqI=
 github.com/lightningnetwork/lnd/healthcheck v1.2.6/go.mod h1:Mu02um4CWY/zdTOvFje7WJgJcHyX2zq/FG3MhOAiGaQ=
 github.com/lightningnetwork/lnd/kvdb v1.4.12 h1:Y0WY5Tbjyjn6eCYh068qkWur5oFtioJlfxc8w5SlJeQ=

htlcswitch/link.go

@@ -2,6 +2,7 @@ package htlcswitch
 
 import (
     "bytes"
+    "context"
     crand "crypto/rand"
     "crypto/sha256"
     "errors"
@@ -408,10 +409,10 @@ type channelLink struct {
     // the result.
     quiescenceReqs chan StfuReq
 
-    // ContextGuard is a helper that encapsulates a wait group and quit
-    // channel and allows contexts that either block or cancel on those
-    // depending on the use case.
-    *fn.ContextGuard
+    // cg is a helper that encapsulates a wait group and quit channel and
+    // allows contexts that either block or cancel on those depending on
+    // the use case.
+    cg *fn.ContextGuard
 }
 
 // hookMap is a data structure that is used to track the hooks that need to be
@@ -517,7 +518,7 @@ func NewChannelLink(cfg ChannelLinkConfig,
         incomingCommitHooks: newHookMap(),
         quiescer:            qsm,
         quiescenceReqs:      quiescenceReqs,
-        ContextGuard:        fn.NewContextGuard(),
+        cg:                  fn.NewContextGuard(),
     }
 }
@@ -596,8 +597,8 @@ func (l *channelLink) Start() error {
     l.updateFeeTimer = time.NewTimer(l.randomFeeUpdateTimeout())
 
-    l.Wg.Add(1)
-    go l.htlcManager()
+    l.cg.WgAdd(1)
+    go l.htlcManager(context.TODO())
 
     return nil
 }
@@ -636,8 +637,8 @@ func (l *channelLink) Stop() {
         l.hodlQueue.Stop()
     }
 
-    close(l.Quit)
-    l.Wg.Wait()
+    l.cg.Quit()
+    l.cg.WgWait()
 
     // Now that the htlcManager has completely exited, reset the packet
     // courier. This allows the mailbox to revaluate any lingering Adds that
@@ -662,7 +663,7 @@
 // WaitForShutdown blocks until the link finishes shutting down, which includes
 // termination of all dependent goroutines.
 func (l *channelLink) WaitForShutdown() {
-    l.Wg.Wait()
+    l.cg.WgWait()
 }
 
 // EligibleToForward returns a bool indicating if the channel is able to
@@ -740,7 +741,7 @@ func (l *channelLink) IsFlushing(linkDirection LinkDirection) bool {
 func (l *channelLink) OnFlushedOnce(hook func()) {
     select {
     case l.flushHooks.newTransients <- hook:
-    case <-l.Quit:
+    case <-l.cg.Done():
     }
 }
@@ -759,7 +760,7 @@ func (l *channelLink) OnCommitOnce(direction LinkDirection, hook func()) {
     select {
     case queue <- hook:
-    case <-l.Quit:
+    case <-l.cg.Done():
     }
 }
@@ -777,7 +778,7 @@ func (l *channelLink) InitStfu() <-chan fn.Result[lntypes.ChannelParty] {
     select {
     case l.quiescenceReqs <- req:
-    case <-l.Quit:
+    case <-l.cg.Done():
         req.Resolve(fn.Err[lntypes.ChannelParty](ErrLinkShuttingDown))
     }
@@ -887,7 +888,7 @@ func (l *channelLink) createFailureWithUpdate(incoming bool,
 // This method is to be called upon reconnection after the initial funding
 // flow. We'll compare out commitment chains with the remote party, and re-send
 // either a danging commit signature, a revocation, or both.
-func (l *channelLink) syncChanStates() error {
+func (l *channelLink) syncChanStates(ctx context.Context) error {
     chanState := l.channel.State()
 
     l.log.Infof("Attempting to re-synchronize channel: %v", chanState)
@@ -989,7 +990,7 @@ func (l *channelLink) syncChanStates() error {
         // We've just received a ChanSync message from the remote
         // party, so we'll process the message in order to determine
         // if we need to re-transmit any messages to the remote party.
-        ctx, cancel := l.WithCtxQuitNoTimeout()
+        ctx, cancel := l.cg.Create(ctx)
         defer cancel()
         msgsToReSend, openedCircuits, closedCircuits, err =
             l.channel.ProcessChanSyncMsg(ctx, remoteChanSyncMsg)
@@ -1021,7 +1022,7 @@
             l.cfg.Peer.SendMessage(false, msg)
         }
 
-    case <-l.Quit:
+    case <-l.cg.Done():
         return ErrLinkShuttingDown
     }
@@ -1033,7 +1034,7 @@
 // we previously received are reinstated in memory, and forwarded to the switch
 // if necessary. After a restart, this will also delete any previously
 // completed packages.
-func (l *channelLink) resolveFwdPkgs() error {
+func (l *channelLink) resolveFwdPkgs(ctx context.Context) error {
     fwdPkgs, err := l.channel.LoadFwdPkgs()
     if err != nil {
         return err
@@ -1050,7 +1051,7 @@
     // If any of our reprocessing steps require an update to the commitment
     // txn, we initiate a state transition to capture all relevant changes.
     if l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote) > 0 {
-        return l.updateCommitTx()
+        return l.updateCommitTx(ctx)
     }
 
     return nil
@@ -1111,7 +1112,7 @@ func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) error {
 //
 // NOTE: This MUST be run as a goroutine.
 func (l *channelLink) fwdPkgGarbager() {
-    defer l.Wg.Done()
+    defer l.cg.WgDone()
 
     l.cfg.FwdPkgGCTicker.Resume()
     defer l.cfg.FwdPkgGCTicker.Stop()
@@ -1128,7 +1129,7 @@
                     err)
                 continue
             }
-        case <-l.Quit:
+        case <-l.cg.Done():
             return
         }
     }
@@ -1248,10 +1249,12 @@ func (l *channelLink) handleChanSyncErr(err error) {
 // and also the htlc trickle queue+timer for this active channels.
 //
 // NOTE: This MUST be run as a goroutine.
-func (l *channelLink) htlcManager() {
+//
+//nolint:funlen
+func (l *channelLink) htlcManager(ctx context.Context) {
     defer func() {
         l.cfg.BatchTicker.Stop()
-        l.Wg.Done()
+        l.cg.WgDone()
         l.log.Infof("exited")
     }()
@@ -1271,7 +1274,7 @@
     // re-synchronize state with the remote peer. settledHtlcs is a map of
     // HTLC's that we re-settled as part of the channel state sync.
     if l.cfg.SyncStates {
-        err := l.syncChanStates()
+        err := l.syncChanStates(ctx)
         if err != nil {
             l.handleChanSyncErr(err)
             return
@@ -1322,7 +1325,7 @@
     // the channel is not pending, otherwise we should have no htlcs to
     // reforward.
     if l.ShortChanID() != hop.Source {
-        err := l.resolveFwdPkgs()
+        err := l.resolveFwdPkgs(ctx)
         switch err {
         // No error was encountered, success.
         case nil:
@@ -1345,7 +1348,7 @@
         // With our link's in-memory state fully reconstructed, spawn a
         // goroutine to manage the reclamation of disk space occupied by
         // completed forwarding packages.
-        l.Wg.Add(1)
+        l.cg.WgAdd(1)
         go l.fwdPkgGarbager()
     }
@@ -1447,7 +1450,8 @@
             // If we do, then we'll send a new UpdateFee message to
             // the remote party, to be locked in with a new update.
-            if err := l.updateChannelFee(newCommitFee); err != nil {
+            err = l.updateChannelFee(ctx, newCommitFee)
+            if err != nil {
                 l.log.Errorf("unable to update fee rate: %v",
                     err)
                 continue
@@ -1475,7 +1479,7 @@
             // including all the currently pending entries. If the
             // send was unsuccessful, then abandon the update,
             // waiting for the revocation window to open up.
-            if !l.updateCommitTxOrFail() {
+            if !l.updateCommitTxOrFail(ctx) {
                 return
             }
@@ -1493,19 +1497,19 @@
         // that the link is an intermediate hop in a multi-hop HTLC
         // circuit.
         case pkt := <-l.downstream:
-            l.handleDownstreamPkt(pkt)
+            l.handleDownstreamPkt(ctx, pkt)
 
         // A message from the connected peer was just received. This
         // indicates that we have a new incoming HTLC, either directly
         // for us, or part of a multi-hop HTLC circuit.
         case msg := <-l.upstream:
-            l.handleUpstreamMsg(msg)
+            l.handleUpstreamMsg(ctx, msg)
 
         // A htlc resolution is received. This means that we now have a
         // resolution for a previously accepted htlc.
         case hodlItem := <-l.hodlQueue.ChanOut():
             htlcResolution := hodlItem.(invoices.HtlcResolution)
-            err := l.processHodlQueue(htlcResolution)
+            err := l.processHodlQueue(ctx, htlcResolution)
             switch err {
             // No error, success.
             case nil:
@@ -1543,7 +1547,7 @@
             }
         }
 
-        case <-l.Quit:
+        case <-l.cg.Done():
             return
         }
     }
@@ -1552,7 +1556,7 @@
 // processHodlQueue processes a received htlc resolution and continues reading
 // from the hodl queue until no more resolutions remain. When this function
 // returns without an error, the commit tx should be updated.
-func (l *channelLink) processHodlQueue(
+func (l *channelLink) processHodlQueue(ctx context.Context,
     firstResolution invoices.HtlcResolution) error {
 
     // Try to read all waiting resolution messages, so that they can all be
@@ -1584,7 +1588,7 @@ loop:
     }
 
     // Update the commitment tx.
-    if err := l.updateCommitTx(); err != nil {
+    if err := l.updateCommitTx(ctx); err != nil {
         return err
     }
@@ -1671,7 +1675,9 @@ func (l *channelLink) randomFeeUpdateTimeout() time.Duration {
 // handleDownstreamUpdateAdd processes an UpdateAddHTLC packet sent from the
 // downstream HTLC Switch.
-func (l *channelLink) handleDownstreamUpdateAdd(pkt *htlcPacket) error {
+func (l *channelLink) handleDownstreamUpdateAdd(ctx context.Context,
+    pkt *htlcPacket) error {
+
     htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC)
     if !ok {
         return errors.New("not an UpdateAddHTLC packet")
@@ -1775,7 +1781,7 @@ func (l *channelLink) handleDownstreamUpdateAdd(pkt *htlcPacket) error {
         getEventType(pkt),
     )
 
-    l.tryBatchUpdateCommitTx()
+    l.tryBatchUpdateCommitTx(ctx)
 
     return nil
 }
@@ -1786,7 +1792,9 @@ func (l *channelLink) handleDownstreamUpdateAdd(pkt *htlcPacket) error {
 // cleared HTLCs with the upstream peer.
 //
 // TODO(roasbeef): add sync ntfn to ensure switch always has consistent view?
-func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) {
+func (l *channelLink) handleDownstreamPkt(ctx context.Context,
+    pkt *htlcPacket) {
+
     if pkt.htlc.MsgType().IsChannelUpdate() &&
         !l.quiescer.CanSendUpdates() {
@@ -1800,7 +1808,7 @@
     case *lnwire.UpdateAddHTLC:
         // Handle add message. The returned error can be ignored,
         // because it is also sent through the mailbox.
-        _ = l.handleDownstreamUpdateAdd(pkt)
+        _ = l.handleDownstreamUpdateAdd(ctx, pkt)
 
     case *lnwire.UpdateFulfillHTLC:
         // If hodl.SettleOutgoing mode is active, we exit early to
@@ -1867,7 +1875,7 @@
         )
 
         // Immediately update the commitment tx to minimize latency.
-        l.updateCommitTxOrFail()
+        l.updateCommitTxOrFail(ctx)
 
     case *lnwire.UpdateFailHTLC:
         // If hodl.FailOutgoing mode is active, we exit early to
@@ -1957,19 +1965,19 @@
         }
 
         // Immediately update the commitment tx to minimize latency.
-        l.updateCommitTxOrFail()
+        l.updateCommitTxOrFail(ctx)
     }
 }
 
 // tryBatchUpdateCommitTx updates the commitment transaction if the batch is
 // full.
-func (l *channelLink) tryBatchUpdateCommitTx() {
+func (l *channelLink) tryBatchUpdateCommitTx(ctx context.Context) {
     pending := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote)
     if pending < uint64(l.cfg.BatchSize) {
         return
     }
 
-    l.updateCommitTxOrFail()
+    l.updateCommitTxOrFail(ctx)
 }
 
 // cleanupSpuriousResponse attempts to ack any AddRef or SettleFailRef
@@ -2039,7 +2047,11 @@ func (l *channelLink) cleanupSpuriousResponse(pkt *htlcPacket) {
 // handleUpstreamMsg processes wire messages related to commitment state
 // updates from the upstream peer. The upstream peer is the peer whom we have a
 // direct channel with, updating our respective commitment chains.
-func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
+//
+//nolint:funlen
+func (l *channelLink) handleUpstreamMsg(ctx context.Context,
+    msg lnwire.Message) {
+
     // First check if the message is an update and we are capable of
     // receiving updates right now.
     if msg.MsgType().IsChannelUpdate() && !l.quiescer.CanRecvUpdates() {
@@ -2418,7 +2430,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
         }
 
         select {
-        case <-l.Quit:
+        case <-l.cg.Done():
             return
         default:
         }
@@ -2430,7 +2442,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
         // reply with a signature as both sides already have a
         // commitment with the latest accepted.
         if l.channel.OweCommitment() {
-            if !l.updateCommitTxOrFail() {
+            if !l.updateCommitTxOrFail(ctx) {
                 return
             }
         }
@@ -2488,7 +2500,7 @@
         }
 
         select {
-        case <-l.Quit:
+        case <-l.cg.Done():
             return
         default:
         }
@@ -2542,7 +2554,7 @@
         // but there are still remote updates that are not in the remote
         // commit tx yet, send out an update.
         if l.channel.OweCommitment() {
-            if !l.updateCommitTxOrFail() {
+            if !l.updateCommitTxOrFail(ctx) {
                 return
             }
         }
@@ -2732,8 +2744,8 @@ func (l *channelLink) ackDownStreamPackets() error {
 // updateCommitTxOrFail updates the commitment tx and if that fails, it fails
 // the link.
-func (l *channelLink) updateCommitTxOrFail() bool {
-    err := l.updateCommitTx()
+func (l *channelLink) updateCommitTxOrFail(ctx context.Context) bool {
+    err := l.updateCommitTx(ctx)
     switch err {
     // No error encountered, success.
     case nil:
@@ -2759,7 +2771,7 @@
 // updateCommitTx signs, then sends an update to the remote peer adding a new
 // commitment to their commitment chain which includes all the latest updates
 // we've received+processed up to this point.
-func (l *channelLink) updateCommitTx() error {
+func (l *channelLink) updateCommitTx(ctx context.Context) error {
     // Preemptively write all pending keystones to disk, just in case the
     // HTLCs we have in memory are included in the subsequent attempt to
     // sign a commitment state.
@@ -2782,7 +2794,7 @@
         return nil
     }
 
-    ctx, done := l.WithCtxQuitNoTimeout()
+    ctx, done := l.cg.Create(ctx)
     defer done()
 
     newCommit, err := l.channel.SignNextCommitment(ctx)
@@ -2822,7 +2834,7 @@
     }
 
     select {
-    case <-l.Quit:
+    case <-l.cg.Done():
         return ErrLinkShuttingDown
     default:
     }
@@ -3529,7 +3541,7 @@ func (l *channelLink) handleSwitchPacket(pkt *htlcPacket) error {
 // NOTE: Part of the ChannelLink interface.
 func (l *channelLink) HandleChannelUpdate(message lnwire.Message) {
     select {
-    case <-l.Quit:
+    case <-l.cg.Done():
         // Return early if the link is already in the process of
         // quitting. It doesn't make sense to hand the message to the
         // mailbox here.
@@ -3545,7 +3557,9 @@ func (l *channelLink) HandleChannelUpdate(message lnwire.Message) {
 // updateChannelFee updates the commitment fee-per-kw on this channel by
 // committing to an update_fee message.
-func (l *channelLink) updateChannelFee(feePerKw chainfee.SatPerKWeight) error {
+func (l *channelLink) updateChannelFee(ctx context.Context,
+    feePerKw chainfee.SatPerKWeight) error {
+
     l.log.Infof("updating commit fee to %v", feePerKw)
 
     // We skip sending the UpdateFee message if the channel is not
@@ -3583,7 +3597,8 @@ func (l *channelLink) updateChannelFee(feePerKw chainfee.SatPerKWeight) error {
     if err := l.cfg.Peer.SendMessage(false, msg); err != nil {
         return err
     }
-    return l.updateCommitTx()
+
+    return l.updateCommitTx(ctx)
 }
 
 // processRemoteSettleFails accepts a batch of settle/fail payment descriptors
@@ -4290,7 +4305,7 @@ func (l *channelLink) forwardBatch(replay bool, packets ...*htlcPacket) {
         filteredPkts = append(filteredPkts, pkt)
     }
 
-    err := l.cfg.ForwardPackets(l.Quit, replay, filteredPkts...)
+    err := l.cfg.ForwardPackets(l.cg.Done(), replay, filteredPkts...)
     if err != nil {
         log.Errorf("Unhandled error while reforwarding htlc "+
             "settle/fail over htlcswitch: %v", err)

htlcswitch/link_test.go

@@ -2257,7 +2257,7 @@ func newSingleLinkTestHarness(t *testing.T, chanAmt,
         for {
             select {
             case <-notifyUpdateChan:
-            case <-chanLink.Quit:
+            case <-chanLink.cg.Done():
                 close(doneChan)
                 return
             }
@@ -2326,7 +2326,7 @@ func handleStateUpdate(link *channelLink,
     }
     link.HandleChannelUpdate(remoteRev)
 
-    ctx, done := link.WithCtxQuitNoTimeout()
+    ctx, done := link.cg.Create(context.Background())
     defer done()
     remoteSigs, err := remoteChannel.SignNextCommitment(ctx)
@@ -2372,7 +2372,7 @@ func updateState(batchTick chan time.Time, link *channelLink,
         // Trigger update by ticking the batchTicker.
         select {
         case batchTick <- time.Now():
-        case <-link.Quit:
+        case <-link.cg.Done():
             return fmt.Errorf("link shutting down")
         }
 
         return handleStateUpdate(link, remoteChannel)
@@ -2380,7 +2380,7 @@ func updateState(batchTick chan time.Time, link *channelLink,
     // The remote is triggering the state update, emulate this by
     // signing and sending CommitSig to the link.
-    ctx, done := link.WithCtxQuitNoTimeout()
+    ctx, done := link.cg.Create(context.Background())
     defer done()
     remoteSigs, err := remoteChannel.SignNextCommitment(ctx)
@@ -4946,7 +4946,7 @@ func (h *persistentLinkHarness) restartLink(
         for {
             select {
             case <-notifyUpdateChan:
-            case <-chanLink.Quit:
+            case <-chanLink.cg.Done():
                 close(doneChan)
                 return
             }
@@ -5932,7 +5932,9 @@ func TestChannelLinkFail(t *testing.T) {
             // Sign a commitment that will include
             // signature for the HTLC just sent.
-            quitCtx, done := c.WithCtxQuitNoTimeout()
+            quitCtx, done := c.cg.Create(
+                context.Background(),
+            )
             defer done()
 
             sigs, err := remoteChannel.SignNextCommitment(
@@ -5979,7 +5981,9 @@
             // Sign a commitment that will include
             // signature for the HTLC just sent.
-            quitCtx, done := c.WithCtxQuitNoTimeout()
+            quitCtx, done := c.cg.Create(
+                context.Background(),
+            )
             defer done()
 
             sigs, err := remoteChannel.SignNextCommitment(

htlcswitch/mock.go

@@ -1190,7 +1190,7 @@ func (h *hopNetwork) createChannelLink(server, peer *mockServer,
         for {
             select {
             case <-notifyUpdateChan:
-            case <-chanLink.Quit:
+            case <-chanLink.cg.Done():
                 close(doneChan)
                 return
             }

protofsm/state_machine.go

@@ -145,7 +145,7 @@ type StateMachine[Event any, Env Environment] struct {
     // query the internal state machine state.
     stateQuery chan stateQuery[Event, Env]
 
-    wg *fn.GoroutineManager
+    gm fn.GoroutineManager
 
     quit chan struct{}
 
     startOnce sync.Once
@@ -206,7 +206,7 @@ func NewStateMachine[Event any, Env Environment](
         ),
         events:         make(chan Event, 1),
         stateQuery:     make(chan stateQuery[Event, Env]),
-        wg:             fn.NewGoroutineManager(),
+        gm:             *fn.NewGoroutineManager(),
         newStateEvents: fn.NewEventDistributor[State[Event, Env]](),
         quit:           make(chan struct{}),
     }
@@ -216,7 +216,7 @@
 // the state machine to completion.
 func (s *StateMachine[Event, Env]) Start(ctx context.Context) {
     s.startOnce.Do(func() {
-        _ = s.wg.Go(ctx, func(ctx context.Context) {
+        _ = s.gm.Go(ctx, func(ctx context.Context) {
             s.driveMachine(ctx)
         })
     })
@@ -227,7 +227,7 @@ func (s *StateMachine[Event, Env]) Start(ctx context.Context) {
 func (s *StateMachine[Event, Env]) Stop() {
     s.stopOnce.Do(func() {
         close(s.quit)
-        s.wg.Stop()
+        s.gm.Stop()
     })
 }
@@ -335,8 +335,6 @@ func (s *StateMachine[Event, Env]) RemoveStateSub(sub StateSubscriber[
 // executeDaemonEvent executes a daemon event, which is a special type of event
 // that can be emitted as part of the state transition function of the state
 // machine. An error is returned if the type of event is unknown.
-//
-//nolint:funlen
 func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context,
     event DaemonEvent) error {
@@ -360,7 +358,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context,
             // If a post-send event was specified, then we'll funnel
             // that back into the main state machine now as well.
             return fn.MapOptionZ(daemonEvent.PostSendEvent, func(event Event) error { //nolint:ll
-                launched := s.wg.Go(
+                launched := s.gm.Go(
                     ctx, func(ctx context.Context) {
                         s.log.DebugS(ctx, "Sending post-send event",
                             "event", lnutils.SpewLogClosure(event))
@@ -386,7 +384,7 @@
         // Otherwise, this has a SendWhen predicate, so we'll need
         // launch a goroutine to poll the SendWhen, then send only once
         // the predicate is true.
-        launched := s.wg.Go(ctx, func(ctx context.Context) {
+        launched := s.gm.Go(ctx, func(ctx context.Context) {
             predicateTicker := time.NewTicker(
                 s.cfg.CustomPollInterval.UnwrapOr(pollInterval),
             )
@@ -456,7 +454,7 @@
             return fmt.Errorf("unable to register spend: %w", err)
         }
 
-        launched := s.wg.Go(ctx, func(ctx context.Context) {
+        launched := s.gm.Go(ctx, func(ctx context.Context) {
             for {
                 select {
                 case spend, ok := <-spendEvent.Spend:
@@ -502,7 +500,7 @@
             return fmt.Errorf("unable to register conf: %w", err)
         }
 
-        launched := s.wg.Go(ctx, func(ctx context.Context) {
+        launched := s.gm.Go(ctx, func(ctx context.Context) {
             for {
                 select {
                 case <-confEvent.Confirmed:
@@ -674,7 +672,7 @@ func (s *StateMachine[Event, Env]) driveMachine(ctx context.Context) {
                 return
             }
 
-        case <-s.wg.Done():
+        case <-s.gm.Done():
             return
         }
     }
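
The protofsm change renames the field from `wg` to `gm` and stores the `fn.GoroutineManager` by value, matching its goroutine-manager role. A minimal sketch of the resulting usage (`machine` is hypothetical; the boolean return of `Go` is an assumption inferred from the `launched :=` assignments above):

```go
package main

import (
	"context"
	"fmt"

	"github.com/lightningnetwork/lnd/fn/v2"
)

// machine is a hypothetical stand-in for StateMachine, reduced to the
// goroutine-manager usage shown in the hunks above.
type machine struct {
	gm   fn.GoroutineManager
	quit chan struct{}
}

func newMachine() *machine {
	return &machine{
		// Stored by value, as in NewStateMachine above.
		gm:   *fn.NewGoroutineManager(),
		quit: make(chan struct{}),
	}
}

func (m *machine) start(ctx context.Context) {
	// Go runs the closure in a managed goroutine; the return value
	// (assumed boolean) reports whether it launched before Stop.
	_ = m.gm.Go(ctx, func(ctx context.Context) {
		select {
		case <-ctx.Done():
		case <-m.quit:
		}
		fmt.Println("drive loop exiting")
	})
}

func (m *machine) stop() {
	close(m.quit)
	m.gm.Stop() // cancels managed contexts and waits for goroutines
}

func main() {
	m := newMachine()
	m.start(context.Background())
	m.stop()
}
```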

protofsm/state_machine_test.go

@@ -1,6 +1,7 @@
 package protofsm
 
 import (
+    "context"
     "encoding/hex"
     "fmt"
     "sync/atomic"
@@ -14,7 +15,6 @@ import (
     "github.com/lightningnetwork/lnd/lnwire"
     "github.com/stretchr/testify/mock"
     "github.com/stretchr/testify/require"
-    "golang.org/x/net/context"
 )
 
 type dummyEvents interface {