From fdba28ab7da35e78aaf9a6442347db14969ed264 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Sat, 15 Oct 2022 17:04:29 +0200 Subject: [PATCH] watchtower: add channelID index In this commit, a new channel-ID index is added to the tower client db with the help of a migration. This index holds a mapping from a db-assigned-ID (a uint64 encoded using BigSize encoding) to real channel ID (32 bytes). This mapping will help us save space in future when persisting references to channels. --- watchtower/wtdb/client_db.go | 85 +++++++++++ watchtower/wtdb/codec_test.go | 6 +- watchtower/wtdb/log.go | 2 + watchtower/wtdb/migration3/client_db.go | 104 +++++++++++++ watchtower/wtdb/migration3/client_db_test.go | 149 +++++++++++++++++++ watchtower/wtdb/migration3/codec.go | 19 +++ watchtower/wtdb/migration3/log.go | 14 ++ watchtower/wtdb/version.go | 4 + 8 files changed, 380 insertions(+), 3 deletions(-) create mode 100644 watchtower/wtdb/migration3/client_db.go create mode 100644 watchtower/wtdb/migration3/client_db_test.go create mode 100644 watchtower/wtdb/migration3/codec.go create mode 100644 watchtower/wtdb/migration3/log.go diff --git a/watchtower/wtdb/client_db.go b/watchtower/wtdb/client_db.go index 734aad933..4b381bcc8 100644 --- a/watchtower/wtdb/client_db.go +++ b/watchtower/wtdb/client_db.go @@ -10,6 +10,7 @@ import ( "github.com/btcsuite/btcd/btcec/v2" "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/tlv" "github.com/lightningnetwork/lnd/watchtower/blob" ) @@ -20,6 +21,7 @@ var ( // cChanDetailsBkt is a top-level bucket storing: // channel-id => cChannelSummary -> encoded ClientChanSummary. + // => cChanDBID -> db-assigned-id cChanDetailsBkt = []byte("client-channel-detail-bucket") // cChanDBID is a key used in the cChanDetailsBkt to store the @@ -48,6 +50,10 @@ var ( // seqnum -> encoded BackupID. cSessionAcks = []byte("client-session-acks") + // cChanIDIndexBkt is a top-level bucket storing: + // db-assigned-id -> channel-ID + cChanIDIndexBkt = []byte("client-channel-id-index") + // cTowerBkt is a top-level bucket storing: // tower-id -> encoded Tower. cTowerBkt = []byte("client-tower-bucket") @@ -215,6 +221,7 @@ func initClientDBBuckets(tx kvdb.RwTx) error { cTowerBkt, cTowerIndexBkt, cTowerToSessionIndexBkt, + cChanIDIndexBkt, } for _, bucket := range buckets { @@ -955,6 +962,37 @@ func (c *ClientDB) RegisterChannel(chanID lnwire.ChannelID, return err } + // Get the channel-id-index bucket. + indexBkt := tx.ReadWriteBucket(cChanIDIndexBkt) + if indexBkt == nil { + return ErrUninitializedDB + } + + // Request the next unique id from the bucket. + nextSeq, err := indexBkt.NextSequence() + if err != nil { + return err + } + + // Use BigSize encoding to encode the db-assigned index. + newIndex, err := writeBigSize(nextSeq) + if err != nil { + return err + } + + // Add the new db-assigned ID to channel-ID pair. + err = indexBkt.Put(newIndex, chanID[:]) + if err != nil { + return err + } + + // Add the db-assigned ID to the channel's channel details + // bucket under the cChanDBID key. + err = chanDetails.Put(cChanDBID, newIndex) + if err != nil { + return err + } + summary := ClientChanSummary{ SweepPkScript: sweepPkScript, } @@ -1484,3 +1522,50 @@ func putTower(towers kvdb.RwBucket, tower *Tower) error { return towers.Put(tower.ID.Bytes(), b.Bytes()) } + +// getDBChanID returns the db-assigned channel ID for the given real channel ID. +// It returns both the uint64 and byte representation. +func getDBChanID(chanDetailsBkt kvdb.RBucket, chanID lnwire.ChannelID) (uint64, + []byte, error) { + + chanDetails := chanDetailsBkt.NestedReadBucket(chanID[:]) + if chanDetails == nil { + return 0, nil, ErrChannelNotRegistered + } + + idBytes := chanDetails.Get(cChanDBID) + if len(idBytes) == 0 { + return 0, nil, fmt.Errorf("no db-assigned ID found for "+ + "channel ID %s", chanID) + } + + id, err := readBigSize(idBytes) + if err != nil { + return 0, nil, err + } + + return id, idBytes, nil +} + +// writeBigSize will encode the given uint64 as a BigSize byte slice. +func writeBigSize(i uint64) ([]byte, error) { + var b bytes.Buffer + err := tlv.WriteVarInt(&b, i, &[8]byte{}) + if err != nil { + return nil, err + } + + return b.Bytes(), nil +} + +// readBigSize converts the given byte slice into a uint64 and assumes that the +// bytes slice is using BigSize encoding. +func readBigSize(b []byte) (uint64, error) { + r := bytes.NewReader(b) + i, err := tlv.ReadVarInt(r, &[8]byte{}) + if err != nil { + return 0, err + } + + return i, nil +} diff --git a/watchtower/wtdb/codec_test.go b/watchtower/wtdb/codec_test.go index c2628b86a..228d2a0ec 100644 --- a/watchtower/wtdb/codec_test.go +++ b/watchtower/wtdb/codec_test.go @@ -125,9 +125,9 @@ type dbObject interface { Decode(io.Reader) error } -// TestCodec serializes and deserializes wtdb objects in order to test that that -// the codec understands all of the required field types. The test also asserts -// that decoding an object into another results in an equivalent object. +// TestCodec serializes and deserializes wtdb objects in order to test that the +// codec understands all of the required field types. The test also asserts that +// decoding an object into another results in an equivalent object. func TestCodec(tt *testing.T) { var t *testing.T diff --git a/watchtower/wtdb/log.go b/watchtower/wtdb/log.go index a25e94b39..63be43d28 100644 --- a/watchtower/wtdb/log.go +++ b/watchtower/wtdb/log.go @@ -5,6 +5,7 @@ import ( "github.com/lightningnetwork/lnd/build" "github.com/lightningnetwork/lnd/watchtower/wtdb/migration1" "github.com/lightningnetwork/lnd/watchtower/wtdb/migration2" + "github.com/lightningnetwork/lnd/watchtower/wtdb/migration3" ) // log is a logger that is initialized with no output filters. This @@ -30,6 +31,7 @@ func UseLogger(logger btclog.Logger) { log = logger migration1.UseLogger(logger) migration2.UseLogger(logger) + migration3.UseLogger(logger) } // logClosure is used to provide a closure over expensive logging operations so diff --git a/watchtower/wtdb/migration3/client_db.go b/watchtower/wtdb/migration3/client_db.go new file mode 100644 index 000000000..334a89abe --- /dev/null +++ b/watchtower/wtdb/migration3/client_db.go @@ -0,0 +1,104 @@ +package migration3 + +import ( + "bytes" + "encoding/binary" + "errors" + + "github.com/lightningnetwork/lnd/kvdb" + "github.com/lightningnetwork/lnd/tlv" +) + +var ( + // cChanDetailsBkt is a top-level bucket storing: + // channel-id => cChannelSummary -> encoded ClientChanSummary. + // => cChanDBID -> db-assigned-id + cChanDetailsBkt = []byte("client-channel-detail-bucket") + + // cChanDBID is a key used in the cChanDetailsBkt to store the + // db-assigned-id of a channel. + cChanDBID = []byte("client-channel-db-id") + + // cChanIDIndexBkt is a top-level bucket storing: + // db-assigned-id -> channel-ID + cChanIDIndexBkt = []byte("client-channel-id-index") + + // cChannelSummary is a key used in cChanDetailsBkt to store the encoded + // body of ClientChanSummary. + cChannelSummary = []byte("client-channel-summary") + + // ErrUninitializedDB signals that top-level buckets for the database + // have not been initialized. + ErrUninitializedDB = errors.New("db not initialized") + + // ErrCorruptChanDetails signals that the clients channel detail's + // on-disk structure deviates from what is expected. + ErrCorruptChanDetails = errors.New("channel details corrupted") + + byteOrder = binary.BigEndian +) + +// MigrateChannelIDIndex adds a new channel ID index to the tower client db. +// This index is a mapping from db-assigned ID (a uint64 encoded using BigSize +// encoding) to real channel ID (32 bytes). This mapping will allow us to +// persist channel pointers with fewer bytes in the future. +func MigrateChannelIDIndex(tx kvdb.RwTx) error { + log.Infof("Migrating the tower client db to add a new channel ID " + + "index which stores a mapping from db-assigned ID to real " + + "channel ID") + + // Create a new top-level bucket for the new index. + indexBkt, err := tx.CreateTopLevelBucket(cChanIDIndexBkt) + if err != nil { + return err + } + + // Get the top-level channel-details bucket. The keys of this bucket + // are the real channel IDs. + chanDetailsBkt := tx.ReadWriteBucket(cChanDetailsBkt) + if chanDetailsBkt == nil { + return ErrUninitializedDB + } + + // Iterate over the keys of the channel-details bucket. + return chanDetailsBkt.ForEach(func(chanID, _ []byte) error { + // Ask the db for a new, unique, ID for the index bucket. + nextSeq, err := indexBkt.NextSequence() + if err != nil { + return err + } + + // Encode the sequence number using BigSize encoding. + var newIndex bytes.Buffer + err = tlv.WriteVarInt(&newIndex, nextSeq, &[8]byte{}) + if err != nil { + return err + } + + // Add the mapping from the db-assigned ID to the channel ID + // to the new index. + newIndexBytes := newIndex.Bytes() + err = indexBkt.Put(newIndexBytes, chanID) + if err != nil { + return err + } + + chanDetails := chanDetailsBkt.NestedReadWriteBucket(chanID) + if chanDetails == nil { + return ErrCorruptChanDetails + } + + // Here we ensure that the channel-details bucket includes a + // channel summary. The only reason we do this is so that we can + // simulate a migration fail in a test to ensure that a + // migration fail results in an untouched db. + chanSummaryBytes := chanDetails.Get(cChannelSummary) + if chanSummaryBytes == nil { + return ErrCorruptChanDetails + } + + // In the channel-details sub-bucket for this channel, add the + // new DB-assigned ID for this channel under the cChanDBID key. + return chanDetails.Put(cChanDBID, newIndexBytes) + }) +} diff --git a/watchtower/wtdb/migration3/client_db_test.go b/watchtower/wtdb/migration3/client_db_test.go new file mode 100644 index 000000000..ffe0ee31f --- /dev/null +++ b/watchtower/wtdb/migration3/client_db_test.go @@ -0,0 +1,149 @@ +package migration3 + +import ( + "bytes" + "testing" + + "github.com/lightningnetwork/lnd/channeldb/migtest" + "github.com/lightningnetwork/lnd/kvdb" + "github.com/lightningnetwork/lnd/tlv" +) + +var ( + + // pre is the expected data in the cChanDetailsBkt bucket before the + // migration. + pre = map[string]interface{}{ + channelIDString(100): map[string]interface{}{ + string(cChannelSummary): string([]byte{1, 2, 3}), + }, + channelIDString(222): map[string]interface{}{ + string(cChannelSummary): string([]byte{4, 5, 6}), + }, + } + + // preFailCorruptDB should fail the migration due to no channel summary + // being found for a given channel ID. + preFailCorruptDB = map[string]interface{}{ + channelIDString(100): map[string]interface{}{}, + } + + // post is the expected data in the new index after migration. + postIndex = map[string]interface{}{ + indexToString(1): channelIDString(100), + indexToString(2): channelIDString(222), + } + + // postDetails is the expected data in the cChanDetailsBkt bucket after + // the migration. + postDetails = map[string]interface{}{ + channelIDString(100): map[string]interface{}{ + string(cChannelSummary): string([]byte{1, 2, 3}), + string(cChanDBID): indexToString(1), + }, + channelIDString(222): map[string]interface{}{ + string(cChannelSummary): string([]byte{4, 5, 6}), + string(cChanDBID): indexToString(2), + }, + } +) + +// TestMigrateChannelIDIndex tests that the MigrateChannelIDIndex function +// correctly adds a new channel-id index to the DB and also correctly updates +// the existing channel-details bucket. +func TestMigrateChannelIDIndex(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + shouldFail bool + pre map[string]interface{} + postDetails map[string]interface{} + postIndex map[string]interface{} + }{ + { + name: "migration ok", + shouldFail: false, + pre: pre, + postDetails: postDetails, + postIndex: postIndex, + }, + { + name: "fail due to corrupt db", + shouldFail: true, + pre: preFailCorruptDB, + }, + { + name: "no channel details", + shouldFail: false, + pre: nil, + postDetails: nil, + postIndex: nil, + }, + } + + for _, test := range tests { + test := test + + t.Run(test.name, func(t *testing.T) { + t.Parallel() + + // Before the migration we have a details bucket. + before := func(tx kvdb.RwTx) error { + return migtest.RestoreDB( + tx, cChanDetailsBkt, test.pre, + ) + } + + // After the migration, we should have an untouched + // summary bucket and a new index bucket. + after := func(tx kvdb.RwTx) error { + // If the migration fails, the details bucket + // should be untouched. + if test.shouldFail { + if err := migtest.VerifyDB( + tx, cChanDetailsBkt, test.pre, + ); err != nil { + return err + } + + return nil + } + + // Else, we expect an updated summary bucket + // and a new index bucket. + err := migtest.VerifyDB( + tx, cChanDetailsBkt, test.postDetails, + ) + if err != nil { + return err + } + + return migtest.VerifyDB( + tx, cChanIDIndexBkt, test.postIndex, + ) + } + + migtest.ApplyMigration( + t, before, after, MigrateChannelIDIndex, + test.shouldFail, + ) + }) + } +} + +func indexToString(id uint64) string { + var newIndex bytes.Buffer + err := tlv.WriteVarInt(&newIndex, id, &[8]byte{}) + if err != nil { + panic(err) + } + + return newIndex.String() +} + +func channelIDString(id uint64) string { + var chanID ChannelID + byteOrder.PutUint64(chanID[:], id) + return chanID.String() +} diff --git a/watchtower/wtdb/migration3/codec.go b/watchtower/wtdb/migration3/codec.go new file mode 100644 index 000000000..402c33ce9 --- /dev/null +++ b/watchtower/wtdb/migration3/codec.go @@ -0,0 +1,19 @@ +package migration3 + +import ( + "encoding/hex" +) + +// ChannelID is a series of 32-bytes that uniquely identifies all channels +// within the network. The ChannelID is computed using the outpoint of the +// funding transaction (the txid, and output index). Given a funding output the +// ChannelID can be calculated by XOR'ing the big-endian serialization of the +// txid and the big-endian serialization of the output index, truncated to +// 2 bytes. +type ChannelID [32]byte + +// String returns the string representation of the ChannelID. This is just the +// hex string encoding of the ChannelID itself. +func (c ChannelID) String() string { + return hex.EncodeToString(c[:]) +} diff --git a/watchtower/wtdb/migration3/log.go b/watchtower/wtdb/migration3/log.go new file mode 100644 index 000000000..2f14ff5e7 --- /dev/null +++ b/watchtower/wtdb/migration3/log.go @@ -0,0 +1,14 @@ +package migration3 + +import ( + "github.com/btcsuite/btclog" +) + +// log is a logger that is initialized as disabled. This means the package will +// not perform any logging by default until a logger is set. +var log = btclog.Disabled + +// UseLogger uses a specified Logger to output package logging info. +func UseLogger(logger btclog.Logger) { + log = logger +} diff --git a/watchtower/wtdb/version.go b/watchtower/wtdb/version.go index 0526817db..f95b6b54b 100644 --- a/watchtower/wtdb/version.go +++ b/watchtower/wtdb/version.go @@ -5,6 +5,7 @@ import ( "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/watchtower/wtdb/migration1" "github.com/lightningnetwork/lnd/watchtower/wtdb/migration2" + "github.com/lightningnetwork/lnd/watchtower/wtdb/migration3" ) // migration is a function which takes a prior outdated version of the database @@ -33,6 +34,9 @@ var clientDBVersions = []version{ { migration: migration2.MigrateClientChannelDetails, }, + { + migration: migration3.MigrateChannelIDIndex, + }, } // getLatestDBVersion returns the last known database version.