From 9d4cea93f0333a93b798cb46f45ebf009c516538 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 3 Apr 2018 19:46:06 -0700 Subject: [PATCH 1/9] discovery: fix deadlock in processChanPolicyUpdate In this commit, we fix an existing deadlock in the processChanPolicyUpdate method. Before this commit, within processChanPolicyUpdate, we would directly call updateChannel *within* the ForEachChannel closure. This would at times result in a deadlock, as updateChannel will itself attempt to create a write transaction in order to persist the newly updated channel. We fix this deadlock by simply performing another loop once we know the set of channels that we wish to update. This second loop will actually update the channels on disk. --- discovery/gossiper.go | 52 +++++++++++++++++++++++++++++-------------- 1 file changed, 35 insertions(+), 17 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index eaaba6b95..b1b1007fb 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -1097,8 +1097,9 @@ func (d *AuthenticatedGossiper) retransmitStaleChannels() error { for _, chanToUpdate := range edgesToUpdate { // Re-sign and update the channel on disk and retrieve our // ChannelUpdate to broadcast. - chanAnn, chanUpdate, err := d.updateChannel(chanToUpdate.info, - chanToUpdate.edge) + chanAnn, chanUpdate, err := d.updateChannel( + chanToUpdate.info, chanToUpdate.edge, + ) if err != nil { return fmt.Errorf("unable to update channel: %v", err) } @@ -1131,8 +1132,8 @@ func (d *AuthenticatedGossiper) retransmitStaleChannels() error { // processChanPolicyUpdate generates a new set of channel updates with the new // channel policy applied for each specified channel identified by its channel -// point. In the case that no channel points are specified, then the update will -// be applied to all channels. Finally, the backing ChannelGraphSource is +// point. In the case that no channel points are specified, then the update +// will be applied to all channels. Finally, the backing ChannelGraphSource is // updated with the latest information reflecting the applied updates. // // TODO(roasbeef): generalize into generic for any channel update @@ -1147,12 +1148,17 @@ func (d *AuthenticatedGossiper) processChanPolicyUpdate( haveChanFilter := len(chansToUpdate) != 0 - var chanUpdates []networkMsg + type edgeWithInfo struct { + info *channeldb.ChannelEdgeInfo + edge *channeldb.ChannelEdgePolicy + } + var edgesToUpdate []edgeWithInfo // Next, we'll loop over all the outgoing channels the router knows of. // If we have a filter then we'll only collected those channels, // otherwise we'll collect them all. - err := d.cfg.Router.ForAllOutgoingChannels(func(info *channeldb.ChannelEdgeInfo, + err := d.cfg.Router.ForAllOutgoingChannels(func( + info *channeldb.ChannelEdgeInfo, edge *channeldb.ChannelEdgePolicy) error { // If we have a channel filter, and this channel isn't a part @@ -1161,20 +1167,37 @@ func (d *AuthenticatedGossiper) processChanPolicyUpdate( return nil } - // Apply the new fee schema to the edge. + // Now that we know we should update this channel, we'll update + // its set of policies. edge.FeeBaseMSat = policyUpdate.newSchema.BaseFee edge.FeeProportionalMillionths = lnwire.MilliSatoshi( policyUpdate.newSchema.FeeRate, ) - - // Apply the new TimeLockDelta. edge.TimeLockDelta = uint16(policyUpdate.newSchema.TimeLockDelta) - // Re-sign and update the backing ChannelGraphSource, and + edgesToUpdate = append(edgesToUpdate, edgeWithInfo{ + info: info, + edge: edge, + }) + + return nil + }) + if err != nil { + return nil, err + } + + // With the set of edges we need to update retrieved, we'll now re-sign + // them, and insert them into the database. + var chanUpdates []networkMsg + for _, edgeInfo := range edgesToUpdate { + // Now that we've collected all the channels we need to update, + // we'll Re-sign and update the backing ChannelGraphSource, and // retrieve our ChannelUpdate to broadcast. - _, chanUpdate, err := d.updateChannel(info, edge) + _, chanUpdate, err := d.updateChannel( + edgeInfo.info, edgeInfo.edge, + ) if err != nil { - return err + return nil, err } // We set ourselves as the source of this message to indicate @@ -1183,11 +1206,6 @@ func (d *AuthenticatedGossiper) processChanPolicyUpdate( peer: d.selfKey, msg: chanUpdate, }) - - return nil - }) - if err != nil { - return nil, err } return chanUpdates, nil From 7037d55f65813a46df51119b758b3edffd5a1ca4 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 3 Apr 2018 19:51:40 -0700 Subject: [PATCH 2/9] htlcswitch: perform fee related checks at forwarding time In this commit, we fix a very old, lingering bug within the link. When accepting an HTLC we are meant to validate the fee against the constraints of the *outgoing* link. This is due to the fact that we're offering a payment transit service on our outgoing link. Before this commit, we would use the policies of the *incoming* link. This would at times lead to odd routing errors as we would go to route, get an error update and then route again, repeating the process. With this commit, we'll properly use the incoming link for timelock related constraints, and the outgoing link for fee related constraints. We do this by introducing a new HtlcSatisfiesPolicy method in the link. This method should return a non-nil error if the link can carry the HTLC as it satisfies its current forwarding policy. We'll use this method now at *forwarding* time to ensure that we only forward to links that actually accept the policy. This fixes a number of bugs that existed before that could result in a link accepting an HTLC that actually violated its policy. In the case that the policy is violated for *all* links, we take care to return the error returned by the *target* link so the caller can update their sending accordingly. In this commit, we also remove the prior linkControl channel in the channelLink. Instead, of sending a message to update the internal link policy, we'll use a mutex in place. This simplifies the code, and also adds some necessary refactoring in anticipation of the next follow up commit. --- htlcswitch/interfaces.go | 8 ++ htlcswitch/link.go | 245 ++++++++++++++++++--------------------- htlcswitch/switch.go | 62 +++++++++- 3 files changed, 181 insertions(+), 134 deletions(-) diff --git a/htlcswitch/interfaces.go b/htlcswitch/interfaces.go index 36852d36c..6961050ac 100644 --- a/htlcswitch/interfaces.go +++ b/htlcswitch/interfaces.go @@ -76,6 +76,14 @@ type ChannelLink interface { // policy to govern if it an incoming HTLC should be forwarded or not. UpdateForwardingPolicy(ForwardingPolicy) + // HtlcSatifiesPolicy should return a nil error if the passed HTLC + // details satisfy the current forwarding policy fo the target link. + // Otherwise, a valid protocol failure message should be returned in + // order to signal to the source of the HTLC, the policy consistency + // issue. + HtlcSatifiesPolicy(payHash [32]byte, + incomingAmt, amtToForward lnwire.MilliSatoshi) lnwire.FailureMessage + // Bandwidth returns the amount of milli-satoshis which current link // might pass through channel link. The value returned from this method // represents the up to date available flow through the channel. This diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 2a7feae23..d8eecac26 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -79,7 +79,8 @@ type ForwardingPolicy struct { // // TODO(roasbeef): also add in current available channel bandwidth, inverse // func -func ExpectedFee(f ForwardingPolicy, htlcAmt lnwire.MilliSatoshi) lnwire.MilliSatoshi { +func ExpectedFee(f ForwardingPolicy, + htlcAmt lnwire.MilliSatoshi) lnwire.MilliSatoshi { // TODO(roasbeef): write some basic table driven tests return f.BaseFee + (htlcAmt*f.FeeRate)/1000000 @@ -151,10 +152,12 @@ type ChannelLinkConfig struct { // Sphinx onion blob, and creating onion failure obfuscator. ExtractErrorEncrypter ErrorEncrypterExtracter - // GetLastChannelUpdate retrieves the latest routing policy for this - // particular channel. This will be used to provide payment senders our - // latest policy when sending encrypted error messages. - GetLastChannelUpdate func() (*lnwire.ChannelUpdate, error) + // FetchLastChannelUpdate retrieves the latest routing policy for a + // target channel. This channel will typically be the outgoing channel + // specified when we receive an incoming HTLC. This will be used to + // provide payment senders our latest policy when sending encrypted + // error messages. + FetchLastChannelUpdate func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error) // Peer is a lightning network node with which we have the channel link // opened. @@ -310,11 +313,6 @@ type channelLink struct { // by the HTLC switch. downstream chan *htlcPacket - // linkControl is a channel which is used to query the state of the - // link, or update various policies used which govern if an HTLC is to - // be forwarded and/or accepted. - linkControl chan interface{} - // htlcUpdates is a channel that we'll use to update outside // sub-systems with the latest set of active HTLC's on our channel. htlcUpdates chan []channeldb.HTLC @@ -342,7 +340,6 @@ func NewChannelLink(cfg ChannelLinkConfig, channel *lnwallet.LightningChannel, cfg: cfg, channel: channel, shortChanID: channel.ShortChanID(), - linkControl: make(chan interface{}), // TODO(roasbeef): just do reserve here? logCommitTimer: time.NewTimer(300 * time.Millisecond), overflowQueue: newPacketQueue(lnwallet.MaxHTLCNumber / 2), @@ -920,30 +917,6 @@ out: case msg := <-l.upstream: l.handleUpstreamMsg(msg) - // TODO(roasbeef): make distinct goroutine to handle? - case cmd := <-l.linkControl: - - switch req := cmd.(type) { - case *policyUpdate: - // In order to avoid overriding a valid policy - // with a "null" field in the new policy, we'll - // only update to the set sub policy if the new - // value isn't uninitialized. - if req.policy.BaseFee != 0 { - l.cfg.FwrdingPolicy.BaseFee = req.policy.BaseFee - } - if req.policy.FeeRate != 0 { - l.cfg.FwrdingPolicy.FeeRate = req.policy.FeeRate - } - if req.policy.TimeLockDelta != 0 { - l.cfg.FwrdingPolicy.TimeLockDelta = req.policy.TimeLockDelta - } - - if req.done != nil { - close(req.done) - } - } - case <-l.quit: break out } @@ -1502,6 +1475,9 @@ func (l *channelLink) Peer() Peer { // // NOTE: Part of the ChannelLink interface. func (l *channelLink) ShortChanID() lnwire.ShortChannelID { + l.RLock() + defer l.RUnlock() + return l.shortChanID } @@ -1581,14 +1557,6 @@ func (l *channelLink) AttachMailBox(mailbox MailBox) { l.Unlock() } -// policyUpdate is a message sent to a channel link when an outside sub-system -// wishes to update the current forwarding policy. -type policyUpdate struct { - policy ForwardingPolicy - - done chan struct{} -} - // UpdateForwardingPolicy updates the forwarding policy for the target // ChannelLink. Once updated, the link will use the new forwarding policy to // govern if it an incoming HTLC should be forwarded or not. Note that this @@ -1598,20 +1566,98 @@ type policyUpdate struct { // // NOTE: Part of the ChannelLink interface. func (l *channelLink) UpdateForwardingPolicy(newPolicy ForwardingPolicy) { - cmd := &policyUpdate{ - policy: newPolicy, - done: make(chan struct{}), + l.Lock() + defer l.Unlock() + + // In order to avoid overriding a valid policy with a "null" field in + // the new policy, we'll only update to the set sub policy if the new + // value isn't uninitialized. + if newPolicy.BaseFee != 0 { + l.cfg.FwrdingPolicy.BaseFee = newPolicy.BaseFee + } + if newPolicy.FeeRate != 0 { + l.cfg.FwrdingPolicy.FeeRate = newPolicy.FeeRate + } + if newPolicy.TimeLockDelta != 0 { + l.cfg.FwrdingPolicy.TimeLockDelta = newPolicy.TimeLockDelta + } + if newPolicy.MinHTLC != 0 { + l.cfg.FwrdingPolicy.MinHTLC = newPolicy.MinHTLC + } +} + +// HtlcSatifiesPolicy should return a nil error if the passed HTLC details +// satisfy the current forwarding policy fo the target link. Otherwise, a +// valid protocol failure message should be returned in order to signal to the +// source of the HTLC, the policy consistency issue. +// +// NOTE: Part of the ChannelLink interface. +func (l *channelLink) HtlcSatifiesPolicy(payHash [32]byte, + incomingHtlcAmt, amtToForward lnwire.MilliSatoshi) lnwire.FailureMessage { + + l.RLock() + defer l.RUnlock() + + // As our first sanity check, we'll ensure that the passed HTLC isn't + // too small for the next hop. If so, then we'll cancel the HTLC + // directly. + if amtToForward < l.cfg.FwrdingPolicy.MinHTLC { + l.errorf("outgoing htlc(%x) is too small: min_htlc=%v, "+ + "htlc_value=%v", payHash[:], l.cfg.FwrdingPolicy.MinHTLC, + amtToForward) + + // As part of the returned error, we'll send our latest routing + // policy so the sending node obtains the most up to date data. + var failure lnwire.FailureMessage + update, err := l.cfg.FetchLastChannelUpdate( + l.shortChanID, + ) + if err != nil { + failure = lnwire.NewTemporaryChannelFailure(nil) + } else { + failure = lnwire.NewAmountBelowMinimum( + amtToForward, *update, + ) + } + + return failure } - select { - case l.linkControl <- cmd: - case <-l.quit: + // Next, using the amount of the incoming HTLC, we'll calculate the + // expected fee this incoming HTLC must carry in order to satisfy the + // constraints of the outgoing link. + expectedFee := ExpectedFee(l.cfg.FwrdingPolicy, amtToForward) + + // If the actual fee is less than our expected fee, then we'll reject + // this HTLC as it didn't provide a sufficient amount of fees, or the + // values have been tampered with, or the send used incorrect/dated + // information to construct the forwarding information for this hop. In + // any case, we'll cancel this HTLC. + actualFee := incomingHtlcAmt - amtToForward + if incomingHtlcAmt < amtToForward || actualFee < expectedFee { + l.errorf("outgoing htlc(%x) has insufficient "+ + "fee: expected %v, got %v", payHash[:], + int64(expectedFee), + int64(actualFee)) + + // As part of the returned error, we'll send our latest routing + // policy so the sending node obtains the most up to date data. + var failure lnwire.FailureMessage + update, err := l.cfg.FetchLastChannelUpdate( + l.shortChanID, + ) + if err != nil { + failure = lnwire.NewTemporaryChannelFailure(nil) + } else { + failure = lnwire.NewFeeInsufficient( + amtToForward, *update, + ) + } + + return failure } - select { - case <-cmd.done: - case <-l.quit: - } + return nil } // Stats returns the statistics of channel link. @@ -2101,17 +2147,22 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg, htlc: addMsg, obfuscator: obfuscator, } - switchPackets = append(switchPackets, - updatePacket) + switchPackets = append( + switchPackets, updatePacket, + ) continue } + // We'll consult the forwarding policy for this link + // when checking time locked related constraints. + hopPolicy := l.cfg.FwrdingPolicy + // We want to avoid forwarding an HTLC which will // expire in the near future, so we'll reject an HTLC // if its expiration time is too close to the current // height. - timeDelta := l.cfg.FwrdingPolicy.TimeLockDelta + timeDelta := hopPolicy.TimeLockDelta if pd.Timeout-timeDelta <= heightNow { log.Errorf("htlc(%x) has an expiry "+ "that's too soon: outgoing_expiry=%v, "+ @@ -2119,7 +2170,9 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg, pd.Timeout-timeDelta, heightNow) var failure lnwire.FailureMessage - update, err := l.cfg.GetLastChannelUpdate() + update, err := l.cfg.FetchLastChannelUpdate( + l.shortChanID, + ) if err != nil { failure = lnwire.NewTemporaryChannelFailure(nil) } else { @@ -2127,78 +2180,8 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg, } l.sendHTLCError( - pd.HtlcIndex, failure, obfuscator, pd.SourceRef, - ) - needUpdate = true - continue - } - - // As our second sanity check, we'll ensure that the - // passed HTLC isn't too small. If so, then - // we'll cancel the HTLC directly. - if pd.Amount < l.cfg.FwrdingPolicy.MinHTLC { - log.Errorf("Incoming htlc(%x) is too "+ - "small: min_htlc=%v, htlc_value=%v", - pd.RHash[:], l.cfg.FwrdingPolicy.MinHTLC, - pd.Amount) - - // As part of the returned error, we'll send - // our latest routing policy so the sending - // node obtains the most up to date data. - var failure lnwire.FailureMessage - update, err := l.cfg.GetLastChannelUpdate() - if err != nil { - failure = lnwire.NewTemporaryChannelFailure(nil) - } else { - failure = lnwire.NewAmountBelowMinimum( - pd.Amount, *update) - } - - l.sendHTLCError( - pd.HtlcIndex, failure, obfuscator, pd.SourceRef, - ) - needUpdate = true - continue - } - - // Next, using the amount of the incoming HTLC, we'll - // calculate the expected fee this incoming HTLC must - // carry in order to be accepted. - expectedFee := ExpectedFee( - l.cfg.FwrdingPolicy, - fwdInfo.AmountToForward, - ) - - // If the actual fee is less than our expected - // fee, then we'll reject this HTLC as it didn't - // provide a sufficient amount of fees, or the values - // have been tampered with, or the send used - // incorrect/dated information to construct the - // forwarding information for this hop. In any case, - // we'll cancel this HTLC. - actualFee := pd.Amount - fwdInfo.AmountToForward - if pd.Amount < fwdInfo.AmountToForward || - actualFee < expectedFee { - - log.Errorf("Incoming htlc(%x) has insufficient "+ - "fee: expected %v, got %v", pd.RHash[:], - int64(expectedFee), - int64(pd.Amount-fwdInfo.AmountToForward)) - - // As part of the returned error, we'll send - // our latest routing policy so the sending - // node obtains the most up to date data. - var failure lnwire.FailureMessage - update, err := l.cfg.GetLastChannelUpdate() - if err != nil { - failure = lnwire.NewTemporaryChannelFailure(nil) - } else { - failure = lnwire.NewFeeInsufficient(pd.Amount, - *update) - } - - l.sendHTLCError( - pd.HtlcIndex, failure, obfuscator, pd.SourceRef, + pd.HtlcIndex, failure, obfuscator, + pd.SourceRef, ) needUpdate = true continue @@ -2220,7 +2203,9 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg, // Grab the latest routing policy so the // sending node is up to date with our current // policy. - update, err := l.cfg.GetLastChannelUpdate() + update, err := l.cfg.FetchLastChannelUpdate( + l.shortChanID, + ) if err != nil { l.fail("unable to create channel update "+ "while handling the error: %v", err) diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index 0fc30c831..f61205562 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -893,13 +893,37 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error { } interfaceLinks, _ := s.getLinks(targetLink.Peer().PubKey()) + // We'll keep track of any HTLC failures during the link + // selection process. This way we can return the error for + // precise link that the sender selected, while optimistically + // trying all links to utilize our available bandwidth. + linkErrs := make(map[lnwire.ShortChannelID]lnwire.FailureMessage) + // Try to find destination channel link with appropriate // bandwidth. var destination ChannelLink for _, link := range interfaceLinks { // We'll skip any links that aren't yet eligible for // forwarding. - if !link.EligibleToForward() { + switch { + case !link.EligibleToForward(): + continue + + // If the link doesn't yet have a source chan ID, then + // we'll skip it as well. + case link.ShortChanID() == sourceHop: + continue + } + + // Before we check the link's bandwidth, we'll ensure + // that the HTLC satisfies the current forwarding + // policy of this target link. + err := link.HtlcSatifiesPolicy( + htlc.PaymentHash, packet.incomingAmount, + packet.amount, + ) + if err != nil { + linkErrs[link.ShortChanID()] = err continue } @@ -910,10 +934,12 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error { } } + switch { // If the channel link we're attempting to forward the update - // over has insufficient capacity, then we'll cancel the htlc - // as the payment cannot succeed. - if destination == nil { + // over has insufficient capacity, and didn't violate any + // forwarding policies, then we'll cancel the htlc as the + // payment cannot succeed. + case destination == nil && len(linkErrs) == 0: // If packet was forwarded from another channel link // than we should notify this link that some error // occurred. @@ -923,6 +949,34 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error { "%v", htlc.Amount) return s.failAddPacket(packet, failure, addErr) + + // If we had a forwarding failure due to the HTLC not + // satisfying the current policy, then we'll send back an + // error, but ensure we send back the error sourced at the + // *target* link. + case destination == nil && len(linkErrs) != 0: + // At this point, some or all of the links rejected the + // HTLC so we couldn't forward it. So we'll try to look + // up the error that came from the source. + linkErr, ok := linkErrs[packet.outgoingChanID] + if !ok { + // If we can't find the error of the source, + // then we'll return an unknown next peer, + // though this should never happen. + linkErr = &lnwire.FailUnknownNextPeer{} + log.Warnf("unable to find err source for "+ + "outgoing_link=%v, errors=%v", + packet.outgoingChanID, newLogClosure(func() string { + return spew.Sdump(linkErrs) + })) + } + + addErr := fmt.Errorf("incoming HTLC(%x) violated "+ + "target outgoing link (id=%v) policy: %v", + htlc.PaymentHash[:], packet.outgoingChanID, + linkErr) + + return s.failAddPacket(packet, linkErr, addErr) } // Send the packet to the destination channel link which From 0a47b2c4aded9b377cc38c76b59f715406fd3c65 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 3 Apr 2018 20:06:57 -0700 Subject: [PATCH 3/9] htlcswitch: remove linkControl in favor of a mutex guarding all channel indexes In this commit, we simplify the switch's code a bit. Rather than having a set of channels we use to mutate or query for the set of current links, we'll instead now just use a mutex to guard a set of link indexes. This serves to simplify the ode, and also make it such that we don't need to block forwarding in order to add/remove a link. --- htlcswitch/switch.go | 311 +++++++++++-------------------------------- 1 file changed, 80 insertions(+), 231 deletions(-) diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index f61205562..e1235f6bf 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -168,10 +168,6 @@ type Switch struct { // forward the settle/fail htlc updates back to the add htlc initiator. circuits CircuitMap - // links is a map of channel id and channel link which manages - // this channel. - linkIndex map[lnwire.ChannelID]ChannelLink - // mailMtx is a read/write mutex that protects the mailboxes map. mailMtx sync.RWMutex @@ -179,6 +175,14 @@ type Switch struct { // switch to buffer messages for peers that have not come back online. mailboxes map[lnwire.ShortChannelID]MailBox + // indexMtx is a read/write mutex that protects the set of indexes + // below. + indexMtx sync.RWMutex + + // links is a map of channel id and channel link which manages + // this channel. + linkIndex map[lnwire.ChannelID]ChannelLink + // forwardingIndex is an index which is consulted by the switch when it // needs to locate the next hop to forward an incoming/outgoing HTLC // update to/from. @@ -244,7 +248,6 @@ func New(cfg Config) (*Switch, error) { htlcPlex: make(chan *plexPacket), chanCloseRequests: make(chan *ChanClose), resolutionMsgs: make(chan *resolutionMsg), - linkControl: make(chan interface{}), quit: make(chan struct{}), }, nil } @@ -386,63 +389,47 @@ func (s *Switch) SendHTLC(nextNode [33]byte, htlc *lnwire.UpdateAddHTLC, func (s *Switch) UpdateForwardingPolicies(newPolicy ForwardingPolicy, targetChans ...wire.OutPoint) error { - errChan := make(chan error, 1) - select { - case s.linkControl <- &updatePoliciesCmd{ - newPolicy: newPolicy, - targetChans: targetChans, - err: errChan, - }: - case <-s.quit: - return fmt.Errorf("switch is shutting down") - } + log.Debugf("Updating link policies: %v", newLogClosure(func() string { + return spew.Sdump(newPolicy) + })) - select { - case err := <-errChan: - return err - case <-s.quit: - return fmt.Errorf("switch is shutting down") - } -} + s.indexMtx.RLock() -// updatePoliciesCmd is a message sent to the switch to update the forwarding -// policies of a set of target links. -type updatePoliciesCmd struct { - newPolicy ForwardingPolicy - targetChans []wire.OutPoint + var linksToUpdate []ChannelLink - err chan error -} - -// updateLinkPolicies attempts to update the forwarding policies for the set of -// passed links identified by their channel points. If a nil set of channel -// points is passed, then the forwarding policies for all active links will be -// updated. -func (s *Switch) updateLinkPolicies(c *updatePoliciesCmd) error { - log.Debugf("Updating link policies: %v", spew.Sdump(c)) - - // If no channels have been targeted, then we'll update the link policies - // for all active channels - if len(c.targetChans) == 0 { + // If no channels have been targeted, then we'll collect all inks to + // update their policies. + if len(targetChans) == 0 { for _, link := range s.linkIndex { - link.UpdateForwardingPolicy(c.newPolicy) + linksToUpdate = append(linksToUpdate, link) + } + } else { + // Otherwise, we'll only attempt to update the forwarding + // policies for the set of targeted links. + for _, targetLink := range targetChans { + cid := lnwire.NewChanIDFromOutPoint(&targetLink) + + // If we can't locate a link by its converted channel + // ID, then we'll return an error back to the caller. + link, ok := s.linkIndex[cid] + if !ok { + s.indexMtx.RUnlock() + + return fmt.Errorf("unable to find "+ + "ChannelPoint(%v) to update link "+ + "policy", targetLink) + } + + linksToUpdate = append(linksToUpdate, link) } } - // Otherwise, we'll only attempt to update the forwarding policies for the - // set of targeted links. - for _, targetLink := range c.targetChans { - cid := lnwire.NewChanIDFromOutPoint(&targetLink) + s.indexMtx.RUnlock() - // If we can't locate a link by its converted channel ID, then we'll - // return an error back to the caller. - link, ok := s.linkIndex[cid] - if !ok { - return fmt.Errorf("unable to find ChannelPoint(%v) to "+ - "update link policy", targetLink) - } - - link.UpdateForwardingPolicy(c.newPolicy) + // With all the links we need to update collected, we can release the + // mutex then update each link directly. + for _, link := range linksToUpdate { + link.UpdateForwardingPolicy(newPolicy) } return nil @@ -715,14 +702,18 @@ func (s *Switch) handleLocalDispatch(pkt *htlcPacket) error { // appropriate channel link and send the payment over this link. case *lnwire.UpdateAddHTLC: // Try to find links by node destination. + s.indexMtx.RLock() links, err := s.getLinks(pkt.destNode) if err != nil { + s.indexMtx.RUnlock() + log.Errorf("unable to find links by destination %v", err) return &ForwardingError{ ErrorSource: s.cfg.SelfKey, FailureMessage: &lnwire.FailUnknownNextPeer{}, } } + s.indexMtx.RUnlock() // Try to find destination channel link with appropriate // bandwidth. @@ -880,8 +871,11 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error { return s.handleLocalDispatch(packet) } + s.indexMtx.RLock() targetLink, err := s.getLinkByShortID(packet.outgoingChanID) if err != nil { + s.indexMtx.RUnlock() + // If packet was forwarded from another channel link // than we should notify this link that some error // occurred. @@ -892,6 +886,7 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error { return s.failAddPacket(packet, failure, addErr) } interfaceLinks, _ := s.getLinks(targetLink.Peer().PubKey()) + s.indexMtx.RUnlock() // We'll keep track of any HTLC failures during the link // selection process. This way we can return the error for @@ -1300,12 +1295,14 @@ func (s *Switch) htlcForwarder() { // Remove all links once we've been signalled for shutdown. defer func() { + s.indexMtx.Lock() for _, link := range s.linkIndex { if err := s.removeLink(link.ChanID()); err != nil { log.Errorf("unable to remove "+ "channel link on stop: %v", err) } } + s.indexMtx.Unlock() // Before we exit fully, we'll attempt to flush out any // forwarding events that may still be lingering since the last @@ -1336,12 +1333,17 @@ func (s *Switch) htlcForwarder() { // cooperatively closed (if possible). case req := <-s.chanCloseRequests: chanID := lnwire.NewChanIDFromOutPoint(req.ChanPoint) + + s.indexMtx.RLock() link, ok := s.linkIndex[chanID] if !ok { + s.indexMtx.RUnlock() + req.Err <- errors.Errorf("no peer for channel with "+ "chan_id=%x", chanID[:]) continue } + s.indexMtx.RUnlock() peerPub := link.Peer().PubKey() log.Debugf("Requesting local channel close: peer=%v, "+ @@ -1421,6 +1423,7 @@ func (s *Switch) htlcForwarder() { // Next, we'll run through all the registered links and // compute their up-to-date forwarding stats. + s.indexMtx.RLock() for _, link := range s.linkIndex { // TODO(roasbeef): when links first registered // stats printed. @@ -1429,6 +1432,7 @@ func (s *Switch) htlcForwarder() { newSatSent += sent.ToSatoshis() newSatRecv += recv.ToSatoshis() } + s.indexMtx.RUnlock() var ( diffNumUpdates uint64 @@ -1478,28 +1482,6 @@ func (s *Switch) htlcForwarder() { totalSatSent += diffSatSent totalSatRecv += diffSatRecv - case req := <-s.linkControl: - switch cmd := req.(type) { - case *updatePoliciesCmd: - cmd.err <- s.updateLinkPolicies(cmd) - case *addLinkCmd: - cmd.err <- s.addLink(cmd.link) - case *removeLinkCmd: - cmd.err <- s.removeLink(cmd.chanID) - case *getLinkCmd: - link, err := s.getLink(cmd.chanID) - cmd.done <- link - cmd.err <- err - case *getLinksCmd: - links, err := s.getLinks(cmd.peer) - cmd.done <- links - cmd.err <- err - case *updateForwardingIndexCmd: - cmd.err <- s.updateShortChanID( - cmd.chanID, cmd.shortChanID, - ) - } - case <-s.quit: return } @@ -1555,8 +1537,7 @@ func (s *Switch) reforwardResponses() error { // loadChannelFwdPkgs loads all forwarding packages owned by the `source` short // channel identifier. -func (s *Switch) loadChannelFwdPkgs( - source lnwire.ShortChannelID) ([]*channeldb.FwdPkg, error) { +func (s *Switch) loadChannelFwdPkgs(source lnwire.ShortChannelID) ([]*channeldb.FwdPkg, error) { var fwdPkgs []*channeldb.FwdPkg if err := s.cfg.DB.Update(func(tx *bolt.Tx) error { @@ -1688,38 +1669,11 @@ func (s *Switch) Stop() error { return nil } -// addLinkCmd is a add link command wrapper, it is used to propagate handler -// parameters and return handler error. -type addLinkCmd struct { - link ChannelLink - err chan error -} - // AddLink is used to initiate the handling of the add link command. The // request will be propagated and handled in the main goroutine. func (s *Switch) AddLink(link ChannelLink) error { - command := &addLinkCmd{ - link: link, - err: make(chan error, 1), - } - - select { - case s.linkControl <- command: - select { - case err := <-command.err: - return err - case <-s.quit: - } - case <-s.quit: - } - - return errors.New("unable to add link htlc switch was stopped") -} - -// addLink is used to add the newly created channel link and start use it to -// handle the channel updates. -func (s *Switch) addLink(link ChannelLink) error { - // TODO(roasbeef): reject if link already tehre? + s.indexMtx.Lock() + defer s.indexMtx.Unlock() // First we'll add the link to the linkIndex which lets us quickly look // up a channel when we need to close or register it, and the @@ -1781,47 +1735,12 @@ func (s *Switch) getOrCreateMailBox(chanID lnwire.ShortChannelID) MailBox { return mailbox } -// getLinkCmd is a get link command wrapper, it is used to propagate handler -// parameters and return handler error. -type getLinkCmd struct { - chanID lnwire.ChannelID - err chan error - done chan ChannelLink -} - // GetLink is used to initiate the handling of the get link command. The // request will be propagated/handled to/in the main goroutine. func (s *Switch) GetLink(chanID lnwire.ChannelID) (ChannelLink, error) { - command := &getLinkCmd{ - chanID: chanID, - err: make(chan error, 1), - done: make(chan ChannelLink, 1), - } + s.indexMtx.RLock() + defer s.indexMtx.RUnlock() -query: - select { - case s.linkControl <- command: - - var link ChannelLink - select { - case link = <-command.done: - case <-s.quit: - break query - } - - select { - case err := <-command.err: - return link, err - case <-s.quit: - } - case <-s.quit: - } - - return nil, errors.New("unable to get link htlc switch was stopped") -} - -// getLink attempts to return the link that has the specified channel ID. -func (s *Switch) getLink(chanID lnwire.ChannelID) (ChannelLink, error) { link, ok := s.linkIndex[chanID] if !ok { return nil, ErrChannelLinkNotFound @@ -1832,6 +1751,8 @@ func (s *Switch) getLink(chanID lnwire.ChannelID) (ChannelLink, error) { // getLinkByShortID attempts to return the link which possesses the target // short channel ID. +// +// NOTE: This MUST be called with the indexMtx held. func (s *Switch) getLinkByShortID(chanID lnwire.ShortChannelID) (ChannelLink, error) { link, ok := s.forwardingIndex[chanID] if !ok { @@ -1841,35 +1762,18 @@ func (s *Switch) getLinkByShortID(chanID lnwire.ShortChannelID) (ChannelLink, er return link, nil } -// removeLinkCmd is a get link command wrapper, it is used to propagate handler -// parameters and return handler error. -type removeLinkCmd struct { - chanID lnwire.ChannelID - err chan error -} - // RemoveLink is used to initiate the handling of the remove link command. The // request will be propagated/handled to/in the main goroutine. func (s *Switch) RemoveLink(chanID lnwire.ChannelID) error { - command := &removeLinkCmd{ - chanID: chanID, - err: make(chan error, 1), - } + s.indexMtx.Lock() + defer s.indexMtx.Unlock() - select { - case s.linkControl <- command: - select { - case err := <-command.err: - return err - case <-s.quit: - } - case <-s.quit: - } - - return errors.New("unable to remove link htlc switch was stopped") + return s.removeLink(chanID) } // removeLink is used to remove and stop the channel link. +// +// NOTE: This MUST be called with the indexMtx held. func (s *Switch) removeLink(chanID lnwire.ChannelID) error { log.Infof("Removing channel link with ChannelID(%v)", chanID) @@ -1891,50 +1795,21 @@ func (s *Switch) removeLink(chanID lnwire.ChannelID) error { return nil } -// updateForwardingIndexCmd is a command sent by outside sub-systems to update -// the forwarding index of the switch in the event that the short channel ID of -// a particular link changes. -type updateForwardingIndexCmd struct { - chanID lnwire.ChannelID - shortChanID lnwire.ShortChannelID - - err chan error -} - // UpdateShortChanID updates the short chan ID for an existing channel. This is // required in the case of a re-org and re-confirmation or a channel, or in the // case that a link was added to the switch before its short chan ID was known. func (s *Switch) UpdateShortChanID(chanID lnwire.ChannelID, shortChanID lnwire.ShortChannelID) error { - command := &updateForwardingIndexCmd{ - chanID: chanID, - shortChanID: shortChanID, - err: make(chan error, 1), - } - - select { - case s.linkControl <- command: - select { - case err := <-command.err: - return err - case <-s.quit: - } - case <-s.quit: - } - - return errors.New("unable to update short chan id htlc switch was stopped") -} - -// updateShortChanID updates the short chan ID of an existing link. -func (s *Switch) updateShortChanID(chanID lnwire.ChannelID, - shortChanID lnwire.ShortChannelID) error { + s.indexMtx.Lock() // First, we'll extract the current link as is from the link link // index. If the link isn't even in the index, then we'll return an // error. link, ok := s.linkIndex[chanID] if !ok { + s.indexMtx.Unlock() + return fmt.Errorf("link %v not found", chanID) } @@ -1945,53 +1820,27 @@ func (s *Switch) updateShortChanID(chanID lnwire.ChannelID, // forwarding index with the next short channel ID. s.forwardingIndex[shortChanID] = link + s.indexMtx.Unlock() + // Finally, we'll notify the link of its new short channel ID. link.UpdateShortChanID(shortChanID) return nil } -// getLinksCmd is a get links command wrapper, it is used to propagate handler -// parameters and return handler error. -type getLinksCmd struct { - peer [33]byte - err chan error - done chan []ChannelLink -} - // GetLinksByInterface fetches all the links connected to a particular node // identified by the serialized compressed form of its public key. func (s *Switch) GetLinksByInterface(hop [33]byte) ([]ChannelLink, error) { - command := &getLinksCmd{ - peer: hop, - err: make(chan error, 1), - done: make(chan []ChannelLink, 1), - } + s.indexMtx.RLock() + defer s.indexMtx.RUnlock() -query: - select { - case s.linkControl <- command: - - var links []ChannelLink - select { - case links = <-command.done: - case <-s.quit: - break query - } - - select { - case err := <-command.err: - return links, err - case <-s.quit: - } - case <-s.quit: - } - - return nil, errors.New("unable to get links htlc switch was stopped") + return s.getLinks(hop) } // getLinks is function which returns the channel links of the peer by hop // destination id. +// +// NOTE: This MUST be called with the indexMtx held. func (s *Switch) getLinks(destination [33]byte) ([]ChannelLink, error) { links, ok := s.interfaceIndex[destination] if !ok { From ec8e3b626d826aa7da5b51c89ad477e387143a57 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 3 Apr 2018 20:09:51 -0700 Subject: [PATCH 4/9] htlcswitch: update unit tests to account for recent API changes --- htlcswitch/link_test.go | 4 +-- htlcswitch/mock.go | 4 +++ htlcswitch/test_utils.go | 58 +++++++++++++++++++++------------------- 3 files changed, 37 insertions(+), 29 deletions(-) diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index 95f11d0f5..21e65c3ec 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -1476,8 +1476,8 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) ( ErrorEncrypter, lnwire.FailCode) { return obfuscator, lnwire.CodeNone }, - GetLastChannelUpdate: mockGetChanUpdateMessage, - PreimageCache: pCache, + FetchLastChannelUpdate: mockGetChanUpdateMessage, + PreimageCache: pCache, UpdateContractSignals: func(*contractcourt.ContractSignals) error { return nil }, diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index 42c1843c7..33898293c 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -604,6 +604,10 @@ func (f *mockChannelLink) HandleChannelUpdate(lnwire.Message) { func (f *mockChannelLink) UpdateForwardingPolicy(_ ForwardingPolicy) { } +func (f *mockChannelLink) HtlcSatifiesPolicy([32]byte, lnwire.MilliSatoshi, + lnwire.MilliSatoshi) lnwire.FailureMessage { + return nil +} func (f *mockChannelLink) Stats() (uint64, lnwire.MilliSatoshi, lnwire.MilliSatoshi) { return 0, 0, 0 diff --git a/htlcswitch/test_utils.go b/htlcswitch/test_utils.go index adb5c2d03..ec9ce9ad2 100644 --- a/htlcswitch/test_utils.go +++ b/htlcswitch/test_utils.go @@ -116,9 +116,9 @@ func genIDs() (lnwire.ChannelID, lnwire.ChannelID, lnwire.ShortChannelID, return chanID1, chanID2, aliceChanID, bobChanID } -// mockGetChanUpdateMessage helper function which returns topology update -// of the channel -func mockGetChanUpdateMessage() (*lnwire.ChannelUpdate, error) { +// mockGetChanUpdateMessage helper function which returns topology update of +// the channel +func mockGetChanUpdateMessage(cid lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error) { return &lnwire.ChannelUpdate{ Signature: wireSig, }, nil @@ -902,6 +902,7 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, aliceTicker := time.NewTicker(50 * time.Millisecond) aliceChannelLink := NewChannelLink( ChannelLinkConfig{ + Switch: aliceServer.htlcSwitch, FwrdingPolicy: globalPolicy, Peer: bobServer, Circuits: aliceServer.htlcSwitch.CircuitModifier(), @@ -911,11 +912,11 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, ErrorEncrypter, lnwire.FailCode) { return obfuscator, lnwire.CodeNone }, - GetLastChannelUpdate: mockGetChanUpdateMessage, - Registry: aliceServer.registry, - BlockEpochs: aliceEpoch, - FeeEstimator: feeEstimator, - PreimageCache: pCache, + FetchLastChannelUpdate: mockGetChanUpdateMessage, + Registry: aliceServer.registry, + BlockEpochs: aliceEpoch, + FeeEstimator: feeEstimator, + PreimageCache: pCache, UpdateContractSignals: func(*contractcourt.ContractSignals) error { return nil }, @@ -928,7 +929,7 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, aliceChannel, startingHeight, ) - if err := aliceServer.htlcSwitch.addLink(aliceChannelLink); err != nil { + if err := aliceServer.htlcSwitch.AddLink(aliceChannelLink); err != nil { t.Fatalf("unable to add alice channel link: %v", err) } go func() { @@ -950,6 +951,7 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, firstBobTicker := time.NewTicker(50 * time.Millisecond) firstBobChannelLink := NewChannelLink( ChannelLinkConfig{ + Switch: bobServer.htlcSwitch, FwrdingPolicy: globalPolicy, Peer: aliceServer, Circuits: bobServer.htlcSwitch.CircuitModifier(), @@ -959,11 +961,11 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, ErrorEncrypter, lnwire.FailCode) { return obfuscator, lnwire.CodeNone }, - GetLastChannelUpdate: mockGetChanUpdateMessage, - Registry: bobServer.registry, - BlockEpochs: bobFirstEpoch, - FeeEstimator: feeEstimator, - PreimageCache: pCache, + FetchLastChannelUpdate: mockGetChanUpdateMessage, + Registry: bobServer.registry, + BlockEpochs: bobFirstEpoch, + FeeEstimator: feeEstimator, + PreimageCache: pCache, UpdateContractSignals: func(*contractcourt.ContractSignals) error { return nil }, @@ -976,7 +978,7 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, firstBobChannel, startingHeight, ) - if err := bobServer.htlcSwitch.addLink(firstBobChannelLink); err != nil { + if err := bobServer.htlcSwitch.AddLink(firstBobChannelLink); err != nil { t.Fatalf("unable to add first bob channel link: %v", err) } go func() { @@ -998,6 +1000,7 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, secondBobTicker := time.NewTicker(50 * time.Millisecond) secondBobChannelLink := NewChannelLink( ChannelLinkConfig{ + Switch: bobServer.htlcSwitch, FwrdingPolicy: globalPolicy, Peer: carolServer, Circuits: bobServer.htlcSwitch.CircuitModifier(), @@ -1007,11 +1010,11 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, ErrorEncrypter, lnwire.FailCode) { return obfuscator, lnwire.CodeNone }, - GetLastChannelUpdate: mockGetChanUpdateMessage, - Registry: bobServer.registry, - BlockEpochs: bobSecondEpoch, - FeeEstimator: feeEstimator, - PreimageCache: pCache, + FetchLastChannelUpdate: mockGetChanUpdateMessage, + Registry: bobServer.registry, + BlockEpochs: bobSecondEpoch, + FeeEstimator: feeEstimator, + PreimageCache: pCache, UpdateContractSignals: func(*contractcourt.ContractSignals) error { return nil }, @@ -1024,7 +1027,7 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, secondBobChannel, startingHeight, ) - if err := bobServer.htlcSwitch.addLink(secondBobChannelLink); err != nil { + if err := bobServer.htlcSwitch.AddLink(secondBobChannelLink); err != nil { t.Fatalf("unable to add second bob channel link: %v", err) } go func() { @@ -1046,6 +1049,7 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, carolTicker := time.NewTicker(50 * time.Millisecond) carolChannelLink := NewChannelLink( ChannelLinkConfig{ + Switch: carolServer.htlcSwitch, FwrdingPolicy: globalPolicy, Peer: bobServer, Circuits: carolServer.htlcSwitch.CircuitModifier(), @@ -1055,11 +1059,11 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, ErrorEncrypter, lnwire.FailCode) { return obfuscator, lnwire.CodeNone }, - GetLastChannelUpdate: mockGetChanUpdateMessage, - Registry: carolServer.registry, - BlockEpochs: carolEpoch, - FeeEstimator: feeEstimator, - PreimageCache: pCache, + FetchLastChannelUpdate: mockGetChanUpdateMessage, + Registry: carolServer.registry, + BlockEpochs: carolEpoch, + FeeEstimator: feeEstimator, + PreimageCache: pCache, UpdateContractSignals: func(*contractcourt.ContractSignals) error { return nil }, @@ -1072,7 +1076,7 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, carolChannel, startingHeight, ) - if err := carolServer.htlcSwitch.addLink(carolChannelLink); err != nil { + if err := carolServer.htlcSwitch.AddLink(carolChannelLink); err != nil { t.Fatalf("unable to add carol channel link: %v", err) } go func() { From 8b520377bbc27c369716c9a0a633a35eb3ed7331 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 3 Apr 2018 20:11:06 -0700 Subject: [PATCH 5/9] htlcswitch: fix TestUpdateForwardingPolicy In this commit, we fix the TestUpdateForwardingPolicy test case after the recent modification in the way we handling validating constraints within the link. After the recent set of changes, Bob will properly use his outgoing link to validate the set of fee related constraints rather than the incoming link. As a result, we need to modify the second channel link, not the first for the test to still be applicable. --- htlcswitch/link_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index 21e65c3ec..699773ed3 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -826,7 +826,7 @@ func TestUpdateForwardingPolicy(t *testing.T) { // update logic newPolicy := n.globalPolicy newPolicy.BaseFee = lnwire.NewMSatFromSatoshis(1000) - n.firstBobChannelLink.UpdateForwardingPolicy(newPolicy) + n.secondBobChannelLink.UpdateForwardingPolicy(newPolicy) // Next, we'll send the payment again, using the exact same per-hop // payload for each node. This payment should fail as it won't factor From ffabb17ce6a0a931d2a55bcb683477a6bdef4891 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 3 Apr 2018 20:12:03 -0700 Subject: [PATCH 6/9] peer: use new fetchLastChanUpdate method to populate the ChannelLinkConfig --- peer.go | 36 ++++++++++++++++++++---------------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/peer.go b/peer.go index 6f9be4a4a..0f9cab712 100644 --- a/peer.go +++ b/peer.go @@ -413,8 +413,9 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { Peer: p, DecodeHopIterators: p.server.sphinx.DecodeHopIterators, ExtractErrorEncrypter: p.server.sphinx.ExtractErrorEncrypter, - GetLastChannelUpdate: createGetLastUpdate(p.server.chanRouter, - p.PubKey(), lnChan.ShortChanID()), + FetchLastChannelUpdate: fetchLastChanUpdate( + p.server.chanRouter, p.PubKey(), + ), DebugHTLC: cfg.DebugHTLC, HodlHTLC: cfg.HodlHTLC, Registry: p.server.invoices, @@ -1389,8 +1390,9 @@ out: Peer: p, DecodeHopIterators: p.server.sphinx.DecodeHopIterators, ExtractErrorEncrypter: p.server.sphinx.ExtractErrorEncrypter, - GetLastChannelUpdate: createGetLastUpdate(p.server.chanRouter, - p.PubKey(), newChanReq.channel.ShortChanID()), + FetchLastChannelUpdate: fetchLastChanUpdate( + p.server.chanRouter, p.PubKey(), + ), DebugHTLC: cfg.DebugHTLC, HodlHTLC: cfg.HodlHTLC, Registry: p.server.invoices, @@ -1906,21 +1908,20 @@ func (p *peer) PubKey() [33]byte { // TODO(roasbeef): make all start/stop mutexes a CAS -// createGetLastUpdate returns the handler which serve as a source of the last -// update of the channel in a form of lnwire update message. -func createGetLastUpdate(router *routing.ChannelRouter, - pubKey [33]byte, chanID lnwire.ShortChannelID) func() (*lnwire.ChannelUpdate, - error) { +// fetchLastChanUpdate returns a function which is able to retrieve the last +// channel update for a target channel. +func fetchLastChanUpdate(router *routing.ChannelRouter, + pubKey [33]byte) func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error) { - return func() (*lnwire.ChannelUpdate, error) { - info, edge1, edge2, err := router.GetChannelByID(chanID) + return func(cid lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error) { + info, edge1, edge2, err := router.GetChannelByID(cid) if err != nil { return nil, err } if edge1 == nil || edge2 == nil { return nil, errors.Errorf("unable to find "+ - "channel by ShortChannelID(%v)", chanID) + "channel by ShortChannelID(%v)", cid) } // If we're the outgoing node on the first edge, then that @@ -1933,7 +1934,7 @@ func createGetLastUpdate(router *routing.ChannelRouter, local = edge1 } - update := &lnwire.ChannelUpdate{ + update := lnwire.ChannelUpdate{ ChainHash: info.ChainHash, ShortChannelID: lnwire.NewShortChanIDFromInt(local.ChannelID), Timestamp: uint32(local.LastUpdate.Unix()), @@ -1948,9 +1949,12 @@ func createGetLastUpdate(router *routing.ChannelRouter, return nil, err } - hswcLog.Debugf("Sending latest channel_update: %v", - spew.Sdump(update)) + hswcLog.Tracef("Sending latest channel_update: %v", + newLogClosure(func() string { + return spew.Sdump(update) + }), + ) - return update, nil + return &update, nil } } From 3fa2e08665923b4b8601d5a78e2043e548f8496c Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 3 Apr 2018 20:18:42 -0700 Subject: [PATCH 7/9] test: update testUpdateChannelPolicy to ensure Bob's link uses the proper policies In this commit, we update the testUpdateChannelPolicy to exercise the recent set of changes within the switch. If one applies this test to a fresh branch (without those new changes) it should fail. This is due to the fact that before, Bob would attempt to apply the constraints of the incoming link (which we updated) instead of the outgoing link. With the recent set of changes, the test now properly passes. --- discovery/gossiper.go | 10 ++++- htlcswitch/switch.go | 4 +- lnd_test.go | 99 +++++++++++++++++++++++++++---------------- rpcserver.go | 2 +- 4 files changed, 74 insertions(+), 41 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index b1b1007fb..02791bccd 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -1147,6 +1147,12 @@ func (d *AuthenticatedGossiper) processChanPolicyUpdate( } haveChanFilter := len(chansToUpdate) != 0 + if haveChanFilter { + log.Infof("Updating routing policies for chan_points=%v", + spew.Sdump(chansToUpdate)) + } else { + log.Infof("Updating routing policies for all chans") + } type edgeWithInfo struct { info *channeldb.ChannelEdgeInfo @@ -2145,8 +2151,8 @@ func (d *AuthenticatedGossiper) updateChannel(info *channeldb.ChannelEdgeInfo, var err error - // Make sure timestamp is always increased, such that our update - // gets propagated. + // Make sure timestamp is always increased, such that our update gets + // propagated. timestamp := time.Now().Unix() if timestamp <= edge.LastUpdate.Unix() { timestamp = edge.LastUpdate.Unix() + 1 diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index e1235f6bf..e530c73e6 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -393,10 +393,10 @@ func (s *Switch) UpdateForwardingPolicies(newPolicy ForwardingPolicy, return spew.Sdump(newPolicy) })) - s.indexMtx.RLock() - var linksToUpdate []ChannelLink + s.indexMtx.RLock() + // If no channels have been targeted, then we'll collect all inks to // update their policies. if len(targetChans) == 0 { diff --git a/lnd_test.go b/lnd_test.go index d3fe6e745..dd1c58785 100644 --- a/lnd_test.go +++ b/lnd_test.go @@ -479,8 +479,8 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) { ctxb := context.Background() // Launch notification clients for all nodes, such that we can - // get notified when they discover new channels and updates - // in the graph. + // get notified when they discover new channels and updates in the + // graph. aliceUpdates, aQuit := subscribeGraphNotifications(t, ctxb, net.Alice) defer close(aQuit) bobUpdates, bQuit := subscribeGraphNotifications(t, ctxb, net.Bob) @@ -530,8 +530,9 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) { t.Fatalf("carol didn't report channel: %v", err) } - // Update the fees for the channel Alice->Bob, and make sure - // all nodes learn about it. + // With our little cluster set up, we'll update the fees for the + // channel Bob side of the Alice->Bob channel, and make sure all nodes + // learn about it. const feeBase = 1000000 baseFee := int64(1500) feeRate := int64(12) @@ -546,7 +547,7 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) { ChanPoint: chanPoint, } - _, err = net.Alice.UpdateChannelPolicy(ctxb, req) + _, err = net.Bob.UpdateChannelPolicy(ctxb, req) if err != nil { t.Fatalf("unable to get alice's balance: %v", err) } @@ -572,7 +573,8 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) { // A closure that is used to wait for a channel updates that matches // the channel policy update done by Alice. waitForChannelUpdate := func(graphUpdates chan *lnrpc.GraphTopologyUpdate, - chanPoints ...*lnrpc.ChannelPoint) { + advertisingNode string, chanPoints ...*lnrpc.ChannelPoint) { + // Create a map containing all the channel points we are // waiting for updates for. cps := make(map[string]bool) @@ -592,7 +594,7 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) { continue } - if chanUpdate.AdvertisingNode != net.Alice.PubKeyStr { + if chanUpdate.AdvertisingNode != advertisingNode { continue } @@ -623,16 +625,17 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) { } } - // Wait for all nodes to have seen the policy update done by Alice. - waitForChannelUpdate(aliceUpdates, chanPoint) - waitForChannelUpdate(bobUpdates, chanPoint) - waitForChannelUpdate(carolUpdates, chanPoint) + // Wait for all nodes to have seen the policy update done by Bob. + waitForChannelUpdate(aliceUpdates, net.Bob.PubKeyStr, chanPoint) + waitForChannelUpdate(bobUpdates, net.Bob.PubKeyStr, chanPoint) + waitForChannelUpdate(carolUpdates, net.Bob.PubKeyStr, chanPoint) // assertChannelPolicy asserts that the passed node's known channel - // policy for the passed chanPoint is consistent with Alice's current + // policy for the passed chanPoint is consistent with Bob's current // expected policy values. assertChannelPolicy := func(node *lntest.HarnessNode, - chanPoint *lnrpc.ChannelPoint) { + advertisingNode string, chanPoint *lnrpc.ChannelPoint) { + // Get a DescribeGraph from the node. descReq := &lnrpc.ChannelGraphRequest{} chanGraph, err := node.DescribeGraph(ctxb, descReq) @@ -645,7 +648,7 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) { for _, e := range chanGraph.Edges { if e.ChanPoint == txStr(chanPoint) { edgeFound = true - if e.Node1Pub == net.Alice.PubKeyStr { + if e.Node1Pub == advertisingNode { if e.Node1Policy.FeeBaseMsat != baseFee { t.Fatalf("expected base fee "+ "%v, got %v", baseFee, @@ -689,18 +692,42 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) { } - // Check that all nodes now know about Alice's updated policy. - assertChannelPolicy(net.Alice, chanPoint) - assertChannelPolicy(net.Bob, chanPoint) - assertChannelPolicy(carol, chanPoint) + // Check that all nodes now know about Bob's updated policy. + assertChannelPolicy(net.Alice, net.Bob.PubKeyStr, chanPoint) + assertChannelPolicy(net.Bob, net.Bob.PubKeyStr, chanPoint) + assertChannelPolicy(carol, net.Bob.PubKeyStr, chanPoint) - // Open channel to Carol. + // Now that all nodes have received the new channel update, we'll try + // to send a payment from Alice to Carol to ensure that Alice has + // internalized this fee update. This shouldn't affect the route that + // Alice takes though: we updated the Alice -> Bob channel and she + // doesn't pay for transit over that channel as it's direct. + payAmt := lnwire.MilliSatoshi(2000) + invoice := &lnrpc.Invoice{ + Memo: "testing", + Value: int64(payAmt), + } + resp, err := carol.AddInvoice(ctxb, invoice) + if err != nil { + t.Fatalf("unable to add invoice: %v", err) + } + + ctxt, _ = context.WithTimeout(ctxb, timeout) + err = completePaymentRequests( + ctxt, net.Alice, []string{resp.PaymentRequest}, true, + ) + if err != nil { + t.Fatalf("unable to send payment: %v", err) + } + + // We'll now open a channel from Alice directly to Carol. if err := net.ConnectNodes(ctxb, net.Alice, carol); err != nil { t.Fatalf("unable to connect dave to alice: %v", err) } ctxt, _ = context.WithTimeout(ctxb, timeout) - chanPoint3 := openChannelAndAssert(ctxt, t, net, net.Alice, carol, - chanAmt, pushAmt) + chanPoint3 := openChannelAndAssert( + ctxt, t, net, net.Alice, carol, chanAmt, pushAmt, + ) ctxt, _ = context.WithTimeout(ctxb, time.Second*15) err = net.Alice.WaitForNetworkChannelOpen(ctxt, chanPoint3) @@ -712,8 +739,8 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) { t.Fatalf("bob didn't report channel: %v", err) } - // Make a global update, and check that both channels' - // new policies get propagated. + // Make a global update, and check that both channels' new policies get + // propagated. baseFee = int64(800) feeRate = int64(123) timeLockDelta = uint32(22) @@ -730,21 +757,21 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) { t.Fatalf("unable to get alice's balance: %v", err) } - // Wait for all nodes to have seen the policy updates - // for both of Alice's channels. - waitForChannelUpdate(aliceUpdates, chanPoint, chanPoint3) - waitForChannelUpdate(bobUpdates, chanPoint, chanPoint3) - waitForChannelUpdate(carolUpdates, chanPoint, chanPoint3) + // Wait for all nodes to have seen the policy updates for both of + // Alice's channels. + waitForChannelUpdate(aliceUpdates, net.Alice.PubKeyStr, chanPoint3) + waitForChannelUpdate(bobUpdates, net.Alice.PubKeyStr, chanPoint3) + waitForChannelUpdate(carolUpdates, net.Alice.PubKeyStr, chanPoint3) - // And finally check that all nodes remembers the policy - // update they received. - assertChannelPolicy(net.Alice, chanPoint) - assertChannelPolicy(net.Bob, chanPoint) - assertChannelPolicy(carol, chanPoint) + // And finally check that all nodes remembers the policy update they + // received. + assertChannelPolicy(net.Alice, net.Alice.PubKeyStr, chanPoint) + assertChannelPolicy(net.Bob, net.Alice.PubKeyStr, chanPoint) + assertChannelPolicy(carol, net.Alice.PubKeyStr, chanPoint) - assertChannelPolicy(net.Alice, chanPoint3) - assertChannelPolicy(net.Bob, chanPoint3) - assertChannelPolicy(carol, chanPoint3) + assertChannelPolicy(net.Alice, net.Alice.PubKeyStr, chanPoint3) + assertChannelPolicy(net.Bob, net.Alice.PubKeyStr, chanPoint3) + assertChannelPolicy(carol, net.Alice.PubKeyStr, chanPoint3) // Close the channels. ctxt, _ = context.WithTimeout(ctxb, timeout) diff --git a/rpcserver.go b/rpcserver.go index fd6f0518a..14cce2d39 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -3331,7 +3331,7 @@ func (r *rpcServer) UpdateChannelPolicy(ctx context.Context, TimeLockDelta: req.TimeLockDelta, } - rpcsLog.Tracef("[updatechanpolicy] updating channel policy base_fee=%v, "+ + rpcsLog.Debugf("[updatechanpolicy] updating channel policy base_fee=%v, "+ "rate_float=%v, rate_fixed=%v, time_lock_delta: %v, targets=%v", req.BaseFeeMsat, req.FeeRate, feeRateFixed, req.TimeLockDelta, spew.Sdump(targetChans)) From aa7f83d72e7e048b33184e10c3ec0ab9fd199cab Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Thu, 5 Apr 2018 17:59:50 -0700 Subject: [PATCH 8/9] lnwire: remove pointer receiver from ToUint64 for ShortChannelID --- lnwire/short_channel_id.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lnwire/short_channel_id.go b/lnwire/short_channel_id.go index 36d38a3e3..b2b980aa2 100644 --- a/lnwire/short_channel_id.go +++ b/lnwire/short_channel_id.go @@ -36,7 +36,7 @@ func NewShortChanIDFromInt(chanID uint64) ShortChannelID { // ToUint64 converts the ShortChannelID into a compact format encoded within a // uint64 (8 bytes). -func (c *ShortChannelID) ToUint64() uint64 { +func (c ShortChannelID) ToUint64() uint64 { // TODO(roasbeef): explicit error on overflow? return ((uint64(c.BlockHeight) << 40) | (uint64(c.TxIndex) << 16) | (uint64(c.TxPosition))) From a6ffe999c6cd76156a8a8a59d1b40e3ce747be6e Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Thu, 5 Apr 2018 18:01:40 -0700 Subject: [PATCH 9/9] routing: prune vertex, not ege after repeated FeeInsufficientErrors In this commit, we modify the way we handle FeeInsufficientErrors to more aggressively route around nodes that repeatedly return the same error to us. This will ensure we skip older nodes on the network which are running a buggier older version of lnd. Eventually most nodes will upgrade to this new version, making this change less needed. We also update the existing test to properly use a multi-hop route to ensure that we route around the offending node. --- routing/router.go | 6 +++--- routing/router_test.go | 27 ++++++++++++--------------- 2 files changed, 15 insertions(+), 18 deletions(-) diff --git a/routing/router.go b/routing/router.go index be92e6681..25b60d0c0 100644 --- a/routing/router.go +++ b/routing/router.go @@ -1724,12 +1724,12 @@ func (r *ChannelRouter) SendPayment(payment *LightningPayment) ([32]byte, *Route // We'll now check to see if we've already // reported a fee related failure for this // node. If so, then we'll actually prune out - // the edge for now. + // the vertex for now. chanID := update.ShortChannelID _, ok := errFailedFeeChans[chanID] if ok { - pruneEdgeFailure( - paySession, route, errSource, + pruneVertexFailure( + paySession, route, errSource, false, ) continue } diff --git a/routing/router_test.go b/routing/router_test.go index 2471654b1..8c73826ed 100644 --- a/routing/router_test.go +++ b/routing/router_test.go @@ -297,7 +297,7 @@ func TestSendPaymentErrorRepeatedFeeInsufficient(t *testing.T) { // to luo ji for 100 satoshis. var payHash [32]byte payment := LightningPayment{ - Target: ctx.aliases["luoji"], + Target: ctx.aliases["sophon"], Amount: lnwire.NewMSatFromSatoshis(1000), PaymentHash: payHash, } @@ -306,9 +306,9 @@ func TestSendPaymentErrorRepeatedFeeInsufficient(t *testing.T) { copy(preImage[:], bytes.Repeat([]byte{9}, 32)) // We'll also fetch the first outgoing channel edge from roasbeef to - // luo ji. We'll obtain this as we'll need to to generate the + // son goku. We'll obtain this as we'll need to to generate the // FeeInsufficient error that we'll send back. - chanID := uint64(689530843) + chanID := uint64(3495345) _, _, edgeUpateToFail, err := ctx.graph.FetchChannelEdgesByID(chanID) if err != nil { t.Fatalf("unable to fetch chan id: %v", err) @@ -324,24 +324,21 @@ func TestSendPaymentErrorRepeatedFeeInsufficient(t *testing.T) { FeeRate: uint32(edgeUpateToFail.FeeProportionalMillionths), } - sourceNode := ctx.router.selfNode + // The error will be returned by Son Goku. + sourceNode := ctx.aliases["songoku"] // We'll now modify the SendToSwitch method to return an error for the - // outgoing channel to luo ji. This will be a fee related error, so it - // should only cause the edge to be pruned after the second attempt. + // outgoing channel to Son goku. This will be a fee related error, so + // it should only cause the edge to be pruned after the second attempt. ctx.router.cfg.SendToSwitch = func(n [33]byte, _ *lnwire.UpdateAddHTLC, _ *sphinx.Circuit) ([32]byte, error) { - if bytes.Equal(ctx.aliases["luoji"].SerializeCompressed(), n[:]) { - pub, err := sourceNode.PubKey() - if err != nil { - return preImage, err - } + if bytes.Equal(sourceNode.SerializeCompressed(), n[:]) { return [32]byte{}, &htlcswitch.ForwardingError{ - ErrorSource: pub, + ErrorSource: sourceNode, // Within our error, we'll add a channel update - // which is meant to refelct he new fee + // which is meant to reflect he new fee // schedule for the node/channel. FailureMessage: &lnwire.FailFeeInsufficient{ Update: errChanUpdate, @@ -371,8 +368,8 @@ func TestSendPaymentErrorRepeatedFeeInsufficient(t *testing.T) { preImage[:], paymentPreImage[:]) } - // The route should have satoshi as the first hop. - if route.Hops[0].Channel.Node.Alias != "satoshi" { + // The route should have pham nuwen as the first hop. + if route.Hops[0].Channel.Node.Alias != "phamnuwen" { t.Fatalf("route should go through satoshi as first hop, "+ "instead passes through: %v", route.Hops[0].Channel.Node.Alias)