invoices: add migration code that runs a full invoice DB SQL migration

This commit is contained in:
Andras Banki-Horvath 2024-06-12 13:32:22 +02:00
parent 708bed517d
commit b92f57e0ae
No known key found for this signature in database
GPG key ID: 80E5375C094198D8
5 changed files with 311 additions and 6 deletions

2
go.mod
View file

@ -138,7 +138,7 @@ require (
github.com/opencontainers/image-spec v1.0.2 // indirect
github.com/opencontainers/runc v1.1.12 // indirect
github.com/ory/dockertest/v3 v3.10.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/pmezard/go-difflib v1.0.0
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.26.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect

View file

@ -0,0 +1,146 @@
package invoices_test
import (
"context"
"database/sql"
"testing"
"time"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/clock"
invpkg "github.com/lightningnetwork/lnd/invoices"
"github.com/lightningnetwork/lnd/sqldb"
"github.com/lightningnetwork/lnd/sqldb/sqlc"
"github.com/stretchr/testify/require"
)
// TestMigrationWithChannelDB tests the migration of invoices from a bolt backed
// channel.db to a SQL database. Note that this test does not attempt to be a
// complete migration test for all invoice types but rather is added as a tool
// for developers and users to debug invoice migration issues with an actual
// channel.db file.
func TestMigrationWithChannelDB(t *testing.T) {
// First create a shared Postgres instance so we don't spawn a new
// docker container for each test.
pgFixture := sqldb.NewTestPgFixture(
t, sqldb.DefaultPostgresFixtureLifetime,
)
t.Cleanup(func() {
pgFixture.TearDown(t)
})
makeSQLDB := func(t *testing.T, sqlite bool) (*invpkg.SQLStore,
*sqldb.TransactionExecutor[*sqlc.Queries]) {
var db *sqldb.BaseDB
if sqlite {
db = sqldb.NewTestSqliteDB(t).BaseDB
} else {
db = sqldb.NewTestPostgresDB(t, pgFixture).BaseDB
}
invoiceExecutor := sqldb.NewTransactionExecutor(
db, func(tx *sql.Tx) invpkg.SQLInvoiceQueries {
return db.WithTx(tx)
},
)
genericExecutor := sqldb.NewTransactionExecutor(
db, func(tx *sql.Tx) *sqlc.Queries {
return db.WithTx(tx)
},
)
testClock := clock.NewTestClock(time.Unix(1, 0))
return invpkg.NewSQLStore(invoiceExecutor, testClock),
genericExecutor
}
migrationTest := func(t *testing.T, kvStore *channeldb.DB,
sqlite bool) {
sqlInvoiceStore, sqlStore := makeSQLDB(t, sqlite)
ctxb := context.Background()
const batchSize = 11
var opts sqldb.MigrationTxOptions
err := sqlStore.ExecTx(
ctxb, &opts, func(tx *sqlc.Queries) error {
return invpkg.MigrateInvoicesToSQL(
ctxb, kvStore.Backend, kvStore, tx,
batchSize,
)
}, func() {},
)
require.NoError(t, err)
// MigrateInvoices will check if the inserted invoice equals to
// the migrated one, but as a sanity check, we'll also fetch the
// invoices from the store and compare them to the original
// invoices.
query := invpkg.InvoiceQuery{
IndexOffset: 0,
// As a sanity check, fetch more invoices than we have
// to ensure that we did not add any extra invoices.
// Note that we don't really have a way to know the
// exact number of invoices in the bolt db without first
// iterating over all of them, but for test purposes
// constant should be enough.
NumMaxInvoices: 9999,
}
result1, err := kvStore.QueryInvoices(ctxb, query)
require.NoError(t, err)
numInvoices := len(result1.Invoices)
result2, err := sqlInvoiceStore.QueryInvoices(ctxb, query)
require.NoError(t, err)
require.Equal(t, numInvoices, len(result2.Invoices))
// Simply zero out the add index so we don't fail on that when
// comparing.
for i := 0; i < numInvoices; i++ {
result1.Invoices[i].AddIndex = 0
result2.Invoices[i].AddIndex = 0
// We need to override the timezone of the invoices as
// the provided DB vs the test runners local time zone
// might be different.
invpkg.OverrideInvoiceTimeZone(&result1.Invoices[i])
invpkg.OverrideInvoiceTimeZone(&result2.Invoices[i])
require.Equal(
t, result1.Invoices[i], result2.Invoices[i],
)
}
}
tests := []struct {
name string
dbPath string
}{
{
"empty",
t.TempDir(),
},
{
"testdata",
"testdata",
},
}
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
store := channeldb.OpenForTesting(t, test.dbPath)
t.Run("Postgres", func(t *testing.T) {
migrationTest(t, store, false)
})
t.Run("SQLite", func(t *testing.T) {
migrationTest(t, store, true)
})
})
}
}

View file

@ -4,15 +4,19 @@ import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"reflect"
"strconv"
"time"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/graph/db/models"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/sqldb"
"github.com/lightningnetwork/lnd/sqldb/sqlc"
"github.com/pmezard/go-difflib/difflib"
)
var (
@ -48,6 +52,11 @@ var (
//
// addIndexNo => invoiceKey
addIndexBucket = []byte("invoice-add-index")
// ErrMigrationMismatch is returned when the migrated invoice does not
// match the original invoice.
ErrMigrationMismatch = fmt.Errorf("migrated invoice does not match " +
"original invoice")
)
// createInvoiceHashIndex generates a hash index that contains payment hashes
@ -60,7 +69,7 @@ var (
// a new index in the SQL database that maps each invoice key to its
// corresponding payment hash.
func createInvoiceHashIndex(ctx context.Context, db kvdb.Backend,
tx SQLInvoiceQueries) error {
tx *sqlc.Queries) error {
return db.View(func(kvTx kvdb.RTx) error {
invoices := kvTx.ReadBucket(invoiceBucket)
@ -399,3 +408,151 @@ func OverrideInvoiceTimeZone(invoice *Invoice) {
}
}
}
// MigrateInvoicesToSQL runs the migration of all invoices from the KV database
// to the SQL database. The migration is done in a single transaction to ensure
// that all invoices are migrated or none at all. This function can be run
// multiple times without causing any issues as it will check if the migration
// has already been performed.
func MigrateInvoicesToSQL(ctx context.Context, db kvdb.Backend,
kvStore InvoiceDB, tx *sqlc.Queries, batchSize int) error {
log.Infof("Starting migration of invoices from KV to SQL")
offset := uint64(0)
t0 := time.Now()
// Create the hash index which we will use to look up invoice
// payment hashes by their add index during migration.
err := createInvoiceHashIndex(ctx, db, tx)
if err != nil && !errors.Is(err, ErrNoInvoicesCreated) {
log.Errorf("Unable to create invoice hash index: %v",
err)
return err
}
log.Debugf("Created SQL invoice hash index in %v", time.Since(t0))
total := 0
// Now we can start migrating the invoices. We'll do this in
// batches to reduce memory usage.
for {
t0 = time.Now()
query := InvoiceQuery{
IndexOffset: offset,
NumMaxInvoices: uint64(batchSize),
}
queryResult, err := kvStore.QueryInvoices(ctx, query)
if err != nil && !errors.Is(err, ErrNoInvoicesCreated) {
return fmt.Errorf("unable to query invoices: "+
"%w", err)
}
if len(queryResult.Invoices) == 0 {
log.Infof("All invoices migrated")
break
}
err = migrateInvoices(ctx, tx, queryResult.Invoices)
if err != nil {
return err
}
offset = queryResult.LastIndexOffset
total += len(queryResult.Invoices)
log.Debugf("Migrated %d KV invoices to SQL in %v\n", total,
time.Since(t0))
}
// Clean up the hash index as it's no longer needed.
err = tx.ClearKVInvoiceHashIndex(ctx)
if err != nil {
return fmt.Errorf("unable to clear invoice hash "+
"index: %w", err)
}
log.Infof("Migration of %d invoices from KV to SQL completed", total)
return nil
}
func migrateInvoices(ctx context.Context, tx *sqlc.Queries,
invoices []Invoice) error {
for i, invoice := range invoices {
var paymentHash lntypes.Hash
if invoice.Terms.PaymentPreimage != nil {
paymentHash = invoice.Terms.PaymentPreimage.Hash()
} else {
paymentHashBytes, err :=
tx.GetKVInvoicePaymentHashByAddIndex(
ctx, int64(invoice.AddIndex),
)
if err != nil {
// This would be an unexpected inconsistency
// in the kv database. We can't do much here
// so we'll notify the user and continue.
log.Warnf("Cannot migrate invoice, unable to "+
"fetch payment hash (add_index=%v): %v",
invoice.AddIndex, err)
continue
}
copy(paymentHash[:], paymentHashBytes)
}
err := MigrateSingleInvoice(ctx, tx, &invoices[i], paymentHash)
if err != nil {
return fmt.Errorf("unable to migrate invoice(%v): %w",
paymentHash, err)
}
migratedInvoice, err := fetchInvoice(
ctx, tx, InvoiceRefByHash(paymentHash),
)
if err != nil {
return fmt.Errorf("unable to fetch migrated "+
"invoice(%v): %w", paymentHash, err)
}
// Override the time zone for comparison. Note that we need to
// override both invoices as the original invoice is coming from
// KV database, it was stored as a binary serialized Go
// time.Time value which has nanosecond precision but might have
// been created in a different time zone. The migrated invoice
// is stored in SQL in UTC and selected in the local time zone,
// however in PostgreSQL it has microsecond precision while in
// SQLite it has nanosecond precision if using TEXT storage
// class.
OverrideInvoiceTimeZone(&invoice)
OverrideInvoiceTimeZone(migratedInvoice)
// Override the add index before checking for equality.
migratedInvoice.AddIndex = invoice.AddIndex
if !reflect.DeepEqual(invoice, *migratedInvoice) {
diff := difflib.UnifiedDiff{
A: difflib.SplitLines(
spew.Sdump(invoice),
),
B: difflib.SplitLines(
spew.Sdump(migratedInvoice),
),
FromFile: "Expected",
FromDate: "",
ToFile: "Actual",
ToDate: "",
Context: 3,
}
diffText, _ := difflib.GetUnifiedDiffString(diff)
return fmt.Errorf("%w: %v.\n%v", ErrMigrationMismatch,
paymentHash, diffText)
}
}
return nil
}

View file

@ -138,6 +138,8 @@ type SQLInvoiceQueries interface { //nolint:interfacebloat
GetKVInvoicePaymentHashByAddIndex(ctx context.Context, addIndex int64) (
[]byte, error)
ClearKVInvoiceHashIndex(ctx context.Context) error
}
var _ InvoiceDB = (*SQLStore)(nil)
@ -354,8 +356,8 @@ func (i *SQLStore) AddInvoice(ctx context.Context,
// fetchInvoice fetches the common invoice data and the AMP state for the
// invoice with the given reference.
func (i *SQLStore) fetchInvoice(ctx context.Context,
db SQLInvoiceQueries, ref InvoiceRef) (*Invoice, error) {
func fetchInvoice(ctx context.Context, db SQLInvoiceQueries,
ref InvoiceRef) (*Invoice, error) {
if ref.PayHash() == nil && ref.PayAddr() == nil && ref.SetID() == nil {
return nil, ErrInvoiceNotFound
@ -686,7 +688,7 @@ func (i *SQLStore) LookupInvoice(ctx context.Context,
readTxOpt := NewSQLInvoiceQueryReadTx()
txErr := i.db.ExecTx(ctx, &readTxOpt, func(db SQLInvoiceQueries) error {
invoice, err = i.fetchInvoice(ctx, db, ref)
invoice, err = fetchInvoice(ctx, db, ref)
return err
}, func() {})
@ -1387,7 +1389,7 @@ func (i *SQLStore) UpdateInvoice(ctx context.Context, ref InvoiceRef,
ref.refModifier = HtlcSetOnlyModifier
}
invoice, err := i.fetchInvoice(ctx, db, ref)
invoice, err := fetchInvoice(ctx, db, ref)
if err != nil {
return err
}

BIN
invoices/testdata/channel.db vendored Normal file

Binary file not shown.