mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-02-22 22:25:24 +01:00
Merge pull request #3016 from halseth/republish-close-tx-on-startup
[contractcourt] Republish close tx on startup
This commit is contained in:
commit
0c076bf82a
15 changed files with 553 additions and 277 deletions
|
@ -435,6 +435,14 @@ func (c *channelCloser) ProcessCloseMsg(msg lnwire.Message) ([]lnwire.Message, b
|
|||
"close: %v", c.chanPoint, err)
|
||||
}
|
||||
|
||||
// Before publishing the closing tx, we persist it to the
|
||||
// database, such that it can be republished if something goes
|
||||
// wrong.
|
||||
err = c.cfg.channel.MarkCommitmentBroadcasted(closeTx)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
// With the closing transaction crafted, we'll now broadcast it
|
||||
// to the network.
|
||||
peerLog.Infof("Broadcasting cooperative close tx: %v",
|
||||
|
@ -444,9 +452,6 @@ func (c *channelCloser) ProcessCloseMsg(msg lnwire.Message) ([]lnwire.Message, b
|
|||
if err := c.cfg.broadcastTx(closeTx); err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
if err := c.cfg.channel.MarkCommitmentBroadcasted(); err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
// Finally, we'll transition to the closeFinished state, and
|
||||
// also return the final close signed message we sent.
|
||||
|
|
|
@ -16,6 +16,7 @@ import (
|
|||
"github.com/btcsuite/btcd/wire"
|
||||
"github.com/btcsuite/btcutil"
|
||||
"github.com/coreos/bbolt"
|
||||
"github.com/lightningnetwork/lnd/input"
|
||||
"github.com/lightningnetwork/lnd/keychain"
|
||||
"github.com/lightningnetwork/lnd/lnwire"
|
||||
"github.com/lightningnetwork/lnd/shachain"
|
||||
|
@ -58,6 +59,10 @@ var (
|
|||
// remote peer during a channel sync in case we have lost channel state.
|
||||
dataLossCommitPointKey = []byte("data-loss-commit-point-key")
|
||||
|
||||
// closingTxKey points to a the closing tx that we broadcasted when
|
||||
// moving the channel to state CommitBroadcasted.
|
||||
closingTxKey = []byte("closing-tx-key")
|
||||
|
||||
// commitDiffKey stores the current pending commitment state we've
|
||||
// extended to the remote party (if any). Each time we propose a new
|
||||
// state, we store the information necessary to reconstruct this state
|
||||
|
@ -103,6 +108,10 @@ var (
|
|||
// in the database.
|
||||
ErrNoCommitPoint = fmt.Errorf("no commit point found")
|
||||
|
||||
// ErrNoCloseTx is returned when no closing tx is found for a channel
|
||||
// in the state CommitBroadcasted.
|
||||
ErrNoCloseTx = fmt.Errorf("no closing tx found")
|
||||
|
||||
// ErrNoRestoredChannelMutation is returned when a caller attempts to
|
||||
// mutate a channel that's been recovered.
|
||||
ErrNoRestoredChannelMutation = fmt.Errorf("cannot mutate restored " +
|
||||
|
@ -514,16 +523,6 @@ type OpenChannel struct {
|
|||
sync.RWMutex
|
||||
}
|
||||
|
||||
// FullSync serializes, and writes to disk the *full* channel state, using
|
||||
// both the active channel bucket to store the prefixed column fields, and the
|
||||
// remote node's ID to store the remainder of the channel state.
|
||||
func (c *OpenChannel) FullSync() error {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
return c.Db.Update(c.fullSync)
|
||||
}
|
||||
|
||||
// ShortChanID returns the current ShortChannelID of this channel.
|
||||
func (c *OpenChannel) ShortChanID() lnwire.ShortChannelID {
|
||||
c.RLock()
|
||||
|
@ -648,9 +647,8 @@ func fetchChanBucket(tx *bbolt.Tx, nodeKey *btcec.PublicKey,
|
|||
return chanBucket, nil
|
||||
}
|
||||
|
||||
// fullSync is an internal version of the FullSync method which allows callers
|
||||
// to sync the contents of an OpenChannel while re-using an existing database
|
||||
// transaction.
|
||||
// fullSync syncs the contents of an OpenChannel while re-using an existing
|
||||
// database transaction.
|
||||
func (c *OpenChannel) fullSync(tx *bbolt.Tx) error {
|
||||
// First fetch the top level bucket which stores all data related to
|
||||
// current, active channels.
|
||||
|
@ -736,44 +734,16 @@ func (c *OpenChannel) MarkDataLoss(commitPoint *btcec.PublicKey) error {
|
|||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
var status ChannelStatus
|
||||
if err := c.Db.Update(func(tx *bbolt.Tx) error {
|
||||
chanBucket, err := fetchChanBucket(
|
||||
tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
channel, err := fetchOpenChannel(chanBucket, &c.FundingOutpoint)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Add status LocalDataLoss to the existing bitvector found in
|
||||
// the DB.
|
||||
status = channel.chanStatus | ChanStatusLocalDataLoss
|
||||
channel.chanStatus = status
|
||||
|
||||
var b bytes.Buffer
|
||||
if err := WriteElement(&b, commitPoint); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = chanBucket.Put(dataLossCommitPointKey, b.Bytes())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return putOpenChannel(chanBucket, channel)
|
||||
}); err != nil {
|
||||
var b bytes.Buffer
|
||||
if err := WriteElement(&b, commitPoint); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Update the in-memory representation to keep it in sync with the DB.
|
||||
c.chanStatus = status
|
||||
putCommitPoint := func(chanBucket *bbolt.Bucket) error {
|
||||
return chanBucket.Put(dataLossCommitPointKey, b.Bytes())
|
||||
}
|
||||
|
||||
return nil
|
||||
return c.putChanStatus(ChanStatusLocalDataLoss, putCommitPoint)
|
||||
}
|
||||
|
||||
// DataLossCommitPoint retrieves the stored commit point set during
|
||||
|
@ -821,6 +791,82 @@ func (c *OpenChannel) MarkBorked() error {
|
|||
return c.putChanStatus(ChanStatusBorked)
|
||||
}
|
||||
|
||||
// ChanSyncMsg returns the ChannelReestablish message that should be sent upon
|
||||
// reconnection with the remote peer that we're maintaining this channel with.
|
||||
// The information contained within this message is necessary to re-sync our
|
||||
// commitment chains in the case of a last or only partially processed message.
|
||||
// When the remote party receiver this message one of three things may happen:
|
||||
//
|
||||
// 1. We're fully synced and no messages need to be sent.
|
||||
// 2. We didn't get the last CommitSig message they sent, to they'll re-send
|
||||
// it.
|
||||
// 3. We didn't get the last RevokeAndAck message they sent, so they'll
|
||||
// re-send it.
|
||||
//
|
||||
// If this is a restored channel, having status ChanStatusRestored, then we'll
|
||||
// modify our typical chan sync message to ensure they force close even if
|
||||
// we're on the very first state.
|
||||
func (c *OpenChannel) ChanSyncMsg() (*lnwire.ChannelReestablish, error) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
// The remote commitment height that we'll send in the
|
||||
// ChannelReestablish message is our current commitment height plus
|
||||
// one. If the receiver thinks that our commitment height is actually
|
||||
// *equal* to this value, then they'll re-send the last commitment that
|
||||
// they sent but we never fully processed.
|
||||
localHeight := c.LocalCommitment.CommitHeight
|
||||
nextLocalCommitHeight := localHeight + 1
|
||||
|
||||
// The second value we'll send is the height of the remote commitment
|
||||
// from our PoV. If the receiver thinks that their height is actually
|
||||
// *one plus* this value, then they'll re-send their last revocation.
|
||||
remoteChainTipHeight := c.RemoteCommitment.CommitHeight
|
||||
|
||||
// If this channel has undergone a commitment update, then in order to
|
||||
// prove to the remote party our knowledge of their prior commitment
|
||||
// state, we'll also send over the last commitment secret that the
|
||||
// remote party sent.
|
||||
var lastCommitSecret [32]byte
|
||||
if remoteChainTipHeight != 0 {
|
||||
remoteSecret, err := c.RevocationStore.LookUp(
|
||||
remoteChainTipHeight - 1,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
lastCommitSecret = [32]byte(*remoteSecret)
|
||||
}
|
||||
|
||||
// Additionally, we'll send over the current unrevoked commitment on
|
||||
// our local commitment transaction.
|
||||
currentCommitSecret, err := c.RevocationProducer.AtIndex(
|
||||
localHeight,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// If we've restored this channel, then we'll purposefully give them an
|
||||
// invalid LocalUnrevokedCommitPoint so they'll force close the channel
|
||||
// allowing us to sweep our funds.
|
||||
if c.hasChanStatus(ChanStatusRestored) {
|
||||
currentCommitSecret[0] ^= 1
|
||||
}
|
||||
|
||||
return &lnwire.ChannelReestablish{
|
||||
ChanID: lnwire.NewChanIDFromOutPoint(
|
||||
&c.FundingOutpoint,
|
||||
),
|
||||
NextLocalCommitHeight: nextLocalCommitHeight,
|
||||
RemoteCommitTailHeight: remoteChainTipHeight,
|
||||
LastRemoteCommitSecret: lastCommitSecret,
|
||||
LocalUnrevokedCommitPoint: input.ComputeCommitmentPoint(
|
||||
currentCommitSecret[:],
|
||||
),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// isBorked returns true if the channel has been marked as borked in the
|
||||
// database. This requires an existing database transaction to already be
|
||||
// active.
|
||||
|
@ -837,15 +883,63 @@ func (c *OpenChannel) isBorked(chanBucket *bbolt.Bucket) (bool, error) {
|
|||
|
||||
// MarkCommitmentBroadcasted marks the channel as a commitment transaction has
|
||||
// been broadcast, either our own or the remote, and we should watch the chain
|
||||
// for it to confirm before taking any further action.
|
||||
func (c *OpenChannel) MarkCommitmentBroadcasted() error {
|
||||
// for it to confirm before taking any further action. It takes as argument the
|
||||
// closing tx _we believe_ will appear in the chain. This is only used to
|
||||
// republish this tx at startup to ensure propagation, and we should still
|
||||
// handle the case where a different tx actually hits the chain.
|
||||
func (c *OpenChannel) MarkCommitmentBroadcasted(closeTx *wire.MsgTx) error {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
return c.putChanStatus(ChanStatusCommitBroadcasted)
|
||||
var b bytes.Buffer
|
||||
if err := WriteElement(&b, closeTx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
putClosingTx := func(chanBucket *bbolt.Bucket) error {
|
||||
return chanBucket.Put(closingTxKey, b.Bytes())
|
||||
}
|
||||
|
||||
return c.putChanStatus(ChanStatusCommitBroadcasted, putClosingTx)
|
||||
}
|
||||
|
||||
func (c *OpenChannel) putChanStatus(status ChannelStatus) error {
|
||||
// BroadcastedCommitment retrieves the stored closing tx set during
|
||||
// MarkCommitmentBroadcasted. If not found ErrNoCloseTx is returned.
|
||||
func (c *OpenChannel) BroadcastedCommitment() (*wire.MsgTx, error) {
|
||||
var closeTx *wire.MsgTx
|
||||
|
||||
err := c.Db.View(func(tx *bbolt.Tx) error {
|
||||
chanBucket, err := fetchChanBucket(
|
||||
tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash,
|
||||
)
|
||||
switch err {
|
||||
case nil:
|
||||
case ErrNoChanDBExists, ErrNoActiveChannels, ErrChannelNotFound:
|
||||
return ErrNoCloseTx
|
||||
default:
|
||||
return err
|
||||
}
|
||||
|
||||
bs := chanBucket.Get(closingTxKey)
|
||||
if bs == nil {
|
||||
return ErrNoCloseTx
|
||||
}
|
||||
r := bytes.NewReader(bs)
|
||||
return ReadElement(r, &closeTx)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return closeTx, nil
|
||||
}
|
||||
|
||||
// putChanStatus appends the given status to the channel. fs is an optional
|
||||
// list of closures that are given the chanBucket in order to atomically add
|
||||
// extra information together with the new status.
|
||||
func (c *OpenChannel) putChanStatus(status ChannelStatus,
|
||||
fs ...func(*bbolt.Bucket) error) error {
|
||||
|
||||
if err := c.Db.Update(func(tx *bbolt.Tx) error {
|
||||
chanBucket, err := fetchChanBucket(
|
||||
tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash,
|
||||
|
@ -863,7 +957,17 @@ func (c *OpenChannel) putChanStatus(status ChannelStatus) error {
|
|||
status = channel.chanStatus | status
|
||||
channel.chanStatus = status
|
||||
|
||||
return putOpenChannel(chanBucket, channel)
|
||||
if err := putOpenChannel(chanBucket, channel); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, f := range fs {
|
||||
if err := f(chanBucket); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -263,7 +263,12 @@ func TestOpenChannelPutGetDelete(t *testing.T) {
|
|||
OnionBlob: []byte("onionblob"),
|
||||
},
|
||||
}
|
||||
if err := state.FullSync(); err != nil {
|
||||
|
||||
addr := &net.TCPAddr{
|
||||
IP: net.ParseIP("127.0.0.1"),
|
||||
Port: 18556,
|
||||
}
|
||||
if err := state.SyncPending(addr, 101); err != nil {
|
||||
t.Fatalf("unable to save and serialize channel state: %v", err)
|
||||
}
|
||||
|
||||
|
@ -363,7 +368,12 @@ func TestChannelStateTransition(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("unable to create channel state: %v", err)
|
||||
}
|
||||
if err := channel.FullSync(); err != nil {
|
||||
|
||||
addr := &net.TCPAddr{
|
||||
IP: net.ParseIP("127.0.0.1"),
|
||||
Port: 18556,
|
||||
}
|
||||
if err := channel.SyncPending(addr, 101); err != nil {
|
||||
t.Fatalf("unable to save and serialize channel state: %v", err)
|
||||
}
|
||||
|
||||
|
@ -881,7 +891,13 @@ func TestFetchWaitingCloseChannels(t *testing.T) {
|
|||
// This would happen in the event of a force close and should make the
|
||||
// channels enter a state of waiting close.
|
||||
for _, channel := range channels {
|
||||
if err := channel.MarkCommitmentBroadcasted(); err != nil {
|
||||
closeTx := wire.NewMsgTx(2)
|
||||
closeTx.AddTxIn(
|
||||
&wire.TxIn{
|
||||
PreviousOutPoint: channel.FundingOutpoint,
|
||||
},
|
||||
)
|
||||
if err := channel.MarkCommitmentBroadcasted(closeTx); err != nil {
|
||||
t.Fatalf("unable to mark commitment broadcast: %v", err)
|
||||
}
|
||||
}
|
||||
|
@ -906,6 +922,19 @@ func TestFetchWaitingCloseChannels(t *testing.T) {
|
|||
t.Fatalf("expected channel %v to be waiting close",
|
||||
channel.FundingOutpoint)
|
||||
}
|
||||
|
||||
// Finally, make sure we can retrieve the closing tx for the
|
||||
// channel.
|
||||
closeTx, err := channel.BroadcastedCommitment()
|
||||
if err != nil {
|
||||
t.Fatalf("Unable to retrieve commitment: %v", err)
|
||||
}
|
||||
|
||||
if closeTx.TxIn[0].PreviousOutPoint != channel.FundingOutpoint {
|
||||
t.Fatalf("expected outpoint %v, got %v",
|
||||
channel.FundingOutpoint,
|
||||
closeTx.TxIn[0].PreviousOutPoint)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -110,7 +110,12 @@ func TestFetchClosedChannelForID(t *testing.T) {
|
|||
for i := uint32(0); i < numChans; i++ {
|
||||
// Save the open channel to disk.
|
||||
state.FundingOutpoint.Index = i
|
||||
if err := state.FullSync(); err != nil {
|
||||
|
||||
addr := &net.TCPAddr{
|
||||
IP: net.ParseIP("127.0.0.1"),
|
||||
Port: 18556,
|
||||
}
|
||||
if err := state.SyncPending(addr, 101); err != nil {
|
||||
t.Fatalf("unable to save and serialize channel "+
|
||||
"state: %v", err)
|
||||
}
|
||||
|
|
|
@ -412,6 +412,36 @@ func (c *ChainArbitrator) Start() error {
|
|||
}
|
||||
|
||||
c.activeChannels[chanPoint] = channelArb
|
||||
|
||||
// If the channel has had its commitment broadcasted already,
|
||||
// republish it in case it didn't propagate.
|
||||
if !channel.HasChanStatus(
|
||||
channeldb.ChanStatusCommitBroadcasted,
|
||||
) {
|
||||
continue
|
||||
}
|
||||
|
||||
closeTx, err := channel.BroadcastedCommitment()
|
||||
switch {
|
||||
|
||||
// This can happen for channels that had their closing tx
|
||||
// published before we started storing it to disk.
|
||||
case err == channeldb.ErrNoCloseTx:
|
||||
log.Warnf("Channel %v is in state CommitBroadcasted, "+
|
||||
"but no closing tx to re-publish...", chanPoint)
|
||||
continue
|
||||
|
||||
case err != nil:
|
||||
return err
|
||||
}
|
||||
|
||||
log.Infof("Re-publishing closing tx(%v) for channel %v",
|
||||
closeTx.TxHash(), chanPoint)
|
||||
err = c.cfg.PublishTx(closeTx)
|
||||
if err != nil && err != lnwallet.ErrDoubleSpend {
|
||||
log.Warnf("Unable to broadcast close tx(%v): %v",
|
||||
closeTx.TxHash(), err)
|
||||
}
|
||||
}
|
||||
|
||||
// In addition to the channels that we know to be open, we'll also
|
||||
|
|
|
@ -1 +1,117 @@
|
|||
package contractcourt
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||||
"github.com/btcsuite/btcd/wire"
|
||||
"github.com/lightningnetwork/lnd/channeldb"
|
||||
"github.com/lightningnetwork/lnd/lnwallet"
|
||||
)
|
||||
|
||||
// TestChainArbitratorRepulishCommitment testst that the chain arbitrator will
|
||||
// republish closing transactions for channels marked CommitementBroadcast in
|
||||
// the database at startup.
|
||||
func TestChainArbitratorRepublishCommitment(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
tempPath, err := ioutil.TempDir("", "testdb")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer os.RemoveAll(tempPath)
|
||||
|
||||
db, err := channeldb.Open(tempPath)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
// Create 10 test channels and sync them to the database.
|
||||
const numChans = 10
|
||||
var channels []*channeldb.OpenChannel
|
||||
for i := 0; i < numChans; i++ {
|
||||
lChannel, _, cleanup, err := lnwallet.CreateTestChannels()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer cleanup()
|
||||
|
||||
channel := lChannel.State()
|
||||
|
||||
// We manually set the db here to make sure all channels are
|
||||
// synced to the same db.
|
||||
channel.Db = db
|
||||
|
||||
addr := &net.TCPAddr{
|
||||
IP: net.ParseIP("127.0.0.1"),
|
||||
Port: 18556,
|
||||
}
|
||||
if err := channel.SyncPending(addr, 101); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
channels = append(channels, channel)
|
||||
}
|
||||
|
||||
// Mark half of the channels as commitment broadcasted.
|
||||
for i := 0; i < numChans/2; i++ {
|
||||
closeTx := channels[i].FundingTxn.Copy()
|
||||
closeTx.TxIn[0].PreviousOutPoint = channels[i].FundingOutpoint
|
||||
err := channels[i].MarkCommitmentBroadcasted(closeTx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// We keep track of the transactions published by the ChainArbitrator
|
||||
// at startup.
|
||||
published := make(map[chainhash.Hash]struct{})
|
||||
|
||||
chainArbCfg := ChainArbitratorConfig{
|
||||
ChainIO: &mockChainIO{},
|
||||
Notifier: &mockNotifier{},
|
||||
PublishTx: func(tx *wire.MsgTx) error {
|
||||
published[tx.TxHash()] = struct{}{}
|
||||
return nil
|
||||
},
|
||||
}
|
||||
chainArb := NewChainArbitrator(
|
||||
chainArbCfg, db,
|
||||
)
|
||||
|
||||
if err := chainArb.Start(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer func() {
|
||||
if err := chainArb.Stop(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Half of the channels should have had their closing tx re-published.
|
||||
if len(published) != numChans/2 {
|
||||
t.Fatalf("expected %d re-published transactions, got %d",
|
||||
numChans/2, len(published))
|
||||
}
|
||||
|
||||
// And make sure the published transactions are correct, and unique.
|
||||
for i := 0; i < numChans/2; i++ {
|
||||
closeTx := channels[i].FundingTxn.Copy()
|
||||
closeTx.TxIn[0].PreviousOutPoint = channels[i].FundingOutpoint
|
||||
|
||||
_, ok := published[closeTx.TxHash()]
|
||||
if !ok {
|
||||
t.Fatalf("closing tx not re-published")
|
||||
}
|
||||
|
||||
delete(published, closeTx.TxHash())
|
||||
}
|
||||
|
||||
if len(published) != 0 {
|
||||
t.Fatalf("unexpected tx published")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -731,10 +731,7 @@ func (c *chainWatcher) dispatchCooperativeClose(commitSpend *chainntnfs.SpendDet
|
|||
}
|
||||
|
||||
// Attempt to add a channel sync message to the close summary.
|
||||
chanSync, err := lnwallet.ChanSyncMsg(
|
||||
c.cfg.chanState,
|
||||
c.cfg.chanState.HasChanStatus(channeldb.ChanStatusRestored),
|
||||
)
|
||||
chanSync, err := c.cfg.chanState.ChanSyncMsg()
|
||||
if err != nil {
|
||||
log.Errorf("ChannelPoint(%v): unable to create channel sync "+
|
||||
"message: %v", c.cfg.chanState.FundingOutpoint, err)
|
||||
|
@ -811,10 +808,7 @@ func (c *chainWatcher) dispatchLocalForceClose(
|
|||
}
|
||||
|
||||
// Attempt to add a channel sync message to the close summary.
|
||||
chanSync, err := lnwallet.ChanSyncMsg(
|
||||
c.cfg.chanState,
|
||||
c.cfg.chanState.HasChanStatus(channeldb.ChanStatusRestored),
|
||||
)
|
||||
chanSync, err := c.cfg.chanState.ChanSyncMsg()
|
||||
if err != nil {
|
||||
log.Errorf("ChannelPoint(%v): unable to create channel sync "+
|
||||
"message: %v", c.cfg.chanState.FundingOutpoint, err)
|
||||
|
@ -998,10 +992,7 @@ func (c *chainWatcher) dispatchContractBreach(spendEvent *chainntnfs.SpendDetail
|
|||
}
|
||||
|
||||
// Attempt to add a channel sync message to the close summary.
|
||||
chanSync, err := lnwallet.ChanSyncMsg(
|
||||
c.cfg.chanState,
|
||||
c.cfg.chanState.HasChanStatus(channeldb.ChanStatusRestored),
|
||||
)
|
||||
chanSync, err := c.cfg.chanState.ChanSyncMsg()
|
||||
if err != nil {
|
||||
log.Errorf("ChannelPoint(%v): unable to create channel sync "+
|
||||
"message: %v", c.cfg.chanState.FundingOutpoint, err)
|
||||
|
|
|
@ -97,7 +97,7 @@ type ChannelArbitratorConfig struct {
|
|||
|
||||
// MarkCommitmentBroadcasted should mark the channel as the commitment
|
||||
// being broadcast, and we are waiting for the commitment to confirm.
|
||||
MarkCommitmentBroadcasted func() error
|
||||
MarkCommitmentBroadcasted func(*wire.MsgTx) error
|
||||
|
||||
// MarkChannelClosed marks the channel closed in the database, with the
|
||||
// passed close summary. After this method successfully returns we can
|
||||
|
@ -821,6 +821,16 @@ func (c *ChannelArbitrator) stateStep(
|
|||
}
|
||||
closeTx = closeSummary.CloseTx
|
||||
|
||||
// Before publishing the transaction, we store it to the
|
||||
// database, such that we can re-publish later in case it
|
||||
// didn't propagate.
|
||||
if err := c.cfg.MarkCommitmentBroadcasted(closeTx); err != nil {
|
||||
log.Errorf("ChannelArbitrator(%v): unable to "+
|
||||
"mark commitment broadcasted: %v",
|
||||
c.cfg.ChanPoint, err)
|
||||
return StateError, closeTx, err
|
||||
}
|
||||
|
||||
// With the close transaction in hand, broadcast the
|
||||
// transaction to the network, thereby entering the post
|
||||
// channel resolution state.
|
||||
|
@ -840,12 +850,6 @@ func (c *ChannelArbitrator) stateStep(
|
|||
}
|
||||
}
|
||||
|
||||
if err := c.cfg.MarkCommitmentBroadcasted(); err != nil {
|
||||
log.Errorf("ChannelArbitrator(%v): unable to "+
|
||||
"mark commitment broadcasted: %v",
|
||||
c.cfg.ChanPoint, err)
|
||||
}
|
||||
|
||||
// We go to the StateCommitmentBroadcasted state, where we'll
|
||||
// be waiting for the commitment to be confirmed.
|
||||
nextState = StateCommitmentBroadcasted
|
||||
|
|
|
@ -213,7 +213,7 @@ func createTestChannelArbitrator(log ArbitratorLog) (*ChannelArbitrator,
|
|||
}
|
||||
return summary, nil
|
||||
},
|
||||
MarkCommitmentBroadcasted: func() error {
|
||||
MarkCommitmentBroadcasted: func(_ *wire.MsgTx) error {
|
||||
return nil
|
||||
},
|
||||
MarkChannelClosed: func(*channeldb.ChannelCloseSummary) error {
|
||||
|
|
|
@ -609,30 +609,12 @@ func (l *channelLink) syncChanStates() error {
|
|||
// side. Based on this message, the remote party will decide if they
|
||||
// need to retransmit any data or not.
|
||||
chanState := l.channel.State()
|
||||
localChanSyncMsg, err := lnwallet.ChanSyncMsg(
|
||||
chanState,
|
||||
chanState.HasChanStatus(channeldb.ChanStatusRestored),
|
||||
)
|
||||
localChanSyncMsg, err := chanState.ChanSyncMsg()
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to generate chan sync message for "+
|
||||
"ChannelPoint(%v)", l.channel.ChannelPoint())
|
||||
}
|
||||
|
||||
// If we have a restored channel, we'll delay sending our channel
|
||||
// reestablish message briefly to ensure we first have a stable
|
||||
// connection. Sending the message will cause the remote peer to force
|
||||
// close the channel, which currently may not be resumed reliably if the
|
||||
// connection is being torn down simultaneously. This delay can be
|
||||
// removed after the force close is reliable, but in the meantime it
|
||||
// improves the reliability of successfully closing out the channel.
|
||||
if chanState.HasChanStatus(channeldb.ChanStatusRestored) {
|
||||
select {
|
||||
case <-time.After(5 * time.Second):
|
||||
case <-l.quit:
|
||||
return ErrLinkShuttingDown
|
||||
}
|
||||
}
|
||||
|
||||
if err := l.cfg.Peer.SendMessage(true, localChanSyncMsg); err != nil {
|
||||
return fmt.Errorf("Unable to send chan sync message for "+
|
||||
"ChannelPoint(%v)", l.channel.ChannelPoint())
|
||||
|
@ -896,6 +878,11 @@ func (l *channelLink) htlcManager() {
|
|||
if l.cfg.SyncStates {
|
||||
err := l.syncChanStates()
|
||||
if err != nil {
|
||||
log.Warnf("Error when syncing channel states: %v", err)
|
||||
|
||||
errDataLoss, localDataLoss :=
|
||||
err.(*lnwallet.ErrCommitSyncLocalDataLoss)
|
||||
|
||||
switch {
|
||||
case err == ErrLinkShuttingDown:
|
||||
log.Debugf("unable to sync channel states, " +
|
||||
|
@ -918,6 +905,12 @@ func (l *channelLink) htlcManager() {
|
|||
// what they sent us before.
|
||||
// TODO(halseth): ban peer?
|
||||
case err == lnwallet.ErrInvalidLocalUnrevokedCommitPoint:
|
||||
// We'll fail the link and tell the peer to
|
||||
// force close the channel. Note that the
|
||||
// database state is not updated here, but will
|
||||
// be updated when the close transaction is
|
||||
// ready to avoid that we go down before
|
||||
// storing the transaction in the db.
|
||||
l.fail(
|
||||
LinkFailureError{
|
||||
code: ErrSyncError,
|
||||
|
@ -931,13 +924,18 @@ func (l *channelLink) htlcManager() {
|
|||
// We have lost state and cannot safely force close the
|
||||
// channel. Fail the channel and wait for the remote to
|
||||
// hopefully force close it. The remote has sent us its
|
||||
// latest unrevoked commitment point, that we stored in
|
||||
// the database, that we can use to retrieve the funds
|
||||
// when the remote closes the channel.
|
||||
// TODO(halseth): mark this, such that we prevent
|
||||
// channel from being force closed by the user or
|
||||
// contractcourt etc.
|
||||
case err == lnwallet.ErrCommitSyncLocalDataLoss:
|
||||
// latest unrevoked commitment point, and we'll store
|
||||
// it in the database, such that we can attempt to
|
||||
// recover the funds if the remote force closes the
|
||||
// channel.
|
||||
case localDataLoss:
|
||||
err := l.channel.MarkDataLoss(
|
||||
errDataLoss.CommitPoint,
|
||||
)
|
||||
if err != nil {
|
||||
log.Errorf("Unable to mark channel "+
|
||||
"data loss: %v", err)
|
||||
}
|
||||
|
||||
// We determined the commit chains were not possible to
|
||||
// sync. We cautiously fail the channel, but don't
|
||||
|
@ -945,6 +943,10 @@ func (l *channelLink) htlcManager() {
|
|||
// TODO(halseth): can we safely force close in any
|
||||
// cases where this error is returned?
|
||||
case err == lnwallet.ErrCannotSyncCommitChains:
|
||||
if err := l.channel.MarkBorked(); err != nil {
|
||||
log.Errorf("Unable to mark channel "+
|
||||
"borked: %v", err)
|
||||
}
|
||||
|
||||
// Other, unspecified error.
|
||||
default:
|
||||
|
|
|
@ -8121,7 +8121,9 @@ func assertDLPExecuted(net *lntest.NetworkHarness, t *harnessTest,
|
|||
assertTxInBlock(t, block, forceClose)
|
||||
|
||||
// Dave should sweep his funds immediately, as they are not timelocked.
|
||||
daveSweep, err := waitForTxInMempool(net.Miner.Node, minerMempoolTimeout)
|
||||
daveSweep, err := waitForTxInMempool(
|
||||
net.Miner.Node, minerMempoolTimeout,
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to find Dave's sweep tx in mempool: %v", err)
|
||||
}
|
||||
|
@ -13559,6 +13561,27 @@ func testChanRestoreScenario(t *harnessTest, net *lntest.NetworkHarness,
|
|||
t.Fatalf("unable to restore node: %v", err)
|
||||
}
|
||||
|
||||
// First ensure that the on-chain balance is restored.
|
||||
err = wait.NoError(func() error {
|
||||
ctxt, _ := context.WithTimeout(ctxb, defaultTimeout)
|
||||
balReq := &lnrpc.WalletBalanceRequest{}
|
||||
daveBalResp, err := dave.WalletBalance(ctxt, balReq)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
daveBal := daveBalResp.ConfirmedBalance
|
||||
if daveBal <= 0 {
|
||||
return fmt.Errorf("expected positive balance, had %v",
|
||||
daveBal)
|
||||
}
|
||||
|
||||
return nil
|
||||
}, defaultTimeout)
|
||||
if err != nil {
|
||||
t.Fatalf("On-chain balance not restored: %v", err)
|
||||
}
|
||||
|
||||
// Now that we have our new node up, we expect that it'll re-connect to
|
||||
// Carol automatically based on the restored backup.
|
||||
ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
|
||||
|
|
|
@ -81,16 +81,6 @@ var (
|
|||
ErrInvalidLocalUnrevokedCommitPoint = fmt.Errorf("unrevoked commit " +
|
||||
"point is invalid")
|
||||
|
||||
// ErrCommitSyncLocalDataLoss is returned in the case that we receive a
|
||||
// valid commit secret within the ChannelReestablish message from the
|
||||
// remote node AND they advertise a RemoteCommitTailHeight higher than
|
||||
// our current known height. This means we have lost some critical
|
||||
// data, and must fail the channel and MUST NOT force close it. Instead
|
||||
// we should wait for the remote to force close it, such that we can
|
||||
// attempt to sweep our funds.
|
||||
ErrCommitSyncLocalDataLoss = fmt.Errorf("possible local commitment " +
|
||||
"state data loss")
|
||||
|
||||
// ErrCommitSyncRemoteDataLoss is returned in the case that we receive
|
||||
// a ChannelReestablish message from the remote that advertises a
|
||||
// NextLocalCommitHeight that is lower than what they have already
|
||||
|
@ -101,6 +91,30 @@ var (
|
|||
"state data loss")
|
||||
)
|
||||
|
||||
// ErrCommitSyncLocalDataLoss is returned in the case that we receive a valid
|
||||
// commit secret within the ChannelReestablish message from the remote node AND
|
||||
// they advertise a RemoteCommitTailHeight higher than our current known
|
||||
// height. This means we have lost some critical data, and must fail the
|
||||
// channel and MUST NOT force close it. Instead we should wait for the remote
|
||||
// to force close it, such that we can attempt to sweep our funds. The
|
||||
// commitment point needed to sweep the remote's force close is encapsuled.
|
||||
type ErrCommitSyncLocalDataLoss struct {
|
||||
// ChannelPoint is the identifier for the channel that experienced data
|
||||
// loss.
|
||||
ChannelPoint wire.OutPoint
|
||||
|
||||
// CommitPoint is the last unrevoked commit point, sent to us by the
|
||||
// remote when we determined we had lost state.
|
||||
CommitPoint *btcec.PublicKey
|
||||
}
|
||||
|
||||
// Error returns a string representation of the local data loss error.
|
||||
func (e *ErrCommitSyncLocalDataLoss) Error() string {
|
||||
return fmt.Sprintf("ChannelPoint(%v) with CommitPoint(%x) had "+
|
||||
"possible local commitment state data loss", e.ChannelPoint,
|
||||
e.CommitPoint.SerializeCompressed())
|
||||
}
|
||||
|
||||
// channelState is an enum like type which represents the current state of a
|
||||
// particular channel.
|
||||
// TODO(roasbeef): actually update state
|
||||
|
@ -3293,10 +3307,6 @@ func (lc *LightningChannel) ProcessChanSyncMsg(
|
|||
// doesn't support data loss protection. In either case
|
||||
// it is not safe for us to keep using the channel, so
|
||||
// we mark it borked and fail the channel.
|
||||
if err := lc.channelState.MarkBorked(); err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
walletLog.Errorf("ChannelPoint(%v), sync failed: "+
|
||||
"local data loss, but no recovery option.",
|
||||
lc.channelState.FundingOutpoint)
|
||||
|
@ -3304,16 +3314,11 @@ func (lc *LightningChannel) ProcessChanSyncMsg(
|
|||
}
|
||||
|
||||
// In this case, we've likely lost data and shouldn't proceed
|
||||
// with channel updates. So we'll store the commit point we
|
||||
// were given in the database, such that we can attempt to
|
||||
// recover the funds if the remote force closes the channel.
|
||||
err := lc.channelState.MarkDataLoss(
|
||||
msg.LocalUnrevokedCommitPoint,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
// with channel updates.
|
||||
return nil, nil, nil, &ErrCommitSyncLocalDataLoss{
|
||||
ChannelPoint: lc.channelState.FundingOutpoint,
|
||||
CommitPoint: msg.LocalUnrevokedCommitPoint,
|
||||
}
|
||||
return nil, nil, nil, ErrCommitSyncLocalDataLoss
|
||||
|
||||
// If the height of our commitment chain reported by the remote party
|
||||
// is behind our view of the chain, then they probably lost some state,
|
||||
|
@ -3323,10 +3328,6 @@ func (lc *LightningChannel) ProcessChanSyncMsg(
|
|||
"believes our tail height is %v, while we have %v!",
|
||||
lc.channelState.FundingOutpoint,
|
||||
msg.RemoteCommitTailHeight, localTailHeight)
|
||||
|
||||
if err := lc.channelState.MarkBorked(); err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
return nil, nil, nil, ErrCommitSyncRemoteDataLoss
|
||||
|
||||
// Their view of our commit chain is consistent with our view.
|
||||
|
@ -3390,10 +3391,6 @@ func (lc *LightningChannel) ProcessChanSyncMsg(
|
|||
"believes our tail height is %v, while we have %v!",
|
||||
lc.channelState.FundingOutpoint,
|
||||
msg.RemoteCommitTailHeight, localTailHeight)
|
||||
|
||||
if err := lc.channelState.MarkBorked(); err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
return nil, nil, nil, ErrCannotSyncCommitChains
|
||||
}
|
||||
|
||||
|
@ -3412,9 +3409,6 @@ func (lc *LightningChannel) ProcessChanSyncMsg(
|
|||
lc.channelState.FundingOutpoint,
|
||||
msg.NextLocalCommitHeight, remoteTipHeight)
|
||||
|
||||
if err := lc.channelState.MarkBorked(); err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
return nil, nil, nil, ErrCannotSyncCommitChains
|
||||
|
||||
// They are waiting for a state they have already ACKed.
|
||||
|
@ -3426,9 +3420,6 @@ func (lc *LightningChannel) ProcessChanSyncMsg(
|
|||
|
||||
// They previously ACKed our current tail, and now they are
|
||||
// waiting for it. They probably lost state.
|
||||
if err := lc.channelState.MarkBorked(); err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
return nil, nil, nil, ErrCommitSyncRemoteDataLoss
|
||||
|
||||
// They have received our latest commitment, life is good.
|
||||
|
@ -3474,10 +3465,6 @@ func (lc *LightningChannel) ProcessChanSyncMsg(
|
|||
"next commit height is %v, while we believe it is %v!",
|
||||
lc.channelState.FundingOutpoint,
|
||||
msg.NextLocalCommitHeight, remoteTipHeight)
|
||||
|
||||
if err := lc.channelState.MarkBorked(); err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
return nil, nil, nil, ErrCannotSyncCommitChains
|
||||
}
|
||||
|
||||
|
@ -3515,97 +3502,12 @@ func (lc *LightningChannel) ProcessChanSyncMsg(
|
|||
"sent invalid commit point for height %v!",
|
||||
lc.channelState.FundingOutpoint,
|
||||
msg.NextLocalCommitHeight)
|
||||
|
||||
if err := lc.channelState.MarkBorked(); err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
// TODO(halseth): force close?
|
||||
return nil, nil, nil, ErrInvalidLocalUnrevokedCommitPoint
|
||||
}
|
||||
|
||||
return updates, openedCircuits, closedCircuits, nil
|
||||
}
|
||||
|
||||
// ChanSyncMsg returns the ChannelReestablish message that should be sent upon
|
||||
// reconnection with the remote peer that we're maintaining this channel with.
|
||||
// The information contained within this message is necessary to re-sync our
|
||||
// commitment chains in the case of a last or only partially processed message.
|
||||
// When the remote party receiver this message one of three things may happen:
|
||||
//
|
||||
// 1. We're fully synced and no messages need to be sent.
|
||||
// 2. We didn't get the last CommitSig message they sent, to they'll re-send
|
||||
// it.
|
||||
// 3. We didn't get the last RevokeAndAck message they sent, so they'll
|
||||
// re-send it.
|
||||
//
|
||||
// The isRestoredChan bool indicates if we need to craft a chan sync message
|
||||
// for a channel that's been restored. If this is a restored channel, then
|
||||
// we'll modify our typical chan sync message to ensure they force close even
|
||||
// if we're on the very first state.
|
||||
func ChanSyncMsg(c *channeldb.OpenChannel,
|
||||
isRestoredChan bool) (*lnwire.ChannelReestablish, error) {
|
||||
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
// The remote commitment height that we'll send in the
|
||||
// ChannelReestablish message is our current commitment height plus
|
||||
// one. If the receiver thinks that our commitment height is actually
|
||||
// *equal* to this value, then they'll re-send the last commitment that
|
||||
// they sent but we never fully processed.
|
||||
localHeight := c.LocalCommitment.CommitHeight
|
||||
nextLocalCommitHeight := localHeight + 1
|
||||
|
||||
// The second value we'll send is the height of the remote commitment
|
||||
// from our PoV. If the receiver thinks that their height is actually
|
||||
// *one plus* this value, then they'll re-send their last revocation.
|
||||
remoteChainTipHeight := c.RemoteCommitment.CommitHeight
|
||||
|
||||
// If this channel has undergone a commitment update, then in order to
|
||||
// prove to the remote party our knowledge of their prior commitment
|
||||
// state, we'll also send over the last commitment secret that the
|
||||
// remote party sent.
|
||||
var lastCommitSecret [32]byte
|
||||
if remoteChainTipHeight != 0 {
|
||||
remoteSecret, err := c.RevocationStore.LookUp(
|
||||
remoteChainTipHeight - 1,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
lastCommitSecret = [32]byte(*remoteSecret)
|
||||
}
|
||||
|
||||
// Additionally, we'll send over the current unrevoked commitment on
|
||||
// our local commitment transaction.
|
||||
currentCommitSecret, err := c.RevocationProducer.AtIndex(
|
||||
localHeight,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// If we've restored this channel, then we'll purposefully give them an
|
||||
// invalid LocalUnrevokedCommitPoint so they'll force close the channel
|
||||
// allowing us to sweep our funds.
|
||||
if isRestoredChan {
|
||||
currentCommitSecret[0] ^= 1
|
||||
}
|
||||
|
||||
return &lnwire.ChannelReestablish{
|
||||
ChanID: lnwire.NewChanIDFromOutPoint(
|
||||
&c.FundingOutpoint,
|
||||
),
|
||||
NextLocalCommitHeight: nextLocalCommitHeight,
|
||||
RemoteCommitTailHeight: remoteChainTipHeight,
|
||||
LastRemoteCommitSecret: lastCommitSecret,
|
||||
LocalUnrevokedCommitPoint: input.ComputeCommitmentPoint(
|
||||
currentCommitSecret[:],
|
||||
),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// computeView takes the given htlcView, and calculates the balances, filtered
|
||||
// view (settling unsettled HTLCs), commitment weight and feePerKw, after
|
||||
// applying the HTLCs to the latest commitment. The returned balances are the
|
||||
|
@ -5187,10 +5089,7 @@ func NewUnilateralCloseSummary(chanState *channeldb.OpenChannel, signer input.Si
|
|||
}
|
||||
|
||||
// Attempt to add a channel sync message to the close summary.
|
||||
chanSync, err := ChanSyncMsg(
|
||||
chanState,
|
||||
chanState.HasChanStatus(channeldb.ChanStatusRestored),
|
||||
)
|
||||
chanSync, err := chanState.ChanSyncMsg()
|
||||
if err != nil {
|
||||
walletLog.Errorf("ChannelPoint(%v): unable to create channel sync "+
|
||||
"message: %v", chanState.FundingOutpoint, err)
|
||||
|
@ -6336,14 +6235,34 @@ func (lc *LightningChannel) State() *channeldb.OpenChannel {
|
|||
return lc.channelState
|
||||
}
|
||||
|
||||
// MarkCommitmentBroadcasted marks the channel as a commitment transaction has
|
||||
// been broadcast, either our own or the remote, and we should watch the chain
|
||||
// for it to confirm before taking any further action.
|
||||
func (lc *LightningChannel) MarkCommitmentBroadcasted() error {
|
||||
// MarkBorked marks the event when the channel as reached an irreconcilable
|
||||
// state, such as a channel breach or state desynchronization. Borked channels
|
||||
// should never be added to the switch.
|
||||
func (lc *LightningChannel) MarkBorked() error {
|
||||
lc.Lock()
|
||||
defer lc.Unlock()
|
||||
|
||||
return lc.channelState.MarkCommitmentBroadcasted()
|
||||
return lc.channelState.MarkBorked()
|
||||
}
|
||||
|
||||
// MarkCommitmentBroadcasted marks the channel as a commitment transaction has
|
||||
// been broadcast, either our own or the remote, and we should watch the chain
|
||||
// for it to confirm before taking any further action.
|
||||
func (lc *LightningChannel) MarkCommitmentBroadcasted(tx *wire.MsgTx) error {
|
||||
lc.Lock()
|
||||
defer lc.Unlock()
|
||||
|
||||
return lc.channelState.MarkCommitmentBroadcasted(tx)
|
||||
}
|
||||
|
||||
// MarkDataLoss marks sets the channel status to LocalDataLoss and stores the
|
||||
// passed commitPoint for use to retrieve funds in case the remote force closes
|
||||
// the channel.
|
||||
func (lc *LightningChannel) MarkDataLoss(commitPoint *btcec.PublicKey) error {
|
||||
lc.Lock()
|
||||
defer lc.Unlock()
|
||||
|
||||
return lc.channelState.MarkDataLoss(commitPoint)
|
||||
}
|
||||
|
||||
// ActiveHtlcs returns a slice of HTLC's which are currently active on *both*
|
||||
|
|
|
@ -2530,7 +2530,7 @@ func assertNoChanSyncNeeded(t *testing.T, aliceChannel *LightningChannel,
|
|||
|
||||
_, _, line, _ := runtime.Caller(1)
|
||||
|
||||
aliceChanSyncMsg, err := ChanSyncMsg(aliceChannel.channelState, false)
|
||||
aliceChanSyncMsg, err := aliceChannel.channelState.ChanSyncMsg()
|
||||
if err != nil {
|
||||
t.Fatalf("line #%v: unable to produce chan sync msg: %v",
|
||||
line, err)
|
||||
|
@ -2545,7 +2545,7 @@ func assertNoChanSyncNeeded(t *testing.T, aliceChannel *LightningChannel,
|
|||
"instead wants to send: %v", line, spew.Sdump(bobMsgsToSend))
|
||||
}
|
||||
|
||||
bobChanSyncMsg, err := ChanSyncMsg(bobChannel.channelState, false)
|
||||
bobChanSyncMsg, err := bobChannel.channelState.ChanSyncMsg()
|
||||
if err != nil {
|
||||
t.Fatalf("line #%v: unable to produce chan sync msg: %v",
|
||||
line, err)
|
||||
|
@ -2778,11 +2778,11 @@ func TestChanSyncOweCommitment(t *testing.T) {
|
|||
// Bob doesn't get this message so upon reconnection, they need to
|
||||
// synchronize. Alice should conclude that she owes Bob a commitment,
|
||||
// while Bob should think he's properly synchronized.
|
||||
aliceSyncMsg, err := ChanSyncMsg(aliceChannel.channelState, false)
|
||||
aliceSyncMsg, err := aliceChannel.channelState.ChanSyncMsg()
|
||||
if err != nil {
|
||||
t.Fatalf("unable to produce chan sync msg: %v", err)
|
||||
}
|
||||
bobSyncMsg, err := ChanSyncMsg(bobChannel.channelState, false)
|
||||
bobSyncMsg, err := bobChannel.channelState.ChanSyncMsg()
|
||||
if err != nil {
|
||||
t.Fatalf("unable to produce chan sync msg: %v", err)
|
||||
}
|
||||
|
@ -3092,11 +3092,11 @@ func TestChanSyncOweRevocation(t *testing.T) {
|
|||
// If we fetch the channel sync messages at this state, then Alice
|
||||
// should report that she owes Bob a revocation message, while Bob
|
||||
// thinks they're fully in sync.
|
||||
aliceSyncMsg, err := ChanSyncMsg(aliceChannel.channelState, false)
|
||||
aliceSyncMsg, err := aliceChannel.channelState.ChanSyncMsg()
|
||||
if err != nil {
|
||||
t.Fatalf("unable to produce chan sync msg: %v", err)
|
||||
}
|
||||
bobSyncMsg, err := ChanSyncMsg(bobChannel.channelState, false)
|
||||
bobSyncMsg, err := bobChannel.channelState.ChanSyncMsg()
|
||||
if err != nil {
|
||||
t.Fatalf("unable to produce chan sync msg: %v", err)
|
||||
}
|
||||
|
@ -3261,11 +3261,11 @@ func TestChanSyncOweRevocationAndCommit(t *testing.T) {
|
|||
// If we now attempt to resync, then Alice should conclude that she
|
||||
// doesn't need any further updates, while Bob concludes that he needs
|
||||
// to re-send both his revocation and commit sig message.
|
||||
aliceSyncMsg, err := ChanSyncMsg(aliceChannel.channelState, false)
|
||||
aliceSyncMsg, err := aliceChannel.channelState.ChanSyncMsg()
|
||||
if err != nil {
|
||||
t.Fatalf("unable to produce chan sync msg: %v", err)
|
||||
}
|
||||
bobSyncMsg, err := ChanSyncMsg(bobChannel.channelState, false)
|
||||
bobSyncMsg, err := bobChannel.channelState.ChanSyncMsg()
|
||||
if err != nil {
|
||||
t.Fatalf("unable to produce chan sync msg: %v", err)
|
||||
}
|
||||
|
@ -3472,11 +3472,11 @@ func TestChanSyncOweRevocationAndCommitForceTransition(t *testing.T) {
|
|||
// Now if we attempt to synchronize states at this point, Alice should
|
||||
// detect that she owes nothing, while Bob should re-send both his
|
||||
// RevokeAndAck as well as his commitment message.
|
||||
aliceSyncMsg, err := ChanSyncMsg(aliceChannel.channelState, false)
|
||||
aliceSyncMsg, err := aliceChannel.channelState.ChanSyncMsg()
|
||||
if err != nil {
|
||||
t.Fatalf("unable to produce chan sync msg: %v", err)
|
||||
}
|
||||
bobSyncMsg, err := ChanSyncMsg(bobChannel.channelState, false)
|
||||
bobSyncMsg, err := bobChannel.channelState.ChanSyncMsg()
|
||||
if err != nil {
|
||||
t.Fatalf("unable to produce chan sync msg: %v", err)
|
||||
}
|
||||
|
@ -3677,18 +3677,18 @@ func TestChanSyncFailure(t *testing.T) {
|
|||
assertLocalDataLoss := func(aliceOld *LightningChannel) {
|
||||
t.Helper()
|
||||
|
||||
aliceSyncMsg, err := ChanSyncMsg(aliceOld.channelState, false)
|
||||
aliceSyncMsg, err := aliceOld.channelState.ChanSyncMsg()
|
||||
if err != nil {
|
||||
t.Fatalf("unable to produce chan sync msg: %v", err)
|
||||
}
|
||||
bobSyncMsg, err := ChanSyncMsg(bobChannel.channelState, false)
|
||||
bobSyncMsg, err := bobChannel.channelState.ChanSyncMsg()
|
||||
if err != nil {
|
||||
t.Fatalf("unable to produce chan sync msg: %v", err)
|
||||
}
|
||||
|
||||
// Alice should detect from Bob's message that she lost state.
|
||||
_, _, _, err = aliceOld.ProcessChanSyncMsg(bobSyncMsg)
|
||||
if err != ErrCommitSyncLocalDataLoss {
|
||||
if _, ok := err.(*ErrCommitSyncLocalDataLoss); !ok {
|
||||
t.Fatalf("wrong error, expected "+
|
||||
"ErrCommitSyncLocalDataLoss instead got: %v",
|
||||
err)
|
||||
|
@ -3755,7 +3755,7 @@ func TestChanSyncFailure(t *testing.T) {
|
|||
// If we remove the recovery options from Bob's message, Alice cannot
|
||||
// tell if she lost state, since Bob might be lying. She still should
|
||||
// be able to detect that chains cannot be synced.
|
||||
bobSyncMsg, err := ChanSyncMsg(bobChannel.channelState, false)
|
||||
bobSyncMsg, err := bobChannel.channelState.ChanSyncMsg()
|
||||
if err != nil {
|
||||
t.Fatalf("unable to produce chan sync msg: %v", err)
|
||||
}
|
||||
|
@ -3769,7 +3769,7 @@ func TestChanSyncFailure(t *testing.T) {
|
|||
// If Bob lies about the NextLocalCommitHeight, making it greater than
|
||||
// what Alice expect, she cannot tell for sure whether she lost state,
|
||||
// but should detect the desync.
|
||||
bobSyncMsg, err = ChanSyncMsg(bobChannel.channelState, false)
|
||||
bobSyncMsg, err = bobChannel.channelState.ChanSyncMsg()
|
||||
if err != nil {
|
||||
t.Fatalf("unable to produce chan sync msg: %v", err)
|
||||
}
|
||||
|
@ -3782,7 +3782,7 @@ func TestChanSyncFailure(t *testing.T) {
|
|||
|
||||
// If Bob's NextLocalCommitHeight is lower than what Alice expects, Bob
|
||||
// probably lost state.
|
||||
bobSyncMsg, err = ChanSyncMsg(bobChannel.channelState, false)
|
||||
bobSyncMsg, err = bobChannel.channelState.ChanSyncMsg()
|
||||
if err != nil {
|
||||
t.Fatalf("unable to produce chan sync msg: %v", err)
|
||||
}
|
||||
|
@ -3795,7 +3795,7 @@ func TestChanSyncFailure(t *testing.T) {
|
|||
|
||||
// If Alice and Bob's states are in sync, but Bob is sending the wrong
|
||||
// LocalUnrevokedCommitPoint, Alice should detect this.
|
||||
bobSyncMsg, err = ChanSyncMsg(bobChannel.channelState, false)
|
||||
bobSyncMsg, err = bobChannel.channelState.ChanSyncMsg()
|
||||
if err != nil {
|
||||
t.Fatalf("unable to produce chan sync msg: %v", err)
|
||||
}
|
||||
|
@ -3824,7 +3824,7 @@ func TestChanSyncFailure(t *testing.T) {
|
|||
// when there's a pending remote commit.
|
||||
halfAdvance()
|
||||
|
||||
bobSyncMsg, err = ChanSyncMsg(bobChannel.channelState, false)
|
||||
bobSyncMsg, err = bobChannel.channelState.ChanSyncMsg()
|
||||
if err != nil {
|
||||
t.Fatalf("unable to produce chan sync msg: %v", err)
|
||||
}
|
||||
|
@ -3912,11 +3912,11 @@ func TestChannelRetransmissionFeeUpdate(t *testing.T) {
|
|||
// Bob doesn't get this message so upon reconnection, they need to
|
||||
// synchronize. Alice should conclude that she owes Bob a commitment,
|
||||
// while Bob should think he's properly synchronized.
|
||||
aliceSyncMsg, err := ChanSyncMsg(aliceChannel.channelState, false)
|
||||
aliceSyncMsg, err := aliceChannel.channelState.ChanSyncMsg()
|
||||
if err != nil {
|
||||
t.Fatalf("unable to produce chan sync msg: %v", err)
|
||||
}
|
||||
bobSyncMsg, err := ChanSyncMsg(bobChannel.channelState, false)
|
||||
bobSyncMsg, err := bobChannel.channelState.ChanSyncMsg()
|
||||
if err != nil {
|
||||
t.Fatalf("unable to produce chan sync msg: %v", err)
|
||||
}
|
||||
|
@ -4361,11 +4361,11 @@ func TestChanSyncInvalidLastSecret(t *testing.T) {
|
|||
}
|
||||
|
||||
// Next, we'll produce the ChanSync messages for both parties.
|
||||
aliceChanSync, err := ChanSyncMsg(aliceChannel.channelState, false)
|
||||
aliceChanSync, err := aliceChannel.channelState.ChanSyncMsg()
|
||||
if err != nil {
|
||||
t.Fatalf("unable to generate chan sync msg: %v", err)
|
||||
}
|
||||
bobChanSync, err := ChanSyncMsg(bobChannel.channelState, false)
|
||||
bobChanSync, err := bobChannel.channelState.ChanSyncMsg()
|
||||
if err != nil {
|
||||
t.Fatalf("unable to generate chan sync msg: %v", err)
|
||||
}
|
||||
|
@ -4377,7 +4377,7 @@ func TestChanSyncInvalidLastSecret(t *testing.T) {
|
|||
// Alice's former self should conclude that she possibly lost data as
|
||||
// Bob is sending a valid commit secret for the latest state.
|
||||
_, _, _, err = aliceOld.ProcessChanSyncMsg(bobChanSync)
|
||||
if err != ErrCommitSyncLocalDataLoss {
|
||||
if _, ok := err.(*ErrCommitSyncLocalDataLoss); !ok {
|
||||
t.Fatalf("wrong error, expected ErrCommitSyncLocalDataLoss "+
|
||||
"instead got: %v", err)
|
||||
}
|
||||
|
|
|
@ -7,6 +7,8 @@ import (
|
|||
"encoding/hex"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
prand "math/rand"
|
||||
"net"
|
||||
"os"
|
||||
|
||||
"github.com/btcsuite/btcd/btcec"
|
||||
|
@ -101,7 +103,7 @@ func CreateTestChannels() (*LightningChannel, *LightningChannel, func(), error)
|
|||
|
||||
prevOut := &wire.OutPoint{
|
||||
Hash: chainhash.Hash(testHdSeed),
|
||||
Index: 0,
|
||||
Index: prand.Uint32(),
|
||||
}
|
||||
fundingTxIn := wire.NewTxIn(prevOut, nil, nil)
|
||||
|
||||
|
@ -334,10 +336,20 @@ func CreateTestChannels() (*LightningChannel, *LightningChannel, func(), error)
|
|||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
if err := channelAlice.channelState.FullSync(); err != nil {
|
||||
addr := &net.TCPAddr{
|
||||
IP: net.ParseIP("127.0.0.1"),
|
||||
Port: 18556,
|
||||
}
|
||||
if err := channelAlice.channelState.SyncPending(addr, 101); err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
if err := channelBob.channelState.FullSync(); err != nil {
|
||||
|
||||
addr = &net.TCPAddr{
|
||||
IP: net.ParseIP("127.0.0.1"),
|
||||
Port: 18555,
|
||||
}
|
||||
|
||||
if err := channelBob.channelState.SyncPending(addr, 101); err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
|
|
56
peer.go
56
peer.go
|
@ -355,7 +355,8 @@ func (p *peer) Start() error {
|
|||
peerLog.Debugf("Loaded %v active channels from database with "+
|
||||
"NodeKey(%x)", len(activeChans), p.PubKey())
|
||||
|
||||
if err := p.loadActiveChannels(activeChans); err != nil {
|
||||
msgs, err := p.loadActiveChannels(activeChans)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to load channels: %v", err)
|
||||
}
|
||||
|
||||
|
@ -368,6 +369,17 @@ func (p *peer) Start() error {
|
|||
go p.channelManager()
|
||||
go p.pingHandler()
|
||||
|
||||
// Now that the peer has started up, we send any channel sync messages
|
||||
// that must be resent for borked channels.
|
||||
if len(msgs) > 0 {
|
||||
peerLog.Infof("Sending %d channel sync messages to peer after "+
|
||||
"loading active channels", len(msgs))
|
||||
if err := p.SendMessage(true, msgs...); err != nil {
|
||||
peerLog.Warnf("Failed sending channel sync "+
|
||||
"messages to peer %v: %v", p, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -406,14 +418,22 @@ func (p *peer) QuitSignal() <-chan struct{} {
|
|||
}
|
||||
|
||||
// loadActiveChannels creates indexes within the peer for tracking all active
|
||||
// channels returned by the database.
|
||||
func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
|
||||
// channels returned by the database. It returns a slice of channel reestablish
|
||||
// messages that should be sent to the peer immediately, in case we have borked
|
||||
// channels that haven't been closed yet.
|
||||
func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) (
|
||||
[]lnwire.Message, error) {
|
||||
|
||||
// Return a slice of messages to send to the peers in case the channel
|
||||
// cannot be loaded normally.
|
||||
var msgs []lnwire.Message
|
||||
|
||||
for _, dbChan := range chans {
|
||||
lnChan, err := lnwallet.NewLightningChannel(
|
||||
p.server.cc.signer, dbChan, p.server.sigPool,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
chanPoint := &dbChan.FundingOutpoint
|
||||
|
@ -433,6 +453,22 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
|
|||
case dbChan.HasChanStatus(channeldb.ChanStatusLocalDataLoss):
|
||||
peerLog.Warnf("ChannelPoint(%v) has status %v, won't "+
|
||||
"start.", chanPoint, dbChan.ChanStatus())
|
||||
|
||||
// To help our peer recover from a potential data loss,
|
||||
// we resend our channel reestablish message if the
|
||||
// channel is in a borked state. We won't process any
|
||||
// channel reestablish message sent from the peer, but
|
||||
// that's okay since the assumption is that we did when
|
||||
// marking the channel borked.
|
||||
chanSync, err := dbChan.ChanSyncMsg()
|
||||
if err != nil {
|
||||
peerLog.Errorf("Unable to create channel "+
|
||||
"reestablish message for channel %v: "+
|
||||
"%v", chanPoint, err)
|
||||
continue
|
||||
}
|
||||
|
||||
msgs = append(msgs, chanSync)
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -446,7 +482,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
|
|||
|
||||
_, currentHeight, err := p.server.cc.chainIO.GetBestBlock()
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Before we register this new link with the HTLC Switch, we'll
|
||||
|
@ -455,7 +491,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
|
|||
graph := p.server.chanDB.ChannelGraph()
|
||||
info, p1, p2, err := graph.FetchChannelEdgesByOutpoint(chanPoint)
|
||||
if err != nil && err != channeldb.ErrEdgeNotFound {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// We'll filter out our policy from the directional channel
|
||||
|
@ -503,7 +539,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
|
|||
*chanPoint,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Create the link and add it to the switch.
|
||||
|
@ -512,8 +548,8 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
|
|||
currentHeight, true,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to add link %v to switch: %v",
|
||||
chanPoint, err)
|
||||
return nil, fmt.Errorf("unable to add link %v to "+
|
||||
"switch: %v", chanPoint, err)
|
||||
}
|
||||
|
||||
p.activeChanMtx.Lock()
|
||||
|
@ -521,7 +557,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
|
|||
p.activeChanMtx.Unlock()
|
||||
}
|
||||
|
||||
return nil
|
||||
return msgs, nil
|
||||
}
|
||||
|
||||
// addLink creates and adds a new link from the specified channel.
|
||||
|
|
Loading…
Add table
Reference in a new issue