diff --git a/routing/integrated_routing_context_test.go b/routing/integrated_routing_context_test.go index 061dcfc3a..902c2670c 100644 --- a/routing/integrated_routing_context_test.go +++ b/routing/integrated_routing_context_test.go @@ -163,12 +163,12 @@ func (c *integratedRoutingContext) testPayment(maxParts uint32, } }) - // Instantiate a new mission control with the current configuration + // Instantiate a new mission controller with the current configuration // values. - mc, err := NewMissionControl(db, c.source.pubkey, &c.mcCfg) - if err != nil { - c.t.Fatal(err) - } + mcController, err := NewMissionController(db, c.source.pubkey, &c.mcCfg) + require.NoError(c.t, err) + + mc := mcController.GetDefaultStore() getBandwidthHints := func(_ Graph) (bandwidthHints, error) { // Create bandwidth hints based on local channel balances. diff --git a/routing/missioncontrol.go b/routing/missioncontrol.go index 299e288f6..26e81c9c0 100644 --- a/routing/missioncontrol.go +++ b/routing/missioncontrol.go @@ -9,6 +9,7 @@ import ( "github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcwallet/walletdb" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/fn" "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/lnwire" @@ -90,6 +91,16 @@ var ( // NodeResults contains previous results from a node to its peers. type NodeResults map[route.Vertex]TimedPairResult +// mcConfig holds various config members that will be required by all +// MissionControl instances and will be the same regardless of namespace. +type mcConfig struct { + // clock is a time source used by mission control. + clock clock.Clock + + // selfNode is our pubkey. + selfNode route.Vertex +} + // MissionControl contains state which summarizes the past attempts of HTLC // routing by external callers when sending payments throughout the network. It // acts as a shared memory during routing attempts with the goal to optimize the @@ -100,17 +111,12 @@ type NodeResults map[route.Vertex]TimedPairResult // since the last failure is used to estimate a success probability that is fed // into the path finding process for subsequent payment attempts. type MissionControl struct { + cfg *mcConfig + // state is the internal mission control state that is input for // probability estimation. state *missionControlState - // now is expected to return the current time. It is supplied as an - // external function to enable deterministic unit tests. - now func() time.Time - - // selfNode is our pubkey. - selfNode route.Vertex - store *missionControlStore // estimator is the probability estimator that is used with the payment @@ -122,6 +128,16 @@ type MissionControl struct { onConfigUpdate fn.Option[func(cfg *MissionControlConfig)] mu sync.Mutex +} + +// MissionController manages MissionControl instances in various namespaces. +// +// NOTE: currently it only has a MissionControl in the default namespace. +type MissionController struct { + cfg *mcConfig + + mc *MissionControl + mu sync.Mutex // TODO(roasbeef): further counters, if vertex continually unavailable, // add to another generation @@ -129,6 +145,14 @@ type MissionControl struct { // TODO(roasbeef): also add favorable metrics for nodes } +// GetDefaultStore returns the MissionControl in the default namespace. +func (m *MissionController) GetDefaultStore() *MissionControl { + m.mu.Lock() + defer m.mu.Unlock() + + return m.mc +} + // MissionControlConfig defines parameters that control mission control // behaviour. type MissionControlConfig struct { @@ -220,9 +244,9 @@ type paymentResult struct { failure lnwire.FailureMessage } -// NewMissionControl returns a new instance of missionControl. -func NewMissionControl(db kvdb.Backend, self route.Vertex, - cfg *MissionControlConfig) (*MissionControl, error) { +// NewMissionController returns a new instance of MissionController. +func NewMissionController(db kvdb.Backend, self route.Vertex, + cfg *MissionControlConfig) (*MissionController, error) { log.Debugf("Instantiating mission control with config: %v, %v", cfg, cfg.Estimator) @@ -239,18 +263,26 @@ func NewMissionControl(db kvdb.Backend, self route.Vertex, return nil, err } - mc := &MissionControl{ - state: newMissionControlState( - cfg.MinFailureRelaxInterval, - ), - now: time.Now, - selfNode: self, + mcCfg := &mcConfig{ + clock: clock.NewDefaultClock(), + selfNode: self, + } + + // Create a mission control in the default namespace. + defaultMC := &MissionControl{ + cfg: mcCfg, + state: newMissionControlState(cfg.MinFailureRelaxInterval), store: store, estimator: cfg.Estimator, onConfigUpdate: cfg.OnConfigUpdate, } - if err := mc.init(); err != nil { + mc := &MissionController{ + cfg: mcCfg, + mc: defaultMC, + } + + if err := mc.mc.init(); err != nil { return nil, err } @@ -258,22 +290,31 @@ func NewMissionControl(db kvdb.Backend, self route.Vertex, } // RunStoreTicker runs the mission control store's ticker. -func (m *MissionControl) RunStoreTicker() { - m.store.run() +func (m *MissionController) RunStoreTicker() { + m.mu.Lock() + defer m.mu.Unlock() + + m.mc.store.run() } // StopStoreTicker stops the mission control store's ticker. -func (m *MissionControl) StopStoreTicker() { +func (m *MissionController) StopStoreTicker() { log.Debug("Stopping mission control store ticker") defer log.Debug("Mission control store ticker stopped") - m.store.stop() + m.mu.Lock() + defer m.mu.Unlock() + + m.mc.store.stop() } // init initializes mission control with historical data. func (m *MissionControl) init() error { log.Debugf("Mission control state reconstruction started") + m.mu.Lock() + defer m.mu.Unlock() + start := time.Now() results, err := m.store.fetchAll() @@ -282,7 +323,7 @@ func (m *MissionControl) init() error { } for _, result := range results { - m.applyPaymentResult(result) + _ = m.applyPaymentResult(result) } log.Debugf("Mission control state reconstruction finished: "+ @@ -360,11 +401,11 @@ func (m *MissionControl) GetProbability(fromNode, toNode route.Vertex, m.mu.Lock() defer m.mu.Unlock() - now := m.now() + now := m.cfg.clock.Now() results, _ := m.state.getLastPairResult(fromNode) // Use a distinct probability estimation function for local channels. - if fromNode == m.selfNode { + if fromNode == m.cfg.selfNode { return m.estimator.LocalPairProbability(now, results, toNode) } @@ -436,7 +477,7 @@ func (m *MissionControl) ReportPaymentFail(paymentID uint64, rt *route.Route, failureSourceIdx *int, failure lnwire.FailureMessage) ( *channeldb.FailureReason, error) { - timestamp := m.now() + timestamp := m.cfg.clock.Now() result := &paymentResult{ success: false, @@ -456,7 +497,7 @@ func (m *MissionControl) ReportPaymentFail(paymentID uint64, rt *route.Route, func (m *MissionControl) ReportPaymentSuccess(paymentID uint64, rt *route.Route) error { - timestamp := m.now() + timestamp := m.cfg.clock.Now() result := &paymentResult{ timeFwd: timestamp, diff --git a/routing/missioncontrol_state.go b/routing/missioncontrol_state.go index 7d6633f2b..c61ec8335 100644 --- a/routing/missioncontrol_state.go +++ b/routing/missioncontrol_state.go @@ -37,7 +37,8 @@ func newMissionControlState( } } -// getLastPairResult returns the current state for connections to the given node. +// getLastPairResult returns the current state for connections to the given +// node. func (m *missionControlState) getLastPairResult(node route.Vertex) (NodeResults, bool) { @@ -45,8 +46,8 @@ func (m *missionControlState) getLastPairResult(node route.Vertex) (NodeResults, return result, ok } -// ResetHistory resets the history of MissionControl returning it to a state as -// if no payment attempts have been made. +// ResetHistory resets the history of missionControlState returning it to a +// state as if no payment attempts have been made. func (m *missionControlState) resetHistory() { m.lastPairResult = make(map[route.Vertex]NodeResults) m.lastSecondChance = make(map[DirectedNodePair]time.Time) diff --git a/routing/missioncontrol_test.go b/routing/missioncontrol_test.go index 4a0f73871..1905b07db 100644 --- a/routing/missioncontrol_test.go +++ b/routing/missioncontrol_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing/route" @@ -40,9 +41,10 @@ var ( ) type mcTestContext struct { - t *testing.T - mc *MissionControl - now time.Time + t *testing.T + mc *MissionControl + + clock *testClock db kvdb.Backend dbPath string @@ -52,8 +54,8 @@ type mcTestContext struct { func createMcTestContext(t *testing.T) *mcTestContext { ctx := &mcTestContext{ - t: t, - now: mcTestTime, + t: t, + clock: newTestClock(mcTestTime), } file, err := os.CreateTemp("", "*.db") @@ -85,7 +87,7 @@ func createMcTestContext(t *testing.T) *mcTestContext { // restartMc creates a new instances of mission control on the same database. func (ctx *mcTestContext) restartMc() { // Since we don't run a timer to store results in unit tests, we store - // them here before fetching back everything in NewMissionControl. + // them here before fetching back everything in NewMissionController. if ctx.mc != nil { require.NoError(ctx.t, ctx.mc.store.storeResults()) } @@ -99,7 +101,7 @@ func (ctx *mcTestContext) restartMc() { estimator, err := NewAprioriEstimator(aCfg) require.NoError(ctx.t, err) - mc, err := NewMissionControl( + mc, err := NewMissionController( ctx.db, mcTestSelf, &MissionControlConfig{Estimator: estimator}, ) @@ -107,8 +109,8 @@ func (ctx *mcTestContext) restartMc() { ctx.t.Fatal(err) } - mc.now = func() time.Time { return ctx.now } - ctx.mc = mc + mc.cfg.clock = ctx.clock + ctx.mc = mc.GetDefaultStore() } // Assert that mission control returns a probability for an edge. @@ -150,7 +152,7 @@ func (ctx *mcTestContext) reportSuccess() { func TestMissionControl(t *testing.T) { ctx := createMcTestContext(t) - ctx.now = testTime + ctx.clock.setTime(testTime) testTime := time.Date(2018, time.January, 9, 14, 00, 00, 0, time.UTC) @@ -178,7 +180,7 @@ func TestMissionControl(t *testing.T) { // Edge decay started. The node probability weighted average should now // have shifted from 1:1 to 1:0.5 -> 60%. The connection probability is // half way through the recovery, so we expect 30% here. - ctx.now = testTime.Add(30 * time.Minute) + ctx.clock.setTime(testTime.Add(30 * time.Minute)) ctx.expectP(1000, 0.3) // Edge fails again, this time without a min penalization amt. The edge @@ -188,7 +190,7 @@ func TestMissionControl(t *testing.T) { ctx.expectP(500, 0) // Edge decay started. - ctx.now = testTime.Add(60 * time.Minute) + ctx.clock.setTime(testTime.Add(60 * time.Minute)) ctx.expectP(1000, 0.3) // Restart mission control to test persistence. @@ -230,3 +232,29 @@ func TestMissionControlChannelUpdate(t *testing.T) { ) ctx.expectP(100, 0) } + +// testClock is an implementation of clock.Clock that lets the caller overwrite +// the current time at any point. +type testClock struct { + now time.Time + clock.Clock +} + +// newTestClock constructs a new testClock. +func newTestClock(startTime time.Time) *testClock { + return &testClock{ + now: startTime, + } +} + +// Now returns the underlying current time. +// +// NOTE: this is part of the clock.Clock interface. +func (c *testClock) Now() time.Time { + return c.now +} + +// setTime overwrites the current time. +func (c *testClock) setTime(n time.Time) { + c.now = n +} diff --git a/routing/mock_test.go b/routing/mock_test.go index 32f83420f..35ac3ecd9 100644 --- a/routing/mock_test.go +++ b/routing/mock_test.go @@ -125,10 +125,10 @@ func (m *mockPaymentSessionSourceOld) NewPaymentSessionEmpty() PaymentSession { } type mockMissionControlOld struct { - MissionControl + MissionController } -var _ MissionController = (*mockMissionControlOld)(nil) +var _ MissionControlQuerier = (*mockMissionControlOld)(nil) func (m *mockMissionControlOld) ReportPaymentFail( paymentID uint64, rt *route.Route, @@ -657,7 +657,7 @@ type mockMissionControl struct { mock.Mock } -var _ MissionController = (*mockMissionControl)(nil) +var _ MissionControlQuerier = (*mockMissionControl)(nil) func (m *mockMissionControl) ReportPaymentFail( paymentID uint64, rt *route.Route, diff --git a/routing/payment_session.go b/routing/payment_session.go index 9ddb62600..89fb2fd13 100644 --- a/routing/payment_session.go +++ b/routing/payment_session.go @@ -159,7 +159,7 @@ type PaymentSession interface { // paymentSession is used during an HTLC routings session to prune the local // chain view in response to failures, and also report those failures back to -// MissionControl. The snapshot copied for this session will only ever grow, +// MissionController. The snapshot copied for this session will only ever grow, // and will now be pruned after a decay like the main view within mission // control. We do this as we want to avoid the case where we continually try a // bad edge or route multiple times in a session. This can lead to an infinite @@ -184,7 +184,7 @@ type paymentSession struct { // trade-off in path finding between fees and probability. pathFindingConfig PathFindingConfig - missionControl MissionController + missionControl MissionControlQuerier // minShardAmt is the amount beyond which we won't try to further split // the payment if no route is found. If the maximum number of htlcs @@ -199,7 +199,8 @@ type paymentSession struct { // newPaymentSession instantiates a new payment session. func newPaymentSession(p *LightningPayment, selfNode route.Vertex, getBandwidthHints func(Graph) (bandwidthHints, error), - graphSessFactory GraphSessionFactory, missionControl MissionController, + graphSessFactory GraphSessionFactory, + missionControl MissionControlQuerier, pathFindingConfig PathFindingConfig) (*paymentSession, error) { edges, err := RouteHintsToEdges(p.RouteHints, p.Target) @@ -266,7 +267,7 @@ func (p *paymentSession) RequestRoute(maxAmt, feeLimit lnwire.MilliSatoshi, // Taking into account this prune view, we'll attempt to locate a path // to our destination, respecting the recommendations from - // MissionControl. + // MissionController. restrictions := &RestrictParams{ ProbabilitySource: p.missionControl.GetProbability, FeeLimit: feeLimit, diff --git a/routing/payment_session_source.go b/routing/payment_session_source.go index c89d6a8e5..ccee9bc44 100644 --- a/routing/payment_session_source.go +++ b/routing/payment_session_source.go @@ -11,7 +11,7 @@ import ( "github.com/lightningnetwork/lnd/zpay32" ) -// A compile time assertion to ensure MissionControl meets the +// A compile time assertion to ensure SessionSource meets the // PaymentSessionSource interface. var _ PaymentSessionSource = (*SessionSource)(nil) @@ -40,7 +40,7 @@ type SessionSource struct { // then take into account this set of pruned vertexes/edges to reduce // route failure and pass on graph information gained to the next // execution. - MissionControl MissionController + MissionControl MissionControlQuerier // PathFindingConfig defines global parameters that control the // trade-off in path finding between fees and probability. diff --git a/routing/router.go b/routing/router.go index 3b340f35a..1fea60ddb 100644 --- a/routing/router.go +++ b/routing/router.go @@ -167,9 +167,9 @@ type PaymentSessionSource interface { NewPaymentSessionEmpty() PaymentSession } -// MissionController is an interface that exposes failure reporting and +// MissionControlQuerier is an interface that exposes failure reporting and // probability estimation. -type MissionController interface { +type MissionControlQuerier interface { // ReportPaymentFail reports a failed payment to mission control as // input for future probability estimates. It returns a bool indicating // whether this error is a final error and no further payment attempts @@ -260,7 +260,7 @@ type Config struct { // Each run will then take into account this set of pruned // vertexes/edges to reduce route failure and pass on graph information // gained to the next execution. - MissionControl MissionController + MissionControl MissionControlQuerier // SessionSource defines a source for the router to retrieve new payment // sessions. diff --git a/routing/router_test.go b/routing/router_test.go index 0380bab00..1ca56f4b9 100644 --- a/routing/router_test.go +++ b/routing/router_test.go @@ -128,10 +128,11 @@ func createTestCtxFromGraphInstanceAssumeValid(t *testing.T, mcConfig := &MissionControlConfig{Estimator: estimator} - mc, err := NewMissionControl( + mcController, err := NewMissionController( graphInstance.graphBackend, route.Vertex{}, mcConfig, ) require.NoError(t, err, "failed to create missioncontrol") + mc := mcController.GetDefaultStore() sourceNode, err := graphInstance.graph.SourceNode() require.NoError(t, err) @@ -1081,11 +1082,15 @@ func TestSendPaymentErrorPathPruning(t *testing.T) { return preImage, nil }) - ctx.router.cfg.MissionControl.(*MissionControl).ResetHistory() + require.IsType(t, ctx.router.cfg.MissionControl, &MissionControl{}) + mc, _ := ctx.router.cfg.MissionControl.(*MissionControl) + + err := mc.ResetHistory() + require.NoError(t, err) // When we try to dispatch that payment, we should receive an error as // both attempts should fail and cause both routes to be pruned. - _, _, err := ctx.router.SendPayment(payment) + _, _, err = ctx.router.SendPayment(payment) require.Error(t, err, "payment didn't return error") // The final error returned should also indicate that the peer wasn't @@ -1102,12 +1107,10 @@ func TestSendPaymentErrorPathPruning(t *testing.T) { // We expect the first attempt to have failed with a // TemporaryChannelFailure, the second with UnknownNextPeer. msg := htlcs[0].Failure.Message - _, ok := msg.(*lnwire.FailTemporaryChannelFailure) - require.True(t, ok, "unexpected fail message") + require.IsType(t, msg, &lnwire.FailTemporaryChannelFailure{}) msg = htlcs[1].Failure.Message - _, ok = msg.(*lnwire.FailUnknownNextPeer) - require.True(t, ok, "unexpected fail message") + require.IsType(t, msg, &lnwire.FailUnknownNextPeer{}) err = ctx.router.cfg.MissionControl.(*MissionControl).ResetHistory() require.NoError(t, err, "reset history failed") @@ -1144,7 +1147,11 @@ func TestSendPaymentErrorPathPruning(t *testing.T) { getAliasFromPubKey(rt.Hops[0].PubKeyBytes, ctx.aliases), ) - ctx.router.cfg.MissionControl.(*MissionControl).ResetHistory() + require.IsType(t, ctx.router.cfg.MissionControl, &MissionControl{}) + mc, _ = ctx.router.cfg.MissionControl.(*MissionControl) + + err = mc.ResetHistory() + require.NoError(t, err) // Finally, we'll modify the SendToSwitch function to indicate that the // roasbeef -> luoji channel has insufficient capacity. This should diff --git a/rpcserver.go b/rpcserver.go index 44a3ae391..dc286603c 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -729,7 +729,7 @@ func (r *rpcServer) addDeps(s *server, macService *macaroons.Service, return info.NodeKey1Bytes, info.NodeKey2Bytes, nil }, FindRoute: s.chanRouter.FindRoute, - MissionControl: s.missionControl, + MissionControl: s.missionControl.GetDefaultStore(), ActiveNetParams: r.cfg.ActiveNetParams.Params, Tower: s.controlTower, MaxTotalTimelock: r.cfg.MaxOutgoingCltvExpiry, @@ -6071,7 +6071,8 @@ func (r *rpcServer) AddInvoice(ctx context.Context, return r.server.chanRouter.FindBlindedPaths( r.selfNode, amt, - r.server.missionControl.GetProbability, + r.server.missionControl.GetDefaultStore(). + GetProbability, blindingRestrictions, ) }, diff --git a/server.go b/server.go index 9a7276948..621661e0e 100644 --- a/server.go +++ b/server.go @@ -275,7 +275,7 @@ type server struct { breachArbitrator *contractcourt.BreachArbitrator - missionControl *routing.MissionControl + missionControl *routing.MissionController graphBuilder *graph.Builder @@ -955,7 +955,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, McFlushInterval: routingConfig.McFlushInterval, MinFailureRelaxInterval: routing.DefaultMinFailureRelaxInterval, } - s.missionControl, err = routing.NewMissionControl( + s.missionControl, err = routing.NewMissionController( dbs.ChanStateDB, selfNode.PubKeyBytes, mcCfg, ) if err != nil { @@ -985,7 +985,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanGraph, ), SourceNode: sourceNode, - MissionControl: s.missionControl, + MissionControl: s.missionControl.GetDefaultStore(), GetLink: s.htlcSwitch.GetLinkByShortID, PathFindingConfig: pathFindingConfig, } @@ -1020,7 +1020,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, Chain: cc.ChainIO, Payer: s.htlcSwitch, Control: s.controlTower, - MissionControl: s.missionControl, + MissionControl: s.missionControl.GetDefaultStore(), SessionSource: paymentSessionSource, GetLink: s.htlcSwitch.GetLinkByShortID, NextPaymentID: sequencer.NextID,