From 9aa55b164e366bf180d784a20e08b93ec14dd5e0 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Thu, 31 May 2018 12:41:51 +0200 Subject: [PATCH 1/4] breacharbiter test: extract common init logic --- breacharbiter_test.go | 79 ++++++++++++------------------------------- 1 file changed, 21 insertions(+), 58 deletions(-) diff --git a/breacharbiter_test.go b/breacharbiter_test.go index b9c00a094..edb45b0ae 100644 --- a/breacharbiter_test.go +++ b/breacharbiter_test.go @@ -933,11 +933,10 @@ restartCheck: } } -// TestBreachHandoffSuccess tests that a channel's close observer properly -// delivers retribution information to the breach arbiter in response to a -// breach close. This test verifies correctness in the event that the handoff -// experiences no interruptions. -func TestBreachHandoffSuccess(t *testing.T) { +func initBreachedState(t *testing.T) (*breachArbiter, + *lnwallet.LightningChannel, *lnwallet.LightningChannel, + *lnwallet.LocalForceCloseSummary, chan *ContractBreachEvent, + func(), func()) { // Create a pair of channels using a notifier that allows us to signal // a spend of the funding transaction. Alice's channel will be the on // observing a breach. @@ -945,7 +944,6 @@ func TestBreachHandoffSuccess(t *testing.T) { if err != nil { t.Fatalf("unable to create test channels: %v", err) } - defer cleanUpChans() // Instantiate a breach arbiter to handle the breach of alice's channel. contractBreaches := make(chan *ContractBreachEvent) @@ -956,7 +954,6 @@ func TestBreachHandoffSuccess(t *testing.T) { if err != nil { t.Fatalf("unable to initialize test breach arbiter: %v", err) } - defer cleanUpArb() // Send one HTLC to Bob and perform a state transition to lock it in. 
htlcAmount := lnwire.NewMSatFromSatoshis(20000) @@ -991,6 +988,20 @@ func TestBreachHandoffSuccess(t *testing.T) { t.Fatalf("Can't update the channel state: %v", err) } + return brar, alice, bob, bobClose, contractBreaches, cleanUpChans, + cleanUpArb +} + +// TestBreachHandoffSuccess tests that a channel's close observer properly +// delivers retribution information to the breach arbiter in response to a +// breach close. This test verifies correctness in the event that the handoff +// experiences no interruptions. +func TestBreachHandoffSuccess(t *testing.T) { + brar, alice, _, bobClose, contractBreaches, + cleanUpChans, cleanUpArb := initBreachedState(t) + defer cleanUpChans() + defer cleanUpArb() + chanPoint := alice.ChanPoint // Signal a spend of the funding transaction and wait for the close @@ -1052,59 +1063,11 @@ func TestBreachHandoffSuccess(t *testing.T) { // arbiter fails to write the information to disk, and that a subsequent attempt // at the handoff succeeds. func TestBreachHandoffFail(t *testing.T) { - // Create a pair of channels using a notifier that allows us to signal - // a spend of the funding transaction. Alice's channel will be the on - // observing a breach. - alice, bob, cleanUpChans, err := createInitChannels(1) - if err != nil { - t.Fatalf("unable to create test channels: %v", err) - } + brar, alice, _, bobClose, contractBreaches, + cleanUpChans, cleanUpArb := initBreachedState(t) defer cleanUpChans() - - // Instantiate a breach arbiter to handle the breach of alice's channel. - contractBreaches := make(chan *ContractBreachEvent) - - brar, cleanUpArb, err := createTestArbiter( - t, contractBreaches, alice.State().Db, - ) - if err != nil { - t.Fatalf("unable to initialize test breach arbiter: %v", err) - } defer cleanUpArb() - // Send one HTLC to Bob and perform a state transition to lock it in. 
- htlcAmount := lnwire.NewMSatFromSatoshis(20000) - htlc, _ := createHTLC(0, htlcAmount) - if _, err := alice.AddHTLC(htlc, nil); err != nil { - t.Fatalf("alice unable to add htlc: %v", err) - } - if _, err := bob.ReceiveHTLC(htlc); err != nil { - t.Fatalf("bob unable to recv add htlc: %v", err) - } - if err := forceStateTransition(alice, bob); err != nil { - t.Fatalf("Can't update the channel state: %v", err) - } - - // Generate the force close summary at this point in time, this will - // serve as the old state bob will broadcast. - bobClose, err := bob.ForceClose() - if err != nil { - t.Fatalf("unable to force close bob's channel: %v", err) - } - - // Now send another HTLC and perform a state transition, this ensures - // Alice is ahead of the state Bob will broadcast. - htlc2, _ := createHTLC(1, htlcAmount) - if _, err := alice.AddHTLC(htlc2, nil); err != nil { - t.Fatalf("alice unable to add htlc: %v", err) - } - if _, err := bob.ReceiveHTLC(htlc2); err != nil { - t.Fatalf("bob unable to recv add htlc: %v", err) - } - if err := forceStateTransition(alice, bob); err != nil { - t.Fatalf("Can't update the channel state: %v", err) - } - // Before alerting Alice of the breach, instruct our failing retribution // store to fail the next database operation, which we expect to write // the information handed off by the channel's close observer. @@ -1139,7 +1102,7 @@ func TestBreachHandoffFail(t *testing.T) { assertNoArbiterBreach(t, brar, chanPoint) assertNotPendingClosed(t, alice) - brar, cleanUpArb, err = createTestArbiter( + brar, cleanUpArb, err := createTestArbiter( t, contractBreaches, alice.State().Db, ) if err != nil { From 60d9ae02c7f4f8be09947ced25b964a46a74d7c8 Mon Sep 17 00:00:00 2001 From: "Johan T. 
Halseth" Date: Sat, 2 Jun 2018 10:02:20 +0200 Subject: [PATCH 2/4] mock: protect mockSpendNotifier map by mutex --- mock.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/mock.go b/mock.go index 35154ab93..5caf51369 100644 --- a/mock.go +++ b/mock.go @@ -118,6 +118,7 @@ func (m *mockNotfier) RegisterSpendNtfn(outpoint *wire.OutPoint, type mockSpendNotifier struct { *mockNotfier spendMap map[wire.OutPoint][]chan *chainntnfs.SpendDetail + mtx sync.Mutex } func makeMockSpendNotifier() *mockSpendNotifier { @@ -131,6 +132,8 @@ func makeMockSpendNotifier() *mockSpendNotifier { func (m *mockSpendNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, heightHint uint32, _ bool) (*chainntnfs.SpendEvent, error) { + m.mtx.Lock() + defer m.mtx.Unlock() spendChan := make(chan *chainntnfs.SpendDetail) m.spendMap[*outpoint] = append(m.spendMap[*outpoint], spendChan) @@ -145,6 +148,8 @@ func (m *mockSpendNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, // will include the transaction and height provided by the caller. func (m *mockSpendNotifier) Spend(outpoint *wire.OutPoint, height int32, txn *wire.MsgTx) { + m.mtx.Lock() + defer m.mtx.Unlock() if spendChans, ok := m.spendMap[*outpoint]; ok { delete(m.spendMap, *outpoint) From e0560741b4507c9d551c3768eb7bb5ff995e3b1b Mon Sep 17 00:00:00 2001 From: "Johan T. 
Halseth" Date: Thu, 31 May 2018 12:52:02 +0200 Subject: [PATCH 3/4] breacharbiter test: add TestBreachSecondLevelTransfer --- breacharbiter_test.go | 112 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 112 insertions(+) diff --git a/breacharbiter_test.go b/breacharbiter_test.go index edb45b0ae..ae617dac0 100644 --- a/breacharbiter_test.go +++ b/breacharbiter_test.go @@ -20,6 +20,7 @@ import ( "github.com/btcsuite/btclog" "github.com/go-errors/errors" + "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/keychain" @@ -1149,6 +1150,117 @@ func TestBreachHandoffFail(t *testing.T) { assertArbiterBreach(t, brar, chanPoint) } +// TestBreachSecondLevelTransfer tests that sweep of a HTLC output on a +// breached commitment is transferred to a second level spend if the output is +// already spent. +func TestBreachSecondLevelTransfer(t *testing.T) { + brar, alice, _, bobClose, contractBreaches, + cleanUpChans, cleanUpArb := initBreachedState(t) + defer cleanUpChans() + defer cleanUpArb() + + var ( + height = bobClose.ChanSnapshot.CommitHeight + forceCloseTx = bobClose.CloseTx + chanPoint = alice.ChanPoint + publTx = make(chan *wire.MsgTx) + publErr error + ) + + // Make PublishTransaction always return ErrDoubleSpend to begin with. + publErr = lnwallet.ErrDoubleSpend + brar.cfg.PublishTransaction = func(tx *wire.MsgTx) error { + publTx <- tx + return publErr + } + + // Notify the breach arbiter about the breach. + retribution, err := lnwallet.NewBreachRetribution( + alice.State(), height, forceCloseTx, 1) + if err != nil { + t.Fatalf("unable to create breach retribution: %v", err) + } + + breach := &ContractBreachEvent{ + ChanPoint: *chanPoint, + ProcessACK: make(chan error, 1), + BreachRetribution: retribution, + } + contractBreaches <- breach + + // We'll also wait to consume the ACK back from the breach arbiter. 
+ select { + case err := <-breach.ProcessACK: + if err != nil { + t.Fatalf("handoff failed: %v", err) + } + case <-time.After(time.Second * 15): + t.Fatalf("breach arbiter didn't send ack back") + } + + // After exiting, the breach arbiter should have persisted the + // retribution information and the channel should be shown as pending + // force closed. + assertArbiterBreach(t, brar, chanPoint) + + // Notify that the breaching transaction is confirmed, to trigger the + // retribution logic. + notifier := brar.cfg.Notifier.(*mockSpendNotifier) + notifier.confChannel <- &chainntnfs.TxConfirmation{} + + // The breach arbiter should attempt to sweep all outputs on the + // breached commitment. We'll pretend that the HTLC output has been + // spent by the channel counter party's second level tx already. + var tx *wire.MsgTx + select { + case tx = <-publTx: + case <-time.After(5 * time.Second): + t.Fatalf("tx was not published") + } + + if tx.TxIn[0].PreviousOutPoint.Hash != forceCloseTx.TxHash() { + t.Fatalf("tx not attempting to spend commitment") + } + + // Find the index of the TxIn spending the HTLC output. + htlcOutpoint := &retribution.HtlcRetributions[0].OutPoint + htlcIn := -1 + for i, txIn := range tx.TxIn { + if txIn.PreviousOutPoint == *htlcOutpoint { + htlcIn = i + } + } + if htlcIn == -1 { + t.Fatalf("htlc in not found") + } + + // Since publishing the transaction failed above, the breach arbiter + // will attempt another second level check. Now notify that the htlc + // output is spent by a second level tx. + secondLvlTx := &wire.MsgTx{ + TxOut: []*wire.TxOut{ + &wire.TxOut{Value: 1}, + }, + } + notifier.Spend(htlcOutpoint, 2, secondLvlTx) + + // Now a transaction attempting to spend from the second level tx + // should be published instead. Let this publish succeed by setting the + // publishing error to nil. 
+ publErr = nil + select { + case tx = <-publTx: + case <-time.After(5 * time.Second): + t.Fatalf("tx was not published") + } + + // The TxIn previously attempting to spend the HTLC outpoint should now + // be spending from the second level tx. + if tx.TxIn[htlcIn].PreviousOutPoint.Hash != secondLvlTx.TxHash() { + t.Fatalf("tx not attempting to spend second level tx, %v", tx.TxIn[0]) + } +} + // assertArbiterBreach checks that the breach arbiter has persisted the breach // information for a particular channel. func assertArbiterBreach(t *testing.T, brar *breachArbiter, From 3bdc968f39de3fbe5714cb138e16dec1f0264b3e Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Fri, 1 Jun 2018 16:20:55 +0200 Subject: [PATCH 4/4] breacharbiter: wait on spend events instead of timeout This commit handles a racy condition within the breacharbiter's justice tx procedure. For backends that have no mempool we would check if an HTLC output was spent and then try broadcasting the justice tx, but this would fail since we wouldn't detect the spend before it was in a block. The result was that we would continuously attempt to broadcast the transaction, effectively ending up in an endless (until the second-level tx actually confirmed) loop. Instead we now register for spend notifications in case broadcasting the transaction fails, and then wait for any of the notifications to be sent before trying again. This is a necessary step to be able to make lnd work well only with confirmed transactions, and was a better solution than introducing timeouts within the broadcast loop (which complicates integration tests). --- breacharbiter.go | 254 ++++++++++++++++++++++++++++++++--------------- 1 file changed, 172 insertions(+), 82 deletions(-) diff --git a/breacharbiter.go b/breacharbiter.go index 25a190d37..5af664159 100644 --- a/breacharbiter.go +++ b/breacharbiter.go @@ -34,6 +34,10 @@ var ( // breached contracts.
Entries are added to the justice txn bucket just // before broadcasting the sweep txn. justiceTxnBucket = []byte("justice-txn") + + // errBrarShuttingDown is an error returned if the breacharbiter has + // been signalled to exit. + errBrarShuttingDown = errors.New("breacharbiter shutting down") ) // ContractBreachEvent is an event the breachArbiter will receive in case a @@ -302,6 +306,147 @@ func convertToSecondLevelRevoke(bo *breachedOutput, breachInfo *retributionInfo, bo.outpoint) } +// waitForSpendEvent waits for any of the breached outputs to get spent, and +// mutates the breachInfo to be able to sweep it. This method should be used +// when we fail to publish the justice tx because of a double spend, indicating +// that the counter party has taken one of the breached outputs to the second +// level. The spendNtfns map is a cache used to store registered spend +// subscriptions, in case we must call this method multiple times. +func (b *breachArbiter) waitForSpendEvent(breachInfo *retributionInfo, + spendNtfns map[wire.OutPoint]*chainntnfs.SpendEvent) error { + + // spend is used to wrap the index of the output that gets spent + // together with the spend details. + type spend struct { + index int + detail *chainntnfs.SpendDetail + } + + // We create a channel the first goroutine that gets a spend event can + // signal. We make it buffered in case multiple spend events come in at + // the same time. + anySpend := make(chan struct{}, len(breachInfo.breachedOutputs)) + + // The allSpends channel will be used to pass spend events from all the + // goroutines that detects a spend before they are signalled to exit. + allSpends := make(chan spend, len(breachInfo.breachedOutputs)) + + // exit will be used to signal the goroutines that they can exit. + exit := make(chan struct{}) + var wg sync.WaitGroup + + // We'll now launch a goroutine for each of the HTLC outputs, that will + // signal the moment they detect a spend event. 
+ for i := 0; i < len(breachInfo.breachedOutputs); i++ { + breachedOutput := &breachInfo.breachedOutputs[i] + + // If this isn't an HTLC output, then we can skip it. + if breachedOutput.witnessType != lnwallet.HtlcAcceptedRevoke && + breachedOutput.witnessType != lnwallet.HtlcOfferedRevoke { + continue + } + + brarLog.Debugf("Checking for second-level attempt on HTLC(%v) "+ + "for ChannelPoint(%v)", breachedOutput.outpoint, + breachInfo.chanPoint) + + // If we have already registered for a notification for this + // output, we'll reuse it. + spendNtfn, ok := spendNtfns[breachedOutput.outpoint] + if !ok { + var err error + spendNtfn, err = b.cfg.Notifier.RegisterSpendNtfn( + &breachedOutput.outpoint, + breachInfo.breachHeight, true, + ) + if err != nil { + brarLog.Errorf("unable to check for spentness "+ + "of out_point=%v: %v", + breachedOutput.outpoint, err) + + // Registration may have failed if we've been + // instructed to shutdown. If so, return here + // to avoid entering an infinite loop. + select { + case <-b.quit: + return errBrarShuttingDown + default: + continue + } + } + spendNtfns[breachedOutput.outpoint] = spendNtfn + } + + // Launch a goroutine waiting for a spend event. + b.wg.Add(1) + wg.Add(1) + go func(index int, spendEv *chainntnfs.SpendEvent) { + defer b.wg.Done() + defer wg.Done() + + select { + // The output has been taken to the second level! + case sp, ok := <-spendEv.Spend: + if !ok { + return + } + brarLog.Debugf("Detected spend of HTLC(%v) "+ + "for ChannelPoint(%v)", + breachedOutput.outpoint, + breachInfo.chanPoint) + + // First we send the spend event on the + // allSpends channel, such that it can be + // handled after all go routines have exited. + allSpends <- spend{index, sp} + + // Finally we'll signal the anySpend channel + // that a spend was detected, such that the + // other goroutines can be shut down. 
+ anySpend <- struct{}{} + case <-exit: + return + case <-b.quit: + return + } + }(i, spendNtfn) + } + + // We'll wait for any of the outputs to be spent, or that we are + // signalled to exit. + select { + // A goroutine has signalled that a spend occurred. + case <-anySpend: + // Signal for the remaining goroutines to exit. + close(exit) + wg.Wait() + + // At this point all goroutines that can send on the allSpends + // channel have exited. We can therefore safely close the + // channel before ranging over its content. + close(allSpends) + for s := range allSpends { + breachedOutput := &breachInfo.breachedOutputs[s.index] + brarLog.Debugf("Detected second-level spend on "+ + "HTLC(%v) for ChannelPoint(%v)", + breachedOutput.outpoint, breachInfo.chanPoint) + + delete(spendNtfns, breachedOutput.outpoint) + + // In this case we'll morph our initial revoke spend to + // instead point to the second level output, and update + // the sign descriptor in the process. + convertToSecondLevelRevoke( + breachedOutput, breachInfo, s.detail, + ) + } + case <-b.quit: + return errBrarShuttingDown + } + + return nil +} + // exactRetribution is a goroutine which is executed once a contract breach has // been detected by a breachObserver. This function is responsible for // punishing a counterparty for violating the channel contract by sweeping ALL @@ -334,6 +479,11 @@ func (b *breachArbiter) exactRetribution(confChan *chainntnfs.ConfirmationEvent, brarLog.Debugf("Breach transaction %v has been confirmed, sweeping "+ "revoked funds", breachInfo.commitHash) + // We may have to wait for some of the HTLC outputs to be spent to the + // second level before broadcasting the justice tx. We'll store the + // SpendEvents between each attempt to not re-register unnecessarily.
+ spendNtfns := make(map[wire.OutPoint]*chainntnfs.SpendEvent) + finalTx, err := b.cfg.Store.GetFinalizedTxn(&breachInfo.chanPoint) if err != nil { brarLog.Errorf("unable to get finalized txn for"+ @@ -345,77 +495,8 @@ func (b *breachArbiter) exactRetribution(confChan *chainntnfs.ConfirmationEvent, // construct a sweep transaction and write it to disk. This will allow // the breach arbiter to re-register for notifications for the justice // txid. - spendNtfns := make(map[wire.OutPoint]*chainntnfs.SpendEvent) - -secondLevelCheck: +justiceTxBroadcast: if finalTx == nil { - // Before we create the justice tx, we need to check to see if - // any of the active HTLC's on the commitment transactions has - // been spent. In this case, we'll need to go to the second - // level to sweep them before the remote party can. - for i := 0; i < len(breachInfo.breachedOutputs); i++ { - breachedOutput := &breachInfo.breachedOutputs[i] - - // If this isn't an HTLC output, then we can skip it. - if breachedOutput.witnessType != lnwallet.HtlcAcceptedRevoke && - breachedOutput.witnessType != lnwallet.HtlcOfferedRevoke { - continue - } - - brarLog.Debugf("Checking for second-level attempt on "+ - "HTLC(%v) for ChannelPoint(%v)", - breachedOutput.outpoint, breachInfo.chanPoint) - - // Now that we have an HTLC output, we'll quickly check - // to see if it has been spent or not. If we have - // already registered for a notification for this - // output, we'll reuse it. - spendNtfn, ok := spendNtfns[breachedOutput.outpoint] - if !ok { - spendNtfn, err = b.cfg.Notifier.RegisterSpendNtfn( - &breachedOutput.outpoint, - breachInfo.breachHeight, true, - ) - if err != nil { - brarLog.Errorf("unable to check for "+ - "spentness of out_point=%v: %v", - breachedOutput.outpoint, err) - - // Registration may have failed if - // we've been instructed to shutdown. - // If so, return here to avoid entering - // an infinite loop. 
- select { - case <-b.quit: - return - default: - continue - } - } - spendNtfns[breachedOutput.outpoint] = spendNtfn - } - - select { - // The output has been taken to the second level! - case spendDetails, ok := <-spendNtfn.Spend: - if !ok { - return - } - delete(spendNtfns, breachedOutput.outpoint) - - // In this case we'll morph our initial revoke - // spend to instead point to the second level - // output, and update the sign descriptor in - // the process. - convertToSecondLevelRevoke( - breachedOutput, breachInfo, spendDetails, - ) - - // It hasn't been spent so we'll continue. - default: - } - } - // With the breach transaction confirmed, we now create the // justice tx which will claim ALL the funds within the // channel. @@ -443,21 +524,30 @@ secondLevelCheck: // channel's retribution against the cheating counter party. err = b.cfg.PublishTransaction(finalTx) if err != nil { - brarLog.Errorf("unable to broadcast "+ - "justice tx: %v", err) + brarLog.Errorf("unable to broadcast justice tx: %v", err) + if err == lnwallet.ErrDoubleSpend { - brarLog.Infof("Attempting to transfer HTLC revocations " + - "to the second level") + // Broadcasting the transaction failed because of a + // conflict either in the mempool or in chain. We'll + // now create spend subscriptions for all HTLC outputs + // on the commitment transaction that could possibly + // have been spent, and wait for any of them to + // trigger. + brarLog.Infof("Waiting for a spend event before " + + "attempting to craft new justice tx.") finalTx = nil - // Txn publication may fail if we're shutting down. - // If so, return to avoid entering an infinite loop. 
- select { - case <-b.quit: + err := b.waitForSpendEvent(breachInfo, spendNtfns) + if err != nil { + if err != errBrarShuttingDown { + brarLog.Errorf("error waiting for "+ + "spend event: %v", err) + } return - default: - goto secondLevelCheck } + + brarLog.Infof("Attempting another justice tx broadcast") + goto justiceTxBroadcast } } @@ -469,8 +559,8 @@ secondLevelCheck: confChan, err = b.cfg.Notifier.RegisterConfirmationsNtfn( &justiceTXID, 1, breachConfHeight) if err != nil { - brarLog.Errorf("unable to register for conf for txid: %v", - justiceTXID) + brarLog.Errorf("unable to register for conf for txid(%v): %v", + justiceTXID, err) return }