diff --git a/channeldb/db.go b/channeldb/db.go index 998ddf9ce..3906fe73f 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -23,6 +23,7 @@ import ( "github.com/lightningnetwork/lnd/channeldb/migration26" "github.com/lightningnetwork/lnd/channeldb/migration27" "github.com/lightningnetwork/lnd/channeldb/migration29" + "github.com/lightningnetwork/lnd/channeldb/migration30" "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11" "github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/kvdb" @@ -45,17 +46,34 @@ var ( // up-to-date version of the database. type migration func(tx kvdb.RwTx) error -type version struct { +// mandatoryVersion defines a db version that must be applied before the lnd +// starts. +type mandatoryVersion struct { number uint32 migration migration } +// optionalMigration defines an optional migration function. When a migration +// is optional, it usually involves a large scale of changes that might touch +// millions of keys. Due to OOM concern, the update cannot be safely done +// within one db transaction. Thus, for optional migrations, they must take the +// db backend and construct transactions as needed. +type optionalMigration func(db kvdb.Backend) error + +// optionalVersion defines a db version that can be optionally applied. When +// applying migrations, we must apply all the mandatory migrations first before +// attempting optional ones. +type optionalVersion struct { + name string + migration optionalMigration +} + var ( - // dbVersions is storing all versions of database. If current version - // of database don't match with latest version this list will be used - // for retrieving all migration function that are need to apply to the - // current db. - dbVersions = []version{ + // dbVersions is storing all mandatory versions of database. If current + // version of database don't match with latest version this list will + // be used for retrieving all migration function that are need to apply + // to the current db. + dbVersions = []mandatoryVersion{ { // The base DB version requires no migration. number: 0, @@ -237,6 +255,19 @@ var ( }, } + // optionalVersions stores all optional migrations that are applied + // after dbVersions. + // + // NOTE: optional migrations must be fault-tolerant and re-run already + // migrated data must be noop, which means the migration must be able + // to determine its state. + optionalVersions = []optionalVersion{ + { + name: "prune revocation log", + migration: migration30.MigrateRevocationLog, + }, + } + // Big endian is the preferred byte order, due to cursor scans over // integer keys iterating in order. byteOrder = binary.BigEndian @@ -337,6 +368,13 @@ func CreateWithBackend(backend kvdb.Backend, modifiers ...OptionModifier) (*DB, backend.Close() return nil, err } + + // Grab the optional migration config. + omc := opts.OptionalMiragtionConfig + if err := chanDB.applyOptionalVersions(omc); err != nil { + backend.Close() + return nil, err + } } return chanDB, nil @@ -1309,7 +1347,7 @@ func (c *ChannelStateDB) DeleteChannelOpeningState(outPoint []byte) error { // syncVersions function is used for safe db version synchronization. It // applies migration functions to the current database and recovers the // previous state of db if at least one error/panic appeared during migration. -func (d *DB) syncVersions(versions []version) error { +func (d *DB) syncVersions(versions []mandatoryVersion) error { meta, err := d.FetchMeta(nil) if err != nil { if err == ErrMetaNotFound { @@ -1379,6 +1417,61 @@ func (d *DB) syncVersions(versions []version) error { }, func() {}) } +// applyOptionalVersions takes a config to determine whether the optional +// migrations will be applied. +// +// NOTE: only support the prune_revocation_log optional migration atm. +func (d *DB) applyOptionalVersions(cfg OptionalMiragtionConfig) error { + om, err := d.fetchOptionalMeta() + if err != nil { + if err == ErrMetaNotFound { + om = &OptionalMeta{ + Versions: make(map[uint64]string), + } + } else { + return err + } + } + + log.Infof("Checking for optional update: prune_revocation_log=%v, "+ + "db_version=%s", cfg.PruneRevocationLog, om) + + // Exit early if the optional migration is not specified. + if !cfg.PruneRevocationLog { + return nil + } + + // Exit early if the optional migration has already been applied. + if _, ok := om.Versions[0]; ok { + return nil + } + + // Get the optional version. + version := optionalVersions[0] + log.Infof("Performing database optional migration: %s", version.name) + + // Migrate the data. + if err := version.migration(d); err != nil { + log.Errorf("Unable to apply optional migration: %s, error: %v", + version.name, err) + return err + } + + // Update the optional meta. Notice that unlike the mandatory db + // migrations where we perform the migration and updating meta in a + // single db transaction, we use different transactions here. Even when + // the following update is failed, we should be fine here as we would + // re-run the optional migration again, which is a noop, during next + // startup. + om.Versions[0] = version.name + if err := d.putOptionalMeta(om); err != nil { + log.Errorf("Unable to update optional meta: %v", err) + return err + } + + return nil +} + // ChannelGraph returns the current instance of the directed channel graph. func (d *DB) ChannelGraph() *ChannelGraph { return d.graph @@ -1390,13 +1483,15 @@ func (d *DB) ChannelStateDB() *ChannelStateDB { return d.channelStateDB } -func getLatestDBVersion(versions []version) uint32 { +func getLatestDBVersion(versions []mandatoryVersion) uint32 { return versions[len(versions)-1].number } // getMigrationsToApply retrieves the migration function that should be // applied to the database. -func getMigrationsToApply(versions []version, version uint32) ([]migration, []uint32) { +func getMigrationsToApply(versions []mandatoryVersion, + version uint32) ([]migration, []uint32) { + migrations := make([]migration, 0, len(versions)) migrationVersions := make([]uint32, 0, len(versions)) diff --git a/channeldb/meta.go b/channeldb/meta.go index df4b0df34..9173224b8 100644 --- a/channeldb/meta.go +++ b/channeldb/meta.go @@ -1,7 +1,11 @@ package channeldb import ( + "bytes" + "fmt" + "github.com/lightningnetwork/lnd/kvdb" + "github.com/lightningnetwork/lnd/tlv" ) var ( @@ -12,6 +16,10 @@ var ( // dbVersionKey is a boltdb key and it's used for storing/retrieving // current database version. dbVersionKey = []byte("dbp") + + // dbVersionKey is a boltdb key and it's used for storing/retrieving + // a list of optional migrations that have been applied. + optionalVersionKey = []byte("ovk") ) // Meta structure holds the database meta information. @@ -80,3 +88,92 @@ func putDbVersion(metaBucket kvdb.RwBucket, meta *Meta) error { byteOrder.PutUint32(scratch, meta.DbVersionNumber) return metaBucket.Put(dbVersionKey, scratch) } + +// OptionalMeta structure holds the database optional migration information. +type OptionalMeta struct { + // Versions is a set that contains the versions that have been applied. + // When saved to disk, only the indexes are stored. + Versions map[uint64]string +} + +func (om *OptionalMeta) String() string { + s := "" + for index, name := range om.Versions { + s += fmt.Sprintf("%d: %s", index, name) + } + if s == "" { + s = "empty" + } + return s +} + +// fetchOptionalMeta reads the optional meta from the database. +func (d *DB) fetchOptionalMeta() (*OptionalMeta, error) { + om := &OptionalMeta{ + Versions: make(map[uint64]string), + } + + err := kvdb.View(d, func(tx kvdb.RTx) error { + metaBucket := tx.ReadBucket(metaBucket) + if metaBucket == nil { + return ErrMetaNotFound + } + + vBytes := metaBucket.Get(optionalVersionKey) + // Exit early if nothing found. + if vBytes == nil { + return nil + } + + // Read the versions' length. + r := bytes.NewReader(vBytes) + vLen, err := tlv.ReadVarInt(r, &[8]byte{}) + if err != nil { + return err + } + + // Write the version index. + for i := uint64(0); i < vLen; i++ { + version, err := tlv.ReadVarInt(r, &[8]byte{}) + if err != nil { + return err + } + om.Versions[version] = optionalVersions[i].name + } + + return nil + }, func() {}) + if err != nil { + return nil, err + } + + return om, nil +} + +// fetchOptionalMeta writes an optional meta to the database. +func (d *DB) putOptionalMeta(om *OptionalMeta) error { + return kvdb.Update(d, func(tx kvdb.RwTx) error { + metaBucket, err := tx.CreateTopLevelBucket(metaBucket) + if err != nil { + return err + } + + var b bytes.Buffer + + // Write the total length. + err = tlv.WriteVarInt(&b, uint64(len(om.Versions)), &[8]byte{}) + if err != nil { + return err + } + + // Write the version indexes. + for v := range om.Versions { + err := tlv.WriteVarInt(&b, v, &[8]byte{}) + if err != nil { + return err + } + } + + return metaBucket.Put(optionalVersionKey, b.Bytes()) + }, func() {}) +} diff --git a/channeldb/meta_test.go b/channeldb/meta_test.go index a2366cfc3..830cee5ba 100644 --- a/channeldb/meta_test.go +++ b/channeldb/meta_test.go @@ -44,7 +44,7 @@ func applyMigration(t *testing.T, beforeMigration, afterMigration func(d *DB), t.Fatalf("unable to store meta data: %v", err) } - versions := []version{ + versions := []mandatoryVersion{ { number: 0, migration: nil, @@ -124,7 +124,7 @@ func TestOrderOfMigrations(t *testing.T) { t.Parallel() appliedMigration := -1 - versions := []version{ + versions := []mandatoryVersion{ {0, nil}, {1, nil}, {2, func(tx kvdb.RwTx) error { @@ -498,3 +498,85 @@ func TestMigrationDryRun(t *testing.T) { true, true) } + +// TestOptionalMeta checks the basic read and write for the optional meta. +func TestOptionalMeta(t *testing.T) { + t.Parallel() + + db, cleanUp, err := MakeTestDB() + defer cleanUp() + require.NoError(t, err) + + // Test read an empty optional meta. + om, err := db.fetchOptionalMeta() + require.NoError(t, err, "error getting optional meta") + require.Empty(t, om.Versions, "expected empty versions") + + // Test write an optional meta. + om = &OptionalMeta{ + Versions: map[uint64]string{ + 0: optionalVersions[0].name, + }, + } + err = db.putOptionalMeta(om) + require.NoError(t, err, "error putting optional meta") + + om1, err := db.fetchOptionalMeta() + require.NoError(t, err, "error getting optional meta") + require.Equal(t, om, om1, "unexpected empty versions") + require.Equal(t, "0: prune revocation log", om.String()) +} + +// TestApplyOptionalVersions checks that the optional migration is applied as +// expected based on the config. +func TestApplyOptionalVersions(t *testing.T) { + t.Parallel() + + db, cleanUp, err := MakeTestDB() + defer cleanUp() + require.NoError(t, err) + + // Overwrite the migration function so we can count how many times the + // migration has happened. + migrateCount := 0 + optionalVersions[0].migration = func(_ kvdb.Backend) error { + migrateCount++ + return nil + } + + // Test that when the flag is false, no migration happens. + cfg := OptionalMiragtionConfig{} + err = db.applyOptionalVersions(cfg) + require.NoError(t, err, "failed to apply optional migration") + require.Equal(t, 0, migrateCount, "expected no migration") + + // Check the optional meta is not updated. + om, err := db.fetchOptionalMeta() + require.NoError(t, err, "error getting optional meta") + require.Empty(t, om.Versions, "expected empty versions") + + // Test that when specified, the optional migration is applied. + cfg.PruneRevocationLog = true + err = db.applyOptionalVersions(cfg) + require.NoError(t, err, "failed to apply optional migration") + require.Equal(t, 1, migrateCount, "expected migration") + + // Fetch the updated optional meta. + om, err = db.fetchOptionalMeta() + require.NoError(t, err, "error getting optional meta") + + // Verify that the optional meta is updated as expected. + omExpected := &OptionalMeta{ + Versions: map[uint64]string{ + 0: optionalVersions[0].name, + }, + } + require.Equal(t, omExpected, om, "unexpected empty versions") + + // Test that though specified, the optional migration is not run since + // it's already been applied. + cfg.PruneRevocationLog = true + err = db.applyOptionalVersions(cfg) + require.NoError(t, err, "failed to apply optional migration") + require.Equal(t, 1, migrateCount, "expected no migration") +} diff --git a/channeldb/options.go b/channeldb/options.go index 7e121d254..a3af349d2 100644 --- a/channeldb/options.go +++ b/channeldb/options.go @@ -25,9 +25,18 @@ const ( DefaultPreAllocCacheNumNodes = 15000 ) +// OptionalMiragtionConfig defines the flags used to signal whether a +// particular migration needs to be applied. +type OptionalMiragtionConfig struct { + // PruneRevocationLog specifies that the revocation log migration needs + // to be applied. + PruneRevocationLog bool +} + // Options holds parameters for tuning and customizing a channeldb.DB. type Options struct { kvdb.BoltBackendConfig + OptionalMiragtionConfig // RejectCacheSize is the maximum number of rejectCacheEntries to hold // in the rejection cache. @@ -76,12 +85,13 @@ func DefaultOptions() Options { AutoCompactMinAge: kvdb.DefaultBoltAutoCompactMinAge, DBTimeout: kvdb.DefaultDBTimeout, }, - RejectCacheSize: DefaultRejectCacheSize, - ChannelCacheSize: DefaultChannelCacheSize, - PreAllocCacheNumNodes: DefaultPreAllocCacheNumNodes, - UseGraphCache: true, - NoMigration: false, - clock: clock.NewDefaultClock(), + OptionalMiragtionConfig: OptionalMiragtionConfig{}, + RejectCacheSize: DefaultRejectCacheSize, + ChannelCacheSize: DefaultChannelCacheSize, + PreAllocCacheNumNodes: DefaultPreAllocCacheNumNodes, + UseGraphCache: true, + NoMigration: false, + clock: clock.NewDefaultClock(), } } @@ -176,3 +186,11 @@ func OptionKeepFailedPaymentAttempts(keepFailedPaymentAttempts bool) OptionModif o.keepFailedPaymentAttempts = keepFailedPaymentAttempts } } + +// OptionPruneRevocationLog specifies whether the migration for pruning +// revocation logs needs to be applied or not. +func OptionPruneRevocationLog(prune bool) OptionModifier { + return func(o *Options) { + o.OptionalMiragtionConfig.PruneRevocationLog = prune + } +}