contractcourt: remove the immediate param used in Resolve

This `immediate` flag was added as a hack so during a restart, the
pending resolvers would offer the inputs to the sweeper and ask it to
sweep them immediately. This is no longer need due to `blockbeat`, as
now during restart, a block is always sent to all subsystems via the
flow `ChainArb` -> `ChannelArb` -> resolvers -> sweeper. Thus, when
there are pending inputs offered, they will be processed by the sweeper
immediately.
This commit is contained in:
yyforyongyu 2024-06-05 00:56:39 +08:00
parent 71295534bb
commit e2e59bd90c
No known key found for this signature in database
GPG Key ID: 9BCD95C4FF296868
14 changed files with 36 additions and 56 deletions

View File

@ -84,7 +84,7 @@ func (c *anchorResolver) ResolverKey() []byte {
} }
// Resolve offers the anchor output to the sweeper and waits for it to be swept. // Resolve offers the anchor output to the sweeper and waits for it to be swept.
func (c *anchorResolver) Resolve(_ bool) (ContractResolver, error) { func (c *anchorResolver) Resolve() (ContractResolver, error) {
// Attempt to update the sweep parameters to the post-confirmation // Attempt to update the sweep parameters to the post-confirmation
// situation. We don't want to force sweep anymore, because the anchor // situation. We don't want to force sweep anymore, because the anchor
// lost its special purpose to get the commitment confirmed. It is just // lost its special purpose to get the commitment confirmed. It is just

View File

@ -47,7 +47,7 @@ func (b *breachResolver) ResolverKey() []byte {
// been broadcast. // been broadcast.
// //
// TODO(yy): let sweeper handle the breach inputs. // TODO(yy): let sweeper handle the breach inputs.
func (b *breachResolver) Resolve(_ bool) (ContractResolver, error) { func (b *breachResolver) Resolve() (ContractResolver, error) {
if !b.subscribed { if !b.subscribed {
complete, err := b.SubscribeBreachComplete( complete, err := b.SubscribeBreachComplete(
&b.ChanPoint, b.replyChan, &b.ChanPoint, b.replyChan,

View File

@ -816,7 +816,7 @@ func (c *ChannelArbitrator) relaunchResolvers(commitSet *CommitSet,
// TODO(roasbeef): this isn't re-launched? // TODO(roasbeef): this isn't re-launched?
} }
c.launchResolvers(unresolvedContracts, true) c.launchResolvers(unresolvedContracts)
return nil return nil
} }
@ -1355,7 +1355,7 @@ func (c *ChannelArbitrator) stateStep(
// Finally, we'll launch all the required contract resolvers. // Finally, we'll launch all the required contract resolvers.
// Once they're all resolved, we're no longer needed. // Once they're all resolved, we're no longer needed.
c.launchResolvers(resolvers, false) c.launchResolvers(resolvers)
nextState = StateWaitingFullResolution nextState = StateWaitingFullResolution
@ -1579,16 +1579,14 @@ func (c *ChannelArbitrator) findCommitmentDeadlineAndValue(heightHint uint32,
} }
// launchResolvers updates the activeResolvers list and starts the resolvers. // launchResolvers updates the activeResolvers list and starts the resolvers.
func (c *ChannelArbitrator) launchResolvers(resolvers []ContractResolver, func (c *ChannelArbitrator) launchResolvers(resolvers []ContractResolver) {
immediate bool) {
c.activeResolversLock.Lock() c.activeResolversLock.Lock()
defer c.activeResolversLock.Unlock()
c.activeResolvers = resolvers c.activeResolvers = resolvers
c.activeResolversLock.Unlock()
for _, contract := range resolvers { for _, contract := range resolvers {
c.wg.Add(1) c.wg.Add(1)
go c.resolveContract(contract, immediate) go c.resolveContract(contract)
} }
} }
@ -2560,9 +2558,7 @@ func (c *ChannelArbitrator) replaceResolver(oldResolver,
// contracts. // contracts.
// //
// NOTE: This MUST be run as a goroutine. // NOTE: This MUST be run as a goroutine.
func (c *ChannelArbitrator) resolveContract(currentContract ContractResolver, func (c *ChannelArbitrator) resolveContract(currentContract ContractResolver) {
immediate bool) {
defer c.wg.Done() defer c.wg.Done()
log.Debugf("ChannelArbitrator(%v): attempting to resolve %T", log.Debugf("ChannelArbitrator(%v): attempting to resolve %T",
@ -2583,7 +2579,7 @@ func (c *ChannelArbitrator) resolveContract(currentContract ContractResolver,
default: default:
// Otherwise, we'll attempt to resolve the current // Otherwise, we'll attempt to resolve the current
// contract. // contract.
nextContract, err := currentContract.Resolve(immediate) nextContract, err := currentContract.Resolve()
if err != nil { if err != nil {
if err == errResolverShuttingDown { if err == errResolverShuttingDown {
return return

View File

@ -165,9 +165,7 @@ func (c *commitSweepResolver) getCommitTxConfHeight() (uint32, error) {
// returned. // returned.
// //
// NOTE: This function MUST be run as a goroutine. // NOTE: This function MUST be run as a goroutine.
// func (c *commitSweepResolver) Resolve() (ContractResolver, error) {
//nolint:funlen
func (c *commitSweepResolver) Resolve(_ bool) (ContractResolver, error) {
// If we're already resolved, then we can exit early. // If we're already resolved, then we can exit early.
if c.resolved { if c.resolved {
return nil, nil return nil, nil

View File

@ -82,7 +82,7 @@ func (i *commitSweepResolverTestContext) resolve() {
// Start resolver. // Start resolver.
i.resolverResultChan = make(chan resolveResult, 1) i.resolverResultChan = make(chan resolveResult, 1)
go func() { go func() {
nextResolver, err := i.resolver.Resolve(false) nextResolver, err := i.resolver.Resolve()
i.resolverResultChan <- resolveResult{ i.resolverResultChan <- resolveResult{
nextResolver: nextResolver, nextResolver: nextResolver,
err: err, err: err,

View File

@ -42,7 +42,7 @@ type ContractResolver interface {
// resolution, then another resolve is returned. // resolution, then another resolve is returned.
// //
// NOTE: This function MUST be run as a goroutine. // NOTE: This function MUST be run as a goroutine.
Resolve(immediate bool) (ContractResolver, error) Resolve() (ContractResolver, error)
// SupplementState allows the user of a ContractResolver to supplement // SupplementState allows the user of a ContractResolver to supplement
// it with state required for the proper resolution of a contract. // it with state required for the proper resolution of a contract.

View File

@ -90,9 +90,7 @@ func (h *htlcIncomingContestResolver) processFinalHtlcFail() error {
// as we have no remaining actions left at our disposal. // as we have no remaining actions left at our disposal.
// //
// NOTE: Part of the ContractResolver interface. // NOTE: Part of the ContractResolver interface.
func (h *htlcIncomingContestResolver) Resolve( func (h *htlcIncomingContestResolver) Resolve() (ContractResolver, error) {
_ bool) (ContractResolver, error) {
// If we're already full resolved, then we don't have anything further // If we're already full resolved, then we don't have anything further
// to do. // to do.
if h.resolved { if h.resolved {

View File

@ -395,7 +395,7 @@ func (i *incomingResolverTestContext) resolve() {
i.resolveErr = make(chan error, 1) i.resolveErr = make(chan error, 1)
go func() { go func() {
var err error var err error
i.nextResolver, err = i.resolver.Resolve(false) i.nextResolver, err = i.resolver.Resolve()
i.resolveErr <- err i.resolveErr <- err
}() }()

View File

@ -49,9 +49,7 @@ func newOutgoingContestResolver(res lnwallet.OutgoingHtlcResolution,
// When either of these two things happens, we'll create a new resolver which // When either of these two things happens, we'll create a new resolver which
// is able to handle the final resolution of the contract. We're only the pivot // is able to handle the final resolution of the contract. We're only the pivot
// point. // point.
func (h *htlcOutgoingContestResolver) Resolve( func (h *htlcOutgoingContestResolver) Resolve() (ContractResolver, error) {
_ bool) (ContractResolver, error) {
// If we're already full resolved, then we don't have anything further // If we're already full resolved, then we don't have anything further
// to do. // to do.
if h.resolved { if h.resolved {

View File

@ -209,7 +209,7 @@ func (i *outgoingResolverTestContext) resolve() {
// Start resolver. // Start resolver.
i.resolverResultChan = make(chan resolveResult, 1) i.resolverResultChan = make(chan resolveResult, 1)
go func() { go func() {
nextResolver, err := i.resolver.Resolve(false) nextResolver, err := i.resolver.Resolve()
i.resolverResultChan <- resolveResult{ i.resolverResultChan <- resolveResult{
nextResolver: nextResolver, nextResolver: nextResolver,
err: err, err: err,

View File

@ -115,9 +115,7 @@ func (h *htlcSuccessResolver) ResolverKey() []byte {
// TODO(roasbeef): create multi to batch // TODO(roasbeef): create multi to batch
// //
// NOTE: Part of the ContractResolver interface. // NOTE: Part of the ContractResolver interface.
func (h *htlcSuccessResolver) Resolve( func (h *htlcSuccessResolver) Resolve() (ContractResolver, error) {
immediate bool) (ContractResolver, error) {
// If we're already resolved, then we can exit early. // If we're already resolved, then we can exit early.
if h.resolved { if h.resolved {
return nil, nil return nil, nil
@ -126,12 +124,12 @@ func (h *htlcSuccessResolver) Resolve(
// If we don't have a success transaction, then this means that this is // If we don't have a success transaction, then this means that this is
// an output on the remote party's commitment transaction. // an output on the remote party's commitment transaction.
if h.htlcResolution.SignedSuccessTx == nil { if h.htlcResolution.SignedSuccessTx == nil {
return h.resolveRemoteCommitOutput(immediate) return h.resolveRemoteCommitOutput()
} }
// Otherwise this an output on our own commitment, and we must start by // Otherwise this an output on our own commitment, and we must start by
// broadcasting the second-level success transaction. // broadcasting the second-level success transaction.
secondLevelOutpoint, err := h.broadcastSuccessTx(immediate) secondLevelOutpoint, err := h.broadcastSuccessTx()
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -165,8 +163,8 @@ func (h *htlcSuccessResolver) Resolve(
// broadcasting the second-level success transaction. It returns the ultimate // broadcasting the second-level success transaction. It returns the ultimate
// outpoint of the second-level tx, that we must wait to be spent for the // outpoint of the second-level tx, that we must wait to be spent for the
// resolver to be fully resolved. // resolver to be fully resolved.
func (h *htlcSuccessResolver) broadcastSuccessTx( func (h *htlcSuccessResolver) broadcastSuccessTx() (
immediate bool) (*wire.OutPoint, error) { *wire.OutPoint, error) {
// If we have non-nil SignDetails, this means that have a 2nd level // If we have non-nil SignDetails, this means that have a 2nd level
// HTLC transaction that is signed using sighash SINGLE|ANYONECANPAY // HTLC transaction that is signed using sighash SINGLE|ANYONECANPAY
@ -175,7 +173,7 @@ func (h *htlcSuccessResolver) broadcastSuccessTx(
// the checkpointed outputIncubating field to determine if we already // the checkpointed outputIncubating field to determine if we already
// swept the HTLC output into the second level transaction. // swept the HTLC output into the second level transaction.
if h.htlcResolution.SignDetails != nil { if h.htlcResolution.SignDetails != nil {
return h.broadcastReSignedSuccessTx(immediate) return h.broadcastReSignedSuccessTx()
} }
// Otherwise we'll publish the second-level transaction directly and // Otherwise we'll publish the second-level transaction directly and
@ -225,10 +223,8 @@ func (h *htlcSuccessResolver) broadcastSuccessTx(
// broadcastReSignedSuccessTx handles the case where we have non-nil // broadcastReSignedSuccessTx handles the case where we have non-nil
// SignDetails, and offers the second level transaction to the Sweeper, that // SignDetails, and offers the second level transaction to the Sweeper, that
// will re-sign it and attach fees at will. // will re-sign it and attach fees at will.
// func (h *htlcSuccessResolver) broadcastReSignedSuccessTx() (*wire.OutPoint,
//nolint:funlen error) {
func (h *htlcSuccessResolver) broadcastReSignedSuccessTx(immediate bool) (
*wire.OutPoint, error) {
// Keep track of the tx spending the HTLC output on the commitment, as // Keep track of the tx spending the HTLC output on the commitment, as
// this will be the confirmed second-level tx we'll ultimately sweep. // this will be the confirmed second-level tx we'll ultimately sweep.
@ -287,7 +283,6 @@ func (h *htlcSuccessResolver) broadcastReSignedSuccessTx(immediate bool) (
sweep.Params{ sweep.Params{
Budget: budget, Budget: budget,
DeadlineHeight: deadline, DeadlineHeight: deadline,
Immediate: immediate,
}, },
) )
if err != nil { if err != nil {
@ -419,7 +414,7 @@ func (h *htlcSuccessResolver) broadcastReSignedSuccessTx(immediate bool) (
// resolveRemoteCommitOutput handles sweeping an HTLC output on the remote // resolveRemoteCommitOutput handles sweeping an HTLC output on the remote
// commitment with the preimage. In this case we can sweep the output directly, // commitment with the preimage. In this case we can sweep the output directly,
// and don't have to broadcast a second-level transaction. // and don't have to broadcast a second-level transaction.
func (h *htlcSuccessResolver) resolveRemoteCommitOutput(immediate bool) ( func (h *htlcSuccessResolver) resolveRemoteCommitOutput() (
ContractResolver, error) { ContractResolver, error) {
isTaproot := txscript.IsPayToTaproot( isTaproot := txscript.IsPayToTaproot(
@ -471,7 +466,6 @@ func (h *htlcSuccessResolver) resolveRemoteCommitOutput(immediate bool) (
sweep.Params{ sweep.Params{
Budget: budget, Budget: budget,
DeadlineHeight: deadline, DeadlineHeight: deadline,
Immediate: immediate,
}, },
) )
if err != nil { if err != nil {

View File

@ -134,7 +134,7 @@ func (i *htlcResolverTestContext) resolve() {
// Start resolver. // Start resolver.
i.resolverResultChan = make(chan resolveResult, 1) i.resolverResultChan = make(chan resolveResult, 1)
go func() { go func() {
nextResolver, err := i.resolver.Resolve(false) nextResolver, err := i.resolver.Resolve()
i.resolverResultChan <- resolveResult{ i.resolverResultChan <- resolveResult{
nextResolver: nextResolver, nextResolver: nextResolver,
err: err, err: err,

View File

@ -418,9 +418,7 @@ func checkSizeAndIndex(witness wire.TxWitness, size, index int) bool {
// see a direct sweep via the timeout clause. // see a direct sweep via the timeout clause.
// //
// NOTE: Part of the ContractResolver interface. // NOTE: Part of the ContractResolver interface.
func (h *htlcTimeoutResolver) Resolve( func (h *htlcTimeoutResolver) Resolve() (ContractResolver, error) {
immediate bool) (ContractResolver, error) {
// If we're already resolved, then we can exit early. // If we're already resolved, then we can exit early.
if h.resolved { if h.resolved {
return nil, nil return nil, nil
@ -429,7 +427,7 @@ func (h *htlcTimeoutResolver) Resolve(
// Start by spending the HTLC output, either by broadcasting the // Start by spending the HTLC output, either by broadcasting the
// second-level timeout transaction, or directly if this is the remote // second-level timeout transaction, or directly if this is the remote
// commitment. // commitment.
commitSpend, err := h.spendHtlcOutput(immediate) commitSpend, err := h.spendHtlcOutput()
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -477,7 +475,7 @@ func (h *htlcTimeoutResolver) Resolve(
// sweepSecondLevelTx sends a second level timeout transaction to the sweeper. // sweepSecondLevelTx sends a second level timeout transaction to the sweeper.
// This transaction uses the SINLGE|ANYONECANPAY flag. // This transaction uses the SINLGE|ANYONECANPAY flag.
func (h *htlcTimeoutResolver) sweepSecondLevelTx(immediate bool) error { func (h *htlcTimeoutResolver) sweepSecondLevelTx() error {
log.Infof("%T(%x): offering second-layer timeout tx to sweeper: %v", log.Infof("%T(%x): offering second-layer timeout tx to sweeper: %v",
h, h.htlc.RHash[:], h, h.htlc.RHash[:],
spew.Sdump(h.htlcResolution.SignedTimeoutTx)) spew.Sdump(h.htlcResolution.SignedTimeoutTx))
@ -538,7 +536,6 @@ func (h *htlcTimeoutResolver) sweepSecondLevelTx(immediate bool) error {
sweep.Params{ sweep.Params{
Budget: budget, Budget: budget,
DeadlineHeight: h.incomingHTLCExpiryHeight, DeadlineHeight: h.incomingHTLCExpiryHeight,
Immediate: immediate,
}, },
) )
if err != nil { if err != nil {
@ -572,7 +569,7 @@ func (h *htlcTimeoutResolver) sendSecondLevelTxLegacy() error {
// sweeper. This is used when the remote party goes on chain, and we're able to // sweeper. This is used when the remote party goes on chain, and we're able to
// sweep an HTLC we offered after a timeout. Only the CLTV encumbered outputs // sweep an HTLC we offered after a timeout. Only the CLTV encumbered outputs
// are resolved via this path. // are resolved via this path.
func (h *htlcTimeoutResolver) sweepDirectHtlcOutput(immediate bool) error { func (h *htlcTimeoutResolver) sweepDirectHtlcOutput() error {
var htlcWitnessType input.StandardWitnessType var htlcWitnessType input.StandardWitnessType
if h.isTaproot() { if h.isTaproot() {
htlcWitnessType = input.TaprootHtlcOfferedRemoteTimeout htlcWitnessType = input.TaprootHtlcOfferedRemoteTimeout
@ -612,7 +609,6 @@ func (h *htlcTimeoutResolver) sweepDirectHtlcOutput(immediate bool) error {
// This is an outgoing HTLC, so we want to make sure // This is an outgoing HTLC, so we want to make sure
// that we sweep it before the incoming HTLC expires. // that we sweep it before the incoming HTLC expires.
DeadlineHeight: h.incomingHTLCExpiryHeight, DeadlineHeight: h.incomingHTLCExpiryHeight,
Immediate: immediate,
}, },
) )
if err != nil { if err != nil {
@ -627,8 +623,8 @@ func (h *htlcTimeoutResolver) sweepDirectHtlcOutput(immediate bool) error {
// used to spend the output into the next stage. If this is the remote // used to spend the output into the next stage. If this is the remote
// commitment, the output will be swept directly without the timeout // commitment, the output will be swept directly without the timeout
// transaction. // transaction.
func (h *htlcTimeoutResolver) spendHtlcOutput( func (h *htlcTimeoutResolver) spendHtlcOutput() (
immediate bool) (*chainntnfs.SpendDetail, error) { *chainntnfs.SpendDetail, error) {
switch { switch {
// If we have non-nil SignDetails, this means that have a 2nd level // If we have non-nil SignDetails, this means that have a 2nd level
@ -636,7 +632,7 @@ func (h *htlcTimeoutResolver) spendHtlcOutput(
// (the case for anchor type channels). In this case we can re-sign it // (the case for anchor type channels). In this case we can re-sign it
// and attach fees at will. We let the sweeper handle this job. // and attach fees at will. We let the sweeper handle this job.
case h.htlcResolution.SignDetails != nil && !h.outputIncubating: case h.htlcResolution.SignDetails != nil && !h.outputIncubating:
if err := h.sweepSecondLevelTx(immediate); err != nil { if err := h.sweepSecondLevelTx(); err != nil {
log.Errorf("Sending timeout tx to sweeper: %v", err) log.Errorf("Sending timeout tx to sweeper: %v", err)
return nil, err return nil, err
@ -645,7 +641,7 @@ func (h *htlcTimeoutResolver) spendHtlcOutput(
// If this is a remote commitment there's no second level timeout txn, // If this is a remote commitment there's no second level timeout txn,
// and we can just send this directly to the sweeper. // and we can just send this directly to the sweeper.
case h.htlcResolution.SignedTimeoutTx == nil && !h.outputIncubating: case h.htlcResolution.SignedTimeoutTx == nil && !h.outputIncubating:
if err := h.sweepDirectHtlcOutput(immediate); err != nil { if err := h.sweepDirectHtlcOutput(); err != nil {
log.Errorf("Sending direct spend to sweeper: %v", err) log.Errorf("Sending direct spend to sweeper: %v", err)
return nil, err return nil, err

View File

@ -390,7 +390,7 @@ func testHtlcTimeoutResolver(t *testing.T, testCase htlcTimeoutTestCase) {
go func() { go func() {
defer wg.Done() defer wg.Done()
_, err := resolver.Resolve(false) _, err := resolver.Resolve()
if err != nil { if err != nil {
resolveErr <- err resolveErr <- err
} }