watchtower/wtdb: add tower-to-session index entry for all towers

In this commit, a small migration is added to the watchtower client DB
to ensure that there is an entry in the towerID-to-sessionID index for
all towers in the db regardless of if they have sessions or not. This is
required as a follow up to migration 1 since that migration only created
entries in the index for towers that had associated sessions which would
lead to "tower not found" errors on start up.
This commit is contained in:
Elle Mouton 2023-03-08 10:24:32 +02:00
parent d26cfd241a
commit f6ef3db6ea
No known key found for this signature in database
GPG Key ID: D7D916376026F177
7 changed files with 313 additions and 1 deletions

View File

@ -406,7 +406,10 @@ in the lnwire package](https://github.com/lightningnetwork/lnd/pull/7303)
speed of listing sessions for a particular tower ID](
https://github.com/lightningnetwork/lnd/pull/6972). This PR also ensures a
closer coupling of Towers and Sessions and ensures that a session cannot be
added if the tower it is referring to does not exist.
added if the tower it is referring to does not exist. A [follow-up migration
was added](https://github.com/lightningnetwork/lnd/pull/7491) to ensure that
entries are added to the new index for _all_ towers in the db, including those
for which there are not yet associated sessions.
* [Remove `AckedUpdates` & `CommittedUpdates` from the `ClientSession`
struct](https://github.com/lightningnetwork/lnd/pull/6928) in order to

View File

@ -7,6 +7,7 @@ import (
"github.com/lightningnetwork/lnd/watchtower/wtdb/migration2"
"github.com/lightningnetwork/lnd/watchtower/wtdb/migration3"
"github.com/lightningnetwork/lnd/watchtower/wtdb/migration4"
"github.com/lightningnetwork/lnd/watchtower/wtdb/migration5"
)
// log is a logger that is initialized with no output filters. This
@ -34,6 +35,7 @@ func UseLogger(logger btclog.Logger) {
migration2.UseLogger(logger)
migration3.UseLogger(logger)
migration4.UseLogger(logger)
migration5.UseLogger(logger)
}
// logClosure is used to provide a closure over expensive logging operations so

View File

@ -0,0 +1,90 @@
package migration5
import (
"encoding/binary"
"errors"
"github.com/lightningnetwork/lnd/kvdb"
)
var (
// cTowerBkt is a top-level bucket storing:
// tower-id -> encoded Tower.
cTowerBkt = []byte("client-tower-bucket")
// cTowerIDToSessionIDIndexBkt is a top-level bucket storing:
// tower-id -> session-id -> 1
cTowerIDToSessionIDIndexBkt = []byte(
"client-tower-to-session-index-bucket",
)
// ErrUninitializedDB signals that top-level buckets for the database
// have not been initialized.
ErrUninitializedDB = errors.New("db not initialized")
// byteOrder is the default endianness used when serializing integers.
byteOrder = binary.BigEndian
)
// MigrateCompleteTowerToSessionIndex ensures that the tower-to-session index
// contains entries for all towers in the db. This is necessary because
// migration1 only created entries in the index for towers that the client had
// at least one session with. This migration thus makes sure that there is
// always a tower-to-sessions index entry for a tower even if there are no
// sessions with that tower.
func MigrateCompleteTowerToSessionIndex(tx kvdb.RwTx) error {
log.Infof("Migrating the tower client db to ensure that there is an " +
"entry in the towerID-to-sessionID index for every tower in " +
"the db")
// First, we collect all the towers that we should add an entry for in
// the index.
towerIDs, err := listTowerIDs(tx)
if err != nil {
return err
}
// Create a new top-level bucket for the index if it does not yet exist.
indexBkt, err := tx.CreateTopLevelBucket(cTowerIDToSessionIDIndexBkt)
if err != nil {
return err
}
// Finally, ensure that there is an entry in the tower-to-session index
// for each of our towers.
for _, id := range towerIDs {
// Create a sub-bucket using the tower ID.
_, err := indexBkt.CreateBucketIfNotExists(id.Bytes())
if err != nil {
return err
}
}
return nil
}
// listTowerIDs iterates through the cTowerBkt and collects a list of all the
// TowerIDs.
func listTowerIDs(tx kvdb.RTx) ([]*TowerID, error) {
var ids []*TowerID
towerBucket := tx.ReadBucket(cTowerBkt)
if towerBucket == nil {
return nil, ErrUninitializedDB
}
err := towerBucket.ForEach(func(towerIDBytes, _ []byte) error {
id, err := TowerIDFromBytes(towerIDBytes)
if err != nil {
return err
}
ids = append(ids, &id)
return nil
})
if err != nil {
return nil, err
}
return ids, nil
}

View File

@ -0,0 +1,162 @@
package migration5
import (
"testing"
"github.com/lightningnetwork/lnd/channeldb/migtest"
"github.com/lightningnetwork/lnd/kvdb"
)
var (
// preIndex is the data in the tower-to-session index before the
// migration.
preIndex = map[string]interface{}{
towerIDString(1): map[string]interface{}{
sessionIDString("1"): string([]byte{1}),
sessionIDString("3"): string([]byte{1}),
},
towerIDString(3): map[string]interface{}{
sessionIDString("4"): string([]byte{1}),
},
}
// preIndexBadTowerID has an invalid TowerID. This is used to test that
// the migration correctly rolls back on failure.
preIndexBadTowerID = map[string]interface{}{
"1": map[string]interface{}{},
}
// towerDBEmpty is the data in an empty tower bucket before the
// migration.
towerDBEmpty = map[string]interface{}{}
towerDBMatchIndex = map[string]interface{}{
towerIDString(1): map[string]interface{}{},
towerIDString(3): map[string]interface{}{},
}
towerDBWithExtraEntries = map[string]interface{}{
towerIDString(1): map[string]interface{}{},
towerIDString(3): map[string]interface{}{},
towerIDString(4): map[string]interface{}{},
}
// post is the expected data after migration.
postIndex = map[string]interface{}{
towerIDString(1): map[string]interface{}{
sessionIDString("1"): string([]byte{1}),
sessionIDString("3"): string([]byte{1}),
},
towerIDString(3): map[string]interface{}{
sessionIDString("4"): string([]byte{1}),
},
towerIDString(4): map[string]interface{}{},
}
)
// TestCompleteTowerToSessionIndex tests that the
// MigrateCompleteTowerToSessionIndex function correctly completes the
// towerID-to-sessionID index in the tower client db.
func TestCompleteTowerToSessionIndex(t *testing.T) {
t.Parallel()
tests := []struct {
name string
shouldFail bool
towerDB map[string]interface{}
pre map[string]interface{}
post map[string]interface{}
}{
{
name: "no changes - empty tower db",
towerDB: towerDBEmpty,
pre: preIndex,
post: preIndex,
},
{
name: "no changes - tower db matches index",
towerDB: towerDBMatchIndex,
pre: preIndex,
post: preIndex,
},
{
name: "fill in missing towers",
towerDB: towerDBWithExtraEntries,
pre: preIndex,
post: postIndex,
},
{
name: "fail due to corrupt db",
shouldFail: true,
towerDB: preIndexBadTowerID,
pre: preIndex,
post: preIndex,
},
}
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
// Before the migration we have a tower bucket and an
// initial tower-to-session index bucket.
before := func(tx kvdb.RwTx) error {
err := migtest.RestoreDB(
tx, cTowerBkt, test.towerDB,
)
if err != nil {
return err
}
return migtest.RestoreDB(
tx, cTowerIDToSessionIDIndexBkt,
test.pre,
)
}
// After the migration, we should have an untouched
// tower bucket and a possibly tweaked tower-to-session
// index bucket.
after := func(tx kvdb.RwTx) error {
if err := migtest.VerifyDB(
tx, cTowerBkt, test.towerDB,
); err != nil {
return err
}
// If we expect our migration to fail, we don't
// expect our index bucket to be unchanged.
if test.shouldFail {
return migtest.VerifyDB(
tx, cTowerIDToSessionIDIndexBkt,
test.pre,
)
}
return migtest.VerifyDB(
tx, cTowerIDToSessionIDIndexBkt,
test.post,
)
}
migtest.ApplyMigration(
t, before, after,
MigrateCompleteTowerToSessionIndex,
test.shouldFail,
)
})
}
}
func towerIDString(id int) string {
towerID := TowerID(id)
return string(towerID.Bytes())
}
func sessionIDString(id string) string {
var sessID SessionID
copy(sessID[:], id)
return string(sessID[:])
}

View File

@ -0,0 +1,37 @@
package migration5
import (
"encoding/binary"
"fmt"
)
// TowerID is a unique 64-bit identifier allocated to each unique watchtower.
// This allows the client to conserve on-disk space by not needing to always
// reference towers by their pubkey.
type TowerID uint64
// TowerIDFromBytes constructs a TowerID from the provided byte slice. The
// argument must have at least 8 bytes, and should contain the TowerID in
// big-endian byte order.
func TowerIDFromBytes(towerIDBytes []byte) (TowerID, error) {
if len(towerIDBytes) != 8 {
return 0, fmt.Errorf("not enough bytes in tower ID. "+
"Expected 8, got: %d", len(towerIDBytes))
}
return TowerID(byteOrder.Uint64(towerIDBytes)), nil
}
// Bytes encodes a TowerID into an 8-byte slice in big-endian byte order.
func (id TowerID) Bytes() []byte {
var buf [8]byte
binary.BigEndian.PutUint64(buf[:], uint64(id))
return buf[:]
}
// SessionIDSize is 33-bytes; it is a serialized, compressed public key.
const SessionIDSize = 33
// SessionID is created from the remote public key of a client, and serves as a
// unique identifier and authentication for sending state updates.
type SessionID [SessionIDSize]byte

View File

@ -0,0 +1,14 @@
package migration5
import (
"github.com/btcsuite/btclog"
)
// log is a logger that is initialized as disabled. This means the package will
// not perform any logging by default until a logger is set.
var log = btclog.Disabled
// UseLogger uses a specified Logger to output package logging info.
func UseLogger(logger btclog.Logger) {
log = logger
}

View File

@ -9,6 +9,7 @@ import (
"github.com/lightningnetwork/lnd/watchtower/wtdb/migration2"
"github.com/lightningnetwork/lnd/watchtower/wtdb/migration3"
"github.com/lightningnetwork/lnd/watchtower/wtdb/migration4"
"github.com/lightningnetwork/lnd/watchtower/wtdb/migration5"
)
// txMigration is a function which takes a prior outdated version of the
@ -55,6 +56,9 @@ var clientDBVersions = []version{
migration4.DefaultSessionsPerTx,
),
},
{
txMigration: migration5.MigrateCompleteTowerToSessionIndex,
},
}
// getLatestDBVersion returns the last known database version.