lnd/contractcourt/chain_arbitrator.go
2024-10-02 18:10:00 -07:00

1361 lines
43 KiB
Go

package contractcourt
import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcwallet/walletdb"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/models"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/labels"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
"github.com/lightningnetwork/lnd/lnwire"
)
// ErrChainArbExiting signals that the chain arbitrator is shutting down.
var ErrChainArbExiting = errors.New("ChainArbitrator exiting")
// ResolutionMsg is a message sent by resolvers to outside sub-systems once an
// outgoing contract has been fully resolved. For multi-hop contracts, if we
// resolve the outgoing contract, we'll also need to ensure that the incoming
// contract is resolved as well. We package the items required to resolve the
// incoming contracts within this message.
type ResolutionMsg struct {
// SourceChan identifies the channel that this message is being sent
// from. This is the channel's short channel ID.
SourceChan lnwire.ShortChannelID
// HtlcIndex is the index of the contract within the original
// commitment trace.
HtlcIndex uint64
// Failure will be non-nil if the incoming contract should be canceled
// all together. This can happen if the outgoing contract was dust, if
// if the outgoing HTLC timed out.
Failure lnwire.FailureMessage
// PreImage will be non-nil if the incoming contract can successfully
// be redeemed. This can happen if we learn of the preimage from the
// outgoing HTLC on-chain.
PreImage *[32]byte
}
// ChainArbitratorConfig is a configuration struct that contains all the
// function closures and interface that required to arbitrate on-chain
// contracts for a particular chain.
type ChainArbitratorConfig struct {
// ChainHash is the chain that this arbitrator is to operate within.
ChainHash chainhash.Hash
// IncomingBroadcastDelta is the delta that we'll use to decide when to
// broadcast our commitment transaction if we have incoming htlcs. This
// value should be set based on our current fee estimation of the
// commitment transaction. We use this to determine when we should
// broadcast instead of just the HTLC timeout, as we want to ensure
// that the commitment transaction is already confirmed, by the time the
// HTLC expires. Otherwise we may end up not settling the htlc on-chain
// because the other party managed to time it out.
IncomingBroadcastDelta uint32
// OutgoingBroadcastDelta is the delta that we'll use to decide when to
// broadcast our commitment transaction if there are active outgoing
// htlcs. This value can be lower than the incoming broadcast delta.
OutgoingBroadcastDelta uint32
// NewSweepAddr is a function that returns a new address under control
// by the wallet. We'll use this to sweep any no-delay outputs as a
// result of unilateral channel closes.
//
// NOTE: This SHOULD return a p2wkh script.
NewSweepAddr func() ([]byte, error)
// PublishTx reliably broadcasts a transaction to the network. Once
// this function exits without an error, then they transaction MUST
// continually be rebroadcast if needed.
PublishTx func(*wire.MsgTx, string) error
// DeliverResolutionMsg is a function that will append an outgoing
// message to the "out box" for a ChannelLink. This is used to cancel
// backwards any HTLC's that are either dust, we're timing out, or
// settling on-chain to the incoming link.
DeliverResolutionMsg func(...ResolutionMsg) error
// MarkLinkInactive is a function closure that the ChainArbitrator will
// use to mark that active HTLC's shouldn't be attempted to be routed
// over a particular channel. This function will be called in that a
// ChannelArbitrator decides that it needs to go to chain in order to
// resolve contracts.
//
// TODO(roasbeef): rename, routing based
MarkLinkInactive func(wire.OutPoint) error
// ContractBreach is a function closure that the ChainArbitrator will
// use to notify the BreachArbitrator about a contract breach. It should
// only return a non-nil error when the BreachArbitrator has preserved
// the necessary breach info for this channel point. Once the breach
// resolution is persisted in the ChannelArbitrator, it will be safe
// to mark the channel closed.
ContractBreach func(wire.OutPoint, *lnwallet.BreachRetribution) error
// IsOurAddress is a function that returns true if the passed address
// is known to the underlying wallet. Otherwise, false should be
// returned.
IsOurAddress func(btcutil.Address) bool
// IncubateOutputs sends either an incoming HTLC, an outgoing HTLC, or
// both to the utxo nursery. Once this function returns, the nursery
// should have safely persisted the outputs to disk, and should start
// the process of incubation. This is used when a resolver wishes to
// pass off the output to the nursery as we're only waiting on an
// absolute/relative item block.
IncubateOutputs func(wire.OutPoint,
fn.Option[lnwallet.OutgoingHtlcResolution],
fn.Option[lnwallet.IncomingHtlcResolution],
uint32, fn.Option[int32]) error
// PreimageDB is a global store of all known pre-images. We'll use this
// to decide if we should broadcast a commitment transaction to claim
// an HTLC on-chain.
PreimageDB WitnessBeacon
// Notifier is an instance of a chain notifier we'll use to watch for
// certain on-chain events.
Notifier chainntnfs.ChainNotifier
// Mempool is the a mempool watcher that allows us to watch for events
// happened in mempool.
Mempool chainntnfs.MempoolWatcher
// Signer is a signer backed by the active lnd node. This should be
// capable of producing a signature as specified by a valid
// SignDescriptor.
Signer input.Signer
// FeeEstimator will be used to return fee estimates.
FeeEstimator chainfee.Estimator
// ChainIO allows us to query the state of the current main chain.
ChainIO lnwallet.BlockChainIO
// DisableChannel disables a channel, resulting in it not being able to
// forward payments.
DisableChannel func(wire.OutPoint) error
// Sweeper allows resolvers to sweep their final outputs.
Sweeper UtxoSweeper
// Registry is the invoice database that is used by resolvers to lookup
// preimages and settle invoices.
Registry Registry
// NotifyClosedChannel is a function closure that the ChainArbitrator
// will use to notify the ChannelNotifier about a newly closed channel.
NotifyClosedChannel func(wire.OutPoint)
// NotifyFullyResolvedChannel is a function closure that the
// ChainArbitrator will use to notify the ChannelNotifier about a newly
// resolved channel. The main difference to NotifyClosedChannel is that
// in case of a local force close the NotifyClosedChannel is called when
// the published commitment transaction confirms while
// NotifyFullyResolvedChannel is only called when the channel is fully
// resolved (which includes sweeping any time locked funds).
NotifyFullyResolvedChannel func(point wire.OutPoint)
// OnionProcessor is used to decode onion payloads for on-chain
// resolution.
OnionProcessor OnionProcessor
// PaymentsExpirationGracePeriod indicates a time window we let the
// other node to cancel an outgoing htlc that our node has initiated and
// has timed out.
PaymentsExpirationGracePeriod time.Duration
// IsForwardedHTLC checks for a given htlc, identified by channel id and
// htlcIndex, if it is a forwarded one.
IsForwardedHTLC func(chanID lnwire.ShortChannelID, htlcIndex uint64) bool
// Clock is the clock implementation that ChannelArbitrator uses.
// It is useful for testing.
Clock clock.Clock
// SubscribeBreachComplete is used by the breachResolver to register a
// subscription that notifies when the breach resolution process is
// complete.
SubscribeBreachComplete func(op *wire.OutPoint, c chan struct{}) (
bool, error)
// PutFinalHtlcOutcome stores the final outcome of an htlc in the
// database.
PutFinalHtlcOutcome func(chanId lnwire.ShortChannelID,
htlcId uint64, settled bool) error
// HtlcNotifier is an interface that htlc events are sent to.
HtlcNotifier HtlcNotifier
// Budget is the configured budget for the arbitrator.
Budget BudgetConfig
// QueryIncomingCircuit is used to find the outgoing HTLC's
// corresponding incoming HTLC circuit. It queries the circuit map for
// a given outgoing circuit key and returns the incoming circuit key.
//
// TODO(yy): this is a hacky way to get around the cycling import issue
// as we cannot import `htlcswitch` here. A proper way is to define an
// interface here that asks for method `LookupOpenCircuit`,
// meanwhile, turn `PaymentCircuit` into an interface or bring it to a
// lower package.
QueryIncomingCircuit func(circuit models.CircuitKey) *models.CircuitKey
// AuxLeafStore is an optional store that can be used to store auxiliary
// leaves for certain custom channel types.
AuxLeafStore fn.Option[lnwallet.AuxLeafStore]
// AuxSigner is an optional signer that can be used to sign auxiliary
// leaves for certain custom channel types.
AuxSigner fn.Option[lnwallet.AuxSigner]
// AuxResolver is an optional interface that can be used to modify the
// way contracts are resolved.
AuxResolver fn.Option[lnwallet.AuxContractResolver]
}
// ChainArbitrator is a sub-system that oversees the on-chain resolution of all
// active, and channel that are in the "pending close" state. Within the
// contractcourt package, the ChainArbitrator manages a set of active
// ContractArbitrators. Each ContractArbitrators is responsible for watching
// the chain for any activity that affects the state of the channel, and also
// for monitoring each contract in order to determine if any on-chain activity is
// required. Outside sub-systems interact with the ChainArbitrator in order to
// forcibly exit a contract, update the set of live signals for each contract,
// and to receive reports on the state of contract resolution.
type ChainArbitrator struct {
started int32 // To be used atomically.
stopped int32 // To be used atomically.
sync.Mutex
// activeChannels is a map of all the active contracts that are still
// open, and not fully resolved.
activeChannels map[wire.OutPoint]*ChannelArbitrator
// activeWatchers is a map of all the active chainWatchers for channels
// that are still considered open.
activeWatchers map[wire.OutPoint]*chainWatcher
// cfg is the config struct for the arbitrator that contains all
// methods and interface it needs to operate.
cfg ChainArbitratorConfig
// chanSource will be used by the ChainArbitrator to fetch all the
// active channels that it must still watch over.
chanSource *channeldb.DB
quit chan struct{}
wg sync.WaitGroup
}
// NewChainArbitrator returns a new instance of the ChainArbitrator using the
// passed config struct, and backing persistent database.
func NewChainArbitrator(cfg ChainArbitratorConfig,
db *channeldb.DB) *ChainArbitrator {
return &ChainArbitrator{
cfg: cfg,
activeChannels: make(map[wire.OutPoint]*ChannelArbitrator),
activeWatchers: make(map[wire.OutPoint]*chainWatcher),
chanSource: db,
quit: make(chan struct{}),
}
}
// arbChannel is a wrapper around an open channel that channel arbitrators
// interact with.
type arbChannel struct {
// channel is the in-memory channel state.
channel *channeldb.OpenChannel
// c references the chain arbitrator and is used by arbChannel
// internally.
c *ChainArbitrator
}
// NewAnchorResolutions returns the anchor resolutions for currently valid
// commitment transactions.
//
// NOTE: Part of the ArbChannel interface.
func (a *arbChannel) NewAnchorResolutions() (*lnwallet.AnchorResolutions,
error) {
// Get a fresh copy of the database state to base the anchor resolutions
// on. Unfortunately the channel instance that we have here isn't the
// same instance that is used by the link.
chanPoint := a.channel.FundingOutpoint
channel, err := a.c.chanSource.ChannelStateDB().FetchChannel(
nil, chanPoint,
)
if err != nil {
return nil, err
}
var chanOpts []lnwallet.ChannelOpt
a.c.cfg.AuxLeafStore.WhenSome(func(s lnwallet.AuxLeafStore) {
chanOpts = append(chanOpts, lnwallet.WithLeafStore(s))
})
a.c.cfg.AuxSigner.WhenSome(func(s lnwallet.AuxSigner) {
chanOpts = append(chanOpts, lnwallet.WithAuxSigner(s))
})
a.c.cfg.AuxResolver.WhenSome(func(s lnwallet.AuxContractResolver) {
chanOpts = append(chanOpts, lnwallet.WithAuxResolver(s))
})
chanMachine, err := lnwallet.NewLightningChannel(
a.c.cfg.Signer, channel, nil, chanOpts...,
)
if err != nil {
return nil, err
}
return chanMachine.NewAnchorResolutions()
}
// ForceCloseChan should force close the contract that this attendant is
// watching over. We'll use this when we decide that we need to go to chain. It
// should in addition tell the switch to remove the corresponding link, such
// that we won't accept any new updates. The returned summary contains all items
// needed to eventually resolve all outputs on chain.
//
// NOTE: Part of the ArbChannel interface.
func (a *arbChannel) ForceCloseChan() (*lnwallet.LocalForceCloseSummary, error) {
// First, we mark the channel as borked, this ensure
// that no new state transitions can happen, and also
// that the link won't be loaded into the switch.
if err := a.channel.MarkBorked(); err != nil {
return nil, err
}
// With the channel marked as borked, we'll now remove
// the link from the switch if its there. If the link
// is active, then this method will block until it
// exits.
chanPoint := a.channel.FundingOutpoint
if err := a.c.cfg.MarkLinkInactive(chanPoint); err != nil {
log.Errorf("unable to mark link inactive: %v", err)
}
// Now that we know the link can't mutate the channel
// state, we'll read the channel from disk the target
// channel according to its channel point.
channel, err := a.c.chanSource.ChannelStateDB().FetchChannel(
nil, chanPoint,
)
if err != nil {
return nil, err
}
var chanOpts []lnwallet.ChannelOpt
a.c.cfg.AuxLeafStore.WhenSome(func(s lnwallet.AuxLeafStore) {
chanOpts = append(chanOpts, lnwallet.WithLeafStore(s))
})
a.c.cfg.AuxSigner.WhenSome(func(s lnwallet.AuxSigner) {
chanOpts = append(chanOpts, lnwallet.WithAuxSigner(s))
})
a.c.cfg.AuxResolver.WhenSome(func(s lnwallet.AuxContractResolver) {
chanOpts = append(chanOpts, lnwallet.WithAuxResolver(s))
})
// Finally, we'll force close the channel completing
// the force close workflow.
chanMachine, err := lnwallet.NewLightningChannel(
a.c.cfg.Signer, channel, nil, chanOpts...,
)
if err != nil {
return nil, err
}
return chanMachine.ForceClose()
}
// newActiveChannelArbitrator creates a new instance of an active channel
// arbitrator given the state of the target channel.
func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
c *ChainArbitrator, chanEvents *ChainEventSubscription) (*ChannelArbitrator, error) {
// TODO(roasbeef): fetch best height (or pass in) so can ensure block
// epoch delivers all the notifications to
chanPoint := channel.FundingOutpoint
log.Tracef("Creating ChannelArbitrator for ChannelPoint(%v)", chanPoint)
// Next we'll create the matching configuration struct that contains
// all interfaces and methods the arbitrator needs to do its job.
arbCfg := ChannelArbitratorConfig{
ChanPoint: chanPoint,
Channel: c.getArbChannel(channel),
ShortChanID: channel.ShortChanID(),
MarkCommitmentBroadcasted: channel.MarkCommitmentBroadcasted,
MarkChannelClosed: func(summary *channeldb.ChannelCloseSummary,
statuses ...channeldb.ChannelStatus) error {
err := channel.CloseChannel(summary, statuses...)
if err != nil {
return err
}
c.cfg.NotifyClosedChannel(summary.ChanPoint)
return nil
},
IsPendingClose: false,
ChainArbitratorConfig: c.cfg,
ChainEvents: chanEvents,
PutResolverReport: func(tx kvdb.RwTx,
report *channeldb.ResolverReport) error {
return c.chanSource.PutResolverReport(
tx, c.cfg.ChainHash, &chanPoint, report,
)
},
FetchHistoricalChannel: func() (*channeldb.OpenChannel, error) {
chanStateDB := c.chanSource.ChannelStateDB()
return chanStateDB.FetchHistoricalChannel(&chanPoint)
},
FindOutgoingHTLCDeadline: func(
htlc channeldb.HTLC) fn.Option[int32] {
return c.FindOutgoingHTLCDeadline(
channel.ShortChanID(), htlc,
)
},
}
// The final component needed is an arbitrator log that the arbitrator
// will use to keep track of its internal state using a backed
// persistent log.
//
// TODO(roasbeef); abstraction leak...
// * rework: adaptor method to set log scope w/ factory func
chanLog, err := newBoltArbitratorLog(
c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint,
)
if err != nil {
return nil, err
}
arbCfg.MarkChannelResolved = func() error {
if c.cfg.NotifyFullyResolvedChannel != nil {
c.cfg.NotifyFullyResolvedChannel(chanPoint)
}
return c.ResolveContract(chanPoint)
}
// Finally, we'll need to construct a series of htlc Sets based on all
// currently known valid commitments.
htlcSets := make(map[HtlcSetKey]htlcSet)
htlcSets[LocalHtlcSet] = newHtlcSet(channel.LocalCommitment.Htlcs)
htlcSets[RemoteHtlcSet] = newHtlcSet(channel.RemoteCommitment.Htlcs)
pendingRemoteCommitment, err := channel.RemoteCommitChainTip()
if err != nil && err != channeldb.ErrNoPendingCommit {
return nil, err
}
if pendingRemoteCommitment != nil {
htlcSets[RemotePendingHtlcSet] = newHtlcSet(
pendingRemoteCommitment.Commitment.Htlcs,
)
}
return NewChannelArbitrator(
arbCfg, htlcSets, chanLog,
), nil
}
// getArbChannel returns an open channel wrapper for use by channel arbitrators.
func (c *ChainArbitrator) getArbChannel(
channel *channeldb.OpenChannel) *arbChannel {
return &arbChannel{
channel: channel,
c: c,
}
}
// ResolveContract marks a contract as fully resolved within the database.
// This is only to be done once all contracts which were live on the channel
// before hitting the chain have been resolved.
func (c *ChainArbitrator) ResolveContract(chanPoint wire.OutPoint) error {
log.Infof("Marking ChannelPoint(%v) fully resolved", chanPoint)
// First, we'll we'll mark the channel as fully closed from the PoV of
// the channel source.
err := c.chanSource.ChannelStateDB().MarkChanFullyClosed(&chanPoint)
if err != nil {
log.Errorf("ChainArbitrator: unable to mark ChannelPoint(%v) "+
"fully closed: %v", chanPoint, err)
return err
}
// Now that the channel has been marked as fully closed, we'll stop
// both the channel arbitrator and chain watcher for this channel if
// they're still active.
var arbLog ArbitratorLog
c.Lock()
chainArb := c.activeChannels[chanPoint]
delete(c.activeChannels, chanPoint)
chainWatcher := c.activeWatchers[chanPoint]
delete(c.activeWatchers, chanPoint)
c.Unlock()
if chainArb != nil {
arbLog = chainArb.log
if err := chainArb.Stop(); err != nil {
log.Warnf("unable to stop ChannelArbitrator(%v): %v",
chanPoint, err)
}
}
if chainWatcher != nil {
if err := chainWatcher.Stop(); err != nil {
log.Warnf("unable to stop ChainWatcher(%v): %v",
chanPoint, err)
}
}
// Once this has been marked as resolved, we'll wipe the log that the
// channel arbitrator was using to store its persistent state. We do
// this after marking the channel resolved, as otherwise, the
// arbitrator would be re-created, and think it was starting from the
// default state.
if arbLog != nil {
if err := arbLog.WipeHistory(); err != nil {
return err
}
}
return nil
}
// Start launches all goroutines that the ChainArbitrator needs to operate.
func (c *ChainArbitrator) Start() error {
if !atomic.CompareAndSwapInt32(&c.started, 0, 1) {
return nil
}
log.Infof("ChainArbitrator starting with config: budget=[%v]",
&c.cfg.Budget)
// First, we'll fetch all the channels that are still open, in order to
// collect them within our set of active contracts.
openChannels, err := c.chanSource.ChannelStateDB().FetchAllChannels()
if err != nil {
return err
}
if len(openChannels) > 0 {
log.Infof("Creating ChannelArbitrators for %v active channels",
len(openChannels))
}
// For each open channel, we'll configure then launch a corresponding
// ChannelArbitrator.
for _, channel := range openChannels {
chanPoint := channel.FundingOutpoint
channel := channel
// First, we'll create an active chainWatcher for this channel
// to ensure that we detect any relevant on chain events.
breachClosure := func(ret *lnwallet.BreachRetribution) error {
return c.cfg.ContractBreach(chanPoint, ret)
}
chainWatcher, err := newChainWatcher(
chainWatcherConfig{
chanState: channel,
notifier: c.cfg.Notifier,
signer: c.cfg.Signer,
isOurAddr: c.cfg.IsOurAddress,
contractBreach: breachClosure,
extractStateNumHint: lnwallet.GetStateNumHint,
auxLeafStore: c.cfg.AuxLeafStore,
auxResolver: c.cfg.AuxResolver,
},
)
if err != nil {
return err
}
c.activeWatchers[chanPoint] = chainWatcher
channelArb, err := newActiveChannelArbitrator(
channel, c, chainWatcher.SubscribeChannelEvents(),
)
if err != nil {
return err
}
c.activeChannels[chanPoint] = channelArb
// Republish any closing transactions for this channel.
err = c.republishClosingTxs(channel)
if err != nil {
log.Errorf("Failed to republish closing txs for "+
"channel %v", chanPoint)
}
}
// In addition to the channels that we know to be open, we'll also
// launch arbitrators to finishing resolving any channels that are in
// the pending close state.
closingChannels, err := c.chanSource.ChannelStateDB().FetchClosedChannels(
true,
)
if err != nil {
return err
}
if len(closingChannels) > 0 {
log.Infof("Creating ChannelArbitrators for %v closing channels",
len(closingChannels))
}
// Next, for each channel is the closing state, we'll launch a
// corresponding more restricted resolver, as we don't have to watch
// the chain any longer, only resolve the contracts on the confirmed
// commitment.
//nolint:lll
for _, closeChanInfo := range closingChannels {
// We can leave off the CloseContract and ForceCloseChan
// methods as the channel is already closed at this point.
chanPoint := closeChanInfo.ChanPoint
arbCfg := ChannelArbitratorConfig{
ChanPoint: chanPoint,
ShortChanID: closeChanInfo.ShortChanID,
ChainArbitratorConfig: c.cfg,
ChainEvents: &ChainEventSubscription{},
IsPendingClose: true,
ClosingHeight: closeChanInfo.CloseHeight,
CloseType: closeChanInfo.CloseType,
PutResolverReport: func(tx kvdb.RwTx,
report *channeldb.ResolverReport) error {
return c.chanSource.PutResolverReport(
tx, c.cfg.ChainHash, &chanPoint, report,
)
},
FetchHistoricalChannel: func() (*channeldb.OpenChannel, error) {
chanStateDB := c.chanSource.ChannelStateDB()
return chanStateDB.FetchHistoricalChannel(&chanPoint)
},
FindOutgoingHTLCDeadline: func(
htlc channeldb.HTLC) fn.Option[int32] {
return c.FindOutgoingHTLCDeadline(
closeChanInfo.ShortChanID, htlc,
)
},
}
chanLog, err := newBoltArbitratorLog(
c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint,
)
if err != nil {
return err
}
arbCfg.MarkChannelResolved = func() error {
if c.cfg.NotifyFullyResolvedChannel != nil {
c.cfg.NotifyFullyResolvedChannel(chanPoint)
}
return c.ResolveContract(chanPoint)
}
// We create an empty map of HTLC's here since it's possible
// that the channel is in StateDefault and updateActiveHTLCs is
// called. We want to avoid writing to an empty map. Since the
// channel is already in the process of being resolved, no new
// HTLCs will be added.
c.activeChannels[chanPoint] = NewChannelArbitrator(
arbCfg, make(map[HtlcSetKey]htlcSet), chanLog,
)
}
// Now, we'll start all chain watchers in parallel to shorten start up
// duration. In neutrino mode, this allows spend registrations to take
// advantage of batch spend reporting, instead of doing a single rescan
// per chain watcher.
//
// NOTE: After this point, we Stop the chain arb to ensure that any
// lingering goroutines are cleaned up before exiting.
watcherErrs := make(chan error, len(c.activeWatchers))
var wg sync.WaitGroup
for _, watcher := range c.activeWatchers {
wg.Add(1)
go func(w *chainWatcher) {
defer wg.Done()
select {
case watcherErrs <- w.Start():
case <-c.quit:
watcherErrs <- ErrChainArbExiting
}
}(watcher)
}
// Once all chain watchers have been started, seal the err chan to
// signal the end of the err stream.
go func() {
wg.Wait()
close(watcherErrs)
}()
// stopAndLog is a helper function which shuts down the chain arb and
// logs errors if they occur.
stopAndLog := func() {
if err := c.Stop(); err != nil {
log.Errorf("ChainArbitrator could not shutdown: %v", err)
}
}
// Handle all errors returned from spawning our chain watchers. If any
// of them failed, we will stop the chain arb to shutdown any active
// goroutines.
for err := range watcherErrs {
if err != nil {
stopAndLog()
return err
}
}
// Before we start all of our arbitrators, we do a preliminary state
// lookup so that we can combine all of these lookups in a single db
// transaction.
var startStates map[wire.OutPoint]*chanArbStartState
err = kvdb.View(c.chanSource, func(tx walletdb.ReadTx) error {
for _, arbitrator := range c.activeChannels {
startState, err := arbitrator.getStartState(tx)
if err != nil {
return err
}
startStates[arbitrator.cfg.ChanPoint] = startState
}
return nil
}, func() {
startStates = make(
map[wire.OutPoint]*chanArbStartState,
len(c.activeChannels),
)
})
if err != nil {
stopAndLog()
return err
}
// Launch all the goroutines for each arbitrator so they can carry out
// their duties.
for _, arbitrator := range c.activeChannels {
startState, ok := startStates[arbitrator.cfg.ChanPoint]
if !ok {
stopAndLog()
return fmt.Errorf("arbitrator: %v has no start state",
arbitrator.cfg.ChanPoint)
}
if err := arbitrator.Start(startState); err != nil {
stopAndLog()
return err
}
}
// Subscribe to a single stream of block epoch notifications that we
// will dispatch to all active arbitrators.
blockEpoch, err := c.cfg.Notifier.RegisterBlockEpochNtfn(nil)
if err != nil {
return err
}
// Start our goroutine which will dispatch blocks to each arbitrator.
c.wg.Add(1)
go func() {
defer c.wg.Done()
c.dispatchBlocks(blockEpoch)
}()
// TODO(roasbeef): eventually move all breach watching here
return nil
}
// blockRecipient contains the information we need to dispatch a block to a
// channel arbitrator.
type blockRecipient struct {
// chanPoint is the funding outpoint of the channel.
chanPoint wire.OutPoint
// blocks is the channel that new block heights are sent into. This
// channel should be sufficiently buffered as to not block the sender.
blocks chan<- int32
// quit is closed if the receiving entity is shutting down.
quit chan struct{}
}
// dispatchBlocks consumes a block epoch notification stream and dispatches
// blocks to each of the chain arb's active channel arbitrators. This function
// must be run in a goroutine.
func (c *ChainArbitrator) dispatchBlocks(
blockEpoch *chainntnfs.BlockEpochEvent) {
// getRecipients is a helper function which acquires the chain arb
// lock and returns a set of block recipients which can be used to
// dispatch blocks.
getRecipients := func() []blockRecipient {
c.Lock()
blocks := make([]blockRecipient, 0, len(c.activeChannels))
for _, channel := range c.activeChannels {
blocks = append(blocks, blockRecipient{
chanPoint: channel.cfg.ChanPoint,
blocks: channel.blocks,
quit: channel.quit,
})
}
c.Unlock()
return blocks
}
// On exit, cancel our blocks subscription and close each block channel
// so that the arbitrators know they will no longer be receiving blocks.
defer func() {
blockEpoch.Cancel()
recipients := getRecipients()
for _, recipient := range recipients {
close(recipient.blocks)
}
}()
// Consume block epochs until we receive the instruction to shutdown.
for {
select {
// Consume block epochs, exiting if our subscription is
// terminated.
case block, ok := <-blockEpoch.Epochs:
if !ok {
log.Trace("dispatchBlocks block epoch " +
"cancelled")
return
}
// Get the set of currently active channels block
// subscription channels and dispatch the block to
// each.
for _, recipient := range getRecipients() {
select {
// Deliver the block to the arbitrator.
case recipient.blocks <- block.Height:
// If the recipient is shutting down, exit
// without delivering the block. This may be
// the case when two blocks are mined in quick
// succession, and the arbitrator resolves
// after the first block, and does not need to
// consume the second block.
case <-recipient.quit:
log.Debugf("channel: %v exit without "+
"receiving block: %v",
recipient.chanPoint,
block.Height)
// If the chain arb is shutting down, we don't
// need to deliver any more blocks (everything
// will be shutting down).
case <-c.quit:
return
}
}
// Exit if the chain arbitrator is shutting down.
case <-c.quit:
return
}
}
}
// republishClosingTxs will load any stored cooperative or unilateral closing
// transactions and republish them. This helps ensure propagation of the
// transactions in the event that prior publications failed.
func (c *ChainArbitrator) republishClosingTxs(
channel *channeldb.OpenChannel) error {
// If the channel has had its unilateral close broadcasted already,
// republish it in case it didn't propagate.
if channel.HasChanStatus(channeldb.ChanStatusCommitBroadcasted) {
err := c.rebroadcast(
channel, channeldb.ChanStatusCommitBroadcasted,
)
if err != nil {
return err
}
}
// If the channel has had its cooperative close broadcasted
// already, republish it in case it didn't propagate.
if channel.HasChanStatus(channeldb.ChanStatusCoopBroadcasted) {
err := c.rebroadcast(
channel, channeldb.ChanStatusCoopBroadcasted,
)
if err != nil {
return err
}
}
return nil
}
// rebroadcast is a helper method which will republish the unilateral or
// cooperative close transaction or a channel in a particular state.
//
// NOTE: There is no risk to calling this method if the channel isn't in either
// CommitmentBroadcasted or CoopBroadcasted, but the logs will be misleading.
func (c *ChainArbitrator) rebroadcast(channel *channeldb.OpenChannel,
state channeldb.ChannelStatus) error {
chanPoint := channel.FundingOutpoint
var (
closeTx *wire.MsgTx
kind string
err error
)
switch state {
case channeldb.ChanStatusCommitBroadcasted:
kind = "force"
closeTx, err = channel.BroadcastedCommitment()
case channeldb.ChanStatusCoopBroadcasted:
kind = "coop"
closeTx, err = channel.BroadcastedCooperative()
default:
return fmt.Errorf("unknown closing state: %v", state)
}
switch {
// This can happen for channels that had their closing tx published
// before we started storing it to disk.
case err == channeldb.ErrNoCloseTx:
log.Warnf("Channel %v is in state %v, but no %s closing tx "+
"to re-publish...", chanPoint, state, kind)
return nil
case err != nil:
return err
}
log.Infof("Re-publishing %s close tx(%v) for channel %v",
kind, closeTx.TxHash(), chanPoint)
label := labels.MakeLabel(
labels.LabelTypeChannelClose, &channel.ShortChannelID,
)
err = c.cfg.PublishTx(closeTx, label)
if err != nil && err != lnwallet.ErrDoubleSpend {
log.Warnf("Unable to broadcast %s close tx(%v): %v",
kind, closeTx.TxHash(), err)
}
return nil
}
// Stop signals the ChainArbitrator to trigger a graceful shutdown. Any active
// channel arbitrators will be signalled to exit, and this method will block
// until they've all exited.
func (c *ChainArbitrator) Stop() error {
if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {
return nil
}
log.Info("ChainArbitrator shutting down...")
defer log.Debug("ChainArbitrator shutdown complete")
close(c.quit)
var (
activeWatchers = make(map[wire.OutPoint]*chainWatcher)
activeChannels = make(map[wire.OutPoint]*ChannelArbitrator)
)
// Copy the current set of active watchers and arbitrators to shutdown.
// We don't want to hold the lock when shutting down each watcher or
// arbitrator individually, as they may need to acquire this mutex.
c.Lock()
for chanPoint, watcher := range c.activeWatchers {
activeWatchers[chanPoint] = watcher
}
for chanPoint, arbitrator := range c.activeChannels {
activeChannels[chanPoint] = arbitrator
}
c.Unlock()
for chanPoint, watcher := range activeWatchers {
log.Tracef("Attempting to stop ChainWatcher(%v)",
chanPoint)
if err := watcher.Stop(); err != nil {
log.Errorf("unable to stop watcher for "+
"ChannelPoint(%v): %v", chanPoint, err)
}
}
for chanPoint, arbitrator := range activeChannels {
log.Tracef("Attempting to stop ChannelArbitrator(%v)",
chanPoint)
if err := arbitrator.Stop(); err != nil {
log.Errorf("unable to stop arbitrator for "+
"ChannelPoint(%v): %v", chanPoint, err)
}
}
c.wg.Wait()
return nil
}
// ContractUpdate is a message packages the latest set of active HTLCs on a
// commitment, and also identifies which commitment received a new set of
// HTLCs.
type ContractUpdate struct {
// HtlcKey identifies which commitment the HTLCs below are present on.
HtlcKey HtlcSetKey
// Htlcs are the of active HTLCs on the commitment identified by the
// above HtlcKey.
Htlcs []channeldb.HTLC
}
// ContractSignals is used by outside subsystems to notify a channel arbitrator
// of its ShortChannelID.
type ContractSignals struct {
// ShortChanID is the up to date short channel ID for a contract. This
// can change either if when the contract was added it didn't yet have
// a stable identifier, or in the case of a reorg.
ShortChanID lnwire.ShortChannelID
}
// UpdateContractSignals sends a set of active, up to date contract signals to
// the ChannelArbitrator which is has been assigned to the channel infield by
// the passed channel point.
func (c *ChainArbitrator) UpdateContractSignals(chanPoint wire.OutPoint,
signals *ContractSignals) error {
log.Infof("Attempting to update ContractSignals for ChannelPoint(%v)",
chanPoint)
c.Lock()
arbitrator, ok := c.activeChannels[chanPoint]
c.Unlock()
if !ok {
return fmt.Errorf("unable to find arbitrator")
}
arbitrator.UpdateContractSignals(signals)
return nil
}
// NotifyContractUpdate lets a channel arbitrator know that a new
// ContractUpdate is available. This calls the ChannelArbitrator's internal
// method NotifyContractUpdate which waits for a response on a done chan before
// returning. This method will return an error if the ChannelArbitrator is not
// in the activeChannels map. However, this only happens if the arbitrator is
// resolved and the related link would already be shut down.
func (c *ChainArbitrator) NotifyContractUpdate(chanPoint wire.OutPoint,
update *ContractUpdate) error {
c.Lock()
arbitrator, ok := c.activeChannels[chanPoint]
c.Unlock()
if !ok {
return fmt.Errorf("can't find arbitrator for %v", chanPoint)
}
arbitrator.notifyContractUpdate(update)
return nil
}
// GetChannelArbitrator safely returns the channel arbitrator for a given
// channel outpoint.
func (c *ChainArbitrator) GetChannelArbitrator(chanPoint wire.OutPoint) (
*ChannelArbitrator, error) {
c.Lock()
arbitrator, ok := c.activeChannels[chanPoint]
c.Unlock()
if !ok {
return nil, fmt.Errorf("unable to find arbitrator")
}
return arbitrator, nil
}
// forceCloseReq is a request sent from an outside sub-system to the arbitrator
// that watches a particular channel to broadcast the commitment transaction,
// and enter the resolution phase of the channel.
type forceCloseReq struct {
// errResp is a channel that will be sent upon either in the case of
// force close success (nil error), or in the case on an error.
//
// NOTE; This channel MUST be buffered.
errResp chan error
// closeTx is a channel that carries the transaction which ultimately
// closed out the channel.
closeTx chan *wire.MsgTx
}
// ForceCloseContract attempts to force close the channel infield by the passed
// channel point. A force close will immediately terminate the contract,
// causing it to enter the resolution phase. If the force close was successful,
// then the force close transaction itself will be returned.
//
// TODO(roasbeef): just return the summary itself?
func (c *ChainArbitrator) ForceCloseContract(chanPoint wire.OutPoint) (*wire.MsgTx, error) {
c.Lock()
arbitrator, ok := c.activeChannels[chanPoint]
c.Unlock()
if !ok {
return nil, fmt.Errorf("unable to find arbitrator")
}
log.Infof("Attempting to force close ChannelPoint(%v)", chanPoint)
// Before closing, we'll attempt to send a disable update for the
// channel. We do so before closing the channel as otherwise the current
// edge policy won't be retrievable from the graph.
if err := c.cfg.DisableChannel(chanPoint); err != nil {
log.Warnf("Unable to disable channel %v on "+
"close: %v", chanPoint, err)
}
errChan := make(chan error, 1)
respChan := make(chan *wire.MsgTx, 1)
// With the channel found, and the request crafted, we'll send over a
// force close request to the arbitrator that watches this channel.
select {
case arbitrator.forceCloseReqs <- &forceCloseReq{
errResp: errChan,
closeTx: respChan,
}:
case <-c.quit:
return nil, ErrChainArbExiting
}
// We'll await two responses: the error response, and the transaction
// that closed out the channel.
select {
case err := <-errChan:
if err != nil {
return nil, err
}
case <-c.quit:
return nil, ErrChainArbExiting
}
var closeTx *wire.MsgTx
select {
case closeTx = <-respChan:
case <-c.quit:
return nil, ErrChainArbExiting
}
return closeTx, nil
}
// WatchNewChannel sends the ChainArbitrator a message to create a
// ChannelArbitrator tasked with watching over a new channel. Once a new
// channel has finished its final funding flow, it should be registered with
// the ChainArbitrator so we can properly react to any on-chain events.
func (c *ChainArbitrator) WatchNewChannel(newChan *channeldb.OpenChannel) error {
c.Lock()
defer c.Unlock()
chanPoint := newChan.FundingOutpoint
log.Infof("Creating new ChannelArbitrator for ChannelPoint(%v)",
chanPoint)
// If we're already watching this channel, then we'll ignore this
// request.
if _, ok := c.activeChannels[chanPoint]; ok {
return nil
}
// First, also create an active chainWatcher for this channel to ensure
// that we detect any relevant on chain events.
chainWatcher, err := newChainWatcher(
chainWatcherConfig{
chanState: newChan,
notifier: c.cfg.Notifier,
signer: c.cfg.Signer,
isOurAddr: c.cfg.IsOurAddress,
contractBreach: func(
retInfo *lnwallet.BreachRetribution) error {
return c.cfg.ContractBreach(
chanPoint, retInfo,
)
},
extractStateNumHint: lnwallet.GetStateNumHint,
auxLeafStore: c.cfg.AuxLeafStore,
auxResolver: c.cfg.AuxResolver,
},
)
if err != nil {
return err
}
c.activeWatchers[chanPoint] = chainWatcher
// We'll also create a new channel arbitrator instance using this new
// channel, and our internal state.
channelArb, err := newActiveChannelArbitrator(
newChan, c, chainWatcher.SubscribeChannelEvents(),
)
if err != nil {
return err
}
// With the arbitrator created, we'll add it to our set of active
// arbitrators, then launch it.
c.activeChannels[chanPoint] = channelArb
if err := channelArb.Start(nil); err != nil {
return err
}
return chainWatcher.Start()
}
// SubscribeChannelEvents returns a new active subscription for the set of
// possible on-chain events for a particular channel. The struct can be used by
// callers to be notified whenever an event that changes the state of the
// channel on-chain occurs.
func (c *ChainArbitrator) SubscribeChannelEvents(
chanPoint wire.OutPoint) (*ChainEventSubscription, error) {
// First, we'll attempt to look up the active watcher for this channel.
// If we can't find it, then we'll return an error back to the caller.
c.Lock()
watcher, ok := c.activeWatchers[chanPoint]
c.Unlock()
if !ok {
return nil, fmt.Errorf("unable to find watcher for: %v",
chanPoint)
}
// With the watcher located, we'll request for it to create a new chain
// event subscription client.
return watcher.SubscribeChannelEvents(), nil
}
// FindOutgoingHTLCDeadline returns the deadline in absolute block height for
// the specified outgoing HTLC. For an outgoing HTLC, its deadline is defined
// by the timeout height of its corresponding incoming HTLC - this is the
// expiry height the that remote peer can spend his/her outgoing HTLC via the
// timeout path.
func (c *ChainArbitrator) FindOutgoingHTLCDeadline(scid lnwire.ShortChannelID,
outgoingHTLC channeldb.HTLC) fn.Option[int32] {
// Find the outgoing HTLC's corresponding incoming HTLC in the circuit
// map.
rHash := outgoingHTLC.RHash
circuit := models.CircuitKey{
ChanID: scid,
HtlcID: outgoingHTLC.HtlcIndex,
}
incomingCircuit := c.cfg.QueryIncomingCircuit(circuit)
// If there's no incoming circuit found, we will use the default
// deadline.
if incomingCircuit == nil {
log.Warnf("ChannelArbitrator(%v): incoming circuit key not "+
"found for rHash=%x, using default deadline instead",
scid, rHash)
return fn.None[int32]()
}
// If this is a locally initiated HTLC, it means we are the first hop.
// In this case, we can relax the deadline.
if incomingCircuit.ChanID.IsDefault() {
log.Infof("ChannelArbitrator(%v): using default deadline for "+
"locally initiated HTLC for rHash=%x", scid, rHash)
return fn.None[int32]()
}
log.Debugf("Found incoming circuit %v for rHash=%x using outgoing "+
"circuit %v", incomingCircuit, rHash, circuit)
c.Lock()
defer c.Unlock()
// Iterate over all active channels to find the incoming HTLC specified
// by its circuit key.
for cp, channelArb := range c.activeChannels {
// Skip if the SCID doesn't match.
if channelArb.cfg.ShortChanID != incomingCircuit.ChanID {
continue
}
// Make sure the channel arbitrator has the latest view of its
// active HTLCs.
channelArb.updateActiveHTLCs()
// Iterate all the known HTLCs to find the targeted incoming
// HTLC.
for _, htlcs := range channelArb.activeHTLCs {
for _, htlc := range htlcs.incomingHTLCs {
// Skip if the index doesn't match.
if htlc.HtlcIndex != incomingCircuit.HtlcID {
continue
}
log.Debugf("ChannelArbitrator(%v): found "+
"incoming HTLC in channel=%v using "+
"rHash=%x, refundTimeout=%v", scid,
cp, rHash, htlc.RefundTimeout)
return fn.Some(int32(htlc.RefundTimeout))
}
}
}
// If there's no incoming HTLC found, yet we have the incoming circuit,
// something is wrong - in this case, we return the none deadline.
log.Errorf("ChannelArbitrator(%v): incoming HTLC not found for "+
"rHash=%x, using default deadline instead", scid, rHash)
return fn.None[int32]()
}
// TODO(roasbeef): arbitration reports
// * types: contested, waiting for success conf, etc