Merge pull request #5561 from guggero/database-migration-lndinit

kvdb: refactor as preparation for DB migration command in lndinit
This commit is contained in:
Oliver Gugger 2022-10-13 10:59:38 +02:00 committed by GitHub
commit 4b827179bb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 322 additions and 46 deletions

View file

@ -436,9 +436,14 @@ func (d *DB) Wipe() error {
// the database are created.
func initChannelDB(db kvdb.Backend) error {
err := kvdb.Update(db, func(tx kvdb.RwTx) error {
// Check if DB was marked as inactive with a tomb stone.
if err := EnsureNoTombstone(tx); err != nil {
return err
}
meta := &Meta{}
// Check if DB is already initialized.
err := fetchMeta(meta, tx)
err := FetchMeta(meta, tx)
if err == nil {
return nil
}
@ -1417,7 +1422,7 @@ func (c *ChannelStateDB) DeleteChannelOpeningState(outPoint []byte) error {
// applies migration functions to the current database and recovers the
// previous state of db if at least one error/panic appeared during migration.
func (d *DB) syncVersions(versions []mandatoryVersion) error {
meta, err := d.FetchMeta(nil)
meta, err := d.FetchMeta()
if err != nil {
if err == ErrMetaNotFound {
meta = &Meta{}
@ -1561,6 +1566,12 @@ func (d *DB) ChannelStateDB() *ChannelStateDB {
return d.channelStateDB
}
// LatestDBVersion returns the number of the latest database version currently
// known to the channel DB.
func LatestDBVersion() uint32 {
return getLatestDBVersion(dbVersions)
}
func getLatestDBVersion(versions []mandatoryVersion) uint32 {
return versions[len(versions)-1].number
}

View file

@ -2,6 +2,7 @@ package channeldb
import (
"bytes"
"errors"
"fmt"
"github.com/lightningnetwork/lnd/kvdb"
@ -20,6 +21,15 @@ var (
// dbVersionKey is a boltdb key and it's used for storing/retrieving
// a list of optional migrations that have been applied.
optionalVersionKey = []byte("ovk")
// TombstoneKey is the key under which we add a tag in the source DB
// after we've successfully and completely migrated it to the target/
// destination DB.
TombstoneKey = []byte("data-migration-tombstone")
// ErrMarkerNotPresent is the error that is returned if the queried
// marker is not present in the given database.
ErrMarkerNotPresent = errors.New("marker not present")
)
// Meta structure holds the database meta information.
@ -28,13 +38,12 @@ type Meta struct {
DbVersionNumber uint32
}
// FetchMeta fetches the meta data from boltdb and returns filled meta
// structure.
func (d *DB) FetchMeta(tx kvdb.RTx) (*Meta, error) {
// FetchMeta fetches the metadata from boltdb and returns filled meta structure.
func (d *DB) FetchMeta() (*Meta, error) {
var meta *Meta
err := kvdb.View(d, func(tx kvdb.RTx) error {
return fetchMeta(meta, tx)
return FetchMeta(meta, tx)
}, func() {
meta = &Meta{}
})
@ -45,10 +54,9 @@ func (d *DB) FetchMeta(tx kvdb.RTx) (*Meta, error) {
return meta, nil
}
// fetchMeta is an internal helper function used in order to allow callers to
// re-use a database transaction. See the publicly exported FetchMeta method
// for more information.
func fetchMeta(meta *Meta, tx kvdb.RTx) error {
// FetchMeta is a helper function used in order to allow callers to re-use a
// database transaction.
func FetchMeta(meta *Meta, tx kvdb.RTx) error {
metaBucket := tx.ReadBucket(metaBucket)
if metaBucket == nil {
return ErrMetaNotFound
@ -150,7 +158,7 @@ func (d *DB) fetchOptionalMeta() (*OptionalMeta, error) {
return om, nil
}
// fetchOptionalMeta writes an optional meta to the database.
// putOptionalMeta writes an optional meta to the database.
func (d *DB) putOptionalMeta(om *OptionalMeta) error {
return kvdb.Update(d, func(tx kvdb.RwTx) error {
metaBucket, err := tx.CreateTopLevelBucket(metaBucket)
@ -177,3 +185,62 @@ func (d *DB) putOptionalMeta(om *OptionalMeta) error {
return metaBucket.Put(optionalVersionKey, b.Bytes())
}, func() {})
}
// CheckMarkerPresent returns the marker under the requested key or
// ErrMarkerNotFound if either the root bucket or the marker key within that
// bucket does not exist.
func CheckMarkerPresent(tx kvdb.RTx, markerKey []byte) ([]byte, error) {
markerBucket := tx.ReadBucket(markerKey)
if markerBucket == nil {
return nil, ErrMarkerNotPresent
}
val := markerBucket.Get(markerKey)
// If we wrote the marker correctly, we created a bucket _and_ created a
// key with a non-empty value. It doesn't matter to us whether the key
// exists or whether its value is empty, to us, it just means the marker
// isn't there.
if len(val) == 0 {
return nil, ErrMarkerNotPresent
}
return val, nil
}
// EnsureNoTombstone returns an error if there is a tombstone marker in the DB
// of the given transaction.
func EnsureNoTombstone(tx kvdb.RTx) error {
marker, err := CheckMarkerPresent(tx, TombstoneKey)
if err == ErrMarkerNotPresent {
// No marker present, so no tombstone. The DB is still alive.
return nil
}
if err != nil {
return err
}
// There was no error so there is a tombstone marker/tag. We cannot use
// this DB anymore.
return fmt.Errorf("refusing to use db, it was marked with a tombstone "+
"after successful data migration; tombstone reads: %s",
string(marker))
}
// AddMarker adds the marker with the given key into a top level bucket with the
// same name. So the structure will look like:
//
// marker-key (top level bucket)
// |-> marker-key:marker-value (key/value pair)
func AddMarker(tx kvdb.RwTx, markerKey, markerValue []byte) error {
if len(markerValue) == 0 {
return fmt.Errorf("marker value cannot be empty")
}
markerBucket, err := tx.CreateTopLevelBucket(markerKey)
if err != nil {
return err
}
return markerBucket.Put(markerKey, markerValue)
}

View file

@ -4,6 +4,7 @@ import (
"bytes"
"testing"
"github.com/btcsuite/btcwallet/walletdb"
"github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/stretchr/testify/require"
@ -91,7 +92,7 @@ func TestVersionFetchPut(t *testing.T) {
t.Fatal(err)
}
meta, err := db.FetchMeta(nil)
meta, err := db.FetchMeta()
if err != nil {
t.Fatal(err)
}
@ -107,7 +108,7 @@ func TestVersionFetchPut(t *testing.T) {
t.Fatalf("update of meta failed %v", err)
}
meta, err = db.FetchMeta(nil)
meta, err = db.FetchMeta()
if err != nil {
t.Fatal(err)
}
@ -228,7 +229,7 @@ func TestMigrationWithPanic(t *testing.T) {
// Check that version of database and data wasn't changed.
afterMigrationFunc := func(d *DB) {
meta, err := d.FetchMeta(nil)
meta, err := d.FetchMeta()
if err != nil {
t.Fatal(err)
}
@ -303,7 +304,7 @@ func TestMigrationWithFatal(t *testing.T) {
// Check that version of database and initial data wasn't changed.
afterMigrationFunc := func(d *DB) {
meta, err := d.FetchMeta(nil)
meta, err := d.FetchMeta()
if err != nil {
t.Fatal(err)
}
@ -377,7 +378,7 @@ func TestMigrationWithoutErrors(t *testing.T) {
// Check that version of database and data was properly changed.
afterMigrationFunc := func(d *DB) {
meta, err := d.FetchMeta(nil)
meta, err := d.FetchMeta()
if err != nil {
t.Fatal(err)
}
@ -469,7 +470,7 @@ func TestMigrationDryRun(t *testing.T) {
// Check that version of database version is not modified.
afterMigrationFunc := func(d *DB) {
err := kvdb.View(d, func(tx kvdb.RTx) error {
meta, err := d.FetchMeta(nil)
meta, err := d.FetchMeta()
if err != nil {
t.Fatal(err)
}
@ -574,3 +575,115 @@ func TestApplyOptionalVersions(t *testing.T) {
require.NoError(t, err, "failed to apply optional migration")
require.Equal(t, 1, migrateCount, "expected no migration")
}
// TestFetchMeta tests that the FetchMeta returns the latest DB version for a
// freshly created DB instance.
func TestFetchMeta(t *testing.T) {
t.Parallel()
db, cleanUp, err := MakeTestDB()
defer cleanUp()
require.NoError(t, err)
meta := &Meta{}
err = db.View(func(tx walletdb.ReadTx) error {
return FetchMeta(meta, tx)
}, func() {
meta = &Meta{}
})
require.NoError(t, err)
require.Equal(t, LatestDBVersion(), meta.DbVersionNumber)
}
// TestMarkerAndTombstone tests that markers like a tombstone can be added to a
// DB.
func TestMarkerAndTombstone(t *testing.T) {
t.Parallel()
db, cleanUp, err := MakeTestDB()
defer cleanUp()
require.NoError(t, err)
// Test that a generic marker is not present in a fresh DB.
var marker []byte
err = db.View(func(tx walletdb.ReadTx) error {
var err error
marker, err = CheckMarkerPresent(tx, []byte("foo"))
return err
}, func() {
marker = nil
})
require.ErrorIs(t, err, ErrMarkerNotPresent)
require.Nil(t, marker)
// Only adding the marker bucket should not be enough to be counted as
// a marker, we explicitly also want the value to be set.
err = db.Update(func(tx walletdb.ReadWriteTx) error {
_, err := tx.CreateTopLevelBucket([]byte("foo"))
return err
}, func() {})
require.NoError(t, err)
err = db.View(func(tx walletdb.ReadTx) error {
var err error
marker, err = CheckMarkerPresent(tx, []byte("foo"))
return err
}, func() {
marker = nil
})
require.ErrorIs(t, err, ErrMarkerNotPresent)
require.Nil(t, marker)
// Test that a tombstone marker is not present in a fresh DB.
err = db.View(EnsureNoTombstone, func() {})
require.NoError(t, err)
// Add a generic marker now and assert that it can be read.
err = db.Update(func(tx walletdb.ReadWriteTx) error {
return AddMarker(tx, []byte("foo"), []byte("bar"))
}, func() {})
require.NoError(t, err)
err = db.View(func(tx walletdb.ReadTx) error {
var err error
marker, err = CheckMarkerPresent(tx, []byte("foo"))
return err
}, func() {
marker = nil
})
require.NoError(t, err)
require.Equal(t, []byte("bar"), marker)
// A tombstone should still not be present.
err = db.View(EnsureNoTombstone, func() {})
require.NoError(t, err)
// Finally, add a tombstone.
tombstoneText := []byte("RIP test DB")
err = db.Update(func(tx walletdb.ReadWriteTx) error {
return AddMarker(tx, TombstoneKey, tombstoneText)
}, func() {})
require.NoError(t, err)
// We can read it as a normal marker.
err = db.View(func(tx walletdb.ReadTx) error {
var err error
marker, err = CheckMarkerPresent(tx, TombstoneKey)
return err
}, func() {
marker = nil
})
require.NoError(t, err)
require.Equal(t, tombstoneText, marker)
// But also as a tombstone, and now we should get an error that the DB
// cannot be used anymore.
err = db.View(EnsureNoTombstone, func() {})
require.ErrorContains(t, err, string(tombstoneText))
// Now that the DB has a tombstone, we should no longer be able to open
// it once we close it.
_, err = CreateWithBackend(db.Backend)
require.ErrorContains(t, err, string(tombstoneText))
}

View file

@ -116,6 +116,10 @@ crash](https://github.com/lightningnetwork/lnd/pull/7019).
* Updated the github actions to use `make fmt-check` in its [build
process](https://github.com/lightningnetwork/lnd/pull/6853).
* Database related code was refactored to [allow external tools to use it more
easily](https://github.com/lightningnetwork/lnd/pull/5561), in preparation for
adding a data migration functionality to `lndinit`.
# Contributors (Alphabetical Order)
* Carla Kirk-Cohen

View file

@ -92,7 +92,7 @@ func getKeyVal(kv *KV) ([]byte, []byte) {
return getKey(kv.key), val
}
// BucketKey is a helper functon used in tests to create a bucket key from
// BucketKey is a helper function used in tests to create a bucket key from
// passed bucket list.
func BucketKey(buckets ...string) string {
var bucketKey []byte
@ -130,3 +130,10 @@ func ValueKey(key string, buckets ...string) string {
return string(makeValueKey(bucket, []byte(key)))
}
// SequenceKey is a helper function used in tests or external tools to create a
// sequence key from the passed bucket list.
func SequenceKey(buckets ...string) string {
id := makeBucketID([]byte(BucketKey(buckets...)))
return string(makeSequenceKey(id[:]))
}

34
kvdb/etcd/bucket_test.go Normal file
View file

@ -0,0 +1,34 @@
//go:build kvdb_etcd
// +build kvdb_etcd
package etcd
import (
"crypto/sha256"
"testing"
"github.com/stretchr/testify/require"
)
// TestBucketKey tests that a key for a bucket can be created correctly.
func TestBucketKey(t *testing.T) {
rootID := sha256.Sum256([]byte("@"))
key := append(rootID[:], []byte("foo")...)
key = append(key, 0xff)
require.Equal(t, string(key), BucketKey("foo"))
}
// TestBucketVal tests that a key for a bucket value can be created correctly.
func TestBucketVal(t *testing.T) {
rootID := sha256.Sum256([]byte("@"))
key := append(rootID[:], []byte("foo")...)
key = append(key, 0xff)
keyID := sha256.Sum256(key)
require.Equal(t, string(keyID[:]), BucketVal("foo"))
}
// TestSequenceKey tests that a key for a sequence can be created correctly.
func TestSequenceKey(t *testing.T) {
require.Contains(t, SequenceKey("foo", "bar", "baz"), "$seq$")
}

View file

@ -22,7 +22,7 @@ const (
// the etcd instance.
etcdConnectionTimeout = 10 * time.Second
// etcdLongTimeout is a timeout for longer taking etcd operatons.
// etcdLongTimeout is a timeout for longer taking etcd operations.
etcdLongTimeout = 30 * time.Second
// etcdDefaultRootBucketId is used as the root bucket key. Note that
@ -52,8 +52,8 @@ type commitStatsCollector struct {
fail map[string]*callerStats
}
// newCommitStatsColletor creates a new commitStatsCollector instance.
func newCommitStatsColletor() *commitStatsCollector {
// newCommitStatsCollector creates a new commitStatsCollector instance.
func newCommitStatsCollector() *commitStatsCollector {
return &commitStatsCollector{
succ: make(map[string]*callerStats),
fail: make(map[string]*callerStats),
@ -133,9 +133,10 @@ type db struct {
// Enforce db implements the walletdb.DB interface.
var _ walletdb.DB = (*db)(nil)
// newEtcdBackend returns a db object initialized with the passed backend
// config. If etcd connection cannot be established, then returns error.
func newEtcdBackend(ctx context.Context, cfg Config) (*db, error) {
// NewEtcdClient creates a new etcd v3 API client.
func NewEtcdClient(ctx context.Context, cfg Config) (*clientv3.Client,
context.Context, func(), error) {
clientCfg := clientv3.Config{
Endpoints: []string{cfg.Host},
DialTimeout: etcdConnectionTimeout,
@ -153,7 +154,7 @@ func newEtcdBackend(ctx context.Context, cfg Config) (*db, error) {
tlsConfig, err := tlsInfo.ClientConfig()
if err != nil {
return nil, err
return nil, nil, nil, err
}
clientCfg.TLS = tlsConfig
@ -164,7 +165,7 @@ func newEtcdBackend(ctx context.Context, cfg Config) (*db, error) {
cli, err := clientv3.New(clientCfg)
if err != nil {
cancel()
return nil, err
return nil, nil, nil, err
}
// Apply the namespace.
@ -172,6 +173,17 @@ func newEtcdBackend(ctx context.Context, cfg Config) (*db, error) {
cli.Watcher = namespace.NewWatcher(cli.Watcher, cfg.Namespace)
cli.Lease = namespace.NewLease(cli.Lease, cfg.Namespace)
return cli, ctx, cancel, nil
}
// newEtcdBackend returns a db object initialized with the passed backend
// config. If etcd connection cannot be established, then returns error.
func newEtcdBackend(ctx context.Context, cfg Config) (*db, error) {
cli, ctx, cancel, err := NewEtcdClient(ctx, cfg)
if err != nil {
return nil, err
}
backend := &db{
cfg: cfg,
ctx: ctx,
@ -181,7 +193,7 @@ func newEtcdBackend(ctx context.Context, cfg Config) (*db, error) {
}
if cfg.CollectStats {
backend.commitStatsCollector = newCommitStatsColletor()
backend.commitStatsCollector = newCommitStatsCollector()
}
return backend, nil

View file

@ -12,7 +12,9 @@ import (
"github.com/stretchr/testify/require"
)
func TestCopy(t *testing.T) {
// TestDump tests that the Dump() method creates a one-to-one copy of the
// database content.
func TestDump(t *testing.T) {
t.Parallel()
f := NewEtcdTestFixture(t)
@ -45,6 +47,8 @@ func TestCopy(t *testing.T) {
require.Equal(t, expected, f.Dump())
}
// TestAbortContext tests that an update on the database is aborted if the
// database's main context in cancelled.
func TestAbortContext(t *testing.T) {
t.Parallel()
@ -73,3 +77,26 @@ func TestAbortContext(t *testing.T) {
// No changes in the DB.
require.Equal(t, map[string]string{}, f.Dump())
}
// TestNewEtcdClient tests that an etcd v3 client can be created correctly.
func TestNewEtcdClient(t *testing.T) {
t.Parallel()
f := NewEtcdTestFixture(t)
defer f.Cleanup()
client, ctx, cancel, err := NewEtcdClient(
context.Background(), f.BackendConfig(),
)
require.NoError(t, err)
t.Cleanup(cancel)
_, err = client.Put(ctx, "foo/bar", "baz")
require.NoError(t, err)
resp, err := client.Get(ctx, "foo/bar")
require.NoError(t, err)
require.Len(t, resp.Kvs, 1)
require.Equal(t, "baz", string(resp.Kvs[0].Value))
}

View file

@ -41,8 +41,8 @@ func NewTestEtcdInstance(t *testing.T, path string) (*Config, func()) {
return config, cleanup
}
// NewTestEtcdTestFixture creates a new etcd-test fixture. This is helper
// object to facilitate etcd tests and ensure pre and post conditions.
// NewEtcdTestFixture creates a new etcd-test fixture. This is helper
// object to facilitate etcd tests and ensure pre- and post-conditions.
func NewEtcdTestFixture(t *testing.T) *EtcdTestFixture {
tmpDir := t.TempDir()
@ -128,7 +128,7 @@ func (f *EtcdTestFixture) Dump() map[string]string {
return result
}
// BackendConfig returns the backend config for connecting to theembedded
// BackendConfig returns the backend config for connecting to the embedded
// etcd instance.
func (f *EtcdTestFixture) BackendConfig() Config {
return *f.config

View file

@ -318,7 +318,7 @@ func (b *readWriteBucket) DeleteNestedBucket(key []byte) error {
}
// Put updates the value for the passed key.
// Returns ErrKeyRequred if te passed key is empty.
// Returns ErrKeyRequired if the passed key is empty.
func (b *readWriteBucket) Put(key, value []byte) error {
if len(key) == 0 {
return walletdb.ErrKeyRequired
@ -335,7 +335,7 @@ func (b *readWriteBucket) Put(key, value []byte) error {
}
// Delete deletes the key/value pointed to by the passed key.
// Returns ErrKeyRequred if the passed key is empty.
// Returns ErrKeyRequired if the passed key is empty.
func (b *readWriteBucket) Delete(key []byte) error {
if key == nil {
return nil
@ -360,7 +360,7 @@ func (b *readWriteBucket) Tx() walletdb.ReadWriteTx {
return b.tx
}
// NextSequence returns an autoincrementing sequence number for this bucket.
// NextSequence returns an auto-incrementing sequence number for this bucket.
// Note that this is not a thread safe function and as such it must not be used
// for synchronization.
func (b *readWriteBucket) NextSequence() (uint64, error) {
@ -396,7 +396,7 @@ func (b *readWriteBucket) Sequence() uint64 {
return 0
}
// Otherwise try to parse a 64 bit unsigned integer from the value.
// Otherwise try to parse a 64-bit unsigned integer from the value.
num, _ := strconv.ParseUint(string(val), 10, 64)
return num
@ -414,7 +414,7 @@ func flattenMap(m map[string]struct{}) []string {
return result
}
// Prefetch will prefetch all keys in the passed paths as well as all bucket
// Prefetch will prefetch all keys in the passed-in paths as well as all bucket
// keys along the paths.
func (b *readWriteBucket) Prefetch(paths ...[]string) {
keys := make(map[string]struct{})

View file

@ -12,11 +12,12 @@ import (
)
const (
channelDBName = "channel.db"
macaroonDBName = "macaroons.db"
decayedLogDbName = "sphinxreplay.db"
towerClientDBName = "wtclient.db"
towerServerDBName = "watchtower.db"
ChannelDBName = "channel.db"
MacaroonDBName = "macaroons.db"
DecayedLogDbName = "sphinxreplay.db"
TowerClientDBName = "wtclient.db"
TowerServerDBName = "watchtower.db"
WalletDBName = "wallet.db"
BoltBackend = "bolt"
EtcdBackend = "etcd"
@ -393,7 +394,7 @@ func (db *DB) GetBackends(ctx context.Context, chanDBPath,
// We're using all bbolt based databases by default.
boltBackend, err := kvdb.GetBoltBackend(&kvdb.BoltBackendConfig{
DBPath: chanDBPath,
DBFileName: channelDBName,
DBFileName: ChannelDBName,
DBTimeout: db.Bolt.DBTimeout,
NoFreelistSync: db.Bolt.NoFreelistSync,
AutoCompact: db.Bolt.AutoCompact,
@ -406,7 +407,7 @@ func (db *DB) GetBackends(ctx context.Context, chanDBPath,
macaroonBackend, err := kvdb.GetBoltBackend(&kvdb.BoltBackendConfig{
DBPath: walletDBPath,
DBFileName: macaroonDBName,
DBFileName: MacaroonDBName,
DBTimeout: db.Bolt.DBTimeout,
NoFreelistSync: db.Bolt.NoFreelistSync,
AutoCompact: db.Bolt.AutoCompact,
@ -419,7 +420,7 @@ func (db *DB) GetBackends(ctx context.Context, chanDBPath,
decayedLogBackend, err := kvdb.GetBoltBackend(&kvdb.BoltBackendConfig{
DBPath: chanDBPath,
DBFileName: decayedLogDbName,
DBFileName: DecayedLogDbName,
DBTimeout: db.Bolt.DBTimeout,
NoFreelistSync: db.Bolt.NoFreelistSync,
AutoCompact: db.Bolt.AutoCompact,
@ -437,7 +438,7 @@ func (db *DB) GetBackends(ctx context.Context, chanDBPath,
towerClientBackend, err = kvdb.GetBoltBackend(
&kvdb.BoltBackendConfig{
DBPath: chanDBPath,
DBFileName: towerClientDBName,
DBFileName: TowerClientDBName,
DBTimeout: db.Bolt.DBTimeout,
NoFreelistSync: db.Bolt.NoFreelistSync,
AutoCompact: db.Bolt.AutoCompact,
@ -458,7 +459,7 @@ func (db *DB) GetBackends(ctx context.Context, chanDBPath,
towerServerBackend, err = kvdb.GetBoltBackend(
&kvdb.BoltBackendConfig{
DBPath: towerServerDBPath,
DBFileName: towerServerDBName,
DBFileName: TowerServerDBName,
DBTimeout: db.Bolt.DBTimeout,
NoFreelistSync: db.Bolt.NoFreelistSync,
AutoCompact: db.Bolt.AutoCompact,