From f53a99e18efca3f859a41a850d50b7bfd7278a14 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Wed, 4 Apr 2018 17:36:38 -0700 Subject: [PATCH 1/5] htlcswitch: modify the SendMessage method on the Peer interface to optionally block In this commit, add a new argument to the SendMessage method to allow callers to request that the method block until the message has been sent on the socket to the remote peer. --- htlcswitch/interfaces.go | 7 +++++-- htlcswitch/link_test.go | 2 +- htlcswitch/mock.go | 2 +- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/htlcswitch/interfaces.go b/htlcswitch/interfaces.go index 51a5146e8..8086624a9 100644 --- a/htlcswitch/interfaces.go +++ b/htlcswitch/interfaces.go @@ -111,8 +111,11 @@ type ChannelLink interface { // Peer is an interface which represents the remote lightning node inside our // system. type Peer interface { - // SendMessage sends message to remote peer. - SendMessage(lnwire.Message) error + // SendMessage sends message to remote peer. The second arguments + // denote if the method should block until the message has been sent to + // the remote peer. If set, this allows the caller to more strongly + // synchronize. + SendMessage(msg lnwire.Message, sync bool) error // WipeChannel removes the channel uniquely identified by its channel // point from all indexes associated with the peer. diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index 84f209538..95f11d0f5 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -1392,7 +1392,7 @@ type mockPeer struct { quit chan struct{} } -func (m *mockPeer) SendMessage(msg lnwire.Message) error { +func (m *mockPeer) SendMessage(msg lnwire.Message, sync bool) error { select { case m.sentMsgs <- msg: case <-m.quit: diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index 658b636eb..42c1843c7 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -447,7 +447,7 @@ func (s *mockServer) intersect(f messageInterceptor) { s.interceptorFuncs = append(s.interceptorFuncs, f) } -func (s *mockServer) SendMessage(message lnwire.Message) error { +func (s *mockServer) SendMessage(message lnwire.Message, sync bool) error { select { case s.messages <- message: From 0dbd325fd03c314b521f3e2fa2de79a528471510 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Wed, 4 Apr 2018 17:38:23 -0700 Subject: [PATCH 2/5] htlcswitch: synchronously send the error to the peer on commitment verify fail In this commit, we fix a slight bug in lnd. Before this commit, we would send the error to the remote peer, but in an async manner. As a result, it was possible for the connections to be closed _before_ the error actually reached the remote party. The fix is simple: wait for the error to be returned when sending the message. This ensures that the error reaches the remote party before we kill the connection. --- htlcswitch/link.go | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index d58f22769..3b5138644 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -483,7 +483,7 @@ func (l *channelLink) syncChanStates() error { return fmt.Errorf("unable to generate chan sync message for "+ "ChannelPoint(%v)", l.channel.ChannelPoint()) } - if err := l.cfg.Peer.SendMessage(localChanSyncMsg); err != nil { + if err := l.cfg.Peer.SendMessage(localChanSyncMsg, false); err != nil { return fmt.Errorf("Unable to send chan sync message for "+ "ChannelPoint(%v)", l.channel.ChannelPoint()) } @@ -525,7 +525,7 @@ func (l *channelLink) syncChanStates() error { fundingLockedMsg := lnwire.NewFundingLocked( l.ChanID(), nextRevocation, ) - err = l.cfg.Peer.SendMessage(fundingLockedMsg) + err = l.cfg.Peer.SendMessage(fundingLockedMsg, false) if err != nil { return fmt.Errorf("unable to re-send "+ "FundingLocked: %v", err) @@ -575,7 +575,7 @@ func (l *channelLink) syncChanStates() error { // immediately so we return to a synchronized state as soon as // possible. for _, msg := range msgsToReSend { - l.cfg.Peer.SendMessage(msg) + l.cfg.Peer.SendMessage(msg, false) } case <-l.quit: @@ -1058,7 +1058,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) { l.openedCircuits = append(l.openedCircuits, pkt.inKey()) l.keystoneBatch = append(l.keystoneBatch, pkt.keystone()) - l.cfg.Peer.SendMessage(htlc) + l.cfg.Peer.SendMessage(htlc, false) case *lnwire.UpdateFulfillHTLC: // An HTLC we forward to the switch has just settled somewhere @@ -1090,7 +1090,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) { // Then we send the HTLC settle message to the connected peer // so we can continue the propagation of the settle message. - l.cfg.Peer.SendMessage(htlc) + l.cfg.Peer.SendMessage(htlc, false) isSettle = true case *lnwire.UpdateFailHTLC: @@ -1122,7 +1122,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) { // Finally, we send the HTLC message to the peer which // initially created the HTLC. - l.cfg.Peer.SendMessage(htlc) + l.cfg.Peer.SendMessage(htlc, false) isSettle = true } @@ -1241,10 +1241,14 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { // // TODO(roasbeef): force close chan if _, ok := err.(*lnwallet.InvalidCommitSigError); ok { - l.cfg.Peer.SendMessage(&lnwire.Error{ + err := l.cfg.Peer.SendMessage(&lnwire.Error{ ChanID: l.ChanID(), Data: []byte(err.Error()), - }) + }, true) + if err != nil { + l.errorf("unable to send msg to "+ + "remote peer: %v", err) + } } l.fail("ChannelPoint(%v): unable to accept new "+ @@ -1260,7 +1264,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { log.Errorf("unable to revoke commitment: %v", err) return } - l.cfg.Peer.SendMessage(nextRevocation) + l.cfg.Peer.SendMessage(nextRevocation, false) // Since we just revoked our commitment, we may have a new set // of HTLC's on our commitment, so we'll send them over our @@ -1288,7 +1292,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { // If both commitment chains are fully synced from our PoV, // then we don't need to reply with a signature as both sides - // already have a commitment with the latest accepted l. + // already have a commitment with the latest accepted. if l.channel.FullySynced() { return } @@ -1457,7 +1461,7 @@ func (l *channelLink) updateCommitTx() error { CommitSig: theirCommitSig, HtlcSigs: htlcSigs, } - l.cfg.Peer.SendMessage(commitSig) + l.cfg.Peer.SendMessage(commitSig, false) // We've just initiated a state transition, attempt to stop the // logCommitTimer. If the timer already ticked, then we'll consume the @@ -1665,7 +1669,7 @@ func (l *channelLink) updateChannelFee(feePerKw lnwallet.SatPerKWeight) error { // We'll then attempt to send a new UpdateFee message, and also lock it // in immediately by triggering a commitment update. msg := lnwire.NewUpdateFee(l.ChanID(), uint32(feePerKw)) - if err := l.cfg.Peer.SendMessage(msg); err != nil { + if err := l.cfg.Peer.SendMessage(msg, false); err != nil { return err } return l.updateCommitTx() @@ -2043,7 +2047,7 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg, ChanID: l.ChanID(), ID: pd.HtlcIndex, PaymentPreimage: preimage, - }) + }, false) needUpdate = true // There are additional channels left within this route. So @@ -2364,7 +2368,7 @@ func (l *channelLink) sendHTLCError(htlcIndex uint64, failure lnwire.FailureMess ChanID: l.ChanID(), ID: htlcIndex, Reason: reason, - }) + }, false) } // sendMalformedHTLCError helper function which sends the malformed HTLC update @@ -2384,7 +2388,7 @@ func (l *channelLink) sendMalformedHTLCError(htlcIndex uint64, ID: htlcIndex, ShaOnionBlob: shaOnionBlob, FailureCode: code, - }) + }, false) } // fail helper function which is used to encapsulate the action necessary for From 1c5d62a804b539639e8f0a7869afe44b585d7a77 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Wed, 4 Apr 2018 17:41:05 -0700 Subject: [PATCH 3/5] lnwallet: add new concrete error InvalidHtlcSigError for failed htlc sig validation In this commit we add a new error: InvalidHtlcSigError. This error will be returned when we're unable to validate an HTLC signature sent by the remote party. This will allow other nodes to more easily debug _why_ the signature was rejected. --- lnwallet/channel.go | 114 +++++++++++++++++++++++++++++++++----------- lnwallet/sigpool.go | 32 ++++++++++--- 2 files changed, 113 insertions(+), 33 deletions(-) diff --git a/lnwallet/channel.go b/lnwallet/channel.go index 22717eda8..c7804c0f1 100644 --- a/lnwallet/channel.go +++ b/lnwallet/channel.go @@ -1881,8 +1881,9 @@ func NewBreachRetribution(chanState *channeldb.OpenChannel, stateNum uint64, if err != nil { return nil, err } - commitmentSecret, commitmentPoint := btcec.PrivKeyFromBytes(btcec.S256(), - revocationPreimage[:]) + commitmentSecret, commitmentPoint := btcec.PrivKeyFromBytes( + btcec.S256(), revocationPreimage[:], + ) // With the commitment point generated, we can now generate the four // keys we'll need to reconstruct the commitment state, @@ -1893,8 +1894,9 @@ func NewBreachRetribution(chanState *channeldb.OpenChannel, stateNum uint64, // number so we can have the proper witness script to sign and include // within the final witness. remoteDelay := uint32(chanState.RemoteChanCfg.CsvDelay) - remotePkScript, err := commitScriptToSelf(remoteDelay, keyRing.DelayKey, - keyRing.RevocationKey) + remotePkScript, err := commitScriptToSelf( + remoteDelay, keyRing.DelayKey, keyRing.RevocationKey, + ) if err != nil { return nil, err } @@ -1999,10 +2001,10 @@ func NewBreachRetribution(chanState *channeldb.OpenChannel, stateNum uint64, return nil, err } - // Otherwise, is this was an outgoing HTLC that we sent, then - // from the PoV of the remote commitment state, they're the - // receiver of this HTLC. } else { + // Otherwise, is this was an outgoing HTLC that we + // sent, then from the PoV of the remote commitment + // state, they're the receiver of this HTLC. htlcScript, err = receiverHTLCScript( htlc.RefundTimeout, keyRing.LocalHtlcKey, keyRing.RemoteHtlcKey, keyRing.RevocationKey, @@ -2582,9 +2584,11 @@ func genRemoteHtlcSigJobs(keyRing *CommitmentKeyRing, Hash: txHash, Index: uint32(htlc.remoteOutputIndex), } - sigJob.tx, err = createHtlcTimeoutTx(op, outputAmt, - htlc.Timeout, uint32(remoteChanCfg.CsvDelay), - keyRing.RevocationKey, keyRing.DelayKey) + sigJob.tx, err = createHtlcTimeoutTx( + op, outputAmt, htlc.Timeout, + uint32(remoteChanCfg.CsvDelay), + keyRing.RevocationKey, keyRing.DelayKey, + ) if err != nil { return nil, nil, err } @@ -2632,9 +2636,10 @@ func genRemoteHtlcSigJobs(keyRing *CommitmentKeyRing, Hash: txHash, Index: uint32(htlc.remoteOutputIndex), } - sigJob.tx, err = createHtlcSuccessTx(op, outputAmt, - uint32(remoteChanCfg.CsvDelay), keyRing.RevocationKey, - keyRing.DelayKey) + sigJob.tx, err = createHtlcSuccessTx( + op, outputAmt, uint32(remoteChanCfg.CsvDelay), + keyRing.RevocationKey, keyRing.DelayKey, + ) if err != nil { return nil, nil, err } @@ -3483,20 +3488,23 @@ func genHtlcSigValidationJobs(localCommitmentView *commitment, i := 0 for index := range localCommitmentView.txn.TxOut { var ( - sigHash func() ([]byte, error) - sig *btcec.Signature - err error + htlcIndex uint64 + sigHash func() ([]byte, error) + sig *btcec.Signature + err error ) outputIndex := int32(index) switch { - // If this output index is found within the incoming HTLC index, - // then this means that we need to generate an HTLC success - // transaction in order to validate the signature. + // If this output index is found within the incoming HTLC + // index, then this means that we need to generate an HTLC + // success transaction in order to validate the signature. case localCommitmentView.incomingHTLCIndex[outputIndex] != nil: htlc := localCommitmentView.incomingHTLCIndex[outputIndex] + htlcIndex = htlc.HtlcIndex + sigHash = func() ([]byte, error) { op := wire.OutPoint{ Hash: txHash, @@ -3547,6 +3555,8 @@ func genHtlcSigValidationJobs(localCommitmentView *commitment, case localCommitmentView.outgoingHTLCIndex[outputIndex] != nil: htlc := localCommitmentView.outgoingHTLCIndex[outputIndex] + htlcIndex = htlc.HtlcIndex + sigHash = func() ([]byte, error) { op := wire.OutPoint{ Hash: txHash, @@ -3598,9 +3608,10 @@ func genHtlcSigValidationJobs(localCommitmentView *commitment, } verifyJobs = append(verifyJobs, verifyJob{ - pubKey: keyRing.RemoteHtlcKey, - sig: sig, - sigHash: sigHash, + htlcIndex: htlcIndex, + pubKey: keyRing.RemoteHtlcKey, + sig: sig, + sigHash: sigHash, }) i++ @@ -3617,7 +3628,7 @@ func genHtlcSigValidationJobs(localCommitmentView *commitment, } // InvalidCommitSigError is a struct that implements the error interface to -// report a failure to validation a commitment signature for a remote peer. +// report a failure to validate a commitment signature for a remote peer. // We'll use the items in this struct to generate a rich error message for the // remote peer when we receive an invalid signature from it. Doing so can // greatly aide in debugging cross implementation issues. @@ -3635,7 +3646,7 @@ type InvalidCommitSigError struct { // caused an invalid commitment signature. func (i *InvalidCommitSigError) Error() string { return fmt.Sprintf("rejected commitment: commit_height=%v, "+ - "invalid_sig=%x, commit_tx=%x, sig_hash=%x", i.commitHeight, + "invalid_commit_sig=%x, commit_tx=%x, sig_hash=%x", i.commitHeight, i.commitSig[:], i.commitTx, i.sigHash[:]) } @@ -3643,6 +3654,35 @@ func (i *InvalidCommitSigError) Error() string { // error interface. var _ error = (*InvalidCommitSigError)(nil) +// InvalidCommitSigError is a struc that implements the error interface to +// report a failure to validate an htlc signature from a remote peer. We'll use +// the items in this struct to generate a rich error message for the remote +// peer when we receive an invalid signature from it. Doing so can greatly aide +// in debugging across implementation issues. +type InvalidHtlcSigError struct { + commitHeight uint64 + + htlcSig []byte + + htlcIndex uint64 + + sigHash []byte + + commitTx []byte +} + +// Error returns a detailed error string including the exact transaction that +// caused an invalid htlc signature. +func (i *InvalidHtlcSigError) Error() string { + return fmt.Sprintf("rejected commitment: commit_height=%v, "+ + "invalid_htlc_sig=%x, commit_tx=%x, sig_hash=%x", i.commitHeight, + i.htlcSig, i.commitTx, i.sigHash[:]) +} + +// A compile time flag to ensure that InvalidCommitSigError implements the +// error interface. +var _ error = (*InvalidCommitSigError)(nil) + // ReceiveNewCommitment process a signature for a new commitment state sent by // the remote party. This method should be called in response to the // remote party initiating a new change, or when the remote party sends a @@ -3772,10 +3812,30 @@ func (lc *LightningChannel) ReceiveNewCommitment(commitSig lnwire.Sig, // In the case that a single signature is invalid, we'll exit // early and cancel all the outstanding verification jobs. select { - case err := <-verifyResps: - if err != nil { + case htlcErr := <-verifyResps: + if htlcErr != nil { close(cancelChan) - return fmt.Errorf("invalid htlc signature: %v", err) + + sig, err := lnwire.NewSigFromSignature( + htlcErr.sig, + ) + if err != nil { + return err + } + sigHash, err := htlcErr.sigHash() + if err != nil { + return err + } + + var txBytes bytes.Buffer + localCommitTx.Serialize(&txBytes) + return &InvalidHtlcSigError{ + commitHeight: nextHeight, + htlcSig: sig.ToSignatureBytes(), + htlcIndex: htlcErr.htlcIndex, + sigHash: sigHash, + commitTx: txBytes.Bytes(), + } } case <-lc.quit: return fmt.Errorf("channel shutting down") diff --git a/lnwallet/sigpool.go b/lnwallet/sigpool.go index a93c9630f..defbf8ba8 100644 --- a/lnwallet/sigpool.go +++ b/lnwallet/sigpool.go @@ -41,6 +41,10 @@ type verifyJob struct { // passed signature is known to have signed. sigHash func() ([]byte, error) + // htlcIndex is the index of the HTLC from the PoV of the remote + // party's update log. + htlcIndex uint64 + // cancel is a channel that should be closed if the caller wishes to // cancel all pending verification jobs part of a single batch. This // channel is to be closed in the case that a single signature in a @@ -52,7 +56,16 @@ type verifyJob struct { // is to be sent over. In the see that the signature is valid, a nil // error will be passed. Otherwise, a concrete error detailing the // issue will be passed. - errResp chan error + errResp chan *htlcIndexErr +} + +// verifyJobErr is a special type of error that also includes a pointer to the +// original validation job. Ths error message allows us to craft more detailed +// errors at upper layers. +type htlcIndexErr struct { + error + + *verifyJob } // signJob is a job sent to the sigPool to generate a valid signature according @@ -69,7 +82,8 @@ type signJob struct { // proper sighash for the input to be signed. tx *wire.MsgTx - // outputIndex... + // outputIndex is the output index of the HTLC on the commitment + // transaction being signed. outputIndex int32 // cancel is a channel that should be closed if the caller wishes to @@ -216,7 +230,10 @@ func (s *sigPool) poolWorker() { sigHash, err := verifyMsg.sigHash() if err != nil { select { - case verifyMsg.errResp <- err: + case verifyMsg.errResp <- &htlcIndexErr{ + error: err, + verifyJob: &verifyMsg, + }: continue case <-verifyMsg.cancel: continue @@ -229,7 +246,10 @@ func (s *sigPool) poolWorker() { err := fmt.Errorf("invalid signature "+ "sighash: %x, sig: %x", sigHash, rawSig.Serialize()) select { - case verifyMsg.errResp <- err: + case verifyMsg.errResp <- &htlcIndexErr{ + error: err, + verifyJob: &verifyMsg, + }: case <-verifyMsg.cancel: case <-s.quit: return @@ -272,9 +292,9 @@ func (s *sigPool) SubmitSignBatch(signJobs []signJob) { // allows the caller to cancel all pending jobs in the case that they wish to // bail early. func (s *sigPool) SubmitVerifyBatch(verifyJobs []verifyJob, - cancelChan chan struct{}) <-chan error { + cancelChan chan struct{}) <-chan *htlcIndexErr { - errChan := make(chan error, len(verifyJobs)) + errChan := make(chan *htlcIndexErr, len(verifyJobs)) for _, job := range verifyJobs { job.cancel = cancelChan From b3bc374ba14b329e686d8cb318661a8508c9962f Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Wed, 4 Apr 2018 17:41:40 -0700 Subject: [PATCH 4/5] htlcswitch: send a direct Error if we get a known channel error on validate commit --- htlcswitch/link.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 3b5138644..2a7feae23 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -1240,7 +1240,14 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { // direct error. // // TODO(roasbeef): force close chan - if _, ok := err.(*lnwallet.InvalidCommitSigError); ok { + var sendErr bool + switch err.(type) { + case *lnwallet.InvalidCommitSigError: + sendErr = true + case *lnwallet.InvalidHtlcSigError: + sendErr = true + } + if sendErr { err := l.cfg.Peer.SendMessage(&lnwire.Error{ ChanID: l.ChanID(), Data: []byte(err.Error()), From ca9174e166f863d7b75d0c2b50e8ef18ac8f1a1b Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Wed, 4 Apr 2018 17:43:51 -0700 Subject: [PATCH 5/5] peer: extend SendMessage to allow callers to block until msg is sent --- htlcswitch/interfaces.go | 7 +++---- peer.go | 21 +++++++++++++++++---- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/htlcswitch/interfaces.go b/htlcswitch/interfaces.go index 8086624a9..ef00ff38b 100644 --- a/htlcswitch/interfaces.go +++ b/htlcswitch/interfaces.go @@ -111,10 +111,9 @@ type ChannelLink interface { // Peer is an interface which represents the remote lightning node inside our // system. type Peer interface { - // SendMessage sends message to remote peer. The second arguments - // denote if the method should block until the message has been sent to - // the remote peer. If set, this allows the caller to more strongly - // synchronize. + // SendMessage sends message to remote peer. The second argument + // denotes if the method should block until the message has been sent + // to the remote peer. SendMessage(msg lnwire.Message, sync bool) error // WipeChannel removes the channel uniquely identified by its channel diff --git a/peer.go b/peer.go index c45db9df2..b76c099d0 100644 --- a/peer.go +++ b/peer.go @@ -1874,10 +1874,23 @@ func (p *peer) sendInitMsg() error { return p.writeMessage(msg) } -// SendMessage queues a message for sending to the target peer. -func (p *peer) SendMessage(msg lnwire.Message) error { - p.queueMsg(msg, nil) - return nil +// SendMessage sends message to remote peer. The second argument denotes if the +// method should block until the message has been sent to the remote peer. +func (p *peer) SendMessage(msg lnwire.Message, sync bool) error { + if !sync { + p.queueMsg(msg, nil) + return nil + } + + errChan := make(chan error, 1) + p.queueMsg(msg, errChan) + + select { + case err := <-errChan: + return err + case <-p.quit: + return fmt.Errorf("peer shutting down") + } } // PubKey returns the pubkey of the peer in compressed serialized format.