diff --git a/contractcourt/breach_resolver.go b/contractcourt/breach_resolver.go new file mode 100644 index 000000000..922ec609a --- /dev/null +++ b/contractcourt/breach_resolver.go @@ -0,0 +1,121 @@ +package contractcourt + +import ( + "encoding/binary" + "io" + + "github.com/lightningnetwork/lnd/channeldb" +) + +// breachResolver is a resolver that will handle breached closes. In the +// future, this will likely take over the duties the current breacharbiter has. +type breachResolver struct { + // resolved reflects if the contract has been fully resolved or not. + resolved bool + + // subscribed denotes whether or not the breach resolver has subscribed + // to the breacharbiter for breach resolution. + subscribed bool + + // replyChan is closed when the breach arbiter has completed serving + // justice. + replyChan chan struct{} + + contractResolverKit +} + +// newBreachResolver instantiates a new breach resolver. +func newBreachResolver(resCfg ResolverConfig) *breachResolver { + r := &breachResolver{ + contractResolverKit: *newContractResolverKit(resCfg), + replyChan: make(chan struct{}), + } + + r.initLogger(r) + + return r +} + +// ResolverKey returns the unique identifier for this resolver. +func (b *breachResolver) ResolverKey() []byte { + key := newResolverID(b.ChanPoint) + return key[:] +} + +// Resolve queries the breacharbiter to see if the justice transaction has been +// broadcast. +func (b *breachResolver) Resolve() (ContractResolver, error) { + if !b.subscribed { + complete, err := b.SubscribeBreachComplete( + &b.ChanPoint, b.replyChan, + ) + if err != nil { + return nil, err + } + + // If the breach resolution process is already complete, then + // we can cleanup and checkpoint the resolved state. + if complete { + b.resolved = true + return nil, b.Checkpoint(b) + } + + // Prevent duplicate subscriptions. + b.subscribed = true + } + + select { + case <-b.replyChan: + // The replyChan has been closed, signalling that the breach + // has been fully resolved. Checkpoint the resolved state and + // exit. + b.resolved = true + return nil, b.Checkpoint(b) + case <-b.quit: + } + + return nil, errResolverShuttingDown +} + +// Stop signals the breachResolver to stop. +func (b *breachResolver) Stop() { + close(b.quit) +} + +// IsResolved returns true if the breachResolver is fully resolved and cleanup +// can occur. +func (b *breachResolver) IsResolved() bool { + return b.resolved +} + +// SupplementState adds additional state to the breachResolver. +func (b *breachResolver) SupplementState(_ *channeldb.OpenChannel) { +} + +// Encode encodes the breachResolver to the passed writer. +func (b *breachResolver) Encode(w io.Writer) error { + return binary.Write(w, endian, b.resolved) +} + +// newBreachResolverFromReader attempts to decode an encoded breachResolver +// from the passed Reader instance, returning an active breachResolver. +func newBreachResolverFromReader(r io.Reader, resCfg ResolverConfig) ( + *breachResolver, error) { + + b := &breachResolver{ + contractResolverKit: *newContractResolverKit(resCfg), + replyChan: make(chan struct{}), + } + + if err := binary.Read(r, endian, &b.resolved); err != nil { + return nil, err + } + + b.initLogger(b) + + return b, nil +} + +// A compile time assertion to ensure breachResolver meets the ContractResolver +// interface. +var _ ContractResolver = (*breachResolver)(nil) diff --git a/contractcourt/breacharbiter.go b/contractcourt/breacharbiter.go index 6d20580a7..b1daa857d 100644 --- a/contractcourt/breacharbiter.go +++ b/contractcourt/breacharbiter.go @@ -185,6 +185,8 @@ type BreachArbiter struct { cfg *BreachConfig + subscriptions map[wire.OutPoint]chan struct{} + quit chan struct{} wg sync.WaitGroup sync.Mutex @@ -194,8 +196,9 @@ type BreachArbiter struct { // its dependent objects. func NewBreachArbiter(cfg *BreachConfig) *BreachArbiter { return &BreachArbiter{ - cfg: cfg, - quit: make(chan struct{}), + cfg: cfg, + subscriptions: make(map[wire.OutPoint]chan struct{}), + quit: make(chan struct{}), } } @@ -322,6 +325,47 @@ func (b *BreachArbiter) IsBreached(chanPoint *wire.OutPoint) (bool, error) { return b.cfg.Store.IsBreached(chanPoint) } +// SubscribeBreachComplete is used by outside subsystems to be notified of a +// successful breach resolution. +func (b *BreachArbiter) SubscribeBreachComplete(chanPoint *wire.OutPoint, + c chan struct{}) (bool, error) { + + breached, err := b.cfg.Store.IsBreached(chanPoint) + if err != nil { + // If an error occurs, no subscription will be registered. + return false, err + } + + if !breached { + // If chanPoint no longer exists in the Store, then the breach + // was cleaned up successfully. Any subscription that occurs + // happens after the breach information was persisted to the + // underlying store. + return true, nil + } + + // Otherwise since the channel point is not resolved, add a + // subscription. There can only be one subscription per channel point. + b.Lock() + defer b.Unlock() + b.subscriptions[*chanPoint] = c + + return false, nil +} + +// notifyBreachComplete is used by the BreachArbiter to notify outside +// subsystems that the breach resolution process is complete. +func (b *BreachArbiter) notifyBreachComplete(chanPoint *wire.OutPoint) { + b.Lock() + defer b.Unlock() + if c, ok := b.subscriptions[*chanPoint]; ok { + close(c) + } + + // Remove the subscription. + delete(b.subscriptions, *chanPoint) +} + // contractObserver is the primary goroutine for the BreachArbiter. This // goroutine is responsible for handling breach events coming from the // contractcourt on the ContractBreaches channel. If a channel breach is @@ -857,6 +901,14 @@ func (b *BreachArbiter) cleanupBreach(chanPoint *wire.OutPoint) error { err) } + // This is after the Remove call so that the chan passed in via + // SubscribeBreachComplete is always notified, no matter when it is + // called. Otherwise, if notifyBreachComplete was before Remove, a + // very rare edge case could occur in which SubscribeBreachComplete + // is called after notifyBreachComplete and before Remove, meaning the + // caller would never be notified. + b.notifyBreachComplete(chanPoint) + return nil } diff --git a/contractcourt/briefcase.go b/contractcourt/briefcase.go index 7c80b922a..e6d5e4cf0 100644 --- a/contractcourt/briefcase.go +++ b/contractcourt/briefcase.go @@ -36,16 +36,21 @@ type ContractResolutions struct { // output. If the channel type doesn't include anchors, the value of // this field will be nil. AnchorResolution *lnwallet.AnchorResolution + + // BreachResolution contains the data required to manage the lifecycle + // of a breach in the ChannelArbitrator. + BreachResolution *BreachResolution } // IsEmpty returns true if the set of resolutions is "empty". A resolution is -// empty if: our commitment output has been trimmed, and we don't have any -// incoming or outgoing HTLC's active. +// empty if: our commitment output has been trimmed, we don't have any +// incoming or outgoing HTLC's active, there is no anchor output to sweep, or +// there are no breached outputs to resolve. func (c *ContractResolutions) IsEmpty() bool { return c.CommitResolution == nil && len(c.HtlcResolutions.IncomingHTLCs) == 0 && len(c.HtlcResolutions.OutgoingHTLCs) == 0 && - c.AnchorResolution == nil + c.AnchorResolution == nil && c.BreachResolution == nil } // ArbitratorLog is the primary source of persistent storage for the @@ -142,7 +147,7 @@ const ( // | | | // | | |-> StateCommitmentBroadcasted: chain/user trigger // | | | - // | | |-> StateContractClosed: local/remote close trigger + // | | |-> StateContractClosed: local/remote/breach close trigger // | | | | // | | | |-> StateWaitingFullResolution: contract resolutions not empty // | | | | | @@ -152,9 +157,9 @@ const ( // | | | | // | | | |-> StateFullyResolved: contract resolutions empty // | | | - // | | |-> StateFullyResolved: coop/breach close trigger + // | | |-> StateFullyResolved: coop/breach(legacy) close trigger // | | - // | |-> StateContractClosed: local/remote close trigger + // | |-> StateContractClosed: local/remote/breach close trigger // | | | // | | |-> StateWaitingFullResolution: contract resolutions not empty // | | | | @@ -164,11 +169,11 @@ const ( // | | | // | | |-> StateFullyResolved: contract resolutions empty // | | - // | |-> StateFullyResolved: coop/breach close trigger + // | |-> StateFullyResolved: coop/breach(legacy) close trigger // | - // |-> StateContractClosed: local/remote close trigger + // |-> StateContractClosed: local/remote/breach close trigger // | | - // | |-> StateWaitingFullResolution: contract resolutions empty + // | |-> StateWaitingFullResolution: contract resolutions not empty // | | | // | | |-> StateWaitingFullResolution: contract resolutions not empty // | | | @@ -176,7 +181,7 @@ const ( // | | // | |-> StateFullyResolved: contract resolutions empty // | - // |-> StateFullyResolved: coop/breach close trigger + // |-> StateFullyResolved: coop/breach(legacy) close trigger // StateDefault is the default state. In this state, no major actions // need to be executed. @@ -269,6 +274,10 @@ const ( // sweeping out direct commitment output form the remote party's // commitment transaction. resolverUnilateralSweep resolverType = 4 + + // resolverBreach is the type of resolver that manages a contract + // breach on-chain. + resolverBreach resolverType = 5 ) // resolverIDLen is the size of the resolver ID key. This is 36 bytes as we get @@ -341,6 +350,11 @@ var ( // store the anchor resolution, if any. anchorResolutionKey = []byte("anchor-resolution") + // breachResolutionKey is the key under the logScope that we'll use to + // store the breach resolution, if any. This is used rather than the + // resolutionsKey. + breachResolutionKey = []byte("breach-resolution") + // actionsBucketKey is the key under the logScope that we'll use to // store all chain actions once they're determined. actionsBucketKey = []byte("chain-actions") @@ -464,6 +478,8 @@ func (b *boltArbitratorLog) writeResolver(contractBucket kvdb.RwBucket, rType = resolverIncomingContest case *commitSweepResolver: rType = resolverUnilateralSweep + case *breachResolver: + rType = resolverBreach } if _, err := buf.Write([]byte{byte(rType)}); err != nil { return err @@ -593,6 +609,11 @@ func (b *boltArbitratorLog) FetchUnresolvedContracts() ([]ContractResolver, erro resReader, resolverCfg, ) + case resolverBreach: + res, err = newBreachResolverFromReader( + resReader, resolverCfg, + ) + default: return fmt.Errorf("unknown resolver type: %v", resType) } @@ -785,6 +806,20 @@ func (b *boltArbitratorLog) LogContractResolutions(c *ContractResolutions) error } } + // Write out the breach resolution if present. + if c.BreachResolution != nil { + var b bytes.Buffer + err := encodeBreachResolution(&b, c.BreachResolution) + if err != nil { + return err + } + + err = scopeBucket.Put(breachResolutionKey, b.Bytes()) + if err != nil { + return err + } + } + return nil }) } @@ -904,6 +939,18 @@ func (b *boltArbitratorLog) FetchContractResolutions() (*ContractResolutions, er } } + breachResBytes := scopeBucket.Get(breachResolutionKey) + if breachResBytes != nil { + c.BreachResolution = &BreachResolution{} + resReader := bytes.NewReader(breachResBytes) + err := decodeBreachResolution( + resReader, c.BreachResolution, + ) + if err != nil { + return err + } + } + return nil }, func() { c = &ContractResolutions{} @@ -1372,6 +1419,21 @@ func decodeAnchorResolution(r io.Reader, return input.ReadSignDescriptor(r, &a.AnchorSignDescriptor) } +func encodeBreachResolution(w io.Writer, b *BreachResolution) error { + if _, err := w.Write(b.FundingOutPoint.Hash[:]); err != nil { + return err + } + return binary.Write(w, endian, b.FundingOutPoint.Index) +} + +func decodeBreachResolution(r io.Reader, b *BreachResolution) error { + _, err := io.ReadFull(r, b.FundingOutPoint.Hash[:]) + if err != nil { + return err + } + return binary.Read(r, endian, &b.FundingOutPoint.Index) +} + func encodeHtlcSetKey(w io.Writer, h *HtlcSetKey) error { err := binary.Write(w, endian, h.IsRemote) if err != nil { diff --git a/contractcourt/chain_arbitrator.go b/contractcourt/chain_arbitrator.go index a39d939f6..364bcd7e3 100644 --- a/contractcourt/chain_arbitrator.go +++ b/contractcourt/chain_arbitrator.go @@ -100,14 +100,12 @@ type ChainArbitratorConfig struct { MarkLinkInactive func(wire.OutPoint) error // ContractBreach is a function closure that the ChainArbitrator will - // use to notify the breachArbiter about a contract breach. A callback - // should be passed that when called will mark the channel pending - // close in the database. It should only return a non-nil error when the - // breachArbiter has preserved the necessary breach info for this - // channel point, and the callback has succeeded, meaning it is safe to - // stop watching the channel. - ContractBreach func(wire.OutPoint, *lnwallet.BreachRetribution, - func() error) error + // use to notify the breachArbiter about a contract breach. It should + // only return a non-nil error when the breachArbiter has preserved + // the necessary breach info for this channel point. Once the breach + // resolution is persisted in the channel arbitrator, it will be safe + // to mark the channel closed. + ContractBreach func(wire.OutPoint, *lnwallet.BreachRetribution) error // IsOurAddress is a function that returns true if the passed address // is known to the underlying wallet. Otherwise, false should be @@ -183,6 +181,12 @@ type ChainArbitratorConfig struct { // Clock is the clock implementation that ChannelArbitrator uses. // It is useful for testing. Clock clock.Clock + + // SubscribeBreachComplete is used by the breachResolver to register a + // subscription that notifies when the breach resolution process is + // complete. + SubscribeBreachComplete func(op *wire.OutPoint, c chan struct{}) ( + bool, error) } // ChainArbitrator is a sub-system that oversees the on-chain resolution of all @@ -506,19 +510,17 @@ func (c *ChainArbitrator) Start() error { // First, we'll create an active chainWatcher for this channel // to ensure that we detect any relevant on chain events. + breachClosure := func(ret *lnwallet.BreachRetribution) error { + return c.cfg.ContractBreach(chanPoint, ret) + } + chainWatcher, err := newChainWatcher( chainWatcherConfig{ - chanState: channel, - notifier: c.cfg.Notifier, - signer: c.cfg.Signer, - isOurAddr: c.cfg.IsOurAddress, - contractBreach: func(retInfo *lnwallet.BreachRetribution, - markClosed func() error) error { - - return c.cfg.ContractBreach( - chanPoint, retInfo, markClosed, - ) - }, + chanState: channel, + notifier: c.cfg.Notifier, + signer: c.cfg.Signer, + isOurAddr: c.cfg.IsOurAddress, + contractBreach: breachClosure, extractStateNumHint: lnwallet.GetStateNumHint, }, ) @@ -1116,11 +1118,11 @@ func (c *ChainArbitrator) WatchNewChannel(newChan *channeldb.OpenChannel) error notifier: c.cfg.Notifier, signer: c.cfg.Signer, isOurAddr: c.cfg.IsOurAddress, - contractBreach: func(retInfo *lnwallet.BreachRetribution, - markClosed func() error) error { + contractBreach: func( + retInfo *lnwallet.BreachRetribution) error { return c.cfg.ContractBreach( - chanPoint, retInfo, markClosed, + chanPoint, retInfo, ) }, extractStateNumHint: lnwallet.GetStateNumHint, diff --git a/contractcourt/chain_watcher.go b/contractcourt/chain_watcher.go index 973a0ca73..7e813ee4a 100644 --- a/contractcourt/chain_watcher.go +++ b/contractcourt/chain_watcher.go @@ -9,6 +9,7 @@ import ( "github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/chaincfg" + "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/txscript" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" @@ -57,6 +58,29 @@ type RemoteUnilateralCloseInfo struct { CommitSet CommitSet } +// BreachResolution wraps the outpoint of the breached channel. +type BreachResolution struct { + FundingOutPoint wire.OutPoint +} + +// BreachCloseInfo wraps the BreachResolution with a CommitSet for the latest, +// non-breached state, with the AnchorResolution for the breached state. +type BreachCloseInfo struct { + *BreachResolution + *lnwallet.AnchorResolution + + // CommitHash is the hash of the commitment transaction. + CommitHash chainhash.Hash + + // CommitSet is the set of known valid commitments at the time the + // breach occurred on-chain. + CommitSet CommitSet + + // CloseSummary gives the recipient of the BreachCloseInfo information + // to mark the channel closed in the database. + CloseSummary channeldb.ChannelCloseSummary +} + // CommitSet is a collection of the set of known valid commitments at a given // instant. If ConfCommitKey is set, then the commitment identified by the // HtlcSetKey has hit the chain. This struct will be used to examine all live @@ -124,7 +148,7 @@ type ChainEventSubscription struct { // ContractBreach is a channel that will be sent upon if we detect a // contract breach. The struct sent across the channel contains all the // material required to bring the cheating channel peer to justice. - ContractBreach chan *lnwallet.BreachRetribution + ContractBreach chan *BreachCloseInfo // Cancel cancels the subscription to the event stream for a particular // channel. This method should be called once the caller no longer needs to @@ -150,13 +174,10 @@ type chainWatcherConfig struct { signer input.Signer // contractBreach is a method that will be called by the watcher if it - // detects that a contract breach transaction has been confirmed. A - // callback should be passed that when called will mark the channel - // pending close in the database. It will only return a non-nil error - // when the breachArbiter has preserved the necessary breach info for - // this channel point, and the callback has succeeded, meaning it is - // safe to stop watching the channel. - contractBreach func(*lnwallet.BreachRetribution, func() error) error + // detects that a contract breach transaction has been confirmed. It + // will only return a non-nil error when the breachArbiter has + // preserved the necessary breach info for this channel point. + contractBreach func(*lnwallet.BreachRetribution) error // isOurAddr is a function that returns true if the passed address is // known to us. @@ -311,7 +332,7 @@ func (c *chainWatcher) SubscribeChannelEvents() *ChainEventSubscription { RemoteUnilateralClosure: make(chan *RemoteUnilateralCloseInfo, 1), LocalUnilateralClosure: make(chan *LocalUnilateralCloseInfo, 1), CooperativeClosure: make(chan *CooperativeCloseInfo, 1), - ContractBreach: make(chan *lnwallet.BreachRetribution, 1), + ContractBreach: make(chan *BreachCloseInfo, 1), Cancel: func() { c.Lock() delete(c.clientSubscriptions, clientID) @@ -785,12 +806,27 @@ func (c *chainWatcher) handleKnownRemoteState( return false, nil } + // Create an AnchorResolution for the breached state. + anchorRes, err := lnwallet.NewAnchorResolution( + c.cfg.chanState, commitSpend.SpendingTx, + ) + if err != nil { + return false, fmt.Errorf("unable to create anchor "+ + "resolution: %v", err) + } + + // We'll set the ConfCommitKey here as the remote htlc set. This is + // only used to ensure a nil-pointer-dereference doesn't occur and is + // not used otherwise. The HTLC's may not exist for the + // RemotePendingHtlcSet. + chainSet.commitSet.ConfCommitKey = &RemoteHtlcSet + // THEY'RE ATTEMPTING TO VIOLATE THE CONTRACT LAID OUT WITHIN THE // PAYMENT CHANNEL. Therefore we close the signal indicating a revoked // broadcast to allow subscribers to swiftly dispatch justice!!! err = c.dispatchContractBreach( - commitSpend, &chainSet.remoteCommit, - broadcastStateNum, retribution, + commitSpend, chainSet, broadcastStateNum, retribution, + anchorRes, ) if err != nil { return false, fmt.Errorf("unable to handle channel "+ @@ -1083,8 +1119,9 @@ func (c *chainWatcher) dispatchRemoteForceClose( // materials required to bring the cheater to justice, then notify all // registered subscribers of this event. func (c *chainWatcher) dispatchContractBreach(spendEvent *chainntnfs.SpendDetail, - remoteCommit *channeldb.ChannelCommitment, broadcastStateNum uint64, - retribution *lnwallet.BreachRetribution) error { + chainSet *chainSet, broadcastStateNum uint64, + retribution *lnwallet.BreachRetribution, + anchorRes *lnwallet.AnchorResolution) error { log.Warnf("Remote peer has breached the channel contract for "+ "ChannelPoint(%v). Revoked state #%v was broadcast!!!", @@ -1125,7 +1162,7 @@ func (c *chainWatcher) dispatchContractBreach(spendEvent *chainntnfs.SpendDetail return spew.Sdump(retribution) })) - settledBalance := remoteCommit.LocalBalance.ToSatoshis() + settledBalance := chainSet.remoteCommit.LocalBalance.ToSatoshis() closeSummary := channeldb.ChannelCloseSummary{ ChanPoint: c.cfg.chanState.FundingOutpoint, ChainHash: c.cfg.chanState.ChainHash, @@ -1151,38 +1188,35 @@ func (c *chainWatcher) dispatchContractBreach(spendEvent *chainntnfs.SpendDetail closeSummary.LastChanSyncMsg = chanSync } - // We create a function closure that will mark the channel as pending - // close in the database. We pass it to the contracBreach method such - // that it can ensure safe handoff of the breach before we close the - // channel. - markClosed := func() error { - // At this point, we've successfully received an ack for the - // breach close, and we can mark the channel as pending force - // closed. - if err := c.cfg.chanState.CloseChannel( - &closeSummary, channeldb.ChanStatusRemoteCloseInitiator, - ); err != nil { - return err - } - - log.Infof("Breached channel=%v marked pending-closed", - c.cfg.chanState.FundingOutpoint) - return nil - } - - // Hand the retribution info over to the breach arbiter. - if err := c.cfg.contractBreach(retribution, markClosed); err != nil { + // Hand the retribution info over to the breach arbiter. This function + // will wait for a response from the breach arbiter and then proceed to + // send a BreachCloseInfo to the channel arbitrator. The channel arb + // will then mark the channel as closed after resolutions and the + // commit set are logged in the arbitrator log. + if err := c.cfg.contractBreach(retribution); err != nil { log.Errorf("unable to hand breached contract off to "+ "breachArbiter: %v", err) return err } + breachRes := &BreachResolution{ + FundingOutPoint: c.cfg.chanState.FundingOutpoint, + } + + breachInfo := &BreachCloseInfo{ + CommitHash: spendEvent.SpendingTx.TxHash(), + BreachResolution: breachRes, + AnchorResolution: anchorRes, + CommitSet: chainSet.commitSet, + CloseSummary: closeSummary, + } + // With the event processed and channel closed, we'll now notify all // subscribers of the event. c.Lock() for _, sub := range c.clientSubscriptions { select { - case sub.ContractBreach <- retribution: + case sub.ContractBreach <- breachInfo: case <-c.quit: c.Unlock() return fmt.Errorf("quitting") diff --git a/contractcourt/channel_arbitrator.go b/contractcourt/channel_arbitrator.go index 944f4b736..a67c5e1e1 100644 --- a/contractcourt/channel_arbitrator.go +++ b/contractcourt/channel_arbitrator.go @@ -747,8 +747,8 @@ const ( coopCloseTrigger // breachCloseTrigger is a transition trigger driven by a remote breach - // being confirmed. In this case the channel arbitrator won't have to - // do anything, so we'll just clean up and exit gracefully. + // being confirmed. In this case the channel arbitrator will wait for + // the breacharbiter to finish and then clean up gracefully. breachCloseTrigger ) @@ -852,9 +852,8 @@ func (c *ChannelArbitrator) stateStep( // If the trigger is a cooperative close being confirmed, then // we can go straight to StateFullyResolved, as there won't be - // any contracts to resolve. The same is true in the case of a - // breach. - case coopCloseTrigger, breachCloseTrigger: + // any contracts to resolve. + case coopCloseTrigger: nextState = StateFullyResolved // Otherwise, if this state advance was triggered by a @@ -868,6 +867,14 @@ func (c *ChannelArbitrator) stateStep( fallthrough case remoteCloseTrigger: nextState = StateContractClosed + + case breachCloseTrigger: + nextContractState, err := c.checkLegacyBreach() + if nextContractState == StateError { + return nextContractState, nil, err + } + + nextState = nextContractState } // If we're in this state, then we've decided to broadcast the @@ -890,7 +897,23 @@ func (c *ChannelArbitrator) stateStep( c.cfg.ChanPoint, trigger, StateContractClosed) return StateContractClosed, closeTx, nil - case coopCloseTrigger, breachCloseTrigger: + case breachCloseTrigger: + nextContractState, err := c.checkLegacyBreach() + if nextContractState == StateError { + log.Infof("ChannelArbitrator(%v): unable to "+ + "advance breach close resolution: %v", + c.cfg.ChanPoint, nextContractState) + return StateError, closeTx, err + } + + log.Infof("ChannelArbitrator(%v): detected %s close "+ + "after closing channel, fast-forwarding to %s"+ + " to resolve contract", c.cfg.ChanPoint, + trigger, nextContractState) + + return nextContractState, closeTx, nil + + case coopCloseTrigger: log.Infof("ChannelArbitrator(%v): detected %s "+ "close after closing channel, fast-forwarding "+ "to %s to resolve contract", @@ -994,10 +1017,18 @@ func (c *ChannelArbitrator) stateStep( case localCloseTrigger, remoteCloseTrigger: nextState = StateContractClosed - // If a coop close or breach was confirmed, jump straight to - // the fully resolved state. - case coopCloseTrigger, breachCloseTrigger: + // If a coop close was confirmed, jump straight to the fully + // resolved state. + case coopCloseTrigger: nextState = StateFullyResolved + + case breachCloseTrigger: + nextContractState, err := c.checkLegacyBreach() + if nextContractState == StateError { + return nextContractState, closeTx, err + } + + nextState = nextContractState } log.Infof("ChannelArbitrator(%v): trigger %v moving from "+ @@ -1996,10 +2027,62 @@ func (c *ChannelArbitrator) prepContractResolutions( commitHash := contractResolutions.CommitHash failureMsg := &lnwire.FailPermanentChannelFailure{} + var htlcResolvers []ContractResolver + + // We instantiate an anchor resolver if the commitment tx has an + // anchor. + if contractResolutions.AnchorResolution != nil { + anchorResolver := newAnchorResolver( + contractResolutions.AnchorResolution.AnchorSignDescriptor, + contractResolutions.AnchorResolution.CommitAnchor, + height, c.cfg.ChanPoint, resolverCfg, + ) + htlcResolvers = append(htlcResolvers, anchorResolver) + } + + // If this is a breach close, we'll create a breach resolver, determine + // the htlc's to fail back, and exit. This is done because the other + // steps taken for non-breach-closes do not matter for breach-closes. + if contractResolutions.BreachResolution != nil { + breachResolver := newBreachResolver(resolverCfg) + htlcResolvers = append(htlcResolvers, breachResolver) + + // We'll use the CommitSet, we'll fail back all outgoing HTLC's + // that exist on either of the remote commitments. The map is + // used to deduplicate any shared htlc's. + remoteOutgoing := make(map[uint64]channeldb.HTLC) + for htlcSetKey, htlcs := range confCommitSet.HtlcSets { + if !htlcSetKey.IsRemote { + continue + } + + for _, htlc := range htlcs { + if htlc.Incoming { + continue + } + + remoteOutgoing[htlc.HtlcIndex] = htlc + } + } + + // Now we'll loop over the map and create ResolutionMsgs for + // each of them. + for _, htlc := range remoteOutgoing { + failMsg := ResolutionMsg{ + SourceChan: c.cfg.ShortChanID, + HtlcIndex: htlc.HtlcIndex, + Failure: failureMsg, + } + + msgsToSend = append(msgsToSend, failMsg) + } + + return htlcResolvers, msgsToSend, nil + } + // For each HTLC, we'll either act immediately, meaning we'll instantly // fail the HTLC, or we'll act only once the transaction has been // confirmed, in which case we'll need an HTLC resolver. - var htlcResolvers []ContractResolver for htlcAction, htlcs := range htlcActions { switch htlcAction { @@ -2145,17 +2228,6 @@ func (c *ChannelArbitrator) prepContractResolutions( htlcResolvers = append(htlcResolvers, resolver) } - // We instantiate an anchor resolver if the commitmentment tx has an - // anchor. - if contractResolutions.AnchorResolution != nil { - anchorResolver := newAnchorResolver( - contractResolutions.AnchorResolution.AnchorSignDescriptor, - contractResolutions.AnchorResolution.CommitAnchor, - height, c.cfg.ChanPoint, resolverCfg, - ) - htlcResolvers = append(htlcResolvers, anchorResolver) - } - return htlcResolvers, msgsToSend, nil } @@ -2574,14 +2646,59 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32) { // the ChainWatcher and BreachArbiter, we don't have to do // anything in particular, so just advance our state and // gracefully exit. - case <-c.cfg.ChainEvents.ContractBreach: + case breachInfo := <-c.cfg.ChainEvents.ContractBreach: log.Infof("ChannelArbitrator(%v): remote party has "+ "breached channel!", c.cfg.ChanPoint) + // In the breach case, we'll only have anchor and + // breach resolutions. + contractRes := &ContractResolutions{ + CommitHash: breachInfo.CommitHash, + BreachResolution: breachInfo.BreachResolution, + AnchorResolution: breachInfo.AnchorResolution, + } + + // We'll transition to the ContractClosed state and log + // the set of resolutions such that they can be turned + // into resolvers later on. We'll also insert the + // CommitSet of the latest set of commitments. + err := c.log.LogContractResolutions(contractRes) + if err != nil { + log.Errorf("Unable to write resolutions: %v", + err) + return + } + err = c.log.InsertConfirmedCommitSet( + &breachInfo.CommitSet, + ) + if err != nil { + log.Errorf("Unable to write commit set: %v", + err) + return + } + + // The channel is finally marked pending closed here as + // the breacharbiter and channel arbitrator have + // persisted the relevant states. + closeSummary := &breachInfo.CloseSummary + err = c.cfg.MarkChannelClosed( + closeSummary, + channeldb.ChanStatusRemoteCloseInitiator, + ) + if err != nil { + log.Errorf("Unable to mark channel closed: %v", + err) + return + } + + log.Infof("Breached channel=%v marked pending-closed", + breachInfo.BreachResolution.FundingOutPoint) + // We'll advance our state machine until it reaches a // terminal state. - _, _, err := c.advanceState( - uint32(bestHeight), breachCloseTrigger, nil, + _, _, err = c.advanceState( + uint32(bestHeight), breachCloseTrigger, + &breachInfo.CommitSet, ) if err != nil { log.Errorf("Unable to advance state: %v", err) @@ -2661,3 +2778,25 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32) { } } } + +// checkLegacyBreach returns StateFullyResolved if the channel was closed with +// a breach transaction before the channel arbitrator launched its own breach +// resolver. StateContractClosed is returned if this is a modern breach close +// with a breach resolver. StateError is returned if the log lookup failed. +func (c *ChannelArbitrator) checkLegacyBreach() (ArbitratorState, error) { + // A previous version of the channel arbitrator would make the breach + // close skip to StateFullyResolved. If there are no contract + // resolutions in the bolt arbitrator log, then this is an older breach + // close. Otherwise, if there are resolutions, the state should advance + // to StateContractClosed. + _, err := c.log.FetchContractResolutions() + if err == errNoResolutions { + // This is an older breach close still in the database. + return StateFullyResolved, nil + } else if err != nil { + return StateError, err + } + + // This is a modern breach close with resolvers. + return StateContractClosed, nil +} diff --git a/contractcourt/channel_arbitrator_test.go b/contractcourt/channel_arbitrator_test.go index 46742a384..6a0af45d3 100644 --- a/contractcourt/channel_arbitrator_test.go +++ b/contractcourt/channel_arbitrator_test.go @@ -205,6 +205,9 @@ type chanArbTestCtx struct { log ArbitratorLog sweeper *mockSweeper + + breachSubscribed chan struct{} + breachResolutionChan chan struct{} } func (c *chanArbTestCtx) CleanUp() { @@ -303,13 +306,17 @@ func withMarkClosed(markClosed func(*channeldb.ChannelCloseSummary, func createTestChannelArbitrator(t *testing.T, log ArbitratorLog, opts ...testChanArbOption) (*chanArbTestCtx, error) { + chanArbCtx := &chanArbTestCtx{ + breachSubscribed: make(chan struct{}), + } + chanPoint := wire.OutPoint{} shortChanID := lnwire.ShortChannelID{} chanEvents := &ChainEventSubscription{ RemoteUnilateralClosure: make(chan *RemoteUnilateralCloseInfo, 1), LocalUnilateralClosure: make(chan *LocalUnilateralCloseInfo, 1), CooperativeClosure: make(chan *CooperativeCloseInfo, 1), - ContractBreach: make(chan *lnwallet.BreachRetribution, 1), + ContractBreach: make(chan *BreachCloseInfo, 1), } resolutionChan := make(chan []ResolutionMsg, 1) @@ -346,6 +353,13 @@ func createTestChannelArbitrator(t *testing.T, log ArbitratorLog, return true }, + SubscribeBreachComplete: func(op *wire.OutPoint, + c chan struct{}) (bool, error) { + + chanArbCtx.breachResolutionChan = c + chanArbCtx.breachSubscribed <- struct{}{} + return false, nil + }, Clock: clock.NewDefaultClock(), Sweeper: mockSweeper, } @@ -425,16 +439,16 @@ func createTestChannelArbitrator(t *testing.T, log ArbitratorLog, chanArb := NewChannelArbitrator(*arbCfg, htlcSets, log) - return &chanArbTestCtx{ - t: t, - chanArb: chanArb, - cleanUp: cleanUp, - resolvedChan: resolvedChan, - resolutions: resolutionChan, - log: log, - incubationRequests: incubateChan, - sweeper: mockSweeper, - }, nil + chanArbCtx.t = t + chanArbCtx.chanArb = chanArb + chanArbCtx.cleanUp = cleanUp + chanArbCtx.resolvedChan = resolvedChan + chanArbCtx.resolutions = resolutionChan + chanArbCtx.log = log + chanArbCtx.incubationRequests = incubateChan + chanArbCtx.sweeper = mockSweeper + + return chanArbCtx, nil } // TestChannelArbitratorCooperativeClose tests that the ChannelArbitertor @@ -661,11 +675,13 @@ func TestChannelArbitratorLocalForceClose(t *testing.T) { // TestChannelArbitratorBreachClose tests that the ChannelArbitrator goes // through the expected states in case we notice a breach in the chain, and -// gracefully exits. +// is able to properly progress the breachResolver and anchorResolver to a +// successful resolution. func TestChannelArbitratorBreachClose(t *testing.T) { log := &mockArbitratorLog{ state: StateDefault, newStates: make(chan ArbitratorState, 5), + resolvers: make(map[ContractResolver]struct{}), } chanArbCtx, err := createTestChannelArbitrator(t, log) @@ -673,6 +689,8 @@ func TestChannelArbitratorBreachClose(t *testing.T) { t.Fatalf("unable to create ChannelArbitrator: %v", err) } chanArb := chanArbCtx.chanArb + chanArb.cfg.PreimageDB = newMockWitnessBeacon() + chanArb.cfg.Registry = &mockRegistry{} if err := chanArb.Start(nil); err != nil { t.Fatalf("unable to start ChannelArbitrator: %v", err) @@ -686,13 +704,99 @@ func TestChannelArbitratorBreachClose(t *testing.T) { // It should start out in the default state. chanArbCtx.AssertState(StateDefault) - // Send a breach close event. - chanArb.cfg.ChainEvents.ContractBreach <- &lnwallet.BreachRetribution{} + // We create two HTLCs, one incoming and one outgoing. We will later + // assert that we only receive a ResolutionMsg for the outgoing HTLC. + outgoingIdx := uint64(2) - // It should transition StateDefault -> StateFullyResolved. - chanArbCtx.AssertStateTransitions( - StateFullyResolved, - ) + rHash1 := [lntypes.PreimageSize]byte{1, 2, 3} + htlc1 := channeldb.HTLC{ + RHash: rHash1, + OutputIndex: 2, + Incoming: false, + HtlcIndex: outgoingIdx, + LogIndex: 2, + } + + rHash2 := [lntypes.PreimageSize]byte{2, 2, 2} + htlc2 := channeldb.HTLC{ + RHash: rHash2, + OutputIndex: 3, + Incoming: true, + HtlcIndex: 3, + LogIndex: 3, + } + + anchorRes := &lnwallet.AnchorResolution{ + AnchorSignDescriptor: input.SignDescriptor{ + Output: &wire.TxOut{Value: 1}, + }, + } + + // Create the BreachCloseInfo that the chain_watcher would normally + // send to the channel_arbitrator. + breachInfo := &BreachCloseInfo{ + BreachResolution: &BreachResolution{ + FundingOutPoint: wire.OutPoint{}, + }, + AnchorResolution: anchorRes, + CommitSet: CommitSet{ + ConfCommitKey: &RemoteHtlcSet, + HtlcSets: map[HtlcSetKey][]channeldb.HTLC{ + RemoteHtlcSet: {htlc1, htlc2}, + }, + }, + CommitHash: chainhash.Hash{}, + } + + // Send a breach close event. + chanArb.cfg.ChainEvents.ContractBreach <- breachInfo + + // It should transition StateDefault -> StateContractClosed. + chanArbCtx.AssertStateTransitions(StateContractClosed) + + // We should receive one ResolutionMsg as there was only one outgoing + // HTLC at the time of the breach. + select { + case res := <-chanArbCtx.resolutions: + require.Equal(t, 1, len(res)) + require.Equal(t, outgoingIdx, res[0].HtlcIndex) + case <-time.After(5 * time.Second): + t.Fatal("expected to receive a resolution msg") + } + + // We should now transition from StateContractClosed to + // StateWaitingFullResolution. + chanArbCtx.AssertStateTransitions(StateWaitingFullResolution) + + // One of the resolvers should be an anchor resolver and the other + // should be a breach resolver. + require.Equal(t, 2, len(chanArb.activeResolvers)) + + var anchorExists, breachExists bool + for _, resolver := range chanArb.activeResolvers { + switch resolver.(type) { + case *anchorResolver: + anchorExists = true + case *breachResolver: + breachExists = true + default: + t.Fatalf("did not expect resolver %T", resolver) + } + } + require.True(t, anchorExists && breachExists) + + // The anchor resolver is expected to re-offer the anchor input to the + // sweeper. + <-chanArbCtx.sweeper.sweptInputs + + // Wait for SubscribeBreachComplete to be called. + <-chanArbCtx.breachSubscribed + + // We'll now close the breach channel so that the state transitions to + // StateFullyResolved. + close(chanArbCtx.breachResolutionChan) + + chanArbCtx.AssertStateTransitions(StateFullyResolved) // It should also mark the channel as resolved. select { @@ -1318,12 +1422,14 @@ func TestChannelArbitratorPersistence(t *testing.T) { // TestChannelArbitratorForceCloseBreachedChannel tests that the channel // arbitrator is able to handle a channel in the process of being force closed // is breached by the remote node. In these cases we expect the -// ChannelArbitrator to gracefully exit, as the breach is handled by other -// subsystems. +// ChannelArbitrator to properly execute the breachResolver flow and then +// gracefully exit once the breachResolver receives the signal from what would +// normally be the breacharbiter. func TestChannelArbitratorForceCloseBreachedChannel(t *testing.T) { log := &mockArbitratorLog{ state: StateDefault, newStates: make(chan ArbitratorState, 5), + resolvers: make(map[ContractResolver]struct{}), } chanArbCtx, err := createTestChannelArbitrator(t, log) @@ -1389,6 +1495,20 @@ func TestChannelArbitratorForceCloseBreachedChannel(t *testing.T) { t.Fatalf("no response received") } + // Before restarting, we'll need to modify the arbitrator log to have + // a set of contract resolutions and a commit set. + log.resolutions = &ContractResolutions{ + BreachResolution: &BreachResolution{ + FundingOutPoint: wire.OutPoint{}, + }, + } + log.commitSet = &CommitSet{ + ConfCommitKey: &RemoteHtlcSet, + HtlcSets: map[HtlcSetKey][]channeldb.HTLC{ + RemoteHtlcSet: {}, + }, + } + // We mimic that the channel is breached while the channel arbitrator // is down. This means that on restart it will be started with a // pending close channel, of type BreachClose. @@ -1402,7 +1522,18 @@ func TestChannelArbitratorForceCloseBreachedChannel(t *testing.T) { } defer chanArbCtx.CleanUp() - // Finally it should advance to StateFullyResolved. + // We should transition to StateContractClosed. + chanArbCtx.AssertStateTransitions( + StateContractClosed, StateWaitingFullResolution, + ) + + // Wait for SubscribeBreachComplete to be called. + <-chanArbCtx.breachSubscribed + + // We'll close the breachResolutionChan to cleanup the breachResolver + // and make the state transition to StateFullyResolved. + close(chanArbCtx.breachResolutionChan) + chanArbCtx.AssertStateTransitions(StateFullyResolved) // It should also mark the channel as resolved. diff --git a/docs/release-notes/release-notes-0.14.2.md b/docs/release-notes/release-notes-0.14.2.md index 81c0458b2..d09f5e705 100644 --- a/docs/release-notes/release-notes-0.14.2.md +++ b/docs/release-notes/release-notes-0.14.2.md @@ -41,6 +41,9 @@ Postgres](https://github.com/lightningnetwork/lnd/pull/6111) ## Bug fixes +* [A new resolver for breach closes was introduced that handles sweeping + anchor outputs and failing back HTLCs.](https://github.com/lightningnetwork/lnd/pull/6158) + * [Return the nearest known fee rate when a given conf target cannot be found from Web API fee estimator.](https://github.com/lightningnetwork/lnd/pull/6062) diff --git a/server.go b/server.go index e401e1705..61687c42f 100644 --- a/server.go +++ b/server.go @@ -1026,6 +1026,20 @@ func newServer(cfg *Config, listenAddrs []net.Addr, // breach events from the ChannelArbitrator to the breachArbiter, contractBreaches := make(chan *contractcourt.ContractBreachEvent, 1) + s.breachArbiter = contractcourt.NewBreachArbiter(&contractcourt.BreachConfig{ + CloseLink: closeLink, + DB: s.chanStateDB, + Estimator: s.cc.FeeEstimator, + GenSweepScript: newSweepPkScriptGen(cc.Wallet), + Notifier: cc.ChainNotifier, + PublishTransaction: cc.Wallet.PublishTransaction, + ContractBreaches: contractBreaches, + Signer: cc.Wallet.Cfg.Signer, + Store: contractcourt.NewRetributionStore( + dbs.ChanStateDB, + ), + }) + s.chainArb = contractcourt.NewChainArbitrator(contractcourt.ChainArbitratorConfig{ ChainHash: *s.cfg.ActiveNetParams.GenesisHash, IncomingBroadcastDelta: lncfg.DefaultIncomingBroadcastDelta, @@ -1074,8 +1088,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, }, IsOurAddress: cc.Wallet.IsOurAddress, ContractBreach: func(chanPoint wire.OutPoint, - breachRet *lnwallet.BreachRetribution, - markClosed func() error) error { + breachRet *lnwallet.BreachRetribution) error { // processACK will handle the breachArbiter ACKing the // event. @@ -1087,8 +1100,9 @@ func newServer(cfg *Config, listenAddrs []net.Addr, } // If the breachArbiter successfully handled - // the event, we can mark the channel closed. - finalErr <- markClosed() + // the event, we can signal that the handoff + // was successful. + finalErr <- nil } event := &contractcourt.ContractBreachEvent{ @@ -1104,9 +1118,8 @@ func newServer(cfg *Config, listenAddrs []net.Addr, return ErrServerShuttingDown } - // We'll wait for a final error to be available, either - // from the breachArbiter or from our markClosed - // function closure. + // We'll wait for a final error to be available from + // the breachArbiter. select { case err := <-finalErr: return err @@ -1125,22 +1138,9 @@ func newServer(cfg *Config, listenAddrs []net.Addr, PaymentsExpirationGracePeriod: cfg.PaymentsExpirationGracePeriod, IsForwardedHTLC: s.htlcSwitch.IsForwardedHTLC, Clock: clock.NewDefaultClock(), + SubscribeBreachComplete: s.breachArbiter.SubscribeBreachComplete, }, dbs.ChanStateDB) - s.breachArbiter = contractcourt.NewBreachArbiter(&contractcourt.BreachConfig{ - CloseLink: closeLink, - DB: s.chanStateDB, - Estimator: s.cc.FeeEstimator, - GenSweepScript: newSweepPkScriptGen(cc.Wallet), - Notifier: cc.ChainNotifier, - PublishTransaction: cc.Wallet.PublishTransaction, - ContractBreaches: contractBreaches, - Signer: cc.Wallet.Cfg.Signer, - Store: contractcourt.NewRetributionStore( - dbs.ChanStateDB, - ), - }) - // Select the configuration and furnding parameters for Bitcoin or // Litecoin, depending on the primary registered chain. primaryChain := cfg.registeredChains.PrimaryChain()