lnd/watchtower/wtdb/version.go
2023-11-27 20:01:17 +02:00

245 lines
7.5 KiB
Go

package wtdb
import (
"fmt"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/watchtower/wtdb/migration1"
"github.com/lightningnetwork/lnd/watchtower/wtdb/migration2"
"github.com/lightningnetwork/lnd/watchtower/wtdb/migration3"
"github.com/lightningnetwork/lnd/watchtower/wtdb/migration4"
"github.com/lightningnetwork/lnd/watchtower/wtdb/migration5"
"github.com/lightningnetwork/lnd/watchtower/wtdb/migration6"
"github.com/lightningnetwork/lnd/watchtower/wtdb/migration7"
"github.com/lightningnetwork/lnd/watchtower/wtdb/migration8"
)
// txMigration is a function which takes a prior outdated version of the
// database instances and mutates the key/bucket structure to arrive at a more
// up-to-date version of the database. It uses an existing database transaction
// to do so.
type txMigration func(tx kvdb.RwTx) error
// dbMigration is a function which takes a prior outdated version of the
// database instances and mutates the key/bucket structure to arrive at a more
// up-to-date version of the database. If such a migration is defined, the
// migration is responsible for starting any database transactions. This
// migration type is useful in the case where it would be beneficial (in terms
// of RAM usage) to split the migration between multiple db transactions.
type dbMigration func(db kvdb.Backend) error
// version pairs a version number with the migration that would need to be
// applied from the prior version to upgrade.
type version struct {
txMigration txMigration
dbMigration dbMigration
}
// towerDBVersions stores all versions and migrations of the tower database.
// This list will be used when opening the database to determine if any
// migrations must be applied.
var towerDBVersions = []version{}
// clientDBVersions stores all versions and migrations of the client database.
// This list will be used when opening the database to determine if any
// migrations must be applied.
var clientDBVersions = []version{
{
txMigration: migration1.MigrateTowerToSessionIndex,
},
{
txMigration: migration2.MigrateClientChannelDetails,
},
{
txMigration: migration3.MigrateChannelIDIndex,
},
{
dbMigration: migration4.MigrateAckedUpdates(
migration4.DefaultSessionsPerTx,
),
},
{
txMigration: migration5.MigrateCompleteTowerToSessionIndex,
},
{
txMigration: migration6.MigrateSessionIDIndex,
},
{
txMigration: migration7.MigrateChannelToSessionIndex,
},
{
txMigration: migration8.MigrateChannelMaxHeights,
},
}
// getLatestDBVersion returns the last known database version.
func getLatestDBVersion(versions []version) uint32 {
return uint32(len(versions))
}
// getMigrations returns a slice of all updates with a greater number that
// curVersion that need to be applied to sync up with the latest version.
func getMigrations(versions []version, curVersion uint32) []version {
var updates []version
for i, v := range versions {
if uint32(i)+1 > curVersion {
updates = append(updates, v)
}
}
return updates
}
// getDBVersion retrieves the current database version from the metadata bucket
// using the dbVersionKey.
func getDBVersion(tx kvdb.RTx) (uint32, error) {
metadata := tx.ReadBucket(metadataBkt)
if metadata == nil {
return 0, ErrUninitializedDB
}
versionBytes := metadata.Get(dbVersionKey)
if len(versionBytes) != 4 {
return 0, ErrNoDBVersion
}
return byteOrder.Uint32(versionBytes), nil
}
// initDBVersion initializes the top-level metadata bucket and writes the passed
// version number as the current version.
func initDBVersion(tx kvdb.RwTx, version uint32) error {
_, err := tx.CreateTopLevelBucket(metadataBkt)
if err != nil {
return err
}
return putDBVersion(tx, version)
}
// putDBVersion stores the passed database version in the metadata bucket under
// the dbVersionKey.
func putDBVersion(tx kvdb.RwTx, version uint32) error {
metadata := tx.ReadWriteBucket(metadataBkt)
if metadata == nil {
return ErrUninitializedDB
}
versionBytes := make([]byte, 4)
byteOrder.PutUint32(versionBytes, version)
return metadata.Put(dbVersionKey, versionBytes)
}
// versionedDB is a private interface implemented by both the tower and client
// databases, permitting all versioning operations to be performed generically
// on either.
type versionedDB interface {
// bdb returns the underlying bbolt database.
bdb() kvdb.Backend
// Version returns the current version stored in the database.
Version() (uint32, error)
}
// initOrSyncVersions ensures that the database version is properly set before
// opening the database up for regular use. When the database is being
// initialized for the first time, the caller should set init to true, which
// will simply write the latest version to the database. Otherwise, passing init
// as false will cause the database to apply any needed migrations to ensure its
// version matches the latest version in the provided versions list.
func initOrSyncVersions(db versionedDB, init bool, versions []version) error {
// If the database has not yet been created, we'll initialize the
// database version with the latest known version.
if init {
return kvdb.Update(db.bdb(), func(tx kvdb.RwTx) error {
return initDBVersion(tx, getLatestDBVersion(versions))
}, func() {})
}
// Otherwise, ensure that any migrations are applied to ensure the data
// is in the format expected by the latest version.
return syncVersions(db, versions)
}
// syncVersions ensures the database version is consistent with the highest
// known database version, applying any migrations that have not been made. If
// the highest known version number is lower than the database's version, this
// method will fail to prevent accidental reversions.
func syncVersions(db versionedDB, versions []version) error {
curVersion, err := db.Version()
if err != nil {
return err
}
latestVersion := getLatestDBVersion(versions)
switch {
// Current version is higher than any known version, fail to prevent
// reversion.
case curVersion > latestVersion:
return channeldb.ErrDBReversion
// Current version matches highest known version, nothing to do.
case curVersion == latestVersion:
return nil
}
// Otherwise, apply any migrations in order to bring the database
// version up to the highest known version.
updates := getMigrations(versions, curVersion)
for i, update := range updates {
if update.dbMigration != nil && update.txMigration != nil {
return fmt.Errorf("cannot specify both a " +
"tx-migration and a db-migration for a " +
"single version")
}
version := curVersion + uint32(i) + 1
log.Infof("Applying migration #%d", version)
if update.dbMigration != nil {
err = update.dbMigration(db.bdb())
if err != nil {
log.Errorf("Unable to apply migration #%d: %v",
version, err)
return err
}
// Note that unlike a txMigration, here we update the
// db version in a transaction that is separate to the
// transaction in which the db migration took place.
// This means that the db migration function must be
// idempotent.
err = kvdb.Update(db.bdb(), func(tx kvdb.RwTx) error {
return putDBVersion(tx, version)
}, func() {})
if err != nil {
return err
}
continue
}
if update.txMigration == nil {
continue
}
err = kvdb.Update(db.bdb(), func(tx kvdb.RwTx) error {
err := update.txMigration(tx)
if err != nil {
log.Errorf("Unable to apply migration #%d: %v",
version, err)
return err
}
return putDBVersion(tx, version)
}, func() {})
if err != nil {
return err
}
}
return nil
}