mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-01-18 21:35:24 +01:00
sweep: Remove publishing last-tx logic.
We remove the publishing of the last published sweep tx during the startup of the sweeper. This republishing can lead to situations where funds of the default wallet might be locked for neutrino backend clients. Moreover all related tests are removed as well.
This commit is contained in:
parent
c6a68d193c
commit
07502a8fb0
@ -4,7 +4,6 @@ import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||||
"github.com/btcsuite/btcd/wire"
|
||||
@ -12,15 +11,6 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
// lastTxBucketKey is the key that points to a bucket containing a
|
||||
// single item storing the last published tx.
|
||||
//
|
||||
// maps: lastTxKey -> serialized_tx
|
||||
lastTxBucketKey = []byte("sweeper-last-tx")
|
||||
|
||||
// lastTxKey is the fixed key under which the serialized tx is stored.
|
||||
lastTxKey = []byte("last-tx")
|
||||
|
||||
// txHashesBucketKey is the key that points to a bucket containing the
|
||||
// hashes of all sweep txes that were published successfully.
|
||||
//
|
||||
@ -52,10 +42,6 @@ type SweeperStore interface {
|
||||
// NotifyPublishTx signals that we are about to publish a tx.
|
||||
NotifyPublishTx(*wire.MsgTx) error
|
||||
|
||||
// GetLastPublishedTx returns the last tx that we called NotifyPublishTx
|
||||
// for.
|
||||
GetLastPublishedTx() (*wire.MsgTx, error)
|
||||
|
||||
// ListSweeps lists all the sweeps we have successfully published.
|
||||
ListSweeps() ([]chainhash.Hash, error)
|
||||
}
|
||||
@ -69,13 +55,6 @@ func NewSweeperStore(db kvdb.Backend, chainHash *chainhash.Hash) (
|
||||
SweeperStore, error) {
|
||||
|
||||
err := kvdb.Update(db, func(tx kvdb.RwTx) error {
|
||||
_, err := tx.CreateTopLevelBucket(
|
||||
lastTxBucketKey,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if tx.ReadWriteBucket(txHashesBucketKey) != nil {
|
||||
return nil
|
||||
}
|
||||
@ -171,64 +150,18 @@ func migrateTxHashes(tx kvdb.RwTx, txHashesBucket kvdb.RwBucket,
|
||||
// NotifyPublishTx signals that we are about to publish a tx.
|
||||
func (s *sweeperStore) NotifyPublishTx(sweepTx *wire.MsgTx) error {
|
||||
return kvdb.Update(s.db, func(tx kvdb.RwTx) error {
|
||||
lastTxBucket := tx.ReadWriteBucket(lastTxBucketKey)
|
||||
if lastTxBucket == nil {
|
||||
return errors.New("last tx bucket does not exist")
|
||||
}
|
||||
|
||||
txHashesBucket := tx.ReadWriteBucket(txHashesBucketKey)
|
||||
if txHashesBucket == nil {
|
||||
return errNoTxHashesBucket
|
||||
}
|
||||
|
||||
var b bytes.Buffer
|
||||
if err := sweepTx.Serialize(&b); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := lastTxBucket.Put(lastTxKey, b.Bytes()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
hash := sweepTx.TxHash()
|
||||
|
||||
return txHashesBucket.Put(hash[:], []byte{})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// GetLastPublishedTx returns the last tx that we called NotifyPublishTx
|
||||
// for.
|
||||
func (s *sweeperStore) GetLastPublishedTx() (*wire.MsgTx, error) {
|
||||
var sweepTx *wire.MsgTx
|
||||
|
||||
err := kvdb.View(s.db, func(tx kvdb.RTx) error {
|
||||
lastTxBucket := tx.ReadBucket(lastTxBucketKey)
|
||||
if lastTxBucket == nil {
|
||||
return errors.New("last tx bucket does not exist")
|
||||
}
|
||||
|
||||
sweepTxRaw := lastTxBucket.Get(lastTxKey)
|
||||
if sweepTxRaw == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
sweepTx = &wire.MsgTx{}
|
||||
txReader := bytes.NewReader(sweepTxRaw)
|
||||
if err := sweepTx.Deserialize(txReader); err != nil {
|
||||
return fmt.Errorf("tx deserialize: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}, func() {
|
||||
sweepTx = nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return sweepTx, nil
|
||||
}
|
||||
|
||||
// IsOurTx determines whether a tx is published by us, based on its
|
||||
// hash.
|
||||
func (s *sweeperStore) IsOurTx(hash chainhash.Hash) (bool, error) {
|
||||
|
@ -8,7 +8,6 @@ import (
|
||||
// MockSweeperStore is a mock implementation of sweeper store. This type is
|
||||
// exported, because it is currently used in nursery tests too.
|
||||
type MockSweeperStore struct {
|
||||
lastTx *wire.MsgTx
|
||||
ourTxes map[chainhash.Hash]struct{}
|
||||
}
|
||||
|
||||
@ -30,17 +29,10 @@ func (s *MockSweeperStore) IsOurTx(hash chainhash.Hash) (bool, error) {
|
||||
func (s *MockSweeperStore) NotifyPublishTx(tx *wire.MsgTx) error {
|
||||
txHash := tx.TxHash()
|
||||
s.ourTxes[txHash] = struct{}{}
|
||||
s.lastTx = tx
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetLastPublishedTx returns the last tx that we called NotifyPublishTx
|
||||
// for.
|
||||
func (s *MockSweeperStore) GetLastPublishedTx() (*wire.MsgTx, error) {
|
||||
return s.lastTx, nil
|
||||
}
|
||||
|
||||
// ListSweeps lists all the sweeps we have successfully published.
|
||||
func (s *MockSweeperStore) ListSweeps() ([]chainhash.Hash, error) {
|
||||
var txns []chainhash.Hash
|
||||
|
@ -42,15 +42,6 @@ func testStore(t *testing.T, createStore func() (SweeperStore, error)) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Initially we expect the store not to have a last published tx.
|
||||
retrievedTx, err := store.GetLastPublishedTx()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if retrievedTx != nil {
|
||||
t.Fatal("expected no last published tx")
|
||||
}
|
||||
|
||||
// Notify publication of tx1
|
||||
tx1 := wire.MsgTx{}
|
||||
tx1.AddTxIn(&wire.TxIn{
|
||||
@ -83,16 +74,6 @@ func testStore(t *testing.T, createStore func() (SweeperStore, error)) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Assert that last published tx2 is present.
|
||||
retrievedTx, err = store.GetLastPublishedTx()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if tx2.TxHash() != retrievedTx.TxHash() {
|
||||
t.Fatal("txes do not match")
|
||||
}
|
||||
|
||||
// Assert that both txes are recognized as our own.
|
||||
ours, err := store.IsOurTx(tx1.TxHash())
|
||||
if err != nil {
|
||||
|
@ -340,30 +340,6 @@ func (s *UtxoSweeper) Start() error {
|
||||
|
||||
log.Info("Sweeper starting")
|
||||
|
||||
// Retrieve last published tx from database.
|
||||
lastTx, err := s.cfg.Store.GetLastPublishedTx()
|
||||
if err != nil {
|
||||
return fmt.Errorf("get last published tx: %v", err)
|
||||
}
|
||||
|
||||
// Republish in case the previous call crashed lnd. We don't care about
|
||||
// the return value, because inputs will be re-offered and retried
|
||||
// anyway. The only reason we republish here is to prevent the corner
|
||||
// case where lnd goes into a restart loop because of a crashing publish
|
||||
// tx where we keep deriving new output script. By publishing and
|
||||
// possibly crashing already now, we haven't derived a new output script
|
||||
// yet.
|
||||
if lastTx != nil {
|
||||
log.Debugf("Publishing last tx %v", lastTx.TxHash())
|
||||
|
||||
// Error can be ignored. Because we are starting up, there are
|
||||
// no pending inputs to update based on the publish result.
|
||||
err := s.cfg.Wallet.PublishTransaction(lastTx, "")
|
||||
if err != nil && err != lnwallet.ErrDoubleSpend {
|
||||
log.Errorf("last tx publish: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Retrieve relay fee for dust limit calculation. Assume that this will
|
||||
// not change from here on.
|
||||
s.relayFeeRate = s.cfg.FeeEstimator.RelayFeePerKW()
|
||||
|
@ -404,16 +404,6 @@ func TestSuccess(t *testing.T) {
|
||||
}
|
||||
|
||||
ctx.finish(1)
|
||||
|
||||
// Assert that last tx is stored in the database so we can republish
|
||||
// on restart.
|
||||
lastTx, err := ctx.store.GetLastPublishedTx()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if lastTx == nil || sweepTx.TxHash() != lastTx.TxHash() {
|
||||
t.Fatalf("last tx not stored")
|
||||
}
|
||||
}
|
||||
|
||||
// TestDust asserts that inputs that are not big enough to raise above the dust
|
||||
@ -780,9 +770,6 @@ func TestRestart(t *testing.T) {
|
||||
// Restart sweeper.
|
||||
ctx.restartSweeper()
|
||||
|
||||
// Expect last tx to be republished.
|
||||
ctx.receiveTx()
|
||||
|
||||
// Simulate other subsystem (e.g. contract resolver) re-offering inputs.
|
||||
spendChan1, err := ctx.sweeper.SweepInput(input1, defaultFeePref)
|
||||
if err != nil {
|
||||
@ -830,9 +817,6 @@ func TestRestart(t *testing.T) {
|
||||
// Restart sweeper again. No action is expected.
|
||||
ctx.restartSweeper()
|
||||
|
||||
// Expect last tx to be republished.
|
||||
ctx.receiveTx()
|
||||
|
||||
ctx.finish(1)
|
||||
}
|
||||
|
||||
@ -861,9 +845,6 @@ func TestRestartRemoteSpend(t *testing.T) {
|
||||
// Restart sweeper.
|
||||
ctx.restartSweeper()
|
||||
|
||||
// Expect last tx to be republished.
|
||||
ctx.receiveTx()
|
||||
|
||||
// Replace the sweep tx with a remote tx spending input 1.
|
||||
ctx.backend.deleteUnconfirmed(sweepTx.TxHash())
|
||||
|
||||
@ -918,9 +899,6 @@ func TestRestartConfirmed(t *testing.T) {
|
||||
// Restart sweeper.
|
||||
ctx.restartSweeper()
|
||||
|
||||
// Expect last tx to be republished.
|
||||
ctx.receiveTx()
|
||||
|
||||
// Mine the sweep tx.
|
||||
ctx.backend.mine()
|
||||
|
||||
@ -939,35 +917,6 @@ func TestRestartConfirmed(t *testing.T) {
|
||||
ctx.finish(1)
|
||||
}
|
||||
|
||||
// TestRestartRepublish asserts that sweeper republishes the last published
|
||||
// tx on restart.
|
||||
func TestRestartRepublish(t *testing.T) {
|
||||
ctx := createSweeperTestContext(t)
|
||||
|
||||
_, err := ctx.sweeper.SweepInput(spendableInputs[0], defaultFeePref)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
ctx.tick()
|
||||
|
||||
sweepTx := ctx.receiveTx()
|
||||
|
||||
// Restart sweeper again. No action is expected.
|
||||
ctx.restartSweeper()
|
||||
|
||||
republishedTx := ctx.receiveTx()
|
||||
|
||||
if sweepTx.TxHash() != republishedTx.TxHash() {
|
||||
t.Fatalf("last tx not republished")
|
||||
}
|
||||
|
||||
// Mine the tx to conclude the test properly.
|
||||
ctx.backend.mine()
|
||||
|
||||
ctx.finish(1)
|
||||
}
|
||||
|
||||
// TestRetry tests the sweeper retry flow.
|
||||
func TestRetry(t *testing.T) {
|
||||
ctx := createSweeperTestContext(t)
|
||||
|
Loading…
Reference in New Issue
Block a user