From dc13da5abbfa429273b516abd566f6c6fa5bb200 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Wed, 26 Jun 2019 08:39:34 +0200 Subject: [PATCH] routing: move second chance logic into mission control If nodes return a channel policy related failure, they may get a second chance. Our graph may not be up to date. Previously this logic was contained in the payment session. This commit moves that into global mission control and thereby removes the last mission control state that was kept on the payment level. Because mission control is not aware of the relation between payment attempts and payments, the second chance logic is no longer based tracking second chances given per payment. Instead a time based approach is used. If a node reports a policy failure that prevents forwarding to its peer, it will get a second chance. But it will get it only if the previous second chance was long enough ago. Also those second chances are no longer dependent on whether an associated channel update is valid. It will get the second chance regardless, to prevent creating a dependency between mission control and the graph. This would interfer with (future) replay of history, because the graph may not be the same anymore at that point. --- routing/missioncontrol.go | 90 +++++++++++++++-- routing/missioncontrol_test.go | 22 +++++ routing/mock_test.go | 2 + routing/nodepair.go | 10 ++ routing/payment_session.go | 26 +---- routing/payment_session_source.go | 21 ++-- routing/router.go | 155 ++++++++++++++++-------------- 7 files changed, 213 insertions(+), 113 deletions(-) create mode 100644 routing/nodepair.go diff --git a/routing/missioncontrol.go b/routing/missioncontrol.go index 364a0e3a1..627b17890 100644 --- a/routing/missioncontrol.go +++ b/routing/missioncontrol.go @@ -5,8 +5,6 @@ import ( "sync" "time" - "github.com/coreos/bbolt" - "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing/route" ) @@ -16,6 +14,30 @@ const ( // half-life duration defines after how much time a penalized node or // channel is back at 50% probability. DefaultPenaltyHalfLife = time.Hour + + // minSecondChanceInterval is the minimum time required between + // second-chance failures. + // + // If nodes return a channel policy related failure, they may get a + // second chance to forward the payment. It could be that the channel + // policy that we are aware of is not up to date. This is especially + // important in case of mobile apps that are mostly offline. + // + // However, we don't want to give nodes the option to endlessly return + // new channel updates so that we are kept busy trying to route through + // that node until the payment loop times out. + // + // Therefore we only grant a second chance to a node if the previous + // second chance is sufficiently long ago. This is what + // minSecondChanceInterval defines. If a second policy failure comes in + // within that interval, we will apply a penalty. + // + // Second chances granted are tracked on the level of node pairs. This + // means that if a node has multiple channels to the same peer, they + // will only get a single second chance to route to that peer again. + // Nodes forward non-strict, so it isn't necessary to apply a less + // restrictive channel level tracking scheme here. + minSecondChanceInterval = time.Minute ) // MissionControl contains state which summarizes the past attempts of HTLC @@ -30,6 +52,10 @@ const ( type MissionControl struct { history map[route.Vertex]*nodeHistory + // lastSecondChance tracks the last time a second chance was granted for + // a directed node pair. + lastSecondChance map[DirectedNodePair]time.Time + // now is expected to return the current time. It is supplied as an // external function to enable deterministic unit tests. now func() time.Time @@ -127,13 +153,13 @@ func NewMissionControl(cfg *MissionControlConfig) *MissionControl { cfg.PenaltyHalfLife, cfg.AprioriHopProbability) return &MissionControl{ - history: make(map[route.Vertex]*nodeHistory), - now: time.Now, - cfg: cfg, + history: make(map[route.Vertex]*nodeHistory), + lastSecondChance: make(map[DirectedNodePair]time.Time), + now: time.Now, + cfg: cfg, } } - // ResetHistory resets the history of MissionControl returning it to a state as // if no payment attempts have been made. func (m *MissionControl) ResetHistory() { @@ -141,6 +167,7 @@ func (m *MissionControl) ResetHistory() { defer m.Unlock() m.history = make(map[route.Vertex]*nodeHistory) + m.lastSecondChance = make(map[DirectedNodePair]time.Time) log.Debugf("Mission control history cleared") } @@ -209,6 +236,37 @@ func (m *MissionControl) getEdgeProbabilityForNode(nodeHistory *nodeHistory, return probability } +// requestSecondChance checks whether the node fromNode can have a second chance +// at providing a channel update for its channel with toNode. +func (m *MissionControl) requestSecondChance(timestamp time.Time, + fromNode, toNode route.Vertex) bool { + + // Look up previous second chance time. + pair := DirectedNodePair{ + From: fromNode, + To: toNode, + } + lastSecondChance, ok := m.lastSecondChance[pair] + + // If the channel hasn't already be given a second chance or its last + // second chance was long ago, we give it another chance. + if !ok || timestamp.Sub(lastSecondChance) > minSecondChanceInterval { + m.lastSecondChance[pair] = timestamp + + log.Debugf("Second chance granted for %v->%v", fromNode, toNode) + + return true + } + + // Otherwise penalize the channel, because we don't allow channel + // updates that are that frequent. This is to prevent nodes from keeping + // us busy by continuously sending new channel updates. + log.Debugf("Second chance denied for %v->%v, remaining interval: %v", + fromNode, toNode, timestamp.Sub(lastSecondChance)) + + return false +} + // createHistoryIfNotExists returns the history for the given node. If the node // is yet unknown, it will create an empty history structure. func (m *MissionControl) createHistoryIfNotExists(vertex route.Vertex) *nodeHistory { @@ -237,6 +295,26 @@ func (m *MissionControl) ReportVertexFailure(v route.Vertex) { history.lastFail = &now } +// ReportEdgePolicyFailure reports a policy related failure. +func (m *MissionControl) ReportEdgePolicyFailure(failedEdge edge) { + now := m.now() + + m.Lock() + defer m.Unlock() + + // We may have an out of date graph. Therefore we don't always penalize + // immediately. If some time has passed since the last policy failure, + // we grant the node a second chance at forwarding the payment. + if m.requestSecondChance( + now, failedEdge.from, failedEdge.to, + ) { + return + } + + history := m.createHistoryIfNotExists(failedEdge.from) + history.lastFail = &now +} + // ReportEdgeFailure reports a channel level failure. // // TODO(roasbeef): also add value attempted to send and capacity of channel diff --git a/routing/missioncontrol_test.go b/routing/missioncontrol_test.go index f6f658b8e..237f81df8 100644 --- a/routing/missioncontrol_test.go +++ b/routing/missioncontrol_test.go @@ -106,3 +106,25 @@ func TestMissionControl(t *testing.T) { t.Fatal("unexpected number of channels") } } + +// TestMissionControlChannelUpdate tests that the first channel update is not +// penalizing the channel yet. +func TestMissionControlChannelUpdate(t *testing.T) { + ctx := createMcTestContext(t) + + testEdge := edge{ + channel: 123, + } + + // Report a policy related failure. Because it is the first, we don't + // expect a penalty. + ctx.mc.ReportEdgePolicyFailure(testEdge) + + ctx.expectP(0, 0.8) + + // Report another failure for the same channel. We expect it to be + // pruned. + ctx.mc.ReportEdgePolicyFailure(testEdge) + + ctx.expectP(0, 0) +} diff --git a/routing/mock_test.go b/routing/mock_test.go index cf0a3f3be..4c4f97ebf 100644 --- a/routing/mock_test.go +++ b/routing/mock_test.go @@ -102,6 +102,8 @@ func (m *mockMissionControl) ReportEdgeFailure(failedEdge edge, minPenalizeAmt lnwire.MilliSatoshi) { } +func (m *mockMissionControl) ReportEdgePolicyFailure(failedEdge edge) {} + func (m *mockMissionControl) ReportVertexFailure(v route.Vertex) {} func (m *mockMissionControl) GetEdgeProbability(fromNode route.Vertex, edge EdgeLocator, diff --git a/routing/nodepair.go b/routing/nodepair.go new file mode 100644 index 000000000..edec8e02b --- /dev/null +++ b/routing/nodepair.go @@ -0,0 +1,10 @@ +package routing + +import ( + "github.com/lightningnetwork/lnd/routing/route" +) + +// DirectedNodePair stores a directed pair of nodes. +type DirectedNodePair struct { + From, To route.Vertex +} diff --git a/routing/payment_session.go b/routing/payment_session.go index 4a2c0a5bf..f138ae315 100644 --- a/routing/payment_session.go +++ b/routing/payment_session.go @@ -52,12 +52,6 @@ type paymentSession struct { bandwidthHints map[uint64]lnwire.MilliSatoshi - // errFailedFeeChans is a map of the short channel IDs that were the - // source of policy related routing failures during this payment attempt. - // We'll use this map to prune out channels when the first error may not - // require pruning, but any subsequent ones do. - errFailedPolicyChans map[nodeChannel]struct{} - sessionSource *SessionSource preBuiltRoute *route.Route @@ -109,25 +103,7 @@ func (p *paymentSession) ReportEdgeFailure(failedEdge edge, // // TODO(joostjager): Move this logic into global mission control. func (p *paymentSession) ReportEdgePolicyFailure(failedEdge edge) { - key := nodeChannel{ - node: failedEdge.from, - channel: failedEdge.channel, - } - - // Check to see if we've already reported a policy related failure for - // this channel. If so, then we'll prune out the vertex. - _, ok := p.errFailedPolicyChans[key] - if ok { - // TODO(joostjager): is this aggressive pruning still necessary? - // Just pruning edges may also work unless there is a huge - // number of failing channels from that node? - p.ReportVertexFailure(key.node) - - return - } - - // Finally, we'll record a policy failure from this node and move on. - p.errFailedPolicyChans[key] = struct{}{} + p.sessionSource.MissionControl.ReportEdgePolicyFailure(failedEdge) } // RequestRoute returns a route which is likely to be capable for successfully diff --git a/routing/payment_session_source.go b/routing/payment_session_source.go index 697380267..77dad51e8 100644 --- a/routing/payment_session_source.go +++ b/routing/payment_session_source.go @@ -119,11 +119,10 @@ func (m *SessionSource) NewPaymentSession(routeHints [][]zpay32.HopHint, } return &paymentSession{ - additionalEdges: edges, - bandwidthHints: bandwidthHints, - errFailedPolicyChans: make(map[nodeChannel]struct{}), - sessionSource: m, - pathFinder: findPath, + additionalEdges: edges, + bandwidthHints: bandwidthHints, + sessionSource: m, + pathFinder: findPath, }, nil } @@ -131,9 +130,8 @@ func (m *SessionSource) NewPaymentSession(routeHints [][]zpay32.HopHint, // used for failure reporting to missioncontrol. func (m *SessionSource) NewPaymentSessionForRoute(preBuiltRoute *route.Route) PaymentSession { return &paymentSession{ - errFailedPolicyChans: make(map[nodeChannel]struct{}), - sessionSource: m, - preBuiltRoute: preBuiltRoute, + sessionSource: m, + preBuiltRoute: preBuiltRoute, } } @@ -142,9 +140,8 @@ func (m *SessionSource) NewPaymentSessionForRoute(preBuiltRoute *route.Route) Pa // missioncontrol for resumed payment we don't want to make more attempts for. func (m *SessionSource) NewPaymentSessionEmpty() PaymentSession { return &paymentSession{ - errFailedPolicyChans: make(map[nodeChannel]struct{}), - sessionSource: m, - preBuiltRoute: &route.Route{}, - preBuiltRouteTried: true, + sessionSource: m, + preBuiltRoute: &route.Route{}, + preBuiltRouteTried: true, } } diff --git a/routing/router.go b/routing/router.go index d84d58753..0fd1caf99 100644 --- a/routing/router.go +++ b/routing/router.go @@ -178,6 +178,9 @@ type MissionController interface { ReportEdgeFailure(failedEdge edge, minPenalizeAmt lnwire.MilliSatoshi) + // ReportEdgePolicyFailure reports a policy related failure. + ReportEdgePolicyFailure(failedEdge edge) + // ReportVertexFailure reports a node level failure. ReportVertexFailure(v route.Vertex) @@ -1826,6 +1829,47 @@ func (r *ChannelRouter) sendPayment( } +// tryApplyChannelUpdate tries to apply a channel update present in the failure +// message if any. +func (r *ChannelRouter) tryApplyChannelUpdate(rt *route.Route, + errorSourceIdx int, failure lnwire.FailureMessage) error { + + // It makes no sense to apply our own channel updates. + if errorSourceIdx == 0 { + log.Errorf("Channel update of ourselves received") + + return nil + } + + // Extract channel update if the error contains one. + update := r.extractChannelUpdate(failure) + if update == nil { + return nil + } + + // Parse pubkey to allow validation of the channel update. This should + // always succeed, otherwise there is something wrong in our + // implementation. Therefore return an error. + errVertex := rt.Hops[errorSourceIdx-1].PubKeyBytes + errSource, err := btcec.ParsePubKey( + errVertex[:], btcec.S256(), + ) + if err != nil { + log.Errorf("Cannot parse pubkey: idx=%v, pubkey=%v", + errorSourceIdx, errVertex) + + return err + } + + // Apply channel update. + if !r.applyChannelUpdate(update, errSource) { + log.Debugf("Invalid channel update received: node=%x", + errVertex) + } + + return nil +} + // processSendError analyzes the error for the payment attempt received from the // switch and updates mission control and/or channel policies. Depending on the // error type, this error is either the final outcome of the payment or we need @@ -1851,32 +1895,28 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession, return true, channeldb.FailureReasonError } - var ( - failureSourceIdx = fErr.FailureSourceIdx + failureMessage := fErr.FailureMessage + failureSourceIdx := fErr.FailureSourceIdx - failureVertex route.Vertex - failureSource *btcec.PublicKey - err error - ) + // Apply channel update if the error contains one. For unknown + // failures, failureMessage is nil. + if failureMessage != nil { + err := r.tryApplyChannelUpdate( + rt, failureSourceIdx, failureMessage, + ) + if err != nil { + return true, channeldb.FailureReasonError + } + } + + var failureVertex route.Vertex // For any non-self failure, look up the source pub key in the hops // slice. Otherwise return the self node pubkey. if failureSourceIdx > 0 { failureVertex = rt.Hops[failureSourceIdx-1].PubKeyBytes - failureSource, err = btcec.ParsePubKey(failureVertex[:], btcec.S256()) - if err != nil { - log.Errorf("Cannot parse pubkey %v: %v", - failureVertex, err) - - return true, channeldb.FailureReasonError - } } else { failureVertex = r.selfNode.PubKeyBytes - failureSource, err = r.selfNode.PubKey() - if err != nil { - log.Errorf("Cannot parse self pubkey: %v", err) - return true, channeldb.FailureReasonError - } } log.Tracef("Node %x (index %v) reported failure when sending htlc", failureVertex, failureSourceIdx) @@ -1885,41 +1925,7 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession, // update with id may not be available. failedEdge, failedAmt := getFailedEdge(rt, failureSourceIdx) - // processChannelUpdateAndRetry is a closure that - // handles a failure message containing a channel - // update. This function always tries to apply the - // channel update and passes on the result to the - // payment session to adjust its view on the reliability - // of the network. - // - // As channel id, the locally determined channel id is - // used. It does not rely on the channel id that is part - // of the channel update message, because the remote - // node may lie to us or the update may be corrupt. - processChannelUpdateAndRetry := func( - update *lnwire.ChannelUpdate, - pubKey *btcec.PublicKey) { - - // Try to apply the channel update. - updateOk := r.applyChannelUpdate(update, pubKey) - - // If the update could not be applied, prune the - // edge. There is no reason to continue trying - // this channel. - // - // TODO: Could even prune the node completely? - // Or is there a valid reason for the channel - // update to fail? - if !updateOk { - paySession.ReportEdgeFailure( - failedEdge, 0, - ) - } - - paySession.ReportEdgePolicyFailure(failedEdge) - } - - switch onionErr := fErr.FailureMessage.(type) { + switch fErr.FailureMessage.(type) { // If the end destination didn't know the payment // hash or we sent the wrong payment amount to the @@ -1975,7 +1981,6 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession, // that sent us this error, as it doesn't now what the // correct block height is. case *lnwire.FailExpiryTooSoon: - r.applyChannelUpdate(&onionErr.Update, failureSource) paySession.ReportVertexFailure(failureVertex) return false, 0 @@ -1996,34 +2001,27 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession, // amount, we'll apply the new minimum amount and retry // routing. case *lnwire.FailAmountBelowMinimum: - processChannelUpdateAndRetry( - &onionErr.Update, failureSource, - ) + paySession.ReportEdgePolicyFailure(failedEdge) return false, 0 // If we get a failure due to a fee, we'll apply the // new fee update, and retry our attempt using the // newly updated fees. case *lnwire.FailFeeInsufficient: - processChannelUpdateAndRetry( - &onionErr.Update, failureSource, - ) + paySession.ReportEdgePolicyFailure(failedEdge) return false, 0 // If we get the failure for an intermediate node that // disagrees with our time lock values, then we'll // apply the new delta value and try it once more. case *lnwire.FailIncorrectCltvExpiry: - processChannelUpdateAndRetry( - &onionErr.Update, failureSource, - ) + paySession.ReportEdgePolicyFailure(failedEdge) return false, 0 // The outgoing channel that this node was meant to // forward one is currently disabled, so we'll apply // the update and continue. case *lnwire.FailChannelDisabled: - r.applyChannelUpdate(&onionErr.Update, failureSource) paySession.ReportEdgeFailure(failedEdge, 0) return false, 0 @@ -2031,7 +2029,6 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession, // sufficient capacity, so we'll prune this edge for // now, and continue onwards with our path finding. case *lnwire.FailTemporaryChannelFailure: - r.applyChannelUpdate(onionErr.Update, failureSource) paySession.ReportEdgeFailure(failedEdge, failedAmt) return false, 0 @@ -2103,6 +2100,29 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession, } } +// extractChannelUpdate examines the error and extracts the channel update. +func (r *ChannelRouter) extractChannelUpdate( + failure lnwire.FailureMessage) *lnwire.ChannelUpdate { + + var update *lnwire.ChannelUpdate + switch onionErr := failure.(type) { + case *lnwire.FailExpiryTooSoon: + update = &onionErr.Update + case *lnwire.FailAmountBelowMinimum: + update = &onionErr.Update + case *lnwire.FailFeeInsufficient: + update = &onionErr.Update + case *lnwire.FailIncorrectCltvExpiry: + update = &onionErr.Update + case *lnwire.FailChannelDisabled: + update = &onionErr.Update + case *lnwire.FailTemporaryChannelFailure: + update = onionErr.Update + } + + return update +} + // getFailedEdge tries to locate the failing channel given a route and the // pubkey of the node that sent the failure. It will assume that the failure is // associated with the outgoing channel of the failing node. As a second result, @@ -2147,11 +2167,6 @@ func getFailedEdge(route *route.Route, failureSource int) (edge, // database. It returns a bool indicating whether the updates was successful. func (r *ChannelRouter) applyChannelUpdate(msg *lnwire.ChannelUpdate, pubKey *btcec.PublicKey) bool { - // If we get passed a nil channel update (as it's optional with some - // onion errors), then we'll exit early with a success result. - if msg == nil { - return true - } ch, _, _, err := r.GetChannelByID(msg.ShortChannelID) if err != nil {