Merge pull request #6158 from Crypt-iQ/breach_resolver_cancel

server+contractcourt: create breachResolver to ensure htlc's are failed back
This commit is contained in:
Olaoluwa Osuntokun 2022-01-25 16:03:08 -08:00 committed by GitHub
commit a57c650f96
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 680 additions and 136 deletions

View File

@ -0,0 +1,121 @@
package contractcourt
import (
"encoding/binary"
"io"
"github.com/lightningnetwork/lnd/channeldb"
)
// breachResolver is a resolver that will handle breached closes. In the
// future, this will likely take over the duties the current breacharbiter has.
type breachResolver struct {
// resolved reflects if the contract has been fully resolved or not.
resolved bool
// subscribed denotes whether or not the breach resolver has subscribed
// to the breacharbiter for breach resolution.
subscribed bool
// replyChan is closed when the breach arbiter has completed serving
// justice.
replyChan chan struct{}
contractResolverKit
}
// newBreachResolver instantiates a new breach resolver.
func newBreachResolver(resCfg ResolverConfig) *breachResolver {
r := &breachResolver{
contractResolverKit: *newContractResolverKit(resCfg),
replyChan: make(chan struct{}),
}
r.initLogger(r)
return r
}
// ResolverKey returns the unique identifier for this resolver.
func (b *breachResolver) ResolverKey() []byte {
key := newResolverID(b.ChanPoint)
return key[:]
}
// Resolve queries the breacharbiter to see if the justice transaction has been
// broadcast.
func (b *breachResolver) Resolve() (ContractResolver, error) {
if !b.subscribed {
complete, err := b.SubscribeBreachComplete(
&b.ChanPoint, b.replyChan,
)
if err != nil {
return nil, err
}
// If the breach resolution process is already complete, then
// we can cleanup and checkpoint the resolved state.
if complete {
b.resolved = true
return nil, b.Checkpoint(b)
}
// Prevent duplicate subscriptions.
b.subscribed = true
}
select {
case <-b.replyChan:
// The replyChan has been closed, signalling that the breach
// has been fully resolved. Checkpoint the resolved state and
// exit.
b.resolved = true
return nil, b.Checkpoint(b)
case <-b.quit:
}
return nil, errResolverShuttingDown
}
// Stop signals the breachResolver to stop.
func (b *breachResolver) Stop() {
close(b.quit)
}
// IsResolved returns true if the breachResolver is fully resolved and cleanup
// can occur.
func (b *breachResolver) IsResolved() bool {
return b.resolved
}
// SupplementState adds additional state to the breachResolver.
func (b *breachResolver) SupplementState(_ *channeldb.OpenChannel) {
}
// Encode encodes the breachResolver to the passed writer.
func (b *breachResolver) Encode(w io.Writer) error {
return binary.Write(w, endian, b.resolved)
}
// newBreachResolverFromReader attempts to decode an encoded breachResolver
// from the passed Reader instance, returning an active breachResolver.
func newBreachResolverFromReader(r io.Reader, resCfg ResolverConfig) (
*breachResolver, error) {
b := &breachResolver{
contractResolverKit: *newContractResolverKit(resCfg),
replyChan: make(chan struct{}),
}
if err := binary.Read(r, endian, &b.resolved); err != nil {
return nil, err
}
b.initLogger(b)
return b, nil
}
// A compile time assertion to ensure breachResolver meets the ContractResolver
// interface.
var _ ContractResolver = (*breachResolver)(nil)

View File

@ -185,6 +185,8 @@ type BreachArbiter struct {
cfg *BreachConfig
subscriptions map[wire.OutPoint]chan struct{}
quit chan struct{}
wg sync.WaitGroup
sync.Mutex
@ -194,8 +196,9 @@ type BreachArbiter struct {
// its dependent objects.
func NewBreachArbiter(cfg *BreachConfig) *BreachArbiter {
return &BreachArbiter{
cfg: cfg,
quit: make(chan struct{}),
cfg: cfg,
subscriptions: make(map[wire.OutPoint]chan struct{}),
quit: make(chan struct{}),
}
}
@ -322,6 +325,47 @@ func (b *BreachArbiter) IsBreached(chanPoint *wire.OutPoint) (bool, error) {
return b.cfg.Store.IsBreached(chanPoint)
}
// SubscribeBreachComplete is used by outside subsystems to be notified of a
// successful breach resolution.
func (b *BreachArbiter) SubscribeBreachComplete(chanPoint *wire.OutPoint,
c chan struct{}) (bool, error) {
breached, err := b.cfg.Store.IsBreached(chanPoint)
if err != nil {
// If an error occurs, no subscription will be registered.
return false, err
}
if !breached {
// If chanPoint no longer exists in the Store, then the breach
// was cleaned up successfully. Any subscription that occurs
// happens after the breach information was persisted to the
// underlying store.
return true, nil
}
// Otherwise since the channel point is not resolved, add a
// subscription. There can only be one subscription per channel point.
b.Lock()
defer b.Unlock()
b.subscriptions[*chanPoint] = c
return false, nil
}
// notifyBreachComplete is used by the BreachArbiter to notify outside
// subsystems that the breach resolution process is complete.
func (b *BreachArbiter) notifyBreachComplete(chanPoint *wire.OutPoint) {
b.Lock()
defer b.Unlock()
if c, ok := b.subscriptions[*chanPoint]; ok {
close(c)
}
// Remove the subscription.
delete(b.subscriptions, *chanPoint)
}
// contractObserver is the primary goroutine for the BreachArbiter. This
// goroutine is responsible for handling breach events coming from the
// contractcourt on the ContractBreaches channel. If a channel breach is
@ -857,6 +901,14 @@ func (b *BreachArbiter) cleanupBreach(chanPoint *wire.OutPoint) error {
err)
}
// This is after the Remove call so that the chan passed in via
// SubscribeBreachComplete is always notified, no matter when it is
// called. Otherwise, if notifyBreachComplete was before Remove, a
// very rare edge case could occur in which SubscribeBreachComplete
// is called after notifyBreachComplete and before Remove, meaning the
// caller would never be notified.
b.notifyBreachComplete(chanPoint)
return nil
}

View File

@ -36,16 +36,21 @@ type ContractResolutions struct {
// output. If the channel type doesn't include anchors, the value of
// this field will be nil.
AnchorResolution *lnwallet.AnchorResolution
// BreachResolution contains the data required to manage the lifecycle
// of a breach in the ChannelArbitrator.
BreachResolution *BreachResolution
}
// IsEmpty returns true if the set of resolutions is "empty". A resolution is
// empty if: our commitment output has been trimmed, and we don't have any
// incoming or outgoing HTLC's active.
// empty if: our commitment output has been trimmed, we don't have any
// incoming or outgoing HTLC's active, there is no anchor output to sweep, or
// there are no breached outputs to resolve.
func (c *ContractResolutions) IsEmpty() bool {
return c.CommitResolution == nil &&
len(c.HtlcResolutions.IncomingHTLCs) == 0 &&
len(c.HtlcResolutions.OutgoingHTLCs) == 0 &&
c.AnchorResolution == nil
c.AnchorResolution == nil && c.BreachResolution == nil
}
// ArbitratorLog is the primary source of persistent storage for the
@ -142,7 +147,7 @@ const (
// | | |
// | | |-> StateCommitmentBroadcasted: chain/user trigger
// | | |
// | | |-> StateContractClosed: local/remote close trigger
// | | |-> StateContractClosed: local/remote/breach close trigger
// | | | |
// | | | |-> StateWaitingFullResolution: contract resolutions not empty
// | | | | |
@ -152,9 +157,9 @@ const (
// | | | |
// | | | |-> StateFullyResolved: contract resolutions empty
// | | |
// | | |-> StateFullyResolved: coop/breach close trigger
// | | |-> StateFullyResolved: coop/breach(legacy) close trigger
// | |
// | |-> StateContractClosed: local/remote close trigger
// | |-> StateContractClosed: local/remote/breach close trigger
// | | |
// | | |-> StateWaitingFullResolution: contract resolutions not empty
// | | | |
@ -164,11 +169,11 @@ const (
// | | |
// | | |-> StateFullyResolved: contract resolutions empty
// | |
// | |-> StateFullyResolved: coop/breach close trigger
// | |-> StateFullyResolved: coop/breach(legacy) close trigger
// |
// |-> StateContractClosed: local/remote close trigger
// |-> StateContractClosed: local/remote/breach close trigger
// | |
// | |-> StateWaitingFullResolution: contract resolutions empty
// | |-> StateWaitingFullResolution: contract resolutions not empty
// | | |
// | | |-> StateWaitingFullResolution: contract resolutions not empty
// | | |
@ -176,7 +181,7 @@ const (
// | |
// | |-> StateFullyResolved: contract resolutions empty
// |
// |-> StateFullyResolved: coop/breach close trigger
// |-> StateFullyResolved: coop/breach(legacy) close trigger
// StateDefault is the default state. In this state, no major actions
// need to be executed.
@ -269,6 +274,10 @@ const (
// sweeping out direct commitment output form the remote party's
// commitment transaction.
resolverUnilateralSweep resolverType = 4
// resolverBreach is the type of resolver that manages a contract
// breach on-chain.
resolverBreach resolverType = 5
)
// resolverIDLen is the size of the resolver ID key. This is 36 bytes as we get
@ -341,6 +350,11 @@ var (
// store the anchor resolution, if any.
anchorResolutionKey = []byte("anchor-resolution")
// breachResolutionKey is the key under the logScope that we'll use to
// store the breach resolution, if any. This is used rather than the
// resolutionsKey.
breachResolutionKey = []byte("breach-resolution")
// actionsBucketKey is the key under the logScope that we'll use to
// store all chain actions once they're determined.
actionsBucketKey = []byte("chain-actions")
@ -464,6 +478,8 @@ func (b *boltArbitratorLog) writeResolver(contractBucket kvdb.RwBucket,
rType = resolverIncomingContest
case *commitSweepResolver:
rType = resolverUnilateralSweep
case *breachResolver:
rType = resolverBreach
}
if _, err := buf.Write([]byte{byte(rType)}); err != nil {
return err
@ -593,6 +609,11 @@ func (b *boltArbitratorLog) FetchUnresolvedContracts() ([]ContractResolver, erro
resReader, resolverCfg,
)
case resolverBreach:
res, err = newBreachResolverFromReader(
resReader, resolverCfg,
)
default:
return fmt.Errorf("unknown resolver type: %v", resType)
}
@ -785,6 +806,20 @@ func (b *boltArbitratorLog) LogContractResolutions(c *ContractResolutions) error
}
}
// Write out the breach resolution if present.
if c.BreachResolution != nil {
var b bytes.Buffer
err := encodeBreachResolution(&b, c.BreachResolution)
if err != nil {
return err
}
err = scopeBucket.Put(breachResolutionKey, b.Bytes())
if err != nil {
return err
}
}
return nil
})
}
@ -904,6 +939,18 @@ func (b *boltArbitratorLog) FetchContractResolutions() (*ContractResolutions, er
}
}
breachResBytes := scopeBucket.Get(breachResolutionKey)
if breachResBytes != nil {
c.BreachResolution = &BreachResolution{}
resReader := bytes.NewReader(breachResBytes)
err := decodeBreachResolution(
resReader, c.BreachResolution,
)
if err != nil {
return err
}
}
return nil
}, func() {
c = &ContractResolutions{}
@ -1372,6 +1419,21 @@ func decodeAnchorResolution(r io.Reader,
return input.ReadSignDescriptor(r, &a.AnchorSignDescriptor)
}
func encodeBreachResolution(w io.Writer, b *BreachResolution) error {
if _, err := w.Write(b.FundingOutPoint.Hash[:]); err != nil {
return err
}
return binary.Write(w, endian, b.FundingOutPoint.Index)
}
func decodeBreachResolution(r io.Reader, b *BreachResolution) error {
_, err := io.ReadFull(r, b.FundingOutPoint.Hash[:])
if err != nil {
return err
}
return binary.Read(r, endian, &b.FundingOutPoint.Index)
}
func encodeHtlcSetKey(w io.Writer, h *HtlcSetKey) error {
err := binary.Write(w, endian, h.IsRemote)
if err != nil {

View File

@ -100,14 +100,12 @@ type ChainArbitratorConfig struct {
MarkLinkInactive func(wire.OutPoint) error
// ContractBreach is a function closure that the ChainArbitrator will
// use to notify the breachArbiter about a contract breach. A callback
// should be passed that when called will mark the channel pending
// close in the database. It should only return a non-nil error when the
// breachArbiter has preserved the necessary breach info for this
// channel point, and the callback has succeeded, meaning it is safe to
// stop watching the channel.
ContractBreach func(wire.OutPoint, *lnwallet.BreachRetribution,
func() error) error
// use to notify the breachArbiter about a contract breach. It should
// only return a non-nil error when the breachArbiter has preserved
// the necessary breach info for this channel point. Once the breach
// resolution is persisted in the channel arbitrator, it will be safe
// to mark the channel closed.
ContractBreach func(wire.OutPoint, *lnwallet.BreachRetribution) error
// IsOurAddress is a function that returns true if the passed address
// is known to the underlying wallet. Otherwise, false should be
@ -183,6 +181,12 @@ type ChainArbitratorConfig struct {
// Clock is the clock implementation that ChannelArbitrator uses.
// It is useful for testing.
Clock clock.Clock
// SubscribeBreachComplete is used by the breachResolver to register a
// subscription that notifies when the breach resolution process is
// complete.
SubscribeBreachComplete func(op *wire.OutPoint, c chan struct{}) (
bool, error)
}
// ChainArbitrator is a sub-system that oversees the on-chain resolution of all
@ -506,19 +510,17 @@ func (c *ChainArbitrator) Start() error {
// First, we'll create an active chainWatcher for this channel
// to ensure that we detect any relevant on chain events.
breachClosure := func(ret *lnwallet.BreachRetribution) error {
return c.cfg.ContractBreach(chanPoint, ret)
}
chainWatcher, err := newChainWatcher(
chainWatcherConfig{
chanState: channel,
notifier: c.cfg.Notifier,
signer: c.cfg.Signer,
isOurAddr: c.cfg.IsOurAddress,
contractBreach: func(retInfo *lnwallet.BreachRetribution,
markClosed func() error) error {
return c.cfg.ContractBreach(
chanPoint, retInfo, markClosed,
)
},
chanState: channel,
notifier: c.cfg.Notifier,
signer: c.cfg.Signer,
isOurAddr: c.cfg.IsOurAddress,
contractBreach: breachClosure,
extractStateNumHint: lnwallet.GetStateNumHint,
},
)
@ -1116,11 +1118,11 @@ func (c *ChainArbitrator) WatchNewChannel(newChan *channeldb.OpenChannel) error
notifier: c.cfg.Notifier,
signer: c.cfg.Signer,
isOurAddr: c.cfg.IsOurAddress,
contractBreach: func(retInfo *lnwallet.BreachRetribution,
markClosed func() error) error {
contractBreach: func(
retInfo *lnwallet.BreachRetribution) error {
return c.cfg.ContractBreach(
chanPoint, retInfo, markClosed,
chanPoint, retInfo,
)
},
extractStateNumHint: lnwallet.GetStateNumHint,

View File

@ -9,6 +9,7 @@ import (
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
@ -57,6 +58,29 @@ type RemoteUnilateralCloseInfo struct {
CommitSet CommitSet
}
// BreachResolution wraps the outpoint of the breached channel.
type BreachResolution struct {
FundingOutPoint wire.OutPoint
}
// BreachCloseInfo wraps the BreachResolution with a CommitSet for the latest,
// non-breached state, with the AnchorResolution for the breached state.
type BreachCloseInfo struct {
*BreachResolution
*lnwallet.AnchorResolution
// CommitHash is the hash of the commitment transaction.
CommitHash chainhash.Hash
// CommitSet is the set of known valid commitments at the time the
// breach occurred on-chain.
CommitSet CommitSet
// CloseSummary gives the recipient of the BreachCloseInfo information
// to mark the channel closed in the database.
CloseSummary channeldb.ChannelCloseSummary
}
// CommitSet is a collection of the set of known valid commitments at a given
// instant. If ConfCommitKey is set, then the commitment identified by the
// HtlcSetKey has hit the chain. This struct will be used to examine all live
@ -124,7 +148,7 @@ type ChainEventSubscription struct {
// ContractBreach is a channel that will be sent upon if we detect a
// contract breach. The struct sent across the channel contains all the
// material required to bring the cheating channel peer to justice.
ContractBreach chan *lnwallet.BreachRetribution
ContractBreach chan *BreachCloseInfo
// Cancel cancels the subscription to the event stream for a particular
// channel. This method should be called once the caller no longer needs to
@ -150,13 +174,10 @@ type chainWatcherConfig struct {
signer input.Signer
// contractBreach is a method that will be called by the watcher if it
// detects that a contract breach transaction has been confirmed. A
// callback should be passed that when called will mark the channel
// pending close in the database. It will only return a non-nil error
// when the breachArbiter has preserved the necessary breach info for
// this channel point, and the callback has succeeded, meaning it is
// safe to stop watching the channel.
contractBreach func(*lnwallet.BreachRetribution, func() error) error
// detects that a contract breach transaction has been confirmed. It
// will only return a non-nil error when the breachArbiter has
// preserved the necessary breach info for this channel point.
contractBreach func(*lnwallet.BreachRetribution) error
// isOurAddr is a function that returns true if the passed address is
// known to us.
@ -311,7 +332,7 @@ func (c *chainWatcher) SubscribeChannelEvents() *ChainEventSubscription {
RemoteUnilateralClosure: make(chan *RemoteUnilateralCloseInfo, 1),
LocalUnilateralClosure: make(chan *LocalUnilateralCloseInfo, 1),
CooperativeClosure: make(chan *CooperativeCloseInfo, 1),
ContractBreach: make(chan *lnwallet.BreachRetribution, 1),
ContractBreach: make(chan *BreachCloseInfo, 1),
Cancel: func() {
c.Lock()
delete(c.clientSubscriptions, clientID)
@ -785,12 +806,27 @@ func (c *chainWatcher) handleKnownRemoteState(
return false, nil
}
// Create an AnchorResolution for the breached state.
anchorRes, err := lnwallet.NewAnchorResolution(
c.cfg.chanState, commitSpend.SpendingTx,
)
if err != nil {
return false, fmt.Errorf("unable to create anchor "+
"resolution: %v", err)
}
// We'll set the ConfCommitKey here as the remote htlc set. This is
// only used to ensure a nil-pointer-dereference doesn't occur and is
// not used otherwise. The HTLC's may not exist for the
// RemotePendingHtlcSet.
chainSet.commitSet.ConfCommitKey = &RemoteHtlcSet
// THEY'RE ATTEMPTING TO VIOLATE THE CONTRACT LAID OUT WITHIN THE
// PAYMENT CHANNEL. Therefore we close the signal indicating a revoked
// broadcast to allow subscribers to swiftly dispatch justice!!!
err = c.dispatchContractBreach(
commitSpend, &chainSet.remoteCommit,
broadcastStateNum, retribution,
commitSpend, chainSet, broadcastStateNum, retribution,
anchorRes,
)
if err != nil {
return false, fmt.Errorf("unable to handle channel "+
@ -1083,8 +1119,9 @@ func (c *chainWatcher) dispatchRemoteForceClose(
// materials required to bring the cheater to justice, then notify all
// registered subscribers of this event.
func (c *chainWatcher) dispatchContractBreach(spendEvent *chainntnfs.SpendDetail,
remoteCommit *channeldb.ChannelCommitment, broadcastStateNum uint64,
retribution *lnwallet.BreachRetribution) error {
chainSet *chainSet, broadcastStateNum uint64,
retribution *lnwallet.BreachRetribution,
anchorRes *lnwallet.AnchorResolution) error {
log.Warnf("Remote peer has breached the channel contract for "+
"ChannelPoint(%v). Revoked state #%v was broadcast!!!",
@ -1125,7 +1162,7 @@ func (c *chainWatcher) dispatchContractBreach(spendEvent *chainntnfs.SpendDetail
return spew.Sdump(retribution)
}))
settledBalance := remoteCommit.LocalBalance.ToSatoshis()
settledBalance := chainSet.remoteCommit.LocalBalance.ToSatoshis()
closeSummary := channeldb.ChannelCloseSummary{
ChanPoint: c.cfg.chanState.FundingOutpoint,
ChainHash: c.cfg.chanState.ChainHash,
@ -1151,38 +1188,35 @@ func (c *chainWatcher) dispatchContractBreach(spendEvent *chainntnfs.SpendDetail
closeSummary.LastChanSyncMsg = chanSync
}
// We create a function closure that will mark the channel as pending
// close in the database. We pass it to the contracBreach method such
// that it can ensure safe handoff of the breach before we close the
// channel.
markClosed := func() error {
// At this point, we've successfully received an ack for the
// breach close, and we can mark the channel as pending force
// closed.
if err := c.cfg.chanState.CloseChannel(
&closeSummary, channeldb.ChanStatusRemoteCloseInitiator,
); err != nil {
return err
}
log.Infof("Breached channel=%v marked pending-closed",
c.cfg.chanState.FundingOutpoint)
return nil
}
// Hand the retribution info over to the breach arbiter.
if err := c.cfg.contractBreach(retribution, markClosed); err != nil {
// Hand the retribution info over to the breach arbiter. This function
// will wait for a response from the breach arbiter and then proceed to
// send a BreachCloseInfo to the channel arbitrator. The channel arb
// will then mark the channel as closed after resolutions and the
// commit set are logged in the arbitrator log.
if err := c.cfg.contractBreach(retribution); err != nil {
log.Errorf("unable to hand breached contract off to "+
"breachArbiter: %v", err)
return err
}
breachRes := &BreachResolution{
FundingOutPoint: c.cfg.chanState.FundingOutpoint,
}
breachInfo := &BreachCloseInfo{
CommitHash: spendEvent.SpendingTx.TxHash(),
BreachResolution: breachRes,
AnchorResolution: anchorRes,
CommitSet: chainSet.commitSet,
CloseSummary: closeSummary,
}
// With the event processed and channel closed, we'll now notify all
// subscribers of the event.
c.Lock()
for _, sub := range c.clientSubscriptions {
select {
case sub.ContractBreach <- retribution:
case sub.ContractBreach <- breachInfo:
case <-c.quit:
c.Unlock()
return fmt.Errorf("quitting")

View File

@ -747,8 +747,8 @@ const (
coopCloseTrigger
// breachCloseTrigger is a transition trigger driven by a remote breach
// being confirmed. In this case the channel arbitrator won't have to
// do anything, so we'll just clean up and exit gracefully.
// being confirmed. In this case the channel arbitrator will wait for
// the breacharbiter to finish and then clean up gracefully.
breachCloseTrigger
)
@ -852,9 +852,8 @@ func (c *ChannelArbitrator) stateStep(
// If the trigger is a cooperative close being confirmed, then
// we can go straight to StateFullyResolved, as there won't be
// any contracts to resolve. The same is true in the case of a
// breach.
case coopCloseTrigger, breachCloseTrigger:
// any contracts to resolve.
case coopCloseTrigger:
nextState = StateFullyResolved
// Otherwise, if this state advance was triggered by a
@ -868,6 +867,14 @@ func (c *ChannelArbitrator) stateStep(
fallthrough
case remoteCloseTrigger:
nextState = StateContractClosed
case breachCloseTrigger:
nextContractState, err := c.checkLegacyBreach()
if nextContractState == StateError {
return nextContractState, nil, err
}
nextState = nextContractState
}
// If we're in this state, then we've decided to broadcast the
@ -890,7 +897,23 @@ func (c *ChannelArbitrator) stateStep(
c.cfg.ChanPoint, trigger, StateContractClosed)
return StateContractClosed, closeTx, nil
case coopCloseTrigger, breachCloseTrigger:
case breachCloseTrigger:
nextContractState, err := c.checkLegacyBreach()
if nextContractState == StateError {
log.Infof("ChannelArbitrator(%v): unable to "+
"advance breach close resolution: %v",
c.cfg.ChanPoint, nextContractState)
return StateError, closeTx, err
}
log.Infof("ChannelArbitrator(%v): detected %s close "+
"after closing channel, fast-forwarding to %s"+
" to resolve contract", c.cfg.ChanPoint,
trigger, nextContractState)
return nextContractState, closeTx, nil
case coopCloseTrigger:
log.Infof("ChannelArbitrator(%v): detected %s "+
"close after closing channel, fast-forwarding "+
"to %s to resolve contract",
@ -994,10 +1017,18 @@ func (c *ChannelArbitrator) stateStep(
case localCloseTrigger, remoteCloseTrigger:
nextState = StateContractClosed
// If a coop close or breach was confirmed, jump straight to
// the fully resolved state.
case coopCloseTrigger, breachCloseTrigger:
// If a coop close was confirmed, jump straight to the fully
// resolved state.
case coopCloseTrigger:
nextState = StateFullyResolved
case breachCloseTrigger:
nextContractState, err := c.checkLegacyBreach()
if nextContractState == StateError {
return nextContractState, closeTx, err
}
nextState = nextContractState
}
log.Infof("ChannelArbitrator(%v): trigger %v moving from "+
@ -1996,10 +2027,62 @@ func (c *ChannelArbitrator) prepContractResolutions(
commitHash := contractResolutions.CommitHash
failureMsg := &lnwire.FailPermanentChannelFailure{}
var htlcResolvers []ContractResolver
// We instantiate an anchor resolver if the commitment tx has an
// anchor.
if contractResolutions.AnchorResolution != nil {
anchorResolver := newAnchorResolver(
contractResolutions.AnchorResolution.AnchorSignDescriptor,
contractResolutions.AnchorResolution.CommitAnchor,
height, c.cfg.ChanPoint, resolverCfg,
)
htlcResolvers = append(htlcResolvers, anchorResolver)
}
// If this is a breach close, we'll create a breach resolver, determine
// the htlc's to fail back, and exit. This is done because the other
// steps taken for non-breach-closes do not matter for breach-closes.
if contractResolutions.BreachResolution != nil {
breachResolver := newBreachResolver(resolverCfg)
htlcResolvers = append(htlcResolvers, breachResolver)
// We'll use the CommitSet, we'll fail back all outgoing HTLC's
// that exist on either of the remote commitments. The map is
// used to deduplicate any shared htlc's.
remoteOutgoing := make(map[uint64]channeldb.HTLC)
for htlcSetKey, htlcs := range confCommitSet.HtlcSets {
if !htlcSetKey.IsRemote {
continue
}
for _, htlc := range htlcs {
if htlc.Incoming {
continue
}
remoteOutgoing[htlc.HtlcIndex] = htlc
}
}
// Now we'll loop over the map and create ResolutionMsgs for
// each of them.
for _, htlc := range remoteOutgoing {
failMsg := ResolutionMsg{
SourceChan: c.cfg.ShortChanID,
HtlcIndex: htlc.HtlcIndex,
Failure: failureMsg,
}
msgsToSend = append(msgsToSend, failMsg)
}
return htlcResolvers, msgsToSend, nil
}
// For each HTLC, we'll either act immediately, meaning we'll instantly
// fail the HTLC, or we'll act only once the transaction has been
// confirmed, in which case we'll need an HTLC resolver.
var htlcResolvers []ContractResolver
for htlcAction, htlcs := range htlcActions {
switch htlcAction {
@ -2145,17 +2228,6 @@ func (c *ChannelArbitrator) prepContractResolutions(
htlcResolvers = append(htlcResolvers, resolver)
}
// We instantiate an anchor resolver if the commitmentment tx has an
// anchor.
if contractResolutions.AnchorResolution != nil {
anchorResolver := newAnchorResolver(
contractResolutions.AnchorResolution.AnchorSignDescriptor,
contractResolutions.AnchorResolution.CommitAnchor,
height, c.cfg.ChanPoint, resolverCfg,
)
htlcResolvers = append(htlcResolvers, anchorResolver)
}
return htlcResolvers, msgsToSend, nil
}
@ -2574,14 +2646,59 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32) {
// the ChainWatcher and BreachArbiter, we don't have to do
// anything in particular, so just advance our state and
// gracefully exit.
case <-c.cfg.ChainEvents.ContractBreach:
case breachInfo := <-c.cfg.ChainEvents.ContractBreach:
log.Infof("ChannelArbitrator(%v): remote party has "+
"breached channel!", c.cfg.ChanPoint)
// In the breach case, we'll only have anchor and
// breach resolutions.
contractRes := &ContractResolutions{
CommitHash: breachInfo.CommitHash,
BreachResolution: breachInfo.BreachResolution,
AnchorResolution: breachInfo.AnchorResolution,
}
// We'll transition to the ContractClosed state and log
// the set of resolutions such that they can be turned
// into resolvers later on. We'll also insert the
// CommitSet of the latest set of commitments.
err := c.log.LogContractResolutions(contractRes)
if err != nil {
log.Errorf("Unable to write resolutions: %v",
err)
return
}
err = c.log.InsertConfirmedCommitSet(
&breachInfo.CommitSet,
)
if err != nil {
log.Errorf("Unable to write commit set: %v",
err)
return
}
// The channel is finally marked pending closed here as
// the breacharbiter and channel arbitrator have
// persisted the relevant states.
closeSummary := &breachInfo.CloseSummary
err = c.cfg.MarkChannelClosed(
closeSummary,
channeldb.ChanStatusRemoteCloseInitiator,
)
if err != nil {
log.Errorf("Unable to mark channel closed: %v",
err)
return
}
log.Infof("Breached channel=%v marked pending-closed",
breachInfo.BreachResolution.FundingOutPoint)
// We'll advance our state machine until it reaches a
// terminal state.
_, _, err := c.advanceState(
uint32(bestHeight), breachCloseTrigger, nil,
_, _, err = c.advanceState(
uint32(bestHeight), breachCloseTrigger,
&breachInfo.CommitSet,
)
if err != nil {
log.Errorf("Unable to advance state: %v", err)
@ -2661,3 +2778,25 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32) {
}
}
}
// checkLegacyBreach returns StateFullyResolved if the channel was closed with
// a breach transaction before the channel arbitrator launched its own breach
// resolver. StateContractClosed is returned if this is a modern breach close
// with a breach resolver. StateError is returned if the log lookup failed.
func (c *ChannelArbitrator) checkLegacyBreach() (ArbitratorState, error) {
// A previous version of the channel arbitrator would make the breach
// close skip to StateFullyResolved. If there are no contract
// resolutions in the bolt arbitrator log, then this is an older breach
// close. Otherwise, if there are resolutions, the state should advance
// to StateContractClosed.
_, err := c.log.FetchContractResolutions()
if err == errNoResolutions {
// This is an older breach close still in the database.
return StateFullyResolved, nil
} else if err != nil {
return StateError, err
}
// This is a modern breach close with resolvers.
return StateContractClosed, nil
}

View File

@ -205,6 +205,9 @@ type chanArbTestCtx struct {
log ArbitratorLog
sweeper *mockSweeper
breachSubscribed chan struct{}
breachResolutionChan chan struct{}
}
func (c *chanArbTestCtx) CleanUp() {
@ -303,13 +306,17 @@ func withMarkClosed(markClosed func(*channeldb.ChannelCloseSummary,
func createTestChannelArbitrator(t *testing.T, log ArbitratorLog,
opts ...testChanArbOption) (*chanArbTestCtx, error) {
chanArbCtx := &chanArbTestCtx{
breachSubscribed: make(chan struct{}),
}
chanPoint := wire.OutPoint{}
shortChanID := lnwire.ShortChannelID{}
chanEvents := &ChainEventSubscription{
RemoteUnilateralClosure: make(chan *RemoteUnilateralCloseInfo, 1),
LocalUnilateralClosure: make(chan *LocalUnilateralCloseInfo, 1),
CooperativeClosure: make(chan *CooperativeCloseInfo, 1),
ContractBreach: make(chan *lnwallet.BreachRetribution, 1),
ContractBreach: make(chan *BreachCloseInfo, 1),
}
resolutionChan := make(chan []ResolutionMsg, 1)
@ -346,6 +353,13 @@ func createTestChannelArbitrator(t *testing.T, log ArbitratorLog,
return true
},
SubscribeBreachComplete: func(op *wire.OutPoint,
c chan struct{}) (bool, error) {
chanArbCtx.breachResolutionChan = c
chanArbCtx.breachSubscribed <- struct{}{}
return false, nil
},
Clock: clock.NewDefaultClock(),
Sweeper: mockSweeper,
}
@ -425,16 +439,16 @@ func createTestChannelArbitrator(t *testing.T, log ArbitratorLog,
chanArb := NewChannelArbitrator(*arbCfg, htlcSets, log)
return &chanArbTestCtx{
t: t,
chanArb: chanArb,
cleanUp: cleanUp,
resolvedChan: resolvedChan,
resolutions: resolutionChan,
log: log,
incubationRequests: incubateChan,
sweeper: mockSweeper,
}, nil
chanArbCtx.t = t
chanArbCtx.chanArb = chanArb
chanArbCtx.cleanUp = cleanUp
chanArbCtx.resolvedChan = resolvedChan
chanArbCtx.resolutions = resolutionChan
chanArbCtx.log = log
chanArbCtx.incubationRequests = incubateChan
chanArbCtx.sweeper = mockSweeper
return chanArbCtx, nil
}
// TestChannelArbitratorCooperativeClose tests that the ChannelArbitertor
@ -661,11 +675,13 @@ func TestChannelArbitratorLocalForceClose(t *testing.T) {
// TestChannelArbitratorBreachClose tests that the ChannelArbitrator goes
// through the expected states in case we notice a breach in the chain, and
// gracefully exits.
// is able to properly progress the breachResolver and anchorResolver to a
// successful resolution.
func TestChannelArbitratorBreachClose(t *testing.T) {
log := &mockArbitratorLog{
state: StateDefault,
newStates: make(chan ArbitratorState, 5),
resolvers: make(map[ContractResolver]struct{}),
}
chanArbCtx, err := createTestChannelArbitrator(t, log)
@ -673,6 +689,8 @@ func TestChannelArbitratorBreachClose(t *testing.T) {
t.Fatalf("unable to create ChannelArbitrator: %v", err)
}
chanArb := chanArbCtx.chanArb
chanArb.cfg.PreimageDB = newMockWitnessBeacon()
chanArb.cfg.Registry = &mockRegistry{}
if err := chanArb.Start(nil); err != nil {
t.Fatalf("unable to start ChannelArbitrator: %v", err)
@ -686,13 +704,99 @@ func TestChannelArbitratorBreachClose(t *testing.T) {
// It should start out in the default state.
chanArbCtx.AssertState(StateDefault)
// Send a breach close event.
chanArb.cfg.ChainEvents.ContractBreach <- &lnwallet.BreachRetribution{}
// We create two HTLCs, one incoming and one outgoing. We will later
// assert that we only receive a ResolutionMsg for the outgoing HTLC.
outgoingIdx := uint64(2)
// It should transition StateDefault -> StateFullyResolved.
chanArbCtx.AssertStateTransitions(
StateFullyResolved,
)
rHash1 := [lntypes.PreimageSize]byte{1, 2, 3}
htlc1 := channeldb.HTLC{
RHash: rHash1,
OutputIndex: 2,
Incoming: false,
HtlcIndex: outgoingIdx,
LogIndex: 2,
}
rHash2 := [lntypes.PreimageSize]byte{2, 2, 2}
htlc2 := channeldb.HTLC{
RHash: rHash2,
OutputIndex: 3,
Incoming: true,
HtlcIndex: 3,
LogIndex: 3,
}
anchorRes := &lnwallet.AnchorResolution{
AnchorSignDescriptor: input.SignDescriptor{
Output: &wire.TxOut{Value: 1},
},
}
// Create the BreachCloseInfo that the chain_watcher would normally
// send to the channel_arbitrator.
breachInfo := &BreachCloseInfo{
BreachResolution: &BreachResolution{
FundingOutPoint: wire.OutPoint{},
},
AnchorResolution: anchorRes,
CommitSet: CommitSet{
ConfCommitKey: &RemoteHtlcSet,
HtlcSets: map[HtlcSetKey][]channeldb.HTLC{
RemoteHtlcSet: {htlc1, htlc2},
},
},
CommitHash: chainhash.Hash{},
}
// Send a breach close event.
chanArb.cfg.ChainEvents.ContractBreach <- breachInfo
// It should transition StateDefault -> StateContractClosed.
chanArbCtx.AssertStateTransitions(StateContractClosed)
// We should receive one ResolutionMsg as there was only one outgoing
// HTLC at the time of the breach.
select {
case res := <-chanArbCtx.resolutions:
require.Equal(t, 1, len(res))
require.Equal(t, outgoingIdx, res[0].HtlcIndex)
case <-time.After(5 * time.Second):
t.Fatal("expected to receive a resolution msg")
}
// We should now transition from StateContractClosed to
// StateWaitingFullResolution.
chanArbCtx.AssertStateTransitions(StateWaitingFullResolution)
// One of the resolvers should be an anchor resolver and the other
// should be a breach resolver.
require.Equal(t, 2, len(chanArb.activeResolvers))
var anchorExists, breachExists bool
for _, resolver := range chanArb.activeResolvers {
switch resolver.(type) {
case *anchorResolver:
anchorExists = true
case *breachResolver:
breachExists = true
default:
t.Fatalf("did not expect resolver %T", resolver)
}
}
require.True(t, anchorExists && breachExists)
// The anchor resolver is expected to re-offer the anchor input to the
// sweeper.
<-chanArbCtx.sweeper.sweptInputs
// Wait for SubscribeBreachComplete to be called.
<-chanArbCtx.breachSubscribed
// We'll now close the breach channel so that the state transitions to
// StateFullyResolved.
close(chanArbCtx.breachResolutionChan)
chanArbCtx.AssertStateTransitions(StateFullyResolved)
// It should also mark the channel as resolved.
select {
@ -1318,12 +1422,14 @@ func TestChannelArbitratorPersistence(t *testing.T) {
// TestChannelArbitratorForceCloseBreachedChannel tests that the channel
// arbitrator is able to handle a channel in the process of being force closed
// is breached by the remote node. In these cases we expect the
// ChannelArbitrator to gracefully exit, as the breach is handled by other
// subsystems.
// ChannelArbitrator to properly execute the breachResolver flow and then
// gracefully exit once the breachResolver receives the signal from what would
// normally be the breacharbiter.
func TestChannelArbitratorForceCloseBreachedChannel(t *testing.T) {
log := &mockArbitratorLog{
state: StateDefault,
newStates: make(chan ArbitratorState, 5),
resolvers: make(map[ContractResolver]struct{}),
}
chanArbCtx, err := createTestChannelArbitrator(t, log)
@ -1389,6 +1495,20 @@ func TestChannelArbitratorForceCloseBreachedChannel(t *testing.T) {
t.Fatalf("no response received")
}
// Before restarting, we'll need to modify the arbitrator log to have
// a set of contract resolutions and a commit set.
log.resolutions = &ContractResolutions{
BreachResolution: &BreachResolution{
FundingOutPoint: wire.OutPoint{},
},
}
log.commitSet = &CommitSet{
ConfCommitKey: &RemoteHtlcSet,
HtlcSets: map[HtlcSetKey][]channeldb.HTLC{
RemoteHtlcSet: {},
},
}
// We mimic that the channel is breached while the channel arbitrator
// is down. This means that on restart it will be started with a
// pending close channel, of type BreachClose.
@ -1402,7 +1522,18 @@ func TestChannelArbitratorForceCloseBreachedChannel(t *testing.T) {
}
defer chanArbCtx.CleanUp()
// Finally it should advance to StateFullyResolved.
// We should transition to StateContractClosed.
chanArbCtx.AssertStateTransitions(
StateContractClosed, StateWaitingFullResolution,
)
// Wait for SubscribeBreachComplete to be called.
<-chanArbCtx.breachSubscribed
// We'll close the breachResolutionChan to cleanup the breachResolver
// and make the state transition to StateFullyResolved.
close(chanArbCtx.breachResolutionChan)
chanArbCtx.AssertStateTransitions(StateFullyResolved)
// It should also mark the channel as resolved.

View File

@ -41,6 +41,9 @@ Postgres](https://github.com/lightningnetwork/lnd/pull/6111)
## Bug fixes
* [A new resolver for breach closes was introduced that handles sweeping
anchor outputs and failing back HTLCs.](https://github.com/lightningnetwork/lnd/pull/6158)
* [Return the nearest known fee rate when a given conf target cannot be found
from Web API fee estimator.](https://github.com/lightningnetwork/lnd/pull/6062)

View File

@ -1026,6 +1026,20 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
// breach events from the ChannelArbitrator to the breachArbiter,
contractBreaches := make(chan *contractcourt.ContractBreachEvent, 1)
s.breachArbiter = contractcourt.NewBreachArbiter(&contractcourt.BreachConfig{
CloseLink: closeLink,
DB: s.chanStateDB,
Estimator: s.cc.FeeEstimator,
GenSweepScript: newSweepPkScriptGen(cc.Wallet),
Notifier: cc.ChainNotifier,
PublishTransaction: cc.Wallet.PublishTransaction,
ContractBreaches: contractBreaches,
Signer: cc.Wallet.Cfg.Signer,
Store: contractcourt.NewRetributionStore(
dbs.ChanStateDB,
),
})
s.chainArb = contractcourt.NewChainArbitrator(contractcourt.ChainArbitratorConfig{
ChainHash: *s.cfg.ActiveNetParams.GenesisHash,
IncomingBroadcastDelta: lncfg.DefaultIncomingBroadcastDelta,
@ -1074,8 +1088,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
},
IsOurAddress: cc.Wallet.IsOurAddress,
ContractBreach: func(chanPoint wire.OutPoint,
breachRet *lnwallet.BreachRetribution,
markClosed func() error) error {
breachRet *lnwallet.BreachRetribution) error {
// processACK will handle the breachArbiter ACKing the
// event.
@ -1087,8 +1100,9 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
}
// If the breachArbiter successfully handled
// the event, we can mark the channel closed.
finalErr <- markClosed()
// the event, we can signal that the handoff
// was successful.
finalErr <- nil
}
event := &contractcourt.ContractBreachEvent{
@ -1104,9 +1118,8 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
return ErrServerShuttingDown
}
// We'll wait for a final error to be available, either
// from the breachArbiter or from our markClosed
// function closure.
// We'll wait for a final error to be available from
// the breachArbiter.
select {
case err := <-finalErr:
return err
@ -1125,22 +1138,9 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
PaymentsExpirationGracePeriod: cfg.PaymentsExpirationGracePeriod,
IsForwardedHTLC: s.htlcSwitch.IsForwardedHTLC,
Clock: clock.NewDefaultClock(),
SubscribeBreachComplete: s.breachArbiter.SubscribeBreachComplete,
}, dbs.ChanStateDB)
s.breachArbiter = contractcourt.NewBreachArbiter(&contractcourt.BreachConfig{
CloseLink: closeLink,
DB: s.chanStateDB,
Estimator: s.cc.FeeEstimator,
GenSweepScript: newSweepPkScriptGen(cc.Wallet),
Notifier: cc.ChainNotifier,
PublishTransaction: cc.Wallet.PublishTransaction,
ContractBreaches: contractBreaches,
Signer: cc.Wallet.Cfg.Signer,
Store: contractcourt.NewRetributionStore(
dbs.ChanStateDB,
),
})
// Select the configuration and furnding parameters for Bitcoin or
// Litecoin, depending on the primary registered chain.
primaryChain := cfg.registeredChains.PrimaryChain()