channeldb+graphdb: init graph DB outside of channel db

We also now use the graph DB's own optional functions. An instance of
the graph is currently still passed to the channeldb's
`CreateWithBackend` function. This will be removed in a later commit
once the two have been completely disjoint.
This commit is contained in:
Elle Mouton 2024-10-22 13:54:37 +02:00
parent 74a4b1922b
commit 1859993734
No known key found for this signature in database
GPG key ID: D7D916376026F177
11 changed files with 98 additions and 203 deletions

View file

@ -351,24 +351,24 @@ type DB struct {
// to updates will take place as necessary.
// TODO(bhandras): deprecate this function.
func Open(dbPath string, modifiers ...OptionModifier) (*DB, error) {
opts := DefaultOptions()
for _, modifier := range modifiers {
modifier(&opts)
}
backend, err := kvdb.GetBoltBackend(&kvdb.BoltBackendConfig{
DBPath: dbPath,
DBFileName: dbName,
NoFreelistSync: opts.NoFreelistSync,
AutoCompact: opts.AutoCompact,
AutoCompactMinAge: opts.AutoCompactMinAge,
DBTimeout: opts.DBTimeout,
NoFreelistSync: true,
AutoCompact: false,
AutoCompactMinAge: kvdb.DefaultBoltAutoCompactMinAge,
DBTimeout: kvdb.DefaultDBTimeout,
})
if err != nil {
return nil, err
}
db, err := CreateWithBackend(backend, modifiers...)
graphDB, err := graphdb.NewChannelGraph(backend)
if err != nil {
return nil, err
}
db, err := CreateWithBackend(backend, graphDB, modifiers...)
if err == nil {
db.dbPath = dbPath
}
@ -377,7 +377,7 @@ func Open(dbPath string, modifiers ...OptionModifier) (*DB, error) {
// CreateWithBackend creates channeldb instance using the passed kvdb.Backend.
// Any necessary schemas migrations due to updates will take place as necessary.
func CreateWithBackend(backend kvdb.Backend,
func CreateWithBackend(backend kvdb.Backend, graph *graphdb.ChannelGraph,
modifiers ...OptionModifier) (*DB, error) {
opts := DefaultOptions()
@ -404,21 +404,12 @@ func CreateWithBackend(backend kvdb.Backend,
keepFailedPaymentAttempts: opts.keepFailedPaymentAttempts,
storeFinalHtlcResolutions: opts.storeFinalHtlcResolutions,
noRevLogAmtData: opts.NoRevLogAmtData,
graph: graph,
}
// Set the parent pointer (only used in tests).
chanDB.channelStateDB.parent = chanDB
var err error
chanDB.graph, err = graphdb.NewChannelGraph(
backend, opts.RejectCacheSize, opts.ChannelCacheSize,
opts.BatchCommitInterval, opts.PreAllocCacheNumNodes,
opts.UseGraphCache, opts.NoMigration,
)
if err != nil {
return nil, err
}
// Synchronize the version of database and apply migrations if needed.
if !opts.NoMigration {
if err := chanDB.syncVersions(dbVersions); err != nil {
@ -1866,7 +1857,13 @@ func MakeTestDB(t *testing.T, modifiers ...OptionModifier) (*DB, error) {
return nil, err
}
cdb, err := CreateWithBackend(backend, modifiers...)
graphDB, err := graphdb.NewChannelGraph(backend)
if err != nil {
backendCleanup()
return nil, err
}
cdb, err := CreateWithBackend(backend, graphDB, modifiers...)
if err != nil {
backendCleanup()
return nil, err

View file

@ -51,7 +51,10 @@ func TestOpenWithCreate(t *testing.T) {
require.NoError(t, err, "unable to get test db backend")
t.Cleanup(cleanup)
cdb, err := CreateWithBackend(backend)
graphDB, err := graphdb.NewChannelGraph(backend)
require.NoError(t, err)
cdb, err := CreateWithBackend(backend, graphDB)
require.NoError(t, err, "unable to create channeldb")
if err := cdb.Close(); err != nil {
t.Fatalf("unable to close channeldb: %v", err)
@ -87,7 +90,10 @@ func TestWipe(t *testing.T) {
require.NoError(t, err, "unable to get test db backend")
t.Cleanup(cleanup)
fullDB, err := CreateWithBackend(backend)
graphDB, err := graphdb.NewChannelGraph(backend)
require.NoError(t, err)
fullDB, err := CreateWithBackend(backend, graphDB)
require.NoError(t, err, "unable to create channeldb")
defer fullDB.Close()

View file

@ -6,6 +6,7 @@ import (
"github.com/btcsuite/btcwallet/walletdb"
"github.com/go-errors/errors"
graphdb "github.com/lightningnetwork/lnd/graph/db"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/stretchr/testify/require"
)
@ -422,7 +423,10 @@ func TestMigrationReversion(t *testing.T) {
backend, cleanup, err := kvdb.GetTestBackend(tempDirName, "cdb")
require.NoError(t, err, "unable to get test db backend")
cdb, err := CreateWithBackend(backend)
graphDB, err := graphdb.NewChannelGraph(backend)
require.NoError(t, err)
cdb, err := CreateWithBackend(backend, graphDB)
if err != nil {
cleanup()
t.Fatalf("unable to open channeldb: %v", err)
@ -448,7 +452,10 @@ func TestMigrationReversion(t *testing.T) {
require.NoError(t, err, "unable to get test db backend")
t.Cleanup(cleanup)
_, err = CreateWithBackend(backend)
graphDB, err = graphdb.NewChannelGraph(backend)
require.NoError(t, err)
_, err = CreateWithBackend(backend, graphDB)
if err != ErrDBReversion {
t.Fatalf("unexpected error when opening channeldb, "+
"want: %v, got: %v", ErrDBReversion, err)
@ -676,8 +683,11 @@ func TestMarkerAndTombstone(t *testing.T) {
err = db.View(EnsureNoTombstone, func() {})
require.ErrorContains(t, err, string(tombstoneText))
graphDB, err := graphdb.NewChannelGraph(db.Backend)
require.NoError(t, err)
// Now that the DB has a tombstone, we should no longer be able to open
// it once we close it.
_, err = CreateWithBackend(db.Backend)
_, err = CreateWithBackend(db.Backend, graphDB)
require.ErrorContains(t, err, string(tombstoneText))
}

View file

@ -1,10 +1,7 @@
package channeldb
import (
"time"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/kvdb"
)
const (
@ -35,30 +32,8 @@ type OptionalMiragtionConfig struct {
// Options holds parameters for tuning and customizing a channeldb.DB.
type Options struct {
kvdb.BoltBackendConfig
OptionalMiragtionConfig
// RejectCacheSize is the maximum number of rejectCacheEntries to hold
// in the rejection cache.
RejectCacheSize int
// ChannelCacheSize is the maximum number of ChannelEdges to hold in the
// channel cache.
ChannelCacheSize int
// BatchCommitInterval is the maximum duration the batch schedulers will
// wait before attempting to commit a pending set of updates.
BatchCommitInterval time.Duration
// PreAllocCacheNumNodes is the number of nodes we expect to be in the
// graph cache, so we can pre-allocate the map accordingly.
PreAllocCacheNumNodes int
// UseGraphCache denotes whether the in-memory graph cache should be
// used or a fallback version that uses the underlying database for
// path finding.
UseGraphCache bool
// NoMigration specifies that underlying backend was opened in read-only
// mode and migrations shouldn't be performed. This can be useful for
// applications that use the channeldb package as a library.
@ -87,17 +62,7 @@ type Options struct {
// DefaultOptions returns an Options populated with default values.
func DefaultOptions() Options {
return Options{
BoltBackendConfig: kvdb.BoltBackendConfig{
NoFreelistSync: true,
AutoCompact: false,
AutoCompactMinAge: kvdb.DefaultBoltAutoCompactMinAge,
DBTimeout: kvdb.DefaultDBTimeout,
},
OptionalMiragtionConfig: OptionalMiragtionConfig{},
RejectCacheSize: DefaultRejectCacheSize,
ChannelCacheSize: DefaultChannelCacheSize,
PreAllocCacheNumNodes: DefaultPreAllocCacheNumNodes,
UseGraphCache: true,
NoMigration: false,
clock: clock.NewDefaultClock(),
}
@ -106,34 +71,6 @@ func DefaultOptions() Options {
// OptionModifier is a function signature for modifying the default Options.
type OptionModifier func(*Options)
// OptionSetRejectCacheSize sets the RejectCacheSize to n.
func OptionSetRejectCacheSize(n int) OptionModifier {
return func(o *Options) {
o.RejectCacheSize = n
}
}
// OptionSetChannelCacheSize sets the ChannelCacheSize to n.
func OptionSetChannelCacheSize(n int) OptionModifier {
return func(o *Options) {
o.ChannelCacheSize = n
}
}
// OptionSetPreAllocCacheNumNodes sets the PreAllocCacheNumNodes to n.
func OptionSetPreAllocCacheNumNodes(n int) OptionModifier {
return func(o *Options) {
o.PreAllocCacheNumNodes = n
}
}
// OptionSetUseGraphCache sets the UseGraphCache option to the given value.
func OptionSetUseGraphCache(use bool) OptionModifier {
return func(o *Options) {
o.UseGraphCache = use
}
}
// OptionNoRevLogAmtData sets the NoRevLogAmtData option to the given value. If
// it is set to true then amount data will not be stored in the revocation log.
func OptionNoRevLogAmtData(noAmtData bool) OptionModifier {
@ -142,36 +79,6 @@ func OptionNoRevLogAmtData(noAmtData bool) OptionModifier {
}
}
// OptionSetSyncFreelist allows the database to sync its freelist.
func OptionSetSyncFreelist(b bool) OptionModifier {
return func(o *Options) {
o.NoFreelistSync = !b
}
}
// OptionAutoCompact turns on automatic database compaction on startup.
func OptionAutoCompact() OptionModifier {
return func(o *Options) {
o.AutoCompact = true
}
}
// OptionAutoCompactMinAge sets the minimum age for automatic database
// compaction.
func OptionAutoCompactMinAge(minAge time.Duration) OptionModifier {
return func(o *Options) {
o.AutoCompactMinAge = minAge
}
}
// OptionSetBatchCommitInterval sets the batch commit interval for the internval
// batch schedulers.
func OptionSetBatchCommitInterval(interval time.Duration) OptionModifier {
return func(o *Options) {
o.BatchCommitInterval = interval
}
}
// OptionNoMigration allows the database to be opened in read only mode by
// disabling migrations.
func OptionNoMigration(b bool) OptionModifier {

View file

@ -1,27 +0,0 @@
package channeldb_test
import (
"testing"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/stretchr/testify/require"
)
// TestDefaultOptions tests the default options are created as intended.
func TestDefaultOptions(t *testing.T) {
opts := channeldb.DefaultOptions()
require.True(t, opts.NoFreelistSync)
require.False(t, opts.AutoCompact)
require.Equal(
t, kvdb.DefaultBoltAutoCompactMinAge, opts.AutoCompactMinAge,
)
require.Equal(t, kvdb.DefaultDBTimeout, opts.DBTimeout)
require.Equal(
t, channeldb.DefaultRejectCacheSize, opts.RejectCacheSize,
)
require.Equal(
t, channeldb.DefaultChannelCacheSize, opts.ChannelCacheSize,
)
}

View file

@ -35,6 +35,7 @@ import (
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/funding"
graphdb "github.com/lightningnetwork/lnd/graph/db"
"github.com/lightningnetwork/lnd/invoices"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/kvdb"
@ -1022,16 +1023,37 @@ func (d *DefaultDatabaseBuilder) BuildDatabase(
"instances")
}
graphDBOptions := []graphdb.OptionModifier{
graphdb.WithRejectCacheSize(cfg.Caches.RejectCacheSize),
graphdb.WithChannelCacheSize(cfg.Caches.ChannelCacheSize),
graphdb.WithBatchCommitInterval(cfg.DB.BatchCommitInterval),
graphdb.WithUseGraphCache(!cfg.DB.NoGraphCache),
}
// We want to pre-allocate the channel graph cache according to what we
// expect for mainnet to speed up memory allocation.
if cfg.ActiveNetParams.Name == chaincfg.MainNetParams.Name {
graphDBOptions = append(
graphDBOptions, graphdb.WithPreAllocCacheNumNodes(
graphdb.DefaultPreAllocCacheNumNodes,
),
)
}
graphDB, err := graphdb.NewChannelGraph(
databaseBackends.GraphDB, graphDBOptions...,
)
if err != nil {
cleanUp()
err := fmt.Errorf("unable to open graph DB: %w", err)
d.logger.Error(err)
return nil, nil, err
}
dbOptions := []channeldb.OptionModifier{
channeldb.OptionSetRejectCacheSize(cfg.Caches.RejectCacheSize),
channeldb.OptionSetChannelCacheSize(
cfg.Caches.ChannelCacheSize,
),
channeldb.OptionSetBatchCommitInterval(
cfg.DB.BatchCommitInterval,
),
channeldb.OptionDryRunMigration(cfg.DryRunMigration),
channeldb.OptionSetUseGraphCache(!cfg.DB.NoGraphCache),
channeldb.OptionKeepFailedPaymentAttempts(
cfg.KeepFailedPaymentAttempts,
),
@ -1042,27 +1064,17 @@ func (d *DefaultDatabaseBuilder) BuildDatabase(
channeldb.OptionNoRevLogAmtData(cfg.DB.NoRevLogAmtData),
}
// We want to pre-allocate the channel graph cache according to what we
// expect for mainnet to speed up memory allocation.
if cfg.ActiveNetParams.Name == chaincfg.MainNetParams.Name {
dbOptions = append(
dbOptions, channeldb.OptionSetPreAllocCacheNumNodes(
channeldb.DefaultPreAllocCacheNumNodes,
),
)
}
// Otherwise, we'll open two instances, one for the state we only need
// locally, and the other for things we want to ensure are replicated.
dbs.GraphDB, err = channeldb.CreateWithBackend(
databaseBackends.GraphDB, dbOptions...,
databaseBackends.ChanStateDB, graphDB, dbOptions...,
)
switch {
// Give the DB a chance to dry run the migration. Since we know that
// both the channel state and graph DBs are still always behind the same
// backend, we know this would be applied to both of those DBs.
case err == channeldb.ErrDryRunMigrationOK:
d.logger.Infof("Graph DB dry run migration successful")
d.logger.Infof("Channel DB dry run migration successful")
return nil, nil, err
case err != nil:

View file

@ -198,11 +198,15 @@ type ChannelGraph struct {
// NewChannelGraph allocates a new ChannelGraph backed by a DB instance. The
// returned instance has its own unique reject cache and channel cache.
func NewChannelGraph(db kvdb.Backend, rejectCacheSize, chanCacheSize int,
batchCommitInterval time.Duration, preAllocCacheNumNodes int,
useGraphCache, noMigrations bool) (*ChannelGraph, error) {
func NewChannelGraph(db kvdb.Backend, options ...OptionModifier) (*ChannelGraph,
error) {
if !noMigrations {
opts := DefaultOptions()
for _, o := range options {
o(opts)
}
if !opts.NoMigration {
if err := initChannelGraph(db); err != nil {
return nil, err
}
@ -210,20 +214,20 @@ func NewChannelGraph(db kvdb.Backend, rejectCacheSize, chanCacheSize int,
g := &ChannelGraph{
db: db,
rejectCache: newRejectCache(rejectCacheSize),
chanCache: newChannelCache(chanCacheSize),
rejectCache: newRejectCache(opts.RejectCacheSize),
chanCache: newChannelCache(opts.ChannelCacheSize),
}
g.chanScheduler = batch.NewTimeScheduler(
db, &g.cacheMu, batchCommitInterval,
db, &g.cacheMu, opts.BatchCommitInterval,
)
g.nodeScheduler = batch.NewTimeScheduler(
db, nil, batchCommitInterval,
db, nil, opts.BatchCommitInterval,
)
// The graph cache can be turned off (e.g. for mobile users) for a
// speed/memory usage tradeoff.
if useGraphCache {
g.graphCache = NewGraphCache(preAllocCacheNumNodes)
if opts.UseGraphCache {
g.graphCache = NewGraphCache(opts.PreAllocCacheNumNodes)
startTime := time.Now()
log.Debugf("Populating in-memory channel graph, this might " +
"take a while...")

View file

@ -79,11 +79,7 @@ func MakeTestGraph(t testing.TB, modifiers ...OptionModifier) (*ChannelGraph, er
return nil, err
}
graph, err := NewChannelGraph(
backend, opts.RejectCacheSize, opts.ChannelCacheSize,
opts.BatchCommitInterval, opts.PreAllocCacheNumNodes,
true, false,
)
graph, err := NewChannelGraph(backend)
if err != nil {
backendCleanup()
return nil, err
@ -4016,12 +4012,7 @@ func TestGraphLoading(t *testing.T) {
defer backend.Close()
defer backendCleanup()
opts := DefaultOptions()
graph, err := NewChannelGraph(
backend, opts.RejectCacheSize, opts.ChannelCacheSize,
opts.BatchCommitInterval, opts.PreAllocCacheNumNodes,
true, false,
)
graph, err := NewChannelGraph(backend)
require.NoError(t, err)
// Populate the graph with test data.
@ -4031,11 +4022,7 @@ func TestGraphLoading(t *testing.T) {
// Recreate the graph. This should cause the graph cache to be
// populated.
graphReloaded, err := NewChannelGraph(
backend, opts.RejectCacheSize, opts.ChannelCacheSize,
opts.BatchCommitInterval, opts.PreAllocCacheNumNodes,
true, false,
)
graphReloaded, err := NewChannelGraph(backend)
require.NoError(t, err)
// Assert that the cache content is identical.

View file

@ -63,6 +63,13 @@ func DefaultOptions() *Options {
// OptionModifier is a function signature for modifying the default Options.
type OptionModifier func(*Options)
// WithRejectCacheSize sets the RejectCacheSize to n.
func WithRejectCacheSize(n int) OptionModifier {
return func(o *Options) {
o.RejectCacheSize = n
}
}
// WithChannelCacheSize sets the ChannelCacheSize to n.
func WithChannelCacheSize(n int) OptionModifier {
return func(o *Options) {

View file

@ -17,7 +17,6 @@ import (
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
graphdb "github.com/lightningnetwork/lnd/graph/db"
"github.com/lightningnetwork/lnd/graph/db/models"
"github.com/lightningnetwork/lnd/htlcswitch"
@ -1100,11 +1099,8 @@ func makeTestGraph(t *testing.T, useCache bool) (*graphdb.ChannelGraph,
t.Cleanup(backendCleanup)
opts := channeldb.DefaultOptions()
graph, err := graphdb.NewChannelGraph(
backend, opts.RejectCacheSize, opts.ChannelCacheSize,
opts.BatchCommitInterval, opts.PreAllocCacheNumNodes,
useCache, false,
backend, graphdb.WithUseGraphCache(useCache),
)
if err != nil {
return nil, nil, err

View file

@ -21,7 +21,6 @@ import (
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
sphinx "github.com/lightningnetwork/lightning-onion"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/fn"
graphdb "github.com/lightningnetwork/lnd/graph/db"
"github.com/lightningnetwork/lnd/graph/db/models"
@ -167,11 +166,8 @@ func makeTestGraph(t *testing.T, useCache bool) (*graphdb.ChannelGraph,
t.Cleanup(backendCleanup)
opts := channeldb.DefaultOptions()
graph, err := graphdb.NewChannelGraph(
backend, opts.RejectCacheSize, opts.ChannelCacheSize,
opts.BatchCommitInterval, opts.PreAllocCacheNumNodes,
useCache, false,
backend, graphdb.WithUseGraphCache(useCache),
)
if err != nil {
return nil, nil, err