diff --git a/htlcswitch/link.go b/htlcswitch/link.go
index b8a50eeab..c7a86c1e0 100644
--- a/htlcswitch/link.go
+++ b/htlcswitch/link.go
@@ -223,6 +223,11 @@ type ChannelLinkConfig struct {
 	// syncing.
 	FwdPkgGCTicker ticker.Ticker
 
+	// PendingCommitTicker is a ticker that allows the link to determine if
+	// a locally initiated commitment dance gets stuck waiting for the
+	// remote party to revoke.
+	PendingCommitTicker ticker.Ticker
+
 	// BatchSize is the max size of a batch of updates done to the link
 	// before we do a state update.
 	BatchSize uint32
@@ -509,6 +514,13 @@ func (l *channelLink) Stop() {
 	close(l.quit)
 	l.wg.Wait()
 
+	// Now that the htlcManager has completely exited, reset the packet
+	// courier. This allows the mailbox to reevaluate any lingering Adds that
+	// were delivered but didn't make it on a commitment to be failed back
+	// if the link is offline for an extended period of time. The error is
+	// ignored since it can only fail when the daemon is exiting.
+	_ = l.mailBox.ResetPackets()
+
 	// As a final precaution, we will attempt to flush any uncommitted
 	// preimages to the preimage cache. The preimages should be re-delivered
 	// after channel reestablishment, however this adds an extra layer of
@@ -1003,13 +1015,12 @@ func (l *channelLink) htlcManager() {
 		go l.fwdPkgGarbager()
 	}
 
-out:
 	for {
 		// We must always check if we failed at some point processing
 		// the last update before processing the next.
 		if l.failed {
 			l.log.Errorf("link failed, exiting htlcManager")
-			break out
+			return
 		}
 
 		// If the previous event resulted in a non-empty batch, resume
@@ -1079,7 +1090,7 @@ out:
 				l.cfg.Peer.WipeChannel(chanPoint)
 			}()
 
-			break out
+			return
 
 		case <-l.cfg.BatchTicker.Ticks():
 			// Attempt to extend the remote commitment chain
@@ -1089,9 +1100,14 @@ out:
 			if err := l.updateCommitTx(); err != nil {
 				l.fail(LinkFailureError{code: ErrInternalError},
 					"unable to update commitment: %v", err)
-				break out
+				return
 			}
 
+		case <-l.cfg.PendingCommitTicker.Ticks():
+			l.fail(LinkFailureError{code: ErrRemoteUnresponsive},
+				"unable to complete dance")
+			return
+
 		// A message from the switch was just received. This indicates
 		// that the link is an intermediate hop in a multi-hop HTLC
 		// circuit.
@@ -1114,11 +1130,11 @@ out:
 					fmt.Sprintf("process hodl queue: %v",
 						err.Error()),
 				)
-				break out
+				return
 			}
 
 		case <-l.quit:
-			break out
+			return
 		}
 	}
 }
@@ -1179,7 +1195,7 @@ func (l *channelLink) processHtlcResolution(resolution invoices.HtlcResolution,
 	// Settle htlcs that returned a settle resolution using the preimage
 	// in the resolution.
 	case *invoices.HtlcSettleResolution:
-		l.log.Debugf("received settle resolution for %v"+
+		l.log.Debugf("received settle resolution for %v "+
 			"with outcome: %v", circuitKey, res.Outcome)
 
 		return l.settleHTLC(res.Preimage, htlc.pd)
@@ -1272,72 +1288,6 @@ func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) {
 			l.log.Warnf("Unable to handle downstream add HTLC: %v",
 				err)
 
-			var (
-				localFailure = false
-				reason       lnwire.OpaqueReason
-			)
-
-			// Create a temporary channel failure which we will send
-			// back to our peer if this is a forward, or report to
-			// the user if the failed payment was locally initiated.
-			failure := l.createFailureWithUpdate(
-				func(upd *lnwire.ChannelUpdate) lnwire.FailureMessage {
-					return lnwire.NewTemporaryChannelFailure(
-						upd,
-					)
-				},
-			)
-
-			// If the payment was locally initiated (which is
-			// indicated by a nil obfuscator), we do not need to
-			// encrypt it back to the sender.
-			if pkt.obfuscator == nil {
-				var b bytes.Buffer
-				err := lnwire.EncodeFailure(&b, failure, 0)
-				if err != nil {
-					l.log.Errorf("unable to encode "+
-						"failure: %v", err)
-					l.mailBox.AckPacket(pkt.inKey())
-					return
-				}
-				reason = lnwire.OpaqueReason(b.Bytes())
-				localFailure = true
-			} else {
-				// If the packet is part of a forward,
-				// (identified by a non-nil obfuscator) we need
-				// to encrypt the error back to the source.
-				var err error
-				reason, err = pkt.obfuscator.EncryptFirstHop(failure)
-				if err != nil {
-					l.log.Errorf("unable to "+
-						"obfuscate error: %v", err)
-					l.mailBox.AckPacket(pkt.inKey())
-					return
-				}
-			}
-
-			// Create a link error containing the temporary channel
-			// failure and a detail which indicates the we failed to
-			// add the htlc.
-			linkError := NewDetailedLinkError(
-				failure, OutgoingFailureDownstreamHtlcAdd,
-			)
-
-			failPkt := &htlcPacket{
-				incomingChanID: pkt.incomingChanID,
-				incomingHTLCID: pkt.incomingHTLCID,
-				circuit:        pkt.circuit,
-				sourceRef:      pkt.sourceRef,
-				hasSource:      true,
-				localFailure:   localFailure,
-				linkFailure:    linkError,
-				htlc: &lnwire.UpdateFailHTLC{
-					Reason: reason,
-				},
-			}
-
-			go l.forwardBatch(failPkt)
-
 			// Remove this packet from the link's mailbox, this
 			// prevents it from being reprocessed if the link
 			// restarts and resets it mailbox. If this response
@@ -1346,7 +1296,7 @@ func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) {
 			// the switch, since the circuit was never fully opened,
 			// and the forwarding package shows it as
 			// unacknowledged.
-			l.mailBox.AckPacket(pkt.inKey())
+			l.mailBox.FailAdd(pkt)
 
 			return
 		}
@@ -1994,6 +1944,8 @@ func (l *channelLink) updateCommitTx() error {
 	theirCommitSig, htlcSigs, pendingHTLCs, err :=
 		l.channel.SignNextCommitment()
 	if err == lnwallet.ErrNoWindow {
+		l.cfg.PendingCommitTicker.Resume()
+
 		l.log.Tracef("revocation window exhausted, unable to send: "+
 			"%v, pend_updates=%v, dangling_closes%v",
 			l.channel.PendingLocalUpdateCount(),
@@ -2013,6 +1965,8 @@ func (l *channelLink) updateCommitTx() error {
 		return err
 	}
 
+	l.cfg.PendingCommitTicker.Pause()
+
 	// The remote party now has a new pending commitment, so we'll update
 	// the contract court to be aware of this new set (the prior old remote
 	// pending).
@@ -2948,27 +2902,7 @@ func (l *channelLink) forwardBatch(packets ...*htlcPacket) {
 	}
 
 	errChan := l.cfg.ForwardPackets(l.quit, filteredPkts...)
-	go l.handleBatchFwdErrs(errChan)
-}
-
-// handleBatchFwdErrs waits on the given errChan until it is closed, logging
-// the errors returned from any unsuccessful forwarding attempts.
-func (l *channelLink) handleBatchFwdErrs(errChan chan error) {
-	for {
-		err, ok := <-errChan
-		if !ok {
-			// Err chan has been drained or switch is shutting
-			// down. Either way, return.
-			return
-		}
-
-		if err == nil {
-			continue
-		}
-
-		l.log.Errorf("unhandled error while forwarding htlc packet over "+
-			"htlcswitch: %v", err)
-	}
+	go handleBatchFwdErrs(errChan, l.log)
 }
 
 // sendHTLCError functions cancels HTLC and send cancel message back to the
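A note on the PendingCommitTicker gating above: the ticker is resumed only when SignNextCommitment fails with lnwallet.ErrNoWindow (the remote party still owes a revocation) and paused as soon as a commitment is successfully signed, so it can only fire while a locally initiated commitment dance is stalled. Below is a minimal stand-alone sketch of the same arm/disarm pattern using a plain time.Timer instead of lnd's ticker.Ticker; the commitWatchdog and errNoWindow names are illustrative inventions, not lnd APIs.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errNoWindow stands in for lnwallet.ErrNoWindow, returned when the remote
// party's revocation window is exhausted.
var errNoWindow = errors.New("revocation window exhausted")

// commitWatchdog mimics the PendingCommitTicker pattern with a time.Timer:
// armed when the dance stalls, disarmed once it completes, fatal if it fires.
type commitWatchdog struct {
	timer   *time.Timer
	timeout time.Duration
}

func newCommitWatchdog(timeout time.Duration) *commitWatchdog {
	t := time.NewTimer(timeout)
	if !t.Stop() {
		<-t.C // Drain so the timer starts disarmed.
	}
	return &commitWatchdog{timer: t, timeout: timeout}
}

// signCommitment is a stand-in for updateCommitTx: it arms the watchdog when
// signing is blocked on the revocation window, and pauses it after a
// signature is successfully produced.
func (w *commitWatchdog) signCommitment(sign func() error) error {
	if err := sign(); err != nil {
		if errors.Is(err, errNoWindow) {
			w.timer.Reset(w.timeout) // Resume: dance is stalled.
		}
		return err
	}
	w.timer.Stop() // Pause: remote party is keeping up.
	return nil
}

func main() {
	w := newCommitWatchdog(100 * time.Millisecond)

	// Simulate a stalled dance: the sign attempt hits the window.
	_ = w.signCommitment(func() error { return errNoWindow })

	select {
	case <-w.timer.C:
		fmt.Println("failing link: remote unresponsive")
	case <-time.After(time.Second):
		fmt.Println("dance completed in time")
	}
}
```

The key design point mirrored here is that the timeout measures one continuous stall, not cumulative slowness: every successful signature fully disarms the watchdog.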
diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go
index ffd53a475..935ccde3d 100644
--- a/htlcswitch/link_test.go
+++ b/htlcswitch/link_test.go
@@ -1699,10 +1699,11 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) (
 		UpdateContractSignals: func(*contractcourt.ContractSignals) error {
 			return nil
 		},
-		Registry:       invoiceRegistry,
-		ChainEvents:    &contractcourt.ChainEventSubscription{},
-		BatchTicker:    bticker,
-		FwdPkgGCTicker: ticker.NewForce(15 * time.Second),
+		Registry:            invoiceRegistry,
+		ChainEvents:         &contractcourt.ChainEventSubscription{},
+		BatchTicker:         bticker,
+		FwdPkgGCTicker:      ticker.NewForce(15 * time.Second),
+		PendingCommitTicker: ticker.New(time.Minute),
 		// Make the BatchSize and Min/MaxFeeUpdateTimeout large enough
 		// to not trigger commit updates automatically during tests.
 		BatchSize: 10000,
@@ -4203,10 +4204,11 @@ func (h *persistentLinkHarness) restartLink(
 		UpdateContractSignals: func(*contractcourt.ContractSignals) error {
 			return nil
 		},
-		Registry:       h.coreLink.cfg.Registry,
-		ChainEvents:    &contractcourt.ChainEventSubscription{},
-		BatchTicker:    bticker,
-		FwdPkgGCTicker: ticker.New(5 * time.Second),
+		Registry:            h.coreLink.cfg.Registry,
+		ChainEvents:         &contractcourt.ChainEventSubscription{},
+		BatchTicker:         bticker,
+		FwdPkgGCTicker:      ticker.New(5 * time.Second),
+		PendingCommitTicker: ticker.New(time.Minute),
 		// Make the BatchSize and Min/MaxFeeUpdateTimeout large enough
 		// to not trigger commit updates automatically during tests.
 		BatchSize: 10000,
@@ -6134,6 +6136,91 @@ func TestChannelLinkReceiveEmptySig(t *testing.T) {
 	aliceLink.Stop()
 }
 
+// TestPendingCommitTicker tests that a link will fail itself after a timeout if
+// the commitment dance stalls out.
+func TestPendingCommitTicker(t *testing.T) {
+	t.Parallel()
+
+	const chanAmt = btcutil.SatoshiPerBitcoin * 5
+	const chanReserve = btcutil.SatoshiPerBitcoin * 1
+	aliceLink, bobChannel, batchTicker, start, cleanUp, _, err :=
+		newSingleLinkTestHarness(chanAmt, chanReserve)
+	if err != nil {
+		t.Fatalf("unable to create link: %v", err)
+	}
+
+	var (
+		coreLink  = aliceLink.(*channelLink)
+		aliceMsgs = coreLink.cfg.Peer.(*mockPeer).sentMsgs
+	)
+
+	coreLink.cfg.PendingCommitTicker = ticker.NewForce(time.Millisecond)
+
+	linkErrs := make(chan LinkFailureError)
+	coreLink.cfg.OnChannelFailure = func(_ lnwire.ChannelID,
+		_ lnwire.ShortChannelID, linkErr LinkFailureError) {
+
+		linkErrs <- linkErr
+	}
+
+	if err := start(); err != nil {
+		t.Fatalf("unable to start test harness: %v", err)
+	}
+	defer cleanUp()
+
+	ctx := linkTestContext{
+		t:          t,
+		aliceLink:  aliceLink,
+		bobChannel: bobChannel,
+		aliceMsgs:  aliceMsgs,
+	}
+
+	// Send an HTLC from Alice to Bob, and signal the batch ticker to sign
+	// a commitment.
+	htlc, _ := generateHtlcAndInvoice(t, 0)
+	ctx.sendHtlcAliceToBob(0, htlc)
+	ctx.receiveHtlcAliceToBob()
+	batchTicker <- time.Now()
+
+	select {
+	case msg := <-aliceMsgs:
+		if _, ok := msg.(*lnwire.CommitSig); !ok {
+			t.Fatalf("expected CommitSig, got: %T", msg)
+		}
+	case <-time.After(time.Second):
+		t.Fatalf("alice did not send commit sig")
+	}
+
+	// Check that Alice hasn't failed.
+	select {
+	case linkErr := <-linkErrs:
+		t.Fatalf("link failed unexpectedly: %v", linkErr)
+	case <-time.After(50 * time.Millisecond):
+	}
+
+	// Without completing the dance, send another HTLC from Alice to Bob.
+	// Since the revocation window has been exhausted, we should see the
+	// link fail itself immediately due to the low pending commit timeout.
+	// In production this would be much longer, e.g. a minute.
+	htlc, _ = generateHtlcAndInvoice(t, 1)
+	ctx.sendHtlcAliceToBob(1, htlc)
+	ctx.receiveHtlcAliceToBob()
+	batchTicker <- time.Now()
+
+	// Assert that we get the expected link failure from Alice.
+	select {
+	case linkErr := <-linkErrs:
+		if linkErr.code != ErrRemoteUnresponsive {
+			t.Fatalf("error code mismatch, "+
+				"want: ErrRemoteUnresponsive, got: %v",
+				linkErr.code)
+		}
+
+	case <-time.After(time.Second):
+		t.Fatalf("did not receive failure")
+	}
+}
+
 // assertFailureCode asserts that an error is of type ClearTextError and that
 // the failure code is as expected.
 func assertFailureCode(t *testing.T, err error, code lnwire.FailCode) {
diff --git a/htlcswitch/linkfailure.go b/htlcswitch/linkfailure.go
index c806c4b26..840a4d8da 100644
--- a/htlcswitch/linkfailure.go
+++ b/htlcswitch/linkfailure.go
@@ -20,6 +20,10 @@ const (
 	// to fail the link.
 	ErrRemoteError
 
+	// ErrRemoteUnresponsive indicates that our peer took too long to
+	// complete a commitment dance.
+	ErrRemoteUnresponsive
+
 	// ErrSyncError indicates that we failed synchronizing the state of the
 	// channel with our peer.
 	ErrSyncError
@@ -71,6 +75,8 @@ func (e LinkFailureError) Error() string {
 		return "internal error"
 	case ErrRemoteError:
 		return "remote error"
+	case ErrRemoteUnresponsive:
+		return "remote unresponsive"
 	case ErrSyncError:
 		return "sync error"
 	case ErrInvalidUpdate:
@@ -90,13 +96,23 @@ func (e LinkFailureError) Error() string {
 // the link fails with this LinkFailureError.
 func (e LinkFailureError) ShouldSendToPeer() bool {
 	switch e.code {
-	// If the failure is a result of the peer sending us an error, we don't
-	// have to respond with one.
-	case ErrRemoteError:
-		return false
 
-	// In all other cases we will attempt to send our peer an error message.
-	default:
+	// Since sending an error can lead some nodes to force close the
+	// channel, create a whitelist of the failures we want to send so that
+	// newly added error codes aren't automatically sent to the remote peer.
+	case
+		ErrInternalError,
+		ErrRemoteError,
+		ErrSyncError,
+		ErrInvalidUpdate,
+		ErrInvalidCommitment,
+		ErrInvalidRevocation,
+		ErrRecoveryError:
+
 		return true
+
+	// In all other cases we will not attempt to send our peer an error.
+	default:
+		return false
 	}
 }
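The ShouldSendToPeer rewrite above flips the function from a blacklist to a whitelist, so a newly added code such as ErrRemoteUnresponsive defaults to *not* being sent and cannot accidentally trigger a remote force close. A toy sketch of the same default-closed pattern follows; the enum and its values are invented for illustration and are not lnd's types.

```go
package main

import "fmt"

// errorCode mirrors the shape of htlcswitch's failure codes; the names here
// are illustrative, not lnd's actual values.
type errorCode int

const (
	errInternal errorCode = iota
	errRemoteUnresponsive
	errSync
)

// shouldSendToPeer uses an explicit whitelist: anything not listed, like the
// newly introduced errRemoteUnresponsive, is silently kept local.
func shouldSendToPeer(code errorCode) bool {
	switch code {
	case errInternal, errSync:
		return true
	default:
		return false
	}
}

func main() {
	fmt.Println(shouldSendToPeer(errInternal))           // true
	fmt.Println(shouldSendToPeer(errRemoteUnresponsive)) // false: fail-safe default
}
```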
diff --git a/htlcswitch/mailbox.go b/htlcswitch/mailbox.go
index 04c5cd6d8..95d88763e 100644
--- a/htlcswitch/mailbox.go
+++ b/htlcswitch/mailbox.go
@@ -1,17 +1,26 @@
 package htlcswitch
 
 import (
+	"bytes"
 	"container/list"
 	"errors"
+	"fmt"
 	"sync"
 	"time"
 
+	"github.com/lightningnetwork/lnd/clock"
 	"github.com/lightningnetwork/lnd/lnwire"
 )
 
-// ErrMailBoxShuttingDown is returned when the mailbox is interrupted by a
-// shutdown request.
-var ErrMailBoxShuttingDown = errors.New("mailbox is shutting down")
+var (
+	// ErrMailBoxShuttingDown is returned when the mailbox is interrupted by
+	// a shutdown request.
+	ErrMailBoxShuttingDown = errors.New("mailbox is shutting down")
+
+	// ErrPacketAlreadyExists signals that an attempt to add a packet failed
+	// because it already exists in the mailbox.
+	ErrPacketAlreadyExists = errors.New("mailbox already has packet")
+)
 
 // MailBox is an interface which represents a concurrent-safe, in-order
 // delivery queue for messages from the network and also from the main switch.
@@ -31,8 +40,17 @@ type MailBox interface {
 
 	// AckPacket removes a packet from the mailboxes in-memory replay
 	// buffer. This will prevent a packet from being delivered after a link
-	// restarts if the switch has remained online.
-	AckPacket(CircuitKey)
+	// restarts if the switch has remained online. The returned boolean
+	// indicates whether or not a packet with the passed incoming circuit
+	// key was removed.
+	AckPacket(CircuitKey) bool
+
+	// FailAdd fails an UpdateAddHTLC that exists within the mailbox,
+	// removing it from the in-memory replay buffer. This will prevent the
+	// packet from being delivered after the link restarts if the switch has
+	// remained online. The generated LinkError will show an
+	// OutgoingFailureDownstreamHtlcAdd FailureDetail.
+	FailAdd(pkt *htlcPacket)
 
 	// MessageOutBox returns a channel that any new messages ready for
 	// delivery will be sent on.
@@ -56,12 +74,37 @@ type MailBox interface {
 	Stop()
 }
 
+type mailBoxConfig struct {
+	// shortChanID is the short channel id of the channel this mailbox
+	// belongs to.
+	shortChanID lnwire.ShortChannelID
+
+	// fetchUpdate retrieves the most recent channel update for the channel
+	// this mailbox belongs to.
+	fetchUpdate func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error)
+
+	// forwardPackets sends a variadic number of htlcPackets to the switch
+	// to be routed. A quit channel should be provided so that the call can
+	// properly exit during shutdown.
+	forwardPackets func(chan struct{}, ...*htlcPacket) chan error
+
+	// clock is a time source for the mailbox.
+	clock clock.Clock
+
+	// expiry is the interval after which Adds will be cancelled if they
+	// have not yet been delivered. The computed deadline will expire this
+	// long after the Adds are added via AddPacket.
+	expiry time.Duration
+}
+
 // memoryMailBox is an implementation of the MailBox struct backed by purely
 // in-memory queues.
 type memoryMailBox struct {
 	started sync.Once
 	stopped sync.Once
 
+	cfg *mailBoxConfig
+
 	wireMessages *list.List
 	wireMtx      sync.Mutex
 	wireCond     *sync.Cond
@@ -69,29 +112,42 @@ type memoryMailBox struct {
 	messageOutbox chan lnwire.Message
 	msgReset      chan chan struct{}
 
-	htlcPkts *list.List
-	pktIndex map[CircuitKey]*list.Element
-	pktHead  *list.Element
-	pktMtx   sync.Mutex
-	pktCond  *sync.Cond
+	// repPkts is a queue for reply packets, e.g. Settles and Fails.
+	repPkts  *list.List
+	repIndex map[CircuitKey]*list.Element
+	repHead  *list.Element
+
+	// addPkts is a dedicated queue for Adds.
+	addPkts  *list.List
+	addIndex map[CircuitKey]*list.Element
+	addHead  *list.Element
+
+	pktMtx  sync.Mutex
+	pktCond *sync.Cond
 
 	pktOutbox chan *htlcPacket
 	pktReset  chan chan struct{}
 
-	wg   sync.WaitGroup
-	quit chan struct{}
+	wireShutdown chan struct{}
+	pktShutdown  chan struct{}
+	quit         chan struct{}
 }
 
 // newMemoryMailBox creates a new instance of the memoryMailBox.
-func newMemoryMailBox() *memoryMailBox {
+func newMemoryMailBox(cfg *mailBoxConfig) *memoryMailBox {
 	box := &memoryMailBox{
+		cfg:           cfg,
 		wireMessages:  list.New(),
-		htlcPkts:      list.New(),
+		repPkts:       list.New(),
+		addPkts:       list.New(),
 		messageOutbox: make(chan lnwire.Message),
 		pktOutbox:     make(chan *htlcPacket),
 		msgReset:      make(chan chan struct{}, 1),
 		pktReset:      make(chan chan struct{}, 1),
-		pktIndex:      make(map[CircuitKey]*list.Element),
+		repIndex:      make(map[CircuitKey]*list.Element),
+		addIndex:      make(map[CircuitKey]*list.Element),
+		wireShutdown:  make(chan struct{}),
+		pktShutdown:   make(chan struct{}),
 		quit:          make(chan struct{}),
 	}
 	box.wireCond = sync.NewCond(&box.wireMtx)
@@ -122,7 +178,6 @@ const (
 // NOTE: This method is part of the MailBox interface.
 func (m *memoryMailBox) Start() {
 	m.started.Do(func() {
-		m.wg.Add(2)
 		go m.mailCourier(wireCourier)
 		go m.mailCourier(pktCourier)
 	})
@@ -157,6 +212,7 @@ func (m *memoryMailBox) signalUntilReset(cType courierType,
 	done chan struct{}) error {
 
 	for {
+
 		switch cType {
 		case wireCourier:
 			m.wireCond.Signal()
@@ -176,27 +232,59 @@ func (m *memoryMailBox) signalUntilReset(cType courierType,
 }
 
 // AckPacket removes the packet identified by it's incoming circuit key from the
-// queue of packets to be delivered.
+// queue of packets to be delivered. The returned boolean indicates whether or
+// not a packet with the passed incoming circuit key was removed.
 //
 // NOTE: It is safe to call this method multiple times for the same circuit key.
-func (m *memoryMailBox) AckPacket(inKey CircuitKey) {
+func (m *memoryMailBox) AckPacket(inKey CircuitKey) bool {
 	m.pktCond.L.Lock()
-	entry, ok := m.pktIndex[inKey]
-	if !ok {
-		m.pktCond.L.Unlock()
-		return
+	defer m.pktCond.L.Unlock()
+
+	if entry, ok := m.repIndex[inKey]; ok {
+		// Check whether we are removing the head of the queue. If so,
+		// we must advance the head to the next packet before removing.
+		// It's possible that the courier has already advanced the
+		// repHead, so this check prevents the repHead from getting
+		// desynchronized.
+		if entry == m.repHead {
+			m.repHead = entry.Next()
+		}
+		m.repPkts.Remove(entry)
+		delete(m.repIndex, inKey)
+
+		return true
 	}
 
-	m.htlcPkts.Remove(entry)
-	delete(m.pktIndex, inKey)
-	m.pktCond.L.Unlock()
+	if entry, ok := m.addIndex[inKey]; ok {
+		// Check whether we are removing the head of the queue. If so,
+		// we must advance the head to the next add before removing.
+		// It's possible that the courier has already advanced the
+		// addHead, so this check prevents the addHead from getting
+		// desynchronized.
+		//
+		// NOTE: While this event is rare for Settles or Fails, it could
+		// be very common for Adds since the mailbox has the ability to
+		// cancel Adds before they are delivered. When that occurs, the
+		// head of addPkts has only been peeked and we expect to be
+		// removing the head of the queue.
+		if entry == m.addHead {
+			m.addHead = entry.Next()
+		}
+
+		m.addPkts.Remove(entry)
+		delete(m.addIndex, inKey)
+
+		return true
+	}
+
+	return false
 }
 
 // HasPacket queries the packets for a circuit key, this is used to drop packets
 // bound for the switch that already have a queued response.
 func (m *memoryMailBox) HasPacket(inKey CircuitKey) bool {
 	m.pktCond.L.Lock()
-	_, ok := m.pktIndex[inKey]
+	_, ok := m.repIndex[inKey]
 	m.pktCond.L.Unlock()
 
 	return ok
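The head-cursor bookkeeping in the new AckPacket is the subtle part of the dual-queue change: the courier only peeks at a queue's head, so whoever removes an element must advance the cursor itself. Here is a self-contained model of one such queue; the type and method names are invented for illustration and are not lnd code.

```go
package main

import (
	"container/list"
	"fmt"
)

// indexedQueue is a stripped-down model of one mailbox queue: a linked list
// with a key index and an explicit head cursor, as used by repPkts/addPkts.
type indexedQueue struct {
	items *list.List
	index map[int]*list.Element
	head  *list.Element
}

func newIndexedQueue() *indexedQueue {
	return &indexedQueue{
		items: list.New(),
		index: make(map[int]*list.Element),
	}
}

func (q *indexedQueue) push(key int) {
	e := q.items.PushBack(key)
	q.index[key] = e
	if q.head == nil {
		q.head = e
	}
}

// ack mirrors AckPacket: if the element being removed is still the head,
// advance the head first so the courier's cursor never points at a removed
// element. Duplicate acks return false and are harmless.
func (q *indexedQueue) ack(key int) bool {
	e, ok := q.index[key]
	if !ok {
		return false
	}
	if e == q.head {
		q.head = e.Next()
	}
	q.items.Remove(e)
	delete(q.index, key)
	return true
}

func main() {
	q := newIndexedQueue()
	q.push(1)
	q.push(2)

	fmt.Println(q.ack(1))     // true: removed, head advanced to 2
	fmt.Println(q.ack(1))     // false: second ack is a no-op
	fmt.Println(q.head.Value) // 2
}
```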
@@ -209,17 +297,61 @@ func (m *memoryMailBox) Stop() {
 	m.stopped.Do(func() {
 		close(m.quit)
 
-		m.wireCond.Signal()
-		m.pktCond.Signal()
+		m.signalUntilShutdown(wireCourier)
+		m.signalUntilShutdown(pktCourier)
 	})
 }
 
+// signalUntilShutdown strobes the condition variable of the passed courier
+// type, blocking until the worker has exited.
+func (m *memoryMailBox) signalUntilShutdown(cType courierType) {
+	var (
+		cond     *sync.Cond
+		shutdown chan struct{}
+	)
+
+	switch cType {
+	case wireCourier:
+		cond = m.wireCond
+		shutdown = m.wireShutdown
+	case pktCourier:
+		cond = m.pktCond
+		shutdown = m.pktShutdown
+	}
+
+	for {
+		select {
+		case <-time.After(time.Millisecond):
+			cond.Signal()
+		case <-shutdown:
+			return
+		}
+	}
+}
+
+// pktWithExpiry wraps an incoming packet and records the time at which it
+// should be canceled from the mailbox. This will be used to detect if it gets
+// stuck in the mailbox and inform when to cancel back.
+type pktWithExpiry struct {
+	pkt    *htlcPacket
+	expiry time.Time
+}
+
+func (p *pktWithExpiry) deadline(clock clock.Clock) <-chan time.Time {
+	return clock.TickAfter(p.expiry.Sub(clock.Now()))
+}
+
 // mailCourier is a dedicated goroutine whose job is to reliably deliver
 // messages of a particular type. There are two types of couriers: wire
 // couriers, and mail couriers. Depending on the passed courierType, this
 // goroutine will assume one of two roles.
 func (m *memoryMailBox) mailCourier(cType courierType) {
-	defer m.wg.Done()
+	switch cType {
+	case wireCourier:
+		defer close(m.wireShutdown)
+	case pktCourier:
+		defer close(m.pktShutdown)
+	}
 
 	// TODO(roasbeef): refactor...
 
@@ -246,7 +378,7 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
 
 		case pktCourier:
 			m.pktCond.L.Lock()
-			for m.pktHead == nil {
+			for m.repHead == nil && m.addHead == nil {
 				m.pktCond.Wait()
 
 				select {
@@ -255,9 +387,11 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
 				// any un-ACK'd messages are re-delivered upon
 				// reconnect.
 				case pktDone := <-m.pktReset:
-					m.pktHead = m.htlcPkts.Front()
+					m.repHead = m.repPkts.Front()
+					m.addHead = m.addPkts.Front()
 
 					close(pktDone)
+
 				case <-m.quit:
 					m.pktCond.L.Unlock()
 					return
@@ -267,8 +401,11 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
 		}
 
 		var (
-			nextPkt *htlcPacket
-			nextMsg lnwire.Message
+			nextRep   *htlcPacket
+			nextRepEl *list.Element
+			nextAdd   *pktWithExpiry
+			nextAddEl *list.Element
+			nextMsg   lnwire.Message
 		)
 		switch cType {
 		// Grab the datum off the front of the queue, shifting the
@@ -283,8 +420,20 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
 		// doesn't make it into a commitment, then it'll be
 		// re-delivered once the link comes back online.
 		case pktCourier:
-			nextPkt = m.pktHead.Value.(*htlcPacket)
-			m.pktHead = m.pktHead.Next()
+			// Peek at the head of the Settle/Fails and Add queues.
+			// We peek both even if there is a Settle/Fail present
+			// because we need to set a deadline for the next
+			// pending Add if it's present. Due to clock
+			// monotonicity, we know that the head of the Adds is
+			// the next to expire.
+			if m.repHead != nil {
+				nextRep = m.repHead.Value.(*htlcPacket)
+				nextRepEl = m.repHead
+			}
+			if m.addHead != nil {
+				nextAdd = m.addHead.Value.(*pktWithExpiry)
+				nextAddEl = m.addHead
+			}
 		}
 
 		// Now that we're done with the condition, we can unlock it to
@@ -314,14 +463,77 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
 			}
 
 		case pktCourier:
+			var (
+				pktOutbox chan *htlcPacket
+				addOutbox chan *htlcPacket
+				add       *htlcPacket
+				deadline  <-chan time.Time
+			)
+
+			// Prioritize delivery of Settle/Fail packets over Adds.
+			// This ensures that we actively clear the commitment of
+			// existing HTLCs before trying to add new ones. This
+			// can help to improve forwarding performance since the
+			// time to sign a commitment is linear in the number of
+			// HTLCs manifested on the commitments.
+			//
+			// NOTE: Both types are eventually delivered over the
+			// same channel, but we can control which is delivered
+			// by exclusively making one nil and the other non-nil.
+			// We know from our loop condition that at least one of
+			// nextRep and nextAdd is non-nil.
+			if nextRep != nil {
+				pktOutbox = m.pktOutbox
+			} else {
+				addOutbox = m.pktOutbox
+			}
+
+			// If we have a pending Add, we'll also construct the
+			// deadline so we can fail it back if we are unable to
+			// deliver any message in time. We also dereference the
+			// nextAdd's packet, since we will need access to it in
+			// the case we are delivering it and/or if the deadline
+			// expires.
+			//
+			// NOTE: It's possible after this point for add to be
+			// nil, but this can only occur when addOutbox is also
+			// nil, hence we won't accidentally deliver a nil
+			// packet.
+			if nextAdd != nil {
+				add = nextAdd.pkt
+				deadline = nextAdd.deadline(m.cfg.clock)
+			}
+
 			select {
-			case m.pktOutbox <- nextPkt:
+			case pktOutbox <- nextRep:
+				m.pktCond.L.Lock()
+				// Only advance the repHead if this Settle or
+				// Fail is still at the head of the queue.
+				if m.repHead != nil && m.repHead == nextRepEl {
+					m.repHead = m.repHead.Next()
+				}
+				m.pktCond.L.Unlock()
+
+			case addOutbox <- add:
+				m.pktCond.L.Lock()
+				// Only advance the addHead if this Add is still
+				// at the head of the queue.
+				if m.addHead != nil && m.addHead == nextAddEl {
+					m.addHead = m.addHead.Next()
+				}
+				m.pktCond.L.Unlock()
+
+			case <-deadline:
+				m.FailAdd(add)
+
 			case pktDone := <-m.pktReset:
 				m.pktCond.L.Lock()
-				m.pktHead = m.htlcPkts.Front()
+				m.repHead = m.repPkts.Front()
+				m.addHead = m.addPkts.Front()
 				m.pktCond.L.Unlock()
 
 				close(pktDone)
+
 			case <-m.quit:
 				return
 			}
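The prioritization in the select above hinges on a standard Go property: a send on a nil channel blocks forever, so a case whose channel is nil can never fire. The courier points exactly one of pktOutbox/addOutbox at the real outbox, guaranteeing Settles/Fails win whenever one is queued. A minimal demonstration of the trick, assuming nothing from lnd:

```go
package main

import "fmt"

// deliver shows the select trick the pktCourier relies on: both queues share
// one outbox, but leaving addOutbox nil whenever a reply is pending
// guarantees the reply case is the only one that can fire.
func deliver(outbox chan string, rep, add string) {
	var repOutbox, addOutbox chan string
	if rep != "" {
		repOutbox = outbox // Reply pending: only this case can fire.
	} else {
		addOutbox = outbox // No reply: fall back to delivering the Add.
	}

	select {
	case repOutbox <- rep:
	case addOutbox <- add:
	}
}

func main() {
	outbox := make(chan string, 1)

	deliver(outbox, "settle", "add")
	fmt.Println(<-outbox) // always "settle", never "add"

	deliver(outbox, "", "add")
	fmt.Println(<-outbox) // "add" once no reply is queued
}
```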
@@ -353,18 +565,41 @@ func (m *memoryMailBox) AddMessage(msg lnwire.Message) error {
 // NOTE: This method is safe for concrete use and part of the MailBox
 // interface.
 func (m *memoryMailBox) AddPacket(pkt *htlcPacket) error {
-	// First, we'll lock the condition, and add the packet to the end of
-	// the htlc packet inbox.
 	m.pktCond.L.Lock()
-	if _, ok := m.pktIndex[pkt.inKey()]; ok {
-		m.pktCond.L.Unlock()
-		return nil
-	}
+	switch htlc := pkt.htlc.(type) {
 
-	entry := m.htlcPkts.PushBack(pkt)
-	m.pktIndex[pkt.inKey()] = entry
-	if m.pktHead == nil {
-		m.pktHead = entry
+	// Split off Settle/Fail packets into the repPkts queue.
+	case *lnwire.UpdateFulfillHTLC, *lnwire.UpdateFailHTLC:
+		if _, ok := m.repIndex[pkt.inKey()]; ok {
+			m.pktCond.L.Unlock()
+			return ErrPacketAlreadyExists
+		}
+
+		entry := m.repPkts.PushBack(pkt)
+		m.repIndex[pkt.inKey()] = entry
+		if m.repHead == nil {
+			m.repHead = entry
+		}
+
+	// Split off Add packets into the addPkts queue.
+	case *lnwire.UpdateAddHTLC:
+		if _, ok := m.addIndex[pkt.inKey()]; ok {
+			m.pktCond.L.Unlock()
+			return ErrPacketAlreadyExists
+		}
+
+		entry := m.addPkts.PushBack(&pktWithExpiry{
+			pkt:    pkt,
+			expiry: m.cfg.clock.Now().Add(m.cfg.expiry),
+		})
+		m.addIndex[pkt.inKey()] = entry
+		if m.addHead == nil {
+			m.addHead = entry
+		}
+
+	default:
+		m.pktCond.L.Unlock()
+		return fmt.Errorf("unknown htlc type: %T", htlc)
 	}
 	m.pktCond.L.Unlock()
@@ -375,6 +610,80 @@ func (m *memoryMailBox) AddPacket(pkt *htlcPacket) error {
 	return nil
 }
 
+// FailAdd fails an UpdateAddHTLC that exists within the mailbox, removing it
+// from the in-memory replay buffer. This will prevent the packet from being
+// delivered after the link restarts if the switch has remained online. The
+// generated LinkError will show an OutgoingFailureDownstreamHtlcAdd
+// FailureDetail.
+func (m *memoryMailBox) FailAdd(pkt *htlcPacket) {
+	// First, remove the packet from mailbox. If we didn't find the packet
+	// because it has already been acked, we'll exit early to avoid sending
+	// a duplicate fail message through the switch.
+	if !m.AckPacket(pkt.inKey()) {
+		return
+	}
+
+	var (
+		localFailure = false
+		reason       lnwire.OpaqueReason
+	)
+
+	// Create a temporary channel failure which we will send back to our
+	// peer if this is a forward, or report to the user if the failed
+	// payment was locally initiated.
+	var failure lnwire.FailureMessage
+	update, err := m.cfg.fetchUpdate(m.cfg.shortChanID)
+	if err != nil {
+		failure = &lnwire.FailTemporaryNodeFailure{}
+	} else {
+		failure = lnwire.NewTemporaryChannelFailure(update)
+	}
+
+	// If the payment was locally initiated (which is indicated by a nil
+	// obfuscator), we do not need to encrypt it back to the sender.
+	if pkt.obfuscator == nil {
+		var b bytes.Buffer
+		err := lnwire.EncodeFailure(&b, failure, 0)
+		if err != nil {
+			log.Errorf("Unable to encode failure: %v", err)
+			return
+		}
+		reason = lnwire.OpaqueReason(b.Bytes())
+		localFailure = true
+	} else {
+		// If the packet is part of a forward, (identified by a non-nil
+		// obfuscator) we need to encrypt the error back to the source.
+		var err error
+		reason, err = pkt.obfuscator.EncryptFirstHop(failure)
+		if err != nil {
+			log.Errorf("Unable to obfuscate error: %v", err)
+			return
+		}
+	}
+
+	// Create a link error containing the temporary channel failure and a
+	// detail which indicates that we failed to add the htlc.
+	linkError := NewDetailedLinkError(
+		failure, OutgoingFailureDownstreamHtlcAdd,
+	)
+
+	failPkt := &htlcPacket{
+		incomingChanID: pkt.incomingChanID,
+		incomingHTLCID: pkt.incomingHTLCID,
+		circuit:        pkt.circuit,
+		sourceRef:      pkt.sourceRef,
+		hasSource:      true,
+		localFailure:   localFailure,
+		linkFailure:    linkError,
+		htlc: &lnwire.UpdateFailHTLC{
+			Reason: reason,
+		},
+	}
+
+	errChan := m.cfg.forwardPackets(m.quit, failPkt)
+	go handleBatchFwdErrs(errChan, log)
+}
+
 // MessageOutBox returns a channel that any new messages ready for delivery
 // will be sent on.
 //
@@ -399,6 +708,8 @@ func (m *memoryMailBox) PacketOutBox() chan *htlcPacket {
 type mailOrchestrator struct {
 	mu sync.RWMutex
 
+	cfg *mailOrchConfig
+
 	// mailboxes caches exactly one mailbox for all known channels.
 	mailboxes map[lnwire.ChannelID]MailBox
 
@@ -419,9 +730,29 @@ type mailOrchestrator struct {
 	unclaimedPackets map[lnwire.ShortChannelID][]*htlcPacket
 }
 
+type mailOrchConfig struct {
+	// forwardPackets sends a variadic number of htlcPackets to the switch
+	// to be routed. A quit channel should be provided so that the call can
+	// properly exit during shutdown.
+	forwardPackets func(chan struct{}, ...*htlcPacket) chan error
+
+	// fetchUpdate retrieves the most recent channel update for the channel
+	// this mailbox belongs to.
+	fetchUpdate func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error)
+
+	// clock is a time source for the generated mailboxes.
+	clock clock.Clock
+
+	// expiry is the interval after which Adds will be cancelled if they
+	// have not yet been delivered. The computed deadline will expire this
+	// long after the Adds are added to a mailbox via AddPacket.
+	expiry time.Duration
+}
+
 // newMailOrchestrator initializes a fresh mailOrchestrator.
-func newMailOrchestrator() *mailOrchestrator {
+func newMailOrchestrator(cfg *mailOrchConfig) *mailOrchestrator {
 	return &mailOrchestrator{
+		cfg:              cfg,
 		mailboxes:        make(map[lnwire.ChannelID]MailBox),
 		liveIndex:        make(map[lnwire.ShortChannelID]lnwire.ChannelID),
 		unclaimedPackets: make(map[lnwire.ShortChannelID][]*htlcPacket),
@@ -437,7 +768,9 @@ func (mo *mailOrchestrator) Stop() {
 
 // GetOrCreateMailBox returns an existing mailbox belonging to `chanID`, or
 // creates and returns a new mailbox if none is found.
-func (mo *mailOrchestrator) GetOrCreateMailBox(chanID lnwire.ChannelID) MailBox {
+func (mo *mailOrchestrator) GetOrCreateMailBox(chanID lnwire.ChannelID,
+	shortChanID lnwire.ShortChannelID) MailBox {
+
 	// First, try lookup the mailbox directly using only the shared mutex.
 	mo.mu.RLock()
 	mailbox, ok := mo.mailboxes[chanID]
@@ -450,7 +783,7 @@ func (mo *mailOrchestrator) GetOrCreateMailBox(chanID lnwire.ChannelID) MailBox
 	// Otherwise, we will try again with exclusive lock, creating a mailbox
 	// if one still has not been created.
 	mo.mu.Lock()
-	mailbox = mo.exclusiveGetOrCreateMailBox(chanID)
+	mailbox = mo.exclusiveGetOrCreateMailBox(chanID, shortChanID)
 	mo.mu.Unlock()
 
 	return mailbox
@@ -462,11 +795,17 @@ func (mo *mailOrchestrator) GetOrCreateMailBox(chanID lnwire.ChannelID) MailBox
 //
 // NOTE: This method MUST be invoked with the mailOrchestrator's exclusive lock.
 func (mo *mailOrchestrator) exclusiveGetOrCreateMailBox(
-	chanID lnwire.ChannelID) MailBox {
+	chanID lnwire.ChannelID, shortChanID lnwire.ShortChannelID) MailBox {
 
 	mailbox, ok := mo.mailboxes[chanID]
 	if !ok {
-		mailbox = newMemoryMailBox()
+		mailbox = newMemoryMailBox(&mailBoxConfig{
+			shortChanID:    shortChanID,
+			fetchUpdate:    mo.cfg.fetchUpdate,
+			forwardPackets: mo.cfg.forwardPackets,
+			clock:          mo.cfg.clock,
+			expiry:         mo.cfg.expiry,
+		})
 		mailbox.Start()
 		mo.mailboxes[chanID] = mailbox
 	}
@@ -546,7 +885,7 @@ func (mo *mailOrchestrator) Deliver(
 		// index should only be set if the mailbox had been initialized
 		// beforehand. However, this does ensure that this case is
 		// handled properly in the event that it could happen.
-		mailbox = mo.exclusiveGetOrCreateMailBox(chanID)
+		mailbox = mo.exclusiveGetOrCreateMailBox(chanID, sid)
 		mo.mu.Unlock()
 
 	// Deliver the packet to the mailbox if it was found or created.
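One property worth noting about pktWithExpiry.deadline above: the timer is computed from the absolute expiry stamped at AddPacket time, not restarted per delivery attempt, so an Add redelivered long after its deadline (e.g. by ResetPackets after an extended link outage) yields a negative duration and is cancelled on the next courier pass. The sketch below shows this with the standard library, under the assumption that the default clock's TickAfter behaves like time.After.

```go
package main

import (
	"fmt"
	"time"
)

// deadline mirrors pktWithExpiry.deadline: the timer is derived from the
// absolute expiry recorded when the packet was queued.
func deadline(expiry time.Time) <-chan time.Time {
	return time.After(time.Until(expiry))
}

func main() {
	// An Add queued with a future expiry waits out the remainder.
	fresh := deadline(time.Now().Add(50 * time.Millisecond))

	// An Add whose expiry already passed produces a negative duration,
	// and time.After fires immediately, so it is failed back on the very
	// next courier pass rather than waiting a full expiry interval.
	stale := deadline(time.Now().Add(-time.Minute))

	select {
	case <-stale:
		fmt.Println("stale add cancelled immediately")
	case <-fresh:
		fmt.Println("unexpected: fresh add expired first")
	}
}
```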
diff --git a/htlcswitch/mailbox_test.go b/htlcswitch/mailbox_test.go
index e8356c97d..655caaffe 100644
--- a/htlcswitch/mailbox_test.go
+++ b/htlcswitch/mailbox_test.go
@@ -7,6 +7,7 @@ import (
 	"time"
 
 	"github.com/davecgh/go-spew/spew"
+	"github.com/lightningnetwork/lnd/clock"
 	"github.com/lightningnetwork/lnd/lnwire"
 )
 
@@ -19,7 +20,10 @@ func TestMailBoxCouriers(t *testing.T) {
 
 	// First, we'll create new instance of the current default mailbox
 	// type.
-	mailBox := newMemoryMailBox()
+	mailBox := newMemoryMailBox(&mailBoxConfig{
+		clock:  clock.NewDefaultClock(),
+		expiry: time.Minute,
+	})
 	mailBox.Start()
 	defer mailBox.Stop()
 
@@ -34,10 +38,16 @@ func TestMailBoxCouriers(t *testing.T) {
 			outgoingChanID: lnwire.NewShortChanIDFromInt(uint64(prand.Int63())),
 			incomingChanID: lnwire.NewShortChanIDFromInt(uint64(prand.Int63())),
 			amount:         lnwire.MilliSatoshi(prand.Int63()),
+			htlc: &lnwire.UpdateAddHTLC{
+				ID: uint64(i),
+			},
 		}
 		sentPackets[i] = pkt
 
-		mailBox.AddPacket(pkt)
+		err := mailBox.AddPacket(pkt)
+		if err != nil {
+			t.Fatalf("unable to add packet: %v", err)
+		}
 	}
 
 	// Next, we'll do the same, but this time adding wire messages.
@@ -148,6 +158,387 @@ func TestMailBoxCouriers(t *testing.T) {
 	}
 }
 
+// TestMailBoxResetAfterShutdown tests that ResetMessages and ResetPackets
+// return ErrMailBoxShuttingDown after the mailbox has been stopped.
+func TestMailBoxResetAfterShutdown(t *testing.T) {
+	t.Parallel()
+
+	m := newMemoryMailBox(&mailBoxConfig{})
+	m.Start()
+
+	// Stop the mailbox, then try to reset the message and packet couriers.
+	m.Stop()
+
+	err := m.ResetMessages()
+	if err != ErrMailBoxShuttingDown {
+		t.Fatalf("expected ErrMailBoxShuttingDown, got: %v", err)
+	}
+
+	err = m.ResetPackets()
+	if err != ErrMailBoxShuttingDown {
+		t.Fatalf("expected ErrMailBoxShuttingDown, got: %v", err)
+	}
+}
+
+type mailboxContext struct {
+	t        *testing.T
+	mailbox  MailBox
+	clock    *clock.TestClock
+	forwards chan *htlcPacket
+}
+
+func newMailboxContext(t *testing.T, startTime time.Time,
+	expiry time.Duration) *mailboxContext {
+
+	ctx := &mailboxContext{
+		t:        t,
+		clock:    clock.NewTestClock(startTime),
+		forwards: make(chan *htlcPacket, 1),
+	}
+	ctx.mailbox = newMemoryMailBox(&mailBoxConfig{
+		fetchUpdate: func(sid lnwire.ShortChannelID) (
+			*lnwire.ChannelUpdate, error) {
+			return &lnwire.ChannelUpdate{
+				ShortChannelID: sid,
+			}, nil
+		},
+		forwardPackets: ctx.forward,
+		clock:          ctx.clock,
+		expiry:         expiry,
+	})
+	ctx.mailbox.Start()
+
+	return ctx
+}
+
+func (c *mailboxContext) forward(_ chan struct{},
+	pkts ...*htlcPacket) chan error {
+
+	for _, pkt := range pkts {
+		c.forwards <- pkt
+	}
+
+	errChan := make(chan error)
+	close(errChan)
+
+	return errChan
+}
+
+func (c *mailboxContext) sendAdds(start, num int) []*htlcPacket {
+	c.t.Helper()
+
+	sentPackets := make([]*htlcPacket, num)
+	for i := 0; i < num; i++ {
+		pkt := &htlcPacket{
+			outgoingChanID: lnwire.NewShortChanIDFromInt(
+				uint64(prand.Int63())),
+			incomingChanID: lnwire.NewShortChanIDFromInt(
+				uint64(prand.Int63())),
+			incomingHTLCID: uint64(start + i),
+			amount:         lnwire.MilliSatoshi(prand.Int63()),
+			htlc: &lnwire.UpdateAddHTLC{
+				ID: uint64(start + i),
+			},
+		}
+		sentPackets[i] = pkt
+
+		err := c.mailbox.AddPacket(pkt)
+		if err != nil {
+			c.t.Fatalf("unable to add packet: %v", err)
+		}
+	}
+
+	return sentPackets
+}
+
+func (c *mailboxContext) receivePkts(pkts []*htlcPacket) {
+	c.t.Helper()
+
+	for i, expPkt := range pkts {
+		select {
+		case pkt := <-c.mailbox.PacketOutBox():
+			if reflect.DeepEqual(expPkt, pkt) {
+				continue
+			}
+
+			c.t.Fatalf("inkey mismatch #%d, want: %v vs "+
+				"got: %v", i, expPkt.inKey(), pkt.inKey())
+
+		case <-time.After(50 * time.Millisecond):
+			c.t.Fatalf("did not receive packet %d", i)
+		}
+	}
+}
+
+func (c *mailboxContext) checkFails(adds []*htlcPacket) {
+	c.t.Helper()
+
+	for i, add := range adds {
+		select {
+		case fail := <-c.forwards:
+			if add.inKey() == fail.inKey() {
+				continue
+			}
+			c.t.Fatalf("inkey mismatch #%d, add: %v vs fail: %v",
+				i, add.inKey(), fail.inKey())
+
+		case <-time.After(50 * time.Millisecond):
+			c.t.Fatalf("did not receive fail for index %d", i)
+		}
+	}
+
+	select {
+	case pkt := <-c.forwards:
+		c.t.Fatalf("unexpected forward: %v", pkt)
+	case <-time.After(50 * time.Millisecond):
+	}
+}
+
+// TestMailBoxFailAdd asserts that FailAdd returns a response to the switch
+// under various interleavings with other operations on the mailbox.
+func TestMailBoxFailAdd(t *testing.T) {
+	var (
+		batchDelay       = time.Second
+		expiry           = time.Minute
+		firstBatchStart  = time.Now()
+		secondBatchStart = time.Now().Add(batchDelay)
+		thirdBatchStart  = time.Now().Add(2 * batchDelay)
+		thirdBatchExpiry = thirdBatchStart.Add(expiry)
+	)
+	ctx := newMailboxContext(t, firstBatchStart, expiry)
+	defer ctx.mailbox.Stop()
+
+	failAdds := func(adds []*htlcPacket) {
+		for _, add := range adds {
+			ctx.mailbox.FailAdd(add)
+		}
+	}
+
+	const numBatchPackets = 5
+
+	// Send 5 adds, and pull them from the mailbox.
+	firstBatch := ctx.sendAdds(0, numBatchPackets)
+	ctx.receivePkts(firstBatch)
+
+	// Fail all of these adds, simulating an error adding the HTLCs to the
+	// commitment. We should see a failure message for each.
+	go failAdds(firstBatch)
+	ctx.checkFails(firstBatch)
+
+	// As a sanity check, Fail all of them again and assert that no
+	// duplicate fails are sent.
+	go failAdds(firstBatch)
+	ctx.checkFails(nil)
+
+	// Now, send a second batch of adds after a short delay and deliver them
+	// to the link.
+	ctx.clock.SetTime(secondBatchStart)
+	secondBatch := ctx.sendAdds(numBatchPackets, numBatchPackets)
+	ctx.receivePkts(secondBatch)
+
+	// Reset the packet queue w/o changing the current time. This simulates
+	// the link flapping and coming back up before the second batch's
+	// expiries have elapsed. We should see no failures sent back.
+	err := ctx.mailbox.ResetPackets()
+	if err != nil {
+		t.Fatalf("unable to reset packets: %v", err)
+	}
+	ctx.checkFails(nil)
+
+	// Redeliver the second batch to the link and hold them there.
+	ctx.receivePkts(secondBatch)
+
+	// Send a third batch of adds shortly after the second batch.
+	ctx.clock.SetTime(thirdBatchStart)
+	thirdBatch := ctx.sendAdds(2*numBatchPackets, numBatchPackets)
+
+	// Advance the clock so that the third batch expires. We expect to only
+	// see fails for the third batch, since the second batch is still being
+	// held by the link.
+	ctx.clock.SetTime(thirdBatchExpiry)
+	ctx.checkFails(thirdBatch)
+
+	// Finally, reset the link which should cause the second batch to be
+	// cancelled immediately.
+	err = ctx.mailbox.ResetPackets()
+	if err != nil {
+		t.Fatalf("unable to reset packets: %v", err)
+	}
+	ctx.checkFails(secondBatch)
+}
+
+// TestMailBoxPacketPrioritization asserts that the mailbox will prioritize
+// delivering Settle and Fail packets over Adds if both are available for
+// delivery at the same time.
+func TestMailBoxPacketPrioritization(t *testing.T) {
+	t.Parallel()
+
+	// First, we'll create a new instance of the current default mailbox
+	// type.
+	mailBox := newMemoryMailBox(&mailBoxConfig{
+		clock:  clock.NewDefaultClock(),
+		expiry: time.Minute,
+	})
+	mailBox.Start()
+	defer mailBox.Stop()
+
+	const numPackets = 5
+
+	_, _, aliceChanID, bobChanID := genIDs()
+
+	// Next we'll send the following sequence of packets:
+	//  - Settle1
+	//  - Add1
+	//  - Add2
+	//  - Fail
+	//  - Settle2
+	sentPackets := make([]*htlcPacket, numPackets)
+	for i := 0; i < numPackets; i++ {
+		pkt := &htlcPacket{
+			outgoingChanID: aliceChanID,
+			outgoingHTLCID: uint64(i),
+			incomingChanID: bobChanID,
+			incomingHTLCID: uint64(i),
+			amount:         lnwire.MilliSatoshi(prand.Int63()),
+		}
+
+		switch i {
+		case 0, 4:
+			// First and last packets are a Settle. A non-Add is
+			// sent first to make the test deterministic w/o needing
+			// to sleep.
+			pkt.htlc = &lnwire.UpdateFulfillHTLC{ID: uint64(i)}
+		case 1, 2:
+			// Next two packets are Adds.
+			pkt.htlc = &lnwire.UpdateAddHTLC{ID: uint64(i)}
+		case 3:
+			// The fourth packet is a Fail.
+			pkt.htlc = &lnwire.UpdateFailHTLC{ID: uint64(i)}
+		}
+
+		sentPackets[i] = pkt
+
+		err := mailBox.AddPacket(pkt)
+		if err != nil {
+			t.Fatalf("failed to add packet: %v", err)
+		}
+	}
+
+	// When dequeueing the packets, we expect the following sequence:
+	//  - Settle1
+	//  - Fail
+	//  - Settle2
+	//  - Add1
+	//  - Add2
+	//
+	// We expect to see Fail and Settle2 delivered before either Add1
+	// or Add2 due to the prioritization between the split queues.
+	for i := 0; i < numPackets; i++ {
+		select {
+		case pkt := <-mailBox.PacketOutBox():
+			var expPkt *htlcPacket
+			switch i {
+			case 0:
+				// First packet should be Settle1.
+				expPkt = sentPackets[0]
+			case 1:
+				// Second packet should be Fail.
+				expPkt = sentPackets[3]
+			case 2:
+				// Third packet should be Settle2.
+				expPkt = sentPackets[4]
+			case 3:
+				// Fourth packet should be Add1.
+				expPkt = sentPackets[1]
+			case 4:
+				// Last packet should be Add2.
+				expPkt = sentPackets[2]
+			}
+
+			if !reflect.DeepEqual(expPkt, pkt) {
+				t.Fatalf("recvd packet mismatch %d, want: %v, got: %v",
+					i, spew.Sdump(expPkt), spew.Sdump(pkt))
+			}
+
+		case <-time.After(50 * time.Millisecond):
+			t.Fatalf("didn't receive packet %d before timeout", i)
+		}
+	}
+}
+
+// TestMailBoxAddExpiry asserts that the mailbox will cancel back Adds that have
+// reached their expiry time.
+func TestMailBoxAddExpiry(t *testing.T) {
+	var (
+		expiry            = time.Minute
+		batchDelay        = time.Second
+		firstBatchStart   = time.Now()
+		firstBatchExpiry  = firstBatchStart.Add(expiry)
+		secondBatchStart  = firstBatchStart.Add(batchDelay)
+		secondBatchExpiry = secondBatchStart.Add(expiry)
+	)
+
+	ctx := newMailboxContext(t, firstBatchStart, expiry)
+	defer ctx.mailbox.Stop()
+
+	// Each batch will consist of 10 messages.
+	const numBatchPackets = 10
+
+	firstBatch := ctx.sendAdds(0, numBatchPackets)
+
+	ctx.clock.SetTime(secondBatchStart)
+	ctx.checkFails(nil)
+
+	secondBatch := ctx.sendAdds(numBatchPackets, numBatchPackets)
+
+	ctx.clock.SetTime(firstBatchExpiry)
+	ctx.checkFails(firstBatch)
+
+	ctx.clock.SetTime(secondBatchExpiry)
+	ctx.checkFails(secondBatch)
+}
+
+// TestMailBoxDuplicateAddPacket asserts that the mailbox returns an
+// ErrPacketAlreadyExists failure when two htlcPackets are added with identical
+// incoming circuit keys.
+func TestMailBoxDuplicateAddPacket(t *testing.T) {
+	t.Parallel()
+
+	mailBox := newMemoryMailBox(&mailBoxConfig{
+		clock: clock.NewDefaultClock(),
+	})
+	mailBox.Start()
+	defer mailBox.Stop()
+
+	addTwice := func(t *testing.T, pkt *htlcPacket) {
+		// The first add should succeed.
+		err := mailBox.AddPacket(pkt)
+		if err != nil {
+			t.Fatalf("unable to add packet: %v", err)
+		}
+
+		// Adding again with the same incoming circuit key should fail.
+		err = mailBox.AddPacket(pkt)
+		if err != ErrPacketAlreadyExists {
+			t.Fatalf("expected ErrPacketAlreadyExists, got: %v", err)
+		}
+	}
+
+	// Assert duplicate AddPacket calls fail for all types of HTLCs.
+	addTwice(t, &htlcPacket{
+		incomingHTLCID: 0,
+		htlc:           &lnwire.UpdateAddHTLC{},
+	})
+	addTwice(t, &htlcPacket{
+		incomingHTLCID: 1,
+		htlc:           &lnwire.UpdateFulfillHTLC{},
+	})
+	addTwice(t, &htlcPacket{
+		incomingHTLCID: 2,
+		htlc:           &lnwire.UpdateFailHTLC{},
+	})
+}
+
 // TestMailOrchestrator asserts that the orchestrator properly buffers packets
 // for channels that haven't been made live, such that they are delivered
 // immediately after BindLiveShortChanID. It also tests that packets are delivered
@@ -156,7 +547,10 @@ func TestMailOrchestrator(t *testing.T) {
 	t.Parallel()
 
 	// First, we'll create a new instance of our orchestrator.
-	mo := newMailOrchestrator()
+	mo := newMailOrchestrator(&mailOrchConfig{
+		clock:  clock.NewDefaultClock(),
+		expiry: time.Minute,
+	})
 	defer mo.Stop()
 
 	// We'll be delivering 10 htlc packets via the orchestrator.
@@ -174,6 +568,9 @@ func TestMailOrchestrator(t *testing.T) {
 			incomingChanID: bobChanID,
 			incomingHTLCID: uint64(i),
 			amount:         lnwire.MilliSatoshi(prand.Int63()),
+			htlc: &lnwire.UpdateAddHTLC{
+				ID: uint64(i),
+			},
 		}
 		sentPackets[i] = pkt
 
@@ -181,7 +578,7 @@ func TestMailOrchestrator(t *testing.T) {
 	}
 
 	// Now, initialize a new mailbox for Alice's chanid.
-	mailbox := mo.GetOrCreateMailBox(chanID1)
+	mailbox := mo.GetOrCreateMailBox(chanID1, aliceChanID)
 
 	// Verify that no messages are received, since Alice's mailbox has not
 	// been made live.
@@ -226,7 +623,7 @@ func TestMailOrchestrator(t *testing.T) {
 
 	// For the second half of the test, create a new mailbox for Bob and
 	// immediately make it live with an assigned short chan id.
-	mailbox = mo.GetOrCreateMailBox(chanID2)
+	mailbox = mo.GetOrCreateMailBox(chanID2, bobChanID)
 	mo.BindLiveShortChanID(mailbox, chanID2, bobChanID)
 
 	// Create the second half of our htlcs, and deliver them via the
@@ -239,6 +636,9 @@ func TestMailOrchestrator(t *testing.T) {
 			incomingChanID: bobChanID,
 			incomingHTLCID: uint64(halfPackets + i),
 			amount:         lnwire.MilliSatoshi(prand.Int63()),
+			htlc: &lnwire.UpdateAddHTLC{
+				ID: uint64(halfPackets + i),
+			},
 		}
 		sentPackets[i] = pkt
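The tests above drive expiry deterministically through clock.NewTestClock and SetTime rather than sleeping. For readers unfamiliar with that package, here is a minimal stand-in test clock showing the mechanics such a clock needs; this is an illustrative implementation, not lnd's actual clock package.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// testClock is a minimal stand-in for a settable test clock: time only moves
// when the test calls SetTime, which lets expiry-based behavior be asserted
// without real waiting.
type testClock struct {
	mu      sync.Mutex
	now     time.Time
	waiters map[time.Time][]chan time.Time
}

func newTestClock(start time.Time) *testClock {
	return &testClock{
		now:     start,
		waiters: make(map[time.Time][]chan time.Time),
	}
}

func (c *testClock) Now() time.Time {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.now
}

// TickAfter fires once SetTime moves the clock to or past now+d; a
// non-positive duration fires immediately.
func (c *testClock) TickAfter(d time.Duration) <-chan time.Time {
	c.mu.Lock()
	defer c.mu.Unlock()

	ch := make(chan time.Time, 1)
	target := c.now.Add(d)
	if !target.After(c.now) {
		ch <- c.now
		return ch
	}
	c.waiters[target] = append(c.waiters[target], ch)
	return ch
}

// SetTime advances the clock and fires every waiter whose target has passed.
func (c *testClock) SetTime(t time.Time) {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.now = t
	for target, chans := range c.waiters {
		if !target.After(t) {
			for _, ch := range chans {
				ch <- t
			}
			delete(c.waiters, target)
		}
	}
}

func main() {
	start := time.Now()
	c := newTestClock(start)

	tick := c.TickAfter(time.Minute)
	c.SetTime(start.Add(30 * time.Second)) // not yet
	c.SetTime(start.Add(time.Minute))      // fires

	fmt.Println("ticked at:", (<-tick).Sub(start))
}
```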
diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go
index c65e5fb01..e9a2a1efa 100644
--- a/htlcswitch/mock.go
+++ b/htlcswitch/mock.go
@@ -177,6 +177,8 @@ func initSwitchWithDB(startingHeight uint32, db *channeldb.DB) (*Switch, error)
 		LogEventTicker: ticker.NewForce(DefaultLogInterval),
 		AckEventTicker: ticker.NewForce(DefaultAckInterval),
 		HtlcNotifier:   &mockHTLCNotifier{},
+		Clock:          clock.NewDefaultClock(),
+		HTLCExpiry:     time.Hour,
 	}
 
 	return New(cfg, startingHeight)
diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go
index 52d05be8b..06c597bf6 100644
--- a/htlcswitch/switch.go
+++ b/htlcswitch/switch.go
@@ -9,11 +9,13 @@ import (
 	"time"
 
 	"github.com/btcsuite/btcd/wire"
+	"github.com/btcsuite/btclog"
 	"github.com/btcsuite/btcutil"
 	"github.com/davecgh/go-spew/spew"
 	"github.com/lightningnetwork/lnd/chainntnfs"
 	"github.com/lightningnetwork/lnd/channeldb"
 	"github.com/lightningnetwork/lnd/channeldb/kvdb"
+	"github.com/lightningnetwork/lnd/clock"
 	"github.com/lightningnetwork/lnd/contractcourt"
 	"github.com/lightningnetwork/lnd/htlcswitch/hop"
 	"github.com/lightningnetwork/lnd/lntypes"
@@ -35,6 +37,10 @@ const (
 	// DefaultAckInterval is the duration between attempts to ack any settle
 	// fails in a forwarding package.
 	DefaultAckInterval = 15 * time.Second
+
+	// DefaultHTLCExpiry is the duration after which Adds will be cancelled
+	// if they could not get added to an outgoing commitment.
+	DefaultHTLCExpiry = time.Minute
 )
 
 var (
@@ -173,6 +179,15 @@ type Config struct {
 	// RejectHTLC is a flag that instructs the htlcswitch to reject any
 	// HTLCs that are not from the source hop.
 	RejectHTLC bool
+
+	// Clock is a time source for the switch.
+	Clock clock.Clock
+
+	// HTLCExpiry is the interval after which Adds will be cancelled if they
+	// have not yet been delivered to a link. The computed deadline will
+	// expire this long after the Adds are added to a mailbox via
+	// AddPacket.
+	HTLCExpiry time.Duration
 }
 
 // Switch is the central messaging bus for all incoming/outgoing HTLCs.
@@ -282,12 +297,11 @@ func New(cfg Config, currentHeight uint32) (*Switch, error) {
 		return nil, err
 	}
 
-	return &Switch{
+	s := &Switch{
 		bestHeight:        currentHeight,
 		cfg:               &cfg,
 		circuits:          circuitMap,
 		linkIndex:         make(map[lnwire.ChannelID]ChannelLink),
-		mailOrchestrator:  newMailOrchestrator(),
 		forwardingIndex:   make(map[lnwire.ShortChannelID]ChannelLink),
 		interfaceIndex:    make(map[[33]byte]map[lnwire.ChannelID]ChannelLink),
 		pendingLinkIndex:  make(map[lnwire.ChannelID]ChannelLink),
@@ -296,7 +310,16 @@ func New(cfg Config, currentHeight uint32) (*Switch, error) {
 		chanCloseRequests: make(chan *ChanClose),
 		resolutionMsgs:    make(chan *resolutionMsg),
 		quit:              make(chan struct{}),
-	}, nil
+	}
+
+	s.mailOrchestrator = newMailOrchestrator(&mailOrchConfig{
+		fetchUpdate:    s.cfg.FetchLastChannelUpdate,
+		forwardPackets: s.ForwardPackets,
+		clock:          s.cfg.Clock,
+		expiry:         s.cfg.HTLCExpiry,
+	})
+
+	return s, nil
 }
 
 // resolutionMsg is a struct that wraps an existing ResolutionMsg with a done
@@ -1972,13 +1995,13 @@ func (s *Switch) reforwardSettleFails(fwdPkgs []*channeldb.FwdPkg) {
 		// link quit channel, meaning the send will fail only if the
 		// switch receives a shutdown request.
 		errChan := s.ForwardPackets(nil, switchPackets...)
-		go handleBatchFwdErrs(errChan)
+		go handleBatchFwdErrs(errChan, log)
 	}
 }
 
 // handleBatchFwdErrs waits on the given errChan until it is closed, logging the
 // errors returned from any unsuccessful forwarding attempts.
-func handleBatchFwdErrs(errChan chan error) {
+func handleBatchFwdErrs(errChan chan error, l btclog.Logger) {
 	for {
 		err, ok := <-errChan
 		if !ok {
@@ -1991,7 +2014,7 @@ func handleBatchFwdErrs(errChan chan error) {
 			continue
 		}
 
-		log.Errorf("unhandled error while reforwarding htlc "+
+		l.Errorf("Unhandled error while reforwarding htlc "+
 			"settle/fail over htlcswitch: %v", err)
 	}
 }
@@ -2036,7 +2059,8 @@ func (s *Switch) AddLink(link ChannelLink) error {
 	// Get and attach the mailbox for this link, which buffers packets in
 	// case there packets that we tried to deliver while this link was
 	// offline.
-	mailbox := s.mailOrchestrator.GetOrCreateMailBox(chanID)
+	shortChanID := link.ShortChanID()
+	mailbox := s.mailOrchestrator.GetOrCreateMailBox(chanID, shortChanID)
 	link.AttachMailBox(mailbox)
 
 	if err := link.Start(); err != nil {
@@ -2044,7 +2068,6 @@ func (s *Switch) AddLink(link ChannelLink) error {
 		return err
 	}
 
-	shortChanID := link.ShortChanID()
 	if shortChanID == hop.Source {
 		log.Infof("Adding pending link chan_id=%v, short_chan_id=%v",
 			chanID, shortChanID)
@@ -2216,7 +2239,7 @@ func (s *Switch) UpdateShortChanID(chanID lnwire.ChannelID) error {
 
 	// Finally, alert the mail orchestrator to the change of short channel
 	// ID, and deliver any unclaimed packets to the link.
-	mailbox := s.mailOrchestrator.GetOrCreateMailBox(chanID)
+	mailbox := s.mailOrchestrator.GetOrCreateMailBox(chanID, shortChanID)
 	s.mailOrchestrator.BindLiveShortChanID(
		mailbox, chanID, shortChanID,
 	)
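With handleBatchFwdErrs now a free function taking a btclog.Logger, both the switch (with its package logger) and the mailbox (in FailAdd) can share one drain-and-log helper. A small stand-alone version of the same pattern, with a stdlib logger standing in for btclog:

```go
package main

import "log"

// drainFwdErrs mirrors the shared handleBatchFwdErrs helper: it blocks on the
// error channel until the sender closes it, logging any non-nil forwarding
// errors with whichever logger the caller injects.
func drainFwdErrs(errChan chan error, logf func(string, ...interface{})) {
	for {
		err, ok := <-errChan
		if !ok {
			// Channel drained or the switch is shutting down.
			return
		}
		if err == nil {
			continue
		}
		logf("unhandled forwarding error: %v", err)
	}
}

func main() {
	errChan := make(chan error, 1)
	errChan <- nil // Successful forwards surface as nil errors.
	close(errChan)

	// A link would pass its own prefixed logger; the mailbox passes the
	// package-level one. Here log.Printf stands in for both.
	drainFwdErrs(errChan, log.Printf)
}
```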
diff --git a/htlcswitch/test_utils.go b/htlcswitch/test_utils.go
index c997c4f90..c591e65bc 100644
--- a/htlcswitch/test_utils.go
+++ b/htlcswitch/test_utils.go
@@ -1167,6 +1167,7 @@ func (h *hopNetwork) createChannelLink(server, peer *mockServer,
 			BatchSize:           10,
 			BatchTicker:         ticker.NewForce(testBatchTimeout),
 			FwdPkgGCTicker:      ticker.NewForce(fwdPkgTimeout),
+			PendingCommitTicker: ticker.NewForce(time.Minute),
 			MinFeeUpdateTimeout: minFeeUpdateTimeout,
 			MaxFeeUpdateTimeout: maxFeeUpdateTimeout,
 			OnChannelFailure:    func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {},
diff --git a/peer.go b/peer.go
index c6e2ae025..3ae997778 100644
--- a/peer.go
+++ b/peer.go
@@ -669,6 +669,7 @@ func (p *peer) addLink(chanPoint *wire.OutPoint,
 		SyncStates:              syncStates,
 		BatchTicker:             ticker.New(50 * time.Millisecond),
 		FwdPkgGCTicker:          ticker.New(time.Minute),
+		PendingCommitTicker:     ticker.New(time.Minute),
 		BatchSize:               10,
 		UnsafeReplay:            cfg.UnsafeReplay,
 		MinFeeUpdateTimeout:     htlcswitch.DefaultMinLinkFeeUpdateTimeout,
diff --git a/server.go b/server.go
index f8c36b306..3fdffb2c3 100644
--- a/server.go
+++ b/server.go
@@ -496,6 +496,8 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB,
 		AckEventTicker:     ticker.New(htlcswitch.DefaultAckInterval),
 		AllowCircularRoute: cfg.AllowCircularRoute,
 		RejectHTLC:         cfg.RejectHTLC,
+		Clock:              clock.NewDefaultClock(),
+		HTLCExpiry:         htlcswitch.DefaultHTLCExpiry,
 	}, uint32(currentHeight))
 	if err != nil {
 		return nil, err