diff --git a/watchtower/wtdb/client_db.go b/watchtower/wtdb/client_db.go index 25b17a3ed..734aad933 100644 --- a/watchtower/wtdb/client_db.go +++ b/watchtower/wtdb/client_db.go @@ -18,9 +18,17 @@ var ( // tower-id -> reserved-session-key-index (uint32). cSessionKeyIndexBkt = []byte("client-session-key-index-bucket") - // cChanSummaryBkt is a top-level bucket storing: - // channel-id -> encoded ClientChanSummary. - cChanSummaryBkt = []byte("client-channel-summary-bucket") + // cChanDetailsBkt is a top-level bucket storing: + // channel-id => cChannelSummary -> encoded ClientChanSummary. + cChanDetailsBkt = []byte("client-channel-detail-bucket") + + // cChanDBID is a key used in the cChanDetailsBkt to store the + // db-assigned-id of a channel. + cChanDBID = []byte("client-channel-db-id") + + // cChannelSummary is a key used in cChanDetailsBkt to store the encoded + // body of ClientChanSummary. + cChannelSummary = []byte("client-channel-summary") // cSessionBkt is a top-level bucket storing: // session-id => cSessionBody -> encoded ClientSessionBody @@ -67,6 +75,10 @@ var ( // structure deviates from what is expected. ErrCorruptClientSession = errors.New("client session corrupted") + // ErrCorruptChanDetails signals that the clients channel detail's + // on-disk structure deviates from what is expected. + ErrCorruptChanDetails = errors.New("channel details corrupted") + // ErrClientSessionAlreadyExists signals an attempt to reinsert a client // session that has already been created. ErrClientSessionAlreadyExists = errors.New( @@ -198,7 +210,7 @@ func OpenClientDB(db kvdb.Backend) (*ClientDB, error) { func initClientDBBuckets(tx kvdb.RwTx) error { buckets := [][]byte{ cSessionKeyIndexBkt, - cChanSummaryBkt, + cChanDetailsBkt, cSessionBkt, cTowerBkt, cTowerIndexBkt, @@ -883,17 +895,23 @@ func (c *ClientDB) FetchSessionCommittedUpdates(id *SessionID) ( // channel summaries. func (c *ClientDB) FetchChanSummaries() (ChannelSummaries, error) { var summaries map[lnwire.ChannelID]ClientChanSummary + err := kvdb.View(c.db, func(tx kvdb.RTx) error { - chanSummaries := tx.ReadBucket(cChanSummaryBkt) - if chanSummaries == nil { + chanDetailsBkt := tx.ReadBucket(cChanDetailsBkt) + if chanDetailsBkt == nil { return ErrUninitializedDB } - return chanSummaries.ForEach(func(k, v []byte) error { + return chanDetailsBkt.ForEach(func(k, _ []byte) error { + chanDetails := chanDetailsBkt.NestedReadBucket(k) + if chanDetails == nil { + return ErrCorruptChanDetails + } + var chanID lnwire.ChannelID copy(chanID[:], k) - summary, err := getChanSummary(chanSummaries, chanID) + summary, err := getChanSummary(chanDetails) if err != nil { return err } @@ -921,22 +939,27 @@ func (c *ClientDB) RegisterChannel(chanID lnwire.ChannelID, sweepPkScript []byte) error { return kvdb.Update(c.db, func(tx kvdb.RwTx) error { - chanSummaries := tx.ReadWriteBucket(cChanSummaryBkt) - if chanSummaries == nil { + chanDetailsBkt := tx.ReadWriteBucket(cChanDetailsBkt) + if chanDetailsBkt == nil { return ErrUninitializedDB } - chanSummaryBytes := chanSummaries.Get(chanID[:]) - if chanSummaryBytes != nil { + chanDetails := chanDetailsBkt.NestedReadWriteBucket(chanID[:]) + if chanDetails != nil { // Channel is already registered. return ErrChannelAlreadyRegistered } + chanDetails, err := chanDetailsBkt.CreateBucket(chanID[:]) + if err != nil { + return err + } + summary := ClientChanSummary{ SweepPkScript: sweepPkScript, } - return putChanSummary(chanSummaries, chanID, &summary) + return putChanSummary(chanDetails, &summary) }, func() {}) } @@ -1405,10 +1428,8 @@ func markSessionStatus(sessions kvdb.RwBucket, session *ClientSession, } // getChanSummary loads a ClientChanSummary for the passed chanID. -func getChanSummary(chanSummaries kvdb.RBucket, - chanID lnwire.ChannelID) (*ClientChanSummary, error) { - - chanSummaryBytes := chanSummaries.Get(chanID[:]) +func getChanSummary(chanDetails kvdb.RBucket) (*ClientChanSummary, error) { + chanSummaryBytes := chanDetails.Get(cChannelSummary) if chanSummaryBytes == nil { return nil, ErrChannelNotRegistered } @@ -1423,7 +1444,7 @@ func getChanSummary(chanSummaries kvdb.RBucket, } // putChanSummary stores a ClientChanSummary for the passed chanID. -func putChanSummary(chanSummaries kvdb.RwBucket, chanID lnwire.ChannelID, +func putChanSummary(chanDetails kvdb.RwBucket, summary *ClientChanSummary) error { var b bytes.Buffer @@ -1432,7 +1453,7 @@ func putChanSummary(chanSummaries kvdb.RwBucket, chanID lnwire.ChannelID, return err } - return chanSummaries.Put(chanID[:], b.Bytes()) + return chanDetails.Put(cChannelSummary, b.Bytes()) } // getTower loads a Tower identified by its serialized tower id. diff --git a/watchtower/wtdb/log.go b/watchtower/wtdb/log.go index 6ddb6c35f..a25e94b39 100644 --- a/watchtower/wtdb/log.go +++ b/watchtower/wtdb/log.go @@ -4,6 +4,7 @@ import ( "github.com/btcsuite/btclog" "github.com/lightningnetwork/lnd/build" "github.com/lightningnetwork/lnd/watchtower/wtdb/migration1" + "github.com/lightningnetwork/lnd/watchtower/wtdb/migration2" ) // log is a logger that is initialized with no output filters. This @@ -28,6 +29,7 @@ func DisableLog() { func UseLogger(logger btclog.Logger) { log = logger migration1.UseLogger(logger) + migration2.UseLogger(logger) } // logClosure is used to provide a closure over expensive logging operations so diff --git a/watchtower/wtdb/migration2/client_db.go b/watchtower/wtdb/migration2/client_db.go new file mode 100644 index 000000000..ead2b9ea3 --- /dev/null +++ b/watchtower/wtdb/migration2/client_db.go @@ -0,0 +1,82 @@ +package migration2 + +import ( + "errors" + + "github.com/lightningnetwork/lnd/kvdb" +) + +var ( + // cChanSummaryBkt is a top-level bucket storing: + // channel-id -> encoded ClientChanSummary. + cChanSummaryBkt = []byte("client-channel-summary-bucket") + + // cChanDetailsBkt is a top-level bucket storing: + // channel-id => cChannelSummary -> encoded ClientChanSummary. + cChanDetailsBkt = []byte("client-channel-detail-bucket") + + // cChannelSummary is a key used in cChanDetailsBkt to store the encoded + // body of ClientChanSummary. + cChannelSummary = []byte("client-channel-summary") + + // ErrUninitializedDB signals that top-level buckets for the database + // have not been initialized. + ErrUninitializedDB = errors.New("db not initialized") + + // ErrCorruptChanSummary signals that the clients channel summary's + // on-disk structure deviates from what is expected. + ErrCorruptChanSummary = errors.New("channel summary corrupted") +) + +// MigrateClientChannelDetails creates a new channel-details bucket that uses +// channel IDs as sub-buckets where the channel summaries are moved to from the +// channel summary bucket. If the migration is successful then the channel +// summary bucket is deleted. +func MigrateClientChannelDetails(tx kvdb.RwTx) error { + log.Infof("Migrating the tower client db to move the channel " + + "summaries to the new channel-details bucket") + + // Create the new top level cChanDetailsBkt. + chanDetailsBkt, err := tx.CreateTopLevelBucket(cChanDetailsBkt) + if err != nil { + return err + } + + // Get the top-level channel summaries bucket. + chanSummaryBkt := tx.ReadWriteBucket(cChanSummaryBkt) + if chanSummaryBkt == nil { + return ErrUninitializedDB + } + + // Iterate over the cChanSummaryBkt's keys. Each key is a channel-id. + // For each of these, create a new sub-bucket with this key in + // cChanDetailsBkt. In this sub-bucket, add the cChannelSummary key with + // the encoded ClientChanSummary as the value. + err = chanSummaryBkt.ForEach(func(chanID, summary []byte) error { + // Force the migration to fail if the summary is empty. This + // should never be the case, but it is added so that we can + // force the migration to fail in a test so that we can test + // that the db remains unaffected if a migration failure takes + // place. + if len(summary) == 0 { + return ErrCorruptChanSummary + } + + // Create a new sub-bucket in the channel details bucket using + // this channel ID. + channelBkt, err := chanDetailsBkt.CreateBucket(chanID) + if err != nil { + return err + } + + // Add the encoded channel summary in the new bucket under the + // channel-summary key. + return channelBkt.Put(cChannelSummary, summary) + }) + if err != nil { + return err + } + + // Now delete the cChanSummaryBkt from the DB. + return tx.DeleteTopLevelBucket(cChanSummaryBkt) +} diff --git a/watchtower/wtdb/migration2/client_db_test.go b/watchtower/wtdb/migration2/client_db_test.go new file mode 100644 index 000000000..c1436184f --- /dev/null +++ b/watchtower/wtdb/migration2/client_db_test.go @@ -0,0 +1,124 @@ +package migration2 + +import ( + "encoding/binary" + "testing" + + "github.com/lightningnetwork/lnd/channeldb/migtest" + "github.com/lightningnetwork/lnd/kvdb" + "github.com/stretchr/testify/require" +) + +var ( + + // pre is the expected data in the cChanSummaryBkt bucket before the + // migration. + pre = map[string]interface{}{ + channelIDString(1): string([]byte{1, 2, 3}), + channelIDString(2): string([]byte{3, 4, 5}), + } + + // pre should fail the migration due to no channel summary being found + // for a given channel ID. + preFailCorruptDB = map[string]interface{}{ + channelIDString(1): string([]byte{1, 2, 3}), + channelIDString(2): "", + } + + // post is the expected data after migration. + post = map[string]interface{}{ + channelIDString(1): map[string]interface{}{ + string(cChannelSummary): string([]byte{1, 2, 3}), + }, + channelIDString(2): map[string]interface{}{ + string(cChannelSummary): string([]byte{3, 4, 5}), + }, + } +) + +// TestMigrateClientChannelDetails tests that the MigrateClientChannelDetails +// function correctly moves channel-summaries from the channel-summaries bucket +// to the new channel-details bucket. +func TestMigrateClientChannelDetails(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + shouldFail bool + pre map[string]interface{} + post map[string]interface{} + }{ + { + name: "migration ok", + shouldFail: false, + pre: pre, + post: post, + }, + { + name: "fail due to corrupt db", + shouldFail: true, + pre: preFailCorruptDB, + post: nil, + }, + { + name: "no channel summaries", + shouldFail: false, + pre: nil, + post: nil, + }, + } + + for _, test := range tests { + test := test + + t.Run(test.name, func(t *testing.T) { + t.Parallel() + + // Before the migration we have a channel summary + // bucket. + before := func(tx kvdb.RwTx) error { + return migtest.RestoreDB( + tx, cChanSummaryBkt, test.pre, + ) + } + + // After the migration, we should have a new channel + // details bucket and no longer have a channel summary + // bucket. + after := func(tx kvdb.RwTx) error { + // If we expect our migration to fail, we + // expect our channel summary bucket to remain + // intact. + if test.shouldFail { + return migtest.VerifyDB( + tx, cChanSummaryBkt, test.pre, + ) + } + + // Otherwise, we expect the channel summary + // bucket to be deleted. + err := migtest.VerifyDB( + tx, cChanSummaryBkt, test.pre, + ) + require.ErrorContains(t, err, "not found") + + // We also expect the new channel details bucket + // to be present. + return migtest.VerifyDB( + tx, cChanDetailsBkt, test.post, + ) + } + + migtest.ApplyMigration( + t, before, after, MigrateClientChannelDetails, + test.shouldFail, + ) + }) + } +} + +func channelIDString(id uint64) string { + var chanID ChannelID + binary.BigEndian.PutUint64(chanID[:], id) + return chanID.String() +} diff --git a/watchtower/wtdb/migration2/codec.go b/watchtower/wtdb/migration2/codec.go new file mode 100644 index 000000000..e3107ba8b --- /dev/null +++ b/watchtower/wtdb/migration2/codec.go @@ -0,0 +1,17 @@ +package migration2 + +import "encoding/hex" + +// ChannelID is a series of 32-bytes that uniquely identifies all channels +// within the network. The ChannelID is computed using the outpoint of the +// funding transaction (the txid, and output index). Given a funding output the +// ChannelID can be calculated by XOR'ing the big-endian serialization of the +// txid and the big-endian serialization of the output index, truncated to +// 2 bytes. +type ChannelID [32]byte + +// String returns the string representation of the ChannelID. This is just the +// hex string encoding of the ChannelID itself. +func (c ChannelID) String() string { + return hex.EncodeToString(c[:]) +} diff --git a/watchtower/wtdb/migration2/log.go b/watchtower/wtdb/migration2/log.go new file mode 100644 index 000000000..fd64d2fcb --- /dev/null +++ b/watchtower/wtdb/migration2/log.go @@ -0,0 +1,14 @@ +package migration2 + +import ( + "github.com/btcsuite/btclog" +) + +// log is a logger that is initialized as disabled. This means the package will +// not perform any logging by default until a logger is set. +var log = btclog.Disabled + +// UseLogger uses a specified Logger to output package logging info. +func UseLogger(logger btclog.Logger) { + log = logger +} diff --git a/watchtower/wtdb/version.go b/watchtower/wtdb/version.go index 4785b0ae2..0526817db 100644 --- a/watchtower/wtdb/version.go +++ b/watchtower/wtdb/version.go @@ -4,6 +4,7 @@ import ( "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/watchtower/wtdb/migration1" + "github.com/lightningnetwork/lnd/watchtower/wtdb/migration2" ) // migration is a function which takes a prior outdated version of the database @@ -29,6 +30,9 @@ var clientDBVersions = []version{ { migration: migration1.MigrateTowerToSessionIndex, }, + { + migration: migration2.MigrateClientChannelDetails, + }, } // getLatestDBVersion returns the last known database version.