server+contractcourt: add breachResolver that subscribes to breacharbiter

Introduces a breachResolver that subscribes to the breacharbiter to
determine if the final justice transaction has confirmed and can
clean itself up.
This commit is contained in:
eugene 2022-01-13 14:45:10 -05:00
parent 234fdc6c9c
commit f99933fa69
4 changed files with 196 additions and 16 deletions

View File

@ -0,0 +1,121 @@
package contractcourt
import (
"encoding/binary"
"io"
"github.com/lightningnetwork/lnd/channeldb"
)
// breachResolver is a resolver that will handle breached closes. In the
// future, this will likely take over the duties the current breacharbiter has.
type breachResolver struct {
// resolved reflects if the contract has been fully resolved or not.
resolved bool
// subscribed denotes whether or not the breach resolver has subscribed
// to the breacharbiter for breach resolution.
subscribed bool
// replyChan is closed when the breach arbiter has completed serving
// justice.
replyChan chan struct{}
contractResolverKit
}
// newBreachResolver instantiates a new breach resolver.
func newBreachResolver(resCfg ResolverConfig) *breachResolver {
r := &breachResolver{
contractResolverKit: *newContractResolverKit(resCfg),
replyChan: make(chan struct{}),
}
r.initLogger(r)
return r
}
// ResolverKey returns the unique identifier for this resolver.
func (b *breachResolver) ResolverKey() []byte {
key := newResolverID(b.ChanPoint)
return key[:]
}
// Resolve queries the breacharbiter to see if the justice transaction has been
// broadcast.
func (b *breachResolver) Resolve() (ContractResolver, error) {
if !b.subscribed {
complete, err := b.SubscribeBreachComplete(
&b.ChanPoint, b.replyChan,
)
if err != nil {
return nil, err
}
// If the breach resolution process is already complete, then
// we can cleanup and checkpoint the resolved state.
if complete {
b.resolved = true
return nil, b.Checkpoint(b)
}
// Prevent duplicate subscriptions.
b.subscribed = true
}
select {
case <-b.replyChan:
// The replyChan has been closed, signalling that the breach
// has been fully resolved. Checkpoint the resolved state and
// exit.
b.resolved = true
return nil, b.Checkpoint(b)
case <-b.quit:
}
return nil, errResolverShuttingDown
}
// Stop signals the breachResolver to stop.
func (b *breachResolver) Stop() {
close(b.quit)
}
// IsResolved returns true if the breachResolver is fully resolved and cleanup
// can occur.
func (b *breachResolver) IsResolved() bool {
return b.resolved
}
// SupplementState adds additional state to the breachResolver.
func (b *breachResolver) SupplementState(_ *channeldb.OpenChannel) {
}
// Encode encodes the breachResolver to the passed writer.
func (b *breachResolver) Encode(w io.Writer) error {
return binary.Write(w, endian, b.resolved)
}
// newBreachResolverFromReader attempts to decode an encoded breachResolver
// from the passed Reader instance, returning an active breachResolver.
func newBreachResolverFromReader(r io.Reader, resCfg ResolverConfig) (
*breachResolver, error) {
b := &breachResolver{
contractResolverKit: *newContractResolverKit(resCfg),
replyChan: make(chan struct{}),
}
if err := binary.Read(r, endian, &b.resolved); err != nil {
return nil, err
}
b.initLogger(b)
return b, nil
}
// A compile time assertion to ensure breachResolver meets the ContractResolver
// interface.
var _ ContractResolver = (*breachResolver)(nil)

View File

@ -185,6 +185,8 @@ type BreachArbiter struct {
cfg *BreachConfig cfg *BreachConfig
subscriptions map[wire.OutPoint]chan struct{}
quit chan struct{} quit chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
sync.Mutex sync.Mutex
@ -194,8 +196,9 @@ type BreachArbiter struct {
// its dependent objects. // its dependent objects.
func NewBreachArbiter(cfg *BreachConfig) *BreachArbiter { func NewBreachArbiter(cfg *BreachConfig) *BreachArbiter {
return &BreachArbiter{ return &BreachArbiter{
cfg: cfg, cfg: cfg,
quit: make(chan struct{}), subscriptions: make(map[wire.OutPoint]chan struct{}),
quit: make(chan struct{}),
} }
} }
@ -322,6 +325,47 @@ func (b *BreachArbiter) IsBreached(chanPoint *wire.OutPoint) (bool, error) {
return b.cfg.Store.IsBreached(chanPoint) return b.cfg.Store.IsBreached(chanPoint)
} }
// SubscribeBreachComplete is used by outside subsystems to be notified of a
// successful breach resolution.
func (b *BreachArbiter) SubscribeBreachComplete(chanPoint *wire.OutPoint,
c chan struct{}) (bool, error) {
breached, err := b.cfg.Store.IsBreached(chanPoint)
if err != nil {
// If an error occurs, no subscription will be registered.
return false, err
}
if !breached {
// If chanPoint no longer exists in the Store, then the breach
// was cleaned up successfully. Any subscription that occurs
// happens after the breach information was persisted to the
// underlying store.
return true, nil
}
// Otherwise since the channel point is not resolved, add a
// subscription. There can only be one subscription per channel point.
b.Lock()
defer b.Unlock()
b.subscriptions[*chanPoint] = c
return false, nil
}
// notifyBreachComplete is used by the BreachArbiter to notify outside
// subsystems that the breach resolution process is complete.
func (b *BreachArbiter) notifyBreachComplete(chanPoint *wire.OutPoint) {
b.Lock()
defer b.Unlock()
if c, ok := b.subscriptions[*chanPoint]; ok {
close(c)
}
// Remove the subscription.
delete(b.subscriptions, *chanPoint)
}
// contractObserver is the primary goroutine for the BreachArbiter. This // contractObserver is the primary goroutine for the BreachArbiter. This
// goroutine is responsible for handling breach events coming from the // goroutine is responsible for handling breach events coming from the
// contractcourt on the ContractBreaches channel. If a channel breach is // contractcourt on the ContractBreaches channel. If a channel breach is
@ -857,6 +901,14 @@ func (b *BreachArbiter) cleanupBreach(chanPoint *wire.OutPoint) error {
err) err)
} }
// This is after the Remove call so that the chan passed in via
// SubscribeBreachComplete is always notified, no matter when it is
// called. Otherwise, if notifyBreachComplete was before Remove, a
// very rare edge case could occur in which SubscribeBreachComplete
// is called after notifyBreachComplete and before Remove, meaning the
// caller would never be notified.
b.notifyBreachComplete(chanPoint)
return nil return nil
} }

View File

@ -183,6 +183,12 @@ type ChainArbitratorConfig struct {
// Clock is the clock implementation that ChannelArbitrator uses. // Clock is the clock implementation that ChannelArbitrator uses.
// It is useful for testing. // It is useful for testing.
Clock clock.Clock Clock clock.Clock
// SubscribeBreachComplete is used by the breachResolver to register a
// subscription that notifies when the breach resolution process is
// complete.
SubscribeBreachComplete func(op *wire.OutPoint, c chan struct{}) (
bool, error)
} }
// ChainArbitrator is a sub-system that oversees the on-chain resolution of all // ChainArbitrator is a sub-system that oversees the on-chain resolution of all

View File

@ -1026,6 +1026,20 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
// breach events from the ChannelArbitrator to the breachArbiter, // breach events from the ChannelArbitrator to the breachArbiter,
contractBreaches := make(chan *contractcourt.ContractBreachEvent, 1) contractBreaches := make(chan *contractcourt.ContractBreachEvent, 1)
s.breachArbiter = contractcourt.NewBreachArbiter(&contractcourt.BreachConfig{
CloseLink: closeLink,
DB: s.chanStateDB,
Estimator: s.cc.FeeEstimator,
GenSweepScript: newSweepPkScriptGen(cc.Wallet),
Notifier: cc.ChainNotifier,
PublishTransaction: cc.Wallet.PublishTransaction,
ContractBreaches: contractBreaches,
Signer: cc.Wallet.Cfg.Signer,
Store: contractcourt.NewRetributionStore(
dbs.ChanStateDB,
),
})
s.chainArb = contractcourt.NewChainArbitrator(contractcourt.ChainArbitratorConfig{ s.chainArb = contractcourt.NewChainArbitrator(contractcourt.ChainArbitratorConfig{
ChainHash: *s.cfg.ActiveNetParams.GenesisHash, ChainHash: *s.cfg.ActiveNetParams.GenesisHash,
IncomingBroadcastDelta: lncfg.DefaultIncomingBroadcastDelta, IncomingBroadcastDelta: lncfg.DefaultIncomingBroadcastDelta,
@ -1125,22 +1139,9 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
PaymentsExpirationGracePeriod: cfg.PaymentsExpirationGracePeriod, PaymentsExpirationGracePeriod: cfg.PaymentsExpirationGracePeriod,
IsForwardedHTLC: s.htlcSwitch.IsForwardedHTLC, IsForwardedHTLC: s.htlcSwitch.IsForwardedHTLC,
Clock: clock.NewDefaultClock(), Clock: clock.NewDefaultClock(),
SubscribeBreachComplete: s.breachArbiter.SubscribeBreachComplete,
}, dbs.ChanStateDB) }, dbs.ChanStateDB)
s.breachArbiter = contractcourt.NewBreachArbiter(&contractcourt.BreachConfig{
CloseLink: closeLink,
DB: s.chanStateDB,
Estimator: s.cc.FeeEstimator,
GenSweepScript: newSweepPkScriptGen(cc.Wallet),
Notifier: cc.ChainNotifier,
PublishTransaction: cc.Wallet.PublishTransaction,
ContractBreaches: contractBreaches,
Signer: cc.Wallet.Cfg.Signer,
Store: contractcourt.NewRetributionStore(
dbs.ChanStateDB,
),
})
// Select the configuration and furnding parameters for Bitcoin or // Select the configuration and furnding parameters for Bitcoin or
// Litecoin, depending on the primary registered chain. // Litecoin, depending on the primary registered chain.
primaryChain := cfg.registeredChains.PrimaryChain() primaryChain := cfg.registeredChains.PrimaryChain()