routing: start writing and reading from namespaced MC

and invoke the associated mission control migration.
This commit is contained in:
Elle Mouton 2024-08-13 18:24:46 +02:00
parent bfe4a08341
commit 6b449a6633
No known key found for this signature in database
GPG Key ID: D7D916376026F177
4 changed files with 53 additions and 13 deletions

View File

@ -27,6 +27,7 @@ import (
"github.com/lightningnetwork/lnd/channeldb/migration30"
"github.com/lightningnetwork/lnd/channeldb/migration31"
"github.com/lightningnetwork/lnd/channeldb/migration32"
"github.com/lightningnetwork/lnd/channeldb/migration33"
"github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/invoices"
@ -291,6 +292,10 @@ var (
number: 32,
migration: migration32.MigrateMCRouteSerialisation,
},
{
number: 33,
migration: migration33.MigrateMCStoreNameSpacedResults,
},
}
// optionalVersions stores all optional migrations that are applied

View File

@ -11,6 +11,7 @@ import (
"github.com/lightningnetwork/lnd/channeldb/migration30"
"github.com/lightningnetwork/lnd/channeldb/migration31"
"github.com/lightningnetwork/lnd/channeldb/migration32"
"github.com/lightningnetwork/lnd/channeldb/migration33"
"github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
"github.com/lightningnetwork/lnd/kvdb"
)
@ -44,5 +45,6 @@ func UseLogger(logger btclog.Logger) {
migration30.UseLogger(logger)
migration31.UseLogger(logger)
migration32.UseLogger(logger)
migration33.UseLogger(logger)
kvdb.UseLogger(logger)
}

View File

@ -69,6 +69,11 @@ const (
// FeeEstimationTimeout. It defines the maximum duration that the
// probing fee estimation is allowed to take.
DefaultFeeEstimationTimeout = time.Minute
// DefaultMissionControlNamespace is the name of the default mission
// control name space. This is used as the sub-bucket key within the
// top level DB bucket to store mission control results.
DefaultMissionControlNamespace = "default"
)
var (
@ -227,7 +232,7 @@ func NewMissionControl(db kvdb.Backend, self route.Vertex,
}
store, err := newMissionControlStore(
newNamespacedDB(db), cfg.MaxMcHistory,
newDefaultNamespacedStore(db), cfg.MaxMcHistory,
cfg.McFlushInterval,
)
if err != nil {
@ -549,22 +554,30 @@ func (m *MissionControl) applyPaymentResult(
}
// namespacedDB is an implementation of the missionControlDB that gives a user
// of the interface access to the top level mission control bucket. In a
// follow-up commit (accompanied by a migration), this will change to giving
// the user of the interface access to a namespaced sub-bucket instead.
// of the interface access to a namespaced bucket within the top level mission
// control bucket.
type namespacedDB struct {
topLevelBucketKey []byte
namespace []byte
db kvdb.Backend
}
// A compile-time check to ensure that namespacedDB implements missionControlDB.
var _ missionControlDB = (*namespacedDB)(nil)
// newDefaultNamespacedStore creates an instance of namespaceDB that uses the
// default namespace.
func newDefaultNamespacedStore(db kvdb.Backend) missionControlDB {
return newNamespacedDB(db, DefaultMissionControlNamespace)
}
// newNamespacedDB creates a new instance of missionControlDB where the DB will
// have access to the top level bucket.
func newNamespacedDB(db kvdb.Backend) missionControlDB {
// have access to a namespaced bucket within the top level mission control
// bucket.
func newNamespacedDB(db kvdb.Backend, namespace string) missionControlDB {
return &namespacedDB{
db: db,
namespace: []byte(namespace),
topLevelBucketKey: resultsKey,
}
}
@ -582,7 +595,16 @@ func (n *namespacedDB) update(f func(bkt walletdb.ReadWriteBucket) error,
"control bucket: %w", err)
}
return f(mcStoreBkt)
namespacedBkt, err := mcStoreBkt.CreateBucketIfNotExists(
n.namespace,
)
if err != nil {
return fmt.Errorf("cannot create namespaced bucket "+
"(%s) in mission control store: %w",
n.namespace, err)
}
return f(namespacedBkt)
}, reset)
}
@ -599,7 +621,13 @@ func (n *namespacedDB) view(f func(bkt walletdb.ReadBucket) error,
"not found")
}
return f(mcStoreBkt)
namespacedBkt := mcStoreBkt.NestedReadBucket(n.namespace)
if namespacedBkt == nil {
return fmt.Errorf("namespaced bucket (%s) not found "+
"in mission control store", n.namespace)
}
return f(namespacedBkt)
}, reset)
}
@ -608,12 +636,17 @@ func (n *namespacedDB) view(f func(bkt walletdb.ReadBucket) error,
// NOTE: this is part of the missionControlDB interface.
func (n *namespacedDB) purge() error {
return n.db.Update(func(tx kvdb.RwTx) error {
err := tx.DeleteTopLevelBucket(n.topLevelBucketKey)
mcStoreBkt := tx.ReadWriteBucket(n.topLevelBucketKey)
if mcStoreBkt == nil {
return nil
}
err := mcStoreBkt.DeleteNestedBucket(n.namespace)
if err != nil {
return err
}
_, err = tx.CreateTopLevelBucket(n.topLevelBucketKey)
_, err = mcStoreBkt.CreateBucket(n.namespace)
return err
}, func() {})

View File

@ -63,7 +63,7 @@ func newMCStoreTestHarness(t testing.TB, maxRecords int,
})
store, err := newMissionControlStore(
newNamespacedDB(db), maxRecords, flushInterval,
newDefaultNamespacedStore(db), maxRecords, flushInterval,
)
require.NoError(t, err)
@ -118,7 +118,7 @@ func TestMissionControlStore(t *testing.T) {
// Recreate store to test pruning.
store, err = newMissionControlStore(
newNamespacedDB(db), testMaxRecords, time.Second,
newDefaultNamespacedStore(db), testMaxRecords, time.Second,
)
require.NoError(t, err)
@ -218,7 +218,7 @@ func TestMissionControlStoreFlushing(t *testing.T) {
// Recreate store.
store, err := newMissionControlStore(
newNamespacedDB(db), testMaxRecords, flushInterval,
newDefaultNamespacedStore(db), testMaxRecords, flushInterval,
)
require.NoError(t, err)
store.run()