From d03047f3131756918192de05d19cf401884955ce Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Fri, 14 Oct 2022 10:49:25 +0200 Subject: [PATCH] watchtower: migrate channel summaries In this commit a migration of the tower client db is done. The migration creates a new top-level cChanDetailsBkt bucket and for each channel found in the old cChanSummaryBkt bucket, it creates a new sub-bucket. In the subbucket, the ClientChanSummary is then stored under the cChannelSummary key. The reason for this migration is that it will be useful in future when we want to store more easily accessible data under a specific client ID. --- watchtower/wtdb/client_db.go | 59 ++++++--- watchtower/wtdb/log.go | 2 + watchtower/wtdb/migration2/client_db.go | 82 ++++++++++++ watchtower/wtdb/migration2/client_db_test.go | 124 +++++++++++++++++++ watchtower/wtdb/migration2/codec.go | 17 +++ watchtower/wtdb/migration2/log.go | 14 +++ watchtower/wtdb/version.go | 4 + 7 files changed, 283 insertions(+), 19 deletions(-) create mode 100644 watchtower/wtdb/migration2/client_db.go create mode 100644 watchtower/wtdb/migration2/client_db_test.go create mode 100644 watchtower/wtdb/migration2/codec.go create mode 100644 watchtower/wtdb/migration2/log.go diff --git a/watchtower/wtdb/client_db.go b/watchtower/wtdb/client_db.go index 25b17a3ed..734aad933 100644 --- a/watchtower/wtdb/client_db.go +++ b/watchtower/wtdb/client_db.go @@ -18,9 +18,17 @@ var ( // tower-id -> reserved-session-key-index (uint32). cSessionKeyIndexBkt = []byte("client-session-key-index-bucket") - // cChanSummaryBkt is a top-level bucket storing: - // channel-id -> encoded ClientChanSummary. - cChanSummaryBkt = []byte("client-channel-summary-bucket") + // cChanDetailsBkt is a top-level bucket storing: + // channel-id => cChannelSummary -> encoded ClientChanSummary. + cChanDetailsBkt = []byte("client-channel-detail-bucket") + + // cChanDBID is a key used in the cChanDetailsBkt to store the + // db-assigned-id of a channel. + cChanDBID = []byte("client-channel-db-id") + + // cChannelSummary is a key used in cChanDetailsBkt to store the encoded + // body of ClientChanSummary. + cChannelSummary = []byte("client-channel-summary") // cSessionBkt is a top-level bucket storing: // session-id => cSessionBody -> encoded ClientSessionBody @@ -67,6 +75,10 @@ var ( // structure deviates from what is expected. ErrCorruptClientSession = errors.New("client session corrupted") + // ErrCorruptChanDetails signals that the clients channel detail's + // on-disk structure deviates from what is expected. + ErrCorruptChanDetails = errors.New("channel details corrupted") + // ErrClientSessionAlreadyExists signals an attempt to reinsert a client // session that has already been created. ErrClientSessionAlreadyExists = errors.New( @@ -198,7 +210,7 @@ func OpenClientDB(db kvdb.Backend) (*ClientDB, error) { func initClientDBBuckets(tx kvdb.RwTx) error { buckets := [][]byte{ cSessionKeyIndexBkt, - cChanSummaryBkt, + cChanDetailsBkt, cSessionBkt, cTowerBkt, cTowerIndexBkt, @@ -883,17 +895,23 @@ func (c *ClientDB) FetchSessionCommittedUpdates(id *SessionID) ( // channel summaries. func (c *ClientDB) FetchChanSummaries() (ChannelSummaries, error) { var summaries map[lnwire.ChannelID]ClientChanSummary + err := kvdb.View(c.db, func(tx kvdb.RTx) error { - chanSummaries := tx.ReadBucket(cChanSummaryBkt) - if chanSummaries == nil { + chanDetailsBkt := tx.ReadBucket(cChanDetailsBkt) + if chanDetailsBkt == nil { return ErrUninitializedDB } - return chanSummaries.ForEach(func(k, v []byte) error { + return chanDetailsBkt.ForEach(func(k, _ []byte) error { + chanDetails := chanDetailsBkt.NestedReadBucket(k) + if chanDetails == nil { + return ErrCorruptChanDetails + } + var chanID lnwire.ChannelID copy(chanID[:], k) - summary, err := getChanSummary(chanSummaries, chanID) + summary, err := getChanSummary(chanDetails) if err != nil { return err } @@ -921,22 +939,27 @@ func (c *ClientDB) RegisterChannel(chanID lnwire.ChannelID, sweepPkScript []byte) error { return kvdb.Update(c.db, func(tx kvdb.RwTx) error { - chanSummaries := tx.ReadWriteBucket(cChanSummaryBkt) - if chanSummaries == nil { + chanDetailsBkt := tx.ReadWriteBucket(cChanDetailsBkt) + if chanDetailsBkt == nil { return ErrUninitializedDB } - chanSummaryBytes := chanSummaries.Get(chanID[:]) - if chanSummaryBytes != nil { + chanDetails := chanDetailsBkt.NestedReadWriteBucket(chanID[:]) + if chanDetails != nil { // Channel is already registered. return ErrChannelAlreadyRegistered } + chanDetails, err := chanDetailsBkt.CreateBucket(chanID[:]) + if err != nil { + return err + } + summary := ClientChanSummary{ SweepPkScript: sweepPkScript, } - return putChanSummary(chanSummaries, chanID, &summary) + return putChanSummary(chanDetails, &summary) }, func() {}) } @@ -1405,10 +1428,8 @@ func markSessionStatus(sessions kvdb.RwBucket, session *ClientSession, } // getChanSummary loads a ClientChanSummary for the passed chanID. -func getChanSummary(chanSummaries kvdb.RBucket, - chanID lnwire.ChannelID) (*ClientChanSummary, error) { - - chanSummaryBytes := chanSummaries.Get(chanID[:]) +func getChanSummary(chanDetails kvdb.RBucket) (*ClientChanSummary, error) { + chanSummaryBytes := chanDetails.Get(cChannelSummary) if chanSummaryBytes == nil { return nil, ErrChannelNotRegistered } @@ -1423,7 +1444,7 @@ func getChanSummary(chanSummaries kvdb.RBucket, } // putChanSummary stores a ClientChanSummary for the passed chanID. -func putChanSummary(chanSummaries kvdb.RwBucket, chanID lnwire.ChannelID, +func putChanSummary(chanDetails kvdb.RwBucket, summary *ClientChanSummary) error { var b bytes.Buffer @@ -1432,7 +1453,7 @@ func putChanSummary(chanSummaries kvdb.RwBucket, chanID lnwire.ChannelID, return err } - return chanSummaries.Put(chanID[:], b.Bytes()) + return chanDetails.Put(cChannelSummary, b.Bytes()) } // getTower loads a Tower identified by its serialized tower id. diff --git a/watchtower/wtdb/log.go b/watchtower/wtdb/log.go index 6ddb6c35f..a25e94b39 100644 --- a/watchtower/wtdb/log.go +++ b/watchtower/wtdb/log.go @@ -4,6 +4,7 @@ import ( "github.com/btcsuite/btclog" "github.com/lightningnetwork/lnd/build" "github.com/lightningnetwork/lnd/watchtower/wtdb/migration1" + "github.com/lightningnetwork/lnd/watchtower/wtdb/migration2" ) // log is a logger that is initialized with no output filters. This @@ -28,6 +29,7 @@ func DisableLog() { func UseLogger(logger btclog.Logger) { log = logger migration1.UseLogger(logger) + migration2.UseLogger(logger) } // logClosure is used to provide a closure over expensive logging operations so diff --git a/watchtower/wtdb/migration2/client_db.go b/watchtower/wtdb/migration2/client_db.go new file mode 100644 index 000000000..ead2b9ea3 --- /dev/null +++ b/watchtower/wtdb/migration2/client_db.go @@ -0,0 +1,82 @@ +package migration2 + +import ( + "errors" + + "github.com/lightningnetwork/lnd/kvdb" +) + +var ( + // cChanSummaryBkt is a top-level bucket storing: + // channel-id -> encoded ClientChanSummary. + cChanSummaryBkt = []byte("client-channel-summary-bucket") + + // cChanDetailsBkt is a top-level bucket storing: + // channel-id => cChannelSummary -> encoded ClientChanSummary. + cChanDetailsBkt = []byte("client-channel-detail-bucket") + + // cChannelSummary is a key used in cChanDetailsBkt to store the encoded + // body of ClientChanSummary. + cChannelSummary = []byte("client-channel-summary") + + // ErrUninitializedDB signals that top-level buckets for the database + // have not been initialized. + ErrUninitializedDB = errors.New("db not initialized") + + // ErrCorruptChanSummary signals that the clients channel summary's + // on-disk structure deviates from what is expected. + ErrCorruptChanSummary = errors.New("channel summary corrupted") +) + +// MigrateClientChannelDetails creates a new channel-details bucket that uses +// channel IDs as sub-buckets where the channel summaries are moved to from the +// channel summary bucket. If the migration is successful then the channel +// summary bucket is deleted. +func MigrateClientChannelDetails(tx kvdb.RwTx) error { + log.Infof("Migrating the tower client db to move the channel " + + "summaries to the new channel-details bucket") + + // Create the new top level cChanDetailsBkt. + chanDetailsBkt, err := tx.CreateTopLevelBucket(cChanDetailsBkt) + if err != nil { + return err + } + + // Get the top-level channel summaries bucket. + chanSummaryBkt := tx.ReadWriteBucket(cChanSummaryBkt) + if chanSummaryBkt == nil { + return ErrUninitializedDB + } + + // Iterate over the cChanSummaryBkt's keys. Each key is a channel-id. + // For each of these, create a new sub-bucket with this key in + // cChanDetailsBkt. In this sub-bucket, add the cChannelSummary key with + // the encoded ClientChanSummary as the value. + err = chanSummaryBkt.ForEach(func(chanID, summary []byte) error { + // Force the migration to fail if the summary is empty. This + // should never be the case, but it is added so that we can + // force the migration to fail in a test so that we can test + // that the db remains unaffected if a migration failure takes + // place. + if len(summary) == 0 { + return ErrCorruptChanSummary + } + + // Create a new sub-bucket in the channel details bucket using + // this channel ID. + channelBkt, err := chanDetailsBkt.CreateBucket(chanID) + if err != nil { + return err + } + + // Add the encoded channel summary in the new bucket under the + // channel-summary key. + return channelBkt.Put(cChannelSummary, summary) + }) + if err != nil { + return err + } + + // Now delete the cChanSummaryBkt from the DB. + return tx.DeleteTopLevelBucket(cChanSummaryBkt) +} diff --git a/watchtower/wtdb/migration2/client_db_test.go b/watchtower/wtdb/migration2/client_db_test.go new file mode 100644 index 000000000..c1436184f --- /dev/null +++ b/watchtower/wtdb/migration2/client_db_test.go @@ -0,0 +1,124 @@ +package migration2 + +import ( + "encoding/binary" + "testing" + + "github.com/lightningnetwork/lnd/channeldb/migtest" + "github.com/lightningnetwork/lnd/kvdb" + "github.com/stretchr/testify/require" +) + +var ( + + // pre is the expected data in the cChanSummaryBkt bucket before the + // migration. + pre = map[string]interface{}{ + channelIDString(1): string([]byte{1, 2, 3}), + channelIDString(2): string([]byte{3, 4, 5}), + } + + // pre should fail the migration due to no channel summary being found + // for a given channel ID. + preFailCorruptDB = map[string]interface{}{ + channelIDString(1): string([]byte{1, 2, 3}), + channelIDString(2): "", + } + + // post is the expected data after migration. + post = map[string]interface{}{ + channelIDString(1): map[string]interface{}{ + string(cChannelSummary): string([]byte{1, 2, 3}), + }, + channelIDString(2): map[string]interface{}{ + string(cChannelSummary): string([]byte{3, 4, 5}), + }, + } +) + +// TestMigrateClientChannelDetails tests that the MigrateClientChannelDetails +// function correctly moves channel-summaries from the channel-summaries bucket +// to the new channel-details bucket. +func TestMigrateClientChannelDetails(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + shouldFail bool + pre map[string]interface{} + post map[string]interface{} + }{ + { + name: "migration ok", + shouldFail: false, + pre: pre, + post: post, + }, + { + name: "fail due to corrupt db", + shouldFail: true, + pre: preFailCorruptDB, + post: nil, + }, + { + name: "no channel summaries", + shouldFail: false, + pre: nil, + post: nil, + }, + } + + for _, test := range tests { + test := test + + t.Run(test.name, func(t *testing.T) { + t.Parallel() + + // Before the migration we have a channel summary + // bucket. + before := func(tx kvdb.RwTx) error { + return migtest.RestoreDB( + tx, cChanSummaryBkt, test.pre, + ) + } + + // After the migration, we should have a new channel + // details bucket and no longer have a channel summary + // bucket. + after := func(tx kvdb.RwTx) error { + // If we expect our migration to fail, we + // expect our channel summary bucket to remain + // intact. + if test.shouldFail { + return migtest.VerifyDB( + tx, cChanSummaryBkt, test.pre, + ) + } + + // Otherwise, we expect the channel summary + // bucket to be deleted. + err := migtest.VerifyDB( + tx, cChanSummaryBkt, test.pre, + ) + require.ErrorContains(t, err, "not found") + + // We also expect the new channel details bucket + // to be present. + return migtest.VerifyDB( + tx, cChanDetailsBkt, test.post, + ) + } + + migtest.ApplyMigration( + t, before, after, MigrateClientChannelDetails, + test.shouldFail, + ) + }) + } +} + +func channelIDString(id uint64) string { + var chanID ChannelID + binary.BigEndian.PutUint64(chanID[:], id) + return chanID.String() +} diff --git a/watchtower/wtdb/migration2/codec.go b/watchtower/wtdb/migration2/codec.go new file mode 100644 index 000000000..e3107ba8b --- /dev/null +++ b/watchtower/wtdb/migration2/codec.go @@ -0,0 +1,17 @@ +package migration2 + +import "encoding/hex" + +// ChannelID is a series of 32-bytes that uniquely identifies all channels +// within the network. The ChannelID is computed using the outpoint of the +// funding transaction (the txid, and output index). Given a funding output the +// ChannelID can be calculated by XOR'ing the big-endian serialization of the +// txid and the big-endian serialization of the output index, truncated to +// 2 bytes. +type ChannelID [32]byte + +// String returns the string representation of the ChannelID. This is just the +// hex string encoding of the ChannelID itself. +func (c ChannelID) String() string { + return hex.EncodeToString(c[:]) +} diff --git a/watchtower/wtdb/migration2/log.go b/watchtower/wtdb/migration2/log.go new file mode 100644 index 000000000..fd64d2fcb --- /dev/null +++ b/watchtower/wtdb/migration2/log.go @@ -0,0 +1,14 @@ +package migration2 + +import ( + "github.com/btcsuite/btclog" +) + +// log is a logger that is initialized as disabled. This means the package will +// not perform any logging by default until a logger is set. +var log = btclog.Disabled + +// UseLogger uses a specified Logger to output package logging info. +func UseLogger(logger btclog.Logger) { + log = logger +} diff --git a/watchtower/wtdb/version.go b/watchtower/wtdb/version.go index 4785b0ae2..0526817db 100644 --- a/watchtower/wtdb/version.go +++ b/watchtower/wtdb/version.go @@ -4,6 +4,7 @@ import ( "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/watchtower/wtdb/migration1" + "github.com/lightningnetwork/lnd/watchtower/wtdb/migration2" ) // migration is a function which takes a prior outdated version of the database @@ -29,6 +30,9 @@ var clientDBVersions = []version{ { migration: migration1.MigrateTowerToSessionIndex, }, + { + migration: migration2.MigrateClientChannelDetails, + }, } // getLatestDBVersion returns the last known database version.