diff --git a/channeldb/db.go b/channeldb/db.go index 5a7b59b63..514b730f9 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -18,6 +18,7 @@ import ( "github.com/lightningnetwork/lnd/channeldb/migration20" "github.com/lightningnetwork/lnd/channeldb/migration21" "github.com/lightningnetwork/lnd/channeldb/migration23" + "github.com/lightningnetwork/lnd/channeldb/migration24" "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11" "github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/kvdb" @@ -198,6 +199,11 @@ var ( number: 23, migration: migration23.MigrateHtlcAttempts, }, + { + // Remove old forwarding packages of closed channels. + number: 24, + migration: migration24.MigrateFwdPkgCleanup, + }, } // Big endian is the preferred byte order, due to cursor scans over diff --git a/channeldb/log.go b/channeldb/log.go index 24c0c608b..42b55ccf1 100644 --- a/channeldb/log.go +++ b/channeldb/log.go @@ -7,6 +7,7 @@ import ( "github.com/lightningnetwork/lnd/channeldb/migration12" "github.com/lightningnetwork/lnd/channeldb/migration13" "github.com/lightningnetwork/lnd/channeldb/migration16" + "github.com/lightningnetwork/lnd/channeldb/migration24" "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11" "github.com/lightningnetwork/lnd/kvdb" ) @@ -36,5 +37,6 @@ func UseLogger(logger btclog.Logger) { migration12.UseLogger(logger) migration13.UseLogger(logger) migration16.UseLogger(logger) + migration24.UseLogger(logger) kvdb.UseLogger(logger) } diff --git a/channeldb/migration24/log.go b/channeldb/migration24/log.go new file mode 100644 index 000000000..a74cb039d --- /dev/null +++ b/channeldb/migration24/log.go @@ -0,0 +1,14 @@ +package migration24 + +import ( + "github.com/btcsuite/btclog" +) + +// log is a logger that is initialized as disabled. This means the package will +// not perform any logging by default until a logger is set. +var log = btclog.Disabled + +// UseLogger uses a specified Logger to output package logging info. +func UseLogger(logger btclog.Logger) { + log = logger +} diff --git a/channeldb/migration24/migration.go b/channeldb/migration24/migration.go new file mode 100644 index 000000000..1110ea360 --- /dev/null +++ b/channeldb/migration24/migration.go @@ -0,0 +1,119 @@ +package migration24 + +import ( + "bytes" + "encoding/binary" + "io" + + mig "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11" + "github.com/lightningnetwork/lnd/kvdb" +) + +var ( + // closedChannelBucket stores summarization information concerning + // previously open, but now closed channels. + closedChannelBucket = []byte("closed-chan-bucket") + + // fwdPackagesKey is the root-level bucket that all forwarding packages + // are written. This bucket is further subdivided based on the short + // channel ID of each channel. + fwdPackagesKey = []byte("fwd-packages") +) + +// MigrateFwdPkgCleanup deletes all the forwarding packages of closed channels. +// It determines the closed channels by iterating closedChannelBucket. The +// ShortChanID found in the ChannelCloseSummary is then used as a key to query +// the forwarding packages bucket. If a match is found, it will be deleted. +func MigrateFwdPkgCleanup(tx kvdb.RwTx) error { + log.Infof("Deleting forwarding packages for closed channels") + + // Find all closed channel summaries, which are stored in the + // closeBucket. + closeBucket := tx.ReadBucket(closedChannelBucket) + if closeBucket == nil { + return nil + } + + var chanSummaries []*mig.ChannelCloseSummary + + // appendSummary is a function closure to help put deserialized close + // summeries into chanSummaries. + appendSummary := func(_ []byte, summaryBytes []byte) error { + summaryReader := bytes.NewReader(summaryBytes) + chanSummary, err := deserializeCloseChannelSummary( + summaryReader, + ) + if err != nil { + return err + } + + // Skip pending channels + if chanSummary.IsPending { + return nil + } + + chanSummaries = append(chanSummaries, chanSummary) + return nil + } + + if err := closeBucket.ForEach(appendSummary); err != nil { + return err + } + + // Now we will load the forwarding packages bucket, delete all the + // nested buckets whose source matches the ShortChanID found in the + // closed channel summeraries. + fwdPkgBkt := tx.ReadWriteBucket(fwdPackagesKey) + if fwdPkgBkt == nil { + return nil + } + + // Iterate over all close channels and remove their forwarding packages. + for _, summery := range chanSummaries { + sourceBytes := makeLogKey(summery.ShortChanID.ToUint64()) + + // First, we will try to find the corresponding bucket. If there + // is not a nested bucket matching the ShortChanID, we will skip + // it. + if fwdPkgBkt.NestedReadBucket(sourceBytes[:]) == nil { + continue + } + + // Otherwise, wipe all the forwarding packages. + if err := fwdPkgBkt.DeleteNestedBucket( + sourceBytes[:]); err != nil { + + return err + } + } + + log.Infof("Deletion of forwarding packages of closed channels " + + "complete! DB compaction is recommended to free up the" + + "disk space.") + return nil +} + +// deserializeCloseChannelSummary will decode a CloseChannelSummary with no +// optional fields. +func deserializeCloseChannelSummary( + r io.Reader) (*mig.ChannelCloseSummary, error) { + + c := &mig.ChannelCloseSummary{} + err := mig.ReadElements( + r, + &c.ChanPoint, &c.ShortChanID, &c.ChainHash, &c.ClosingTXID, + &c.CloseHeight, &c.RemotePub, &c.Capacity, &c.SettledBalance, + &c.TimeLockedBalance, &c.CloseType, &c.IsPending, + ) + if err != nil { + return nil, err + } + return c, nil +} + +// makeLogKey converts a uint64 into an 8 byte array. +func makeLogKey(updateNum uint64) [8]byte { + var key [8]byte + binary.BigEndian.PutUint64(key[:], updateNum) + return key +} diff --git a/channeldb/migration24/migration_test.go b/channeldb/migration24/migration_test.go new file mode 100644 index 000000000..f02977146 --- /dev/null +++ b/channeldb/migration24/migration_test.go @@ -0,0 +1,296 @@ +package migration24 + +import ( + "bytes" + "encoding/binary" + "errors" + "fmt" + "io" + "math/rand" + "testing" + + "github.com/btcsuite/btcd/btcec" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btcutil" + lnwire "github.com/lightningnetwork/lnd/channeldb/migration/lnwire21" + mig "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11" + "github.com/lightningnetwork/lnd/channeldb/migtest" + "github.com/lightningnetwork/lnd/kvdb" +) + +var ( + key = [chainhash.HashSize]byte{ + 0x81, 0xb6, 0x37, 0xd8, 0xfc, 0xd2, 0xc6, 0xda, + 0x68, 0x59, 0xe6, 0x96, 0x31, 0x13, 0xa1, 0x17, + 0xd, 0xe7, 0x93, 0xe4, 0xb7, 0x25, 0xb8, 0x4d, + 0x1e, 0xb, 0x4c, 0xf9, 0x9e, 0xc5, 0x8c, 0xe9, + } + testTx = &wire.MsgTx{ + Version: 1, + TxIn: []*wire.TxIn{ + { + PreviousOutPoint: wire.OutPoint{ + Hash: chainhash.Hash{}, + Index: 0xffffffff, + }, + SignatureScript: []byte{0x04, 0x31, 0xdc, 0x00, + 0x1b, 0x01, 0x62}, + Sequence: 0xffffffff, + }, + }, + TxOut: []*wire.TxOut{ + { + Value: 5000000000, + PkScript: []byte{ + 0x41, // OP_DATA_65 + 0x04, 0xd6, 0x4b, 0xdf, 0xd0, 0x9e, + 0xb1, 0xc5, 0xfe, 0x29, 0x5a, 0xbd, + 0xeb, 0x1d, 0xca, 0x42, 0x81, 0xbe, + 0x98, 0x8e, 0x2d, 0xa0, 0xb6, 0xc1, + 0xc6, 0xa5, 0x9d, 0xc2, 0x26, 0xc2, + 0x86, 0x24, 0xe1, 0x81, 0x75, 0xe8, + 0x51, 0xc9, 0x6b, 0x97, 0x3d, 0x81, + 0xb0, 0x1c, 0xc3, 0x1f, 0x04, 0x78, + 0x34, 0xbc, 0x06, 0xd6, 0xd6, 0xed, + 0xf6, 0x20, 0xd1, 0x84, 0x24, 0x1a, + 0x6a, 0xed, 0x8b, 0x63, + 0xa6, // 65-byte signature + 0xac, // OP_CHECKSIG + }, + }, + }, + LockTime: 5, + } + _, pubKey = btcec.PrivKeyFromBytes(btcec.S256(), key[:]) +) + +// TestMigrateFwdPkgCleanup asserts that the migration will delete all the +// forwarding packages for close channels, and leave the rest untouched. +func TestMigrateFwdPkgCleanup(t *testing.T) { + migrationTests := []struct { + name string + beforeMigrationFunc func(kvdb.RwTx) error + afterMigrationFunc func(kvdb.RwTx) error + }{ + { + // No closed channels summeries in the db. + // This leaves the fwdpkg untouched. + name: "no closed channel summeries", + beforeMigrationFunc: genBeforeMigration( + false, []int{}, []int{1}, + ), + afterMigrationFunc: genAfterMigration( + []int{}, []int{1}, + ), + }, + { + // One closed summery found, and the forwarding package + // shares the same channel ID. + // This makes the fwdpkg removed. + name: "remove one fwdpkg", + beforeMigrationFunc: genBeforeMigration( + false, []int{1}, []int{1}, + ), + afterMigrationFunc: genAfterMigration( + []int{1}, []int{}, + ), + }, + { + // One closed summery with pending status found, and the + // forwarding package shares the same channel ID. + // This leaves the fwdpkg untouched. + name: "no action if closed status is pending", + beforeMigrationFunc: genBeforeMigration( + true, []int{1}, []int{1}, + ), + afterMigrationFunc: genAfterMigration( + []int{}, []int{1}, + ), + }, + { + // One closed summery found, while the forwarding + // package has a different channel ID. + // This leaves the fwdpkg untouched. + name: "no fwdpkg removed", + beforeMigrationFunc: genBeforeMigration( + false, []int{1}, []int{2}, + ), + afterMigrationFunc: genAfterMigration( + []int{}, []int{2}, + ), + }, + { + // Multiple closed summeries and fwdpkg nested buckets + // found. Only the matched fwdPkg nested buckets are + // removed. + name: "only matching fwdpkg removed", + beforeMigrationFunc: genBeforeMigration(false, + []int{1, 2, 3}, + []int{1, 2, 4}, + ), + afterMigrationFunc: genAfterMigration( + []int{1, 2}, + []int{4}, + ), + }, + } + + for _, tt := range migrationTests { + test := tt + t.Run(test.name, func(t *testing.T) { + migtest.ApplyMigration( + t, + test.beforeMigrationFunc, + test.afterMigrationFunc, + MigrateFwdPkgCleanup, + false, + ) + }) + } +} + +func genBeforeMigration(isPending bool, + closeSummeryChanIDs, fwdPkgChanIDs []int) func(kvdb.RwTx) error { + + return func(tx kvdb.RwTx) error { + // Create closed channel summeries + for _, id := range closeSummeryChanIDs { + chanID := lnwire.NewShortChanIDFromInt(uint64(id)) + if err := createTestCloseChannelSummery( + tx, isPending, chanID, + ); err != nil { + return err + } + } + + // Create fwdPkg nested buckets + for _, id := range fwdPkgChanIDs { + chanID := lnwire.NewShortChanIDFromInt(uint64(id)) + err := createTestFwdPkgBucket(tx, chanID) + if err != nil { + return err + } + } + + return nil + } +} + +func genAfterMigration(deleted, untouched []int) func(kvdb.RwTx) error { + return func(tx kvdb.RwTx) error { + fwdPkgBkt := tx.ReadBucket(fwdPackagesKey) + if fwdPkgBkt == nil { + return errors.New("unable to find bucket") + } + + // Reading deleted buckets should return nil + for _, id := range deleted { + chanID := lnwire.NewShortChanIDFromInt(uint64(id)) + sourceKey := makeLogKey(chanID.ToUint64()) + sourceBkt := fwdPkgBkt.NestedReadBucket(sourceKey[:]) + if sourceBkt != nil { + return fmt.Errorf( + "expected bucket to be deleted: %v", + id, + ) + } + } + + // Reading untouched buckets should return not nil + for _, id := range untouched { + chanID := lnwire.NewShortChanIDFromInt(uint64(id)) + sourceKey := makeLogKey(chanID.ToUint64()) + sourceBkt := fwdPkgBkt.NestedReadBucket(sourceKey[:]) + if sourceBkt == nil { + return fmt.Errorf( + "expected bucket to not be deleted: %v", + id, + ) + } + } + + return nil + } +} + +// createTestCloseChannelSummery creates a CloseChannelSummery for testing. +func createTestCloseChannelSummery(tx kvdb.RwTx, isPending bool, + chanID lnwire.ShortChannelID) error { + + closedChanBucket, err := tx.CreateTopLevelBucket(closedChannelBucket) + if err != nil { + return err + } + + outputPoint := wire.OutPoint{Hash: key, Index: rand.Uint32()} + + ccs := &mig.ChannelCloseSummary{ + ChanPoint: outputPoint, + ShortChanID: chanID, + ChainHash: key, + ClosingTXID: testTx.TxHash(), + CloseHeight: 100, + RemotePub: pubKey, + Capacity: btcutil.Amount(10000), + SettledBalance: btcutil.Amount(50000), + CloseType: mig.RemoteForceClose, + IsPending: isPending, + // Optional fields + RemoteCurrentRevocation: pubKey, + RemoteNextRevocation: pubKey, + } + var b bytes.Buffer + if err := serializeChannelCloseSummary(&b, ccs); err != nil { + return err + } + + var chanPointBuf bytes.Buffer + if err := writeOutpoint(&chanPointBuf, &outputPoint); err != nil { + return err + } + + return closedChanBucket.Put(chanPointBuf.Bytes(), b.Bytes()) +} + +func createTestFwdPkgBucket(tx kvdb.RwTx, chanID lnwire.ShortChannelID) error { + fwdPkgBkt, err := tx.CreateTopLevelBucket(fwdPackagesKey) + if err != nil { + return err + } + + source := makeLogKey(chanID.ToUint64()) + if _, err := fwdPkgBkt.CreateBucketIfNotExists(source[:]); err != nil { + return err + } + + return nil +} + +func serializeChannelCloseSummary( + w io.Writer, cs *mig.ChannelCloseSummary) error { + + err := mig.WriteElements( + w, + cs.ChanPoint, cs.ShortChanID, cs.ChainHash, cs.ClosingTXID, + cs.CloseHeight, cs.RemotePub, cs.Capacity, cs.SettledBalance, + cs.TimeLockedBalance, cs.CloseType, cs.IsPending, + ) + if err != nil { + return err + } + return nil +} + +// writeOutpoint writes an outpoint to the passed writer using the minimal +// amount of bytes possible. +func writeOutpoint(w io.Writer, o *wire.OutPoint) error { + if _, err := w.Write(o.Hash[:]); err != nil { + return err + } + if err := binary.Write(w, binary.BigEndian, o.Index); err != nil { + return err + } + + return nil +}