watchtower: migrate channel summaries

In this commit a migration of the tower client db is done. The migration
creates a new top-level cChanDetailsBkt bucket and for each channel
found in the old cChanSummaryBkt bucket, it creates a new sub-bucket. In
the subbucket, the ClientChanSummary is then stored under the
cChannelSummary key. The reason for this migration is that it will be
useful in future when we want to store more easily accessible data under
a specific client ID.
This commit is contained in:
Elle Mouton 2022-10-14 10:49:25 +02:00
parent 4ab8c57eae
commit d03047f313
No known key found for this signature in database
GPG key ID: D7D916376026F177
7 changed files with 283 additions and 19 deletions

View file

@ -18,9 +18,17 @@ var (
// tower-id -> reserved-session-key-index (uint32).
cSessionKeyIndexBkt = []byte("client-session-key-index-bucket")
// cChanSummaryBkt is a top-level bucket storing:
// channel-id -> encoded ClientChanSummary.
cChanSummaryBkt = []byte("client-channel-summary-bucket")
// cChanDetailsBkt is a top-level bucket storing:
// channel-id => cChannelSummary -> encoded ClientChanSummary.
cChanDetailsBkt = []byte("client-channel-detail-bucket")
// cChanDBID is a key used in the cChanDetailsBkt to store the
// db-assigned-id of a channel.
cChanDBID = []byte("client-channel-db-id")
// cChannelSummary is a key used in cChanDetailsBkt to store the encoded
// body of ClientChanSummary.
cChannelSummary = []byte("client-channel-summary")
// cSessionBkt is a top-level bucket storing:
// session-id => cSessionBody -> encoded ClientSessionBody
@ -67,6 +75,10 @@ var (
// structure deviates from what is expected.
ErrCorruptClientSession = errors.New("client session corrupted")
// ErrCorruptChanDetails signals that the clients channel detail's
// on-disk structure deviates from what is expected.
ErrCorruptChanDetails = errors.New("channel details corrupted")
// ErrClientSessionAlreadyExists signals an attempt to reinsert a client
// session that has already been created.
ErrClientSessionAlreadyExists = errors.New(
@ -198,7 +210,7 @@ func OpenClientDB(db kvdb.Backend) (*ClientDB, error) {
func initClientDBBuckets(tx kvdb.RwTx) error {
buckets := [][]byte{
cSessionKeyIndexBkt,
cChanSummaryBkt,
cChanDetailsBkt,
cSessionBkt,
cTowerBkt,
cTowerIndexBkt,
@ -883,17 +895,23 @@ func (c *ClientDB) FetchSessionCommittedUpdates(id *SessionID) (
// channel summaries.
func (c *ClientDB) FetchChanSummaries() (ChannelSummaries, error) {
var summaries map[lnwire.ChannelID]ClientChanSummary
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
chanSummaries := tx.ReadBucket(cChanSummaryBkt)
if chanSummaries == nil {
chanDetailsBkt := tx.ReadBucket(cChanDetailsBkt)
if chanDetailsBkt == nil {
return ErrUninitializedDB
}
return chanSummaries.ForEach(func(k, v []byte) error {
return chanDetailsBkt.ForEach(func(k, _ []byte) error {
chanDetails := chanDetailsBkt.NestedReadBucket(k)
if chanDetails == nil {
return ErrCorruptChanDetails
}
var chanID lnwire.ChannelID
copy(chanID[:], k)
summary, err := getChanSummary(chanSummaries, chanID)
summary, err := getChanSummary(chanDetails)
if err != nil {
return err
}
@ -921,22 +939,27 @@ func (c *ClientDB) RegisterChannel(chanID lnwire.ChannelID,
sweepPkScript []byte) error {
return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
chanSummaries := tx.ReadWriteBucket(cChanSummaryBkt)
if chanSummaries == nil {
chanDetailsBkt := tx.ReadWriteBucket(cChanDetailsBkt)
if chanDetailsBkt == nil {
return ErrUninitializedDB
}
chanSummaryBytes := chanSummaries.Get(chanID[:])
if chanSummaryBytes != nil {
chanDetails := chanDetailsBkt.NestedReadWriteBucket(chanID[:])
if chanDetails != nil {
// Channel is already registered.
return ErrChannelAlreadyRegistered
}
chanDetails, err := chanDetailsBkt.CreateBucket(chanID[:])
if err != nil {
return err
}
summary := ClientChanSummary{
SweepPkScript: sweepPkScript,
}
return putChanSummary(chanSummaries, chanID, &summary)
return putChanSummary(chanDetails, &summary)
}, func() {})
}
@ -1405,10 +1428,8 @@ func markSessionStatus(sessions kvdb.RwBucket, session *ClientSession,
}
// getChanSummary loads a ClientChanSummary for the passed chanID.
func getChanSummary(chanSummaries kvdb.RBucket,
chanID lnwire.ChannelID) (*ClientChanSummary, error) {
chanSummaryBytes := chanSummaries.Get(chanID[:])
func getChanSummary(chanDetails kvdb.RBucket) (*ClientChanSummary, error) {
chanSummaryBytes := chanDetails.Get(cChannelSummary)
if chanSummaryBytes == nil {
return nil, ErrChannelNotRegistered
}
@ -1423,7 +1444,7 @@ func getChanSummary(chanSummaries kvdb.RBucket,
}
// putChanSummary stores a ClientChanSummary for the passed chanID.
func putChanSummary(chanSummaries kvdb.RwBucket, chanID lnwire.ChannelID,
func putChanSummary(chanDetails kvdb.RwBucket,
summary *ClientChanSummary) error {
var b bytes.Buffer
@ -1432,7 +1453,7 @@ func putChanSummary(chanSummaries kvdb.RwBucket, chanID lnwire.ChannelID,
return err
}
return chanSummaries.Put(chanID[:], b.Bytes())
return chanDetails.Put(cChannelSummary, b.Bytes())
}
// getTower loads a Tower identified by its serialized tower id.

View file

@ -4,6 +4,7 @@ import (
"github.com/btcsuite/btclog"
"github.com/lightningnetwork/lnd/build"
"github.com/lightningnetwork/lnd/watchtower/wtdb/migration1"
"github.com/lightningnetwork/lnd/watchtower/wtdb/migration2"
)
// log is a logger that is initialized with no output filters. This
@ -28,6 +29,7 @@ func DisableLog() {
func UseLogger(logger btclog.Logger) {
log = logger
migration1.UseLogger(logger)
migration2.UseLogger(logger)
}
// logClosure is used to provide a closure over expensive logging operations so

View file

@ -0,0 +1,82 @@
package migration2
import (
"errors"
"github.com/lightningnetwork/lnd/kvdb"
)
var (
// cChanSummaryBkt is a top-level bucket storing:
// channel-id -> encoded ClientChanSummary.
cChanSummaryBkt = []byte("client-channel-summary-bucket")
// cChanDetailsBkt is a top-level bucket storing:
// channel-id => cChannelSummary -> encoded ClientChanSummary.
cChanDetailsBkt = []byte("client-channel-detail-bucket")
// cChannelSummary is a key used in cChanDetailsBkt to store the encoded
// body of ClientChanSummary.
cChannelSummary = []byte("client-channel-summary")
// ErrUninitializedDB signals that top-level buckets for the database
// have not been initialized.
ErrUninitializedDB = errors.New("db not initialized")
// ErrCorruptChanSummary signals that the clients channel summary's
// on-disk structure deviates from what is expected.
ErrCorruptChanSummary = errors.New("channel summary corrupted")
)
// MigrateClientChannelDetails creates a new channel-details bucket that uses
// channel IDs as sub-buckets where the channel summaries are moved to from the
// channel summary bucket. If the migration is successful then the channel
// summary bucket is deleted.
func MigrateClientChannelDetails(tx kvdb.RwTx) error {
log.Infof("Migrating the tower client db to move the channel " +
"summaries to the new channel-details bucket")
// Create the new top level cChanDetailsBkt.
chanDetailsBkt, err := tx.CreateTopLevelBucket(cChanDetailsBkt)
if err != nil {
return err
}
// Get the top-level channel summaries bucket.
chanSummaryBkt := tx.ReadWriteBucket(cChanSummaryBkt)
if chanSummaryBkt == nil {
return ErrUninitializedDB
}
// Iterate over the cChanSummaryBkt's keys. Each key is a channel-id.
// For each of these, create a new sub-bucket with this key in
// cChanDetailsBkt. In this sub-bucket, add the cChannelSummary key with
// the encoded ClientChanSummary as the value.
err = chanSummaryBkt.ForEach(func(chanID, summary []byte) error {
// Force the migration to fail if the summary is empty. This
// should never be the case, but it is added so that we can
// force the migration to fail in a test so that we can test
// that the db remains unaffected if a migration failure takes
// place.
if len(summary) == 0 {
return ErrCorruptChanSummary
}
// Create a new sub-bucket in the channel details bucket using
// this channel ID.
channelBkt, err := chanDetailsBkt.CreateBucket(chanID)
if err != nil {
return err
}
// Add the encoded channel summary in the new bucket under the
// channel-summary key.
return channelBkt.Put(cChannelSummary, summary)
})
if err != nil {
return err
}
// Now delete the cChanSummaryBkt from the DB.
return tx.DeleteTopLevelBucket(cChanSummaryBkt)
}

View file

@ -0,0 +1,124 @@
package migration2
import (
"encoding/binary"
"testing"
"github.com/lightningnetwork/lnd/channeldb/migtest"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/stretchr/testify/require"
)
var (
// pre is the expected data in the cChanSummaryBkt bucket before the
// migration.
pre = map[string]interface{}{
channelIDString(1): string([]byte{1, 2, 3}),
channelIDString(2): string([]byte{3, 4, 5}),
}
// pre should fail the migration due to no channel summary being found
// for a given channel ID.
preFailCorruptDB = map[string]interface{}{
channelIDString(1): string([]byte{1, 2, 3}),
channelIDString(2): "",
}
// post is the expected data after migration.
post = map[string]interface{}{
channelIDString(1): map[string]interface{}{
string(cChannelSummary): string([]byte{1, 2, 3}),
},
channelIDString(2): map[string]interface{}{
string(cChannelSummary): string([]byte{3, 4, 5}),
},
}
)
// TestMigrateClientChannelDetails tests that the MigrateClientChannelDetails
// function correctly moves channel-summaries from the channel-summaries bucket
// to the new channel-details bucket.
func TestMigrateClientChannelDetails(t *testing.T) {
t.Parallel()
tests := []struct {
name string
shouldFail bool
pre map[string]interface{}
post map[string]interface{}
}{
{
name: "migration ok",
shouldFail: false,
pre: pre,
post: post,
},
{
name: "fail due to corrupt db",
shouldFail: true,
pre: preFailCorruptDB,
post: nil,
},
{
name: "no channel summaries",
shouldFail: false,
pre: nil,
post: nil,
},
}
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
// Before the migration we have a channel summary
// bucket.
before := func(tx kvdb.RwTx) error {
return migtest.RestoreDB(
tx, cChanSummaryBkt, test.pre,
)
}
// After the migration, we should have a new channel
// details bucket and no longer have a channel summary
// bucket.
after := func(tx kvdb.RwTx) error {
// If we expect our migration to fail, we
// expect our channel summary bucket to remain
// intact.
if test.shouldFail {
return migtest.VerifyDB(
tx, cChanSummaryBkt, test.pre,
)
}
// Otherwise, we expect the channel summary
// bucket to be deleted.
err := migtest.VerifyDB(
tx, cChanSummaryBkt, test.pre,
)
require.ErrorContains(t, err, "not found")
// We also expect the new channel details bucket
// to be present.
return migtest.VerifyDB(
tx, cChanDetailsBkt, test.post,
)
}
migtest.ApplyMigration(
t, before, after, MigrateClientChannelDetails,
test.shouldFail,
)
})
}
}
func channelIDString(id uint64) string {
var chanID ChannelID
binary.BigEndian.PutUint64(chanID[:], id)
return chanID.String()
}

View file

@ -0,0 +1,17 @@
package migration2
import "encoding/hex"
// ChannelID is a series of 32-bytes that uniquely identifies all channels
// within the network. The ChannelID is computed using the outpoint of the
// funding transaction (the txid, and output index). Given a funding output the
// ChannelID can be calculated by XOR'ing the big-endian serialization of the
// txid and the big-endian serialization of the output index, truncated to
// 2 bytes.
type ChannelID [32]byte
// String returns the string representation of the ChannelID. This is just the
// hex string encoding of the ChannelID itself.
func (c ChannelID) String() string {
return hex.EncodeToString(c[:])
}

View file

@ -0,0 +1,14 @@
package migration2
import (
"github.com/btcsuite/btclog"
)
// log is a logger that is initialized as disabled. This means the package will
// not perform any logging by default until a logger is set.
var log = btclog.Disabled
// UseLogger uses a specified Logger to output package logging info.
func UseLogger(logger btclog.Logger) {
log = logger
}

View file

@ -4,6 +4,7 @@ import (
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/watchtower/wtdb/migration1"
"github.com/lightningnetwork/lnd/watchtower/wtdb/migration2"
)
// migration is a function which takes a prior outdated version of the database
@ -29,6 +30,9 @@ var clientDBVersions = []version{
{
migration: migration1.MigrateTowerToSessionIndex,
},
{
migration: migration2.MigrateClientChannelDetails,
},
}
// getLatestDBVersion returns the last known database version.