From 2ab03c57be15437a645a11a9f90e76d4cc25bd2f Mon Sep 17 00:00:00 2001
From: Olaoluwa Osuntokun
Date: Wed, 31 May 2017 16:43:37 -0700
Subject: [PATCH] htlcswitch: re-introduce dynamic commitment log tick timer
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This commit fixes a slight regression in the logic of the switch by
ensuring that the log commitment timer is only started _after_ we
receive a new commitment signature. Otherwise, the ticker will keep
ticking and possibly settle HTLCs that have yet to be locked in, or
waste a signature, depriving us of a revocation which is required for
us to initiate a new state transition.

Additionally, the commit performs a few minor post-merge cleanups.
---
 htlcswitch/link.go | 271 +++++++++++++++++++++++++++------------------
 1 file changed, 164 insertions(+), 107 deletions(-)

diff --git a/htlcswitch/link.go b/htlcswitch/link.go
index f97e90c30..af99c6e80 100644
--- a/htlcswitch/link.go
+++ b/htlcswitch/link.go
@@ -27,21 +27,22 @@ type ChannelLinkConfig struct {
 	// packets to other peer which should handle it.
 	Switch *Switch
 
-	// DecodeOnion function responsible for decoding htlc Sphinx onion blob,
-	// and creating hop iterator which will give us next destination of htlc.
+	// DecodeOnion is the function responsible for decoding the htlc
+	// Sphinx onion blob, and creating a hop iterator which will give us
+	// the next destination of the htlc.
 	DecodeOnion func(r io.Reader, meta []byte) (HopIterator, error)
 
-	// Peer is a lightning network node with which we have the channel
-	// link opened.
+	// Peer is a lightning network node with which we have the channel link
+	// opened.
 	Peer Peer
 
-	// Registry is a sub-system which responsible for managing the
-	// invoices in thread-safe manner.
+	// Registry is a sub-system which is responsible for managing the
+	// invoices in a thread-safe manner.
 	Registry InvoiceDatabase
 
-	// SettledContracts is used to notify that a channel has peacefully been
-	// closed. Once a channel has been closed the other subsystem no longer
-	// needs to watch for breach closes.
+	// SettledContracts is used to notify that a channel has peacefully
+	// been closed. Once a channel has been closed the other subsystem no
+	// longer needs to watch for breach closes.
 	SettledContracts chan *wire.OutPoint
 
 	// DebugHTLC should be turned on if you want all HTLCs sent to a node
@@ -60,20 +61,23 @@ type channelLink struct {
 	// The index of the HTLC within the log is mapped to the cancellation
 	// reason. This value is used to thread the proper error through to the
 	// htlcSwitch, or subsystem that initiated the HTLC.
+	//
 	// TODO(andrew.shvv) remove after payment descriptor start store
 	// htlc cancel reasons.
 	cancelReasons map[uint64]lnwire.OpaqueReason
 
-	// blobs tracks the remote log index of the incoming htlc's,
-	// mapped to the htlc onion blob which encapsulates the next hop.
+	// blobs tracks the remote log index of the incoming htlc's, mapped to
+	// the htlc onion blob which encapsulates the next hop.
+	//
 	// TODO(andrew.shvv) remove after payment descriptor start store
 	// htlc onion blobs.
 	blobs map[uint64][lnwire.OnionPacketSize]byte
 
-	// batchCounter is the number of updates which we received from
-	// remote side, but not include in commitment transaciton yet and plus
-	// the current number of settles that have been sent, but not yet
-	// committed to the commitment.
+	// batchCounter is the number of updates which we have received from
+	// the remote side, but have not yet included in the commitment
+	// transaction, plus the current number of settles that have been
+	// sent, but not yet committed to the commitment.
+	//
 	// TODO(andrew.shvv) remove after we add additional
 	// BatchNumber() method in state machine.
 	batchCounter uint32
@@ -86,18 +90,18 @@ type channelLink struct {
 	// which may affect behaviour of the service.
 	cfg *ChannelLinkConfig
 
-	// queue is used to store the htlc add updates which haven't been
-	// processed because of the commitment trancation overflow.
-	queue *packetQueue
+	// overflowQueue is used to store the htlc add updates which haven't
+	// been processed because of the commitment transaction overflow.
+	overflowQueue *packetQueue
 
-	// upstream is a channel which responsible for propagating the
-	// received from remote peer messages, with which we have an opened
-	// channel, to handler function.
+	// upstream is a channel which is responsible for propagating messages
+	// received from the remote peer, with which we have an opened
+	// channel, to the handler function.
 	upstream chan lnwire.Message
 
-	// downstream is a channel which responsible for propagating
-	// the received htlc switch packet which are forwarded from anther
-	// channel to the handler function.
+	// downstream is a channel which is responsible for propagating the
+	// received htlc switch packets which are forwarded from another
+	// channel to the handler function.
 	downstream chan *htlcPacket
 
 	// control is used to propagate the commands to its handlers. This
@@ -105,6 +109,14 @@ type channelLink struct {
 	// i.e in the main handler loop.
 	control chan interface{}
 
+	// logCommitTimer is a timer which fires if we go an interval without
+	// receiving/sending a commitment update. Its role is to ensure both
+	// chains converge to identical state in a timely manner.
+	//
+	// TODO(roasbeef): timer should be >> than RTT
+	logCommitTimer *time.Timer
+	logCommitTick  <-chan time.Time
+
 	started  int32
 	shutdown int32
 	wg       sync.WaitGroup
@@ -116,15 +128,16 @@ func NewChannelLink(cfg *ChannelLinkConfig,
 	channel *lnwallet.LightningChannel) ChannelLink {
 
 	return &channelLink{
-		cfg:           cfg,
-		channel:       channel,
-		blobs:         make(map[uint64][lnwire.OnionPacketSize]byte),
-		upstream:      make(chan lnwire.Message),
-		downstream:    make(chan *htlcPacket),
-		control:       make(chan interface{}),
-		cancelReasons: make(map[uint64]lnwire.OpaqueReason),
-		queue:         newWaitingQueue(),
-		quit:          make(chan struct{}),
+		cfg:            cfg,
+		channel:        channel,
+		blobs:          make(map[uint64][lnwire.OnionPacketSize]byte),
+		upstream:       make(chan lnwire.Message),
+		downstream:     make(chan *htlcPacket),
+		control:        make(chan interface{}),
+		cancelReasons:  make(map[uint64]lnwire.OpaqueReason),
+		logCommitTimer: time.NewTimer(300 * time.Millisecond),
+		overflowQueue:  newWaitingQueue(),
+		quit:           make(chan struct{}),
 	}
 }
 
@@ -132,8 +145,9 @@ func NewChannelLink(cfg *ChannelLinkConfig,
 // interface.
 var _ ChannelLink = (*channelLink)(nil)
 
-// Start starts all helper goroutines required for the operation of the
-// channel link.
+// Start starts all helper goroutines required for the operation of the channel
+// link.
+//
 // NOTE: Part of the ChannelLink interface.
 func (l *channelLink) Start() error {
 	if !atomic.CompareAndSwapInt32(&l.started, 0, 1) {
@@ -151,6 +165,7 @@ func (l *channelLink) Start() error {
 
 // Stop gracefully stops all active helper goroutines, then waits until they've
 // exited.
+//
 // NOTE: Part of the ChannelLink interface.
 func (l *channelLink) Stop() {
 	if !atomic.CompareAndSwapInt32(&l.shutdown, 0, 1) {
@@ -172,7 +187,8 @@ func (l *channelLink) Stop() {
 // htlc packets to the switch. Additionally, the this goroutine handles acting
 // upon all timeouts for any active HTLCs, manages the channel's revocation
 // window, and also the htlc trickle queue+timer for this active channels.
-// NOTE: Should be started as goroutine.
+//
+// NOTE: This MUST be run as a goroutine.
 func (l *channelLink) htlcHandler() {
 	defer l.wg.Done()
 
@@ -198,8 +214,8 @@ func (l *channelLink) htlcHandler() {
 	batchTimer := time.NewTicker(50 * time.Millisecond)
 	defer batchTimer.Stop()
 
-	logCommitTimer := time.NewTicker(100 * time.Millisecond)
-	defer logCommitTimer.Stop()
+	// TODO(roasbeef): fail chan in case of protocol violation
+
 out:
 	for {
 		select {
@@ -222,7 +238,7 @@ out:
 				l.channel.ChannelPoint(), l.cfg.Peer.ID())
 			break out
 
-		case <-logCommitTimer.C:
+		case <-l.logCommitTick:
 			// If we haven't sent or received a new commitment
 			// update in some time, check to see if we have any
 			// pending updates we need to commit due to our
@@ -257,29 +273,33 @@ out:
 			break out
 		}
 
-		// Previously add update have been added to the reprocessing
-		// queue because of the overflooding threat, and now we are
-		// trying to process it again.
-		case packet := <-l.queue.pending:
+		// A packet that previously overflowed the commitment
+		// transaction is now eligible for processing once again. So
+		// we'll attempt to re-process the packet in order to allow it
+		// to continue propagating within the network.
+		case packet := <-l.overflowQueue.pending:
 			msg := packet.htlc.(*lnwire.UpdateAddHTLC)
-			log.Infof("Reprocess downstream add update "+
+			log.Tracef("Reprocessing downstream add update "+
 				"with payment hash(%v)",
 				hex.EncodeToString(msg.PaymentHash[:]))
+
 			l.handleDownStreamPkt(packet)
 
 		case pkt := <-l.downstream:
-			// If we have non empty processing queue than in
-			// order to preserve the order of add updates
-			// consume it, and process it later.
+			// If we have a non-empty processing queue then, in
+			// order to preserve ordering, we'll add this packet
+			// to the overflow queue rather than processing it
+			// directly. Once an active HTLC is either settled or
+			// failed, then we'll free up a new slot.
 			htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC)
-			if ok && l.queue.length() != 0 {
+			if ok && l.overflowQueue.length() != 0 {
 				log.Infof("Downstream htlc add update with "+
 					"payment hash(%v) have been added to "+
 					"reprocessing queue, batch: %v",
 					hex.EncodeToString(htlc.PaymentHash[:]),
 					l.batchCounter)
-				l.queue.consume(pkt)
+				l.overflowQueue.consume(pkt)
 				continue
 			}
 			l.handleDownStreamPkt(pkt)
@@ -309,10 +329,9 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket) {
 	var isSettle bool
 	switch htlc := pkt.htlc.(type) {
 	case *lnwire.UpdateAddHTLC:
-		// A new payment has been initiated via the
-		// downstream channel, so we add the new HTLC
-		// to our local log, then update the commitment
-		// chains.
+		// A new payment has been initiated via the downstream channel,
+		// so we add the new HTLC to our local log, then update the
+		// commitment chains.
 		htlc.ChanID = l.ChanID()
 		index, err := l.channel.AddHTLC(htlc)
 		if err == lnwallet.ErrMaxHTLCNumber {
 			log.Infof("Downstream htlc add update with "+
 				"payment hash(%v) have been added to "+
 				"reprocessing queue, batch: %v",
 				hex.EncodeToString(htlc.PaymentHash[:]),
 				l.batchCounter)
-			l.queue.consume(pkt)
+			l.overflowQueue.consume(pkt)
 			return
 		} else if err != nil {
-			// TODO: possibly perform fallback/retry logic
-			// depending on type of error
-
 			// The HTLC was unable to be added to the state
 			// machine, as a result, we'll signal the switch to
 			// cancel the pending payment.
@@ -339,7 +355,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket) {
 				err)
 			return
 		}
-		log.Tracef("Receive downstream htlc with payment hash"+
+		log.Tracef("Received downstream htlc with payment hash"+
 			"(%v), assign the index: %v, batch: %v",
 			hex.EncodeToString(htlc.PaymentHash[:]), index,
 			l.batchCounter+1)
@@ -399,7 +415,6 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket) {
 
 	// If this newly added update exceeds the min batch size for adds, or
 	// this is a settle request, then initiate an update.
-	// TODO(roasbeef): enforce max HTLCs in flight limit
 	if l.batchCounter >= 10 || isSettle {
 		if err := l.updateCommitTx(); err != nil {
 			log.Errorf("unable to update "+
@@ -420,7 +435,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
 	case *lnwire.UpdateAddHTLC:
 		// We just received an add request from an upstream peer, so we
 		// add it to our state machine, then add the HTLC to our
-		// "settle" list in the event that we know the preimage
+		// "settle" list in the event that we know the preimage.
 		index, err := l.channel.ReceiveHTLC(msg)
 		if err != nil {
 			log.Errorf("unable to handle upstream add HTLC: %v",
@@ -436,8 +451,8 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
 		//  * time-lock is sane, fee, chain, etc
 
 		// Store the onion blob which encapsulate the htlc route and
-		// use in on stage of htlc inclusion to retrieve the
-		// next hope and propagate the htlc farther.
+		// use at one stage of htlc inclusion to retrieve the next hop
+		// and propagate the htlc farther.
 		l.blobs[index] = msg.OnionBlob
 
 	case *lnwire.UpdateFufillHTLC:
@@ -467,6 +482,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
 	case *lnwire.CommitSig:
 		// We just received a new update to our local commitment chain,
 		// validate this new commitment, closing the link if invalid.
+		//
 		// TODO(roasbeef): redundant re-serialization
 		sig := msg.CommitSig.Serialize()
 		if err := l.channel.ReceiveNewCommitment(sig); err != nil {
@@ -485,6 +501,21 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
 		}
 		l.cfg.Peer.SendMessage(nextRevocation)
 
+		// As we've just received a commitment signature, we'll
+		// re-start the log commit timer to wake up the main processing
+		// loop to check if we need to send a commitment signature as
+		// we owe one.
+		//
+		// TODO(roasbeef): instead after revocation?
+		if !l.logCommitTimer.Stop() {
+			select {
+			case <-l.logCommitTimer.C:
+			default:
+			}
+		}
+		l.logCommitTimer.Reset(300 * time.Millisecond)
+		l.logCommitTick = l.logCommitTimer.C
+
 		// If both commitment chains are fully synced from our PoV,
 		// then we don't need to reply with a signature as both sides
 		// already have a commitment with the latest accepted l.
@@ -512,10 +543,10 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
 			return
 		}
 
-		// After we treat HTLCs as included in both
-		// remote/local commitment transactions they might be
-		// safely propagated over htlc switch or settled if our node was
-		// last node in htlc path.
+		// After we treat HTLCs as included in both remote/local
+		// commitment transactions, they may be safely propagated
+		// over the htlc switch, or settled if our node was the last
+		// node in the htlc path.
 		htlcsToForward := l.processLockedInHtlcs(htlcs)
 		go func() {
 			for _, packet := range htlcsToForward {
@@ -554,19 +585,34 @@ func (l *channelLink) updateCommitTx() error {
 	}
 	l.cfg.Peer.SendMessage(commitSig)
 
+	// We've just initiated a state transition, attempt to stop the
+	// logCommitTimer. If the timer already ticked, then we'll consume the
+	// value, dropping it.
+	if l.logCommitTimer != nil && !l.logCommitTimer.Stop() {
+		select {
+		case <-l.logCommitTimer.C:
+		default:
+		}
+	}
+	l.logCommitTick = nil
+
+	// Finally, clear out the current batch, so we can accurately make
+	// further batch flushing decisions.
 	l.batchCounter = 0
 	return nil
 }
 
-// Peer returns the representation of remote peer with which we
-// have the channel link opened.
+// Peer returns the representation of remote peer with which we have the
+// channel link opened.
+//
 // NOTE: Part of the ChannelLink interface.
 func (l *channelLink) Peer() Peer {
 	return l.cfg.Peer
 }
 
 // ChannelPoint returns the unique identificator of the channel link.
+//
 // NOTE: Part of the ChannelLink interface.
 func (l *channelLink) ChanID() lnwire.ChannelID {
 	return lnwire.NewChanIDFromOutPoint(l.channel.ChannelPoint())
@@ -577,9 +623,10 @@ type getBandwidthCmd struct {
 	done chan btcutil.Amount
 }
 
-// Bandwidth returns the amount which current link might pass
-// through channel link. Execution through control channel gives as
-// confidence that bandwidth will not be changed during function execution.
+// Bandwidth returns the amount which the current link might pass through the
+// channel link. Execution through the control channel gives us confidence
+// that the bandwidth will not be changed during function execution.
+//
 // NOTE: Part of the ChannelLink interface.
 func (l *channelLink) Bandwidth() btcutil.Amount {
 	command := &getBandwidthCmd{
@@ -594,15 +641,17 @@ func (l *channelLink) Bandwidth() btcutil.Amount {
 	}
 }
 
-// getBandwidth returns the amount which current link might pass
-// through channel link.
-// NOTE: Should be use inside main goroutine only, otherwise the result might
-// be accurate.
+// getBandwidth returns the amount which the current link might pass through
+// the channel link.
+//
+// NOTE: Should be used inside main goroutine only, otherwise the result might
+// not be accurate.
 func (l *channelLink) getBandwidth() btcutil.Amount {
-	return l.channel.LocalAvailableBalance() - l.queue.pendingAmount()
+	return l.channel.LocalAvailableBalance() - l.overflowQueue.pendingAmount()
 }
 
-// Stats return the statistics of channel link.
+// Stats returns the statistics of channel link.
+//
 // NOTE: Part of the ChannelLink interface.
 func (l *channelLink) Stats() (uint64, btcutil.Amount, btcutil.Amount) {
 	snapshot := l.channel.StateSnapshot()
@@ -612,14 +661,16 @@ func (l *channelLink) Stats() (uint64, btcutil.Amount, btcutil.Amount) {
 }
 
 // String returns the string representation of channel link.
+//
 // NOTE: Part of the ChannelLink interface.
 func (l *channelLink) String() string {
 	return l.channel.ChannelPoint().String()
 }
 
-// HandleSwitchPacket handles the switch packets. This packets which might
-// be forwarded to us from another channel link in case the htlc update came
-// from another peer or if the update was created by user
+// HandleSwitchPacket handles the switch packets. These are packets which
+// might be forwarded to us from another channel link in case the htlc update
+// came from another peer or if the update was created by the user.
+//
 // NOTE: Part of the ChannelLink interface.
 func (l *channelLink) HandleSwitchPacket(packet *htlcPacket) {
 	select {
@@ -628,8 +679,9 @@ func (l *channelLink) HandleSwitchPacket(packet *htlcPacket) {
 	}
 }
 
-// HandleChannelUpdate handles the htlc requests as settle/add/fail which
-// sent to us from remote peer we have a channel with.
+// HandleChannelUpdate handles the htlc requests as settle/add/fail which are
+// sent to us from the remote peer we have a channel with.
+//
 // NOTE: Part of the ChannelLink interface.
 func (l *channelLink) HandleChannelUpdate(message lnwire.Message) {
 	select {
@@ -639,8 +691,8 @@ func (l *channelLink) HandleChannelUpdate(message lnwire.Message) {
 }
 
 // processLockedInHtlcs function is used to proceed the HTLCs which was
-// designated as eligible for forwarding. But not all htlc will be
-// forwarder, if htlc reached its final destination that we should settle it.
+// designated as eligible for forwarding. But not every htlc will be
+// forwarded: if an htlc reached its final destination, we should settle it.
 func (l *channelLink) processLockedInHtlcs(
 	paymentDescriptors []*lnwallet.PaymentDescriptor) []*htlcPacket {
 
@@ -653,7 +705,7 @@ func (l *channelLink) processLockedInHtlcs(
 
 		switch pd.EntryType {
 		case lnwallet.Settle:
-			// forward message to switch which will decide does
+			// Forward message to switch which will decide does
 			// this peer is the final destination of htlc and we
 			// should notify user about successful income or it
 			// should be propagated back to the origin peer.
@@ -662,22 +714,22 @@ func (l *channelLink) processLockedInHtlcs(
 			packetsToForward = append(packetsToForward,
 				newSettlePacket(l.ChanID(),
 					&lnwire.UpdateFufillHTLC{
 						PaymentPreimage: pd.RPreimage,
 					}, pd.RHash, pd.Amount))
-			l.queue.release()
+			l.overflowQueue.release()
 
 		case lnwallet.Fail:
 			opaqueReason := l.cancelReasons[pd.ParentIndex]
 
-			// forward message to switch which will decide does
+			// Forward message to switch which will decide does
 			// this peer is the final destination of htlc and we
-			// should notify user about fail income or it
-			// should be propagated back to the origin peer.
+			// should notify user about the failed payment or it
+			// should be propagated back to the origin peer.
 			packetsToForward = append(packetsToForward,
 				newFailPacket(l.ChanID(),
 					&lnwire.UpdateFailHTLC{
 						Reason: opaqueReason,
 						ChanID: l.ChanID(),
 					}, pd.RHash, pd.Amount))
-			l.queue.release()
+			l.overflowQueue.release()
 
 		case lnwallet.Add:
 			blob := l.blobs[pd.Index]
@@ -685,9 +737,10 @@ func (l *channelLink) processLockedInHtlcs(
 			delete(l.blobs, pd.Index)
 
 			// Before adding the new htlc to the state machine,
-			// parse the onion object in order to obtain the routing
-			// information with DecodeOnion function which process
-			// the Sphinx packet.
+			// parse the onion object in order to obtain the
+			// routing information with the DecodeOnion function
+			// which processes the Sphinx packet.
+			//
 			// We include the payment hash of the htlc as it's
 			// authenticated within the Sphinx packet itself as
 			// associated data in order to thwart attempts a replay
 			}
 
 			if nextChan := chanIterator.Next(); nextChan != nil {
-				// There are additional channels left within this
-				// route.
-				var b bytes.Buffer
-				var blob [lnwire.OnionPacketSize]byte
+				// There are additional channels left within
+				// this route.
+				var (
+					b    bytes.Buffer
+					blob [lnwire.OnionPacketSize]byte
+				)
+
 				err := chanIterator.Encode(&b)
 				if err != nil {
 					log.Errorf("unable to encode the "+
@@ -744,11 +800,12 @@ func (l *channelLink) processLockedInHtlcs(
 					continue
 				}
 
-				// If we're not currently in debug mode, and the
-				// extended htlc doesn't meet the value requested,
-				// then we'll fail the htlc. Otherwise, we settle
-				// this htlc within our local state update log,
-				// then send the update entry to the remote party.
+				// If we're not currently in debug mode, and
+				// the extended htlc doesn't meet the value
+				// requested, then we'll fail the htlc.
+				// Otherwise, we settle this htlc within our
+				// local state update log, then send the update
+				// entry to the remote party.
 				if !l.cfg.DebugHTLC && pd.Amount < invoice.Terms.Value {
 					log.Errorf("rejecting htlc due to incorrect "+
 						"amount: expected %v, received %v",
 						invoice.Terms.Value, pd.Amount)
@@ -767,8 +824,8 @@ func (l *channelLink) processLockedInHtlcs(
 					return nil
 				}
 
-				// Notify the invoiceRegistry of the invoices we
-				// just settled with this latest commitment
+				// Notify the invoiceRegistry of the invoices
+				// we just settled with this latest commitment
 				// update.
 				err = l.cfg.Registry.SettleInvoice(invoiceHash)
 				if err != nil {
@@ -777,7 +834,7 @@ func (l *channelLink) processLockedInHtlcs(
 					return nil
 				}
 
-				// htlc was successfully settled locally send
+				// HTLC was successfully settled locally, send
 				// notification about it remote peer.
 				l.cfg.Peer.SendMessage(&lnwire.UpdateFufillHTLC{
 					ChanID: l.ChanID(),
@@ -791,8 +848,8 @@ func (l *channelLink) processLockedInHtlcs(
 
 	if needUpdate {
 		// With all the settle/cancel updates added to the local and
-		// remote htlc logs, initiate a state transition by updating the
-		// remote commitment chain.
+		// remote htlc logs, initiate a state transition by updating
+		// the remote commitment chain.
 		if err := l.updateCommitTx(); err != nil {
 			log.Errorf("unable to update commitment: %v", err)
 			l.cfg.Peer.Disconnect()
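A note on the Stop/drain/Reset sequence that appears in both the CommitSig
handler and updateCommitTx above: per the time.Timer documentation, Stop
returns false if the timer has already expired, in which case a stale tick
may still be buffered in the timer's channel. If that tick isn't drained
before Reset, the next receive on the channel returns immediately, producing
exactly the kind of premature commitment tick this commit is trying to avoid.
A minimal standalone sketch of the idiom (illustrative only, not part of the
patch; the helper name is hypothetical):

    package timerutil

    import "time"

    // rearmTimer re-arms t to fire after d. Stop reports false when the
    // timer has already fired, in which case the stale tick buffered in
    // t.C must be consumed, or the next wait on t.C would return at once.
    func rearmTimer(t *time.Timer, d time.Duration) {
        if !t.Stop() {
            select {
            case <-t.C: // drop the stale tick
            default: // channel already empty
            }
        }
        t.Reset(d)
    }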
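The arming and disarming itself works because logCommitTick is a separate
<-chan time.Time variable rather than a direct read of logCommitTimer.C: a
receive from a nil channel blocks forever, so while l.logCommitTick is nil
the corresponding select case in htlcHandler can never fire, and assigning
l.logCommitTimer.C to it re-enables the case. A small self-contained program
demonstrating the trick (again illustrative, not lnd code):

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        timer := time.NewTimer(50 * time.Millisecond)

        // While tick is nil the first case below can never fire, since a
        // receive from a nil channel blocks forever.
        var tick <-chan time.Time

        select {
        case <-tick:
            fmt.Println("unreachable: a nil channel never delivers")
        case <-time.After(100 * time.Millisecond):
            fmt.Println("disarmed: the timer's tick was ignored")
        }

        // The timer fired while disarmed, so perform the Stop/drain/Reset
        // dance before re-arming, then point tick at the timer's channel,
        // mirroring l.logCommitTick = l.logCommitTimer.C above.
        if !timer.Stop() {
            select {
            case <-timer.C:
            default:
            }
        }
        timer.Reset(50 * time.Millisecond)
        tick = timer.C

        <-tick
        fmt.Println("armed: received the log commit tick")
    }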