From e004447da60c2599f1ad50979e28728bfc601fea Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Thu, 13 Feb 2025 09:44:14 +0200 Subject: [PATCH] multi: remove the need for the graphsession package In this commit, we add a `GraphSession` method to the `ChannelGraph`. This method provides a caller with access to a `NodeTraverser`. This is used by pathfinding to create a graph "session" overwhich to perform a set of queries for a pathfinding attempt. With this refactor, we hide details such as DB transaction creation and transaction commits from the caller. So with this, pathfinding does not need to remember to "close the graph session". With this commit, the `graphsession` package may be completely removed. --- graph/db/graph.go | 68 ++++++++-- graph/graphsession/graph_session.go | 137 --------------------- routing/graph.go | 17 ++- routing/integrated_routing_context_test.go | 88 +------------ routing/integrated_routing_test.go | 2 +- routing/mock_graph_test.go | 11 ++ routing/pathfind_test.go | 36 +++--- routing/payment_session.go | 17 +-- routing/payment_session_test.go | 11 +- routing/router_test.go | 14 +-- server.go | 13 +- 11 files changed, 117 insertions(+), 297 deletions(-) delete mode 100644 graph/graphsession/graph_session.go diff --git a/graph/db/graph.go b/graph/db/graph.go index ad9ad2ad4..315651a0d 100644 --- a/graph/db/graph.go +++ b/graph/db/graph.go @@ -403,16 +403,6 @@ func initChannelGraph(db kvdb.Backend) error { return nil } -// NewPathFindTx returns a new read transaction that can be used for a single -// path finding session. Will return nil if the graph cache is enabled. -func (c *ChannelGraph) NewPathFindTx() (kvdb.RTx, error) { - if c.graphCache != nil { - return nil, nil - } - - return c.db.BeginReadTx() -} - // AddrsForNode returns all known addresses for the target node public key that // the graph DB is aware of. The returned boolean indicates if the given node is // unknown to the graph DB or not. @@ -3907,6 +3897,64 @@ func (c *ChannelGraph) IsClosedScid(scid lnwire.ShortChannelID) (bool, error) { return isClosed, nil } +// GraphSession will provide the call-back with access to a NodeTraverser +// instance which can be used to perform queries against the channel graph. If +// the graph cache is not enabled, then the call-back will be provided with +// access to the graph via a consistent read-only transaction. +func (c *ChannelGraph) GraphSession(cb func(graph NodeTraverser) error) error { + var ( + tx kvdb.RTx + err error + commit = func() {} + ) + if c.graphCache == nil { + tx, err = c.db.BeginReadTx() + if err != nil { + return err + } + + commit = func() { + if err := tx.Rollback(); err != nil { + log.Errorf("Unable to rollback tx: %v", err) + } + } + } + defer commit() + + return cb(&nodeTraverserSession{ + db: c, + tx: tx, + }) +} + +// nodeTraverserSession implements the NodeTraverser interface but with a +// backing read only transaction for a consistent view of the graph in the case +// where the graph Cache has not been enabled. +type nodeTraverserSession struct { + tx kvdb.RTx + db *ChannelGraph +} + +// ForEachNodeDirectedChannel calls the callback for every channel of the given +// node. +// +// NOTE: Part of the NodeTraverser interface. +func (c *nodeTraverserSession) ForEachNodeDirectedChannel(nodePub route.Vertex, + cb func(channel *DirectedChannel) error) error { + + return c.db.ForEachNodeDirectedChannelTx(c.tx, nodePub, cb) +} + +// FetchNodeFeatures returns the features of the given node. If the node is +// unknown, assume no additional features are supported. +// +// NOTE: Part of the NodeTraverser interface. +func (c *nodeTraverserSession) FetchNodeFeatures(nodePub route.Vertex) ( + *lnwire.FeatureVector, error) { + + return c.db.FetchNodeFeaturesTx(c.tx, nodePub) +} + func putLightningNode(nodeBucket kvdb.RwBucket, aliasBucket kvdb.RwBucket, // nolint:dupl updateIndex kvdb.RwBucket, node *models.LightningNode) error { diff --git a/graph/graphsession/graph_session.go b/graph/graphsession/graph_session.go deleted file mode 100644 index 535a8beb5..000000000 --- a/graph/graphsession/graph_session.go +++ /dev/null @@ -1,137 +0,0 @@ -package graphsession - -import ( - "fmt" - - graphdb "github.com/lightningnetwork/lnd/graph/db" - "github.com/lightningnetwork/lnd/kvdb" - "github.com/lightningnetwork/lnd/lnwire" - "github.com/lightningnetwork/lnd/routing" - "github.com/lightningnetwork/lnd/routing/route" -) - -// Factory implements the routing.GraphSessionFactory and can be used to start -// a session with a ReadOnlyGraph. -type Factory struct { - graph ReadOnlyGraph -} - -// NewGraphSessionFactory constructs a new Factory which can then be used to -// start a new session. -func NewGraphSessionFactory(graph ReadOnlyGraph) routing.GraphSessionFactory { - return &Factory{ - graph: graph, - } -} - -// NewGraphSession will produce a new Graph to use for a path-finding session. -// It returns the Graph along with a call-back that must be called once Graph -// access is complete. This call-back will close any read-only transaction that -// was created at Graph construction time. -// -// NOTE: This is part of the routing.GraphSessionFactory interface. -func (g *Factory) NewGraphSession() (routing.Graph, func() error, error) { - tx, err := g.graph.NewPathFindTx() - if err != nil { - return nil, nil, err - } - - session := &session{ - graph: g.graph, - tx: tx, - } - - return session, session.close, nil -} - -// A compile-time check to ensure that Factory implements the -// routing.GraphSessionFactory interface. -var _ routing.GraphSessionFactory = (*Factory)(nil) - -// session is an implementation of the routing.Graph interface where the same -// read-only transaction is held across calls to the graph and can be used to -// access the backing channel graph. -type session struct { - graph graph - tx kvdb.RTx -} - -// close closes the read-only transaction being used to access the backing -// graph. If no transaction was started then this is a no-op. -func (g *session) close() error { - if g.tx == nil { - return nil - } - - err := g.tx.Rollback() - if err != nil { - return fmt.Errorf("error closing db tx: %w", err) - } - - return nil -} - -// ForEachNodeDirectedChannel calls the callback for every channel of the given -// node. -// -// NOTE: Part of the routing.Graph interface. -func (g *session) ForEachNodeDirectedChannel(nodePub route.Vertex, - cb func(channel *graphdb.DirectedChannel) error) error { - - return g.graph.ForEachNodeDirectedChannelTx(g.tx, nodePub, cb) -} - -// FetchNodeFeatures returns the features of the given node. If the node is -// unknown, assume no additional features are supported. -// -// NOTE: Part of the routing.Graph interface. -func (g *session) FetchNodeFeatures(nodePub route.Vertex) ( - *lnwire.FeatureVector, error) { - - return g.graph.FetchNodeFeaturesTx(g.tx, nodePub) -} - -// A compile-time check to ensure that *session implements the -// routing.Graph interface. -var _ routing.Graph = (*session)(nil) - -// ReadOnlyGraph is a graph extended with a call to create a new read-only -// transaction that can then be used to make further queries to the graph. -type ReadOnlyGraph interface { - // NewPathFindTx returns a new read transaction that can be used for a - // single path finding session. Will return nil if the graph cache is - // enabled. - NewPathFindTx() (kvdb.RTx, error) - - graph -} - -// graph describes the API necessary for a graph source to have access to on a -// database implementation, like channeldb.ChannelGraph, in order to be used by -// the Router for pathfinding. -type graph interface { - // ForEachNodeDirectedChannelTx iterates through all channels of a given - // node, executing the passed callback on the directed edge representing - // the channel and its incoming policy. If the callback returns an - // error, then the iteration is halted with the error propagated back - // up to the caller. - // - // Unknown policies are passed into the callback as nil values. - // - // NOTE: if a nil tx is provided, then it is expected that the - // implementation create a read only tx. - ForEachNodeDirectedChannelTx(tx kvdb.RTx, node route.Vertex, - cb func(channel *graphdb.DirectedChannel) error) error - - // FetchNodeFeaturesTx returns the features of a given node. If no - // features are known for the node, an empty feature vector is returned. - // - // NOTE: if a nil tx is provided, then it is expected that the - // implementation create a read only tx. - FetchNodeFeaturesTx(tx kvdb.RTx, node route.Vertex) ( - *lnwire.FeatureVector, error) -} - -// A compile-time check to ensure that *channeldb.ChannelGraph implements the -// graph interface. -var _ graph = (*graphdb.ChannelGraph)(nil) diff --git a/routing/graph.go b/routing/graph.go index fa1e7eb1d..115f2daf1 100644 --- a/routing/graph.go +++ b/routing/graph.go @@ -21,16 +21,15 @@ type Graph interface { FetchNodeFeatures(nodePub route.Vertex) (*lnwire.FeatureVector, error) } -// GraphSessionFactory can be used to produce a new Graph instance which can -// then be used for a path-finding session. Depending on the implementation, -// the Graph session will represent a DB connection where a read-lock is being -// held across calls to the backing Graph. +// GraphSessionFactory can be used to gain access to a graphdb.NodeTraverser +// instance which can then be used for a path-finding session. Depending on the +// implementation, the session will represent a DB connection where a read-lock +// is being held across calls to the backing graph. type GraphSessionFactory interface { - // NewGraphSession will produce a new Graph to use for a path-finding - // session. It returns the Graph along with a call-back that must be - // called once Graph access is complete. This call-back will close any - // read-only transaction that was created at Graph construction time. - NewGraphSession() (Graph, func() error, error) + // GraphSession will provide the call-back with access to a + // graphdb.NodeTraverser instance which can be used to perform queries + // against the channel graph. + GraphSession(cb func(graph graphdb.NodeTraverser) error) error } // FetchAmountPairCapacity determines the maximal public capacity between two diff --git a/routing/integrated_routing_context_test.go b/routing/integrated_routing_context_test.go index 1402ce9e4..7270b6f6d 100644 --- a/routing/integrated_routing_context_test.go +++ b/routing/integrated_routing_context_test.go @@ -8,7 +8,6 @@ import ( "time" "github.com/lightningnetwork/lnd/fn/v2" - graphdb "github.com/lightningnetwork/lnd/graph/db" "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing/route" @@ -211,7 +210,7 @@ func (c *integratedRoutingContext) testPayment(maxParts uint32, session, err := newPaymentSession( &payment, c.graph.source.pubkey, getBandwidthHints, - newMockGraphSessionFactory(c.graph), mc, c.pathFindingCfg, + c.graph, mc, c.pathFindingCfg, ) if err != nil { c.t.Fatal(err) @@ -317,88 +316,3 @@ func getNodeIndex(route *route.Route, failureSource route.Vertex) *int { } return nil } - -type mockGraphSessionFactory struct { - Graph -} - -func newMockGraphSessionFactory(graph Graph) GraphSessionFactory { - return &mockGraphSessionFactory{Graph: graph} -} - -func (m *mockGraphSessionFactory) NewGraphSession() (Graph, func() error, - error) { - - return m, func() error { - return nil - }, nil -} - -var _ GraphSessionFactory = (*mockGraphSessionFactory)(nil) -var _ Graph = (*mockGraphSessionFactory)(nil) - -type mockGraphSessionFactoryChanDB struct { - graph *graphdb.ChannelGraph -} - -func newMockGraphSessionFactoryFromChanDB( - graph *graphdb.ChannelGraph) *mockGraphSessionFactoryChanDB { - - return &mockGraphSessionFactoryChanDB{ - graph: graph, - } -} - -func (g *mockGraphSessionFactoryChanDB) NewGraphSession() (Graph, func() error, - error) { - - tx, err := g.graph.NewPathFindTx() - if err != nil { - return nil, nil, err - } - - session := &mockGraphSessionChanDB{ - graph: g.graph, - tx: tx, - } - - return session, session.close, nil -} - -var _ GraphSessionFactory = (*mockGraphSessionFactoryChanDB)(nil) - -type mockGraphSessionChanDB struct { - graph *graphdb.ChannelGraph - tx kvdb.RTx -} - -func newMockGraphSessionChanDB(graph *graphdb.ChannelGraph) Graph { - return &mockGraphSessionChanDB{ - graph: graph, - } -} - -func (g *mockGraphSessionChanDB) close() error { - if g.tx == nil { - return nil - } - - err := g.tx.Rollback() - if err != nil { - return fmt.Errorf("error closing db tx: %w", err) - } - - return nil -} - -func (g *mockGraphSessionChanDB) ForEachNodeDirectedChannel(nodePub route.Vertex, - cb func(channel *graphdb.DirectedChannel) error) error { - - return g.graph.ForEachNodeDirectedChannelTx(g.tx, nodePub, cb) -} - -func (g *mockGraphSessionChanDB) FetchNodeFeatures(nodePub route.Vertex) ( - *lnwire.FeatureVector, error) { - - return g.graph.FetchNodeFeaturesTx(g.tx, nodePub) -} diff --git a/routing/integrated_routing_test.go b/routing/integrated_routing_test.go index 4a2447b48..9636b10f7 100644 --- a/routing/integrated_routing_test.go +++ b/routing/integrated_routing_test.go @@ -404,5 +404,5 @@ func TestPaymentAddrOnlyNoSplit(t *testing.T) { // The payment should have failed since we need to split in order to // route a payment to the destination, but they don't actually support // MPP. - require.Equal(t, err.Error(), errNoPathFound.Error()) + require.ErrorIs(t, err, errNoPathFound) } diff --git a/routing/mock_graph_test.go b/routing/mock_graph_test.go index 68dc1e80a..cb5d07f3a 100644 --- a/routing/mock_graph_test.go +++ b/routing/mock_graph_test.go @@ -227,6 +227,17 @@ func (m *mockGraph) FetchNodeFeatures(nodePub route.Vertex) ( return lnwire.EmptyFeatureVector(), nil } +// GraphSession will provide the call-back with access to a +// graphdb.NodeTraverser instance which can be used to perform queries against +// the channel graph. +// +// NOTE: Part of the GraphSessionFactory interface. +func (m *mockGraph) GraphSession( + cb func(graph graphdb.NodeTraverser) error) error { + + return cb(m) +} + // htlcResult describes the resolution of an htlc. If failure is nil, the htlc // was settled. type htlcResult struct { diff --git a/routing/pathfind_test.go b/routing/pathfind_test.go index f91d95d6b..fd92ed934 100644 --- a/routing/pathfind_test.go +++ b/routing/pathfind_test.go @@ -3221,30 +3221,25 @@ func dbFindPath(graph *graphdb.ChannelGraph, return nil, err } - graphSessFactory := newMockGraphSessionFactoryFromChanDB(graph) + var route []*unifiedEdge + err = graph.GraphSession(func(graph graphdb.NodeTraverser) error { + route, _, err = findPath( + &graphParams{ + additionalEdges: additionalEdges, + bandwidthHints: bandwidthHints, + graph: graph, + }, + r, cfg, sourceNode.PubKeyBytes, source, target, amt, + timePref, finalHtlcExpiry, + ) - graphSess, closeGraphSess, err := graphSessFactory.NewGraphSession() + return err + }) if err != nil { return nil, err } - defer func() { - if err := closeGraphSess(); err != nil { - log.Errorf("Error closing graph session: %v", err) - } - }() - - route, _, err := findPath( - &graphParams{ - additionalEdges: additionalEdges, - bandwidthHints: bandwidthHints, - graph: graphSess, - }, - r, cfg, sourceNode.PubKeyBytes, source, target, amt, timePref, - finalHtlcExpiry, - ) - - return route, err + return route, nil } // dbFindBlindedPaths calls findBlindedPaths after getting a db transaction from @@ -3258,8 +3253,7 @@ func dbFindBlindedPaths(graph *graphdb.ChannelGraph, } return findBlindedPaths( - newMockGraphSessionChanDB(graph), sourceNode.PubKeyBytes, - restrictions, + graph, sourceNode.PubKeyBytes, restrictions, ) } diff --git a/routing/payment_session.go b/routing/payment_session.go index b21d40998..607c10443 100644 --- a/routing/payment_session.go +++ b/routing/payment_session.go @@ -343,19 +343,7 @@ func (p *paymentSession) RequestRoute(maxAmt, feeLimit lnwire.MilliSatoshi, } for { - // Get a routing graph session. - graph, closeGraph, err := p.graphSessFactory.NewGraphSession() - if err != nil { - return nil, err - } - - err = findPath(graph) - // First, close routing graph session. - // NOTE: this will be removed in an upcoming commit. - if graphErr := closeGraph(); graphErr != nil { - log.Errorf("could not close graph session: %v", - graphErr) - } + err := p.graphSessFactory.GraphSession(findPath) // If there is an error, and it is not a path finding error, we // return it immediately. if err != nil && !lnutils.ErrorAs[*pathFindingError](err) { @@ -365,7 +353,8 @@ func (p *paymentSession) RequestRoute(maxAmt, feeLimit lnwire.MilliSatoshi, // to check the underlying error. // //nolint:errorlint - err = err.(*pathFindingError).Unwrap() + pErr, _ := err.(*pathFindingError) + err = pErr.Unwrap() } // Otherwise, we'll switch on the path finding error. diff --git a/routing/payment_session_test.go b/routing/payment_session_test.go index 278e09044..51eeda425 100644 --- a/routing/payment_session_test.go +++ b/routing/payment_session_test.go @@ -4,6 +4,7 @@ import ( "testing" "time" + graphdb "github.com/lightningnetwork/lnd/graph/db" "github.com/lightningnetwork/lnd/graph/db/models" "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lnwire" @@ -118,7 +119,7 @@ func TestUpdateAdditionalEdge(t *testing.T) { func(Graph) (bandwidthHints, error) { return &mockBandwidthHints{}, nil }, - newMockGraphSessionFactory(&sessionGraph{}), + &sessionGraph{}, &MissionControl{}, PathFindingConfig{}, ) @@ -196,7 +197,7 @@ func TestRequestRoute(t *testing.T) { func(Graph) (bandwidthHints, error) { return &mockBandwidthHints{}, nil }, - newMockGraphSessionFactory(&sessionGraph{}), + &sessionGraph{}, &MissionControl{}, PathFindingConfig{}, ) @@ -257,3 +258,9 @@ type sessionGraph struct { func (g *sessionGraph) sourceNode() route.Vertex { return route.Vertex{} } + +func (g *sessionGraph) GraphSession( + cb func(graph graphdb.NodeTraverser) error) error { + + return cb(g) +} diff --git a/routing/router_test.go b/routing/router_test.go index 543f3b000..6efc75df2 100644 --- a/routing/router_test.go +++ b/routing/router_test.go @@ -135,20 +135,18 @@ func createTestCtxFromGraphInstanceAssumeValid(t *testing.T, sourceNode, err := graphInstance.graph.SourceNode() require.NoError(t, err) sessionSource := &SessionSource{ - GraphSessionFactory: newMockGraphSessionFactoryFromChanDB( - graphInstance.graph, - ), - SourceNode: sourceNode, - GetLink: graphInstance.getLink, - PathFindingConfig: pathFindingConfig, - MissionControl: mc, + GraphSessionFactory: graphInstance.graph, + SourceNode: sourceNode, + GetLink: graphInstance.getLink, + PathFindingConfig: pathFindingConfig, + MissionControl: mc, } graphBuilder := newMockGraphBuilder(graphInstance.graph) router, err := New(Config{ SelfNode: sourceNode.PubKeyBytes, - RoutingGraph: newMockGraphSessionChanDB(graphInstance.graph), + RoutingGraph: graphInstance.graph, Chain: chain, Payer: &mockPaymentAttemptDispatcherOld{}, Control: makeMockControlTower(), diff --git a/server.go b/server.go index 0e370d293..ecb3eceae 100644 --- a/server.go +++ b/server.go @@ -45,7 +45,6 @@ import ( "github.com/lightningnetwork/lnd/graph" graphdb "github.com/lightningnetwork/lnd/graph/db" "github.com/lightningnetwork/lnd/graph/db/models" - "github.com/lightningnetwork/lnd/graph/graphsession" "github.com/lightningnetwork/lnd/healthcheck" "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/htlcswitch/hop" @@ -1038,13 +1037,11 @@ func newServer(cfg *Config, listenAddrs []net.Addr, return nil, fmt.Errorf("error getting source node: %w", err) } paymentSessionSource := &routing.SessionSource{ - GraphSessionFactory: graphsession.NewGraphSessionFactory( - dbs.GraphDB, - ), - SourceNode: sourceNode, - MissionControl: s.defaultMC, - GetLink: s.htlcSwitch.GetLinkByShortID, - PathFindingConfig: pathFindingConfig, + GraphSessionFactory: dbs.GraphDB, + SourceNode: sourceNode, + MissionControl: s.defaultMC, + GetLink: s.htlcSwitch.GetLinkByShortID, + PathFindingConfig: pathFindingConfig, } paymentControl := channeldb.NewPaymentControl(dbs.ChanStateDB)