channeldb+migration27: patch balance fields for historical chan

This commit adds a new migration to patch the two balance fields,
`InitialLocalBalance` and `InitialRemoteBalance` for the historical
channels. Because they are not saved previously, for historical channels
prior to the revocation log PR, these fields will be empty.
This commit is contained in:
yyforyongyu 2022-05-18 23:55:11 +08:00
parent 55746e427e
commit 3458b2eb7d
No known key found for this signature in database
GPG Key ID: 9BCD95C4FF296868
5 changed files with 606 additions and 0 deletions

View File

@ -21,6 +21,7 @@ import (
"github.com/lightningnetwork/lnd/channeldb/migration24"
"github.com/lightningnetwork/lnd/channeldb/migration25"
"github.com/lightningnetwork/lnd/channeldb/migration26"
"github.com/lightningnetwork/lnd/channeldb/migration27"
"github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/kvdb"
@ -219,6 +220,12 @@ var (
number: 26,
migration: migration26.MigrateBalancesToTlvRecords,
},
{
// Patch the initial local/remote balance fields with
// empty values for historical channels.
number: 27,
migration: migration27.MigrateHistoricalBalances,
},
}
// Big endian is the preferred byte order, due to cursor scans over

View File

@ -0,0 +1,233 @@
package migration27
import (
"bytes"
"fmt"
lnwire "github.com/lightningnetwork/lnd/channeldb/migration/lnwire21"
mig25 "github.com/lightningnetwork/lnd/channeldb/migration25"
mig26 "github.com/lightningnetwork/lnd/channeldb/migration26"
mig "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/tlv"
)
const (
// A tlv type definition used to serialize and deserialize a KeyLocator
// from the database.
keyLocType tlv.Type = 1
// A tlv type used to serialize and deserialize the
// `InitialLocalBalance` field.
initialLocalBalanceType tlv.Type = 2
// A tlv type used to serialize and deserialize the
// `InitialRemoteBalance` field.
initialRemoteBalanceType tlv.Type = 3
)
var (
// chanInfoKey can be accessed within the bucket for a channel
// (identified by its chanPoint). This key stores all the static
// information for a channel which is decided at the end of the
// funding flow.
chanInfoKey = []byte("chan-info-key")
// localUpfrontShutdownKey can be accessed within the bucket for a
// channel (identified by its chanPoint). This key stores an optional
// upfront shutdown script for the local peer.
localUpfrontShutdownKey = []byte("local-upfront-shutdown-key")
// remoteUpfrontShutdownKey can be accessed within the bucket for a
// channel (identified by its chanPoint). This key stores an optional
// upfront shutdown script for the remote peer.
remoteUpfrontShutdownKey = []byte("remote-upfront-shutdown-key")
// lastWasRevokeKey is a key that stores true when the last update we
// sent was a revocation and false when it was a commitment signature.
// This is nil in the case of new channels with no updates exchanged.
lastWasRevokeKey = []byte("last-was-revoke")
// ErrNoChanInfoFound is returned when a particular channel does not
// have any channels state.
ErrNoChanInfoFound = fmt.Errorf("no chan info found")
)
// OpenChannel embeds a mig26.OpenChannel with the extra update-to-date
// serialization and deserialization methods.
//
// NOTE: doesn't have the Packager field as it's not used in current migration.
type OpenChannel struct {
mig26.OpenChannel
// chanStatus is the current status of this channel. If it is not in
// the state Default, it should not be used for forwarding payments.
chanStatus mig25.ChannelStatus
}
// FetchChanInfo deserializes the channel info based on the legacy boolean.
func FetchChanInfo(chanBucket kvdb.RBucket, c *OpenChannel, legacy bool) error {
infoBytes := chanBucket.Get(chanInfoKey)
if infoBytes == nil {
return ErrNoChanInfoFound
}
r := bytes.NewReader(infoBytes)
var (
chanType mig.ChannelType
chanStatus mig.ChannelStatus
)
if err := mig.ReadElements(r,
&chanType, &c.ChainHash, &c.FundingOutpoint,
&c.ShortChannelID, &c.IsPending, &c.IsInitiator,
&chanStatus, &c.FundingBroadcastHeight,
&c.NumConfsRequired, &c.ChannelFlags,
&c.IdentityPub, &c.Capacity, &c.TotalMSatSent,
&c.TotalMSatReceived,
); err != nil {
return fmt.Errorf("ReadElements got: %v", err)
}
c.ChanType = mig25.ChannelType(chanType)
c.chanStatus = mig25.ChannelStatus(chanStatus)
// For single funder channels that we initiated and have the funding
// transaction to, read the funding txn.
if c.FundingTxPresent() {
if err := mig.ReadElement(r, &c.FundingTxn); err != nil {
return fmt.Errorf("read FundingTxn got: %v", err)
}
}
if err := mig.ReadChanConfig(r, &c.LocalChanCfg); err != nil {
return fmt.Errorf("read LocalChanCfg got: %v", err)
}
if err := mig.ReadChanConfig(r, &c.RemoteChanCfg); err != nil {
return fmt.Errorf("read RemoteChanCfg got: %v", err)
}
// Retrieve the boolean stored under lastWasRevokeKey.
lastWasRevokeBytes := chanBucket.Get(lastWasRevokeKey)
if lastWasRevokeBytes == nil {
// If nothing has been stored under this key, we store false in
// the OpenChannel struct.
c.LastWasRevoke = false
} else {
// Otherwise, read the value into the LastWasRevoke field.
revokeReader := bytes.NewReader(lastWasRevokeBytes)
err := mig.ReadElements(revokeReader, &c.LastWasRevoke)
if err != nil {
return fmt.Errorf("read LastWasRevoke got: %v", err)
}
}
// Make the tlv stream based on the legacy param.
var (
ts *tlv.Stream
err error
localBalance uint64
remoteBalance uint64
)
keyLocRecord := mig25.MakeKeyLocRecord(
keyLocType, &c.RevocationKeyLocator,
)
// If it's legacy, create the stream with a single tlv record.
if legacy {
ts, err = tlv.NewStream(keyLocRecord)
} else {
// Otherwise, for the new format, we will encode the balance
// fields in the tlv stream too.
ts, err = tlv.NewStream(
keyLocRecord,
tlv.MakePrimitiveRecord(
initialLocalBalanceType, &localBalance,
),
tlv.MakePrimitiveRecord(
initialRemoteBalanceType, &remoteBalance,
),
)
}
if err != nil {
return fmt.Errorf("create tlv stream got: %v", err)
}
if err := ts.Decode(r); err != nil {
return fmt.Errorf("decode tlv stream got: %v", err)
}
// For the new format, attach the balance fields.
if !legacy {
c.InitialLocalBalance = lnwire.MilliSatoshi(localBalance)
c.InitialRemoteBalance = lnwire.MilliSatoshi(remoteBalance)
}
// Finally, read the optional shutdown scripts.
if err := mig25.GetOptionalUpfrontShutdownScript(
chanBucket, localUpfrontShutdownKey, &c.LocalShutdownScript,
); err != nil {
return fmt.Errorf("local shutdown script got: %v", err)
}
return mig25.GetOptionalUpfrontShutdownScript(
chanBucket, remoteUpfrontShutdownKey, &c.RemoteShutdownScript,
)
}
// PutChanInfo serializes the channel info based on the legacy boolean.
func PutChanInfo(chanBucket kvdb.RwBucket, c *OpenChannel, legacy bool) error {
var w bytes.Buffer
if err := mig.WriteElements(&w,
mig.ChannelType(c.ChanType), c.ChainHash, c.FundingOutpoint,
c.ShortChannelID, c.IsPending, c.IsInitiator,
mig.ChannelStatus(c.chanStatus), c.FundingBroadcastHeight,
c.NumConfsRequired, c.ChannelFlags,
c.IdentityPub, c.Capacity, c.TotalMSatSent,
c.TotalMSatReceived,
); err != nil {
return err
}
// For single funder channels that we initiated, and we have the
// funding transaction, then write the funding txn.
if c.FundingTxPresent() {
if err := mig.WriteElement(&w, c.FundingTxn); err != nil {
return err
}
}
if err := mig.WriteChanConfig(&w, &c.LocalChanCfg); err != nil {
return err
}
if err := mig.WriteChanConfig(&w, &c.RemoteChanCfg); err != nil {
return err
}
// Make the tlv stream based on the legacy param.
tlvStream, err := mig26.MakeTlvStream(&c.OpenChannel, legacy)
if err != nil {
return err
}
if err := tlvStream.Encode(&w); err != nil {
return err
}
if err := chanBucket.Put(chanInfoKey, w.Bytes()); err != nil {
return err
}
// Finally, add optional shutdown scripts for the local and remote peer
// if they are present.
if err := mig25.PutOptionalUpfrontShutdownScript(
chanBucket, localUpfrontShutdownKey, c.LocalShutdownScript,
); err != nil {
return err
}
return mig25.PutOptionalUpfrontShutdownScript(
chanBucket, remoteUpfrontShutdownKey, c.RemoteShutdownScript,
)
}

View File

@ -0,0 +1,14 @@
package migration27
import (
"github.com/btcsuite/btclog"
)
// log is a logger that is initialized as disabled. This means the package will
// not perform any logging by default until a logger is set.
var log = btclog.Disabled
// UseLogger uses a specified Logger to output package logging info.
func UseLogger(logger btclog.Logger) {
log = logger
}

View File

@ -0,0 +1,149 @@
package migration27
import (
"bytes"
"fmt"
mig26 "github.com/lightningnetwork/lnd/channeldb/migration26"
mig "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
"github.com/lightningnetwork/lnd/kvdb"
)
var (
// historicalChannelBucket stores all channels that have seen their
// commitment tx confirm. All information from their previous open state
// is retained.
historicalChannelBucket = []byte("historical-chan-bucket")
)
// MigrateHistoricalBalances patches the two new fields, `InitialLocalBalance`
// and `InitialRemoteBalance`, for all the open channels saved in historical
// channel bucket. Unlike migration 25, it will only read the old channel info
// first and then patch the new tlv records with empty values. For historical
// channels, we previously didn't save the initial balances anywhere and since
// it's corresponding open channel bucket is deleted after closure, we have
// lost that balance info.
func MigrateHistoricalBalances(tx kvdb.RwTx) error {
log.Infof("Migrating historical local and remote balances...")
// First fetch the top level bucket which stores all data related to
// historically stored channels.
rootBucket := tx.ReadWriteBucket(historicalChannelBucket)
// If no bucket is found, we can exit early.
if rootBucket == nil {
return nil
}
// Read a list of historical channels.
channels, err := findHistoricalChannels(rootBucket)
if err != nil {
return err
}
// Migrate the balances.
for _, c := range channels {
if err := migrateBalances(rootBucket, c); err != nil {
return err
}
}
return err
}
// findHistoricalChannels finds all historical channels.
func findHistoricalChannels(historicalBucket kvdb.RBucket) ([]*OpenChannel,
error) {
channels := []*OpenChannel{}
// readChannel is a helper closure that reads the channel info from the
// historical sub-bucket.
readChannel := func(rootBucket kvdb.RBucket, cp []byte) error {
c := &OpenChannel{}
chanPointBuf := bytes.NewBuffer(cp)
err := mig.ReadOutpoint(chanPointBuf, &c.FundingOutpoint)
if err != nil {
return fmt.Errorf("read funding outpoint got: %v", err)
}
// Read the sub-bucket.
chanBucket := rootBucket.NestedReadBucket(cp)
if chanBucket == nil {
log.Errorf("unable to read bucket for chanPoint=%s",
c.FundingOutpoint)
return nil
}
// Try to fetch channel info in old format.
err = fetchChanInfoCompatible(chanBucket, c, true)
if err != nil {
return fmt.Errorf("%s: fetch chan info got: %v",
c.FundingOutpoint, err)
}
channels = append(channels, c)
return nil
}
// Iterate the root bucket.
err := historicalBucket.ForEach(func(cp, _ []byte) error {
return readChannel(historicalBucket, cp)
})
if err != nil {
return nil, err
}
return channels, nil
}
// fetchChanInfoCompatible tries to fetch the channel info for a historical
// channel. It will first fetch the info assuming `InitialLocalBalance` and
// `InitialRemoteBalance` are not serialized. Upon receiving an error, it will
// then fetch it again assuming the two fields are present in db.
func fetchChanInfoCompatible(chanBucket kvdb.RBucket, c *OpenChannel,
legacy bool) error {
// Try to fetch the channel info assuming the historical channel in in
// the old format, where the two fields, `InitialLocalBalance` and
// `InitialRemoteBalance` are not saved to db.
err := FetchChanInfo(chanBucket, c, legacy)
if err == nil {
return err
}
// If we got an error above, the historical channel may already have
// the new fields saved. This could happen when a channel is closed
// after applying migration 25. In this case, we'll borrow the
// `FetchChanInfo` info method from migration 26 where we assume the
// two fields are saved.
return mig26.FetchChanInfo(chanBucket, &c.OpenChannel, legacy)
}
// migrateBalances serializes the channel info using the new tlv format where
// the two fields, `InitialLocalBalance` and `InitialRemoteBalance` are patched
// with empty values.
func migrateBalances(rootBucket kvdb.RwBucket, c *OpenChannel) error {
var chanPointBuf bytes.Buffer
err := mig.WriteOutpoint(&chanPointBuf, &c.FundingOutpoint)
if err != nil {
return err
}
// Get the channel bucket.
chanBucket := rootBucket.NestedReadWriteBucket(chanPointBuf.Bytes())
if chanBucket == nil {
return fmt.Errorf("empty historical chan bucket")
}
// Update the channel info.
if err := PutChanInfo(chanBucket, c, false); err != nil {
return fmt.Errorf("unable to put chan info: %v", err)
}
return nil
}

View File

@ -0,0 +1,203 @@
package migration27
import (
"bytes"
"fmt"
"testing"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
mig25 "github.com/lightningnetwork/lnd/channeldb/migration25"
mig26 "github.com/lightningnetwork/lnd/channeldb/migration26"
mig "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
"github.com/lightningnetwork/lnd/channeldb/migtest"
"github.com/lightningnetwork/lnd/kvdb"
)
var (
// Create dummy values to be stored in db.
dummyPrivKey, _ = btcec.NewPrivateKey()
dummyPubKey = dummyPrivKey.PubKey()
dummyOp = wire.OutPoint{
Hash: chainhash.Hash{},
Index: 9,
}
// dummyInput is used in our commit tx.
dummyInput = &wire.TxIn{
PreviousOutPoint: wire.OutPoint{
Hash: chainhash.Hash{},
Index: 0xffffffff,
},
Sequence: 0xffffffff,
}
// toLocalScript is the PkScript used in to-local output.
toLocalScript = []byte{
0x0, 0x14, 0xc6, 0x9, 0x62, 0xab, 0x60, 0xbe,
0x40, 0xd, 0xab, 0x31, 0xc, 0x13, 0x14, 0x15,
0x93, 0xe6, 0xa2, 0x94, 0xe4, 0x2a,
}
// commitTx1 is the tx saved in the first old revocation.
commitTx1 = &wire.MsgTx{
Version: 2,
// Add a dummy input.
TxIn: []*wire.TxIn{dummyInput},
TxOut: []*wire.TxOut{
{
Value: 990_950,
PkScript: toLocalScript,
},
},
}
// testChannel is used to test the balance fields are correctly set.
testChannel = &OpenChannel{
OpenChannel: mig26.OpenChannel{
OpenChannel: mig25.OpenChannel{
OpenChannel: mig.OpenChannel{
IdentityPub: dummyPubKey,
FundingOutpoint: dummyOp,
FundingTxn: commitTx1,
IsInitiator: true,
},
},
},
}
)
// TestMigrateHistoricalBalances checks that the initial balances fields are
// patched to the historical channel info.
func TestMigrateHistoricalBalances(t *testing.T) {
// Test that when the historical channel doesn't have the two new
// fields.
migtest.ApplyMigration(
t,
genBeforeMigration(testChannel, false),
genAfterMigration(testChannel),
MigrateHistoricalBalances,
false,
)
// Test that when the historical channel have the two new fields.
migtest.ApplyMigration(
t,
genBeforeMigration(testChannel, true),
genAfterMigration(testChannel),
MigrateHistoricalBalances,
false,
)
}
func genBeforeMigration(c *OpenChannel, regression bool) func(kvdb.RwTx) error {
return func(tx kvdb.RwTx) error {
// Create the channel bucket.
chanBucket, err := createHistoricalBucket(tx, c)
if err != nil {
return err
}
// Save the channel info using legacy format.
if regression {
// If test regression, then the historical channel
// would have the two fields created. Thus we use the
// method from migration26 which will save the two
// fields for when legacy is true.
return mig26.PutChanInfo(
chanBucket, &c.OpenChannel, true,
)
}
// Otherwise we will save the channel without the new fields.
return PutChanInfo(chanBucket, c, true)
}
}
func genAfterMigration(c *OpenChannel) func(kvdb.RwTx) error {
return func(tx kvdb.RwTx) error {
chanBucket, err := fetchHistoricalChanBucket(tx, c)
if err != nil {
return err
}
newChan := &OpenChannel{}
// Fetch the channel info using the new format.
//
// NOTE: this is the main testing point where we check the
// deserialization of the historical channel bucket is correct.
err = FetchChanInfo(chanBucket, newChan, false)
if err != nil {
return err
}
// Check our initial amount is correct.
if newChan.InitialLocalBalance != 0 {
return fmt.Errorf("wrong local balance, got %d, "+
"want %d", newChan.InitialLocalBalance,
c.InitialLocalBalance)
}
// Check their initial amount is correct.
if newChan.InitialRemoteBalance != 0 {
return fmt.Errorf("wrong remote balance, got %d, "+
"want %d", newChan.InitialRemoteBalance,
c.InitialRemoteBalance)
}
// We also check the relevant channel info fields stay the
// same.
if !newChan.IdentityPub.IsEqual(c.IdentityPub) {
return fmt.Errorf("wrong IdentityPub")
}
if newChan.FundingOutpoint != c.FundingOutpoint {
return fmt.Errorf("wrong FundingOutpoint")
}
if !newChan.IsInitiator {
return fmt.Errorf("wrong IsInitiator")
}
if newChan.FundingTxn.TxHash() != commitTx1.TxHash() {
return fmt.Errorf("wrong FundingTxn")
}
return nil
}
}
func createHistoricalBucket(tx kvdb.RwTx, c *OpenChannel) (kvdb.RwBucket, error) {
// First fetch the top level bucket which stores all data related to
// historical channels.
rootBucket, err := tx.CreateTopLevelBucket(historicalChannelBucket)
if err != nil {
return nil, err
}
var chanPointBuf bytes.Buffer
err = mig.WriteOutpoint(&chanPointBuf, &c.FundingOutpoint)
if err != nil {
return nil, err
}
// Create the sub-bucket.
return rootBucket.CreateBucketIfNotExists(chanPointBuf.Bytes())
}
func fetchHistoricalChanBucket(tx kvdb.RTx,
c *OpenChannel) (kvdb.RBucket, error) {
rootBucket := tx.ReadBucket(historicalChannelBucket)
if rootBucket == nil {
return nil, fmt.Errorf("expected a rootBucket")
}
var chanPointBuf bytes.Buffer
err := mig.WriteOutpoint(&chanPointBuf, &c.FundingOutpoint)
if err != nil {
return nil, err
}
return rootBucket.NestedReadBucket(chanPointBuf.Bytes()), nil
}