diff --git a/graph/db/graph.go b/graph/db/graph.go index ad9ad2ad4..315651a0d 100644 --- a/graph/db/graph.go +++ b/graph/db/graph.go @@ -403,16 +403,6 @@ func initChannelGraph(db kvdb.Backend) error { return nil } -// NewPathFindTx returns a new read transaction that can be used for a single -// path finding session. Will return nil if the graph cache is enabled. -func (c *ChannelGraph) NewPathFindTx() (kvdb.RTx, error) { - if c.graphCache != nil { - return nil, nil - } - - return c.db.BeginReadTx() -} - // AddrsForNode returns all known addresses for the target node public key that // the graph DB is aware of. The returned boolean indicates if the given node is // unknown to the graph DB or not. @@ -3907,6 +3897,64 @@ func (c *ChannelGraph) IsClosedScid(scid lnwire.ShortChannelID) (bool, error) { return isClosed, nil } +// GraphSession will provide the call-back with access to a NodeTraverser +// instance which can be used to perform queries against the channel graph. If +// the graph cache is not enabled, then the call-back will be provided with +// access to the graph via a consistent read-only transaction. +func (c *ChannelGraph) GraphSession(cb func(graph NodeTraverser) error) error { + var ( + tx kvdb.RTx + err error + commit = func() {} + ) + if c.graphCache == nil { + tx, err = c.db.BeginReadTx() + if err != nil { + return err + } + + commit = func() { + if err := tx.Rollback(); err != nil { + log.Errorf("Unable to rollback tx: %v", err) + } + } + } + defer commit() + + return cb(&nodeTraverserSession{ + db: c, + tx: tx, + }) +} + +// nodeTraverserSession implements the NodeTraverser interface but with a +// backing read only transaction for a consistent view of the graph in the case +// where the graph Cache has not been enabled. +type nodeTraverserSession struct { + tx kvdb.RTx + db *ChannelGraph +} + +// ForEachNodeDirectedChannel calls the callback for every channel of the given +// node. +// +// NOTE: Part of the NodeTraverser interface. +func (c *nodeTraverserSession) ForEachNodeDirectedChannel(nodePub route.Vertex, + cb func(channel *DirectedChannel) error) error { + + return c.db.ForEachNodeDirectedChannelTx(c.tx, nodePub, cb) +} + +// FetchNodeFeatures returns the features of the given node. If the node is +// unknown, assume no additional features are supported. +// +// NOTE: Part of the NodeTraverser interface. +func (c *nodeTraverserSession) FetchNodeFeatures(nodePub route.Vertex) ( + *lnwire.FeatureVector, error) { + + return c.db.FetchNodeFeaturesTx(c.tx, nodePub) +} + func putLightningNode(nodeBucket kvdb.RwBucket, aliasBucket kvdb.RwBucket, // nolint:dupl updateIndex kvdb.RwBucket, node *models.LightningNode) error { diff --git a/graph/graphsession/graph_session.go b/graph/graphsession/graph_session.go deleted file mode 100644 index 535a8beb5..000000000 --- a/graph/graphsession/graph_session.go +++ /dev/null @@ -1,137 +0,0 @@ -package graphsession - -import ( - "fmt" - - graphdb "github.com/lightningnetwork/lnd/graph/db" - "github.com/lightningnetwork/lnd/kvdb" - "github.com/lightningnetwork/lnd/lnwire" - "github.com/lightningnetwork/lnd/routing" - "github.com/lightningnetwork/lnd/routing/route" -) - -// Factory implements the routing.GraphSessionFactory and can be used to start -// a session with a ReadOnlyGraph. -type Factory struct { - graph ReadOnlyGraph -} - -// NewGraphSessionFactory constructs a new Factory which can then be used to -// start a new session. -func NewGraphSessionFactory(graph ReadOnlyGraph) routing.GraphSessionFactory { - return &Factory{ - graph: graph, - } -} - -// NewGraphSession will produce a new Graph to use for a path-finding session. -// It returns the Graph along with a call-back that must be called once Graph -// access is complete. This call-back will close any read-only transaction that -// was created at Graph construction time. -// -// NOTE: This is part of the routing.GraphSessionFactory interface. -func (g *Factory) NewGraphSession() (routing.Graph, func() error, error) { - tx, err := g.graph.NewPathFindTx() - if err != nil { - return nil, nil, err - } - - session := &session{ - graph: g.graph, - tx: tx, - } - - return session, session.close, nil -} - -// A compile-time check to ensure that Factory implements the -// routing.GraphSessionFactory interface. -var _ routing.GraphSessionFactory = (*Factory)(nil) - -// session is an implementation of the routing.Graph interface where the same -// read-only transaction is held across calls to the graph and can be used to -// access the backing channel graph. -type session struct { - graph graph - tx kvdb.RTx -} - -// close closes the read-only transaction being used to access the backing -// graph. If no transaction was started then this is a no-op. -func (g *session) close() error { - if g.tx == nil { - return nil - } - - err := g.tx.Rollback() - if err != nil { - return fmt.Errorf("error closing db tx: %w", err) - } - - return nil -} - -// ForEachNodeDirectedChannel calls the callback for every channel of the given -// node. -// -// NOTE: Part of the routing.Graph interface. -func (g *session) ForEachNodeDirectedChannel(nodePub route.Vertex, - cb func(channel *graphdb.DirectedChannel) error) error { - - return g.graph.ForEachNodeDirectedChannelTx(g.tx, nodePub, cb) -} - -// FetchNodeFeatures returns the features of the given node. If the node is -// unknown, assume no additional features are supported. -// -// NOTE: Part of the routing.Graph interface. -func (g *session) FetchNodeFeatures(nodePub route.Vertex) ( - *lnwire.FeatureVector, error) { - - return g.graph.FetchNodeFeaturesTx(g.tx, nodePub) -} - -// A compile-time check to ensure that *session implements the -// routing.Graph interface. -var _ routing.Graph = (*session)(nil) - -// ReadOnlyGraph is a graph extended with a call to create a new read-only -// transaction that can then be used to make further queries to the graph. -type ReadOnlyGraph interface { - // NewPathFindTx returns a new read transaction that can be used for a - // single path finding session. Will return nil if the graph cache is - // enabled. - NewPathFindTx() (kvdb.RTx, error) - - graph -} - -// graph describes the API necessary for a graph source to have access to on a -// database implementation, like channeldb.ChannelGraph, in order to be used by -// the Router for pathfinding. -type graph interface { - // ForEachNodeDirectedChannelTx iterates through all channels of a given - // node, executing the passed callback on the directed edge representing - // the channel and its incoming policy. If the callback returns an - // error, then the iteration is halted with the error propagated back - // up to the caller. - // - // Unknown policies are passed into the callback as nil values. - // - // NOTE: if a nil tx is provided, then it is expected that the - // implementation create a read only tx. - ForEachNodeDirectedChannelTx(tx kvdb.RTx, node route.Vertex, - cb func(channel *graphdb.DirectedChannel) error) error - - // FetchNodeFeaturesTx returns the features of a given node. If no - // features are known for the node, an empty feature vector is returned. - // - // NOTE: if a nil tx is provided, then it is expected that the - // implementation create a read only tx. - FetchNodeFeaturesTx(tx kvdb.RTx, node route.Vertex) ( - *lnwire.FeatureVector, error) -} - -// A compile-time check to ensure that *channeldb.ChannelGraph implements the -// graph interface. -var _ graph = (*graphdb.ChannelGraph)(nil) diff --git a/routing/graph.go b/routing/graph.go index fa1e7eb1d..115f2daf1 100644 --- a/routing/graph.go +++ b/routing/graph.go @@ -21,16 +21,15 @@ type Graph interface { FetchNodeFeatures(nodePub route.Vertex) (*lnwire.FeatureVector, error) } -// GraphSessionFactory can be used to produce a new Graph instance which can -// then be used for a path-finding session. Depending on the implementation, -// the Graph session will represent a DB connection where a read-lock is being -// held across calls to the backing Graph. +// GraphSessionFactory can be used to gain access to a graphdb.NodeTraverser +// instance which can then be used for a path-finding session. Depending on the +// implementation, the session will represent a DB connection where a read-lock +// is being held across calls to the backing graph. type GraphSessionFactory interface { - // NewGraphSession will produce a new Graph to use for a path-finding - // session. It returns the Graph along with a call-back that must be - // called once Graph access is complete. This call-back will close any - // read-only transaction that was created at Graph construction time. - NewGraphSession() (Graph, func() error, error) + // GraphSession will provide the call-back with access to a + // graphdb.NodeTraverser instance which can be used to perform queries + // against the channel graph. + GraphSession(cb func(graph graphdb.NodeTraverser) error) error } // FetchAmountPairCapacity determines the maximal public capacity between two diff --git a/routing/integrated_routing_context_test.go b/routing/integrated_routing_context_test.go index 1402ce9e4..7270b6f6d 100644 --- a/routing/integrated_routing_context_test.go +++ b/routing/integrated_routing_context_test.go @@ -8,7 +8,6 @@ import ( "time" "github.com/lightningnetwork/lnd/fn/v2" - graphdb "github.com/lightningnetwork/lnd/graph/db" "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing/route" @@ -211,7 +210,7 @@ func (c *integratedRoutingContext) testPayment(maxParts uint32, session, err := newPaymentSession( &payment, c.graph.source.pubkey, getBandwidthHints, - newMockGraphSessionFactory(c.graph), mc, c.pathFindingCfg, + c.graph, mc, c.pathFindingCfg, ) if err != nil { c.t.Fatal(err) @@ -317,88 +316,3 @@ func getNodeIndex(route *route.Route, failureSource route.Vertex) *int { } return nil } - -type mockGraphSessionFactory struct { - Graph -} - -func newMockGraphSessionFactory(graph Graph) GraphSessionFactory { - return &mockGraphSessionFactory{Graph: graph} -} - -func (m *mockGraphSessionFactory) NewGraphSession() (Graph, func() error, - error) { - - return m, func() error { - return nil - }, nil -} - -var _ GraphSessionFactory = (*mockGraphSessionFactory)(nil) -var _ Graph = (*mockGraphSessionFactory)(nil) - -type mockGraphSessionFactoryChanDB struct { - graph *graphdb.ChannelGraph -} - -func newMockGraphSessionFactoryFromChanDB( - graph *graphdb.ChannelGraph) *mockGraphSessionFactoryChanDB { - - return &mockGraphSessionFactoryChanDB{ - graph: graph, - } -} - -func (g *mockGraphSessionFactoryChanDB) NewGraphSession() (Graph, func() error, - error) { - - tx, err := g.graph.NewPathFindTx() - if err != nil { - return nil, nil, err - } - - session := &mockGraphSessionChanDB{ - graph: g.graph, - tx: tx, - } - - return session, session.close, nil -} - -var _ GraphSessionFactory = (*mockGraphSessionFactoryChanDB)(nil) - -type mockGraphSessionChanDB struct { - graph *graphdb.ChannelGraph - tx kvdb.RTx -} - -func newMockGraphSessionChanDB(graph *graphdb.ChannelGraph) Graph { - return &mockGraphSessionChanDB{ - graph: graph, - } -} - -func (g *mockGraphSessionChanDB) close() error { - if g.tx == nil { - return nil - } - - err := g.tx.Rollback() - if err != nil { - return fmt.Errorf("error closing db tx: %w", err) - } - - return nil -} - -func (g *mockGraphSessionChanDB) ForEachNodeDirectedChannel(nodePub route.Vertex, - cb func(channel *graphdb.DirectedChannel) error) error { - - return g.graph.ForEachNodeDirectedChannelTx(g.tx, nodePub, cb) -} - -func (g *mockGraphSessionChanDB) FetchNodeFeatures(nodePub route.Vertex) ( - *lnwire.FeatureVector, error) { - - return g.graph.FetchNodeFeaturesTx(g.tx, nodePub) -} diff --git a/routing/integrated_routing_test.go b/routing/integrated_routing_test.go index 4a2447b48..9636b10f7 100644 --- a/routing/integrated_routing_test.go +++ b/routing/integrated_routing_test.go @@ -404,5 +404,5 @@ func TestPaymentAddrOnlyNoSplit(t *testing.T) { // The payment should have failed since we need to split in order to // route a payment to the destination, but they don't actually support // MPP. - require.Equal(t, err.Error(), errNoPathFound.Error()) + require.ErrorIs(t, err, errNoPathFound) } diff --git a/routing/mock_graph_test.go b/routing/mock_graph_test.go index 68dc1e80a..cb5d07f3a 100644 --- a/routing/mock_graph_test.go +++ b/routing/mock_graph_test.go @@ -227,6 +227,17 @@ func (m *mockGraph) FetchNodeFeatures(nodePub route.Vertex) ( return lnwire.EmptyFeatureVector(), nil } +// GraphSession will provide the call-back with access to a +// graphdb.NodeTraverser instance which can be used to perform queries against +// the channel graph. +// +// NOTE: Part of the GraphSessionFactory interface. +func (m *mockGraph) GraphSession( + cb func(graph graphdb.NodeTraverser) error) error { + + return cb(m) +} + // htlcResult describes the resolution of an htlc. If failure is nil, the htlc // was settled. type htlcResult struct { diff --git a/routing/pathfind_test.go b/routing/pathfind_test.go index f91d95d6b..fd92ed934 100644 --- a/routing/pathfind_test.go +++ b/routing/pathfind_test.go @@ -3221,30 +3221,25 @@ func dbFindPath(graph *graphdb.ChannelGraph, return nil, err } - graphSessFactory := newMockGraphSessionFactoryFromChanDB(graph) + var route []*unifiedEdge + err = graph.GraphSession(func(graph graphdb.NodeTraverser) error { + route, _, err = findPath( + &graphParams{ + additionalEdges: additionalEdges, + bandwidthHints: bandwidthHints, + graph: graph, + }, + r, cfg, sourceNode.PubKeyBytes, source, target, amt, + timePref, finalHtlcExpiry, + ) - graphSess, closeGraphSess, err := graphSessFactory.NewGraphSession() + return err + }) if err != nil { return nil, err } - defer func() { - if err := closeGraphSess(); err != nil { - log.Errorf("Error closing graph session: %v", err) - } - }() - - route, _, err := findPath( - &graphParams{ - additionalEdges: additionalEdges, - bandwidthHints: bandwidthHints, - graph: graphSess, - }, - r, cfg, sourceNode.PubKeyBytes, source, target, amt, timePref, - finalHtlcExpiry, - ) - - return route, err + return route, nil } // dbFindBlindedPaths calls findBlindedPaths after getting a db transaction from @@ -3258,8 +3253,7 @@ func dbFindBlindedPaths(graph *graphdb.ChannelGraph, } return findBlindedPaths( - newMockGraphSessionChanDB(graph), sourceNode.PubKeyBytes, - restrictions, + graph, sourceNode.PubKeyBytes, restrictions, ) } diff --git a/routing/payment_session.go b/routing/payment_session.go index b21d40998..607c10443 100644 --- a/routing/payment_session.go +++ b/routing/payment_session.go @@ -343,19 +343,7 @@ func (p *paymentSession) RequestRoute(maxAmt, feeLimit lnwire.MilliSatoshi, } for { - // Get a routing graph session. - graph, closeGraph, err := p.graphSessFactory.NewGraphSession() - if err != nil { - return nil, err - } - - err = findPath(graph) - // First, close routing graph session. - // NOTE: this will be removed in an upcoming commit. - if graphErr := closeGraph(); graphErr != nil { - log.Errorf("could not close graph session: %v", - graphErr) - } + err := p.graphSessFactory.GraphSession(findPath) // If there is an error, and it is not a path finding error, we // return it immediately. if err != nil && !lnutils.ErrorAs[*pathFindingError](err) { @@ -365,7 +353,8 @@ func (p *paymentSession) RequestRoute(maxAmt, feeLimit lnwire.MilliSatoshi, // to check the underlying error. // //nolint:errorlint - err = err.(*pathFindingError).Unwrap() + pErr, _ := err.(*pathFindingError) + err = pErr.Unwrap() } // Otherwise, we'll switch on the path finding error. diff --git a/routing/payment_session_test.go b/routing/payment_session_test.go index 278e09044..51eeda425 100644 --- a/routing/payment_session_test.go +++ b/routing/payment_session_test.go @@ -4,6 +4,7 @@ import ( "testing" "time" + graphdb "github.com/lightningnetwork/lnd/graph/db" "github.com/lightningnetwork/lnd/graph/db/models" "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lnwire" @@ -118,7 +119,7 @@ func TestUpdateAdditionalEdge(t *testing.T) { func(Graph) (bandwidthHints, error) { return &mockBandwidthHints{}, nil }, - newMockGraphSessionFactory(&sessionGraph{}), + &sessionGraph{}, &MissionControl{}, PathFindingConfig{}, ) @@ -196,7 +197,7 @@ func TestRequestRoute(t *testing.T) { func(Graph) (bandwidthHints, error) { return &mockBandwidthHints{}, nil }, - newMockGraphSessionFactory(&sessionGraph{}), + &sessionGraph{}, &MissionControl{}, PathFindingConfig{}, ) @@ -257,3 +258,9 @@ type sessionGraph struct { func (g *sessionGraph) sourceNode() route.Vertex { return route.Vertex{} } + +func (g *sessionGraph) GraphSession( + cb func(graph graphdb.NodeTraverser) error) error { + + return cb(g) +} diff --git a/routing/router_test.go b/routing/router_test.go index 543f3b000..6efc75df2 100644 --- a/routing/router_test.go +++ b/routing/router_test.go @@ -135,20 +135,18 @@ func createTestCtxFromGraphInstanceAssumeValid(t *testing.T, sourceNode, err := graphInstance.graph.SourceNode() require.NoError(t, err) sessionSource := &SessionSource{ - GraphSessionFactory: newMockGraphSessionFactoryFromChanDB( - graphInstance.graph, - ), - SourceNode: sourceNode, - GetLink: graphInstance.getLink, - PathFindingConfig: pathFindingConfig, - MissionControl: mc, + GraphSessionFactory: graphInstance.graph, + SourceNode: sourceNode, + GetLink: graphInstance.getLink, + PathFindingConfig: pathFindingConfig, + MissionControl: mc, } graphBuilder := newMockGraphBuilder(graphInstance.graph) router, err := New(Config{ SelfNode: sourceNode.PubKeyBytes, - RoutingGraph: newMockGraphSessionChanDB(graphInstance.graph), + RoutingGraph: graphInstance.graph, Chain: chain, Payer: &mockPaymentAttemptDispatcherOld{}, Control: makeMockControlTower(), diff --git a/server.go b/server.go index 0e370d293..ecb3eceae 100644 --- a/server.go +++ b/server.go @@ -45,7 +45,6 @@ import ( "github.com/lightningnetwork/lnd/graph" graphdb "github.com/lightningnetwork/lnd/graph/db" "github.com/lightningnetwork/lnd/graph/db/models" - "github.com/lightningnetwork/lnd/graph/graphsession" "github.com/lightningnetwork/lnd/healthcheck" "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/htlcswitch/hop" @@ -1038,13 +1037,11 @@ func newServer(cfg *Config, listenAddrs []net.Addr, return nil, fmt.Errorf("error getting source node: %w", err) } paymentSessionSource := &routing.SessionSource{ - GraphSessionFactory: graphsession.NewGraphSessionFactory( - dbs.GraphDB, - ), - SourceNode: sourceNode, - MissionControl: s.defaultMC, - GetLink: s.htlcSwitch.GetLinkByShortID, - PathFindingConfig: pathFindingConfig, + GraphSessionFactory: dbs.GraphDB, + SourceNode: sourceNode, + MissionControl: s.defaultMC, + GetLink: s.htlcSwitch.GetLinkByShortID, + PathFindingConfig: pathFindingConfig, } paymentControl := channeldb.NewPaymentControl(dbs.ChanStateDB)