mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-02-23 22:46:40 +01:00
Merge pull request #3961 from carlaKC/3709-cooperativecloseinitiator
channeldb: Indicate cooperative close initator
This commit is contained in:
commit
38b521d87d
19 changed files with 1455 additions and 736 deletions
|
@ -155,6 +155,9 @@ type channelCloser struct {
|
||||||
// remoteDeliveryScript is the script that we'll send the remote
|
// remoteDeliveryScript is the script that we'll send the remote
|
||||||
// party's settled channel funds to.
|
// party's settled channel funds to.
|
||||||
remoteDeliveryScript []byte
|
remoteDeliveryScript []byte
|
||||||
|
|
||||||
|
// locallyInitiated is true if we initiated the channel close.
|
||||||
|
locallyInitiated bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// newChannelCloser creates a new instance of the channel closure given the
|
// newChannelCloser creates a new instance of the channel closure given the
|
||||||
|
@ -162,7 +165,7 @@ type channelCloser struct {
|
||||||
// only be populated iff, we're the initiator of this closing request.
|
// only be populated iff, we're the initiator of this closing request.
|
||||||
func newChannelCloser(cfg chanCloseCfg, deliveryScript []byte,
|
func newChannelCloser(cfg chanCloseCfg, deliveryScript []byte,
|
||||||
idealFeePerKw chainfee.SatPerKWeight, negotiationHeight uint32,
|
idealFeePerKw chainfee.SatPerKWeight, negotiationHeight uint32,
|
||||||
closeReq *htlcswitch.ChanClose) *channelCloser {
|
closeReq *htlcswitch.ChanClose, locallyInitiated bool) *channelCloser {
|
||||||
|
|
||||||
// Given the target fee-per-kw, we'll compute what our ideal _total_
|
// Given the target fee-per-kw, we'll compute what our ideal _total_
|
||||||
// fee will be starting at for this fee negotiation.
|
// fee will be starting at for this fee negotiation.
|
||||||
|
@ -198,6 +201,7 @@ func newChannelCloser(cfg chanCloseCfg, deliveryScript []byte,
|
||||||
idealFeeSat: idealFeeSat,
|
idealFeeSat: idealFeeSat,
|
||||||
localDeliveryScript: deliveryScript,
|
localDeliveryScript: deliveryScript,
|
||||||
priorFeeOffers: make(map[btcutil.Amount]*lnwire.ClosingSigned),
|
priorFeeOffers: make(map[btcutil.Amount]*lnwire.ClosingSigned),
|
||||||
|
locallyInitiated: locallyInitiated,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -224,7 +228,7 @@ func (c *channelCloser) initChanShutdown() (*lnwire.Shutdown, error) {
|
||||||
// guarantees that our listchannels rpc will be externally consistent,
|
// guarantees that our listchannels rpc will be externally consistent,
|
||||||
// and reflect that the channel is being shutdown by the time the
|
// and reflect that the channel is being shutdown by the time the
|
||||||
// closing request returns.
|
// closing request returns.
|
||||||
err := c.cfg.channel.MarkCoopBroadcasted(nil)
|
err := c.cfg.channel.MarkCoopBroadcasted(nil, c.locallyInitiated)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -511,7 +515,9 @@ func (c *channelCloser) ProcessCloseMsg(msg lnwire.Message) ([]lnwire.Message, b
|
||||||
// Before publishing the closing tx, we persist it to the
|
// Before publishing the closing tx, we persist it to the
|
||||||
// database, such that it can be republished if something goes
|
// database, such that it can be republished if something goes
|
||||||
// wrong.
|
// wrong.
|
||||||
err = c.cfg.channel.MarkCoopBroadcasted(closeTx)
|
err = c.cfg.channel.MarkCoopBroadcasted(
|
||||||
|
closeTx, c.locallyInitiated,
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, false, err
|
return nil, false, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -401,20 +401,35 @@ var (
|
||||||
// will have.
|
// will have.
|
||||||
ChanStatusRestored ChannelStatus = 1 << 3
|
ChanStatusRestored ChannelStatus = 1 << 3
|
||||||
|
|
||||||
// ChanStatusCoopBroadcasted indicates that a cooperative close for this
|
// ChanStatusCoopBroadcasted indicates that a cooperative close for
|
||||||
// channel has been broadcasted.
|
// this channel has been broadcasted. Older cooperatively closed
|
||||||
|
// channels will only have this status set. Newer ones will also have
|
||||||
|
// close initiator information stored using the local/remote initiator
|
||||||
|
// status. This status is set in conjunction with the initiator status
|
||||||
|
// so that we do not need to check multiple channel statues for
|
||||||
|
// cooperative closes.
|
||||||
ChanStatusCoopBroadcasted ChannelStatus = 1 << 4
|
ChanStatusCoopBroadcasted ChannelStatus = 1 << 4
|
||||||
|
|
||||||
|
// ChanStatusLocalCloseInitiator indicates that we initiated closing
|
||||||
|
// the channel.
|
||||||
|
ChanStatusLocalCloseInitiator ChannelStatus = 1 << 5
|
||||||
|
|
||||||
|
// ChanStatusRemoteCloseInitiator indicates that the remote node
|
||||||
|
// initiated closing the channel.
|
||||||
|
ChanStatusRemoteCloseInitiator ChannelStatus = 1 << 6
|
||||||
)
|
)
|
||||||
|
|
||||||
// chanStatusStrings maps a ChannelStatus to a human friendly string that
|
// chanStatusStrings maps a ChannelStatus to a human friendly string that
|
||||||
// describes that status.
|
// describes that status.
|
||||||
var chanStatusStrings = map[ChannelStatus]string{
|
var chanStatusStrings = map[ChannelStatus]string{
|
||||||
ChanStatusDefault: "ChanStatusDefault",
|
ChanStatusDefault: "ChanStatusDefault",
|
||||||
ChanStatusBorked: "ChanStatusBorked",
|
ChanStatusBorked: "ChanStatusBorked",
|
||||||
ChanStatusCommitBroadcasted: "ChanStatusCommitBroadcasted",
|
ChanStatusCommitBroadcasted: "ChanStatusCommitBroadcasted",
|
||||||
ChanStatusLocalDataLoss: "ChanStatusLocalDataLoss",
|
ChanStatusLocalDataLoss: "ChanStatusLocalDataLoss",
|
||||||
ChanStatusRestored: "ChanStatusRestored",
|
ChanStatusRestored: "ChanStatusRestored",
|
||||||
ChanStatusCoopBroadcasted: "ChanStatusCoopBroadcasted",
|
ChanStatusCoopBroadcasted: "ChanStatusCoopBroadcasted",
|
||||||
|
ChanStatusLocalCloseInitiator: "ChanStatusLocalCloseInitiator",
|
||||||
|
ChanStatusRemoteCloseInitiator: "ChanStatusRemoteCloseInitiator",
|
||||||
}
|
}
|
||||||
|
|
||||||
// orderedChanStatusFlags is an in-order list of all that channel status flags.
|
// orderedChanStatusFlags is an in-order list of all that channel status flags.
|
||||||
|
@ -425,6 +440,8 @@ var orderedChanStatusFlags = []ChannelStatus{
|
||||||
ChanStatusLocalDataLoss,
|
ChanStatusLocalDataLoss,
|
||||||
ChanStatusRestored,
|
ChanStatusRestored,
|
||||||
ChanStatusCoopBroadcasted,
|
ChanStatusCoopBroadcasted,
|
||||||
|
ChanStatusLocalCloseInitiator,
|
||||||
|
ChanStatusRemoteCloseInitiator,
|
||||||
}
|
}
|
||||||
|
|
||||||
// String returns a human-readable representation of the ChannelStatus.
|
// String returns a human-readable representation of the ChannelStatus.
|
||||||
|
@ -974,30 +991,37 @@ func (c *OpenChannel) isBorked(chanBucket *bbolt.Bucket) (bool, error) {
|
||||||
// closing tx _we believe_ will appear in the chain. This is only used to
|
// closing tx _we believe_ will appear in the chain. This is only used to
|
||||||
// republish this tx at startup to ensure propagation, and we should still
|
// republish this tx at startup to ensure propagation, and we should still
|
||||||
// handle the case where a different tx actually hits the chain.
|
// handle the case where a different tx actually hits the chain.
|
||||||
func (c *OpenChannel) MarkCommitmentBroadcasted(closeTx *wire.MsgTx) error {
|
func (c *OpenChannel) MarkCommitmentBroadcasted(closeTx *wire.MsgTx,
|
||||||
|
locallyInitiated bool) error {
|
||||||
|
|
||||||
return c.markBroadcasted(
|
return c.markBroadcasted(
|
||||||
ChanStatusCommitBroadcasted, forceCloseTxKey, closeTx,
|
ChanStatusCommitBroadcasted, forceCloseTxKey, closeTx,
|
||||||
|
locallyInitiated,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// MarkCoopBroadcasted marks the channel to indicate that a cooperative close
|
// MarkCoopBroadcasted marks the channel to indicate that a cooperative close
|
||||||
// transaction has been broadcast, either our own or the remote, and that we
|
// transaction has been broadcast, either our own or the remote, and that we
|
||||||
// should wach the chain for it to confirm before taking further action. It
|
// should watch the chain for it to confirm before taking further action. It
|
||||||
// takes as argument a cooperative close tx that could appear on chain, and
|
// takes as argument a cooperative close tx that could appear on chain, and
|
||||||
// should be rebroadcast upon startup. This is only used to republish and ensure
|
// should be rebroadcast upon startup. This is only used to republish and
|
||||||
// propagation, and we should still handle the case where a different tx
|
// ensure propagation, and we should still handle the case where a different tx
|
||||||
// actually hits the chain.
|
// actually hits the chain.
|
||||||
func (c *OpenChannel) MarkCoopBroadcasted(closeTx *wire.MsgTx) error {
|
func (c *OpenChannel) MarkCoopBroadcasted(closeTx *wire.MsgTx,
|
||||||
|
locallyInitiated bool) error {
|
||||||
|
|
||||||
return c.markBroadcasted(
|
return c.markBroadcasted(
|
||||||
ChanStatusCoopBroadcasted, coopCloseTxKey, closeTx,
|
ChanStatusCoopBroadcasted, coopCloseTxKey, closeTx,
|
||||||
|
locallyInitiated,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// markBroadcasted is a helper function which modifies the channel status of the
|
// markBroadcasted is a helper function which modifies the channel status of the
|
||||||
// receiving channel and inserts a close transaction under the requested key,
|
// receiving channel and inserts a close transaction under the requested key,
|
||||||
// which should specify either a coop or force close.
|
// which should specify either a coop or force close. It adds a status which
|
||||||
|
// indicates the party that initiated the channel close.
|
||||||
func (c *OpenChannel) markBroadcasted(status ChannelStatus, key []byte,
|
func (c *OpenChannel) markBroadcasted(status ChannelStatus, key []byte,
|
||||||
closeTx *wire.MsgTx) error {
|
closeTx *wire.MsgTx, locallyInitiated bool) error {
|
||||||
|
|
||||||
c.Lock()
|
c.Lock()
|
||||||
defer c.Unlock()
|
defer c.Unlock()
|
||||||
|
@ -1016,6 +1040,15 @@ func (c *OpenChannel) markBroadcasted(status ChannelStatus, key []byte,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Add the initiator status to the status provided. These statuses are
|
||||||
|
// set in addition to the broadcast status so that we do not need to
|
||||||
|
// migrate the original logic which does not store initiator.
|
||||||
|
if locallyInitiated {
|
||||||
|
status |= ChanStatusLocalCloseInitiator
|
||||||
|
} else {
|
||||||
|
status |= ChanStatusRemoteCloseInitiator
|
||||||
|
}
|
||||||
|
|
||||||
return c.putChanStatus(status, putClosingTx)
|
return c.putChanStatus(status, putClosingTx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2349,8 +2382,12 @@ type ChannelCloseSummary struct {
|
||||||
// entails deleting all saved state within the database concerning this
|
// entails deleting all saved state within the database concerning this
|
||||||
// channel. This method also takes a struct that summarizes the state of the
|
// channel. This method also takes a struct that summarizes the state of the
|
||||||
// channel at closing, this compact representation will be the only component
|
// channel at closing, this compact representation will be the only component
|
||||||
// of a channel left over after a full closing.
|
// of a channel left over after a full closing. It takes an optional set of
|
||||||
func (c *OpenChannel) CloseChannel(summary *ChannelCloseSummary) error {
|
// channel statuses which will be written to the historical channel bucket.
|
||||||
|
// These statuses are used to record close initiators.
|
||||||
|
func (c *OpenChannel) CloseChannel(summary *ChannelCloseSummary,
|
||||||
|
statuses ...ChannelStatus) error {
|
||||||
|
|
||||||
c.Lock()
|
c.Lock()
|
||||||
defer c.Unlock()
|
defer c.Unlock()
|
||||||
|
|
||||||
|
@ -2428,6 +2465,11 @@ func (c *OpenChannel) CloseChannel(summary *ChannelCloseSummary) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Apply any additional statuses to the channel state.
|
||||||
|
for _, status := range statuses {
|
||||||
|
chanState.chanStatus |= status
|
||||||
|
}
|
||||||
|
|
||||||
err = putOpenChannel(historicalChanBucket, chanState)
|
err = putOpenChannel(historicalChanBucket, chanState)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -1089,13 +1089,13 @@ func TestFetchWaitingCloseChannels(t *testing.T) {
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
if err := channel.MarkCommitmentBroadcasted(closeTx); err != nil {
|
if err := channel.MarkCommitmentBroadcasted(closeTx, true); err != nil {
|
||||||
t.Fatalf("unable to mark commitment broadcast: %v", err)
|
t.Fatalf("unable to mark commitment broadcast: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now try to marking a coop close with a nil tx. This should
|
// Now try to marking a coop close with a nil tx. This should
|
||||||
// succeed, but it shouldn't exit when queried.
|
// succeed, but it shouldn't exit when queried.
|
||||||
if err = channel.MarkCoopBroadcasted(nil); err != nil {
|
if err = channel.MarkCoopBroadcasted(nil, true); err != nil {
|
||||||
t.Fatalf("unable to mark nil coop broadcast: %v", err)
|
t.Fatalf("unable to mark nil coop broadcast: %v", err)
|
||||||
}
|
}
|
||||||
_, err := channel.BroadcastedCooperative()
|
_, err := channel.BroadcastedCooperative()
|
||||||
|
@ -1107,7 +1107,7 @@ func TestFetchWaitingCloseChannels(t *testing.T) {
|
||||||
// it as coop closed. Later we will test that distinct
|
// it as coop closed. Later we will test that distinct
|
||||||
// transactions are returned for both coop and force closes.
|
// transactions are returned for both coop and force closes.
|
||||||
closeTx.TxIn[0].PreviousOutPoint.Index ^= 1
|
closeTx.TxIn[0].PreviousOutPoint.Index ^= 1
|
||||||
if err := channel.MarkCoopBroadcasted(closeTx); err != nil {
|
if err := channel.MarkCoopBroadcasted(closeTx, true); err != nil {
|
||||||
t.Fatalf("unable to mark coop broadcast: %v", err)
|
t.Fatalf("unable to mark coop broadcast: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1255,3 +1255,141 @@ func TestRefreshShortChanID(t *testing.T) {
|
||||||
t.Fatalf("channel pending state wasn't updated: want false got true")
|
t.Fatalf("channel pending state wasn't updated: want false got true")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestCloseInitiator tests the setting of close initiator statuses for
|
||||||
|
// cooperative closes and local force closes.
|
||||||
|
func TestCloseInitiator(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
// updateChannel is called to update the channel as broadcast,
|
||||||
|
// cooperatively or not, based on the test's requirements.
|
||||||
|
updateChannel func(c *OpenChannel) error
|
||||||
|
expectedStatuses []ChannelStatus
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "local coop close",
|
||||||
|
// Mark the channel as cooperatively closed, initiated
|
||||||
|
// by the local party.
|
||||||
|
updateChannel: func(c *OpenChannel) error {
|
||||||
|
return c.MarkCoopBroadcasted(
|
||||||
|
&wire.MsgTx{}, true,
|
||||||
|
)
|
||||||
|
},
|
||||||
|
expectedStatuses: []ChannelStatus{
|
||||||
|
ChanStatusLocalCloseInitiator,
|
||||||
|
ChanStatusCoopBroadcasted,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "remote coop close",
|
||||||
|
// Mark the channel as cooperatively closed, initiated
|
||||||
|
// by the remote party.
|
||||||
|
updateChannel: func(c *OpenChannel) error {
|
||||||
|
return c.MarkCoopBroadcasted(
|
||||||
|
&wire.MsgTx{}, false,
|
||||||
|
)
|
||||||
|
},
|
||||||
|
expectedStatuses: []ChannelStatus{
|
||||||
|
ChanStatusRemoteCloseInitiator,
|
||||||
|
ChanStatusCoopBroadcasted,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "local force close",
|
||||||
|
// Mark the channel's commitment as broadcast with
|
||||||
|
// local initiator.
|
||||||
|
updateChannel: func(c *OpenChannel) error {
|
||||||
|
return c.MarkCommitmentBroadcasted(
|
||||||
|
&wire.MsgTx{}, true,
|
||||||
|
)
|
||||||
|
},
|
||||||
|
expectedStatuses: []ChannelStatus{
|
||||||
|
ChanStatusLocalCloseInitiator,
|
||||||
|
ChanStatusCommitBroadcasted,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
test := test
|
||||||
|
|
||||||
|
t.Run(test.name, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
cdb, cleanUp, err := makeTestDB()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to make test database: %v",
|
||||||
|
err)
|
||||||
|
}
|
||||||
|
defer cleanUp()
|
||||||
|
|
||||||
|
// Create an open channel.
|
||||||
|
channel := createTestChannel(
|
||||||
|
t, cdb, openChannelOption(),
|
||||||
|
)
|
||||||
|
|
||||||
|
err = test.updateChannel(channel)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Lookup open channels in the database.
|
||||||
|
dbChans, err := fetchChannels(
|
||||||
|
cdb, pendingChannelFilter(false),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if len(dbChans) != 1 {
|
||||||
|
t.Fatalf("expected 1 channel, got: %v",
|
||||||
|
len(dbChans))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that the statuses that we expect were written
|
||||||
|
// to disk.
|
||||||
|
for _, status := range test.expectedStatuses {
|
||||||
|
if !dbChans[0].HasChanStatus(status) {
|
||||||
|
t.Fatalf("expected channel to have "+
|
||||||
|
"status: %v, has status: %v",
|
||||||
|
status, dbChans[0].chanStatus)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestCloseChannelStatus tests setting of a channel status on the historical
|
||||||
|
// channel on channel close.
|
||||||
|
func TestCloseChannelStatus(t *testing.T) {
|
||||||
|
cdb, cleanUp, err := makeTestDB()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to make test database: %v",
|
||||||
|
err)
|
||||||
|
}
|
||||||
|
defer cleanUp()
|
||||||
|
|
||||||
|
// Create an open channel.
|
||||||
|
channel := createTestChannel(
|
||||||
|
t, cdb, openChannelOption(),
|
||||||
|
)
|
||||||
|
|
||||||
|
if err := channel.CloseChannel(
|
||||||
|
&ChannelCloseSummary{
|
||||||
|
ChanPoint: channel.FundingOutpoint,
|
||||||
|
RemotePub: channel.IdentityPub,
|
||||||
|
}, ChanStatusRemoteCloseInitiator,
|
||||||
|
); err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
histChan, err := channel.Db.FetchHistoricalChannel(
|
||||||
|
&channel.FundingOutpoint,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !histChan.HasChanStatus(ChanStatusRemoteCloseInitiator) {
|
||||||
|
t.Fatalf("channel should have status")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -1159,8 +1159,9 @@ func (d *DB) AbandonChannel(chanPoint *wire.OutPoint, bestHeight uint32) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Finally, we'll close the channel in the DB, and return back to the
|
// Finally, we'll close the channel in the DB, and return back to the
|
||||||
// caller.
|
// caller. We set ourselves as the close initiator because we abandoned
|
||||||
return dbChan.CloseChannel(summary)
|
// the channel.
|
||||||
|
return dbChan.CloseChannel(summary, ChanStatusLocalCloseInitiator)
|
||||||
}
|
}
|
||||||
|
|
||||||
// syncVersions function is used for safe db version synchronization. It
|
// syncVersions function is used for safe db version synchronization. It
|
||||||
|
@ -1249,3 +1250,50 @@ func getMigrationsToApply(versions []version, version uint32) ([]migration, []ui
|
||||||
|
|
||||||
return migrations, migrationVersions
|
return migrations, migrationVersions
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// fetchHistoricalChanBucket returns a the channel bucket for a given outpoint
|
||||||
|
// from the historical channel bucket. If the bucket does not exist,
|
||||||
|
// ErrNoHistoricalBucket is returned.
|
||||||
|
func fetchHistoricalChanBucket(tx *bbolt.Tx,
|
||||||
|
outPoint *wire.OutPoint) (*bbolt.Bucket, error) {
|
||||||
|
|
||||||
|
// First fetch the top level bucket which stores all data related to
|
||||||
|
// historically stored channels.
|
||||||
|
historicalChanBucket := tx.Bucket(historicalChannelBucket)
|
||||||
|
if historicalChanBucket == nil {
|
||||||
|
return nil, ErrNoHistoricalBucket
|
||||||
|
}
|
||||||
|
|
||||||
|
// With the bucket for the node and chain fetched, we can now go down
|
||||||
|
// another level, for the channel itself.
|
||||||
|
var chanPointBuf bytes.Buffer
|
||||||
|
if err := writeOutpoint(&chanPointBuf, outPoint); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
chanBucket := historicalChanBucket.Bucket(chanPointBuf.Bytes())
|
||||||
|
if chanBucket == nil {
|
||||||
|
return nil, ErrChannelNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
return chanBucket, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// FetchHistoricalChannel fetches open channel data from the historical channel
|
||||||
|
// bucket.
|
||||||
|
func (d *DB) FetchHistoricalChannel(outPoint *wire.OutPoint) (*OpenChannel, error) {
|
||||||
|
var channel *OpenChannel
|
||||||
|
err := d.View(func(tx *bbolt.Tx) error {
|
||||||
|
chanBucket, err := fetchHistoricalChanBucket(tx, outPoint)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
channel, err = fetchOpenChannel(chanBucket, outPoint)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return channel, nil
|
||||||
|
}
|
||||||
|
|
|
@ -647,7 +647,7 @@ func TestFetchChannels(t *testing.T) {
|
||||||
channelIDOption(pendingWaitingChan),
|
channelIDOption(pendingWaitingChan),
|
||||||
)
|
)
|
||||||
|
|
||||||
err = pendingClosing.MarkCoopBroadcasted(nil)
|
err = pendingClosing.MarkCoopBroadcasted(nil, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error: %v", err)
|
t.Fatalf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -667,7 +667,7 @@ func TestFetchChannels(t *testing.T) {
|
||||||
channelIDOption(openWaitingChan),
|
channelIDOption(openWaitingChan),
|
||||||
openChannelOption(),
|
openChannelOption(),
|
||||||
)
|
)
|
||||||
err = openClosing.MarkCoopBroadcasted(nil)
|
err = openClosing.MarkCoopBroadcasted(nil, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error: %v", err)
|
t.Fatalf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -693,3 +693,57 @@ func TestFetchChannels(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestFetchHistoricalChannel tests lookup of historical channels.
|
||||||
|
func TestFetchHistoricalChannel(t *testing.T) {
|
||||||
|
cdb, cleanUp, err := makeTestDB()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to make test database: %v", err)
|
||||||
|
}
|
||||||
|
defer cleanUp()
|
||||||
|
|
||||||
|
// Create a an open channel in the database.
|
||||||
|
channel := createTestChannel(t, cdb, openChannelOption())
|
||||||
|
|
||||||
|
// First, try to lookup a channel when the bucket does not
|
||||||
|
// exist.
|
||||||
|
_, err = cdb.FetchHistoricalChannel(&channel.FundingOutpoint)
|
||||||
|
if err != ErrNoHistoricalBucket {
|
||||||
|
t.Fatalf("expected no bucket, got: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close the channel so that it will be written to the historical
|
||||||
|
// bucket. The values provided in the channel close summary are the
|
||||||
|
// minimum required for this call to run without panicking.
|
||||||
|
if err := channel.CloseChannel(&ChannelCloseSummary{
|
||||||
|
ChanPoint: channel.FundingOutpoint,
|
||||||
|
RemotePub: channel.IdentityPub,
|
||||||
|
SettledBalance: btcutil.Amount(500),
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("unexpected error closing channel: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
histChannel, err := cdb.FetchHistoricalChannel(&channel.FundingOutpoint)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexepected error getting channel: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set the db on our channel to nil so that we can check that all other
|
||||||
|
// fields on the channel equal those on the historical channel.
|
||||||
|
channel.Db = nil
|
||||||
|
|
||||||
|
if !reflect.DeepEqual(histChannel, channel) {
|
||||||
|
t.Fatalf("expected: %v, got: %v", channel, histChannel)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create an outpoint that will not be in the db and look it up.
|
||||||
|
badOutpoint := &wire.OutPoint{
|
||||||
|
Hash: channel.FundingOutpoint.Hash,
|
||||||
|
Index: channel.FundingOutpoint.Index + 1,
|
||||||
|
}
|
||||||
|
_, err = cdb.FetchHistoricalChannel(badOutpoint)
|
||||||
|
if err != ErrChannelNotFound {
|
||||||
|
t.Fatalf("expected chan not found, got: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
|
@ -10,6 +10,11 @@ var (
|
||||||
// created.
|
// created.
|
||||||
ErrNoChanDBExists = fmt.Errorf("channel db has not yet been created")
|
ErrNoChanDBExists = fmt.Errorf("channel db has not yet been created")
|
||||||
|
|
||||||
|
// ErrNoHistoricalBucket is returned when the historical channel bucket
|
||||||
|
// not been created yet.
|
||||||
|
ErrNoHistoricalBucket = fmt.Errorf("historical channel bucket has " +
|
||||||
|
"not yet been created")
|
||||||
|
|
||||||
// ErrDBReversion is returned when detecting an attempt to revert to a
|
// ErrDBReversion is returned when detecting an attempt to revert to a
|
||||||
// prior database version.
|
// prior database version.
|
||||||
ErrDBReversion = fmt.Errorf("channel db cannot revert to prior version")
|
ErrDBReversion = fmt.Errorf("channel db cannot revert to prior version")
|
||||||
|
|
|
@ -284,8 +284,11 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
|
||||||
return chanMachine.ForceClose()
|
return chanMachine.ForceClose()
|
||||||
},
|
},
|
||||||
MarkCommitmentBroadcasted: channel.MarkCommitmentBroadcasted,
|
MarkCommitmentBroadcasted: channel.MarkCommitmentBroadcasted,
|
||||||
MarkChannelClosed: func(summary *channeldb.ChannelCloseSummary) error {
|
MarkChannelClosed: func(summary *channeldb.ChannelCloseSummary,
|
||||||
if err := channel.CloseChannel(summary); err != nil {
|
statuses ...channeldb.ChannelStatus) error {
|
||||||
|
|
||||||
|
err := channel.CloseChannel(summary, statuses...)
|
||||||
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
c.cfg.NotifyClosedChannel(summary.ChanPoint)
|
c.cfg.NotifyClosedChannel(summary.ChanPoint)
|
||||||
|
|
|
@ -62,12 +62,12 @@ func TestChainArbitratorRepublishCloses(t *testing.T) {
|
||||||
for i := 0; i < numChans/2; i++ {
|
for i := 0; i < numChans/2; i++ {
|
||||||
closeTx := channels[i].FundingTxn.Copy()
|
closeTx := channels[i].FundingTxn.Copy()
|
||||||
closeTx.TxIn[0].PreviousOutPoint = channels[i].FundingOutpoint
|
closeTx.TxIn[0].PreviousOutPoint = channels[i].FundingOutpoint
|
||||||
err := channels[i].MarkCommitmentBroadcasted(closeTx)
|
err := channels[i].MarkCommitmentBroadcasted(closeTx, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = channels[i].MarkCoopBroadcasted(closeTx)
|
err = channels[i].MarkCoopBroadcasted(closeTx, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -985,7 +985,9 @@ func (c *chainWatcher) dispatchContractBreach(spendEvent *chainntnfs.SpendDetail
|
||||||
closeSummary.LastChanSyncMsg = chanSync
|
closeSummary.LastChanSyncMsg = chanSync
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := c.cfg.chanState.CloseChannel(&closeSummary); err != nil {
|
if err := c.cfg.chanState.CloseChannel(
|
||||||
|
&closeSummary, channeldb.ChanStatusRemoteCloseInitiator,
|
||||||
|
); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -98,14 +98,16 @@ type ChannelArbitratorConfig struct {
|
||||||
|
|
||||||
// MarkCommitmentBroadcasted should mark the channel as the commitment
|
// MarkCommitmentBroadcasted should mark the channel as the commitment
|
||||||
// being broadcast, and we are waiting for the commitment to confirm.
|
// being broadcast, and we are waiting for the commitment to confirm.
|
||||||
MarkCommitmentBroadcasted func(*wire.MsgTx) error
|
MarkCommitmentBroadcasted func(*wire.MsgTx, bool) error
|
||||||
|
|
||||||
// MarkChannelClosed marks the channel closed in the database, with the
|
// MarkChannelClosed marks the channel closed in the database, with the
|
||||||
// passed close summary. After this method successfully returns we can
|
// passed close summary. After this method successfully returns we can
|
||||||
// no longer expect to receive chain events for this channel, and must
|
// no longer expect to receive chain events for this channel, and must
|
||||||
// be able to recover from a failure without getting the close event
|
// be able to recover from a failure without getting the close event
|
||||||
// again.
|
// again. It takes an optional channel status which will update the
|
||||||
MarkChannelClosed func(*channeldb.ChannelCloseSummary) error
|
// channel status in the record that we keep of historical channels.
|
||||||
|
MarkChannelClosed func(*channeldb.ChannelCloseSummary,
|
||||||
|
...channeldb.ChannelStatus) error
|
||||||
|
|
||||||
// IsPendingClose is a boolean indicating whether the channel is marked
|
// IsPendingClose is a boolean indicating whether the channel is marked
|
||||||
// as pending close in the database.
|
// as pending close in the database.
|
||||||
|
@ -797,8 +799,10 @@ func (c *ChannelArbitrator) stateStep(
|
||||||
|
|
||||||
// Before publishing the transaction, we store it to the
|
// Before publishing the transaction, we store it to the
|
||||||
// database, such that we can re-publish later in case it
|
// database, such that we can re-publish later in case it
|
||||||
// didn't propagate.
|
// didn't propagate. We initiated the force close, so we
|
||||||
if err := c.cfg.MarkCommitmentBroadcasted(closeTx); err != nil {
|
// mark broadcast with local initiator set to true.
|
||||||
|
err = c.cfg.MarkCommitmentBroadcasted(closeTx, true)
|
||||||
|
if err != nil {
|
||||||
log.Errorf("ChannelArbitrator(%v): unable to "+
|
log.Errorf("ChannelArbitrator(%v): unable to "+
|
||||||
"mark commitment broadcasted: %v",
|
"mark commitment broadcasted: %v",
|
||||||
c.cfg.ChanPoint, err)
|
c.cfg.ChanPoint, err)
|
||||||
|
@ -2176,7 +2180,10 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32) {
|
||||||
// transition into StateContractClosed based on the
|
// transition into StateContractClosed based on the
|
||||||
// close status of the channel.
|
// close status of the channel.
|
||||||
closeSummary := &uniClosure.ChannelCloseSummary
|
closeSummary := &uniClosure.ChannelCloseSummary
|
||||||
err = c.cfg.MarkChannelClosed(closeSummary)
|
err = c.cfg.MarkChannelClosed(
|
||||||
|
closeSummary,
|
||||||
|
channeldb.ChanStatusRemoteCloseInitiator,
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Unable to mark channel closed: %v",
|
log.Errorf("Unable to mark channel closed: %v",
|
||||||
err)
|
err)
|
||||||
|
|
|
@ -12,6 +12,7 @@ import (
|
||||||
|
|
||||||
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||||||
"github.com/btcsuite/btcd/wire"
|
"github.com/btcsuite/btcd/wire"
|
||||||
|
"github.com/btcsuite/btcutil"
|
||||||
"github.com/coreos/bbolt"
|
"github.com/coreos/bbolt"
|
||||||
"github.com/lightningnetwork/lnd/chainntnfs"
|
"github.com/lightningnetwork/lnd/chainntnfs"
|
||||||
"github.com/lightningnetwork/lnd/channeldb"
|
"github.com/lightningnetwork/lnd/channeldb"
|
||||||
|
@ -21,6 +22,14 @@ import (
|
||||||
"github.com/lightningnetwork/lnd/lnwire"
|
"github.com/lightningnetwork/lnd/lnwire"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
defaultTimeout = time.Second * 5
|
||||||
|
|
||||||
|
// stateTimeout is the timeout we allow when waiting for state
|
||||||
|
// transitions.
|
||||||
|
stateTimeout = time.Second * 15
|
||||||
|
)
|
||||||
|
|
||||||
type mockArbitratorLog struct {
|
type mockArbitratorLog struct {
|
||||||
state ArbitratorState
|
state ArbitratorState
|
||||||
newStates chan ArbitratorState
|
newStates chan ArbitratorState
|
||||||
|
@ -221,7 +230,7 @@ func (c *chanArbTestCtx) AssertStateTransitions(expectedStates ...ArbitratorStat
|
||||||
var state ArbitratorState
|
var state ArbitratorState
|
||||||
select {
|
select {
|
||||||
case state = <-newStatesChan:
|
case state = <-newStatesChan:
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(defaultTimeout):
|
||||||
c.t.Fatalf("new state not received")
|
c.t.Fatalf("new state not received")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -266,7 +275,26 @@ func (c *chanArbTestCtx) Restart(restartClosure func(*chanArbTestCtx)) (*chanArb
|
||||||
return newCtx, nil
|
return newCtx, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func createTestChannelArbitrator(t *testing.T, log ArbitratorLog) (*chanArbTestCtx, error) {
|
// testChanArbOption applies custom settings to a channel arbitrator config for
|
||||||
|
// testing purposes.
|
||||||
|
type testChanArbOption func(cfg *ChannelArbitratorConfig)
|
||||||
|
|
||||||
|
// remoteInitiatorOption sets the MarkChannelClosed function in the
|
||||||
|
// Channel Arbitrator's config.
|
||||||
|
func withMarkClosed(markClosed func(*channeldb.ChannelCloseSummary,
|
||||||
|
...channeldb.ChannelStatus) error) testChanArbOption {
|
||||||
|
|
||||||
|
return func(cfg *ChannelArbitratorConfig) {
|
||||||
|
cfg.MarkChannelClosed = markClosed
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// createTestChannelArbitrator returns a channel arbitrator test context which
|
||||||
|
// contains a channel arbitrator with default values. These values can be
|
||||||
|
// changed by providing options which overwrite the default config.
|
||||||
|
func createTestChannelArbitrator(t *testing.T, log ArbitratorLog,
|
||||||
|
opts ...testChanArbOption) (*chanArbTestCtx, error) {
|
||||||
|
|
||||||
blockEpochs := make(chan *chainntnfs.BlockEpoch)
|
blockEpochs := make(chan *chainntnfs.BlockEpoch)
|
||||||
blockEpoch := &chainntnfs.BlockEpochEvent{
|
blockEpoch := &chainntnfs.BlockEpochEvent{
|
||||||
Epochs: blockEpochs,
|
Epochs: blockEpochs,
|
||||||
|
@ -324,7 +352,7 @@ func createTestChannelArbitrator(t *testing.T, log ArbitratorLog) (*chanArbTestC
|
||||||
|
|
||||||
// Next we'll create the matching configuration struct that contains
|
// Next we'll create the matching configuration struct that contains
|
||||||
// all interfaces and methods the arbitrator needs to do its job.
|
// all interfaces and methods the arbitrator needs to do its job.
|
||||||
arbCfg := ChannelArbitratorConfig{
|
arbCfg := &ChannelArbitratorConfig{
|
||||||
ChanPoint: chanPoint,
|
ChanPoint: chanPoint,
|
||||||
ShortChanID: shortChanID,
|
ShortChanID: shortChanID,
|
||||||
BlockEpochs: blockEpoch,
|
BlockEpochs: blockEpoch,
|
||||||
|
@ -339,10 +367,11 @@ func createTestChannelArbitrator(t *testing.T, log ArbitratorLog) (*chanArbTestC
|
||||||
}
|
}
|
||||||
return summary, nil
|
return summary, nil
|
||||||
},
|
},
|
||||||
MarkCommitmentBroadcasted: func(_ *wire.MsgTx) error {
|
MarkCommitmentBroadcasted: func(_ *wire.MsgTx, _ bool) error {
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
MarkChannelClosed: func(*channeldb.ChannelCloseSummary) error {
|
MarkChannelClosed: func(*channeldb.ChannelCloseSummary,
|
||||||
|
...channeldb.ChannelStatus) error {
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
IsPendingClose: false,
|
IsPendingClose: false,
|
||||||
|
@ -350,6 +379,11 @@ func createTestChannelArbitrator(t *testing.T, log ArbitratorLog) (*chanArbTestC
|
||||||
ChainEvents: chanEvents,
|
ChainEvents: chanEvents,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Apply all custom options to the config struct.
|
||||||
|
for _, option := range opts {
|
||||||
|
option(arbCfg)
|
||||||
|
}
|
||||||
|
|
||||||
var cleanUp func()
|
var cleanUp func()
|
||||||
if log == nil {
|
if log == nil {
|
||||||
dbDir, err := ioutil.TempDir("", "chanArb")
|
dbDir, err := ioutil.TempDir("", "chanArb")
|
||||||
|
@ -363,7 +397,7 @@ func createTestChannelArbitrator(t *testing.T, log ArbitratorLog) (*chanArbTestC
|
||||||
}
|
}
|
||||||
|
|
||||||
backingLog, err := newBoltArbitratorLog(
|
backingLog, err := newBoltArbitratorLog(
|
||||||
db, arbCfg, chainhash.Hash{}, chanPoint,
|
db, *arbCfg, chainhash.Hash{}, chanPoint,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -381,7 +415,7 @@ func createTestChannelArbitrator(t *testing.T, log ArbitratorLog) (*chanArbTestC
|
||||||
|
|
||||||
htlcSets := make(map[HtlcSetKey]htlcSet)
|
htlcSets := make(map[HtlcSetKey]htlcSet)
|
||||||
|
|
||||||
chanArb := NewChannelArbitrator(arbCfg, htlcSets, log)
|
chanArb := NewChannelArbitrator(*arbCfg, htlcSets, log)
|
||||||
|
|
||||||
return &chanArbTestCtx{
|
return &chanArbTestCtx{
|
||||||
t: t,
|
t: t,
|
||||||
|
@ -424,7 +458,9 @@ func TestChannelArbitratorCooperativeClose(t *testing.T) {
|
||||||
// We set up a channel to detect when MarkChannelClosed is called.
|
// We set up a channel to detect when MarkChannelClosed is called.
|
||||||
closeInfos := make(chan *channeldb.ChannelCloseSummary)
|
closeInfos := make(chan *channeldb.ChannelCloseSummary)
|
||||||
chanArbCtx.chanArb.cfg.MarkChannelClosed = func(
|
chanArbCtx.chanArb.cfg.MarkChannelClosed = func(
|
||||||
closeInfo *channeldb.ChannelCloseSummary) error {
|
closeInfo *channeldb.ChannelCloseSummary,
|
||||||
|
statuses ...channeldb.ChannelStatus) error {
|
||||||
|
|
||||||
closeInfos <- closeInfo
|
closeInfos <- closeInfo
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -441,7 +477,7 @@ func TestChannelArbitratorCooperativeClose(t *testing.T) {
|
||||||
if c.CloseType != channeldb.CooperativeClose {
|
if c.CloseType != channeldb.CooperativeClose {
|
||||||
t.Fatalf("expected cooperative close, got %v", c.CloseType)
|
t.Fatalf("expected cooperative close, got %v", c.CloseType)
|
||||||
}
|
}
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(defaultTimeout):
|
||||||
t.Fatalf("timeout waiting for channel close")
|
t.Fatalf("timeout waiting for channel close")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -449,7 +485,7 @@ func TestChannelArbitratorCooperativeClose(t *testing.T) {
|
||||||
select {
|
select {
|
||||||
case <-chanArbCtx.resolvedChan:
|
case <-chanArbCtx.resolvedChan:
|
||||||
// Expected.
|
// Expected.
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(defaultTimeout):
|
||||||
t.Fatalf("contract was not resolved")
|
t.Fatalf("contract was not resolved")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -504,7 +540,7 @@ func TestChannelArbitratorRemoteForceClose(t *testing.T) {
|
||||||
select {
|
select {
|
||||||
case <-chanArbCtx.resolvedChan:
|
case <-chanArbCtx.resolvedChan:
|
||||||
// Expected.
|
// Expected.
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(defaultTimeout):
|
||||||
t.Fatalf("contract was not resolved")
|
t.Fatalf("contract was not resolved")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -566,7 +602,7 @@ func TestChannelArbitratorLocalForceClose(t *testing.T) {
|
||||||
if state != StateBroadcastCommit {
|
if state != StateBroadcastCommit {
|
||||||
t.Fatalf("state during PublishTx was %v", state)
|
t.Fatalf("state during PublishTx was %v", state)
|
||||||
}
|
}
|
||||||
case <-time.After(15 * time.Second):
|
case <-time.After(stateTimeout):
|
||||||
t.Fatalf("did not get state update")
|
t.Fatalf("did not get state update")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -576,7 +612,7 @@ func TestChannelArbitratorLocalForceClose(t *testing.T) {
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-respChan:
|
case <-respChan:
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(defaultTimeout):
|
||||||
t.Fatalf("no response received")
|
t.Fatalf("no response received")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -585,7 +621,7 @@ func TestChannelArbitratorLocalForceClose(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error force closing channel: %v", err)
|
t.Fatalf("error force closing channel: %v", err)
|
||||||
}
|
}
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(defaultTimeout):
|
||||||
t.Fatalf("no response received")
|
t.Fatalf("no response received")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -610,7 +646,7 @@ func TestChannelArbitratorLocalForceClose(t *testing.T) {
|
||||||
select {
|
select {
|
||||||
case <-chanArbCtx.resolvedChan:
|
case <-chanArbCtx.resolvedChan:
|
||||||
// Expected.
|
// Expected.
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(defaultTimeout):
|
||||||
t.Fatalf("contract was not resolved")
|
t.Fatalf("contract was not resolved")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -654,7 +690,7 @@ func TestChannelArbitratorBreachClose(t *testing.T) {
|
||||||
select {
|
select {
|
||||||
case <-chanArbCtx.resolvedChan:
|
case <-chanArbCtx.resolvedChan:
|
||||||
// Expected.
|
// Expected.
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(defaultTimeout):
|
||||||
t.Fatalf("contract was not resolved")
|
t.Fatalf("contract was not resolved")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -738,7 +774,7 @@ func TestChannelArbitratorLocalForceClosePendingHtlc(t *testing.T) {
|
||||||
)
|
)
|
||||||
select {
|
select {
|
||||||
case <-respChan:
|
case <-respChan:
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(defaultTimeout):
|
||||||
t.Fatalf("no response received")
|
t.Fatalf("no response received")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -747,7 +783,7 @@ func TestChannelArbitratorLocalForceClosePendingHtlc(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error force closing channel: %v", err)
|
t.Fatalf("error force closing channel: %v", err)
|
||||||
}
|
}
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(defaultTimeout):
|
||||||
t.Fatalf("no response received")
|
t.Fatalf("no response received")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -825,7 +861,7 @@ func TestChannelArbitratorLocalForceClosePendingHtlc(t *testing.T) {
|
||||||
t.Fatalf("wrong htlc index: expected %v, got %v",
|
t.Fatalf("wrong htlc index: expected %v, got %v",
|
||||||
outgoingDustHtlc.HtlcIndex, msgs[0].HtlcIndex)
|
outgoingDustHtlc.HtlcIndex, msgs[0].HtlcIndex)
|
||||||
}
|
}
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(defaultTimeout):
|
||||||
t.Fatalf("resolution msgs not sent")
|
t.Fatalf("resolution msgs not sent")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -887,7 +923,7 @@ func TestChannelArbitratorLocalForceClosePendingHtlc(t *testing.T) {
|
||||||
// htlcTimeoutResolver and should send the contract off for incubation.
|
// htlcTimeoutResolver and should send the contract off for incubation.
|
||||||
select {
|
select {
|
||||||
case <-chanArbCtx.incubationRequests:
|
case <-chanArbCtx.incubationRequests:
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(defaultTimeout):
|
||||||
t.Fatalf("no response received")
|
t.Fatalf("no response received")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -907,7 +943,7 @@ func TestChannelArbitratorLocalForceClosePendingHtlc(t *testing.T) {
|
||||||
t.Fatalf("wrong htlc index: expected %v, got %v",
|
t.Fatalf("wrong htlc index: expected %v, got %v",
|
||||||
htlc.HtlcIndex, msgs[0].HtlcIndex)
|
htlc.HtlcIndex, msgs[0].HtlcIndex)
|
||||||
}
|
}
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(defaultTimeout):
|
||||||
t.Fatalf("resolution msgs not sent")
|
t.Fatalf("resolution msgs not sent")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -927,7 +963,7 @@ func TestChannelArbitratorLocalForceClosePendingHtlc(t *testing.T) {
|
||||||
chanArbCtxNew.AssertStateTransitions(StateFullyResolved)
|
chanArbCtxNew.AssertStateTransitions(StateFullyResolved)
|
||||||
select {
|
select {
|
||||||
case <-chanArbCtxNew.resolvedChan:
|
case <-chanArbCtxNew.resolvedChan:
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(defaultTimeout):
|
||||||
t.Fatalf("contract was not resolved")
|
t.Fatalf("contract was not resolved")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -989,7 +1025,7 @@ func TestChannelArbitratorLocalForceCloseRemoteConfirmed(t *testing.T) {
|
||||||
if state != StateBroadcastCommit {
|
if state != StateBroadcastCommit {
|
||||||
t.Fatalf("state during PublishTx was %v", state)
|
t.Fatalf("state during PublishTx was %v", state)
|
||||||
}
|
}
|
||||||
case <-time.After(15 * time.Second):
|
case <-time.After(stateTimeout):
|
||||||
t.Fatalf("no state update received")
|
t.Fatalf("no state update received")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1000,7 +1036,7 @@ func TestChannelArbitratorLocalForceCloseRemoteConfirmed(t *testing.T) {
|
||||||
// Wait for a response to the force close.
|
// Wait for a response to the force close.
|
||||||
select {
|
select {
|
||||||
case <-respChan:
|
case <-respChan:
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(defaultTimeout):
|
||||||
t.Fatalf("no response received")
|
t.Fatalf("no response received")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1009,7 +1045,7 @@ func TestChannelArbitratorLocalForceCloseRemoteConfirmed(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error force closing channel: %v", err)
|
t.Fatalf("error force closing channel: %v", err)
|
||||||
}
|
}
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(defaultTimeout):
|
||||||
t.Fatalf("no response received")
|
t.Fatalf("no response received")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1035,7 +1071,7 @@ func TestChannelArbitratorLocalForceCloseRemoteConfirmed(t *testing.T) {
|
||||||
select {
|
select {
|
||||||
case <-chanArbCtx.resolvedChan:
|
case <-chanArbCtx.resolvedChan:
|
||||||
// Expected.
|
// Expected.
|
||||||
case <-time.After(15 * time.Second):
|
case <-time.After(stateTimeout):
|
||||||
t.Fatalf("contract was not resolved")
|
t.Fatalf("contract was not resolved")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1097,7 +1133,7 @@ func TestChannelArbitratorLocalForceDoubleSpend(t *testing.T) {
|
||||||
if state != StateBroadcastCommit {
|
if state != StateBroadcastCommit {
|
||||||
t.Fatalf("state during PublishTx was %v", state)
|
t.Fatalf("state during PublishTx was %v", state)
|
||||||
}
|
}
|
||||||
case <-time.After(15 * time.Second):
|
case <-time.After(stateTimeout):
|
||||||
t.Fatalf("no state update received")
|
t.Fatalf("no state update received")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1108,7 +1144,7 @@ func TestChannelArbitratorLocalForceDoubleSpend(t *testing.T) {
|
||||||
// Wait for a response to the force close.
|
// Wait for a response to the force close.
|
||||||
select {
|
select {
|
||||||
case <-respChan:
|
case <-respChan:
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(defaultTimeout):
|
||||||
t.Fatalf("no response received")
|
t.Fatalf("no response received")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1117,7 +1153,7 @@ func TestChannelArbitratorLocalForceDoubleSpend(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error force closing channel: %v", err)
|
t.Fatalf("error force closing channel: %v", err)
|
||||||
}
|
}
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(defaultTimeout):
|
||||||
t.Fatalf("no response received")
|
t.Fatalf("no response received")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1143,7 +1179,7 @@ func TestChannelArbitratorLocalForceDoubleSpend(t *testing.T) {
|
||||||
select {
|
select {
|
||||||
case <-chanArbCtx.resolvedChan:
|
case <-chanArbCtx.resolvedChan:
|
||||||
// Expected.
|
// Expected.
|
||||||
case <-time.After(15 * time.Second):
|
case <-time.After(stateTimeout):
|
||||||
t.Fatalf("contract was not resolved")
|
t.Fatalf("contract was not resolved")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1205,7 +1241,9 @@ func TestChannelArbitratorPersistence(t *testing.T) {
|
||||||
// Now we make the log succeed writing the resolutions, but fail when
|
// Now we make the log succeed writing the resolutions, but fail when
|
||||||
// attempting to close the channel.
|
// attempting to close the channel.
|
||||||
log.failLog = false
|
log.failLog = false
|
||||||
chanArb.cfg.MarkChannelClosed = func(*channeldb.ChannelCloseSummary) error {
|
chanArb.cfg.MarkChannelClosed = func(*channeldb.ChannelCloseSummary,
|
||||||
|
...channeldb.ChannelStatus) error {
|
||||||
|
|
||||||
return fmt.Errorf("intentional close error")
|
return fmt.Errorf("intentional close error")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1264,7 +1302,7 @@ func TestChannelArbitratorPersistence(t *testing.T) {
|
||||||
select {
|
select {
|
||||||
case <-chanArbCtx.resolvedChan:
|
case <-chanArbCtx.resolvedChan:
|
||||||
// Expected.
|
// Expected.
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(defaultTimeout):
|
||||||
t.Fatalf("contract was not resolved")
|
t.Fatalf("contract was not resolved")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1328,7 +1366,7 @@ func TestChannelArbitratorForceCloseBreachedChannel(t *testing.T) {
|
||||||
if state != StateBroadcastCommit {
|
if state != StateBroadcastCommit {
|
||||||
t.Fatalf("state during PublishTx was %v", state)
|
t.Fatalf("state during PublishTx was %v", state)
|
||||||
}
|
}
|
||||||
case <-time.After(15 * time.Second):
|
case <-time.After(stateTimeout):
|
||||||
t.Fatalf("no state update received")
|
t.Fatalf("no state update received")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1339,7 +1377,7 @@ func TestChannelArbitratorForceCloseBreachedChannel(t *testing.T) {
|
||||||
t.Fatalf("unexpected error force closing channel: %v",
|
t.Fatalf("unexpected error force closing channel: %v",
|
||||||
err)
|
err)
|
||||||
}
|
}
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(defaultTimeout):
|
||||||
t.Fatalf("no response received")
|
t.Fatalf("no response received")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1363,7 +1401,7 @@ func TestChannelArbitratorForceCloseBreachedChannel(t *testing.T) {
|
||||||
select {
|
select {
|
||||||
case <-chanArbCtx.resolvedChan:
|
case <-chanArbCtx.resolvedChan:
|
||||||
// Expected.
|
// Expected.
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(defaultTimeout):
|
||||||
t.Fatalf("contract was not resolved")
|
t.Fatalf("contract was not resolved")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1457,7 +1495,8 @@ func TestChannelArbitratorCommitFailure(t *testing.T) {
|
||||||
|
|
||||||
closed := make(chan struct{})
|
closed := make(chan struct{})
|
||||||
chanArb.cfg.MarkChannelClosed = func(
|
chanArb.cfg.MarkChannelClosed = func(
|
||||||
*channeldb.ChannelCloseSummary) error {
|
*channeldb.ChannelCloseSummary,
|
||||||
|
...channeldb.ChannelStatus) error {
|
||||||
close(closed)
|
close(closed)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -1467,7 +1506,7 @@ func TestChannelArbitratorCommitFailure(t *testing.T) {
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-closed:
|
case <-closed:
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(defaultTimeout):
|
||||||
t.Fatalf("channel was not marked closed")
|
t.Fatalf("channel was not marked closed")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1501,7 +1540,7 @@ func TestChannelArbitratorCommitFailure(t *testing.T) {
|
||||||
select {
|
select {
|
||||||
case <-chanArbCtx.resolvedChan:
|
case <-chanArbCtx.resolvedChan:
|
||||||
// Expected.
|
// Expected.
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(defaultTimeout):
|
||||||
t.Fatalf("contract was not resolved")
|
t.Fatalf("contract was not resolved")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1805,7 +1844,7 @@ func TestChannelArbitratorDanglingCommitForceClose(t *testing.T) {
|
||||||
t.Fatalf("wrong htlc index: expected %v, got %v",
|
t.Fatalf("wrong htlc index: expected %v, got %v",
|
||||||
htlcIndex, msgs[0].HtlcIndex)
|
htlcIndex, msgs[0].HtlcIndex)
|
||||||
}
|
}
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(defaultTimeout):
|
||||||
t.Fatalf("resolution msgs not sent")
|
t.Fatalf("resolution msgs not sent")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1902,3 +1941,150 @@ func TestChannelArbitratorPendingExpiredHTLC(t *testing.T) {
|
||||||
StateCommitmentBroadcasted,
|
StateCommitmentBroadcasted,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestRemoteCloseInitiator tests the setting of close initiator statuses
|
||||||
|
// for remote force closes and breaches.
|
||||||
|
func TestRemoteCloseInitiator(t *testing.T) {
|
||||||
|
// getCloseSummary returns a unilateral close summary for the channel
|
||||||
|
// provided.
|
||||||
|
getCloseSummary := func(channel *channeldb.OpenChannel) *RemoteUnilateralCloseInfo {
|
||||||
|
return &RemoteUnilateralCloseInfo{
|
||||||
|
UnilateralCloseSummary: &lnwallet.UnilateralCloseSummary{
|
||||||
|
SpendDetail: &chainntnfs.SpendDetail{
|
||||||
|
SpenderTxHash: &chainhash.Hash{},
|
||||||
|
SpendingTx: &wire.MsgTx{
|
||||||
|
TxIn: []*wire.TxIn{},
|
||||||
|
TxOut: []*wire.TxOut{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
ChannelCloseSummary: channeldb.ChannelCloseSummary{
|
||||||
|
ChanPoint: channel.FundingOutpoint,
|
||||||
|
RemotePub: channel.IdentityPub,
|
||||||
|
SettledBalance: btcutil.Amount(500),
|
||||||
|
TimeLockedBalance: btcutil.Amount(10000),
|
||||||
|
IsPending: false,
|
||||||
|
},
|
||||||
|
HtlcResolutions: &lnwallet.HtlcResolutions{},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
|
||||||
|
// notifyClose sends the appropriate chain event to indicate
|
||||||
|
// that the channel has closed. The event subscription channel
|
||||||
|
// is expected to be buffered, as is the default for test
|
||||||
|
// channel arbitrators.
|
||||||
|
notifyClose func(sub *ChainEventSubscription,
|
||||||
|
channel *channeldb.OpenChannel)
|
||||||
|
|
||||||
|
// expectedStates is the set of states we expect the arbitrator
|
||||||
|
// to progress through.
|
||||||
|
expectedStates []ArbitratorState
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "force close",
|
||||||
|
notifyClose: func(sub *ChainEventSubscription,
|
||||||
|
channel *channeldb.OpenChannel) {
|
||||||
|
|
||||||
|
s := getCloseSummary(channel)
|
||||||
|
sub.RemoteUnilateralClosure <- s
|
||||||
|
},
|
||||||
|
expectedStates: []ArbitratorState{
|
||||||
|
StateContractClosed, StateFullyResolved,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
test := test
|
||||||
|
|
||||||
|
t.Run(test.name, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
// First, create alice's channel.
|
||||||
|
alice, _, cleanUp, err := lnwallet.CreateTestChannels(
|
||||||
|
true,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to create test channels: %v",
|
||||||
|
err)
|
||||||
|
}
|
||||||
|
defer cleanUp()
|
||||||
|
|
||||||
|
// Create a mock log which will not block the test's
|
||||||
|
// expected number of transitions transitions, and has
|
||||||
|
// no commit resolutions so that the channel will
|
||||||
|
// resolve immediately.
|
||||||
|
log := &mockArbitratorLog{
|
||||||
|
state: StateDefault,
|
||||||
|
newStates: make(chan ArbitratorState,
|
||||||
|
len(test.expectedStates)),
|
||||||
|
resolutions: &ContractResolutions{
|
||||||
|
CommitHash: chainhash.Hash{},
|
||||||
|
CommitResolution: nil,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mock marking the channel as closed, we only care
|
||||||
|
// about setting of channel status.
|
||||||
|
mockMarkClosed := func(_ *channeldb.ChannelCloseSummary,
|
||||||
|
statuses ...channeldb.ChannelStatus) error {
|
||||||
|
for _, status := range statuses {
|
||||||
|
err := alice.State().ApplyChanStatus(status)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
chanArbCtx, err := createTestChannelArbitrator(
|
||||||
|
t, log, withMarkClosed(mockMarkClosed),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to create "+
|
||||||
|
"ChannelArbitrator: %v", err)
|
||||||
|
}
|
||||||
|
chanArb := chanArbCtx.chanArb
|
||||||
|
|
||||||
|
if err := chanArb.Start(); err != nil {
|
||||||
|
t.Fatalf("unable to start "+
|
||||||
|
"ChannelArbitrator: %v", err)
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if err := chanArb.Stop(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// It should start out in the default state.
|
||||||
|
chanArbCtx.AssertState(StateDefault)
|
||||||
|
|
||||||
|
// Notify the close event.
|
||||||
|
test.notifyClose(chanArb.cfg.ChainEvents, alice.State())
|
||||||
|
|
||||||
|
// Check that the channel transitions as expected.
|
||||||
|
chanArbCtx.AssertStateTransitions(
|
||||||
|
test.expectedStates...,
|
||||||
|
)
|
||||||
|
|
||||||
|
// It should also mark the channel as resolved.
|
||||||
|
select {
|
||||||
|
case <-chanArbCtx.resolvedChan:
|
||||||
|
// Expected.
|
||||||
|
case <-time.After(defaultTimeout):
|
||||||
|
t.Fatalf("contract was not resolved")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that alice has the status we expect.
|
||||||
|
if !alice.State().HasChanStatus(
|
||||||
|
channeldb.ChanStatusRemoteCloseInitiator,
|
||||||
|
) {
|
||||||
|
t.Fatalf("expected remote close initiator, "+
|
||||||
|
"got: %v", alice.State().ChanStatus())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -1009,7 +1009,11 @@ func (f *fundingManager) advancePendingChannelState(
|
||||||
LocalChanConfig: ch.LocalChanCfg,
|
LocalChanConfig: ch.LocalChanCfg,
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := ch.CloseChannel(closeInfo); err != nil {
|
// Close the channel with us as the initiator because we are
|
||||||
|
// timing the channel out.
|
||||||
|
if err := ch.CloseChannel(
|
||||||
|
closeInfo, channeldb.ChanStatusLocalCloseInitiator,
|
||||||
|
); err != nil {
|
||||||
return fmt.Errorf("failed closing channel "+
|
return fmt.Errorf("failed closing channel "+
|
||||||
"%v: %v", ch.FundingOutpoint, err)
|
"%v: %v", ch.FundingOutpoint, err)
|
||||||
}
|
}
|
||||||
|
@ -1639,7 +1643,11 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
|
||||||
LocalChanConfig: completeChan.LocalChanCfg,
|
LocalChanConfig: completeChan.LocalChanCfg,
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := completeChan.CloseChannel(closeInfo); err != nil {
|
// Close the channel with us as the initiator because we are
|
||||||
|
// deciding to exit the funding flow due to an internal error.
|
||||||
|
if err := completeChan.CloseChannel(
|
||||||
|
closeInfo, channeldb.ChanStatusLocalCloseInitiator,
|
||||||
|
); err != nil {
|
||||||
fndgLog.Errorf("Failed closing channel %v: %v",
|
fndgLog.Errorf("Failed closing channel %v: %v",
|
||||||
completeChan.FundingOutpoint, err)
|
completeChan.FundingOutpoint, err)
|
||||||
}
|
}
|
||||||
|
|
1304
lnrpc/rpc.pb.go
1304
lnrpc/rpc.pb.go
File diff suppressed because it is too large
Load diff
|
@ -1430,6 +1430,29 @@ message ChannelCloseSummary {
|
||||||
|
|
||||||
/// Details on how the channel was closed.
|
/// Details on how the channel was closed.
|
||||||
ClosureType close_type = 10 [json_name = "close_type"];
|
ClosureType close_type = 10 [json_name = "close_type"];
|
||||||
|
|
||||||
|
enum Initiator {
|
||||||
|
UNKNOWN = 0;
|
||||||
|
LOCAL = 1;
|
||||||
|
REMOTE = 2;
|
||||||
|
BOTH = 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
Open initiator is the party that initiated opening the channel. Note that
|
||||||
|
this value may be unknown if the channel was closed before we migrated to
|
||||||
|
store open channel information after close.
|
||||||
|
*/
|
||||||
|
Initiator open_initiator = 11 [json_name = "open_initiator"];
|
||||||
|
|
||||||
|
/**
|
||||||
|
Close initiator indicates which party initiated the close. This value will
|
||||||
|
be unknown for channels that were cooperatively closed before we started
|
||||||
|
tracking cooperative close initiators. Note that this indicates which party
|
||||||
|
initiated a close, and it is possible for both to initiate cooperative or
|
||||||
|
force closes, although only one party's close will be confirmed on chain.
|
||||||
|
*/
|
||||||
|
Initiator close_initiator = 12 [json_name = "close_initiator"];
|
||||||
}
|
}
|
||||||
|
|
||||||
message ClosedChannelsRequest {
|
message ClosedChannelsRequest {
|
||||||
|
|
|
@ -1461,6 +1461,16 @@
|
||||||
],
|
],
|
||||||
"default": "COOPERATIVE_CLOSE"
|
"default": "COOPERATIVE_CLOSE"
|
||||||
},
|
},
|
||||||
|
"ChannelCloseSummaryInitiator": {
|
||||||
|
"type": "string",
|
||||||
|
"enum": [
|
||||||
|
"UNKNOWN",
|
||||||
|
"LOCAL",
|
||||||
|
"REMOTE",
|
||||||
|
"BOTH"
|
||||||
|
],
|
||||||
|
"default": "UNKNOWN"
|
||||||
|
},
|
||||||
"ChannelEventUpdateUpdateType": {
|
"ChannelEventUpdateUpdateType": {
|
||||||
"type": "string",
|
"type": "string",
|
||||||
"enum": [
|
"enum": [
|
||||||
|
@ -2059,6 +2069,14 @@
|
||||||
"close_type": {
|
"close_type": {
|
||||||
"$ref": "#/definitions/ChannelCloseSummaryClosureType",
|
"$ref": "#/definitions/ChannelCloseSummaryClosureType",
|
||||||
"description": "/ Details on how the channel was closed."
|
"description": "/ Details on how the channel was closed."
|
||||||
|
},
|
||||||
|
"open_initiator": {
|
||||||
|
"$ref": "#/definitions/ChannelCloseSummaryInitiator",
|
||||||
|
"description": "*\nOpen initiator is the party that initiated opening the channel. Note that\nthis value may be unknown if the channel was closed before we migrated to\nstore open channel information after close."
|
||||||
|
},
|
||||||
|
"close_initiator": {
|
||||||
|
"$ref": "#/definitions/ChannelCloseSummaryInitiator",
|
||||||
|
"description": "*\nClose initiator indicates which party initiated the close. This value will\nbe unknown for channels that were cooperatively closed before we started\ntracking cooperative close initiators. Note that this indicates which party\ninitiated a close, and it is possible for both to initiate cooperative or\nforce closes, although only one party's close will be confirmed on chain."
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
|
@ -6382,7 +6382,8 @@ func subscribeChannelNotifications(ctxb context.Context, t *harnessTest,
|
||||||
// verifyCloseUpdate is used to verify that a closed channel update is of the
|
// verifyCloseUpdate is used to verify that a closed channel update is of the
|
||||||
// expected type.
|
// expected type.
|
||||||
func verifyCloseUpdate(chanUpdate *lnrpc.ChannelEventUpdate,
|
func verifyCloseUpdate(chanUpdate *lnrpc.ChannelEventUpdate,
|
||||||
force bool, forceType lnrpc.ChannelCloseSummary_ClosureType) error {
|
closeType lnrpc.ChannelCloseSummary_ClosureType,
|
||||||
|
closeInitiator lnrpc.ChannelCloseSummary_Initiator) error {
|
||||||
|
|
||||||
// We should receive one inactive and one closed notification
|
// We should receive one inactive and one closed notification
|
||||||
// for each channel.
|
// for each channel.
|
||||||
|
@ -6401,23 +6402,19 @@ func verifyCloseUpdate(chanUpdate *lnrpc.ChannelEventUpdate,
|
||||||
chanUpdate.Type)
|
chanUpdate.Type)
|
||||||
}
|
}
|
||||||
|
|
||||||
switch force {
|
if update.ClosedChannel.CloseType != closeType {
|
||||||
case true:
|
return fmt.Errorf("channel closure type "+
|
||||||
if update.ClosedChannel.CloseType != forceType {
|
"mismatch: expected %v, got %v",
|
||||||
return fmt.Errorf("channel closure type mismatch: "+
|
closeType,
|
||||||
"expected %v, got %v",
|
update.ClosedChannel.CloseType)
|
||||||
forceType,
|
|
||||||
update.ClosedChannel.CloseType)
|
|
||||||
}
|
|
||||||
case false:
|
|
||||||
if update.ClosedChannel.CloseType !=
|
|
||||||
lnrpc.ChannelCloseSummary_COOPERATIVE_CLOSE {
|
|
||||||
return fmt.Errorf("channel closure type "+
|
|
||||||
"mismatch: expected %v, got %v",
|
|
||||||
lnrpc.ChannelCloseSummary_COOPERATIVE_CLOSE,
|
|
||||||
update.ClosedChannel.CloseType)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if update.ClosedChannel.CloseInitiator != closeInitiator {
|
||||||
|
return fmt.Errorf("expected close intiator: %v, got: %v",
|
||||||
|
closeInitiator,
|
||||||
|
update.ClosedChannel.CloseInitiator)
|
||||||
|
}
|
||||||
|
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("channel update channel of wrong type, "+
|
return fmt.Errorf("channel update channel of wrong type, "+
|
||||||
"expected closed channel, got %T",
|
"expected closed channel, got %T",
|
||||||
|
@ -6529,18 +6526,29 @@ func testBasicChannelCreationAndUpdates(net *lntest.NetworkHarness, t *harnessTe
|
||||||
// verifyCloseUpdatesReceived is used to verify that Alice and Bob
|
// verifyCloseUpdatesReceived is used to verify that Alice and Bob
|
||||||
// receive the correct channel updates in order.
|
// receive the correct channel updates in order.
|
||||||
verifyCloseUpdatesReceived := func(sub channelSubscription,
|
verifyCloseUpdatesReceived := func(sub channelSubscription,
|
||||||
forceType lnrpc.ChannelCloseSummary_ClosureType) error {
|
forceType lnrpc.ChannelCloseSummary_ClosureType,
|
||||||
|
closeInitiator lnrpc.ChannelCloseSummary_Initiator) error {
|
||||||
|
|
||||||
// Ensure one inactive and one closed notification is received for each
|
// Ensure one inactive and one closed notification is received for each
|
||||||
// closed channel.
|
// closed channel.
|
||||||
numChannelUpds := 0
|
numChannelUpds := 0
|
||||||
for numChannelUpds < 2*numChannels {
|
for numChannelUpds < 2*numChannels {
|
||||||
// Every other channel should be force closed.
|
expectedCloseType := lnrpc.ChannelCloseSummary_COOPERATIVE_CLOSE
|
||||||
|
|
||||||
|
// Every other channel should be force closed. If this
|
||||||
|
// channel was force closed, set the expected close type
|
||||||
|
// the the type passed in.
|
||||||
force := (numChannelUpds/2)%2 == 0
|
force := (numChannelUpds/2)%2 == 0
|
||||||
|
if force {
|
||||||
|
expectedCloseType = forceType
|
||||||
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case chanUpdate := <-sub.updateChan:
|
case chanUpdate := <-sub.updateChan:
|
||||||
err := verifyCloseUpdate(chanUpdate, force, forceType)
|
err := verifyCloseUpdate(
|
||||||
|
chanUpdate, expectedCloseType,
|
||||||
|
closeInitiator,
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -6549,9 +6557,10 @@ func testBasicChannelCreationAndUpdates(net *lntest.NetworkHarness, t *harnessTe
|
||||||
case err := <-sub.errChan:
|
case err := <-sub.errChan:
|
||||||
return err
|
return err
|
||||||
case <-time.After(time.Second * 10):
|
case <-time.After(time.Second * 10):
|
||||||
return fmt.Errorf("timeout waiting for channel "+
|
return fmt.Errorf("timeout waiting "+
|
||||||
"notifications, only received %d/%d "+
|
"for channel notifications, only "+
|
||||||
"chanupds", numChannelUpds, 2*numChannels)
|
"received %d/%d chanupds",
|
||||||
|
numChannelUpds, 2*numChannels)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6560,15 +6569,21 @@ func testBasicChannelCreationAndUpdates(net *lntest.NetworkHarness, t *harnessTe
|
||||||
|
|
||||||
// Verify Bob receives all closed channel notifications. He should
|
// Verify Bob receives all closed channel notifications. He should
|
||||||
// receive a remote force close notification for force closed channels.
|
// receive a remote force close notification for force closed channels.
|
||||||
|
// All channels (cooperatively and force closed) should have a remote
|
||||||
|
// close initiator because Alice closed the channels.
|
||||||
if err := verifyCloseUpdatesReceived(bobChanSub,
|
if err := verifyCloseUpdatesReceived(bobChanSub,
|
||||||
lnrpc.ChannelCloseSummary_REMOTE_FORCE_CLOSE); err != nil {
|
lnrpc.ChannelCloseSummary_REMOTE_FORCE_CLOSE,
|
||||||
|
lnrpc.ChannelCloseSummary_REMOTE); err != nil {
|
||||||
t.Fatalf("errored verifying close updates: %v", err)
|
t.Fatalf("errored verifying close updates: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify Alice receives all closed channel notifications. She should
|
// Verify Alice receives all closed channel notifications. She should
|
||||||
// receive a remote force close notification for force closed channels.
|
// receive a remote force close notification for force closed channels.
|
||||||
|
// All channels (cooperatively and force closed) should have a local
|
||||||
|
// close initiator because Alice closed the channels.
|
||||||
if err := verifyCloseUpdatesReceived(aliceChanSub,
|
if err := verifyCloseUpdatesReceived(aliceChanSub,
|
||||||
lnrpc.ChannelCloseSummary_LOCAL_FORCE_CLOSE); err != nil {
|
lnrpc.ChannelCloseSummary_LOCAL_FORCE_CLOSE,
|
||||||
|
lnrpc.ChannelCloseSummary_LOCAL); err != nil {
|
||||||
t.Fatalf("errored verifying close updates: %v", err)
|
t.Fatalf("errored verifying close updates: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -6433,22 +6433,28 @@ func (lc *LightningChannel) MarkBorked() error {
|
||||||
|
|
||||||
// MarkCommitmentBroadcasted marks the channel as a commitment transaction has
|
// MarkCommitmentBroadcasted marks the channel as a commitment transaction has
|
||||||
// been broadcast, either our own or the remote, and we should watch the chain
|
// been broadcast, either our own or the remote, and we should watch the chain
|
||||||
// for it to confirm before taking any further action.
|
// for it to confirm before taking any further action. It takes a boolean which
|
||||||
func (lc *LightningChannel) MarkCommitmentBroadcasted(tx *wire.MsgTx) error {
|
// indicates whether we initiated the close.
|
||||||
|
func (lc *LightningChannel) MarkCommitmentBroadcasted(tx *wire.MsgTx,
|
||||||
|
locallyInitiated bool) error {
|
||||||
|
|
||||||
lc.Lock()
|
lc.Lock()
|
||||||
defer lc.Unlock()
|
defer lc.Unlock()
|
||||||
|
|
||||||
return lc.channelState.MarkCommitmentBroadcasted(tx)
|
return lc.channelState.MarkCommitmentBroadcasted(tx, locallyInitiated)
|
||||||
}
|
}
|
||||||
|
|
||||||
// MarkCoopBroadcasted marks the channel as a cooperative close transaction has
|
// MarkCoopBroadcasted marks the channel as a cooperative close transaction has
|
||||||
// been broadcast, and that we should watch the chain for it to confirm before
|
// been broadcast, and that we should watch the chain for it to confirm before
|
||||||
// taking any further action.
|
// taking any further action. It takes a locally initiated bool which is true
|
||||||
func (lc *LightningChannel) MarkCoopBroadcasted(tx *wire.MsgTx) error {
|
// if we initiated the cooperative close.
|
||||||
|
func (lc *LightningChannel) MarkCoopBroadcasted(tx *wire.MsgTx,
|
||||||
|
localInitiated bool) error {
|
||||||
|
|
||||||
lc.Lock()
|
lc.Lock()
|
||||||
defer lc.Unlock()
|
defer lc.Unlock()
|
||||||
|
|
||||||
return lc.channelState.MarkCoopBroadcasted(tx)
|
return lc.channelState.MarkCoopBroadcasted(tx, localInitiated)
|
||||||
}
|
}
|
||||||
|
|
||||||
// MarkDataLoss marks sets the channel status to LocalDataLoss and stores the
|
// MarkDataLoss marks sets the channel status to LocalDataLoss and stores the
|
||||||
|
|
2
peer.go
2
peer.go
|
@ -2119,6 +2119,7 @@ func (p *peer) fetchActiveChanCloser(chanID lnwire.ChannelID) (*channelCloser, e
|
||||||
feePerKw,
|
feePerKw,
|
||||||
uint32(startingHeight),
|
uint32(startingHeight),
|
||||||
nil,
|
nil,
|
||||||
|
false,
|
||||||
)
|
)
|
||||||
p.activeChanCloses[chanID] = chanCloser
|
p.activeChanCloses[chanID] = chanCloser
|
||||||
}
|
}
|
||||||
|
@ -2231,6 +2232,7 @@ func (p *peer) handleLocalCloseReq(req *htlcswitch.ChanClose) {
|
||||||
req.TargetFeePerKw,
|
req.TargetFeePerKw,
|
||||||
uint32(startingHeight),
|
uint32(startingHeight),
|
||||||
req,
|
req,
|
||||||
|
true,
|
||||||
)
|
)
|
||||||
p.activeChanCloses[chanID] = chanCloser
|
p.activeChanCloses[chanID] = chanCloser
|
||||||
|
|
||||||
|
|
104
rpcserver.go
104
rpcserver.go
|
@ -2922,7 +2922,11 @@ func (r *rpcServer) ClosedChannels(ctx context.Context,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
channel := createRPCClosedChannel(dbChannel)
|
channel, err := r.createRPCClosedChannel(dbChannel)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
resp.Channels = append(resp.Channels, channel)
|
resp.Channels = append(resp.Channels, channel)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3137,13 +3141,29 @@ func createRPCOpenChannel(r *rpcServer, graph *channeldb.ChannelGraph,
|
||||||
|
|
||||||
// createRPCClosedChannel creates an *lnrpc.ClosedChannelSummary from a
|
// createRPCClosedChannel creates an *lnrpc.ClosedChannelSummary from a
|
||||||
// *channeldb.ChannelCloseSummary.
|
// *channeldb.ChannelCloseSummary.
|
||||||
func createRPCClosedChannel(
|
func (r *rpcServer) createRPCClosedChannel(
|
||||||
dbChannel *channeldb.ChannelCloseSummary) *lnrpc.ChannelCloseSummary {
|
dbChannel *channeldb.ChannelCloseSummary) (*lnrpc.ChannelCloseSummary, error) {
|
||||||
|
|
||||||
nodePub := dbChannel.RemotePub
|
nodePub := dbChannel.RemotePub
|
||||||
nodeID := hex.EncodeToString(nodePub.SerializeCompressed())
|
nodeID := hex.EncodeToString(nodePub.SerializeCompressed())
|
||||||
|
|
||||||
var closeType lnrpc.ChannelCloseSummary_ClosureType
|
var (
|
||||||
|
closeType lnrpc.ChannelCloseSummary_ClosureType
|
||||||
|
openInit lnrpc.ChannelCloseSummary_Initiator
|
||||||
|
closeInitiator lnrpc.ChannelCloseSummary_Initiator
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
|
||||||
|
// Lookup local and remote cooperative initiators. If these values
|
||||||
|
// are not known they will just return unknown.
|
||||||
|
openInit, closeInitiator, err = r.getInitiators(
|
||||||
|
&dbChannel.ChanPoint,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert the close type to rpc type.
|
||||||
switch dbChannel.CloseType {
|
switch dbChannel.CloseType {
|
||||||
case channeldb.CooperativeClose:
|
case channeldb.CooperativeClose:
|
||||||
closeType = lnrpc.ChannelCloseSummary_COOPERATIVE_CLOSE
|
closeType = lnrpc.ChannelCloseSummary_COOPERATIVE_CLOSE
|
||||||
|
@ -3170,7 +3190,75 @@ func createRPCClosedChannel(
|
||||||
TimeLockedBalance: int64(dbChannel.TimeLockedBalance),
|
TimeLockedBalance: int64(dbChannel.TimeLockedBalance),
|
||||||
ChainHash: dbChannel.ChainHash.String(),
|
ChainHash: dbChannel.ChainHash.String(),
|
||||||
ClosingTxHash: dbChannel.ClosingTXID.String(),
|
ClosingTxHash: dbChannel.ClosingTXID.String(),
|
||||||
|
OpenInitiator: openInit,
|
||||||
|
CloseInitiator: closeInitiator,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// getInitiators returns an initiator enum that provides information about the
|
||||||
|
// party that initiated channel's open and close. This information is obtained
|
||||||
|
// from the historical channel bucket, so unknown values are returned when the
|
||||||
|
// channel is not present (which indicates that it was closed before we started
|
||||||
|
// writing channels to the historical close bucket).
|
||||||
|
func (r *rpcServer) getInitiators(chanPoint *wire.OutPoint) (
|
||||||
|
lnrpc.ChannelCloseSummary_Initiator,
|
||||||
|
lnrpc.ChannelCloseSummary_Initiator, error) {
|
||||||
|
|
||||||
|
var (
|
||||||
|
openInitiator = lnrpc.ChannelCloseSummary_UNKNOWN
|
||||||
|
closeInitiator = lnrpc.ChannelCloseSummary_UNKNOWN
|
||||||
|
)
|
||||||
|
|
||||||
|
// To get the close initiator for cooperative closes, we need
|
||||||
|
// to get the channel status from the historical channel bucket.
|
||||||
|
histChan, err := r.server.chanDB.FetchHistoricalChannel(chanPoint)
|
||||||
|
switch {
|
||||||
|
// The node has upgraded from a version where we did not store
|
||||||
|
// historical channels, and has not closed a channel since. Do
|
||||||
|
// not return an error, initiator values are unknown.
|
||||||
|
case err == channeldb.ErrNoHistoricalBucket:
|
||||||
|
return openInitiator, closeInitiator, nil
|
||||||
|
|
||||||
|
// The channel was closed before we started storing historical
|
||||||
|
// channels. Do not return an error, initiator values are unknown.
|
||||||
|
case err == channeldb.ErrChannelNotFound:
|
||||||
|
return openInitiator, closeInitiator, nil
|
||||||
|
|
||||||
|
case err != nil:
|
||||||
|
return 0, 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If we successfully looked up the channel, determine initiator based
|
||||||
|
// on channels status.
|
||||||
|
if histChan.IsInitiator {
|
||||||
|
openInitiator = lnrpc.ChannelCloseSummary_LOCAL
|
||||||
|
} else {
|
||||||
|
openInitiator = lnrpc.ChannelCloseSummary_REMOTE
|
||||||
|
}
|
||||||
|
|
||||||
|
localInit := histChan.HasChanStatus(
|
||||||
|
channeldb.ChanStatusLocalCloseInitiator,
|
||||||
|
)
|
||||||
|
|
||||||
|
remoteInit := histChan.HasChanStatus(
|
||||||
|
channeldb.ChanStatusRemoteCloseInitiator,
|
||||||
|
)
|
||||||
|
|
||||||
|
switch {
|
||||||
|
// There is a possible case where closes were attempted by both parties.
|
||||||
|
// We return the initiator as both in this case to provide full
|
||||||
|
// information about the close.
|
||||||
|
case localInit && remoteInit:
|
||||||
|
closeInitiator = lnrpc.ChannelCloseSummary_BOTH
|
||||||
|
|
||||||
|
case localInit:
|
||||||
|
closeInitiator = lnrpc.ChannelCloseSummary_LOCAL
|
||||||
|
|
||||||
|
case remoteInit:
|
||||||
|
closeInitiator = lnrpc.ChannelCloseSummary_REMOTE
|
||||||
|
}
|
||||||
|
|
||||||
|
return openInitiator, closeInitiator, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SubscribeChannelEvents returns a uni-directional stream (server -> client)
|
// SubscribeChannelEvents returns a uni-directional stream (server -> client)
|
||||||
|
@ -3222,7 +3310,13 @@ func (r *rpcServer) SubscribeChannelEvents(req *lnrpc.ChannelEventSubscription,
|
||||||
}
|
}
|
||||||
|
|
||||||
case channelnotifier.ClosedChannelEvent:
|
case channelnotifier.ClosedChannelEvent:
|
||||||
closedChannel := createRPCClosedChannel(event.CloseSummary)
|
closedChannel, err := r.createRPCClosedChannel(
|
||||||
|
event.CloseSummary,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
update = &lnrpc.ChannelEventUpdate{
|
update = &lnrpc.ChannelEventUpdate{
|
||||||
Type: lnrpc.ChannelEventUpdate_CLOSED_CHANNEL,
|
Type: lnrpc.ChannelEventUpdate_CLOSED_CHANNEL,
|
||||||
Channel: &lnrpc.ChannelEventUpdate_ClosedChannel{
|
Channel: &lnrpc.ChannelEventUpdate_ClosedChannel{
|
||||||
|
|
Loading…
Add table
Reference in a new issue