multi: further decouple graph

To further separate the channel graph from the channel state, we
refactor the AddrsForNode method to use the graphs's public methods
instead of directly accessing any buckets. This makes sure that we can
have the channel state cached with just its buckets while not using a
kvdb level cache for the graph.
At the same time we refactor the graph's test to also be less dependent
upon the channel state DB.
This commit is contained in:
Oliver Gugger 2021-09-21 19:18:18 +02:00
parent 11cf4216e4
commit d6fa912188
No known key found for this signature in database
GPG Key ID: 8E4256593F177720
10 changed files with 254 additions and 214 deletions

View File

@ -148,7 +148,7 @@ func (d *databaseChannelGraph) addRandChannel(node1, node2 *btcec.PublicKey,
return nil, err
}
dbNode, err := d.db.FetchLightningNode(nil, vertex)
dbNode, err := d.db.FetchLightningNode(vertex)
switch {
case err == channeldb.ErrGraphNodeNotFound:
fallthrough

View File

@ -23,6 +23,7 @@ import (
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route"
)
const (
@ -286,10 +287,14 @@ func CreateWithBackend(backend kvdb.Backend, modifiers ...OptionModifier) (*DB,
// Set the parent pointer (only used in tests).
chanDB.channelStateDB.parent = chanDB
chanDB.graph = newChannelGraph(
var err error
chanDB.graph, err = NewChannelGraph(
backend, opts.RejectCacheSize, opts.ChannelCacheSize,
opts.BatchCommitInterval,
)
if err != nil {
return nil, err
}
// Synchronize the version of database and apply migrations if needed.
if err := chanDB.syncVersions(dbVersions); err != nil {
@ -305,7 +310,7 @@ func (d *DB) Path() string {
return d.dbPath
}
var topLevelBuckets = [][]byte{
var dbTopLevelBuckets = [][]byte{
openChannelBucket,
closedChannelBucket,
forwardingLogBucket,
@ -316,10 +321,6 @@ var topLevelBuckets = [][]byte{
paymentsIndexBucket,
peersBucket,
nodeInfoBucket,
nodeBucket,
edgeBucket,
edgeIndexBucket,
graphMetaBucket,
metaBucket,
closeSummaryBucket,
outpointBucket,
@ -330,7 +331,7 @@ var topLevelBuckets = [][]byte{
// operation is fully atomic.
func (d *DB) Wipe() error {
err := kvdb.Update(d, func(tx kvdb.RwTx) error {
for _, tlb := range topLevelBuckets {
for _, tlb := range dbTopLevelBuckets {
err := tx.DeleteTopLevelBucket(tlb)
if err != nil && err != kvdb.ErrBucketNotFound {
return err
@ -358,42 +359,12 @@ func initChannelDB(db kvdb.Backend) error {
return nil
}
for _, tlb := range topLevelBuckets {
for _, tlb := range dbTopLevelBuckets {
if _, err := tx.CreateTopLevelBucket(tlb); err != nil {
return err
}
}
nodes := tx.ReadWriteBucket(nodeBucket)
_, err = nodes.CreateBucket(aliasIndexBucket)
if err != nil {
return err
}
_, err = nodes.CreateBucket(nodeUpdateIndexBucket)
if err != nil {
return err
}
edges := tx.ReadWriteBucket(edgeBucket)
if _, err := edges.CreateBucket(edgeIndexBucket); err != nil {
return err
}
if _, err := edges.CreateBucket(edgeUpdateIndexBucket); err != nil {
return err
}
if _, err := edges.CreateBucket(channelPointBucket); err != nil {
return err
}
if _, err := edges.CreateBucket(zombieBucket); err != nil {
return err
}
graphMeta := tx.ReadWriteBucket(graphMetaBucket)
_, err = graphMeta.CreateBucket(pruneLogBucket)
if err != nil {
return err
}
meta.DbVersionNumber = getLatestDBVersion(dbVersions)
return putMeta(meta, tx)
}, func() {})
@ -1157,30 +1128,21 @@ func (d *DB) AddrsForNode(nodePub *btcec.PublicKey) ([]net.Addr,
return nil, err
}
var graphNode LightningNode
err = kvdb.View(d, func(tx kvdb.RTx) error {
// We'll also query the graph for this peer to see if they have
// any addresses that we don't currently have stored within the
// link node database.
nodes := tx.ReadBucket(nodeBucket)
if nodes == nil {
return ErrGraphNotFound
}
compressedPubKey := nodePub.SerializeCompressed()
graphNode, err = fetchLightningNode(nodes, compressedPubKey)
if err != nil && err != ErrGraphNodeNotFound {
// If the node isn't found, then that's OK, as we still
// have the link node data.
return err
}
return nil
}, func() {
linkNode = nil
})
// We'll also query the graph for this peer to see if they have any
// addresses that we don't currently have stored within the link node
// database.
pubKey, err := route.NewVertexFromBytes(nodePub.SerializeCompressed())
if err != nil {
return nil, err
}
graphNode, err := d.graph.FetchLightningNode(pubKey)
if err != nil && err != ErrGraphNodeNotFound {
return nil, err
} else if err == ErrGraphNodeNotFound {
// If the node isn't found, then that's OK, as we still have the
// link node data. But any other error needs to be returned.
graphNode = &LightningNode{}
}
// Now that we have both sources of addrs for this node, we'll use a
// map to de-duplicate any addresses between the two sources, and

View File

@ -184,10 +184,14 @@ type ChannelGraph struct {
nodeScheduler batch.Scheduler
}
// newChannelGraph allocates a new ChannelGraph backed by a DB instance. The
// NewChannelGraph allocates a new ChannelGraph backed by a DB instance. The
// returned instance has its own unique reject cache and channel cache.
func newChannelGraph(db kvdb.Backend, rejectCacheSize, chanCacheSize int,
batchCommitInterval time.Duration) *ChannelGraph {
func NewChannelGraph(db kvdb.Backend, rejectCacheSize, chanCacheSize int,
batchCommitInterval time.Duration) (*ChannelGraph, error) {
if err := initChannelGraph(db); err != nil {
return nil, err
}
g := &ChannelGraph{
db: db,
@ -201,7 +205,85 @@ func newChannelGraph(db kvdb.Backend, rejectCacheSize, chanCacheSize int,
db, nil, batchCommitInterval,
)
return g
return g, nil
}
var graphTopLevelBuckets = [][]byte{
nodeBucket,
edgeBucket,
edgeIndexBucket,
graphMetaBucket,
}
// Wipe completely deletes all saved state within all used buckets within the
// database. The deletion is done in a single transaction, therefore this
// operation is fully atomic.
func (c *ChannelGraph) Wipe() error {
err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
for _, tlb := range graphTopLevelBuckets {
err := tx.DeleteTopLevelBucket(tlb)
if err != nil && err != kvdb.ErrBucketNotFound {
return err
}
}
return nil
}, func() {})
if err != nil {
return err
}
return initChannelGraph(c.db)
}
// createChannelDB creates and initializes a fresh version of channeldb. In
// the case that the target path has not yet been created or doesn't yet exist,
// then the path is created. Additionally, all required top-level buckets used
// within the database are created.
func initChannelGraph(db kvdb.Backend) error {
err := kvdb.Update(db, func(tx kvdb.RwTx) error {
for _, tlb := range graphTopLevelBuckets {
if _, err := tx.CreateTopLevelBucket(tlb); err != nil {
return err
}
}
nodes := tx.ReadWriteBucket(nodeBucket)
_, err := nodes.CreateBucketIfNotExists(aliasIndexBucket)
if err != nil {
return err
}
_, err = nodes.CreateBucketIfNotExists(nodeUpdateIndexBucket)
if err != nil {
return err
}
edges := tx.ReadWriteBucket(edgeBucket)
_, err = edges.CreateBucketIfNotExists(edgeIndexBucket)
if err != nil {
return err
}
_, err = edges.CreateBucketIfNotExists(edgeUpdateIndexBucket)
if err != nil {
return err
}
_, err = edges.CreateBucketIfNotExists(channelPointBucket)
if err != nil {
return err
}
_, err = edges.CreateBucketIfNotExists(zombieBucket)
if err != nil {
return err
}
graphMeta := tx.ReadWriteBucket(graphMetaBucket)
_, err = graphMeta.CreateBucketIfNotExists(pruneLogBucket)
return err
}, func() {})
if err != nil {
return fmt.Errorf("unable to create new channel graph: %v", err)
}
return nil
}
// Database returns a pointer to the underlying database.
@ -218,7 +300,9 @@ func (c *ChannelGraph) Database() kvdb.Backend {
// NOTE: If an edge can't be found, or wasn't advertised, then a nil pointer
// for that particular channel edge routing policy will be passed into the
// callback.
func (c *ChannelGraph) ForEachChannel(cb func(*ChannelEdgeInfo, *ChannelEdgePolicy, *ChannelEdgePolicy) error) error {
func (c *ChannelGraph) ForEachChannel(cb func(*ChannelEdgeInfo,
*ChannelEdgePolicy, *ChannelEdgePolicy) error) error {
// TODO(roasbeef): ptr map to reduce # of allocs? no duplicates
return kvdb.View(c.db, func(tx kvdb.RTx) error {
@ -2356,17 +2440,11 @@ func (l *LightningNode) isPublic(tx kvdb.RTx, sourcePubKey []byte) (bool, error)
// FetchLightningNode attempts to look up a target node by its identity public
// key. If the node isn't found in the database, then ErrGraphNodeNotFound is
// returned.
//
// If the caller wishes to re-use an existing boltdb transaction, then it
// should be passed as the first argument. Otherwise the first argument should
// be nil and a fresh transaction will be created to execute the graph
// traversal.
func (c *ChannelGraph) FetchLightningNode(tx kvdb.RTx, nodePub route.Vertex) (
func (c *ChannelGraph) FetchLightningNode(nodePub route.Vertex) (
*LightningNode, error) {
var node *LightningNode
fetchNode := func(tx kvdb.RTx) error {
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
// First grab the nodes bucket which stores the mapping from
// pubKey to node information.
nodes := tx.ReadBucket(nodeBucket)
@ -2393,14 +2471,9 @@ func (c *ChannelGraph) FetchLightningNode(tx kvdb.RTx, nodePub route.Vertex) (
node = &n
return nil
}
var err error
if tx == nil {
err = kvdb.View(c.db, fetchNode, func() {})
} else {
err = fetchNode(tx)
}
}, func() {
node = nil
})
if err != nil {
return nil, err
}

View File

@ -6,10 +6,12 @@ import (
"errors"
"fmt"
"image/color"
"io/ioutil"
"math"
"math/big"
prand "math/rand"
"net"
"os"
"reflect"
"runtime"
"sync"
@ -45,6 +47,48 @@ var (
testPub = route.Vertex{2, 202, 4}
)
// MakeTestGraph creates a new instance of the ChannelGraph for testing
// purposes. A callback which cleans up the created temporary directories is
// also returned and intended to be executed after the test completes.
func MakeTestGraph(modifiers ...OptionModifier) (*ChannelGraph, func(), error) {
// First, create a temporary directory to be used for the duration of
// this test.
tempDirName, err := ioutil.TempDir("", "channelgraph")
if err != nil {
return nil, nil, err
}
opts := DefaultOptions()
for _, modifier := range modifiers {
modifier(&opts)
}
// Next, create channelgraph for the first time.
backend, backendCleanup, err := kvdb.GetTestBackend(tempDirName, "cgr")
if err != nil {
backendCleanup()
return nil, nil, err
}
graph, err := NewChannelGraph(
backend, opts.RejectCacheSize, opts.ChannelCacheSize,
opts.BatchCommitInterval,
)
if err != nil {
backendCleanup()
_ = os.RemoveAll(tempDirName)
return nil, nil, err
}
cleanUp := func() {
_ = backend.Close()
backendCleanup()
_ = os.RemoveAll(tempDirName)
}
return graph, cleanUp, nil
}
func createLightningNode(db kvdb.Backend, priv *btcec.PrivateKey) (*LightningNode, error) {
updateTime := prand.Int63()
@ -76,14 +120,12 @@ func createTestVertex(db kvdb.Backend) (*LightningNode, error) {
func TestNodeInsertionAndDeletion(t *testing.T) {
t.Parallel()
db, cleanUp, err := MakeTestDB()
graph, cleanUp, err := MakeTestGraph()
defer cleanUp()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
graph := db.ChannelGraph()
// We'd like to test basic insertion/deletion for vertexes from the
// graph, so we'll create a test vertex to start with.
node := &LightningNode{
@ -107,7 +149,7 @@ func TestNodeInsertionAndDeletion(t *testing.T) {
// Next, fetch the node from the database to ensure everything was
// serialized properly.
dbNode, err := graph.FetchLightningNode(nil, testPub)
dbNode, err := graph.FetchLightningNode(testPub)
if err != nil {
t.Fatalf("unable to locate node: %v", err)
}
@ -131,7 +173,7 @@ func TestNodeInsertionAndDeletion(t *testing.T) {
// Finally, attempt to fetch the node again. This should fail as the
// node should have been deleted from the database.
_, err = graph.FetchLightningNode(nil, testPub)
_, err = graph.FetchLightningNode(testPub)
if err != ErrGraphNodeNotFound {
t.Fatalf("fetch after delete should fail!")
}
@ -142,14 +184,12 @@ func TestNodeInsertionAndDeletion(t *testing.T) {
func TestPartialNode(t *testing.T) {
t.Parallel()
db, cleanUp, err := MakeTestDB()
graph, cleanUp, err := MakeTestGraph()
defer cleanUp()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
graph := db.ChannelGraph()
// We want to be able to insert nodes into the graph that only has the
// PubKey set.
node := &LightningNode{
@ -163,7 +203,7 @@ func TestPartialNode(t *testing.T) {
// Next, fetch the node from the database to ensure everything was
// serialized properly.
dbNode, err := graph.FetchLightningNode(nil, testPub)
dbNode, err := graph.FetchLightningNode(testPub)
if err != nil {
t.Fatalf("unable to locate node: %v", err)
}
@ -195,7 +235,7 @@ func TestPartialNode(t *testing.T) {
// Finally, attempt to fetch the node again. This should fail as the
// node should have been deleted from the database.
_, err = graph.FetchLightningNode(nil, testPub)
_, err = graph.FetchLightningNode(testPub)
if err != ErrGraphNodeNotFound {
t.Fatalf("fetch after delete should fail!")
}
@ -204,14 +244,12 @@ func TestPartialNode(t *testing.T) {
func TestAliasLookup(t *testing.T) {
t.Parallel()
db, cleanUp, err := MakeTestDB()
graph, cleanUp, err := MakeTestGraph()
defer cleanUp()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
graph := db.ChannelGraph()
// We'd like to test the alias index within the database, so first
// create a new test node.
testNode, err := createTestVertex(graph.db)
@ -258,13 +296,11 @@ func TestAliasLookup(t *testing.T) {
func TestSourceNode(t *testing.T) {
t.Parallel()
db, cleanUp, err := MakeTestDB()
defer cleanUp()
graph, cleanUp, err := MakeTestGraph()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
graph := db.ChannelGraph()
defer cleanUp()
// We'd like to test the setting/getting of the source node, so we
// first create a fake node to use within the test.
@ -299,14 +335,12 @@ func TestSourceNode(t *testing.T) {
func TestEdgeInsertionDeletion(t *testing.T) {
t.Parallel()
db, cleanUp, err := MakeTestDB()
graph, cleanUp, err := MakeTestGraph()
defer cleanUp()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
graph := db.ChannelGraph()
// We'd like to test the insertion/deletion of edges, so we create two
// vertexes to connect.
node1, err := createTestVertex(graph.db)
@ -434,13 +468,12 @@ func createEdge(height, txIndex uint32, txPosition uint16, outPointIndex uint32,
func TestDisconnectBlockAtHeight(t *testing.T) {
t.Parallel()
db, cleanUp, err := MakeTestDB()
graph, cleanUp, err := MakeTestGraph()
defer cleanUp()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
graph := db.ChannelGraph()
sourceNode, err := createTestVertex(graph.db)
if err != nil {
t.Fatalf("unable to create source node: %v", err)
@ -721,14 +754,12 @@ func createChannelEdge(db kvdb.Backend, node1, node2 *LightningNode) (*ChannelEd
func TestEdgeInfoUpdates(t *testing.T) {
t.Parallel()
db, cleanUp, err := MakeTestDB()
graph, cleanUp, err := MakeTestGraph()
defer cleanUp()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
graph := db.ChannelGraph()
// We'd like to test the update of edges inserted into the database, so
// we create two vertexes to connect.
node1, err := createTestVertex(graph.db)
@ -851,14 +882,12 @@ func newEdgePolicy(chanID uint64, db kvdb.Backend,
func TestGraphTraversal(t *testing.T) {
t.Parallel()
db, cleanUp, err := MakeTestDB()
graph, cleanUp, err := MakeTestGraph()
defer cleanUp()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
graph := db.ChannelGraph()
// We'd like to test some of the graph traversal capabilities within
// the DB, so we'll create a series of fake nodes to insert into the
// graph.
@ -1112,13 +1141,12 @@ func assertChanViewEqualChanPoints(t *testing.T, a []EdgePoint, b []*wire.OutPoi
func TestGraphPruning(t *testing.T) {
t.Parallel()
db, cleanUp, err := MakeTestDB()
graph, cleanUp, err := MakeTestGraph()
defer cleanUp()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
graph := db.ChannelGraph()
sourceNode, err := createTestVertex(graph.db)
if err != nil {
t.Fatalf("unable to create source node: %v", err)
@ -1320,14 +1348,12 @@ func TestGraphPruning(t *testing.T) {
func TestHighestChanID(t *testing.T) {
t.Parallel()
db, cleanUp, err := MakeTestDB()
graph, cleanUp, err := MakeTestGraph()
defer cleanUp()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
graph := db.ChannelGraph()
// If we don't yet have any channels in the database, then we should
// get a channel ID of zero if we ask for the highest channel ID.
bestID, err := graph.HighestChanID()
@ -1397,14 +1423,12 @@ func TestHighestChanID(t *testing.T) {
func TestChanUpdatesInHorizon(t *testing.T) {
t.Parallel()
db, cleanUp, err := MakeTestDB()
graph, cleanUp, err := MakeTestGraph()
defer cleanUp()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
graph := db.ChannelGraph()
// If we issue an arbitrary query before any channel updates are
// inserted in the database, we should get zero results.
chanUpdates, err := graph.ChanUpdatesInHorizon(
@ -1567,14 +1591,12 @@ func TestChanUpdatesInHorizon(t *testing.T) {
func TestNodeUpdatesInHorizon(t *testing.T) {
t.Parallel()
db, cleanUp, err := MakeTestDB()
graph, cleanUp, err := MakeTestGraph()
defer cleanUp()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
graph := db.ChannelGraph()
startTime := time.Unix(1234, 0)
endTime := startTime
@ -1690,14 +1712,12 @@ func TestNodeUpdatesInHorizon(t *testing.T) {
func TestFilterKnownChanIDs(t *testing.T) {
t.Parallel()
db, cleanUp, err := MakeTestDB()
graph, cleanUp, err := MakeTestGraph()
defer cleanUp()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
graph := db.ChannelGraph()
// If we try to filter out a set of channel ID's before we even know of
// any channels, then we should get the entire set back.
preChanIDs := []uint64{1, 2, 3, 4}
@ -1807,14 +1827,12 @@ func TestFilterKnownChanIDs(t *testing.T) {
func TestFilterChannelRange(t *testing.T) {
t.Parallel()
db, cleanUp, err := MakeTestDB()
graph, cleanUp, err := MakeTestGraph()
defer cleanUp()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
graph := db.ChannelGraph()
// We'll first populate our graph with two nodes. All channels created
// below will be made between these two nodes.
node1, err := createTestVertex(graph.db)
@ -1941,14 +1959,12 @@ func TestFilterChannelRange(t *testing.T) {
func TestFetchChanInfos(t *testing.T) {
t.Parallel()
db, cleanUp, err := MakeTestDB()
graph, cleanUp, err := MakeTestGraph()
defer cleanUp()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
graph := db.ChannelGraph()
// We'll first populate our graph with two nodes. All channels created
// below will be made between these two nodes.
node1, err := createTestVertex(graph.db)
@ -2063,14 +2079,12 @@ func TestFetchChanInfos(t *testing.T) {
func TestIncompleteChannelPolicies(t *testing.T) {
t.Parallel()
db, cleanUp, err := MakeTestDB()
graph, cleanUp, err := MakeTestGraph()
defer cleanUp()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
graph := db.ChannelGraph()
// Create two nodes.
node1, err := createTestVertex(graph.db)
if err != nil {
@ -2171,13 +2185,12 @@ func TestIncompleteChannelPolicies(t *testing.T) {
func TestChannelEdgePruningUpdateIndexDeletion(t *testing.T) {
t.Parallel()
db, cleanUp, err := MakeTestDB()
graph, cleanUp, err := MakeTestGraph()
defer cleanUp()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
graph := db.ChannelGraph()
sourceNode, err := createTestVertex(graph.db)
if err != nil {
t.Fatalf("unable to create source node: %v", err)
@ -2326,7 +2339,7 @@ func TestChannelEdgePruningUpdateIndexDeletion(t *testing.T) {
func TestPruneGraphNodes(t *testing.T) {
t.Parallel()
db, cleanUp, err := MakeTestDB()
graph, cleanUp, err := MakeTestGraph()
defer cleanUp()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
@ -2334,7 +2347,6 @@ func TestPruneGraphNodes(t *testing.T) {
// We'll start off by inserting our source node, to ensure that it's
// the only node left after we prune the graph.
graph := db.ChannelGraph()
sourceNode, err := createTestVertex(graph.db)
if err != nil {
t.Fatalf("unable to create source node: %v", err)
@ -2398,7 +2410,7 @@ func TestPruneGraphNodes(t *testing.T) {
// Finally, we'll ensure that node3, the only fully unconnected node as
// properly deleted from the graph and not another node in its place.
_, err = graph.FetchLightningNode(nil, node3.PubKeyBytes)
_, err = graph.FetchLightningNode(node3.PubKeyBytes)
if err == nil {
t.Fatalf("node 3 should have been deleted!")
}
@ -2410,14 +2422,12 @@ func TestPruneGraphNodes(t *testing.T) {
func TestAddChannelEdgeShellNodes(t *testing.T) {
t.Parallel()
db, cleanUp, err := MakeTestDB()
graph, cleanUp, err := MakeTestGraph()
defer cleanUp()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
graph := db.ChannelGraph()
// To start, we'll create two nodes, and only add one of them to the
// channel graph.
node1, err := createTestVertex(graph.db)
@ -2441,7 +2451,7 @@ func TestAddChannelEdgeShellNodes(t *testing.T) {
// Ensure that node1 was inserted as a full node, while node2 only has
// a shell node present.
node1, err = graph.FetchLightningNode(nil, node1.PubKeyBytes)
node1, err = graph.FetchLightningNode(node1.PubKeyBytes)
if err != nil {
t.Fatalf("unable to fetch node1: %v", err)
}
@ -2449,7 +2459,7 @@ func TestAddChannelEdgeShellNodes(t *testing.T) {
t.Fatalf("have shell announcement for node1, shouldn't")
}
node2, err = graph.FetchLightningNode(nil, node2.PubKeyBytes)
node2, err = graph.FetchLightningNode(node2.PubKeyBytes)
if err != nil {
t.Fatalf("unable to fetch node2: %v", err)
}
@ -2464,14 +2474,12 @@ func TestAddChannelEdgeShellNodes(t *testing.T) {
func TestNodePruningUpdateIndexDeletion(t *testing.T) {
t.Parallel()
db, cleanUp, err := MakeTestDB()
graph, cleanUp, err := MakeTestGraph()
defer cleanUp()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
graph := db.ChannelGraph()
// We'll first populate our graph with a single node that will be
// removed shortly.
node1, err := createTestVertex(graph.db)
@ -2534,44 +2542,41 @@ func TestNodeIsPublic(t *testing.T) {
// We'll need to create a separate database and channel graph for each
// participant to replicate real-world scenarios (private edges being in
// some graphs but not others, etc.).
aliceDB, cleanUp, err := MakeTestDB()
aliceGraph, cleanUp, err := MakeTestGraph()
defer cleanUp()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
aliceNode, err := createTestVertex(aliceDB)
aliceNode, err := createTestVertex(aliceGraph.db)
if err != nil {
t.Fatalf("unable to create test node: %v", err)
}
aliceGraph := aliceDB.ChannelGraph()
if err := aliceGraph.SetSourceNode(aliceNode); err != nil {
t.Fatalf("unable to set source node: %v", err)
}
bobDB, cleanUp, err := MakeTestDB()
bobGraph, cleanUp, err := MakeTestGraph()
defer cleanUp()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
bobNode, err := createTestVertex(bobDB)
bobNode, err := createTestVertex(bobGraph.db)
if err != nil {
t.Fatalf("unable to create test node: %v", err)
}
bobGraph := bobDB.ChannelGraph()
if err := bobGraph.SetSourceNode(bobNode); err != nil {
t.Fatalf("unable to set source node: %v", err)
}
carolDB, cleanUp, err := MakeTestDB()
carolGraph, cleanUp, err := MakeTestGraph()
defer cleanUp()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
carolNode, err := createTestVertex(carolDB)
carolNode, err := createTestVertex(carolGraph.db)
if err != nil {
t.Fatalf("unable to create test node: %v", err)
}
carolGraph := carolDB.ChannelGraph()
if err := carolGraph.SetSourceNode(carolNode); err != nil {
t.Fatalf("unable to set source node: %v", err)
}
@ -2683,14 +2688,12 @@ func TestNodeIsPublic(t *testing.T) {
func TestDisabledChannelIDs(t *testing.T) {
t.Parallel()
db, cleanUp, err := MakeTestDB()
graph, cleanUp, err := MakeTestGraph()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
defer cleanUp()
graph := db.ChannelGraph()
// Create first node and add it to the graph.
node1, err := createTestVertex(graph.db)
if err != nil {
@ -2781,14 +2784,12 @@ func TestDisabledChannelIDs(t *testing.T) {
func TestEdgePolicyMissingMaxHtcl(t *testing.T) {
t.Parallel()
db, cleanUp, err := MakeTestDB()
graph, cleanUp, err := MakeTestGraph()
defer cleanUp()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
graph := db.ChannelGraph()
// We'd like to test the update of edges inserted into the database, so
// we create two vertexes to connect.
node1, err := createTestVertex(graph.db)
@ -2961,12 +2962,11 @@ func TestGraphZombieIndex(t *testing.T) {
t.Parallel()
// We'll start by creating our test graph along with a test edge.
db, cleanUp, err := MakeTestDB()
graph, cleanUp, err := MakeTestGraph()
defer cleanUp()
if err != nil {
t.Fatalf("unable to create test database: %v", err)
}
graph := db.ChannelGraph()
node1, err := createTestVertex(graph.db)
if err != nil {
@ -3136,7 +3136,7 @@ func compareEdgePolicies(a, b *ChannelEdgePolicy) error {
return nil
}
// TestLightningNodeSigVerifcation checks that we can use the LightningNode's
// TestLightningNodeSigVerification checks that we can use the LightningNode's
// pubkey to verify signatures.
func TestLightningNodeSigVerification(t *testing.T) {
t.Parallel()
@ -3164,13 +3164,13 @@ func TestLightningNodeSigVerification(t *testing.T) {
}
// Create a LightningNode from the same private key.
db, cleanUp, err := MakeTestDB()
graph, cleanUp, err := MakeTestGraph()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
defer cleanUp()
node, err := createLightningNode(db, priv)
node, err := createLightningNode(graph.db, priv)
if err != nil {
t.Fatalf("unable to create node: %v", err)
}
@ -3214,11 +3214,10 @@ func TestComputeFee(t *testing.T) {
func TestBatchedAddChannelEdge(t *testing.T) {
t.Parallel()
db, cleanUp, err := MakeTestDB()
graph, cleanUp, err := MakeTestGraph()
require.Nil(t, err)
defer cleanUp()
graph := db.ChannelGraph()
sourceNode, err := createTestVertex(graph.db)
require.Nil(t, err)
err = graph.SetSourceNode(sourceNode)
@ -3297,12 +3296,10 @@ func TestBatchedAddChannelEdge(t *testing.T) {
func TestBatchedUpdateEdgePolicy(t *testing.T) {
t.Parallel()
db, cleanUp, err := MakeTestDB()
graph, cleanUp, err := MakeTestGraph()
require.Nil(t, err)
defer cleanUp()
graph := db.ChannelGraph()
// We'd like to test the update of edges inserted into the database, so
// we create two vertexes to connect.
node1, err := createTestVertex(graph.db)

View File

@ -85,7 +85,7 @@ func (g *dbRoutingTx) sourceNode() route.Vertex {
func (g *dbRoutingTx) fetchNodeFeatures(nodePub route.Vertex) (
*lnwire.FeatureVector, error) {
targetNode, err := g.graph.FetchLightningNode(g.tx, nodePub)
targetNode, err := g.graph.FetchLightningNode(nodePub)
switch err {
// If the node exists and has features, return them directly.

View File

@ -23,6 +23,7 @@ import (
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/record"
"github.com/lightningnetwork/lnd/routing/route"
@ -148,26 +149,36 @@ type testChan struct {
// makeTestGraph creates a new instance of a channeldb.ChannelGraph for testing
// purposes. A callback which cleans up the created temporary directories is
// also returned and intended to be executed after the test completes.
func makeTestGraph() (*channeldb.ChannelGraph, func(), error) {
func makeTestGraph() (*channeldb.ChannelGraph, kvdb.Backend, func(), error) {
// First, create a temporary directory to be used for the duration of
// this test.
tempDirName, err := ioutil.TempDir("", "channeldb")
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
// Next, create channeldb for the first time.
cdb, err := channeldb.Open(tempDirName)
// Next, create channelgraph for the first time.
backend, backendCleanup, err := kvdb.GetTestBackend(tempDirName, "cgr")
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
cleanUp := func() {
cdb.Close()
os.RemoveAll(tempDirName)
backendCleanup()
_ = os.RemoveAll(tempDirName)
}
return cdb.ChannelGraph(), cleanUp, nil
opts := channeldb.DefaultOptions()
graph, err := channeldb.NewChannelGraph(
backend, opts.RejectCacheSize, opts.ChannelCacheSize,
opts.BatchCommitInterval,
)
if err != nil {
cleanUp()
return nil, nil, nil, err
}
return graph, backend, cleanUp, nil
}
// parseTestGraph returns a fully populated ChannelGraph given a path to a JSON
@ -197,7 +208,7 @@ func parseTestGraph(path string) (*testGraphInstance, error) {
testAddrs = append(testAddrs, testAddr)
// Next, create a temporary graph database for usage within the test.
graph, cleanUp, err := makeTestGraph()
graph, graphBackend, cleanUp, err := makeTestGraph()
if err != nil {
return nil, err
}
@ -381,11 +392,12 @@ func parseTestGraph(path string) (*testGraphInstance, error) {
}
return &testGraphInstance{
graph: graph,
cleanUp: cleanUp,
aliasMap: aliasMap,
privKeyMap: privKeyMap,
channelIDs: channelIDs,
graph: graph,
graphBackend: graphBackend,
cleanUp: cleanUp,
aliasMap: aliasMap,
privKeyMap: privKeyMap,
channelIDs: channelIDs,
}, nil
}
@ -447,8 +459,9 @@ type testChannel struct {
}
type testGraphInstance struct {
graph *channeldb.ChannelGraph
cleanUp func()
graph *channeldb.ChannelGraph
graphBackend kvdb.Backend
cleanUp func()
// aliasMap is a map from a node's alias to its public key. This type is
// provided in order to allow easily look up from the human memorable alias
@ -482,7 +495,7 @@ func createTestGraphFromChannels(testChannels []*testChannel, source string) (
testAddrs = append(testAddrs, testAddr)
// Next, create a temporary graph database for usage within the test.
graph, cleanUp, err := makeTestGraph()
graph, graphBackend, cleanUp, err := makeTestGraph()
if err != nil {
return nil, err
}
@ -671,10 +684,11 @@ func createTestGraphFromChannels(testChannels []*testChannel, source string) (
}
return &testGraphInstance{
graph: graph,
cleanUp: cleanUp,
aliasMap: aliasMap,
privKeyMap: privKeyMap,
graph: graph,
graphBackend: graphBackend,
cleanUp: cleanUp,
aliasMap: aliasMap,
privKeyMap: privKeyMap,
}, nil
}
@ -2120,7 +2134,7 @@ func TestPathFindSpecExample(t *testing.T) {
// Carol, so we set "B" as the source node so path finding starts from
// Bob.
bob := ctx.aliases["B"]
bobNode, err := ctx.graph.FetchLightningNode(nil, bob)
bobNode, err := ctx.graph.FetchLightningNode(bob)
if err != nil {
t.Fatalf("unable to find bob: %v", err)
}
@ -2170,7 +2184,7 @@ func TestPathFindSpecExample(t *testing.T) {
// Next, we'll set A as the source node so we can assert that we create
// the proper route for any queries starting with Alice.
alice := ctx.aliases["A"]
aliceNode, err := ctx.graph.FetchLightningNode(nil, alice)
aliceNode, err := ctx.graph.FetchLightningNode(alice)
if err != nil {
t.Fatalf("unable to find alice: %v", err)
}

View File

@ -2505,8 +2505,10 @@ func (r *ChannelRouter) GetChannelByID(chanID lnwire.ShortChannelID) (
// within the graph.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) FetchLightningNode(node route.Vertex) (*channeldb.LightningNode, error) {
return r.cfg.Graph.FetchLightningNode(nil, node)
func (r *ChannelRouter) FetchLightningNode(
node route.Vertex) (*channeldb.LightningNode, error) {
return r.cfg.Graph.FetchLightningNode(node)
}
// ForEachNode is used to iterate over every node in router topology.

View File

@ -125,8 +125,7 @@ func createTestCtxFromGraphInstanceAssumeValid(t *testing.T,
}
mc, err := NewMissionControl(
graphInstance.graph.Database(), route.Vertex{},
mcConfig,
graphInstance.graphBackend, route.Vertex{}, mcConfig,
)
require.NoError(t, err, "failed to create missioncontrol")
@ -188,7 +187,6 @@ func createTestCtxFromGraphInstanceAssumeValid(t *testing.T,
cleanUp := func() {
ctx.router.Stop()
graphInstance.cleanUp()
}
return ctx, cleanUp
@ -197,17 +195,10 @@ func createTestCtxFromGraphInstanceAssumeValid(t *testing.T,
func createTestCtxSingleNode(t *testing.T,
startingHeight uint32) (*testCtx, func()) {
var (
graph *channeldb.ChannelGraph
sourceNode *channeldb.LightningNode
cleanup func()
err error
)
graph, cleanup, err = makeTestGraph()
graph, graphBackend, cleanup, err := makeTestGraph()
require.NoError(t, err, "failed to make test graph")
sourceNode, err = createTestNode()
sourceNode, err := createTestNode()
require.NoError(t, err, "failed to create test node")
require.NoError(t,
@ -215,8 +206,9 @@ func createTestCtxSingleNode(t *testing.T,
)
graphInstance := &testGraphInstance{
graph: graph,
cleanUp: cleanup,
graph: graph,
graphBackend: graphBackend,
cleanUp: cleanup,
}
return createTestCtxFromGraphInstance(
@ -1577,7 +1569,7 @@ func TestAddEdgeUnknownVertexes(t *testing.T) {
t.Fatalf("unable to find any routes: %v", err)
}
copy1, err := ctx.graph.FetchLightningNode(nil, pub1)
copy1, err := ctx.graph.FetchLightningNode(pub1)
if err != nil {
t.Fatalf("unable to fetch node: %v", err)
}
@ -1586,7 +1578,7 @@ func TestAddEdgeUnknownVertexes(t *testing.T) {
t.Fatalf("fetched node not equal to original")
}
copy2, err := ctx.graph.FetchLightningNode(nil, pub2)
copy2, err := ctx.graph.FetchLightningNode(pub2)
if err != nil {
t.Fatalf("unable to fetch node: %v", err)
}

View File

@ -5539,7 +5539,7 @@ func (r *rpcServer) GetNodeInfo(ctx context.Context,
// With the public key decoded, attempt to fetch the node corresponding
// to this public key. If the node cannot be found, then an error will
// be returned.
node, err := graph.FetchLightningNode(nil, pubKey)
node, err := graph.FetchLightningNode(pubKey)
switch {
case err == channeldb.ErrGraphNodeNotFound:
return nil, status.Error(codes.NotFound, err.Error())

View File

@ -3921,7 +3921,7 @@ func (s *server) fetchNodeAdvertisedAddr(pub *btcec.PublicKey) (net.Addr, error)
return nil, err
}
node, err := s.graphDB.FetchLightningNode(nil, vertex)
node, err := s.graphDB.FetchLightningNode(vertex)
if err != nil {
return nil, err
}