package routing

import (
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/btcsuite/btcd/btcutil"
	"github.com/btcsuite/btcwallet/walletdb"
	"github.com/lightningnetwork/lnd/channeldb"
	"github.com/lightningnetwork/lnd/fn"
	"github.com/lightningnetwork/lnd/kvdb"
	"github.com/lightningnetwork/lnd/lnwire"
	"github.com/lightningnetwork/lnd/routing/route"
)

const (
	// DefaultPenaltyHalfLife is the default half-life duration. The
	// half-life duration defines after how much time a penalized node or
	// channel is back at 50% probability.
	DefaultPenaltyHalfLife = time.Hour

	// minSecondChanceInterval is the minimum time required between
	// second-chance failures.
	//
	// If nodes return a channel policy related failure, they may get a
	// second chance to forward the payment. It could be that the channel
	// policy that we are aware of is not up to date. This is especially
	// important in case of mobile apps that are mostly offline.
	//
	// However, we don't want to give nodes the option to endlessly return
	// new channel updates so that we are kept busy trying to route through
	// that node until the payment loop times out.
	//
	// Therefore we only grant a second chance to a node if the previous
	// second chance is sufficiently long ago. This is what
	// minSecondChanceInterval defines. If a second policy failure comes in
	// within that interval, we will apply a penalty.
	//
	// Second chances granted are tracked on the level of node pairs. This
	// means that if a node has multiple channels to the same peer, they
	// will only get a single second chance to route to that peer again.
	// Nodes forward non-strict, so it isn't necessary to apply a less
	// restrictive channel level tracking scheme here.
	minSecondChanceInterval = time.Minute

	// DefaultMaxMcHistory is the default maximum history size.
	DefaultMaxMcHistory = 1000

	// DefaultMcFlushInterval is the default interval we use to flush MC
	// state to the database.
	DefaultMcFlushInterval = time.Second

	// prevSuccessProbability is the assumed probability for node pairs
	// that successfully relayed the previous attempt.
	prevSuccessProbability = 0.95

	// DefaultAprioriWeight is the default a priori weight. See
	// MissionControlConfig for further explanation.
	DefaultAprioriWeight = 0.5

	// DefaultMinFailureRelaxInterval is the default minimum time that must
	// have passed since the previously recorded failure before the failure
	// amount may be raised.
	DefaultMinFailureRelaxInterval = time.Minute

	// DefaultFeeEstimationTimeout is the default value for
	// FeeEstimationTimeout. It defines the maximum duration that the
	// probing fee estimation is allowed to take.
	DefaultFeeEstimationTimeout = time.Minute

	// DefaultMissionControlNamespace is the name of the default mission
	// control namespace. This is used as the sub-bucket key within the
	// top level DB bucket to store mission control results.
	DefaultMissionControlNamespace = "default"
)

var (
	// ErrInvalidMcHistory is returned if we get a negative mission control
	// history count.
	ErrInvalidMcHistory = errors.New("mission control history must be " +
		">= 0")

	// ErrInvalidFailureInterval is returned if we get an invalid failure
	// interval.
	ErrInvalidFailureInterval = errors.New("failure interval must be >= 0")
)

// NodeResults contains previous results from a node to its peers.
type NodeResults map[route.Vertex]TimedPairResult

// MissionControl contains state which summarizes the past attempts of HTLC
// routing by external callers when sending payments throughout the network.
// It acts as a shared memory during routing attempts with the goal to
// optimize the payment attempt success rate.
//
// Failed payment attempts are reported to mission control. These reports are
// used to track the time of the last node or channel level failure. The time
// since the last failure is used to estimate a success probability that is
// fed into the path finding process for subsequent payment attempts.
type MissionControl struct {
	// state is the internal mission control state that is input for
	// probability estimation.
	state *missionControlState

	// now is expected to return the current time. It is supplied as an
	// external function to enable deterministic unit tests.
	now func() time.Time

	// selfNode is our pubkey.
	selfNode route.Vertex

	// store persists the accumulated payment results to the database.
	store *missionControlStore

	// estimator is the probability estimator that is used with the payment
	// results that mission control collects.
	estimator Estimator

	// onConfigUpdate is a function that is called whenever the mission
	// control config is updated.
	onConfigUpdate fn.Option[func(cfg *MissionControlConfig)]

	mu sync.Mutex

	// TODO(roasbeef): further counters, if vertex continually unavailable,
	// add to another generation

	// TODO(roasbeef): also add favorable metrics for nodes
}

// MissionControlConfig defines parameters that control mission control
// behaviour.
type MissionControlConfig struct {
	// Estimator gives probability estimates for node pairs.
	Estimator Estimator

	// OnConfigUpdate is a function that is called whenever the mission
	// control config is updated.
	OnConfigUpdate fn.Option[func(cfg *MissionControlConfig)]

	// MaxMcHistory defines the maximum number of payment results that are
	// held on disk.
	MaxMcHistory int

	// McFlushInterval defines the ticker interval when we flush the
	// accumulated state to the DB.
	McFlushInterval time.Duration

	// MinFailureRelaxInterval is the minimum time that must have passed
	// since the previously recorded failure before the failure amount may
	// be raised.
	MinFailureRelaxInterval time.Duration
}

// validate checks the mission control config values for sanity.
func (c *MissionControlConfig) validate() error {
	if c.MaxMcHistory < 0 {
		return ErrInvalidMcHistory
	}

	if c.MinFailureRelaxInterval < 0 {
		return ErrInvalidFailureInterval
	}

	return nil
}

// String returns a string representation of a mission control config.
func (c *MissionControlConfig) String() string {
	return fmt.Sprintf("maximum history: %v, minimum failure relax "+
		"interval: %v", c.MaxMcHistory, c.MinFailureRelaxInterval)
}

// TimedPairResult describes a timestamped pair result.
type TimedPairResult struct {
	// FailTime is the time of the last failure.
	FailTime time.Time

	// FailAmt is the amount of the last failure. This amount may be pushed
	// up if a later success is higher than the last failed amount.
	FailAmt lnwire.MilliSatoshi

	// SuccessTime is the time of the last success.
	SuccessTime time.Time

	// SuccessAmt is the highest amount that successfully forwarded. This
	// isn't necessarily the last success amount. The value of this field
	// may also be pushed down if a later failure is lower than the highest
	// success amount. Because of this, SuccessAmt may not match
	// SuccessTime.
	SuccessAmt lnwire.MilliSatoshi
}

// MissionControlSnapshot contains a snapshot of the current state of mission
// control.
type MissionControlSnapshot struct {
	// Pairs is a list of channels for which specific information is
	// logged.
	Pairs []MissionControlPairSnapshot
}

// MissionControlPairSnapshot contains a snapshot of the current node pair
// state in mission control.
type MissionControlPairSnapshot struct {
	// Pair is the node pair of which the state is described.
	Pair DirectedNodePair

	// TimedPairResult contains the data for this pair.
	TimedPairResult
}

// paymentResult is the information that becomes available when a payment
// attempt completes.
type paymentResult struct {
	id                 uint64
	timeFwd, timeReply time.Time
	route              *mcRoute
	success            bool
	failureSourceIdx   *int
	failure            lnwire.FailureMessage
}

// NewMissionControl returns a new instance of MissionControl.
func NewMissionControl(db kvdb.Backend, self route.Vertex,
	cfg *MissionControlConfig) (*MissionControl, error) {

	log.Debugf("Instantiating mission control with config: %v, %v", cfg,
		cfg.Estimator)

	if err := cfg.validate(); err != nil {
		return nil, err
	}

	store, err := newMissionControlStore(
		newDefaultNamespacedStore(db), cfg.MaxMcHistory,
		cfg.McFlushInterval,
	)
	if err != nil {
		return nil, err
	}

	mc := &MissionControl{
		state: newMissionControlState(
			cfg.MinFailureRelaxInterval,
		),
		now:            time.Now,
		selfNode:       self,
		store:          store,
		estimator:      cfg.Estimator,
		onConfigUpdate: cfg.OnConfigUpdate,
	}

	if err := mc.init(); err != nil {
		return nil, err
	}

	return mc, nil
}
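
// The following is an illustrative sketch, not part of the original file, of
// how a caller might wire up a MissionControl instance using the package
// defaults. The db backend, selfNode vertex and estimator values are assumed
// to be supplied by the caller.
//
//	cfg := &MissionControlConfig{
//		Estimator:               estimator, // some Estimator impl.
//		MaxMcHistory:            DefaultMaxMcHistory,
//		McFlushInterval:         DefaultMcFlushInterval,
//		MinFailureRelaxInterval: DefaultMinFailureRelaxInterval,
//	}
//
//	mc, err := NewMissionControl(db, selfNode, cfg)
//	if err != nil {
//		return err
//	}
//
//	// Start flushing accumulated results to the store, and stop the
//	// ticker again on shutdown.
//	go mc.RunStoreTicker()
//	defer mc.StopStoreTicker()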

// RunStoreTicker runs the mission control store's ticker.
func (m *MissionControl) RunStoreTicker() {
	m.store.run()
}

// StopStoreTicker stops the mission control store's ticker.
func (m *MissionControl) StopStoreTicker() {
	log.Debug("Stopping mission control store ticker")
	defer log.Debug("Mission control store ticker stopped")

	m.store.stop()
}

// init initializes mission control with historical data.
func (m *MissionControl) init() error {
	log.Debugf("Mission control state reconstruction started")

	start := time.Now()

	results, err := m.store.fetchAll()
	if err != nil {
		return err
	}

	for _, result := range results {
		m.applyPaymentResult(result)
	}

	log.Debugf("Mission control state reconstruction finished: "+
		"n=%v, time=%v", len(results), time.Since(start))

	return nil
}

// GetConfig returns the config that mission control is currently configured
// with. All fields are copied by value, so we do not need to worry about
// mutation.
func (m *MissionControl) GetConfig() *MissionControlConfig {
	m.mu.Lock()
	defer m.mu.Unlock()

	return &MissionControlConfig{
		Estimator:               m.estimator,
		MaxMcHistory:            m.store.maxRecords,
		McFlushInterval:         m.store.flushInterval,
		MinFailureRelaxInterval: m.state.minFailureRelaxInterval,
	}
}

// SetConfig validates the config provided and updates mission control's
// config if it is valid.
func (m *MissionControl) SetConfig(cfg *MissionControlConfig) error {
	if cfg == nil {
		return errors.New("nil mission control config")
	}

	if err := cfg.validate(); err != nil {
		return err
	}

	m.mu.Lock()
	defer m.mu.Unlock()

	log.Infof("Active mission control cfg: %v, estimator: %v", cfg,
		cfg.Estimator)

	m.store.maxRecords = cfg.MaxMcHistory
	m.state.minFailureRelaxInterval = cfg.MinFailureRelaxInterval
	m.estimator = cfg.Estimator

	// Execute the callback function if it is set.
	m.onConfigUpdate.WhenSome(func(f func(cfg *MissionControlConfig)) {
		f(cfg)
	})

	return nil
}
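
// An illustrative sketch, not part of the original file, of adjusting a
// single parameter at runtime: GetConfig returns a copy of the active config,
// so it can be modified and handed back to SetConfig, which validates it
// before applying the new values.
//
//	cfg := mc.GetConfig()
//	cfg.MinFailureRelaxInterval = 2 * time.Minute
//	if err := mc.SetConfig(cfg); err != nil {
//		// Validation failed; the previous config stays active.
//		return err
//	}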

// ResetHistory resets the history of MissionControl, returning it to a state
// as if no payment attempts have been made.
func (m *MissionControl) ResetHistory() error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if err := m.store.clear(); err != nil {
		return err
	}

	m.state.resetHistory()

	log.Debugf("Mission control history cleared")

	return nil
}

// GetProbability is expected to return the success probability of a payment
// from fromNode along the edge to toNode.
func (m *MissionControl) GetProbability(fromNode, toNode route.Vertex,
	amt lnwire.MilliSatoshi, capacity btcutil.Amount) float64 {

	m.mu.Lock()
	defer m.mu.Unlock()

	now := m.now()
	results, _ := m.state.getLastPairResult(fromNode)

	// Use a distinct probability estimation function for local channels.
	if fromNode == m.selfNode {
		return m.estimator.LocalPairProbability(now, results, toNode)
	}

	return m.estimator.PairProbability(
		now, results, toNode, amt, capacity,
	)
}
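
// An illustrative sketch, not part of the original file, of how a caller such
// as a pathfinding routine might consult mission control for a candidate
// edge. The fromNode, toNode, amt and capacity values are assumed to describe
// the edge under consideration, and minProbability is a caller-defined
// threshold.
//
//	p := mc.GetProbability(fromNode, toNode, amt, capacity)
//	if p < minProbability {
//		// Skip this edge, it is deemed too unlikely to succeed.
//		continue
//	}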

// GetHistorySnapshot takes a snapshot from the current mission control state
// and actual probability estimates.
func (m *MissionControl) GetHistorySnapshot() *MissionControlSnapshot {
	m.mu.Lock()
	defer m.mu.Unlock()

	log.Debugf("Requesting history snapshot from mission control")

	return m.state.getSnapshot()
}

// ImportHistory imports the set of mission control results provided to our
// in-memory state. These results are not persisted, so will not survive
// restarts.
func (m *MissionControl) ImportHistory(history *MissionControlSnapshot,
	force bool) error {

	if history == nil {
		return errors.New("cannot import nil history")
	}

	m.mu.Lock()
	defer m.mu.Unlock()

	log.Infof("Importing history snapshot with %v pairs to mission control",
		len(history.Pairs))

	imported := m.state.importSnapshot(history, force)

	log.Infof("Imported %v results to mission control", imported)

	return nil
}
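
// An illustrative sketch, not part of the original file, of copying results
// from one MissionControl instance (srcMc) into another (dstMc) via a
// snapshot. Both instances are assumed to exist already; the imported results
// live in memory only and do not survive a restart.
//
//	snapshot := srcMc.GetHistorySnapshot()
//	if err := dstMc.ImportHistory(snapshot, false); err != nil {
//		return err
//	}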

// GetPairHistorySnapshot returns the stored history for a given node pair.
func (m *MissionControl) GetPairHistorySnapshot(
	fromNode, toNode route.Vertex) TimedPairResult {

	m.mu.Lock()
	defer m.mu.Unlock()

	results, ok := m.state.getLastPairResult(fromNode)
	if !ok {
		return TimedPairResult{}
	}

	result, ok := results[toNode]
	if !ok {
		return TimedPairResult{}
	}

	return result
}

// ReportPaymentFail reports a failed payment to mission control as input for
// future probability estimates. The failureSourceIdx argument indicates the
// failure source. If it is nil, the failure source is unknown. This function
// returns a reason if this failure is a final failure. In that case no
// further payment attempts need to be made.
func (m *MissionControl) ReportPaymentFail(paymentID uint64, rt *route.Route,
	failureSourceIdx *int, failure lnwire.FailureMessage) (
	*channeldb.FailureReason, error) {

	timestamp := m.now()

	result := &paymentResult{
		success:          false,
		timeFwd:          timestamp,
		timeReply:        timestamp,
		id:               paymentID,
		failureSourceIdx: failureSourceIdx,
		failure:          failure,
		route:            extractMCRoute(rt),
	}

	return m.processPaymentResult(result)
}

// ReportPaymentSuccess reports a successful payment to mission control as
// input for future probability estimates.
func (m *MissionControl) ReportPaymentSuccess(paymentID uint64,
	rt *route.Route) error {

	timestamp := m.now()

	result := &paymentResult{
		timeFwd:   timestamp,
		timeReply: timestamp,
		id:        paymentID,
		success:   true,
		route:     extractMCRoute(rt),
	}

	_, err := m.processPaymentResult(result)
	return err
}
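
// An illustrative sketch, not part of the original file, of feeding the
// outcome of a payment attempt back into mission control. The attemptID, rt,
// failureSourceIdx and failureMsg values are assumed to come from the code
// driving the payment attempt; abortPayment is a hypothetical helper.
//
//	if attemptSucceeded {
//		return mc.ReportPaymentSuccess(attemptID, rt)
//	}
//
//	reason, err := mc.ReportPaymentFail(
//		attemptID, rt, failureSourceIdx, failureMsg,
//	)
//	if err != nil {
//		return err
//	}
//
//	// A non-nil reason means the failure is final and no further attempts
//	// should be made for this payment.
//	if reason != nil {
//		abortPayment(*reason)
//	}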

// processPaymentResult stores a payment result in the mission control store
// and updates mission control's in-memory state.
func (m *MissionControl) processPaymentResult(result *paymentResult) (
	*channeldb.FailureReason, error) {

	// Store complete result in database.
	m.store.AddResult(result)

	m.mu.Lock()
	defer m.mu.Unlock()

	// Apply result to update mission control state.
	reason := m.applyPaymentResult(result)

	return reason, nil
}

// applyPaymentResult applies a payment result as input for future probability
// estimates. It returns a non-nil failure reason if this error is a final
// error, in which case no further payment attempts need to be made.
func (m *MissionControl) applyPaymentResult(
	result *paymentResult) *channeldb.FailureReason {

	// Interpret result.
	i := interpretResult(
		result.route, result.success, result.failureSourceIdx,
		result.failure,
	)

	if i.policyFailure != nil {
		if m.state.requestSecondChance(
			result.timeReply,
			i.policyFailure.From, i.policyFailure.To,
		) {
			return nil
		}
	}

	// If there is a node-level failure, record a failure for every tried
	// connection of that node. A node-level failure can be considered as a
	// failure that would have occurred with any of the node's channels.
	//
	// Ideally we'd also record the failure for the untried connections of
	// the node. Unfortunately this would require access to the graph and
	// adding this dependency and db calls does not outweigh the benefits.
	//
	// Untried connections will fall back to the node probability. After
	// the call to setAllFail below, the node probability will be equal to
	// the probability of the tried channels except that the a priori
	// probability is mixed in too. This effect is controlled by the
	// aprioriWeight parameter. If that parameter isn't set to an extreme
	// and there are a few known connections, there shouldn't be much of a
	// difference. The largest difference occurs when aprioriWeight is 1.
	// In that case, a node-level failure would not be applied to untried
	// channels.
	if i.nodeFailure != nil {
		log.Debugf("Reporting node failure to Mission Control: "+
			"node=%v", *i.nodeFailure)

		m.state.setAllFail(*i.nodeFailure, result.timeReply)
	}

	for pair, pairResult := range i.pairResults {
		pairResult := pairResult

		if pairResult.success {
			log.Debugf("Reporting pair success to Mission "+
				"Control: pair=%v, amt=%v",
				pair, pairResult.amt)
		} else {
			log.Debugf("Reporting pair failure to Mission "+
				"Control: pair=%v, amt=%v",
				pair, pairResult.amt)
		}

		m.state.setLastPairResult(
			pair.From, pair.To, result.timeReply, &pairResult, false,
		)
	}

	return i.finalFailureReason
}

// namespacedDB is an implementation of the missionControlDB interface that
// gives a user of the interface access to a namespaced bucket within the top
// level mission control bucket.
type namespacedDB struct {
	topLevelBucketKey []byte
	namespace         []byte
	db                kvdb.Backend
}

// A compile-time check to ensure that namespacedDB implements
// missionControlDB.
var _ missionControlDB = (*namespacedDB)(nil)

// newDefaultNamespacedStore creates an instance of namespacedDB that uses the
// default namespace.
func newDefaultNamespacedStore(db kvdb.Backend) missionControlDB {
	return newNamespacedDB(db, DefaultMissionControlNamespace)
}

// newNamespacedDB creates a new instance of missionControlDB where the DB will
// have access to a namespaced bucket within the top level mission control
// bucket.
func newNamespacedDB(db kvdb.Backend, namespace string) missionControlDB {
	return &namespacedDB{
		db:                db,
		namespace:         []byte(namespace),
		topLevelBucketKey: resultsKey,
	}
}

// update can be used to perform reads and writes on the given bucket.
//
// NOTE: this is part of the missionControlDB interface.
func (n *namespacedDB) update(f func(bkt walletdb.ReadWriteBucket) error,
	reset func()) error {

	return n.db.Update(func(tx kvdb.RwTx) error {
		mcStoreBkt, err := tx.CreateTopLevelBucket(n.topLevelBucketKey)
		if err != nil {
			return fmt.Errorf("cannot create top level mission "+
				"control bucket: %w", err)
		}

		namespacedBkt, err := mcStoreBkt.CreateBucketIfNotExists(
			n.namespace,
		)
		if err != nil {
			return fmt.Errorf("cannot create namespaced bucket "+
				"(%s) in mission control store: %w",
				n.namespace, err)
		}

		return f(namespacedBkt)
	}, reset)
}

// view can be used to perform reads on the given bucket.
//
// NOTE: this is part of the missionControlDB interface.
func (n *namespacedDB) view(f func(bkt walletdb.ReadBucket) error,
	reset func()) error {

	return n.db.View(func(tx kvdb.RTx) error {
		mcStoreBkt := tx.ReadBucket(n.topLevelBucketKey)
		if mcStoreBkt == nil {
			return fmt.Errorf("top level mission control bucket " +
				"not found")
		}

		namespacedBkt := mcStoreBkt.NestedReadBucket(n.namespace)
		if namespacedBkt == nil {
			return fmt.Errorf("namespaced bucket (%s) not found "+
				"in mission control store", n.namespace)
		}

		return f(namespacedBkt)
	}, reset)
}

// purge will delete all the contents in the namespace.
//
// NOTE: this is part of the missionControlDB interface.
func (n *namespacedDB) purge() error {
	return n.db.Update(func(tx kvdb.RwTx) error {
		mcStoreBkt := tx.ReadWriteBucket(n.topLevelBucketKey)
		if mcStoreBkt == nil {
			return nil
		}

		err := mcStoreBkt.DeleteNestedBucket(n.namespace)
		if err != nil {
			return err
		}

		_, err = mcStoreBkt.CreateBucket(n.namespace)

		return err
	}, func() {})
}