migration30: cover the case where v0.15.0 is active

This commit changes how we locate the next migration height by including
the scenario where `lnd@v0.15.0` is active. In the new version, we will
see a mixed of new and old logs under the same open channel bucket.
Hence, we need to alter how we locate the next un-migrated height.
This commit is contained in:
yyforyongyu 2022-07-12 19:32:12 +08:00
parent 87f58a274b
commit 78a73f9761
No known key found for this signature in database
GPG key ID: 9BCD95C4FF296868
3 changed files with 182 additions and 73 deletions

View file

@ -2,6 +2,7 @@ package migration30
import ( import (
"bytes" "bytes"
"encoding/binary"
"errors" "errors"
"fmt" "fmt"
@ -71,7 +72,7 @@ func (ul *updateLocator) locateChanBucket(rootBucket kvdb.RwBucket) (
// findNextMigrateHeight finds the next commit height that's not migrated. It // findNextMigrateHeight finds the next commit height that's not migrated. It
// returns the commit height bytes found. A nil return value means the // returns the commit height bytes found. A nil return value means the
// migration has been completed for this particular channel bucket. // migration has been completed for this particular channel bucket.
func findNextMigrateHeight(chanBucket kvdb.RwBucket) ([]byte, error) { func findNextMigrateHeight(chanBucket kvdb.RwBucket) []byte {
// Read the old log bucket. The old bucket doesn't exist, indicating // Read the old log bucket. The old bucket doesn't exist, indicating
// either we don't have any old logs for this channel, or the migration // either we don't have any old logs for this channel, or the migration
// has been finished and the old bucket has been deleted. // has been finished and the old bucket has been deleted.
@ -79,7 +80,7 @@ func findNextMigrateHeight(chanBucket kvdb.RwBucket) ([]byte, error) {
revocationLogBucketDeprecated, revocationLogBucketDeprecated,
) )
if oldBucket == nil { if oldBucket == nil {
return nil, nil return nil
} }
// Acquire a read cursor for the old bucket. // Acquire a read cursor for the old bucket.
@ -92,7 +93,7 @@ func findNextMigrateHeight(chanBucket kvdb.RwBucket) ([]byte, error) {
logBucket := chanBucket.NestedReadBucket(revocationLogBucket) logBucket := chanBucket.NestedReadBucket(revocationLogBucket)
if logBucket == nil { if logBucket == nil {
nextHeight, _ := oldCursor.First() nextHeight, _ := oldCursor.First()
return nextHeight, nil return nextHeight
} }
// Acquire a read cursor for the new bucket. // Acquire a read cursor for the new bucket.
@ -100,37 +101,111 @@ func findNextMigrateHeight(chanBucket kvdb.RwBucket) ([]byte, error) {
// Read the last migrated record. If the key is nil, we haven't // Read the last migrated record. If the key is nil, we haven't
// migrated any logs yet. In this case we return the first commit // migrated any logs yet. In this case we return the first commit
// height found from the old revocation log bucket. // height found from the old revocation log bucket. For instance,
// - old log: [1, 2]
// - new log: []
// We will return the first key [1].
migratedHeight, _ := cursor.Last() migratedHeight, _ := cursor.Last()
if migratedHeight == nil { if migratedHeight == nil {
nextHeight, _ := oldCursor.First() nextHeight, _ := oldCursor.First()
return nextHeight, nil return nextHeight
} }
// Read the last height from the old log bucket. If the height of the // Read the last height from the old log bucket.
// last old revocation equals to the migrated height, we've done
// migrating for this channel.
endHeight, _ := oldCursor.Last() endHeight, _ := oldCursor.Last()
if bytes.Equal(migratedHeight, endHeight) {
return nil, nil switch bytes.Compare(migratedHeight, endHeight) {
// If the height of the last old revocation equals to the migrated
// height, we've done migrating for this channel. For instance,
// - old log: [1, 2]
// - new log: [1, 2]
case 0:
return nil
// If the migrated height is smaller, it means this is a resumed
// migration. In this case we will return the next height found in the
// old bucket. For instance,
// - old log: [1, 2]
// - new log: [1]
// We will return the key [2].
case -1:
// Now point the cursor to the migratedHeight. If we cannot
// find this key from the old log bucket, the database might be
// corrupted. In this case, we would return the first key so
// that we would redo the migration for this chan bucket.
matchedHeight, _ := oldCursor.Seek(migratedHeight)
// NOTE: because Seek will return the next key when the passed
// key cannot be found, we need to compare the `matchedHeight`
// to decide whether `migratedHeight` is found or not.
if !bytes.Equal(matchedHeight, migratedHeight) {
log.Warnf("Old revocation bucket doesn't have "+
"CommitHeight=%v yet it's found in the new "+
"bucket. It's likely the new revocation log "+
"bucket is corrupted. Migrations will be"+
"applied again.",
binary.BigEndian.Uint64(migratedHeight))
// Now return the first height found in the old bucket
// so we can redo the migration.
nextHeight, _ := oldCursor.First()
return nextHeight
}
// Otherwise, find the next height to be migrated.
nextHeight, _ := oldCursor.Next()
return nextHeight
// If the migrated height is greater, it means this node has new logs
// saved after v0.15.0. In this case, we need to further decide whether
// the old logs have been migrated or not.
case 1:
} }
// Now point the cursor to the migratedHeight. If we cannot find this // If we ever reached here, it means we have a mixed of new and old
// key from the old log bucket, the database might be corrupted. In // logs saved. Suppose we have old logs as,
// this case, we would return the first key so that we would redo the // - old log: [1, 2]
// migration for this chan bucket. // We'd have four possible scenarios,
matchedHeight, _ := oldCursor.Seek(migratedHeight) // - new log: [ 3, 4] <- no migration happened, return [1].
if matchedHeight == nil { // - new log: [1, 3, 4] <- resumed migration, return [2].
// Now return the first height found in the old bucket so we // - new log: [ 2, 3, 4] <- corrupted migration, return [1].
// can redo the migration. // - new log: [1, 2, 3, 4] <- finished migration, return nil.
nextHeight, _ := oldCursor.First() // To find the next migration height, we will iterate the old logs to
return nextHeight, nil // grab the heights and query them in the new bucket until an height
// cannot be found, which is our next migration height. Or, if the old
// heights can all be found, it indicates a finished migration.
// Move the cursor to the first record.
oldKey, _ := oldCursor.First()
// NOTE: this action can be time-consuming as we are iterating the
// records and compare them. However, we would only ever hit here if
// this is a resumed migration with new logs created after v.0.15.0.
for {
// Try to locate the old key in the new bucket. If it cannot be
// found, it will be the next migrate height.
newKey, _ := cursor.Seek(oldKey)
// If the old key is not found in the new bucket, return it as
// our next migration height.
//
// NOTE: because Seek will return the next key when the passed
// key cannot be found, we need to compare the keys to deicde
// whether the old key is found or not.
if !bytes.Equal(newKey, oldKey) {
return oldKey
}
// Otherwise, keep iterating the old bucket.
oldKey, _ = oldCursor.Next()
// If we've done iterating, yet all the old keys can be found
// in the new bucket, this means the migration has been
// finished.
if oldKey == nil {
return nil
}
} }
// Otherwise, find the next height to be migrated.
nextHeight, _ := oldCursor.Next()
return nextHeight, nil
} }
// locateNextUpdateNum returns a locator that's used to start our migration. A // locateNextUpdateNum returns a locator that's used to start our migration. A
@ -142,10 +217,7 @@ func locateNextUpdateNum(openChanBucket kvdb.RwBucket) (*updateLocator, error) {
cb := func(chanBucket kvdb.RwBucket, l *updateLocator) error { cb := func(chanBucket kvdb.RwBucket, l *updateLocator) error {
locator = l locator = l
updateNum, err := findNextMigrateHeight(chanBucket) updateNum := findNextMigrateHeight(chanBucket)
if err != nil {
return err
}
// We've found the next commit height and can now exit. // We've found the next commit height and can now exit.
if updateNum != nil { if updateNum != nil {

View file

@ -154,8 +154,8 @@ func TestFindNextMigrateHeight(t *testing.T) {
return err return err
} }
height, err = findNextMigrateHeight(chanBucket) height = findNextMigrateHeight(chanBucket)
return err return nil
}, func() {}) }, func() {})
require.NoError(t, err) require.NoError(t, err)
@ -175,10 +175,10 @@ func TestFindNextMigrateHeight(t *testing.T) {
expectedHeight: nil, expectedHeight: nil,
}, },
{ {
// When we don't have any new logs, our next migration // When we don't have any migrated logs, our next
// height would be the first height found in the old // migration height would be the first height found in
// logs. // the old logs.
name: "empty new logs", name: "empty migrated logs",
oldLogs: []mig.ChannelCommitment{ oldLogs: []mig.ChannelCommitment{
createDummyChannelCommit(1), createDummyChannelCommit(1),
createDummyChannelCommit(2), createDummyChannelCommit(2),
@ -186,10 +186,10 @@ func TestFindNextMigrateHeight(t *testing.T) {
expectedHeight: []byte{0, 0, 0, 0, 0, 0, 0, 1}, expectedHeight: []byte{0, 0, 0, 0, 0, 0, 0, 1},
}, },
{ {
// When we have new logs, the next migration height // When we have migrated logs, the next migration
// should be the first height found in the old logs but // height should be the first height found in the old
// not in the new logs. // logs but not in the migrated logs.
name: "have new logs", name: "have migrated logs",
oldLogs: []mig.ChannelCommitment{ oldLogs: []mig.ChannelCommitment{
createDummyChannelCommit(1), createDummyChannelCommit(1),
createDummyChannelCommit(2), createDummyChannelCommit(2),
@ -200,10 +200,10 @@ func TestFindNextMigrateHeight(t *testing.T) {
expectedHeight: []byte{0, 0, 0, 0, 0, 0, 0, 2}, expectedHeight: []byte{0, 0, 0, 0, 0, 0, 0, 2},
}, },
{ {
// When both the logs have equal length, the next // When both the logs have equal indexes, the next
// migration should be nil as we've finished migrating // migration should be nil as we've finished migrating
// for this bucket. // for this bucket.
name: "have equal logs", name: "have finished logs",
oldLogs: []mig.ChannelCommitment{ oldLogs: []mig.ChannelCommitment{
createDummyChannelCommit(1), createDummyChannelCommit(1),
createDummyChannelCommit(2), createDummyChannelCommit(2),
@ -215,10 +215,45 @@ func TestFindNextMigrateHeight(t *testing.T) {
expectedHeight: nil, expectedHeight: nil,
}, },
{ {
// When the lastest height found from the new logs is // When there are new logs saved in the new bucket,
// ahead of the old logs, we still return the old log's // which happens when the node is running with
// height. // v.0.15.0, and we don't have any migrated logs, the
name: "corrupted logs", // next migration height should be the first height
// found in the old bucket.
name: "have new logs but no migrated logs",
oldLogs: []mig.ChannelCommitment{
createDummyChannelCommit(1),
createDummyChannelCommit(2),
},
newLogs: []mig.ChannelCommitment{
createDummyChannelCommit(3),
createDummyChannelCommit(4),
},
expectedHeight: []byte{0, 0, 0, 0, 0, 0, 0, 1},
},
{
// When there are new logs saved in the new bucket,
// which happens when the node is running with
// v.0.15.0, and we have migrated logs, the returned
// value should be the next un-migrated height.
name: "have new logs and migrated logs",
oldLogs: []mig.ChannelCommitment{
createDummyChannelCommit(1),
createDummyChannelCommit(2),
},
newLogs: []mig.ChannelCommitment{
createDummyChannelCommit(1),
createDummyChannelCommit(3),
createDummyChannelCommit(4),
},
expectedHeight: []byte{0, 0, 0, 0, 0, 0, 0, 2},
},
{
// When there are new logs saved in the new bucket,
// which happens when the node is running with
// v.0.15.0, and we have corrupted logs, the returned
// value should be the first height in the old bucket.
name: "have new logs but missing logs",
oldLogs: []mig.ChannelCommitment{ oldLogs: []mig.ChannelCommitment{
createDummyChannelCommit(1), createDummyChannelCommit(1),
createDummyChannelCommit(2), createDummyChannelCommit(2),
@ -226,9 +261,28 @@ func TestFindNextMigrateHeight(t *testing.T) {
newLogs: []mig.ChannelCommitment{ newLogs: []mig.ChannelCommitment{
createDummyChannelCommit(2), createDummyChannelCommit(2),
createDummyChannelCommit(3), createDummyChannelCommit(3),
createDummyChannelCommit(4),
}, },
expectedHeight: []byte{0, 0, 0, 0, 0, 0, 0, 1}, expectedHeight: []byte{0, 0, 0, 0, 0, 0, 0, 1},
}, },
{
// When there are new logs saved in the new bucket,
// which happens when the node is running with
// v.0.15.0, and we have finished the migration, we
// expect a nil height to be returned.
name: "have new logs and finished logs",
oldLogs: []mig.ChannelCommitment{
createDummyChannelCommit(1),
createDummyChannelCommit(2),
},
newLogs: []mig.ChannelCommitment{
createDummyChannelCommit(1),
createDummyChannelCommit(2),
createDummyChannelCommit(3),
createDummyChannelCommit(4),
},
expectedHeight: nil,
},
} }
for _, tc := range testCases { for _, tc := range testCases {

View file

@ -243,38 +243,33 @@ func logMigrationStat(db kvdb.Backend) (uint64, uint64, error) {
// total is the number of total records. // total is the number of total records.
total uint64 total uint64
// migrated is the number of already migrated records.
migrated uint64
// unmigrated is the number of unmigrated records. // unmigrated is the number of unmigrated records.
unmigrated uint64 unmigrated uint64
) )
err = kvdb.Update(db, func(tx kvdb.RwTx) error { err = kvdb.Update(db, func(tx kvdb.RwTx) error {
total, unmigrated, migrated, err = fetchLogStats(tx) total, unmigrated, err = fetchLogStats(tx)
return err return err
}, func() {}) }, func() {})
log.Debugf("Total logs=%d, migrated=%d, unmigrated=%d", total, migrated, log.Debugf("Total logs=%d, unmigrated=%d", total, unmigrated)
unmigrated) return total, total - unmigrated, err
return total, migrated, err
} }
// fetchLogStats iterates all the chan buckets to provide stats about the logs. // fetchLogStats iterates all the chan buckets to provide stats about the logs.
// The returned values are num of total records, num of un-migrated records, // The returned values are num of total records, and num of un-migrated
// and num of migrated records. // records.
func fetchLogStats(tx kvdb.RwTx) (uint64, uint64, uint64, error) { func fetchLogStats(tx kvdb.RwTx) (uint64, uint64, error) {
var ( var (
total uint64 total uint64
totalUnmigrated uint64 totalUnmigrated uint64
totalMigrated uint64
) )
openChanBucket := tx.ReadWriteBucket(openChannelBucket) openChanBucket := tx.ReadWriteBucket(openChannelBucket)
// If no bucket is found, we can exit early. // If no bucket is found, we can exit early.
if openChanBucket == nil { if openChanBucket == nil {
return 0, 0, 0, fmt.Errorf("root bucket not found") return 0, 0, fmt.Errorf("root bucket not found")
} }
// counter is a helper closure used to count the number of records // counter is a helper closure used to count the number of records
@ -317,19 +312,10 @@ func fetchLogStats(tx kvdb.RwTx) (uint64, uint64, uint64, error) {
return nil return nil
} }
// countMigrated is a callback function used to count the total number
// of migrated records.
countMigrated := func(chanBucket kvdb.RwBucket,
l *updateLocator) error {
totalMigrated += counter(chanBucket, revocationLogBucket)
return nil
}
// Locate the next migration height. // Locate the next migration height.
locator, err := locateNextUpdateNum(openChanBucket) locator, err := locateNextUpdateNum(openChanBucket)
if err != nil { if err != nil {
return 0, 0, 0, fmt.Errorf("locator got error: %v", err) return 0, 0, fmt.Errorf("locator got error: %v", err)
} }
// If the returned locator is not nil, we still have un-migrated // If the returned locator is not nil, we still have un-migrated
@ -338,20 +324,17 @@ func fetchLogStats(tx kvdb.RwTx) (uint64, uint64, uint64, error) {
if locator != nil { if locator != nil {
err = iterateBuckets(openChanBucket, locator, countUnmigrated) err = iterateBuckets(openChanBucket, locator, countUnmigrated)
if err != nil { if err != nil {
return 0, 0, 0, err return 0, 0, err
} }
} }
// Count the total number of records by supplying a nil locator. // Count the total number of records by supplying a nil locator.
err = iterateBuckets(openChanBucket, nil, countTotal) err = iterateBuckets(openChanBucket, nil, countTotal)
if err != nil { if err != nil {
return 0, 0, 0, err return 0, 0, err
} }
// Count the total number of already migrated records by supplying a return total, totalUnmigrated, err
// nil locator.
err = iterateBuckets(openChanBucket, nil, countMigrated)
return total, totalUnmigrated, totalMigrated, err
} }
// logEntry houses the info needed to write a new revocation log. // logEntry houses the info needed to write a new revocation log.