Merge pull request #6551 from yyforyongyu/fix-migration

channeldb: change balance fields to tlv records and migrate historical bucket
This commit is contained in:
Olaoluwa Osuntokun 2022-05-19 16:35:31 -07:00 committed by GitHub
commit 3e5b5d52f0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 1337 additions and 52 deletions

View file

@ -182,6 +182,14 @@ const (
// A tlv type definition used to serialize and deserialize a KeyLocator
// from the database.
keyLocType tlv.Type = 1
// A tlv type used to serialize and deserialize the
// `InitialLocalBalance` field.
initialLocalBalanceType tlv.Type = 2
// A tlv type used to serialize and deserialize the
// `InitialRemoteBalance` field.
initialRemoteBalanceType tlv.Type = 3
)
// indexStatus is an enum-like type that describes what state the
@ -3280,8 +3288,7 @@ func putChanInfo(chanBucket kvdb.RwBucket, channel *OpenChannel) error {
channel.chanStatus, channel.FundingBroadcastHeight,
channel.NumConfsRequired, channel.ChannelFlags,
channel.IdentityPub, channel.Capacity, channel.TotalMSatSent,
channel.TotalMSatReceived, channel.InitialLocalBalance,
channel.InitialRemoteBalance,
channel.TotalMSatReceived,
); err != nil {
return err
}
@ -3301,12 +3308,24 @@ func putChanInfo(chanBucket kvdb.RwBucket, channel *OpenChannel) error {
return err
}
// Write the RevocationKeyLocator as the first entry in a tlv stream.
keyLocRecord := MakeKeyLocRecord(
keyLocType, &channel.RevocationKeyLocator,
)
// Convert balance fields into uint64.
localBalance := uint64(channel.InitialLocalBalance)
remoteBalance := uint64(channel.InitialRemoteBalance)
tlvStream, err := tlv.NewStream(keyLocRecord)
// Create the tlv stream.
tlvStream, err := tlv.NewStream(
// Write the RevocationKeyLocator as the first entry in a tlv
// stream.
MakeKeyLocRecord(
keyLocType, &channel.RevocationKeyLocator,
),
tlv.MakePrimitiveRecord(
initialLocalBalanceType, &localBalance,
),
tlv.MakePrimitiveRecord(
initialRemoteBalanceType, &remoteBalance,
),
)
if err != nil {
return err
}
@ -3468,8 +3487,7 @@ func fetchChanInfo(chanBucket kvdb.RBucket, channel *OpenChannel) error {
&channel.chanStatus, &channel.FundingBroadcastHeight,
&channel.NumConfsRequired, &channel.ChannelFlags,
&channel.IdentityPub, &channel.Capacity, &channel.TotalMSatSent,
&channel.TotalMSatReceived, &channel.InitialLocalBalance,
&channel.InitialRemoteBalance,
&channel.TotalMSatReceived,
); err != nil {
return err
}
@ -3504,8 +3522,26 @@ func fetchChanInfo(chanBucket kvdb.RBucket, channel *OpenChannel) error {
}
}
keyLocRecord := MakeKeyLocRecord(keyLocType, &channel.RevocationKeyLocator)
tlvStream, err := tlv.NewStream(keyLocRecord)
// Create balance fields in uint64.
var (
localBalance uint64
remoteBalance uint64
)
// Create the tlv stream.
tlvStream, err := tlv.NewStream(
// Write the RevocationKeyLocator as the first entry in a tlv
// stream.
MakeKeyLocRecord(
keyLocType, &channel.RevocationKeyLocator,
),
tlv.MakePrimitiveRecord(
initialLocalBalanceType, &localBalance,
),
tlv.MakePrimitiveRecord(
initialRemoteBalanceType, &remoteBalance,
),
)
if err != nil {
return err
}
@ -3514,6 +3550,10 @@ func fetchChanInfo(chanBucket kvdb.RBucket, channel *OpenChannel) error {
return err
}
// Attach the balance fields.
channel.InitialLocalBalance = lnwire.MilliSatoshi(localBalance)
channel.InitialRemoteBalance = lnwire.MilliSatoshi(remoteBalance)
channel.Packager = NewChannelPackager(channel.ShortChannelID)
// Finally, read the optional shutdown scripts.

View file

@ -20,6 +20,8 @@ import (
"github.com/lightningnetwork/lnd/channeldb/migration23"
"github.com/lightningnetwork/lnd/channeldb/migration24"
"github.com/lightningnetwork/lnd/channeldb/migration25"
"github.com/lightningnetwork/lnd/channeldb/migration26"
"github.com/lightningnetwork/lnd/channeldb/migration27"
"github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/kvdb"
@ -212,6 +214,18 @@ var (
number: 25,
migration: migration25.MigrateInitialBalances,
},
{
// Migrate the initial local/remote balance fields into
// tlv records.
number: 26,
migration: migration26.MigrateBalancesToTlvRecords,
},
{
// Patch the initial local/remote balance fields with
// empty values for historical channels.
number: 27,
migration: migration27.MigrateHistoricalBalances,
},
}
// Big endian is the preferred byte order, due to cursor scans over

View file

@ -720,3 +720,39 @@ func fetchChannelLogEntry(log kvdb.RBucket,
commitReader := bytes.NewReader(commitBytes)
return mig.DeserializeChanCommit(commitReader)
}
func CreateChanBucket(tx kvdb.RwTx, c *OpenChannel) (kvdb.RwBucket, error) {
// First fetch the top level bucket which stores all data related to
// current, active channels.
openChanBucket, err := tx.CreateTopLevelBucket(openChannelBucket)
if err != nil {
return nil, err
}
// Within this top level bucket, fetch the bucket dedicated to storing
// open channel data specific to the remote node.
nodePub := c.IdentityPub.SerializeCompressed()
nodeChanBucket, err := openChanBucket.CreateBucketIfNotExists(nodePub)
if err != nil {
return nil, err
}
// We'll then recurse down an additional layer in order to fetch the
// bucket for this particular chain.
chainBucket, err := nodeChanBucket.CreateBucketIfNotExists(
c.ChainHash[:],
)
if err != nil {
return nil, err
}
var chanPointBuf bytes.Buffer
err = mig.WriteOutpoint(&chanPointBuf, &c.FundingOutpoint)
if err != nil {
return nil, err
}
// With the bucket for the node fetched, we can now go down another
// level, creating the bucket for this channel itself.
return chainBucket.CreateBucketIfNotExists(chanPointBuf.Bytes())
}

View file

@ -147,7 +147,7 @@ func findOpenChannels(openChanBucket kvdb.RBucket) ([]*OpenChannel, error) {
// balances and save them to the channel info.
func migrateBalances(tx kvdb.RwTx, c *OpenChannel) error {
// Get the bucket.
chanBucket, err := fetchChanBucket(tx, c)
chanBucket, err := FetchChanBucket(tx, c)
if err != nil {
return err
}
@ -168,10 +168,10 @@ func migrateBalances(tx kvdb.RwTx, c *OpenChannel) error {
return nil
}
// fetchChanBucket is a helper function that returns the bucket where a
// FetchChanBucket is a helper function that returns the bucket where a
// channel's data resides in given: the public key for the node, the outpoint,
// and the chainhash that the channel resides on.
func fetchChanBucket(tx kvdb.RwTx, c *OpenChannel) (kvdb.RwBucket, error) {
func FetchChanBucket(tx kvdb.RwTx, c *OpenChannel) (kvdb.RwBucket, error) {
// First fetch the top level bucket which stores all data related to
// current, active channels.
openChanBucket := tx.ReadWriteBucket(openChannelBucket)

View file

@ -256,7 +256,7 @@ func genBeforeMigration(c *OpenChannel,
}
// Create the channel bucket.
chanBucket, err := createChanBucket(tx, c)
chanBucket, err := CreateChanBucket(tx, c)
if err != nil {
return err
}
@ -295,7 +295,7 @@ func genAfterMigration(ourAmt, theirAmt lnwire.MilliSatoshi,
return nil
}
chanBucket, err := fetchChanBucket(tx, c)
chanBucket, err := FetchChanBucket(tx, c)
if err != nil {
return err
}
@ -334,42 +334,6 @@ func genAfterMigration(ourAmt, theirAmt lnwire.MilliSatoshi,
}
}
func createChanBucket(tx kvdb.RwTx, c *OpenChannel) (kvdb.RwBucket, error) {
// First fetch the top level bucket which stores all data related to
// current, active channels.
openChanBucket, err := tx.CreateTopLevelBucket(openChannelBucket)
if err != nil {
return nil, err
}
// Within this top level bucket, fetch the bucket dedicated to storing
// open channel data specific to the remote node.
nodePub := c.IdentityPub.SerializeCompressed()
nodeChanBucket, err := openChanBucket.CreateBucketIfNotExists(nodePub)
if err != nil {
return nil, err
}
// We'll then recurse down an additional layer in order to fetch the
// bucket for this particular chain.
chainBucket, err := nodeChanBucket.CreateBucketIfNotExists(
c.ChainHash[:],
)
if err != nil {
return nil, err
}
var chanPointBuf bytes.Buffer
err = mig.WriteOutpoint(&chanPointBuf, &c.FundingOutpoint)
if err != nil {
return nil, err
}
// With the bucket for the node fetched, we can now go down another
// level, creating the bucket for this channel itself.
return chainBucket.CreateBucketIfNotExists(chanPointBuf.Bytes())
}
// putChannelLogEntryLegacy saves an old format revocation log to the bucket.
func putChannelLogEntryLegacy(chanBucket kvdb.RwBucket,
commit *mig.ChannelCommitment) error {

View file

@ -0,0 +1,299 @@
package migration26
import (
"bytes"
"fmt"
lnwire "github.com/lightningnetwork/lnd/channeldb/migration/lnwire21"
mig25 "github.com/lightningnetwork/lnd/channeldb/migration25"
mig "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/tlv"
)
const (
// A tlv type definition used to serialize and deserialize a KeyLocator
// from the database.
keyLocType tlv.Type = 1
// A tlv type used to serialize and deserialize the
// `InitialLocalBalance` field.
initialLocalBalanceType tlv.Type = 2
// A tlv type used to serialize and deserialize the
// `InitialRemoteBalance` field.
initialRemoteBalanceType tlv.Type = 3
)
var (
// chanInfoKey can be accessed within the bucket for a channel
// (identified by its chanPoint). This key stores all the static
// information for a channel which is decided at the end of the
// funding flow.
chanInfoKey = []byte("chan-info-key")
// localUpfrontShutdownKey can be accessed within the bucket for a
// channel (identified by its chanPoint). This key stores an optional
// upfront shutdown script for the local peer.
localUpfrontShutdownKey = []byte("local-upfront-shutdown-key")
// remoteUpfrontShutdownKey can be accessed within the bucket for a
// channel (identified by its chanPoint). This key stores an optional
// upfront shutdown script for the remote peer.
remoteUpfrontShutdownKey = []byte("remote-upfront-shutdown-key")
// lastWasRevokeKey is a key that stores true when the last update we
// sent was a revocation and false when it was a commitment signature.
// This is nil in the case of new channels with no updates exchanged.
lastWasRevokeKey = []byte("last-was-revoke")
// ErrNoChanInfoFound is returned when a particular channel does not
// have any channels state.
ErrNoChanInfoFound = fmt.Errorf("no chan info found")
// ErrNoPastDeltas is returned when the channel delta bucket hasn't been
// created.
ErrNoPastDeltas = fmt.Errorf("channel has no recorded deltas")
// ErrLogEntryNotFound is returned when we cannot find a log entry at
// the height requested in the revocation log.
ErrLogEntryNotFound = fmt.Errorf("log entry not found")
// ErrNoCommitmentsFound is returned when a channel has not set
// commitment states.
ErrNoCommitmentsFound = fmt.Errorf("no commitments found")
)
// OpenChannel embeds a mig25.OpenChannel with the extra update-to-date
// serialization and deserialization methods.
//
// NOTE: doesn't have the Packager field as it's not used in current migration.
type OpenChannel struct {
mig25.OpenChannel
// chanStatus is the current status of this channel. If it is not in
// the state Default, it should not be used for forwarding payments.
chanStatus mig25.ChannelStatus
}
// FetchChanInfo deserializes the channel info based on the legacy boolean.
// After migration25, the legacy format would have the fields
// `InitialLocalBalance` and `InitialRemoteBalance` directly encoded as bytes.
// For the new format, they will be put inside a tlv stream.
func FetchChanInfo(chanBucket kvdb.RBucket, c *OpenChannel, legacy bool) error {
infoBytes := chanBucket.Get(chanInfoKey)
if infoBytes == nil {
return ErrNoChanInfoFound
}
r := bytes.NewReader(infoBytes)
var (
chanType mig.ChannelType
chanStatus mig.ChannelStatus
)
if err := mig.ReadElements(r,
&chanType, &c.ChainHash, &c.FundingOutpoint,
&c.ShortChannelID, &c.IsPending, &c.IsInitiator,
&chanStatus, &c.FundingBroadcastHeight,
&c.NumConfsRequired, &c.ChannelFlags,
&c.IdentityPub, &c.Capacity, &c.TotalMSatSent,
&c.TotalMSatReceived,
); err != nil {
return err
}
c.ChanType = mig25.ChannelType(chanType)
c.chanStatus = mig25.ChannelStatus(chanStatus)
// If this is the legacy format, we need to read the extra two new
// fields.
if legacy {
if err := mig.ReadElements(r,
&c.InitialLocalBalance, &c.InitialRemoteBalance,
); err != nil {
return err
}
}
// For single funder channels that we initiated and have the funding
// transaction to, read the funding txn.
if c.FundingTxPresent() {
if err := mig.ReadElement(r, &c.FundingTxn); err != nil {
return err
}
}
if err := mig.ReadChanConfig(r, &c.LocalChanCfg); err != nil {
return err
}
if err := mig.ReadChanConfig(r, &c.RemoteChanCfg); err != nil {
return err
}
// Retrieve the boolean stored under lastWasRevokeKey.
lastWasRevokeBytes := chanBucket.Get(lastWasRevokeKey)
if lastWasRevokeBytes == nil {
// If nothing has been stored under this key, we store false in
// the OpenChannel struct.
c.LastWasRevoke = false
} else {
// Otherwise, read the value into the LastWasRevoke field.
revokeReader := bytes.NewReader(lastWasRevokeBytes)
err := mig.ReadElements(revokeReader, &c.LastWasRevoke)
if err != nil {
return err
}
}
// Make the tlv stream based on the legacy param.
var (
ts *tlv.Stream
err error
localBalance uint64
remoteBalance uint64
)
keyLocRecord := mig25.MakeKeyLocRecord(
keyLocType, &c.RevocationKeyLocator,
)
// If it's legacy, create the stream with a single tlv record.
if legacy {
ts, err = tlv.NewStream(keyLocRecord)
} else {
// Otherwise, for the new format, we will encode the balance
// fields in the tlv stream too.
ts, err = tlv.NewStream(
keyLocRecord,
tlv.MakePrimitiveRecord(
initialLocalBalanceType, &localBalance,
),
tlv.MakePrimitiveRecord(
initialRemoteBalanceType, &remoteBalance,
),
)
}
if err != nil {
return err
}
if err := ts.Decode(r); err != nil {
return err
}
// For the new format, attach the balance fields.
if !legacy {
c.InitialLocalBalance = lnwire.MilliSatoshi(localBalance)
c.InitialRemoteBalance = lnwire.MilliSatoshi(remoteBalance)
}
// Finally, read the optional shutdown scripts.
if err := mig25.GetOptionalUpfrontShutdownScript(
chanBucket, localUpfrontShutdownKey, &c.LocalShutdownScript,
); err != nil {
return err
}
return mig25.GetOptionalUpfrontShutdownScript(
chanBucket, remoteUpfrontShutdownKey, &c.RemoteShutdownScript,
)
}
// MakeTlvStream creates a tlv stream based on whether we are deadling with
// legacy format or not. For the legacy format, we have a single record in the
// stream. For the new format, we have the extra balance records.
func MakeTlvStream(c *OpenChannel, legacy bool) (*tlv.Stream, error) {
keyLocRecord := mig25.MakeKeyLocRecord(
keyLocType, &c.RevocationKeyLocator,
)
// If it's legacy, return the stream with a single tlv record.
if legacy {
return tlv.NewStream(keyLocRecord)
}
// Otherwise, for the new format, we will encode the balance fields in
// the tlv stream too.
localBalance := uint64(c.InitialLocalBalance)
remoteBalance := uint64(c.InitialRemoteBalance)
// Create the tlv stream.
return tlv.NewStream(
keyLocRecord,
tlv.MakePrimitiveRecord(
initialLocalBalanceType, &localBalance,
),
tlv.MakePrimitiveRecord(
initialRemoteBalanceType, &remoteBalance,
),
)
}
// PutChanInfo serializes the channel info based on the legacy boolean. After
// migration25, the legacy format would have the fields `InitialLocalBalance`
// and `InitialRemoteBalance` directly encoded as bytes. For the new format,
// they will be put inside a tlv stream.
func PutChanInfo(chanBucket kvdb.RwBucket, c *OpenChannel, legacy bool) error {
var w bytes.Buffer
if err := mig.WriteElements(&w,
mig.ChannelType(c.ChanType), c.ChainHash, c.FundingOutpoint,
c.ShortChannelID, c.IsPending, c.IsInitiator,
mig.ChannelStatus(c.chanStatus), c.FundingBroadcastHeight,
c.NumConfsRequired, c.ChannelFlags,
c.IdentityPub, c.Capacity, c.TotalMSatSent,
c.TotalMSatReceived,
); err != nil {
return err
}
// If this is legacy format, we need to write the extra two fields.
if legacy {
if err := mig.WriteElements(&w,
c.InitialLocalBalance, c.InitialRemoteBalance,
); err != nil {
return err
}
}
// For single funder channels that we initiated, and we have the
// funding transaction, then write the funding txn.
if c.FundingTxPresent() {
if err := mig.WriteElement(&w, c.FundingTxn); err != nil {
return err
}
}
if err := mig.WriteChanConfig(&w, &c.LocalChanCfg); err != nil {
return err
}
if err := mig.WriteChanConfig(&w, &c.RemoteChanCfg); err != nil {
return err
}
// Make the tlv stream based on the legacy param.
tlvStream, err := MakeTlvStream(c, legacy)
if err != nil {
return err
}
if err := tlvStream.Encode(&w); err != nil {
return err
}
if err := chanBucket.Put(chanInfoKey, w.Bytes()); err != nil {
return err
}
// Finally, add optional shutdown scripts for the local and remote peer
// if they are present.
if err := mig25.PutOptionalUpfrontShutdownScript(
chanBucket, localUpfrontShutdownKey, c.LocalShutdownScript,
); err != nil {
return err
}
return mig25.PutOptionalUpfrontShutdownScript(
chanBucket, remoteUpfrontShutdownKey, c.RemoteShutdownScript,
)
}

View file

@ -0,0 +1,14 @@
package migration26
import (
"github.com/btcsuite/btclog"
)
// log is a logger that is initialized as disabled. This means the package will
// not perform any logging by default until a logger is set.
var log = btclog.Disabled
// UseLogger uses a specified Logger to output package logging info.
func UseLogger(logger btclog.Logger) {
log = logger
}

View file

@ -0,0 +1,147 @@
package migration26
import (
"fmt"
mig25 "github.com/lightningnetwork/lnd/channeldb/migration25"
"github.com/lightningnetwork/lnd/kvdb"
)
var (
// openChanBucket stores all the currently open channels. This bucket
// has a second, nested bucket which is keyed by a node's ID. Within
// that node ID bucket, all attributes required to track, update, and
// close a channel are stored.
openChannelBucket = []byte("open-chan-bucket")
// ErrNoChanDBExists is returned when a channel bucket hasn't been
// created.
ErrNoChanDBExists = fmt.Errorf("channel db has not yet been created")
// ErrNoActiveChannels is returned when there is no active (open)
// channels within the database.
ErrNoActiveChannels = fmt.Errorf("no active channels exist")
// ErrChannelNotFound is returned when we attempt to locate a channel
// for a specific chain, but it is not found.
ErrChannelNotFound = fmt.Errorf("channel not found")
)
// MigrateBalancesToTlvRecords migrates the balance fields into tlv records. It
// does so by first reading a list of open channels, then rewriting the channel
// info with the updated tlv stream.
func MigrateBalancesToTlvRecords(tx kvdb.RwTx) error {
log.Infof("Migrating local and remote balances into tlv records...")
openChanBucket := tx.ReadWriteBucket(openChannelBucket)
// If no bucket is found, we can exit early.
if openChanBucket == nil {
return nil
}
// Read a list of open channels.
channels, err := findOpenChannels(openChanBucket)
if err != nil {
return err
}
// Migrate the balances.
for _, c := range channels {
if err := migrateBalances(tx, c); err != nil {
return err
}
}
return err
}
// findOpenChannels finds all open channels.
func findOpenChannels(openChanBucket kvdb.RBucket) ([]*OpenChannel, error) {
channels := []*OpenChannel{}
// readChannel is a helper closure that reads the channel info from the
// channel bucket.
readChannel := func(chainBucket kvdb.RBucket, cp []byte) error {
c := &OpenChannel{}
// Read the sub-bucket level 3.
chanBucket := chainBucket.NestedReadBucket(
cp,
)
if chanBucket == nil {
log.Errorf("unable to read bucket for chanPoint=%x", cp)
return nil
}
// Get the old channel info.
if err := FetchChanInfo(chanBucket, c, true); err != nil {
return fmt.Errorf("unable to fetch chan info: %v", err)
}
channels = append(channels, c)
return nil
}
// Iterate the root bucket.
err := openChanBucket.ForEach(func(nodePub, v []byte) error {
// Ensure that this is a key the same size as a pubkey, and
// also that it leads directly to a bucket.
if len(nodePub) != 33 || v != nil {
return nil
}
// Read the sub-bucket level 1.
nodeChanBucket := openChanBucket.NestedReadBucket(nodePub)
if nodeChanBucket == nil {
log.Errorf("no bucket for node %x", nodePub)
return nil
}
// Iterate the bucket.
return nodeChanBucket.ForEach(func(chainHash, _ []byte) error {
// Read the sub-bucket level 2.
chainBucket := nodeChanBucket.NestedReadBucket(
chainHash,
)
if chainBucket == nil {
log.Errorf("unable to read bucket for chain=%x",
chainHash)
return nil
}
// Iterate the bucket.
return chainBucket.ForEach(func(cp, _ []byte) error {
return readChannel(chainBucket, cp)
})
})
})
if err != nil {
return nil, err
}
return channels, nil
}
// migrateBalances creates a new tlv stream which adds two more records to hold
// the balances info.
func migrateBalances(tx kvdb.RwTx, c *OpenChannel) error {
// Get the bucket.
chanBucket, err := mig25.FetchChanBucket(tx, &c.OpenChannel)
if err != nil {
return err
}
// Update the channel info. There isn't much to do here as the
// `PutChanInfo` will read the values from `c.InitialLocalBalance` and
// `c.InitialRemoteBalance` then create the new tlv stream as
// requested.
if err := PutChanInfo(chanBucket, c, false); err != nil {
return fmt.Errorf("unable to put chan info: %v", err)
}
return nil
}

View file

@ -0,0 +1,164 @@
package migration26
import (
"fmt"
"testing"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
lnwire "github.com/lightningnetwork/lnd/channeldb/migration/lnwire21"
mig25 "github.com/lightningnetwork/lnd/channeldb/migration25"
mig "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
"github.com/lightningnetwork/lnd/channeldb/migtest"
"github.com/lightningnetwork/lnd/kvdb"
)
var (
// Create dummy values to be stored in db.
dummyPrivKey, _ = btcec.NewPrivateKey()
dummyPubKey = dummyPrivKey.PubKey()
dummyOp = wire.OutPoint{
Hash: chainhash.Hash{},
Index: 9,
}
// ourAmt and theirAmt are the initial balances found in the local
// channel commitment at height 0.
testOurAmt = lnwire.MilliSatoshi(500_000)
testTheirAmt = lnwire.MilliSatoshi(1000_000)
// testChannel is used to test the balance fields are correctly set.
testChannel = &OpenChannel{
OpenChannel: mig25.OpenChannel{
OpenChannel: mig.OpenChannel{
IdentityPub: dummyPubKey,
FundingOutpoint: dummyOp,
},
},
}
)
// TestMigrateBalancesToTlvRecords checks that the initial balances fields are
// saved using the tlv records.
func TestMigrateBalancesToTlvRecords(t *testing.T) {
testCases := []struct {
name string
ourAmt lnwire.MilliSatoshi
theirAmt lnwire.MilliSatoshi
beforeMigrationFunc func(kvdb.RwTx) error
afterMigrationFunc func(kvdb.RwTx) error
shouldFail bool
}{
{
// Test when both balance fields are non-zero.
name: "non-zero local and remote",
ourAmt: testOurAmt,
theirAmt: testTheirAmt,
beforeMigrationFunc: genBeforeMigration(testChannel),
afterMigrationFunc: genAfterMigration(testChannel),
},
{
// Test when local balance is non-zero.
name: "non-zero local balance",
ourAmt: testOurAmt,
theirAmt: 0,
beforeMigrationFunc: genBeforeMigration(testChannel),
afterMigrationFunc: genAfterMigration(testChannel),
},
{
// Test when remote balance is non-zero.
name: "non-zero remote balance",
ourAmt: 0,
theirAmt: testTheirAmt,
beforeMigrationFunc: genBeforeMigration(testChannel),
afterMigrationFunc: genAfterMigration(testChannel),
},
{
// Test when both balance fields are zero.
name: "zero local and remote",
ourAmt: 0,
theirAmt: 0,
beforeMigrationFunc: genBeforeMigration(testChannel),
afterMigrationFunc: genAfterMigration(testChannel),
},
}
for _, tc := range testCases {
tc := tc
// Before running the test, set the balance fields based on the
// test params.
testChannel.InitialLocalBalance = tc.ourAmt
testChannel.InitialRemoteBalance = tc.theirAmt
t.Run(tc.name, func(t *testing.T) {
migtest.ApplyMigration(
t,
tc.beforeMigrationFunc,
tc.afterMigrationFunc,
MigrateBalancesToTlvRecords,
tc.shouldFail,
)
})
}
}
func genBeforeMigration(c *OpenChannel) func(kvdb.RwTx) error {
return func(tx kvdb.RwTx) error {
// Create the channel bucket.
chanBucket, err := mig25.CreateChanBucket(tx, &c.OpenChannel)
if err != nil {
return err
}
// Save the channel info using legacy format.
if err := PutChanInfo(chanBucket, c, true); err != nil {
return err
}
return nil
}
}
func genAfterMigration(c *OpenChannel) func(kvdb.RwTx) error {
return func(tx kvdb.RwTx) error {
chanBucket, err := mig25.FetchChanBucket(tx, &c.OpenChannel)
if err != nil {
return err
}
newChan := &OpenChannel{}
// Fetch the channel info using the new format.
err = FetchChanInfo(chanBucket, newChan, false)
if err != nil {
return err
}
// Check our initial amount is correct.
if newChan.InitialLocalBalance != c.InitialLocalBalance {
return fmt.Errorf("wrong local balance, got %d, "+
"want %d", newChan.InitialLocalBalance,
c.InitialLocalBalance)
}
// Check their initial amount is correct.
if newChan.InitialRemoteBalance != c.InitialRemoteBalance {
return fmt.Errorf("wrong remote balance, got %d, "+
"want %d", newChan.InitialRemoteBalance,
c.InitialRemoteBalance)
}
// We also check the relevant channel info fields stay the
// same.
if !newChan.IdentityPub.IsEqual(dummyPubKey) {
return fmt.Errorf("wrong IdentityPub")
}
if newChan.FundingOutpoint != dummyOp {
return fmt.Errorf("wrong FundingOutpoint")
}
return nil
}
}

View file

@ -0,0 +1,233 @@
package migration27
import (
"bytes"
"fmt"
lnwire "github.com/lightningnetwork/lnd/channeldb/migration/lnwire21"
mig25 "github.com/lightningnetwork/lnd/channeldb/migration25"
mig26 "github.com/lightningnetwork/lnd/channeldb/migration26"
mig "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/tlv"
)
const (
// A tlv type definition used to serialize and deserialize a KeyLocator
// from the database.
keyLocType tlv.Type = 1
// A tlv type used to serialize and deserialize the
// `InitialLocalBalance` field.
initialLocalBalanceType tlv.Type = 2
// A tlv type used to serialize and deserialize the
// `InitialRemoteBalance` field.
initialRemoteBalanceType tlv.Type = 3
)
var (
// chanInfoKey can be accessed within the bucket for a channel
// (identified by its chanPoint). This key stores all the static
// information for a channel which is decided at the end of the
// funding flow.
chanInfoKey = []byte("chan-info-key")
// localUpfrontShutdownKey can be accessed within the bucket for a
// channel (identified by its chanPoint). This key stores an optional
// upfront shutdown script for the local peer.
localUpfrontShutdownKey = []byte("local-upfront-shutdown-key")
// remoteUpfrontShutdownKey can be accessed within the bucket for a
// channel (identified by its chanPoint). This key stores an optional
// upfront shutdown script for the remote peer.
remoteUpfrontShutdownKey = []byte("remote-upfront-shutdown-key")
// lastWasRevokeKey is a key that stores true when the last update we
// sent was a revocation and false when it was a commitment signature.
// This is nil in the case of new channels with no updates exchanged.
lastWasRevokeKey = []byte("last-was-revoke")
// ErrNoChanInfoFound is returned when a particular channel does not
// have any channels state.
ErrNoChanInfoFound = fmt.Errorf("no chan info found")
)
// OpenChannel embeds a mig26.OpenChannel with the extra update-to-date
// serialization and deserialization methods.
//
// NOTE: doesn't have the Packager field as it's not used in current migration.
type OpenChannel struct {
mig26.OpenChannel
// chanStatus is the current status of this channel. If it is not in
// the state Default, it should not be used for forwarding payments.
chanStatus mig25.ChannelStatus
}
// FetchChanInfo deserializes the channel info based on the legacy boolean.
func FetchChanInfo(chanBucket kvdb.RBucket, c *OpenChannel, legacy bool) error {
infoBytes := chanBucket.Get(chanInfoKey)
if infoBytes == nil {
return ErrNoChanInfoFound
}
r := bytes.NewReader(infoBytes)
var (
chanType mig.ChannelType
chanStatus mig.ChannelStatus
)
if err := mig.ReadElements(r,
&chanType, &c.ChainHash, &c.FundingOutpoint,
&c.ShortChannelID, &c.IsPending, &c.IsInitiator,
&chanStatus, &c.FundingBroadcastHeight,
&c.NumConfsRequired, &c.ChannelFlags,
&c.IdentityPub, &c.Capacity, &c.TotalMSatSent,
&c.TotalMSatReceived,
); err != nil {
return fmt.Errorf("ReadElements got: %v", err)
}
c.ChanType = mig25.ChannelType(chanType)
c.chanStatus = mig25.ChannelStatus(chanStatus)
// For single funder channels that we initiated and have the funding
// transaction to, read the funding txn.
if c.FundingTxPresent() {
if err := mig.ReadElement(r, &c.FundingTxn); err != nil {
return fmt.Errorf("read FundingTxn got: %v", err)
}
}
if err := mig.ReadChanConfig(r, &c.LocalChanCfg); err != nil {
return fmt.Errorf("read LocalChanCfg got: %v", err)
}
if err := mig.ReadChanConfig(r, &c.RemoteChanCfg); err != nil {
return fmt.Errorf("read RemoteChanCfg got: %v", err)
}
// Retrieve the boolean stored under lastWasRevokeKey.
lastWasRevokeBytes := chanBucket.Get(lastWasRevokeKey)
if lastWasRevokeBytes == nil {
// If nothing has been stored under this key, we store false in
// the OpenChannel struct.
c.LastWasRevoke = false
} else {
// Otherwise, read the value into the LastWasRevoke field.
revokeReader := bytes.NewReader(lastWasRevokeBytes)
err := mig.ReadElements(revokeReader, &c.LastWasRevoke)
if err != nil {
return fmt.Errorf("read LastWasRevoke got: %v", err)
}
}
// Make the tlv stream based on the legacy param.
var (
ts *tlv.Stream
err error
localBalance uint64
remoteBalance uint64
)
keyLocRecord := mig25.MakeKeyLocRecord(
keyLocType, &c.RevocationKeyLocator,
)
// If it's legacy, create the stream with a single tlv record.
if legacy {
ts, err = tlv.NewStream(keyLocRecord)
} else {
// Otherwise, for the new format, we will encode the balance
// fields in the tlv stream too.
ts, err = tlv.NewStream(
keyLocRecord,
tlv.MakePrimitiveRecord(
initialLocalBalanceType, &localBalance,
),
tlv.MakePrimitiveRecord(
initialRemoteBalanceType, &remoteBalance,
),
)
}
if err != nil {
return fmt.Errorf("create tlv stream got: %v", err)
}
if err := ts.Decode(r); err != nil {
return fmt.Errorf("decode tlv stream got: %v", err)
}
// For the new format, attach the balance fields.
if !legacy {
c.InitialLocalBalance = lnwire.MilliSatoshi(localBalance)
c.InitialRemoteBalance = lnwire.MilliSatoshi(remoteBalance)
}
// Finally, read the optional shutdown scripts.
if err := mig25.GetOptionalUpfrontShutdownScript(
chanBucket, localUpfrontShutdownKey, &c.LocalShutdownScript,
); err != nil {
return fmt.Errorf("local shutdown script got: %v", err)
}
return mig25.GetOptionalUpfrontShutdownScript(
chanBucket, remoteUpfrontShutdownKey, &c.RemoteShutdownScript,
)
}
// PutChanInfo serializes the channel info based on the legacy boolean.
func PutChanInfo(chanBucket kvdb.RwBucket, c *OpenChannel, legacy bool) error {
var w bytes.Buffer
if err := mig.WriteElements(&w,
mig.ChannelType(c.ChanType), c.ChainHash, c.FundingOutpoint,
c.ShortChannelID, c.IsPending, c.IsInitiator,
mig.ChannelStatus(c.chanStatus), c.FundingBroadcastHeight,
c.NumConfsRequired, c.ChannelFlags,
c.IdentityPub, c.Capacity, c.TotalMSatSent,
c.TotalMSatReceived,
); err != nil {
return err
}
// For single funder channels that we initiated, and we have the
// funding transaction, then write the funding txn.
if c.FundingTxPresent() {
if err := mig.WriteElement(&w, c.FundingTxn); err != nil {
return err
}
}
if err := mig.WriteChanConfig(&w, &c.LocalChanCfg); err != nil {
return err
}
if err := mig.WriteChanConfig(&w, &c.RemoteChanCfg); err != nil {
return err
}
// Make the tlv stream based on the legacy param.
tlvStream, err := mig26.MakeTlvStream(&c.OpenChannel, legacy)
if err != nil {
return err
}
if err := tlvStream.Encode(&w); err != nil {
return err
}
if err := chanBucket.Put(chanInfoKey, w.Bytes()); err != nil {
return err
}
// Finally, add optional shutdown scripts for the local and remote peer
// if they are present.
if err := mig25.PutOptionalUpfrontShutdownScript(
chanBucket, localUpfrontShutdownKey, c.LocalShutdownScript,
); err != nil {
return err
}
return mig25.PutOptionalUpfrontShutdownScript(
chanBucket, remoteUpfrontShutdownKey, c.RemoteShutdownScript,
)
}

View file

@ -0,0 +1,14 @@
package migration27
import (
"github.com/btcsuite/btclog"
)
// log is a logger that is initialized as disabled. This means the package will
// not perform any logging by default until a logger is set.
var log = btclog.Disabled
// UseLogger uses a specified Logger to output package logging info.
func UseLogger(logger btclog.Logger) {
log = logger
}

View file

@ -0,0 +1,149 @@
package migration27
import (
"bytes"
"fmt"
mig26 "github.com/lightningnetwork/lnd/channeldb/migration26"
mig "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
"github.com/lightningnetwork/lnd/kvdb"
)
var (
// historicalChannelBucket stores all channels that have seen their
// commitment tx confirm. All information from their previous open state
// is retained.
historicalChannelBucket = []byte("historical-chan-bucket")
)
// MigrateHistoricalBalances patches the two new fields, `InitialLocalBalance`
// and `InitialRemoteBalance`, for all the open channels saved in historical
// channel bucket. Unlike migration 25, it will only read the old channel info
// first and then patch the new tlv records with empty values. For historical
// channels, we previously didn't save the initial balances anywhere and since
// it's corresponding open channel bucket is deleted after closure, we have
// lost that balance info.
func MigrateHistoricalBalances(tx kvdb.RwTx) error {
log.Infof("Migrating historical local and remote balances...")
// First fetch the top level bucket which stores all data related to
// historically stored channels.
rootBucket := tx.ReadWriteBucket(historicalChannelBucket)
// If no bucket is found, we can exit early.
if rootBucket == nil {
return nil
}
// Read a list of historical channels.
channels, err := findHistoricalChannels(rootBucket)
if err != nil {
return err
}
// Migrate the balances.
for _, c := range channels {
if err := migrateBalances(rootBucket, c); err != nil {
return err
}
}
return err
}
// findHistoricalChannels finds all historical channels.
func findHistoricalChannels(historicalBucket kvdb.RBucket) ([]*OpenChannel,
error) {
channels := []*OpenChannel{}
// readChannel is a helper closure that reads the channel info from the
// historical sub-bucket.
readChannel := func(rootBucket kvdb.RBucket, cp []byte) error {
c := &OpenChannel{}
chanPointBuf := bytes.NewBuffer(cp)
err := mig.ReadOutpoint(chanPointBuf, &c.FundingOutpoint)
if err != nil {
return fmt.Errorf("read funding outpoint got: %v", err)
}
// Read the sub-bucket.
chanBucket := rootBucket.NestedReadBucket(cp)
if chanBucket == nil {
log.Errorf("unable to read bucket for chanPoint=%s",
c.FundingOutpoint)
return nil
}
// Try to fetch channel info in old format.
err = fetchChanInfoCompatible(chanBucket, c, true)
if err != nil {
return fmt.Errorf("%s: fetch chan info got: %v",
c.FundingOutpoint, err)
}
channels = append(channels, c)
return nil
}
// Iterate the root bucket.
err := historicalBucket.ForEach(func(cp, _ []byte) error {
return readChannel(historicalBucket, cp)
})
if err != nil {
return nil, err
}
return channels, nil
}
// fetchChanInfoCompatible tries to fetch the channel info for a historical
// channel. It will first fetch the info assuming `InitialLocalBalance` and
// `InitialRemoteBalance` are not serialized. Upon receiving an error, it will
// then fetch it again assuming the two fields are present in db.
func fetchChanInfoCompatible(chanBucket kvdb.RBucket, c *OpenChannel,
legacy bool) error {
// Try to fetch the channel info assuming the historical channel in in
// the old format, where the two fields, `InitialLocalBalance` and
// `InitialRemoteBalance` are not saved to db.
err := FetchChanInfo(chanBucket, c, legacy)
if err == nil {
return err
}
// If we got an error above, the historical channel may already have
// the new fields saved. This could happen when a channel is closed
// after applying migration 25. In this case, we'll borrow the
// `FetchChanInfo` info method from migration 26 where we assume the
// two fields are saved.
return mig26.FetchChanInfo(chanBucket, &c.OpenChannel, legacy)
}
// migrateBalances serializes the channel info using the new tlv format where
// the two fields, `InitialLocalBalance` and `InitialRemoteBalance` are patched
// with empty values.
func migrateBalances(rootBucket kvdb.RwBucket, c *OpenChannel) error {
var chanPointBuf bytes.Buffer
err := mig.WriteOutpoint(&chanPointBuf, &c.FundingOutpoint)
if err != nil {
return err
}
// Get the channel bucket.
chanBucket := rootBucket.NestedReadWriteBucket(chanPointBuf.Bytes())
if chanBucket == nil {
return fmt.Errorf("empty historical chan bucket")
}
// Update the channel info.
if err := PutChanInfo(chanBucket, c, false); err != nil {
return fmt.Errorf("unable to put chan info: %v", err)
}
return nil
}

View file

@ -0,0 +1,203 @@
package migration27
import (
"bytes"
"fmt"
"testing"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
mig25 "github.com/lightningnetwork/lnd/channeldb/migration25"
mig26 "github.com/lightningnetwork/lnd/channeldb/migration26"
mig "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
"github.com/lightningnetwork/lnd/channeldb/migtest"
"github.com/lightningnetwork/lnd/kvdb"
)
var (
// Create dummy values to be stored in db.
dummyPrivKey, _ = btcec.NewPrivateKey()
dummyPubKey = dummyPrivKey.PubKey()
dummyOp = wire.OutPoint{
Hash: chainhash.Hash{},
Index: 9,
}
// dummyInput is used in our commit tx.
dummyInput = &wire.TxIn{
PreviousOutPoint: wire.OutPoint{
Hash: chainhash.Hash{},
Index: 0xffffffff,
},
Sequence: 0xffffffff,
}
// toLocalScript is the PkScript used in to-local output.
toLocalScript = []byte{
0x0, 0x14, 0xc6, 0x9, 0x62, 0xab, 0x60, 0xbe,
0x40, 0xd, 0xab, 0x31, 0xc, 0x13, 0x14, 0x15,
0x93, 0xe6, 0xa2, 0x94, 0xe4, 0x2a,
}
// commitTx1 is the tx saved in the first old revocation.
commitTx1 = &wire.MsgTx{
Version: 2,
// Add a dummy input.
TxIn: []*wire.TxIn{dummyInput},
TxOut: []*wire.TxOut{
{
Value: 990_950,
PkScript: toLocalScript,
},
},
}
// testChannel is used to test the balance fields are correctly set.
testChannel = &OpenChannel{
OpenChannel: mig26.OpenChannel{
OpenChannel: mig25.OpenChannel{
OpenChannel: mig.OpenChannel{
IdentityPub: dummyPubKey,
FundingOutpoint: dummyOp,
FundingTxn: commitTx1,
IsInitiator: true,
},
},
},
}
)
// TestMigrateHistoricalBalances checks that the initial balances fields are
// patched to the historical channel info.
func TestMigrateHistoricalBalances(t *testing.T) {
// Test that when the historical channel doesn't have the two new
// fields.
migtest.ApplyMigration(
t,
genBeforeMigration(testChannel, false),
genAfterMigration(testChannel),
MigrateHistoricalBalances,
false,
)
// Test that when the historical channel have the two new fields.
migtest.ApplyMigration(
t,
genBeforeMigration(testChannel, true),
genAfterMigration(testChannel),
MigrateHistoricalBalances,
false,
)
}
func genBeforeMigration(c *OpenChannel, regression bool) func(kvdb.RwTx) error {
return func(tx kvdb.RwTx) error {
// Create the channel bucket.
chanBucket, err := createHistoricalBucket(tx, c)
if err != nil {
return err
}
// Save the channel info using legacy format.
if regression {
// If test regression, then the historical channel
// would have the two fields created. Thus we use the
// method from migration26 which will save the two
// fields for when legacy is true.
return mig26.PutChanInfo(
chanBucket, &c.OpenChannel, true,
)
}
// Otherwise we will save the channel without the new fields.
return PutChanInfo(chanBucket, c, true)
}
}
func genAfterMigration(c *OpenChannel) func(kvdb.RwTx) error {
return func(tx kvdb.RwTx) error {
chanBucket, err := fetchHistoricalChanBucket(tx, c)
if err != nil {
return err
}
newChan := &OpenChannel{}
// Fetch the channel info using the new format.
//
// NOTE: this is the main testing point where we check the
// deserialization of the historical channel bucket is correct.
err = FetchChanInfo(chanBucket, newChan, false)
if err != nil {
return err
}
// Check our initial amount is correct.
if newChan.InitialLocalBalance != 0 {
return fmt.Errorf("wrong local balance, got %d, "+
"want %d", newChan.InitialLocalBalance,
c.InitialLocalBalance)
}
// Check their initial amount is correct.
if newChan.InitialRemoteBalance != 0 {
return fmt.Errorf("wrong remote balance, got %d, "+
"want %d", newChan.InitialRemoteBalance,
c.InitialRemoteBalance)
}
// We also check the relevant channel info fields stay the
// same.
if !newChan.IdentityPub.IsEqual(c.IdentityPub) {
return fmt.Errorf("wrong IdentityPub")
}
if newChan.FundingOutpoint != c.FundingOutpoint {
return fmt.Errorf("wrong FundingOutpoint")
}
if !newChan.IsInitiator {
return fmt.Errorf("wrong IsInitiator")
}
if newChan.FundingTxn.TxHash() != commitTx1.TxHash() {
return fmt.Errorf("wrong FundingTxn")
}
return nil
}
}
func createHistoricalBucket(tx kvdb.RwTx, c *OpenChannel) (kvdb.RwBucket, error) {
// First fetch the top level bucket which stores all data related to
// historical channels.
rootBucket, err := tx.CreateTopLevelBucket(historicalChannelBucket)
if err != nil {
return nil, err
}
var chanPointBuf bytes.Buffer
err = mig.WriteOutpoint(&chanPointBuf, &c.FundingOutpoint)
if err != nil {
return nil, err
}
// Create the sub-bucket.
return rootBucket.CreateBucketIfNotExists(chanPointBuf.Bytes())
}
func fetchHistoricalChanBucket(tx kvdb.RTx,
c *OpenChannel) (kvdb.RBucket, error) {
rootBucket := tx.ReadBucket(historicalChannelBucket)
if rootBucket == nil {
return nil, fmt.Errorf("expected a rootBucket")
}
var chanPointBuf bytes.Buffer
err := mig.WriteOutpoint(&chanPointBuf, &c.FundingOutpoint)
if err != nil {
return nil, err
}
return rootBucket.NestedReadBucket(chanPointBuf.Bytes()), nil
}

View file

@ -332,6 +332,14 @@ to the htlc interceptor API.
grow very large on disk given a busy operating channel, [which is now changed
with a space deduction over (at least) 96 percents.](https://github.com/lightningnetwork/lnd/pull/6347)
* Aside from the above database optimization, two new fields,
[`InitialLocalBalance` and `InitialRemoteBalance` have been added to each
channel to keep track of the push
amount](https://github.com/lightningnetwork/lnd/pull/6551). For open
channels, these values are taken from reading its past states. For
historical(closed) channels, they are patched with empty values as the
channels' past states have been deleted during closing.
* [Mobile builds now expose main sub-servers by default](https://github.com/lightningnetwork/lnd/pull/6464).
All API methods have prefixed the generated methods with the subserver name.
This is required to support subservers with name conflicts.