Merge pull request #7879 from ziggie1984/remove-last-sweep-logic

Remove publishing last sweep tx logic during startup.
This commit is contained in:
Oliver Gugger 2023-08-15 16:21:43 +02:00 committed by GitHub
commit 4ab2454880
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 116 additions and 170 deletions

View File

@ -25,6 +25,7 @@ import (
"github.com/lightningnetwork/lnd/channeldb/migration27"
"github.com/lightningnetwork/lnd/channeldb/migration29"
"github.com/lightningnetwork/lnd/channeldb/migration30"
"github.com/lightningnetwork/lnd/channeldb/migration31"
"github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/kvdb"
@ -276,6 +277,14 @@ var (
number: 29,
migration: migration29.MigrateChanID,
},
{
// Removes the "sweeper-last-tx" bucket. Although we
// do not have a mandatory version 30 we skip this
// version because its naming is already used for the
// first optional migration.
number: 31,
migration: migration31.DeleteLastPublishedTxTLB,
},
}
// optionalVersions stores all optional migrations that are applied

View File

@ -9,6 +9,7 @@ import (
"github.com/lightningnetwork/lnd/channeldb/migration16"
"github.com/lightningnetwork/lnd/channeldb/migration24"
"github.com/lightningnetwork/lnd/channeldb/migration30"
"github.com/lightningnetwork/lnd/channeldb/migration31"
"github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
"github.com/lightningnetwork/lnd/kvdb"
)
@ -40,5 +41,6 @@ func UseLogger(logger btclog.Logger) {
migration16.UseLogger(logger)
migration24.UseLogger(logger)
migration30.UseLogger(logger)
migration31.UseLogger(logger)
kvdb.UseLogger(logger)
}

View File

@ -0,0 +1,14 @@
package migration31
import (
"github.com/btcsuite/btclog"
)
// log is a logger that is initialized as disabled. This means the package will
// not perform any logging by default until a logger is set.
var log = btclog.Disabled
// UseLogger uses a specified Logger to output package logging info.
func UseLogger(logger btclog.Logger) {
log = logger
}

View File

@ -0,0 +1,23 @@
package migration31
import (
"errors"
"github.com/btcsuite/btcwallet/walletdb"
"github.com/lightningnetwork/lnd/kvdb"
)
// DeleteLastPublishedTxTLB deletes the top level bucket with the key
// "sweeper-last-tx".
func DeleteLastPublishedTxTLB(tx kvdb.RwTx) error {
log.Infof("Deleting top-level bucket: %x ...", lastTxBucketKey)
err := tx.DeleteTopLevelBucket(lastTxBucketKey)
if err != nil && !errors.Is(err, walletdb.ErrBucketNotFound) {
return err
}
log.Infof("Deleted top-level bucket: %x", lastTxBucketKey)
return nil
}

View File

@ -0,0 +1,48 @@
package migration31
import (
"fmt"
"testing"
"github.com/lightningnetwork/lnd/channeldb/migtest"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/stretchr/testify/require"
)
var (
hexStr = migtest.Hex
// lastTxBefore is the "sweeper-last-tx" bucket before the migration.
// We fill the last-tx value with a dummy hex string because the actual
// value is not important when deleting the bucket.
lastTxBefore = map[string]interface{}{
"sweeper-last-tx": hexStr("0000"),
}
)
// TestDeleteLastPublishTxTLP asserts that the sweeper-last-tx bucket is
// properly deleted.
func TestDeleteLastPublishTxTLP(t *testing.T) {
t.Parallel()
// Prime the database with the populated sweeper-last-tx bucket.
before := func(tx kvdb.RwTx) error {
return migtest.RestoreDB(tx, lastTxBucketKey, lastTxBefore)
}
// After the migration, ensure that the sweeper-last-tx bucket was
// properly deleted.
after := func(tx kvdb.RwTx) error {
err := migtest.VerifyDB(tx, lastTxBucketKey, nil)
require.ErrorContains(
t, err,
fmt.Sprintf("bucket %s not found", lastTxBucketKey),
)
return nil
}
migtest.ApplyMigration(
t, before, after, DeleteLastPublishedTxTLB, false,
)
}

View File

@ -0,0 +1,9 @@
package migration31
var (
// lastTxBucketKey is the key that points to a bucket containing a
// single item storing the last published sweep tx.
//
// maps: lastTxKey -> serialized_tx
lastTxBucketKey = []byte("sweeper-last-tx")
)

View File

@ -230,9 +230,19 @@
creation](https://github.com/lightningnetwork/lnd/pull/7856) that can arise
under rare scenarios.
- A race condition found between `channel_ready` and link updates is [now
* A race condition found between `channel_ready` and link updates is [now
fixed](https://github.com/lightningnetwork/lnd/pull/7518).
* [Remove rebroadcasting of
the last sweep-tx](https://github.com/lightningnetwork/lnd/pull/7879). Now at
startup of the sweeper we do not rebroadcast the last sweep-tx anymore.
The "sweeper-last-tx" top level bucket in the channel.db is removed
(new migration version 31 of the db). The main reason is that neutrino
backends do not fail broadcasting invalid transactions because BIP157
supporting bitcoin core nodes do not reply with the reject msg anymore. So we
have to make sure to not broadcast outdated transactions which can lead to
locked up wallet funds indefinitely in the worst case.
### Tooling and documentation
* Add support for [custom `RPCHOST` and

View File

@ -4,7 +4,6 @@ import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
@ -12,15 +11,6 @@ import (
)
var (
// lastTxBucketKey is the key that points to a bucket containing a
// single item storing the last published tx.
//
// maps: lastTxKey -> serialized_tx
lastTxBucketKey = []byte("sweeper-last-tx")
// lastTxKey is the fixed key under which the serialized tx is stored.
lastTxKey = []byte("last-tx")
// txHashesBucketKey is the key that points to a bucket containing the
// hashes of all sweep txes that were published successfully.
//
@ -52,10 +42,6 @@ type SweeperStore interface {
// NotifyPublishTx signals that we are about to publish a tx.
NotifyPublishTx(*wire.MsgTx) error
// GetLastPublishedTx returns the last tx that we called NotifyPublishTx
// for.
GetLastPublishedTx() (*wire.MsgTx, error)
// ListSweeps lists all the sweeps we have successfully published.
ListSweeps() ([]chainhash.Hash, error)
}
@ -69,13 +55,6 @@ func NewSweeperStore(db kvdb.Backend, chainHash *chainhash.Hash) (
SweeperStore, error) {
err := kvdb.Update(db, func(tx kvdb.RwTx) error {
_, err := tx.CreateTopLevelBucket(
lastTxBucketKey,
)
if err != nil {
return err
}
if tx.ReadWriteBucket(txHashesBucketKey) != nil {
return nil
}
@ -171,64 +150,18 @@ func migrateTxHashes(tx kvdb.RwTx, txHashesBucket kvdb.RwBucket,
// NotifyPublishTx signals that we are about to publish a tx.
func (s *sweeperStore) NotifyPublishTx(sweepTx *wire.MsgTx) error {
return kvdb.Update(s.db, func(tx kvdb.RwTx) error {
lastTxBucket := tx.ReadWriteBucket(lastTxBucketKey)
if lastTxBucket == nil {
return errors.New("last tx bucket does not exist")
}
txHashesBucket := tx.ReadWriteBucket(txHashesBucketKey)
if txHashesBucket == nil {
return errNoTxHashesBucket
}
var b bytes.Buffer
if err := sweepTx.Serialize(&b); err != nil {
return err
}
if err := lastTxBucket.Put(lastTxKey, b.Bytes()); err != nil {
return err
}
hash := sweepTx.TxHash()
return txHashesBucket.Put(hash[:], []byte{})
}, func() {})
}
// GetLastPublishedTx returns the last tx that we called NotifyPublishTx
// for.
func (s *sweeperStore) GetLastPublishedTx() (*wire.MsgTx, error) {
var sweepTx *wire.MsgTx
err := kvdb.View(s.db, func(tx kvdb.RTx) error {
lastTxBucket := tx.ReadBucket(lastTxBucketKey)
if lastTxBucket == nil {
return errors.New("last tx bucket does not exist")
}
sweepTxRaw := lastTxBucket.Get(lastTxKey)
if sweepTxRaw == nil {
return nil
}
sweepTx = &wire.MsgTx{}
txReader := bytes.NewReader(sweepTxRaw)
if err := sweepTx.Deserialize(txReader); err != nil {
return fmt.Errorf("tx deserialize: %v", err)
}
return nil
}, func() {
sweepTx = nil
})
if err != nil {
return nil, err
}
return sweepTx, nil
}
// IsOurTx determines whether a tx is published by us, based on its
// hash.
func (s *sweeperStore) IsOurTx(hash chainhash.Hash) (bool, error) {

View File

@ -8,7 +8,6 @@ import (
// MockSweeperStore is a mock implementation of sweeper store. This type is
// exported, because it is currently used in nursery tests too.
type MockSweeperStore struct {
lastTx *wire.MsgTx
ourTxes map[chainhash.Hash]struct{}
}
@ -30,17 +29,10 @@ func (s *MockSweeperStore) IsOurTx(hash chainhash.Hash) (bool, error) {
func (s *MockSweeperStore) NotifyPublishTx(tx *wire.MsgTx) error {
txHash := tx.TxHash()
s.ourTxes[txHash] = struct{}{}
s.lastTx = tx
return nil
}
// GetLastPublishedTx returns the last tx that we called NotifyPublishTx
// for.
func (s *MockSweeperStore) GetLastPublishedTx() (*wire.MsgTx, error) {
return s.lastTx, nil
}
// ListSweeps lists all the sweeps we have successfully published.
func (s *MockSweeperStore) ListSweeps() ([]chainhash.Hash, error) {
var txns []chainhash.Hash

View File

@ -42,15 +42,6 @@ func testStore(t *testing.T, createStore func() (SweeperStore, error)) {
t.Fatal(err)
}
// Initially we expect the store not to have a last published tx.
retrievedTx, err := store.GetLastPublishedTx()
if err != nil {
t.Fatal(err)
}
if retrievedTx != nil {
t.Fatal("expected no last published tx")
}
// Notify publication of tx1
tx1 := wire.MsgTx{}
tx1.AddTxIn(&wire.TxIn{
@ -83,16 +74,6 @@ func testStore(t *testing.T, createStore func() (SweeperStore, error)) {
t.Fatal(err)
}
// Assert that last published tx2 is present.
retrievedTx, err = store.GetLastPublishedTx()
if err != nil {
t.Fatal(err)
}
if tx2.TxHash() != retrievedTx.TxHash() {
t.Fatal("txes do not match")
}
// Assert that both txes are recognized as our own.
ours, err := store.IsOurTx(tx1.TxHash())
if err != nil {

View File

@ -340,30 +340,6 @@ func (s *UtxoSweeper) Start() error {
log.Info("Sweeper starting")
// Retrieve last published tx from database.
lastTx, err := s.cfg.Store.GetLastPublishedTx()
if err != nil {
return fmt.Errorf("get last published tx: %v", err)
}
// Republish in case the previous call crashed lnd. We don't care about
// the return value, because inputs will be re-offered and retried
// anyway. The only reason we republish here is to prevent the corner
// case where lnd goes into a restart loop because of a crashing publish
// tx where we keep deriving new output script. By publishing and
// possibly crashing already now, we haven't derived a new output script
// yet.
if lastTx != nil {
log.Debugf("Publishing last tx %v", lastTx.TxHash())
// Error can be ignored. Because we are starting up, there are
// no pending inputs to update based on the publish result.
err := s.cfg.Wallet.PublishTransaction(lastTx, "")
if err != nil && err != lnwallet.ErrDoubleSpend {
log.Errorf("last tx publish: %v", err)
}
}
// Retrieve relay fee for dust limit calculation. Assume that this will
// not change from here on.
s.relayFeeRate = s.cfg.FeeEstimator.RelayFeePerKW()

View File

@ -404,16 +404,6 @@ func TestSuccess(t *testing.T) {
}
ctx.finish(1)
// Assert that last tx is stored in the database so we can republish
// on restart.
lastTx, err := ctx.store.GetLastPublishedTx()
if err != nil {
t.Fatal(err)
}
if lastTx == nil || sweepTx.TxHash() != lastTx.TxHash() {
t.Fatalf("last tx not stored")
}
}
// TestDust asserts that inputs that are not big enough to raise above the dust
@ -780,9 +770,6 @@ func TestRestart(t *testing.T) {
// Restart sweeper.
ctx.restartSweeper()
// Expect last tx to be republished.
ctx.receiveTx()
// Simulate other subsystem (e.g. contract resolver) re-offering inputs.
spendChan1, err := ctx.sweeper.SweepInput(input1, defaultFeePref)
if err != nil {
@ -830,9 +817,6 @@ func TestRestart(t *testing.T) {
// Restart sweeper again. No action is expected.
ctx.restartSweeper()
// Expect last tx to be republished.
ctx.receiveTx()
ctx.finish(1)
}
@ -861,9 +845,6 @@ func TestRestartRemoteSpend(t *testing.T) {
// Restart sweeper.
ctx.restartSweeper()
// Expect last tx to be republished.
ctx.receiveTx()
// Replace the sweep tx with a remote tx spending input 1.
ctx.backend.deleteUnconfirmed(sweepTx.TxHash())
@ -918,9 +899,6 @@ func TestRestartConfirmed(t *testing.T) {
// Restart sweeper.
ctx.restartSweeper()
// Expect last tx to be republished.
ctx.receiveTx()
// Mine the sweep tx.
ctx.backend.mine()
@ -939,35 +917,6 @@ func TestRestartConfirmed(t *testing.T) {
ctx.finish(1)
}
// TestRestartRepublish asserts that sweeper republishes the last published
// tx on restart.
func TestRestartRepublish(t *testing.T) {
ctx := createSweeperTestContext(t)
_, err := ctx.sweeper.SweepInput(spendableInputs[0], defaultFeePref)
if err != nil {
t.Fatal(err)
}
ctx.tick()
sweepTx := ctx.receiveTx()
// Restart sweeper again. No action is expected.
ctx.restartSweeper()
republishedTx := ctx.receiveTx()
if sweepTx.TxHash() != republishedTx.TxHash() {
t.Fatalf("last tx not republished")
}
// Mine the tx to conclude the test properly.
ctx.backend.mine()
ctx.finish(1)
}
// TestRetry tests the sweeper retry flow.
func TestRetry(t *testing.T) {
ctx := createSweeperTestContext(t)