diff --git a/channeldb/db.go b/channeldb/db.go index c4262ffa8..51f3fe9aa 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -90,6 +90,13 @@ var ( number: 7, migration: migrateOptionalChannelCloseSummaryFields, }, + { + // The DB version that changes the gossiper's message + // store keys to account for the message's type and + // ShortChannelID. + number: 8, + migration: migrateGossipMessageStoreKeys, + }, } // Big endian is the preferred byte order, due to cursor scans over diff --git a/channeldb/meta_test.go b/channeldb/meta_test.go index 921e55c54..76d0cb257 100644 --- a/channeldb/meta_test.go +++ b/channeldb/meta_test.go @@ -50,7 +50,7 @@ func applyMigration(t *testing.T, beforeMigration, afterMigration func(d *DB), if err == nil && shouldFail { t.Fatal("error wasn't received on migration stage") } else if err != nil && !shouldFail { - t.Fatal("error was received on migration stage") + t.Fatalf("error was received on migration stage: %v", err) } // afterMigration usually used for checking the database state and diff --git a/channeldb/migrations.go b/channeldb/migrations.go index 5d4919db9..f86e416b9 100644 --- a/channeldb/migrations.go +++ b/channeldb/migrations.go @@ -7,6 +7,7 @@ import ( "fmt" "github.com/coreos/bbolt" + "github.com/lightningnetwork/lnd/lnwire" ) // migrateNodeAndEdgeUpdateIndex is a migration function that will update the @@ -610,3 +611,75 @@ func migrateOptionalChannelCloseSummaryFields(tx *bbolt.Tx) error { return nil } + +var messageStoreBucket = []byte("message-store") + +// migrateGossipMessageStoreKeys migrates the key format for gossip messages +// found in the message store to a new one that takes into consideration the of +// the message being stored. +func migrateGossipMessageStoreKeys(tx *bbolt.Tx) error { + // We'll start by retrieving the bucket in which these messages are + // stored within. If there isn't one, there's nothing left for us to do + // so we can avoid the migration. + messageStore := tx.Bucket(messageStoreBucket) + if messageStore == nil { + return nil + } + + log.Info("Migrating to the gossip message store new key format") + + // Otherwise we'll proceed with the migration. We'll start by coalescing + // all the current messages within the store, which are indexed by the + // public key of the peer which they should be sent to, followed by the + // short channel ID of the channel for which the message belongs to. We + // should only expect to find channel announcement signatures as that + // was the only support message type previously. + msgs := make(map[[33 + 8]byte]*lnwire.AnnounceSignatures) + err := messageStore.ForEach(func(k, v []byte) error { + var msgKey [33 + 8]byte + copy(msgKey[:], k) + + msg := &lnwire.AnnounceSignatures{} + if err := msg.Decode(bytes.NewReader(v), 0); err != nil { + return err + } + + msgs[msgKey] = msg + + return nil + + }) + if err != nil { + return err + } + + // Then, we'll go over all of our messages, remove their previous entry, + // and add another with the new key format. Once we've done this for + // every message, we can consider the migration complete. + for oldMsgKey, msg := range msgs { + if err := messageStore.Delete(oldMsgKey[:]); err != nil { + return err + } + + // Construct the new key for which we'll find this message with + // in the store. It'll be the same as the old, but we'll also + // include the message type. + var msgType [2]byte + binary.BigEndian.PutUint16(msgType[:], uint16(msg.MsgType())) + newMsgKey := append(oldMsgKey[:], msgType[:]...) + + // Serialize the message with its wire encoding. + var b bytes.Buffer + if _, err := lnwire.WriteMessage(&b, msg, 0); err != nil { + return err + } + + if err := messageStore.Put(newMsgKey, b.Bytes()); err != nil { + return err + } + } + + log.Info("Migration to the gossip message store new key format complete!") + + return nil +} diff --git a/channeldb/migrations_test.go b/channeldb/migrations_test.go index ed2829c0e..9223108d3 100644 --- a/channeldb/migrations_test.go +++ b/channeldb/migrations_test.go @@ -12,6 +12,7 @@ import ( "github.com/coreos/bbolt" "github.com/davecgh/go-spew/spew" "github.com/go-errors/errors" + "github.com/lightningnetwork/lnd/lnwire" ) // TestPaymentStatusesMigration checks that already completed payments will have @@ -468,3 +469,98 @@ func TestMigrateOptionalChannelCloseSummaryFields(t *testing.T) { false) } } + +// TestMigrateGossipMessageStoreKeys ensures that the migration to the new +// gossip message store key format is successful/unsuccessful under various +// scenarios. +func TestMigrateGossipMessageStoreKeys(t *testing.T) { + t.Parallel() + + // Construct the message which we'll use to test the migration, along + // with its old and new key formats. + shortChanID := lnwire.ShortChannelID{BlockHeight: 10} + msg := &lnwire.AnnounceSignatures{ShortChannelID: shortChanID} + + var oldMsgKey [33 + 8]byte + copy(oldMsgKey[:33], pubKey.SerializeCompressed()) + binary.BigEndian.PutUint64(oldMsgKey[33:41], shortChanID.ToUint64()) + + var newMsgKey [33 + 8 + 2]byte + copy(newMsgKey[:41], oldMsgKey[:]) + binary.BigEndian.PutUint16(newMsgKey[41:43], uint16(msg.MsgType())) + + // Before the migration, we'll create the bucket where the messages + // should live and insert them. + beforeMigration := func(db *DB) { + var b bytes.Buffer + if err := msg.Encode(&b, 0); err != nil { + t.Fatalf("unable to serialize message: %v", err) + } + + err := db.Update(func(tx *bbolt.Tx) error { + messageStore, err := tx.CreateBucketIfNotExists( + messageStoreBucket, + ) + if err != nil { + return err + } + + return messageStore.Put(oldMsgKey[:], b.Bytes()) + }) + if err != nil { + t.Fatal(err) + } + } + + // After the migration, we'll make sure that: + // 1. We cannot find the message under its old key. + // 2. We can find the message under its new key. + // 3. The message matches the original. + afterMigration := func(db *DB) { + meta, err := db.FetchMeta(nil) + if err != nil { + t.Fatalf("unable to fetch db version: %v", err) + } + if meta.DbVersionNumber != 1 { + t.Fatalf("migration should have succeeded but didn't") + } + + var rawMsg []byte + err = db.View(func(tx *bbolt.Tx) error { + messageStore := tx.Bucket(messageStoreBucket) + if messageStore == nil { + return errors.New("message store bucket not " + + "found") + } + rawMsg = messageStore.Get(oldMsgKey[:]) + if rawMsg != nil { + t.Fatal("expected to not find message under " + + "old key, but did") + } + rawMsg = messageStore.Get(newMsgKey[:]) + if rawMsg == nil { + return fmt.Errorf("expected to find message " + + "under new key, but didn't") + } + + return nil + }) + if err != nil { + t.Fatal(err) + } + + gotMsg, err := lnwire.ReadMessage(bytes.NewReader(rawMsg), 0) + if err != nil { + t.Fatalf("unable to deserialize raw message: %v", err) + } + if !reflect.DeepEqual(msg, gotMsg) { + t.Fatalf("expected message: %v\ngot message: %v", + spew.Sdump(msg), spew.Sdump(gotMsg)) + } + } + + applyMigration( + t, beforeMigration, afterMigration, + migrateGossipMessageStoreKeys, false, + ) +}