Merge pull request #4364 from yyforyongyu/clean-forward-states

multi: clean up forwarding state from closed channels
Olaoluwa Osuntokun 2021-09-24 15:52:46 -07:00 committed by GitHub
commit fbb1d159e0
25 changed files with 3188 additions and 1675 deletions


@ -1787,13 +1787,12 @@ func (h *HTLC) Copy() HTLC {
// LogUpdate represents a pending update to the remote commitment chain. The
// log update may be an add, fail, or settle entry. We maintain this data in
// order to be able to properly retransmit our proposed
// state if necessary.
// order to be able to properly retransmit our proposed state if necessary.
type LogUpdate struct {
// LogIndex is the log index of this proposed commitment update entry.
LogIndex uint64
// UpdateMsg is the update message that was included within the our
// UpdateMsg is the update message that was included within our
// local update log. The LogIndex value denotes the log index of this
// update which will be used when restoring our local update log if
// we're left with a dangling update on restart.
@ -2862,6 +2861,12 @@ func (c *OpenChannel) CloseChannel(summary *ChannelCloseSummary,
return err
}
// Delete all the forwarding packages stored for this particular
// channel.
if err = chanState.Packager.Wipe(tx); err != nil {
return err
}
// Now that the index to this channel has been deleted, purge
// the remaining channel metadata from the database.
err = deleteOpenChannel(chanBucket)


@ -130,6 +130,25 @@ func remoteHtlcsOption(htlcs []HTLC) testChannelOption {
}
}
// loadFwdPkgs is a helper method that reads all forwarding packages for a
// particular packager.
func loadFwdPkgs(t *testing.T, db kvdb.Backend,
packager FwdPackager) []*FwdPkg {
var (
fwdPkgs []*FwdPkg
err error
)
err = kvdb.View(db, func(tx kvdb.RTx) error {
fwdPkgs, err = packager.LoadFwdPkgs(tx)
return err
}, func() {})
require.NoError(t, err, "unable to load fwd pkgs")
return fwdPkgs
}
// localShutdownOption is an option which sets the local upfront shutdown
// script for the channel.
func localShutdownOption(addr lnwire.DeliveryAddress) testChannelOption {
@ -822,6 +841,10 @@ func TestChannelStateTransition(t *testing.T) {
t.Fatalf("revocation state was not synced")
}
// At this point, we should have 2 forwarding packages added.
fwdPkgs := loadFwdPkgs(t, cdb, channel.Packager)
require.Len(t, fwdPkgs, 2, "wrong number of forwarding packages")
// Now attempt to delete the channel from the database.
closeSummary := &ChannelCloseSummary{
ChanPoint: channel.FundingOutpoint,
@ -852,6 +875,10 @@ func TestChannelStateTransition(t *testing.T) {
if err == nil {
t.Fatal("revocation log search should have failed")
}
// All forwarding packages of this channel have been deleted too.
fwdPkgs = loadFwdPkgs(t, cdb, channel.Packager)
require.Empty(t, fwdPkgs, "no forwarding packages should exist")
}
func TestFetchPendingChannels(t *testing.T) {


@ -18,6 +18,7 @@ import (
"github.com/lightningnetwork/lnd/channeldb/migration20"
"github.com/lightningnetwork/lnd/channeldb/migration21"
"github.com/lightningnetwork/lnd/channeldb/migration23"
"github.com/lightningnetwork/lnd/channeldb/migration24"
"github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/kvdb"
@ -198,6 +199,11 @@ var (
number: 23,
migration: migration23.MigrateHtlcAttempts,
},
{
// Remove old forwarding packages of closed channels.
number: 24,
migration: migration24.MigrateFwdPkgCleanup,
},
}
// Big endian is the preferred byte order, due to cursor scans over
@ -1293,6 +1299,8 @@ func (d *DB) FetchHistoricalChannel(outPoint *wire.OutPoint) (*OpenChannel, erro
}
channel, err = fetchOpenChannel(chanBucket, outPoint)
channel.Db = d
return err
}, func() {
channel = nil


@ -717,9 +717,10 @@ func TestFetchHistoricalChannel(t *testing.T) {
t.Fatalf("unexepected error getting channel: %v", err)
}
// Set the db on our channel to nil so that we can check that all other
// fields on the channel equal those on the historical channel.
channel.Db = nil
// FetchHistoricalChannel will attach the cdb to channel.Db; we set it
// here so that we can check that all other fields on the channel equal
// those on the historical channel.
channel.Db = cdb
if !reflect.DeepEqual(histChannel, channel) {
t.Fatalf("expected: %v, got: %v", channel, histChannel)


@ -39,6 +39,38 @@ var (
// fwdPackagesKey is the root-level bucket to which all forwarding
// packages are written. This bucket is further subdivided based on the
// short channel ID of each channel.
//
// Bucket hierarchy:
//
// fwdPackagesKey(root-bucket)
// |
// |-- <shortChannelID>
// | |
// | |-- <height>
// | | |-- ackFilterKey: <encoded bytes of PkgFilter>
// | | |-- settleFailFilterKey: <encoded bytes of PkgFilter>
// | | |-- fwdFilterKey: <encoded bytes of PkgFilter>
// | | |
// | | |-- addBucketKey
// | | | |-- <index of LogUpdate>: <encoded bytes of LogUpdate>
// | | | |-- <index of LogUpdate>: <encoded bytes of LogUpdate>
// | | | ...
// | | |
// | | |-- failSettleBucketKey
// | | |-- <index of LogUpdate>: <encoded bytes of LogUpdate>
// | | |-- <index of LogUpdate>: <encoded bytes of LogUpdate>
// | | ...
// | |
// | |-- <height>
// | | |
// | ... ...
// |
// |
// |-- <shortChannelID>
// | |
// | ...
// ...
//
fwdPackagesKey = []byte("fwd-packages")
// addBucketKey is the bucket to which all Add log updates are written.
@ -401,6 +433,9 @@ type FwdPackager interface {
// RemovePkg deletes a forwarding package owned by this channel at
// the provided remote `height`.
RemovePkg(tx kvdb.RwTx, height uint64) error
// Wipe deletes all the forwarding packages owned by this channel.
Wipe(tx kvdb.RwTx) error
}
// ChannelPackager is used by a channel to manage the lifecycle of its forwarding
@ -912,6 +947,24 @@ func (p *ChannelPackager) RemovePkg(tx kvdb.RwTx, height uint64) error {
return sourceBkt.DeleteNestedBucket(heightKey[:])
}
// Wipe deletes all the channel's forwarding packages, if any.
func (p *ChannelPackager) Wipe(tx kvdb.RwTx) error {
// If the root bucket doesn't exist, there's no need to delete.
fwdPkgBkt := tx.ReadWriteBucket(fwdPackagesKey)
if fwdPkgBkt == nil {
return nil
}
sourceBytes := makeLogKey(p.source.ToUint64())
// If the nested bucket doesn't exist, there's no need to delete.
if fwdPkgBkt.NestedReadWriteBucket(sourceBytes[:]) == nil {
return nil
}
return fwdPkgBkt.DeleteNestedBucket(sourceBytes[:])
}
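As a usage sketch (not part of the diff): because Wipe has the signature func(kvdb.RwTx) error, a caller that isn't already inside a transaction can pass it straight to kvdb.Update, mirroring the test further below; within CloseChannel above, Wipe is instead called with a transaction already in flight. The sketch assumes db is an open kvdb.Backend and chanID is the channel's lnwire.ShortChannelID.
// Wiping a channel's forwarding packages in a single transaction.
packager := channeldb.NewChannelPackager(chanID)
// The empty closure is the reset function required by kvdb.Update.
if err := kvdb.Update(db, packager.Wipe, func() {}); err != nil {
	return err
}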
// uint16Key writes the provided 16-bit unsigned integer to a 2-byte slice.
func uint16Key(i uint16) []byte {
key := make([]byte, 2)


@ -11,6 +11,7 @@ import (
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/stretchr/testify/require"
)
// TestPkgFilterBruteForce tests the behavior of a pkg filter up to size 1000,
@ -729,6 +730,49 @@ func TestPackagerSettleFailsThenAdds(t *testing.T) {
}
}
// TestPackagerWipeAll checks that when the method is called, all the related
// forwarding packages will be removed.
func TestPackagerWipeAll(t *testing.T) {
t.Parallel()
db := makeFwdPkgDB(t, "")
shortChanID := lnwire.NewShortChanIDFromInt(1)
packager := channeldb.NewChannelPackager(shortChanID)
// To begin, there should be no forwarding packages on disk.
fwdPkgs := loadFwdPkgs(t, db, packager)
require.Empty(t, fwdPkgs, "no forwarding packages should exist")
// Now, check we can wipe without error since it's a noop.
err := kvdb.Update(db, packager.Wipe, func() {})
require.NoError(t, err, "unable to wipe fwdpkg")
// Next, create and write two forwarding packages with no htlcs.
fwdPkg1 := channeldb.NewFwdPkg(shortChanID, 0, nil, nil)
fwdPkg2 := channeldb.NewFwdPkg(shortChanID, 1, nil, nil)
err = kvdb.Update(db, func(tx kvdb.RwTx) error {
if err := packager.AddFwdPkg(tx, fwdPkg2); err != nil {
return err
}
return packager.AddFwdPkg(tx, fwdPkg1)
}, func() {})
require.NoError(t, err, "unable to add fwd pkg")
// There should now be two fwdpkgs on disk.
fwdPkgs = loadFwdPkgs(t, db, packager)
require.Equal(t, 2, len(fwdPkgs), "expected 2 fwdpkg")
// Now, wipe all forwarding packages from disk.
err = kvdb.Update(db, packager.Wipe, func() {})
require.NoError(t, err, "unable to wipe fwdpkg")
// Check that the packages were actually removed.
fwdPkgs = loadFwdPkgs(t, db, packager)
require.Empty(t, fwdPkgs, "no forwarding packages should exist")
}
// assertFwdPkgState checks that the current state of a fwdpkg meets our
// expectations.
func assertFwdPkgState(t *testing.T, fwdPkg *channeldb.FwdPkg,


@ -7,6 +7,7 @@ import (
"github.com/lightningnetwork/lnd/channeldb/migration12"
"github.com/lightningnetwork/lnd/channeldb/migration13"
"github.com/lightningnetwork/lnd/channeldb/migration16"
"github.com/lightningnetwork/lnd/channeldb/migration24"
"github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
"github.com/lightningnetwork/lnd/kvdb"
)
@ -36,5 +37,6 @@ func UseLogger(logger btclog.Logger) {
migration12.UseLogger(logger)
migration13.UseLogger(logger)
migration16.UseLogger(logger)
migration24.UseLogger(logger)
kvdb.UseLogger(logger)
}


@ -0,0 +1,14 @@
package migration24
import (
"github.com/btcsuite/btclog"
)
// log is a logger that is initialized as disabled. This means the package will
// not perform any logging by default until a logger is set.
var log = btclog.Disabled
// UseLogger uses a specified Logger to output package logging info.
func UseLogger(logger btclog.Logger) {
log = logger
}


@ -0,0 +1,119 @@
package migration24
import (
"bytes"
"encoding/binary"
"io"
mig "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
"github.com/lightningnetwork/lnd/kvdb"
)
var (
// closedChannelBucket stores summarization information concerning
// previously open, but now closed channels.
closedChannelBucket = []byte("closed-chan-bucket")
// fwdPackagesKey is the root-level bucket to which all forwarding
// packages are written. This bucket is further subdivided based on the
// short channel ID of each channel.
fwdPackagesKey = []byte("fwd-packages")
)
// MigrateFwdPkgCleanup deletes all the forwarding packages of closed channels.
// It determines the closed channels by iterating closedChannelBucket. The
// ShortChanID found in the ChannelCloseSummary is then used as a key to query
// the forwarding packages bucket. If a match is found, it will be deleted.
func MigrateFwdPkgCleanup(tx kvdb.RwTx) error {
log.Infof("Deleting forwarding packages for closed channels")
// Find all closed channel summaries, which are stored in the
// closeBucket.
closeBucket := tx.ReadBucket(closedChannelBucket)
if closeBucket == nil {
return nil
}
var chanSummaries []*mig.ChannelCloseSummary
// appendSummary is a function closure to help put deserialized close
// summaries into chanSummaries.
appendSummary := func(_ []byte, summaryBytes []byte) error {
summaryReader := bytes.NewReader(summaryBytes)
chanSummary, err := deserializeCloseChannelSummary(
summaryReader,
)
if err != nil {
return err
}
// Skip pending channels
if chanSummary.IsPending {
return nil
}
chanSummaries = append(chanSummaries, chanSummary)
return nil
}
if err := closeBucket.ForEach(appendSummary); err != nil {
return err
}
// Now we will load the forwarding packages bucket and delete all the
// nested buckets whose source matches the ShortChanID found in the
// closed channel summaries.
fwdPkgBkt := tx.ReadWriteBucket(fwdPackagesKey)
if fwdPkgBkt == nil {
return nil
}
// Iterate over all closed channels and remove their forwarding packages.
for _, summary := range chanSummaries {
sourceBytes := makeLogKey(summary.ShortChanID.ToUint64())
// First, we will try to find the corresponding bucket. If there
// is not a nested bucket matching the ShortChanID, we will skip
// it.
if fwdPkgBkt.NestedReadBucket(sourceBytes[:]) == nil {
continue
}
// Otherwise, wipe all the forwarding packages.
if err := fwdPkgBkt.DeleteNestedBucket(
sourceBytes[:]); err != nil {
return err
}
}
log.Infof("Deletion of forwarding packages of closed channels " +
"complete! DB compaction is recommended to free up the" +
"disk space.")
return nil
}
// deserializeCloseChannelSummary will decode a CloseChannelSummary with no
// optional fields.
func deserializeCloseChannelSummary(
r io.Reader) (*mig.ChannelCloseSummary, error) {
c := &mig.ChannelCloseSummary{}
err := mig.ReadElements(
r,
&c.ChanPoint, &c.ShortChanID, &c.ChainHash, &c.ClosingTXID,
&c.CloseHeight, &c.RemotePub, &c.Capacity, &c.SettledBalance,
&c.TimeLockedBalance, &c.CloseType, &c.IsPending,
)
if err != nil {
return nil, err
}
return c, nil
}
// makeLogKey converts a uint64 into an 8 byte array.
func makeLogKey(updateNum uint64) [8]byte {
var key [8]byte
binary.BigEndian.PutUint64(key[:], updateNum)
return key
}


@ -0,0 +1,296 @@
package migration24
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"math/rand"
"testing"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
lnwire "github.com/lightningnetwork/lnd/channeldb/migration/lnwire21"
mig "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
"github.com/lightningnetwork/lnd/channeldb/migtest"
"github.com/lightningnetwork/lnd/kvdb"
)
var (
key = [chainhash.HashSize]byte{
0x81, 0xb6, 0x37, 0xd8, 0xfc, 0xd2, 0xc6, 0xda,
0x68, 0x59, 0xe6, 0x96, 0x31, 0x13, 0xa1, 0x17,
0xd, 0xe7, 0x93, 0xe4, 0xb7, 0x25, 0xb8, 0x4d,
0x1e, 0xb, 0x4c, 0xf9, 0x9e, 0xc5, 0x8c, 0xe9,
}
testTx = &wire.MsgTx{
Version: 1,
TxIn: []*wire.TxIn{
{
PreviousOutPoint: wire.OutPoint{
Hash: chainhash.Hash{},
Index: 0xffffffff,
},
SignatureScript: []byte{0x04, 0x31, 0xdc, 0x00,
0x1b, 0x01, 0x62},
Sequence: 0xffffffff,
},
},
TxOut: []*wire.TxOut{
{
Value: 5000000000,
PkScript: []byte{
0x41, // OP_DATA_65
0x04, 0xd6, 0x4b, 0xdf, 0xd0, 0x9e,
0xb1, 0xc5, 0xfe, 0x29, 0x5a, 0xbd,
0xeb, 0x1d, 0xca, 0x42, 0x81, 0xbe,
0x98, 0x8e, 0x2d, 0xa0, 0xb6, 0xc1,
0xc6, 0xa5, 0x9d, 0xc2, 0x26, 0xc2,
0x86, 0x24, 0xe1, 0x81, 0x75, 0xe8,
0x51, 0xc9, 0x6b, 0x97, 0x3d, 0x81,
0xb0, 0x1c, 0xc3, 0x1f, 0x04, 0x78,
0x34, 0xbc, 0x06, 0xd6, 0xd6, 0xed,
0xf6, 0x20, 0xd1, 0x84, 0x24, 0x1a,
0x6a, 0xed, 0x8b, 0x63,
0xa6, // 65-byte signature
0xac, // OP_CHECKSIG
},
},
},
LockTime: 5,
}
_, pubKey = btcec.PrivKeyFromBytes(btcec.S256(), key[:])
)
// TestMigrateFwdPkgCleanup asserts that the migration will delete all the
// forwarding packages for closed channels, and leave the rest untouched.
func TestMigrateFwdPkgCleanup(t *testing.T) {
migrationTests := []struct {
name string
beforeMigrationFunc func(kvdb.RwTx) error
afterMigrationFunc func(kvdb.RwTx) error
}{
{
// No closed channel summaries in the db.
// This leaves the fwdpkg untouched.
name: "no closed channel summaries",
beforeMigrationFunc: genBeforeMigration(
false, []int{}, []int{1},
),
afterMigrationFunc: genAfterMigration(
[]int{}, []int{1},
),
},
{
// One closed summary found, and the forwarding package
// shares the same channel ID.
// This causes the fwdpkg to be removed.
name: "remove one fwdpkg",
beforeMigrationFunc: genBeforeMigration(
false, []int{1}, []int{1},
),
afterMigrationFunc: genAfterMigration(
[]int{1}, []int{},
),
},
{
// One closed summary with pending status found, and the
// forwarding package shares the same channel ID.
// This leaves the fwdpkg untouched.
name: "no action if closed status is pending",
beforeMigrationFunc: genBeforeMigration(
true, []int{1}, []int{1},
),
afterMigrationFunc: genAfterMigration(
[]int{}, []int{1},
),
},
{
// One closed summary found, while the forwarding
// package has a different channel ID.
// This leaves the fwdpkg untouched.
name: "no fwdpkg removed",
beforeMigrationFunc: genBeforeMigration(
false, []int{1}, []int{2},
),
afterMigrationFunc: genAfterMigration(
[]int{}, []int{2},
),
},
{
// Multiple closed summaries and fwdpkg nested buckets
// found. Only the matched fwdPkg nested buckets are
// removed.
name: "only matching fwdpkg removed",
beforeMigrationFunc: genBeforeMigration(false,
[]int{1, 2, 3},
[]int{1, 2, 4},
),
afterMigrationFunc: genAfterMigration(
[]int{1, 2},
[]int{4},
),
},
}
for _, tt := range migrationTests {
test := tt
t.Run(test.name, func(t *testing.T) {
migtest.ApplyMigration(
t,
test.beforeMigrationFunc,
test.afterMigrationFunc,
MigrateFwdPkgCleanup,
false,
)
})
}
}
func genBeforeMigration(isPending bool,
closeSummaryChanIDs, fwdPkgChanIDs []int) func(kvdb.RwTx) error {
return func(tx kvdb.RwTx) error {
// Create closed channel summaries
for _, id := range closeSummaryChanIDs {
chanID := lnwire.NewShortChanIDFromInt(uint64(id))
if err := createTestCloseChannelSummery(
tx, isPending, chanID,
); err != nil {
return err
}
}
// Create fwdPkg nested buckets
for _, id := range fwdPkgChanIDs {
chanID := lnwire.NewShortChanIDFromInt(uint64(id))
err := createTestFwdPkgBucket(tx, chanID)
if err != nil {
return err
}
}
return nil
}
}
func genAfterMigration(deleted, untouched []int) func(kvdb.RwTx) error {
return func(tx kvdb.RwTx) error {
fwdPkgBkt := tx.ReadBucket(fwdPackagesKey)
if fwdPkgBkt == nil {
return errors.New("unable to find bucket")
}
// Reading deleted buckets should return nil
for _, id := range deleted {
chanID := lnwire.NewShortChanIDFromInt(uint64(id))
sourceKey := makeLogKey(chanID.ToUint64())
sourceBkt := fwdPkgBkt.NestedReadBucket(sourceKey[:])
if sourceBkt != nil {
return fmt.Errorf(
"expected bucket to be deleted: %v",
id,
)
}
}
// Reading untouched buckets should not return nil
for _, id := range untouched {
chanID := lnwire.NewShortChanIDFromInt(uint64(id))
sourceKey := makeLogKey(chanID.ToUint64())
sourceBkt := fwdPkgBkt.NestedReadBucket(sourceKey[:])
if sourceBkt == nil {
return fmt.Errorf(
"expected bucket to not be deleted: %v",
id,
)
}
}
return nil
}
}
// createTestCloseChannelSummery creates a ChannelCloseSummary for testing.
func createTestCloseChannelSummery(tx kvdb.RwTx, isPending bool,
chanID lnwire.ShortChannelID) error {
closedChanBucket, err := tx.CreateTopLevelBucket(closedChannelBucket)
if err != nil {
return err
}
outputPoint := wire.OutPoint{Hash: key, Index: rand.Uint32()}
ccs := &mig.ChannelCloseSummary{
ChanPoint: outputPoint,
ShortChanID: chanID,
ChainHash: key,
ClosingTXID: testTx.TxHash(),
CloseHeight: 100,
RemotePub: pubKey,
Capacity: btcutil.Amount(10000),
SettledBalance: btcutil.Amount(50000),
CloseType: mig.RemoteForceClose,
IsPending: isPending,
// Optional fields
RemoteCurrentRevocation: pubKey,
RemoteNextRevocation: pubKey,
}
var b bytes.Buffer
if err := serializeChannelCloseSummary(&b, ccs); err != nil {
return err
}
var chanPointBuf bytes.Buffer
if err := writeOutpoint(&chanPointBuf, &outputPoint); err != nil {
return err
}
return closedChanBucket.Put(chanPointBuf.Bytes(), b.Bytes())
}
func createTestFwdPkgBucket(tx kvdb.RwTx, chanID lnwire.ShortChannelID) error {
fwdPkgBkt, err := tx.CreateTopLevelBucket(fwdPackagesKey)
if err != nil {
return err
}
source := makeLogKey(chanID.ToUint64())
if _, err := fwdPkgBkt.CreateBucketIfNotExists(source[:]); err != nil {
return err
}
return nil
}
func serializeChannelCloseSummary(
w io.Writer, cs *mig.ChannelCloseSummary) error {
err := mig.WriteElements(
w,
cs.ChanPoint, cs.ShortChanID, cs.ChainHash, cs.ClosingTXID,
cs.CloseHeight, cs.RemotePub, cs.Capacity, cs.SettledBalance,
cs.TimeLockedBalance, cs.CloseType, cs.IsPending,
)
if err != nil {
return err
}
return nil
}
// writeOutpoint writes an outpoint to the passed writer using the minimal
// amount of bytes possible.
func writeOutpoint(w io.Writer, o *wire.OutPoint) error {
if _, err := w.Write(o.Hash[:]); err != nil {
return err
}
if err := binary.Write(w, binary.BigEndian, o.Index); err != nil {
return err
}
return nil
}


@ -125,6 +125,59 @@ type ArbitratorLog interface {
type ArbitratorState uint8
const (
// While some state transitions are allowed, certain transitions are not
// possible. Listed below is the full state transition map, which
// contains all possible paths. We start at StateDefault and end at
// StateFullyResolved, or StateError (not listed as it's a possible state
// in every path). The format is:
// -> state: the conditions under which we switch to this state.
//
// StateDefault
// |
// |-> StateDefault: no actions and chain trigger
// |
// |-> StateBroadcastCommit: chain/user trigger
// | |
// | |-> StateCommitmentBroadcasted: chain/user trigger
// | | |
// | | |-> StateCommitmentBroadcasted: chain/user trigger
// | | |
// | | |-> StateContractClosed: local/remote close trigger
// | | | |
// | | | |-> StateWaitingFullResolution: contract resolutions not empty
// | | | | |
// | | | | |-> StateWaitingFullResolution: contract resolutions not empty
// | | | | |
// | | | | |-> StateFullyResolved: contract resolutions empty
// | | | |
// | | | |-> StateFullyResolved: contract resolutions empty
// | | |
// | | |-> StateFullyResolved: coop/breach close trigger
// | |
// | |-> StateContractClosed: local/remote close trigger
// | | |
// | | |-> StateWaitingFullResolution: contract resolutions not empty
// | | | |
// | | | |-> StateWaitingFullResolution: contract resolutions not empty
// | | | |
// | | | |-> StateFullyResolved: contract resolutions empty
// | | |
// | | |-> StateFullyResolved: contract resolutions empty
// | |
// | |-> StateFullyResolved: coop/breach close trigger
// |
// |-> StateContractClosed: local/remote close trigger
// | |
// | |-> StateWaitingFullResolution: contract resolutions not empty
// | | |
// | | |-> StateWaitingFullResolution: contract resolutions not empty
// | | |
// | | |-> StateFullyResolved: contract resolutions empty
// | |
// | |-> StateFullyResolved: contract resolutions empty
// |
// |-> StateFullyResolved: coop/breach close trigger
// StateDefault is the default state. In this state, no major actions
// need to be executed.
StateDefault ArbitratorState = 0


@ -990,7 +990,7 @@ func (c *ChannelArbitrator) stateStep(
}
// If the resolution is empty, and we have no HTLCs at all to
// tend to, then we're done here. We don't need to launch any
// send to, then we're done here. We don't need to launch any
// resolvers, and can go straight to our final state.
if contractResolutions.IsEmpty() && confCommitSet.IsEmpty() {
log.Infof("ChannelArbitrator(%v): contract "+
@ -1001,7 +1001,7 @@ func (c *ChannelArbitrator) stateStep(
}
// Now that we know we'll need to act, we'll process the htlc
// actions, wen create the structures we need to resolve all
// actions, then create the structures we need to resolve all
// outstanding contracts.
htlcResolvers, pktsToSend, err := c.prepContractResolutions(
contractResolutions, triggerHeight, trigger,


@ -281,6 +281,10 @@ you.
* [Ensure single writer for legacy
code](https://github.com/lightningnetwork/lnd/pull/5547) when using etcd
backend.
* When starting/restarting, `lnd` will [clean forwarding packages, payment
circuits and keystones](https://github.com/lightningnetwork/lnd/pull/4364)
for closed channels, which will potentially free up disk space for long
running nodes that have lots of closed channels.
* [Optimized payment sequence generation](https://github.com/lightningnetwork/lnd/pull/5514/)
to make lnd's payment throughput (and latency) better when using etcd.


@ -137,11 +137,31 @@ var (
// a packet to the source link, potentially including an error
// encrypter for applying this hop's encryption to the payload in the
// reverse direction.
//
// Bucket hierarchy:
//
// circuitAddKey(root-bucket)
// |
// |-- <incoming-circuit-key>: <encoded bytes of PaymentCircuit>
// |-- <incoming-circuit-key>: <encoded bytes of PaymentCircuit>
// |
// ...
//
circuitAddKey = []byte("circuit-adds")
// circuitKeystoneKey is used to retrieve the bucket containing circuit
// keystones, which are set in place once a forwarded packet is
// assigned an index on an outgoing commitment txn.
//
// Bucket hierarchy:
//
// circuitKeystoneKey(root-bucket)
// |
// |-- <outgoing-circuit-key>: <incoming-circuit-key>
// |-- <outgoing-circuit-key>: <incoming-circuit-key>
// |
// ...
//
circuitKeystoneKey = []byte("circuit-keystones")
)
@ -199,6 +219,11 @@ func NewCircuitMap(cfg *CircuitMapConfig) (CircuitMap, error) {
return nil, err
}
// Delete old circuits and keystones of closed channels.
if err := cm.cleanClosedChannels(); err != nil {
return nil, err
}
// Load any previously persisted circuits back into memory.
if err := cm.restoreMemState(); err != nil {
return nil, err
@ -230,6 +255,216 @@ func (cm *circuitMap) initBuckets() error {
}, func() {})
}
// cleanClosedChannels deletes all circuits and keystones related to closed
// channels. It first reads all the closed channels and caches the ShortChanIDs
// into a map for fast lookup. Then it iterates the circuit bucket and keystone
// bucket and deletes items whose ChanID matches the ShortChanID.
//
// NOTE: this operation can also be built into restoreMemState since the latter
// already opens and iterates the two root buckets, circuitAddKey and
// circuitKeystoneKey. Depending on the size of the buckets, this marginal gain
// may be worth investigating. Atm, for clarity, this operation is wrapped into
// its own function.
func (cm *circuitMap) cleanClosedChannels() error {
log.Infof("Cleaning circuits from disk for closed channels")
// closedChanIDSet stores the short channel IDs for closed channels.
closedChanIDSet := make(map[lnwire.ShortChannelID]struct{})
// circuitKeySet stores the incoming circuit keys of the payment
// circuits that need to be deleted.
circuitKeySet := make(map[CircuitKey]struct{})
// keystoneKeySet stores the outgoing keys of the keystones that need
// to be deleted.
keystoneKeySet := make(map[CircuitKey]struct{})
// isClosedChannel is a helper closure that returns a bool indicating
// whether the chanID belongs to a closed channel.
isClosedChannel := func(chanID lnwire.ShortChannelID) bool {
// Skip if the channel ID is zero value. This has the effect
// that a zero value incoming or outgoing key will never be
// matched and its corresponding circuits or keystones are not
// deleted.
if chanID.ToUint64() == 0 {
return false
}
_, ok := closedChanIDSet[chanID]
return ok
}
// Find closed channels and cache their ShortChannelIDs into a map.
// This map will be used for looking up the relevant circuits and keystones.
closedChannels, err := cm.cfg.DB.FetchClosedChannels(false)
if err != nil {
return err
}
for _, closedChannel := range closedChannels {
// Skip if the channel close is pending.
if closedChannel.IsPending {
continue
}
closedChanIDSet[closedChannel.ShortChanID] = struct{}{}
}
log.Debugf("Found %v closed channels", len(closedChanIDSet))
// Exit early if there are no closed channels.
if len(closedChanIDSet) == 0 {
log.Infof("Finished cleaning: no closed channels found, " +
"no actions taken.",
)
return nil
}
// Find the payment circuits and keystones that need to be deleted.
if err := kvdb.View(cm.cfg.DB, func(tx kvdb.RTx) error {
circuitBkt := tx.ReadBucket(circuitAddKey)
if circuitBkt == nil {
return ErrCorruptedCircuitMap
}
keystoneBkt := tx.ReadBucket(circuitKeystoneKey)
if keystoneBkt == nil {
return ErrCorruptedCircuitMap
}
// If a circuit's incoming/outgoing key prefix matches the
// ShortChanID, it will be deleted. However, if the ShortChanID
// of the incoming key is zero, the circuit will be kept as it
// indicates a locally initiated payment.
if err := circuitBkt.ForEach(func(_, v []byte) error {
circuit, err := cm.decodeCircuit(v)
if err != nil {
return err
}
// Check if the incoming channel ID can be found in the
// closed channel ID map.
if !isClosedChannel(circuit.Incoming.ChanID) {
return nil
}
circuitKeySet[circuit.Incoming] = struct{}{}
return nil
}); err != nil {
return err
}
// If a keystone's InKey or OutKey matches the short channel id
// in the closed channel ID map, it will be deleted.
err := keystoneBkt.ForEach(func(k, v []byte) error {
var (
inKey CircuitKey
outKey CircuitKey
)
// Decode the incoming and outgoing circuit keys.
if err := inKey.SetBytes(v); err != nil {
return err
}
if err := outKey.SetBytes(k); err != nil {
return err
}
// Check if the incoming channel ID can be found in the
// closed channel ID map.
if isClosedChannel(inKey.ChanID) {
// If the incoming channel is closed, we can
// skip checking on outgoing channel ID because
// this keystone will be deleted.
keystoneKeySet[outKey] = struct{}{}
// Technically the incoming keys found in the
// keystone bucket should be a subset of those
// in the circuit bucket, so a previous loop
// should have put this inKey into circuitKeySet
// already. We do this again to be sure the
// circuits are properly cleaned. Even if this
// inKey doesn't exist in the circuit bucket, we
// are fine as the db deletion is a noop.
circuitKeySet[inKey] = struct{}{}
return nil
}
// Check if the outgoing channel ID can be found in the
// closed channel ID map. Notice that we need to store
// the outgoing key because it's used for db query.
if isClosedChannel(outKey.ChanID) {
keystoneKeySet[outKey] = struct{}{}
// Also update circuitKeySet to mark the
// payment circuit needs to be deleted.
circuitKeySet[inKey] = struct{}{}
}
return nil
})
return err
}, func() {
// Reset the sets.
circuitKeySet = make(map[CircuitKey]struct{})
keystoneKeySet = make(map[CircuitKey]struct{})
}); err != nil {
return err
}
log.Debugf("To be deleted: num_circuits=%v, num_keystones=%v",
len(circuitKeySet), len(keystoneKeySet),
)
numCircuitsDeleted := 0
numKeystonesDeleted := 0
// Delete all the circuits and keystones for closed channels.
if err := kvdb.Update(cm.cfg.DB, func(tx kvdb.RwTx) error {
circuitBkt := tx.ReadWriteBucket(circuitAddKey)
if circuitBkt == nil {
return ErrCorruptedCircuitMap
}
keystoneBkt := tx.ReadWriteBucket(circuitKeystoneKey)
if keystoneBkt == nil {
return ErrCorruptedCircuitMap
}
// Delete the circuits.
for inKey := range circuitKeySet {
if err := circuitBkt.Delete(inKey.Bytes()); err != nil {
return err
}
numCircuitsDeleted++
}
// Delete the keystone using the outgoing key.
for outKey := range keystoneKeySet {
err := keystoneBkt.Delete(outKey.Bytes())
if err != nil {
return err
}
numKeystonesDeleted++
}
return nil
}, func() {}); err != nil {
numCircuitsDeleted = 0
numKeystonesDeleted = 0
return err
}
log.Infof("Finished cleaning: num_closed_channel=%v, "+
"num_circuits=%v, num_keystone=%v",
len(closedChannels), numCircuitsDeleted, numKeystonesDeleted,
)
return nil
}
// restoreMemState loads the contents of the half circuit and full circuit
// buckets from disk and reconstructs the in-memory representation of the
// circuit map. Afterwards, the state of the hash index is reconstructed using
@ -489,8 +724,8 @@ func (cm *circuitMap) TrimOpenCircuits(chanID lnwire.ShortChannelID,
}, func() {})
}
// LookupByHTLC looks up the payment circuit by the outgoing channel and HTLC
// IDs. Returns nil if there is no such circuit.
// LookupCircuit queries the circuit map for the circuit identified by its
// incoming circuit key. Returns nil if there is no such circuit.
func (cm *circuitMap) LookupCircuit(inKey CircuitKey) *PaymentCircuit {
cm.mtx.RLock()
defer cm.mtx.RUnlock()


@ -0,0 +1,388 @@
package htlcswitch_test
import (
"bytes"
"fmt"
"io"
"testing"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/stretchr/testify/require"
)
var (
// closedChannelBucket stores summarization information concerning
// previously open, but now closed channels.
closedChannelBucket = []byte("closed-chan-bucket")
)
// TestCircuitMapCleanClosedChannels checks that the circuits and keystones are
// deleted for closed channels upon restart.
func TestCircuitMapCleanClosedChannels(t *testing.T) {
t.Parallel()
var (
// chanID0 is a zero value channel ID indicating a locally
// initiated payment.
chanID0 = lnwire.NewShortChanIDFromInt(uint64(0))
chanID1 = lnwire.NewShortChanIDFromInt(uint64(1))
chanID2 = lnwire.NewShortChanIDFromInt(uint64(2))
inKey00 = htlcswitch.CircuitKey{ChanID: chanID0, HtlcID: 0}
inKey10 = htlcswitch.CircuitKey{ChanID: chanID1, HtlcID: 0}
inKey11 = htlcswitch.CircuitKey{ChanID: chanID1, HtlcID: 1}
inKey20 = htlcswitch.CircuitKey{ChanID: chanID2, HtlcID: 0}
inKey21 = htlcswitch.CircuitKey{ChanID: chanID2, HtlcID: 1}
inKey22 = htlcswitch.CircuitKey{ChanID: chanID2, HtlcID: 2}
outKey00 = htlcswitch.CircuitKey{ChanID: chanID0, HtlcID: 0}
outKey10 = htlcswitch.CircuitKey{ChanID: chanID1, HtlcID: 0}
outKey11 = htlcswitch.CircuitKey{ChanID: chanID1, HtlcID: 1}
outKey20 = htlcswitch.CircuitKey{ChanID: chanID2, HtlcID: 0}
outKey21 = htlcswitch.CircuitKey{ChanID: chanID2, HtlcID: 1}
outKey22 = htlcswitch.CircuitKey{ChanID: chanID2, HtlcID: 2}
)
type closeChannelParams struct {
chanID lnwire.ShortChannelID
isPending bool
}
testParams := []struct {
name string
// keystones is used to create and open circuits. A keystone is
// a pair of circuit keys, inKey and outKey, with the outKey
// optionally being empty. If a keystone with an outKey is used,
// a circuit will be created and opened, thus creating a circuit
// and a keystone in the DB. Otherwise, only the circuit is
// created.
keystones []htlcswitch.Keystone
chanParams []closeChannelParams
deleted []htlcswitch.Keystone
untouched []htlcswitch.Keystone
}{
{
name: "no deletion if there are no closed channels",
keystones: []htlcswitch.Keystone{
// Creates a circuit and a keystone
{InKey: inKey10, OutKey: outKey10},
},
untouched: []htlcswitch.Keystone{
{InKey: inKey10, OutKey: outKey10},
},
},
{
name: "no deletion if channel is pending close",
chanParams: []closeChannelParams{
// Creates a pending close channel.
{chanID: chanID1, isPending: true},
},
keystones: []htlcswitch.Keystone{
// Creates a circuit and a keystone
{InKey: inKey10, OutKey: outKey10},
},
untouched: []htlcswitch.Keystone{
{InKey: inKey10, OutKey: outKey10},
},
},
{
name: "no deletion if the chanID is zero value",
chanParams: []closeChannelParams{
// Creates a close channel with chanID0.
{chanID: chanID0, isPending: false},
},
keystones: []htlcswitch.Keystone{
// Creates a circuit and a keystone
{InKey: inKey00, OutKey: outKey00},
},
untouched: []htlcswitch.Keystone{
{InKey: inKey00, OutKey: outKey00},
},
},
{
name: "delete half circuits on inKey match",
chanParams: []closeChannelParams{
// Creates a close channel with chanID1.
{chanID: chanID1, isPending: false},
},
keystones: []htlcswitch.Keystone{
// Creates a circuit, no keystone created
{InKey: inKey10},
// Creates a circuit, no keystone created
{InKey: inKey11},
// Creates a circuit and a keystone
{InKey: inKey20, OutKey: outKey20},
},
deleted: []htlcswitch.Keystone{
{InKey: inKey10}, {InKey: inKey11},
},
untouched: []htlcswitch.Keystone{
{InKey: inKey20, OutKey: outKey20},
},
},
{
name: "delete half circuits on outKey match",
chanParams: []closeChannelParams{
// Creates a close channel with chanID1.
{chanID: chanID1, isPending: false},
},
keystones: []htlcswitch.Keystone{
// Creates a circuit and a keystone
{InKey: inKey20, OutKey: outKey10},
// Creates a circuit and a keystone
{InKey: inKey21, OutKey: outKey11},
// Creates a circuit and a keystone
{InKey: inKey22, OutKey: outKey21},
},
deleted: []htlcswitch.Keystone{
{InKey: inKey20, OutKey: outKey10},
{InKey: inKey21, OutKey: outKey11},
},
untouched: []htlcswitch.Keystone{
{InKey: inKey22, OutKey: outKey21},
},
},
{
name: "delete full circuits on inKey match",
chanParams: []closeChannelParams{
// Creates a close channel with chanID1.
{chanID: chanID1, isPending: false},
},
keystones: []htlcswitch.Keystone{
// Creates a circuit and a keystone
{InKey: inKey10, OutKey: outKey20},
// Creates a circuit and a keystone
{InKey: inKey11, OutKey: outKey21},
// Creates a circuit and a keystone
{InKey: inKey20, OutKey: outKey22},
},
deleted: []htlcswitch.Keystone{
{InKey: inKey10, OutKey: outKey20},
{InKey: inKey11, OutKey: outKey21},
},
untouched: []htlcswitch.Keystone{
{InKey: inKey20, OutKey: outKey22},
},
},
{
name: "delete full circuits on outKey match",
chanParams: []closeChannelParams{
// Creates a close channel with chanID1.
{chanID: chanID1, isPending: false},
},
keystones: []htlcswitch.Keystone{
// Creates a circuit and a keystone
{InKey: inKey20, OutKey: outKey10},
// Creates a circuit and a keystone
{InKey: inKey21, OutKey: outKey11},
// Creates a circuit and a keystone
{InKey: inKey22, OutKey: outKey20},
},
deleted: []htlcswitch.Keystone{
{InKey: inKey20, OutKey: outKey10},
{InKey: inKey21, OutKey: outKey11},
},
untouched: []htlcswitch.Keystone{
{InKey: inKey22, OutKey: outKey20},
},
},
{
name: "delete all circuits",
chanParams: []closeChannelParams{
// Creates a close channel with chanID1.
{chanID: chanID1, isPending: false},
// Creates a close channel with chanID2.
{chanID: chanID2, isPending: false},
},
keystones: []htlcswitch.Keystone{
// Creates a circuit and a keystone
{InKey: inKey20, OutKey: outKey10},
// Creates a circuit and a keystone
{InKey: inKey21, OutKey: outKey11},
// Creates a circuit and a keystone
{InKey: inKey22, OutKey: outKey20},
},
deleted: []htlcswitch.Keystone{
{InKey: inKey20, OutKey: outKey10},
{InKey: inKey21, OutKey: outKey11},
{InKey: inKey22, OutKey: outKey20},
},
},
}
for _, tt := range testParams {
test := tt
t.Run(test.name, func(t *testing.T) {
cfg, circuitMap := newCircuitMap(t)
// create test circuits
for _, ks := range test.keystones {
err := createTestCircuit(ks, circuitMap)
require.NoError(
t, err,
"failed to create test circuit",
)
}
// create close channels
err := kvdb.Update(cfg.DB, func(tx kvdb.RwTx) error {
for _, channel := range test.chanParams {
if err := createTestCloseChannelSummery(
tx, channel.isPending,
channel.chanID,
); err != nil {
return err
}
}
return nil
}, func() {})
require.NoError(
t, err,
"failed to create close channel summery",
)
// Now, restart the circuit map, and check that the
// circuits and keystones of closed channels are
// deleted in DB.
_, circuitMap = restartCircuitMap(t, cfg)
// Check that items are deleted. LookupCircuit and
// LookupOpenCircuit will check the cached circuits,
// which are loaded on restart from the DB.
for _, ks := range test.deleted {
assertKeystoneDeleted(t, circuitMap, ks)
}
// We also check we are not deleting wanted circuits.
for _, ks := range test.untouched {
assertKeystoneNotDeleted(t, circuitMap, ks)
}
})
}
}
// createTestCircuit creates a circuit for testing with its incoming key being
// the keystone's InKey. If the keystone has an OutKey, the circuit will be
// opened, which causes a Keystone to be created in DB.
func createTestCircuit(ks htlcswitch.Keystone, cm htlcswitch.CircuitMap) error {
circuit := &htlcswitch.PaymentCircuit{
Incoming: ks.InKey,
ErrorEncrypter: testExtracter,
}
// First we will try to add a new circuit to the circuit map; this
// should succeed.
_, err := cm.CommitCircuits(circuit)
if err != nil {
return fmt.Errorf("failed to commit circuits: %v", err)
}
// If the keystone has no outgoing key, we won't open it.
if ks.OutKey == htlcswitch.EmptyCircuitKey {
return nil
}
// Open the circuit, implicitly creates a keystone on disk.
err = cm.OpenCircuits(ks)
if err != nil {
return fmt.Errorf("failed to open circuits: %v", err)
}
return nil
}
// assertKeystoneDeleted checks that a given keystone is deleted from the
// circuit map.
func assertKeystoneDeleted(t *testing.T,
cm htlcswitch.CircuitLookup, ks htlcswitch.Keystone) {
c := cm.LookupCircuit(ks.InKey)
require.Nil(t, c, "no circuit should be found using InKey")
if ks.OutKey != htlcswitch.EmptyCircuitKey {
c = cm.LookupOpenCircuit(ks.OutKey)
require.Nil(t, c, "no circuit should be found using OutKey")
}
}
// assertKeystoneNotDeleted checks that a given keystone is not deleted from
// the circuit map.
func assertKeystoneNotDeleted(t *testing.T,
cm htlcswitch.CircuitLookup, ks htlcswitch.Keystone) {
c := cm.LookupCircuit(ks.InKey)
require.NotNil(t, c, "expecting circuit found using InKey")
if ks.OutKey != htlcswitch.EmptyCircuitKey {
c = cm.LookupOpenCircuit(ks.OutKey)
require.NotNil(t, c, "expecting circuit found using OutKey")
}
}
// createTestCloseChannelSummery creates a ChannelCloseSummary for testing.
func createTestCloseChannelSummery(tx kvdb.RwTx, isPending bool,
chanID lnwire.ShortChannelID) error {
closedChanBucket, err := tx.CreateTopLevelBucket(closedChannelBucket)
if err != nil {
return err
}
outputPoint := wire.OutPoint{Hash: hash1, Index: 1}
ccs := &channeldb.ChannelCloseSummary{
ChanPoint: outputPoint,
ShortChanID: chanID,
ChainHash: hash1,
ClosingTXID: hash2,
CloseHeight: 100,
RemotePub: testEphemeralKey,
Capacity: btcutil.Amount(10000),
SettledBalance: btcutil.Amount(50000),
CloseType: channeldb.RemoteForceClose,
IsPending: isPending,
}
var b bytes.Buffer
if err := serializeChannelCloseSummary(&b, ccs); err != nil {
return err
}
var chanPointBuf bytes.Buffer
if err := lnwire.WriteOutPoint(&chanPointBuf, outputPoint); err != nil {
return err
}
return closedChanBucket.Put(chanPointBuf.Bytes(), b.Bytes())
}
func serializeChannelCloseSummary(
w io.Writer,
cs *channeldb.ChannelCloseSummary) error {
err := channeldb.WriteElements(
w,
cs.ChanPoint, cs.ShortChanID, cs.ChainHash, cs.ClosingTXID,
cs.CloseHeight, cs.RemotePub, cs.Capacity, cs.SettledBalance,
cs.TimeLockedBalance, cs.CloseType, cs.IsPending,
)
if err != nil {
return err
}
// If this is a close channel summary created before the addition of
// the new fields, then we can exit here.
if cs.RemoteCurrentRevocation == nil {
return channeldb.WriteElements(w, false)
}
return nil
}


@ -1312,8 +1312,8 @@ func TestCircuitMapDeleteUnopenedCircuit(t *testing.T) {
}
}
// TestCircuitMapDeleteUnopenedCircuit checks that an open circuit can be
// removed persistently from the circuit map.
// TestCircuitMapDeleteOpenCircuit checks that an open circuit can be removed
// persistently from the circuit map.
func TestCircuitMapDeleteOpenCircuit(t *testing.T) {
t.Parallel()


@ -2446,7 +2446,7 @@ func TestChannelLinkBandwidthConsistency(t *testing.T) {
addPkt.outgoingChanID = carolChanID
addPkt.outgoingHTLCID = 0
err = coreLink.cfg.Switch.openCircuits(addPkt.keystone())
err = coreLink.cfg.Circuits.OpenCircuits(addPkt.keystone())
if err != nil {
t.Fatalf("unable to set keystone: %v", err)
}
@ -2554,7 +2554,7 @@ func TestChannelLinkBandwidthConsistency(t *testing.T) {
addPkt.outgoingChanID = carolChanID
addPkt.outgoingHTLCID = 1
err = coreLink.cfg.Switch.openCircuits(addPkt.keystone())
err = coreLink.cfg.Circuits.OpenCircuits(addPkt.keystone())
if err != nil {
t.Fatalf("unable to set keystone: %v", err)
}
@ -5373,6 +5373,10 @@ func (*mockPackager) RemovePkg(tx kvdb.RwTx, height uint64) error {
return nil
}
func (*mockPackager) Wipe(tx kvdb.RwTx) error {
return nil
}
func (*mockPackager) AckSettleFails(tx kvdb.RwTx,
settleFailRefs ...channeldb.SettleFailRef) error {
return nil


@ -671,7 +671,8 @@ func (f *mockChannelLink) completeCircuit(pkt *htlcPacket) error {
htlc.ID = f.htlcID
keystone := Keystone{pkt.inKey(), pkt.outKey()}
if err := f.htlcSwitch.openCircuits(keystone); err != nil {
err := f.htlcSwitch.circuits.OpenCircuits(keystone)
if err != nil {
return err
}
@ -690,7 +691,7 @@ func (f *mockChannelLink) completeCircuit(pkt *htlcPacket) error {
}
func (f *mockChannelLink) deleteCircuit(pkt *htlcPacket) error {
return f.htlcSwitch.deleteCircuits(pkt.inKey())
return f.htlcSwitch.circuits.DeleteCircuits(pkt.inKey())
}
func newMockChannelLink(htlcSwitch *Switch, chanID lnwire.ChannelID,


@ -219,8 +219,8 @@ type Switch struct {
cfg *Config
// networkResults stores the results of payments initiated by the user.
// results. The store is used to later look up the payments and notify
// the user of the result when they are complete. Each payment attempt
// The store is used to later look up the payments and notify the
// user of the result when they are complete. Each payment attempt
// should be given a unique integer ID when it is created, otherwise
// results might be overwritten.
networkResults *networkResultStore
@ -375,9 +375,9 @@ func (s *Switch) GetPaymentResult(attemptID uint64, paymentHash lntypes.Hash,
deobfuscator ErrorDecrypter) (<-chan *PaymentResult, error) {
var (
nChan <-chan *networkResult
err error
outKey = CircuitKey{
nChan <-chan *networkResult
err error
inKey = CircuitKey{
ChanID: hop.Source,
HtlcID: attemptID,
}
@ -386,7 +386,7 @@ func (s *Switch) GetPaymentResult(attemptID uint64, paymentHash lntypes.Hash,
// If the payment is not found in the circuit map, check whether a
// result is already available.
// Assumption: no one will add this payment ID other than the caller.
if s.circuits.LookupCircuit(outKey) == nil {
if s.circuits.LookupCircuit(inKey) == nil {
res, err := s.networkResults.getResult(attemptID)
if err != nil {
return nil, err
@ -2234,18 +2234,6 @@ func (s *Switch) commitCircuits(circuits ...*PaymentCircuit) (
return s.circuits.CommitCircuits(circuits...)
}
// openCircuits preemptively writes the keystones for Adds that are about to be
// added to a commitment txn.
func (s *Switch) openCircuits(keystones ...Keystone) error {
return s.circuits.OpenCircuits(keystones...)
}
// deleteCircuits persistently removes the circuit, and keystone if present,
// from the circuit map.
func (s *Switch) deleteCircuits(inKeys ...CircuitKey) error {
return s.circuits.DeleteCircuits(inKeys...)
}
// FlushForwardingEvents flushes out the set of pending forwarding events to
// the persistent log. This will be used by the switch to periodically flush
// out the set of forwarding events to disk. External callers can also use this

File diff suppressed because it is too large.


@ -2236,6 +2236,9 @@ message PendingChannelsResponse {
// The commitment type used by this channel.
CommitmentType commitment_type = 9;
// Total number of forwarding packages created in this channel.
int64 num_forwarding_packages = 10;
}
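For illustration only (not part of the diff), a hedged sketch of how a Go RPC client might read the new field from the PendingChannels response. It assumes an already-connected lnrpc.LightningClient plus the standard context and fmt imports; the helper name printPendingFwdPkgs is made up for the example.
// printPendingFwdPkgs lists the forwarding package count reported for
// each waiting-close channel via the PendingChannels RPC.
func printPendingFwdPkgs(ctx context.Context, client lnrpc.LightningClient) error {
	resp, err := client.PendingChannels(ctx, &lnrpc.PendingChannelsRequest{})
	if err != nil {
		return err
	}
	for _, wc := range resp.WaitingCloseChannels {
		// num_forwarding_packages maps to the generated Go field
		// NumForwardingPackages.
		fmt.Printf("chan=%s fwd_pkgs=%d\n",
			wc.Channel.ChannelPoint, wc.Channel.NumForwardingPackages)
	}
	return nil
}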
message PendingOpenChannel {


@ -2698,6 +2698,11 @@
"commitment_type": {
"$ref": "#/definitions/lnrpcCommitmentType",
"description": "The commitment type used by this channel."
},
"num_forwarding_packages": {
"type": "string",
"format": "int64",
"description": "Total number of forwarding packages created in this channel."
}
}
},


@ -342,4 +342,8 @@ var allTestCases = []*testCase{
name: "rpc middleware interceptor",
test: testRPCMiddlewareInterceptor,
},
{
name: "wipe forwarding packages",
test: testWipeForwardingPackages,
},
}


@ -0,0 +1,227 @@
package itest
import (
"context"
"testing"
"time"
"github.com/lightningnetwork/lnd/chainreg"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnrpc/routerrpc"
"github.com/lightningnetwork/lnd/lntest"
"github.com/stretchr/testify/require"
)
type pendingChan *lnrpc.PendingChannelsResponse_PendingChannel
// testWipeForwardingPackages tests that when a channel is closed, either
// through local force close, remote close, or coop close, all the forwarding
// packages of that particular channel are deleted. The test creates a
// topology as Alice -> Bob -> Carol, and sends payments from Alice to Carol
// through Bob. It then performs the test in two steps,
// - Bob force closes the channel Alice->Bob, and checks from both Bob's
// PoV (local force close) and Alice's PoV (remote close) that the forwarding
// packages are wiped.
// - Bob coop closes the channel Bob->Carol, and checks from Bob's PoV that
// the forwarding packages are wiped.
func testWipeForwardingPackages(net *lntest.NetworkHarness,
t *harnessTest) {
// Setup the test and get the channel points.
pointAB, pointBC, carol, cleanUp := setupFwdPkgTest(net, t)
defer cleanUp()
// Firstly, Bob force closes the channel.
_, _, err := net.CloseChannel(net.Bob, pointAB, true)
require.NoError(t.t, err, "unable to force close channel")
// Now that the channel has been force closed, it should show up in
// bob's PendingChannels RPC under the waiting close section.
pendingChan := assertWaitingCloseChannel(t.t, net.Bob)
// Check that Bob has created forwarding packages. We don't care about
// the exact number here as long as these packages are deleted when the
// channel is closed.
require.NotZero(t.t, pendingChan.NumForwardingPackages)
// Mine 1 block to get the closing transaction confirmed.
_, err = net.Miner.Client.Generate(1)
require.NoError(t.t, err, "unable to mine blocks")
// Now that the closing transaction is confirmed, the above waiting
// close channel should now become a pending force closed channel.
pendingChan = assertPendingForceClosedChannel(t.t, net.Bob)
// Check that the forwarding packages are deleted.
require.Zero(t.t, pendingChan.NumForwardingPackages)
// For Alice, the forwarding packages should have been wiped too.
pendingChanAlice := assertPendingForceClosedChannel(t.t, net.Alice)
require.Zero(t.t, pendingChanAlice.NumForwardingPackages)
// Secondly, Bob coop closes the channel.
_, _, err = net.CloseChannel(net.Bob, pointBC, false)
require.NoError(t.t, err, "unable to coop close channel")
// Now that the channel has been coop closed, it should show up in
// bob's PendingChannels RPC under the waiting close section.
pendingChan = assertWaitingCloseChannel(t.t, net.Bob)
// Check that Bob has created forwarding packages. We don't care about
// the exact number here as long as these packages are deleted when the
// channel is closed.
require.NotZero(t.t, pendingChan.NumForwardingPackages)
// Since it's a coop close, Carol should see the waiting close channel
// too.
pendingChanCarol := assertWaitingCloseChannel(t.t, carol)
require.NotZero(t.t, pendingChanCarol.NumForwardingPackages)
// Mine 1 block to get the closing transaction confirmed.
_, err = net.Miner.Client.Generate(1)
require.NoError(t.t, err, "unable to mine blocks")
// Now that the closing transaction is confirmed, the above waiting
// close channel should now become a pending closed channel. Note that
// the name PendingForceClosingChannels is a bit confusing; what it
// really contains is channels whose closing tx has been broadcast.
pendingChan = assertPendingForceClosedChannel(t.t, net.Bob)
// Check that the forwarding packages are deleted.
require.Zero(t.t, pendingChan.NumForwardingPackages)
// Mine a block to confirm sweep transactions such that they
// don't remain in the mempool for any subsequent tests.
_, err = net.Miner.Client.Generate(1)
require.NoError(t.t, err, "unable to mine blocks")
}
// setupFwdPkgTest prepares the wipe forwarding packages tests. It creates a
// network topology that has a channel direction: Alice -> Bob -> Carol, sends
// several payments from Alice to Carol, and returns the two channel points (one
// for Alice and Bob, the other for Bob and Carol), the node Carol, and a
// cleanup function to be used when the test finishes.
func setupFwdPkgTest(net *lntest.NetworkHarness,
t *harnessTest) (*lnrpc.ChannelPoint, *lnrpc.ChannelPoint,
*lntest.HarnessNode, func()) {
ctxb := context.Background()
const (
chanAmt = 10e6
paymentAmt = 10e4
finalCTLVDelta = chainreg.DefaultBitcoinTimeLockDelta
numInvoices = 3
)
// Grab Alice and Bob from harness net.
alice, bob := net.Alice, net.Bob
// Create a new node Carol, which will create invoices that require
// Alice to pay.
carol := net.NewNode(t.t, "Carol", nil)
// Connect Bob to Carol.
net.ConnectNodes(t.t, bob, carol)
// Open a channel between Alice and Bob.
chanPointAB := openChannelAndAssert(
t, net, alice, bob, lntest.OpenChannelParams{
Amt: chanAmt,
},
)
// Open a channel between Bob and Carol.
chanPointBC := openChannelAndAssert(
t, net, bob, carol, lntest.OpenChannelParams{
Amt: chanAmt,
},
)
ctxt, cancel := context.WithTimeout(ctxb, defaultTimeout)
defer cancel()
// Alice sends several payments to Carol through Bob, which triggers
// Bob to create forwarding packages.
for i := 0; i < numInvoices; i++ {
// Add an invoice for Carol.
invoice := &lnrpc.Invoice{Memo: "testing", Value: paymentAmt}
invoiceResp, err := carol.AddInvoice(ctxt, invoice)
require.NoError(t.t, err, "unable to add invoice")
// Alice sends a payment to Carol through Bob.
sendAndAssertSuccess(
t, net.Alice, &routerrpc.SendPaymentRequest{
PaymentRequest: invoiceResp.PaymentRequest,
TimeoutSeconds: 60,
FeeLimitSat: noFeeLimitMsat,
},
)
}
return chanPointAB, chanPointBC, carol, func() {
shutdownAndAssert(net, t, alice)
shutdownAndAssert(net, t, bob)
shutdownAndAssert(net, t, carol)
}
}
// assertWaitingCloseChannel checks there is a single channel that is waiting
// for close and returns the channel found.
func assertWaitingCloseChannel(t *testing.T,
node *lntest.HarnessNode) pendingChan {
ctxb := context.Background()
var channel pendingChan
require.Eventually(t, func() bool {
ctxt, cancel := context.WithTimeout(ctxb, defaultTimeout)
defer cancel()
req := &lnrpc.PendingChannelsRequest{}
resp, err := node.PendingChannels(ctxt, req)
// We require the RPC call to succeed and won't retry upon
// an error.
require.NoError(t, err, "unable to query for pending channels")
if err := checkNumWaitingCloseChannels(resp, 1); err != nil {
return false
}
channel = resp.WaitingCloseChannels[0].Channel
return true
}, defaultTimeout, 200*time.Millisecond)
return channel
}
// assertPendingForceClosedChannel checks there is a single channel that is
// pending force closed and returns the channel found.
func assertPendingForceClosedChannel(t *testing.T,
node *lntest.HarnessNode) pendingChan {
ctxb := context.Background()
var channel pendingChan
require.Eventually(t, func() bool {
ctxt, cancel := context.WithTimeout(ctxb, defaultTimeout)
defer cancel()
req := &lnrpc.PendingChannelsRequest{}
resp, err := node.PendingChannels(ctxt, req)
// We require the RPC call to succeed and won't retry upon
// an error.
require.NoError(t, err, "unable to query for pending channels")
if err := checkNumForceClosedChannels(resp, 1); err != nil {
return false
}
channel = resp.PendingForceClosingChannels[0].Channel
return true
}, defaultTimeout, 200*time.Millisecond)
return channel
}


@ -3239,6 +3239,17 @@ func (r *rpcServer) PendingChannels(ctx context.Context,
return nil, err
}
// Get the number of forwarding packages from the historical
// channel.
fwdPkgs, err := historical.LoadFwdPkgs()
if err != nil {
rpcsLog.Errorf("unable to load forwarding packages "+
"for channel:%s, %v",
historical.ShortChannelID, err)
return nil, err
}
channel.NumForwardingPackages = int64(len(fwdPkgs))
closeTXID := pendingClose.ClosingTXID.String()
switch pendingClose.CloseType {
@ -3350,16 +3361,25 @@ func (r *rpcServer) PendingChannels(ctx context.Context,
)
}
fwdPkgs, err := waitingClose.LoadFwdPkgs()
if err != nil {
rpcsLog.Errorf("unable to load forwarding packages "+
"for channel:%s, %v",
waitingClose.ShortChannelID, err)
return nil, err
}
channel := &lnrpc.PendingChannelsResponse_PendingChannel{
RemoteNodePub: hex.EncodeToString(pub),
ChannelPoint: chanPoint.String(),
Capacity: int64(waitingClose.Capacity),
LocalBalance: int64(waitingClose.LocalCommitment.LocalBalance.ToSatoshis()),
RemoteBalance: int64(waitingClose.LocalCommitment.RemoteBalance.ToSatoshis()),
LocalChanReserveSat: int64(waitingClose.LocalChanCfg.ChanReserve),
RemoteChanReserveSat: int64(waitingClose.RemoteChanCfg.ChanReserve),
Initiator: rpcInitiator(waitingClose.IsInitiator),
CommitmentType: rpcCommitmentType(waitingClose.ChanType),
RemoteNodePub: hex.EncodeToString(pub),
ChannelPoint: chanPoint.String(),
Capacity: int64(waitingClose.Capacity),
LocalBalance: int64(waitingClose.LocalCommitment.LocalBalance.ToSatoshis()),
RemoteBalance: int64(waitingClose.LocalCommitment.RemoteBalance.ToSatoshis()),
LocalChanReserveSat: int64(waitingClose.LocalChanCfg.ChanReserve),
RemoteChanReserveSat: int64(waitingClose.RemoteChanCfg.ChanReserve),
Initiator: rpcInitiator(waitingClose.IsInitiator),
CommitmentType: rpcCommitmentType(waitingClose.ChanType),
NumForwardingPackages: int64(len(fwdPkgs)),
}
waitingCloseResp := &lnrpc.PendingChannelsResponse_WaitingCloseChannel{