multi: add abstraction for Router and SessionSource graph access

In this commit, we completely remove the Router's dependence on a Graph
source that requires a `kvdb.RTx`. In so doing, we are more prepared for
a future where the Graph source is backed by different DB structure such
as pure SQL.

The two areas affected here are: the ChannelRouter's graph access that
it uses for pathfinding. And the SessionSource's graph access that it
uses for payments.

The ChannelRouter gets given a Graph and the SessionSource is given a
GraphSessionFactory which it can use to create a new session. Behind the
scenes, this will acquire a kvdb.RTx that will be used for calls to the
Graph's `ForEachNodeChannel` method.
This commit is contained in:
Elle Mouton 2024-06-25 19:58:57 -07:00
parent 90d6b863a8
commit 8c0df98439
No known key found for this signature in database
GPG Key ID: D7D916376026F177
11 changed files with 288 additions and 128 deletions

View File

@ -0,0 +1,141 @@
package graphsession
import (
"fmt"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing"
"github.com/lightningnetwork/lnd/routing/route"
)
// Factory implements the routing.GraphSessionFactory and can be used to start
// a session with a ReadOnlyGraph.
type Factory struct {
graph ReadOnlyGraph
}
// NewGraphSessionFactory constructs a new Factory which can then be used to
// start a new session.
func NewGraphSessionFactory(graph ReadOnlyGraph) routing.GraphSessionFactory {
return &Factory{
graph: graph,
}
}
// NewGraphSession will produce a new Graph to use for a path-finding session.
// It returns the Graph along with a call-back that must be called once Graph
// access is complete. This call-back will close any read-only transaction that
// was created at Graph construction time.
//
// NOTE: This is part of the routing.GraphSessionFactory interface.
func (g *Factory) NewGraphSession() (routing.Graph, func() error, error) {
tx, err := g.graph.NewPathFindTx()
if err != nil {
return nil, nil, err
}
session := &session{
graph: g.graph,
tx: tx,
}
return session, session.close, nil
}
// A compile-time check to ensure that Factory implements the
// routing.GraphSessionFactory interface.
var _ routing.GraphSessionFactory = (*Factory)(nil)
// session is an implementation of the routing.Graph interface where the same
// read-only transaction is held across calls to the graph and can be used to
// access the backing channel graph.
type session struct {
graph graph
tx kvdb.RTx
}
// NewRoutingGraph constructs a session that which does not first start a
// read-only transaction and so each call on the routing.Graph will create a
// new transaction.
func NewRoutingGraph(graph ReadOnlyGraph) routing.Graph {
return &session{
graph: graph,
}
}
// close closes the read-only transaction being used to access the backing
// graph. If no transaction was started then this is a no-op.
func (g *session) close() error {
if g.tx == nil {
return nil
}
err := g.tx.Rollback()
if err != nil {
return fmt.Errorf("error closing db tx: %w", err)
}
return nil
}
// ForEachNodeChannel calls the callback for every channel of the given node.
//
// NOTE: Part of the routing.Graph interface.
func (g *session) ForEachNodeChannel(nodePub route.Vertex,
cb func(channel *channeldb.DirectedChannel) error) error {
return g.graph.ForEachNodeDirectedChannel(g.tx, nodePub, cb)
}
// FetchNodeFeatures returns the features of the given node. If the node is
// unknown, assume no additional features are supported.
//
// NOTE: Part of the routing.Graph interface.
func (g *session) FetchNodeFeatures(nodePub route.Vertex) (
*lnwire.FeatureVector, error) {
return g.graph.FetchNodeFeatures(nodePub)
}
// A compile-time check to ensure that *session implements the
// routing.Graph interface.
var _ routing.Graph = (*session)(nil)
// ReadOnlyGraph is a graph extended with a call to create a new read-only
// transaction that can then be used to make further queries to the graph.
type ReadOnlyGraph interface {
// NewPathFindTx returns a new read transaction that can be used for a
// single path finding session. Will return nil if the graph cache is
// enabled.
NewPathFindTx() (kvdb.RTx, error)
graph
}
// graph describes the API necessary for a graph source to have access to on a
// database implementation, like channeldb.ChannelGraph, in order to be used by
// the Router for pathfinding.
type graph interface {
// ForEachNodeDirectedChannel iterates through all channels of a given
// node, executing the passed callback on the directed edge representing
// the channel and its incoming policy. If the callback returns an
// error, then the iteration is halted with the error propagated back
// up to the caller.
//
// Unknown policies are passed into the callback as nil values.
//
// NOTE: if a nil tx is provided, then it is expected that the
// implementation create a read only tx.
ForEachNodeDirectedChannel(tx kvdb.RTx, node route.Vertex,
cb func(channel *channeldb.DirectedChannel) error) error
// FetchNodeFeatures returns the features of a given node. If no
// features are known for the node, an empty feature vector is returned.
FetchNodeFeatures(node route.Vertex) (*lnwire.FeatureVector, error)
}
// A compile-time check to ensure that *channeldb.ChannelGraph implements the
// graph interface.
var _ graph = (*channeldb.ChannelGraph)(nil)

View File

@ -5,7 +5,6 @@ import (
"github.com/btcsuite/btcd/btcutil"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route"
)
@ -22,58 +21,16 @@ type Graph interface {
FetchNodeFeatures(nodePub route.Vertex) (*lnwire.FeatureVector, error)
}
// CachedGraph is a Graph implementation that retrieves from the
// database.
type CachedGraph struct {
graph *channeldb.ChannelGraph
tx kvdb.RTx
}
// A compile time assertion to make sure CachedGraph implements the Graph
// interface.
var _ Graph = (*CachedGraph)(nil)
// NewCachedGraph instantiates a new db-connected routing graph. It implicitly
// instantiates a new read transaction.
func NewCachedGraph(graph *channeldb.ChannelGraph) (*CachedGraph, error) {
tx, err := graph.NewPathFindTx()
if err != nil {
return nil, err
}
return &CachedGraph{
graph: graph,
tx: tx,
}, nil
}
// Close attempts to close the underlying db transaction. This is a no-op in
// case the underlying graph uses an in-memory cache.
func (g *CachedGraph) Close() error {
if g.tx == nil {
return nil
}
return g.tx.Rollback()
}
// ForEachNodeChannel calls the callback for every channel of the given node.
//
// NOTE: Part of the Graph interface.
func (g *CachedGraph) ForEachNodeChannel(nodePub route.Vertex,
cb func(channel *channeldb.DirectedChannel) error) error {
return g.graph.ForEachNodeDirectedChannel(g.tx, nodePub, cb)
}
// FetchNodeFeatures returns the features of the given node. If the node is
// unknown, assume no additional features are supported.
//
// NOTE: Part of the Graph interface.
func (g *CachedGraph) FetchNodeFeatures(nodePub route.Vertex) (
*lnwire.FeatureVector, error) {
return g.graph.FetchNodeFeatures(nodePub)
// GraphSessionFactory can be used to produce a new Graph instance which can
// then be used for a path-finding session. Depending on the implementation,
// the Graph session will represent a DB connection where a read-lock is being
// held across calls to the backing Graph.
type GraphSessionFactory interface {
// NewGraphSession will produce a new Graph to use for a path-finding
// session. It returns the Graph along with a call-back that must be
// called once Graph access is complete. This call-back will close any
// read-only transaction that was created at Graph construction time.
NewGraphSession() (Graph, func() error, error)
}
// FetchAmountPairCapacity determines the maximal public capacity between two

View File

@ -7,6 +7,7 @@ import (
"testing"
"time"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route"
@ -201,10 +202,7 @@ func (c *integratedRoutingContext) testPayment(maxParts uint32,
session, err := newPaymentSession(
&payment, c.graph.source.pubkey, getBandwidthHints,
func() (Graph, func(), error) {
return c.graph, func() {}, nil
},
mc, c.pathFindingCfg,
newMockGraphSessionFactory(c.graph), mc, c.pathFindingCfg,
)
if err != nil {
c.t.Fatal(err)
@ -307,3 +305,88 @@ func getNodeIndex(route *route.Route, failureSource route.Vertex) *int {
}
return nil
}
type mockGraphSessionFactory struct {
Graph
}
func newMockGraphSessionFactory(graph Graph) GraphSessionFactory {
return &mockGraphSessionFactory{Graph: graph}
}
func (m *mockGraphSessionFactory) NewGraphSession() (Graph, func() error,
error) {
return m, func() error {
return nil
}, nil
}
var _ GraphSessionFactory = (*mockGraphSessionFactory)(nil)
var _ Graph = (*mockGraphSessionFactory)(nil)
type mockGraphSessionFactoryChanDB struct {
graph *channeldb.ChannelGraph
}
func newMockGraphSessionFactoryFromChanDB(
graph *channeldb.ChannelGraph) *mockGraphSessionFactoryChanDB {
return &mockGraphSessionFactoryChanDB{
graph: graph,
}
}
func (g *mockGraphSessionFactoryChanDB) NewGraphSession() (Graph, func() error,
error) {
tx, err := g.graph.NewPathFindTx()
if err != nil {
return nil, nil, err
}
session := &mockGraphSessionChanDB{
graph: g.graph,
tx: tx,
}
return session, session.close, nil
}
var _ GraphSessionFactory = (*mockGraphSessionFactoryChanDB)(nil)
type mockGraphSessionChanDB struct {
graph *channeldb.ChannelGraph
tx kvdb.RTx
}
func newMockGraphSessionChanDB(graph *channeldb.ChannelGraph) Graph {
return &mockGraphSessionChanDB{
graph: graph,
}
}
func (g *mockGraphSessionChanDB) close() error {
if g.tx == nil {
return nil
}
err := g.tx.Rollback()
if err != nil {
return fmt.Errorf("error closing db tx: %w", err)
}
return nil
}
func (g *mockGraphSessionChanDB) ForEachNodeChannel(nodePub route.Vertex,
cb func(channel *channeldb.DirectedChannel) error) error {
return g.graph.ForEachNodeDirectedChannel(g.tx, nodePub, cb)
}
func (g *mockGraphSessionChanDB) FetchNodeFeatures(nodePub route.Vertex) (
*lnwire.FeatureVector, error) {
return g.graph.FetchNodeFeatures(nodePub)
}

View File

@ -3201,14 +3201,16 @@ func dbFindPath(graph *channeldb.ChannelGraph,
return nil, err
}
routingGraph, err := NewCachedGraph(graph)
graphSessFactory := newMockGraphSessionFactoryFromChanDB(graph)
graphSess, closeGraphSess, err := graphSessFactory.NewGraphSession()
if err != nil {
return nil, err
}
defer func() {
if err := routingGraph.Close(); err != nil {
log.Errorf("Error closing db tx: %v", err)
if err := closeGraphSess(); err != nil {
log.Errorf("Error closing graph session: %v", err)
}
}()
@ -3216,7 +3218,7 @@ func dbFindPath(graph *channeldb.ChannelGraph,
&graphParams{
additionalEdges: additionalEdges,
bandwidthHints: bandwidthHints,
graph: routingGraph,
graph: graphSess,
},
r, cfg, sourceNode.PubKeyBytes, source, target, amt, timePref,
finalHtlcExpiry,

View File

@ -175,7 +175,7 @@ type paymentSession struct {
pathFinder pathFinder
getRoutingGraph func() (Graph, func(), error)
graphSessFactory GraphSessionFactory
// pathFindingConfig defines global parameters that control the
// trade-off in path finding between fees and probability.
@ -196,9 +196,8 @@ type paymentSession struct {
// newPaymentSession instantiates a new payment session.
func newPaymentSession(p *LightningPayment, selfNode route.Vertex,
getBandwidthHints func(Graph) (bandwidthHints, error),
getRoutingGraph func() (Graph, func(), error),
missionControl MissionController, pathFindingConfig PathFindingConfig) (
*paymentSession, error) {
graphSessFactory GraphSessionFactory, missionControl MissionController,
pathFindingConfig PathFindingConfig) (*paymentSession, error) {
edges, err := RouteHintsToEdges(p.RouteHints, p.Target)
if err != nil {
@ -213,7 +212,7 @@ func newPaymentSession(p *LightningPayment, selfNode route.Vertex,
getBandwidthHints: getBandwidthHints,
payment: p,
pathFinder: findPath,
getRoutingGraph: getRoutingGraph,
graphSessFactory: graphSessFactory,
pathFindingConfig: pathFindingConfig,
missionControl: missionControl,
minShardAmt: DefaultShardMinAmt,
@ -280,8 +279,8 @@ func (p *paymentSession) RequestRoute(maxAmt, feeLimit lnwire.MilliSatoshi,
}
for {
// Get a routing graph.
routingGraph, cleanup, err := p.getRoutingGraph()
// Get a routing graph session.
graph, closeGraph, err := p.graphSessFactory.NewGraphSession()
if err != nil {
return nil, err
}
@ -292,7 +291,7 @@ func (p *paymentSession) RequestRoute(maxAmt, feeLimit lnwire.MilliSatoshi,
// don't have enough bandwidth to carry the payment. New
// bandwidth hints are queried for every new path finding
// attempt, because concurrent payments may change balances.
bandwidthHints, err := p.getBandwidthHints(routingGraph)
bandwidthHints, err := p.getBandwidthHints(graph)
if err != nil {
return nil, err
}
@ -304,15 +303,17 @@ func (p *paymentSession) RequestRoute(maxAmt, feeLimit lnwire.MilliSatoshi,
&graphParams{
additionalEdges: p.additionalEdges,
bandwidthHints: bandwidthHints,
graph: routingGraph,
graph: graph,
},
restrictions, &p.pathFindingConfig,
p.selfNode, p.selfNode, p.payment.Target,
maxAmt, p.payment.TimePref, finalHtlcExpiry,
)
// Close routing graph.
cleanup()
// Close routing graph session.
if err := closeGraph(); err != nil {
log.Errorf("could not close graph session: %v", err)
}
switch {
case err == errNoPathFound:

View File

@ -16,9 +16,10 @@ var _ PaymentSessionSource = (*SessionSource)(nil)
// SessionSource defines a source for the router to retrieve new payment
// sessions.
type SessionSource struct {
// Graph is the channel graph that will be used to gather metrics from
// and also to carry out path finding queries.
Graph *channeldb.ChannelGraph
// GraphSessionFactory can be used to gain access to a Graph session.
// If the backing DB allows it, this will mean that a read transaction
// is being held during the use of the session.
GraphSessionFactory GraphSessionFactory
// SourceNode is the graph's source node.
SourceNode *channeldb.LightningNode
@ -44,21 +45,6 @@ type SessionSource struct {
PathFindingConfig PathFindingConfig
}
// getRoutingGraph returns a routing graph and a clean-up function for
// pathfinding.
func (m *SessionSource) getRoutingGraph() (Graph, func(), error) {
routingTx, err := NewCachedGraph(m.Graph)
if err != nil {
return nil, nil, err
}
return routingTx, func() {
err := routingTx.Close()
if err != nil {
log.Errorf("Error closing db tx: %v", err)
}
}, nil
}
// NewPaymentSession creates a new payment session backed by the latest prune
// view from Mission Control. An optional set of routing hints can be provided
// in order to populate additional edges to explore when finding a path to the
@ -74,7 +60,7 @@ func (m *SessionSource) NewPaymentSession(p *LightningPayment) (
session, err := newPaymentSession(
p, m.SourceNode.PubKeyBytes, getBandwidthHints,
m.getRoutingGraph, m.MissionControl, m.PathFindingConfig,
m.GraphSessionFactory, m.MissionControl, m.PathFindingConfig,
)
if err != nil {
return nil, err

View File

@ -119,9 +119,7 @@ func TestUpdateAdditionalEdge(t *testing.T) {
func(Graph) (bandwidthHints, error) {
return &mockBandwidthHints{}, nil
},
func() (Graph, func(), error) {
return &sessionGraph{}, func() {}, nil
},
newMockGraphSessionFactory(&sessionGraph{}),
&MissionControl{},
PathFindingConfig{},
)
@ -199,9 +197,7 @@ func TestRequestRoute(t *testing.T) {
func(Graph) (bandwidthHints, error) {
return &mockBandwidthHints{}, nil
},
func() (Graph, func(), error) {
return &sessionGraph{}, func() {}, nil
},
newMockGraphSessionFactory(&sessionGraph{}),
&MissionControl{},
PathFindingConfig{},
)

View File

@ -319,6 +319,9 @@ type ChannelPolicy struct {
// the configuration MUST be non-nil for the ChannelRouter to carry out its
// duties.
type Config struct {
// RoutingGraph is a graph source that will be used for pathfinding.
RoutingGraph Graph
// Graph is the channel graph that the ChannelRouter will use to gather
// metrics from and also to carry out path finding queries.
// TODO(roasbeef): make into an interface
@ -453,10 +456,6 @@ type ChannelRouter struct {
// when doing any path finding.
selfNode *channeldb.LightningNode
// cachedGraph is an instance of Graph that caches the source
// node as well as the channel graph itself in memory.
cachedGraph Graph
// newBlocks is a channel in which new blocks connected to the end of
// the main chain are sent over, and blocks updated after a call to
// UpdateFilter.
@ -515,10 +514,7 @@ func New(cfg Config) (*ChannelRouter, error) {
}
r := &ChannelRouter{
cfg: &cfg,
cachedGraph: &CachedGraph{
graph: cfg.Graph,
},
cfg: &cfg,
networkUpdates: make(chan *routingMsg),
topologyClients: &lnutils.SyncMap[uint64, *topologyClient]{},
ntfnClientUpdates: make(chan *topologyClientUpdate),
@ -2118,7 +2114,7 @@ func (r *ChannelRouter) FindRoute(req *RouteRequest) (*route.Route, float64,
// We'll attempt to obtain a set of bandwidth hints that can help us
// eliminate certain routes early on in the path finding process.
bandwidthHints, err := newBandwidthManager(
r.cachedGraph, r.selfNode.PubKeyBytes, r.cfg.GetLink,
r.cfg.RoutingGraph, r.selfNode.PubKeyBytes, r.cfg.GetLink,
)
if err != nil {
return nil, 0, err
@ -2145,7 +2141,7 @@ func (r *ChannelRouter) FindRoute(req *RouteRequest) (*route.Route, float64,
&graphParams{
additionalEdges: req.RouteHints,
bandwidthHints: bandwidthHints,
graph: r.cachedGraph,
graph: r.cfg.RoutingGraph,
},
req.Restrictions, &r.cfg.PathFindingConfig,
r.selfNode.PubKeyBytes, req.Source, req.Target, req.Amount,
@ -3131,7 +3127,7 @@ func (r *ChannelRouter) BuildRoute(amt *lnwire.MilliSatoshi,
// We'll attempt to obtain a set of bandwidth hints that helps us select
// the best outgoing channel to use in case no outgoing channel is set.
bandwidthHints, err := newBandwidthManager(
r.cachedGraph, r.selfNode.PubKeyBytes, r.cfg.GetLink,
r.cfg.RoutingGraph, r.selfNode.PubKeyBytes, r.cfg.GetLink,
)
if err != nil {
return nil, err
@ -3147,7 +3143,7 @@ func (r *ChannelRouter) BuildRoute(amt *lnwire.MilliSatoshi,
sourceNode := r.selfNode.PubKeyBytes
unifiers, senderAmt, err := getRouteUnifiers(
sourceNode, hops, useMinAmt, runningAmt, outgoingChans,
r.cachedGraph, bandwidthHints,
r.cfg.RoutingGraph, bandwidthHints,
)
if err != nil {
return nil, err

View File

@ -77,6 +77,7 @@ func (c *testCtx) RestartRouter(t *testing.T) {
// With the chainView reset, we'll now re-create the router itself, and
// start it.
router, err := New(Config{
RoutingGraph: newMockGraphSessionChanDB(c.graph),
Graph: c.graph,
Chain: c.chain,
ChainView: c.chainView,
@ -140,7 +141,9 @@ func createTestCtxFromGraphInstanceAssumeValid(t *testing.T,
sourceNode, err := graphInstance.graph.SourceNode()
require.NoError(t, err)
sessionSource := &SessionSource{
Graph: graphInstance.graph,
GraphSessionFactory: newMockGraphSessionFactoryFromChanDB(
graphInstance.graph,
),
SourceNode: sourceNode,
GetLink: graphInstance.getLink,
PathFindingConfig: pathFindingConfig,
@ -154,6 +157,7 @@ func createTestCtxFromGraphInstanceAssumeValid(t *testing.T,
}
router, err := New(Config{
RoutingGraph: newMockGraphSessionChanDB(graphInstance.graph),
Graph: graphInstance.graph,
Chain: chain,
ChainView: chainView,
@ -1763,6 +1767,7 @@ func TestWakeUpOnStaleBranch(t *testing.T) {
// Create new router with same graph database.
router, err := New(Config{
RoutingGraph: newMockGraphSessionChanDB(ctx.graph),
Graph: ctx.graph,
Chain: ctx.chain,
ChainView: ctx.chainView,

View File

@ -41,6 +41,7 @@ import (
"github.com/lightningnetwork/lnd/chanbackup"
"github.com/lightningnetwork/lnd/chanfitness"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/graphsession"
"github.com/lightningnetwork/lnd/channeldb/models"
"github.com/lightningnetwork/lnd/channelnotifier"
"github.com/lightningnetwork/lnd/contractcourt"
@ -691,22 +692,9 @@ func (r *rpcServer) addDeps(s *server, macService *macaroons.Service,
FetchAmountPairCapacity: func(nodeFrom, nodeTo route.Vertex,
amount lnwire.MilliSatoshi) (btcutil.Amount, error) {
routingGraph, err := routing.NewCachedGraph(graph)
if err != nil {
return 0, err
}
defer func() {
closeErr := routingGraph.Close()
if closeErr != nil {
rpcsLog.Errorf("not able to close "+
"routing graph tx: %v",
closeErr)
}
}()
return routing.FetchAmountPairCapacity(
routingGraph, selfNode.PubKeyBytes, nodeFrom,
nodeTo, amount,
graphsession.NewRoutingGraph(graph),
selfNode.PubKeyBytes, nodeFrom, nodeTo, amount,
)
},
FetchChannelEndpoints: func(chanID uint64) (route.Vertex,

View File

@ -32,6 +32,7 @@ import (
"github.com/lightningnetwork/lnd/chanbackup"
"github.com/lightningnetwork/lnd/chanfitness"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/graphsession"
"github.com/lightningnetwork/lnd/channeldb/models"
"github.com/lightningnetwork/lnd/channelnotifier"
"github.com/lightningnetwork/lnd/clock"
@ -956,7 +957,9 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
return nil, fmt.Errorf("error getting source node: %w", err)
}
paymentSessionSource := &routing.SessionSource{
Graph: chanGraph,
GraphSessionFactory: graphsession.NewGraphSessionFactory(
chanGraph,
),
SourceNode: sourceNode,
MissionControl: s.missionControl,
GetLink: s.htlcSwitch.GetLinkByShortID,
@ -967,9 +970,11 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
s.controlTower = routing.NewControlTower(paymentControl)
strictPruning := (cfg.Bitcoin.Node == "neutrino" ||
cfg.Routing.StrictZombiePruning)
strictPruning := cfg.Bitcoin.Node == "neutrino" ||
cfg.Routing.StrictZombiePruning
s.chanRouter, err = routing.New(routing.Config{
RoutingGraph: graphsession.NewRoutingGraph(chanGraph),
Graph: chanGraph,
Chain: cc.ChainIO,
ChainView: cc.ChainView,