From 3907ae65c2f13dec167d41a5ee07438fccf600b4 Mon Sep 17 00:00:00 2001 From: Brandon Date: Sun, 24 Sep 2017 18:47:48 -0700 Subject: [PATCH] routing+discovery: implement 2-week network view pruning --- discovery/gossiper.go | 188 +++++++++++++++++++++++------------------- routing/router.go | 10 +++ server.go | 1 + 3 files changed, 112 insertions(+), 87 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 4d6643f1a..595edf2ce 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -92,6 +92,13 @@ type Config struct { // the last trickle tick. TrickleDelay time.Duration + // RetransmitDelay is the period of a timer which indicates that we + // should check if we need to prune or re-broadcast any of our + // personal channels. This addresses the case of "zombie" channels and + // channel advertisements that have been dropped, or not properly + // propagated through the network. + RetransmitDelay time.Duration + // DB is a global boltdb instance which is needed to pass it in waiting // proof storage to make waiting proofs persistent. DB *channeldb.DB @@ -336,11 +343,9 @@ func (d *AuthenticatedGossiper) networkHandler() { // * can use mostly empty struct in db as place holder var announcementBatch []lnwire.Message - // TODO(roasbeef): parametrize the above - retransmitTimer := time.NewTicker(time.Minute * 30) + retransmitTimer := time.NewTicker(d.cfg.RetransmitDelay) defer retransmitTimer.Stop() - // TODO(roasbeef): parametrize the above trickleTimer := time.NewTicker(d.cfg.TrickleDelay) defer trickleTimer.Stop() @@ -449,36 +454,50 @@ func (d *AuthenticatedGossiper) networkHandler() { announcementBatch = nil // The retransmission timer has ticked which indicates that we - // should broadcast our personal channels to the network. This - // addresses the case of channel advertisements whether being - // dropped, or not properly propagated through the network. + // should check if we need to prune or re-broadcast any of our + // personal channels. This addresses the case of "zombie" channels and + // channel advertisements that have been dropped, or not properly + // propagated through the network. case <-retransmitTimer.C: var selfChans []lnwire.Message - // Iterate over our channels and construct the - // announcements array. - err := d.cfg.Router.ForAllOutgoingChannels(func(ei *channeldb.ChannelEdgeInfo, - p *channeldb.ChannelEdgePolicy) error { + // Iterate over all of our channels and check if any of them fall within + // the prune interval or re-broadcast interval. + err := d.cfg.Router.ForAllOutgoingChannels(func(info *channeldb.ChannelEdgeInfo, + edge *channeldb.ChannelEdgePolicy) error { - c := &lnwire.ChannelUpdate{ - Signature: p.Signature, - ShortChannelID: lnwire.NewShortChanIDFromInt(p.ChannelID), - ChainHash: ei.ChainHash, - Timestamp: uint32(p.LastUpdate.Unix()), - Flags: p.Flags, - TimeLockDelta: p.TimeLockDelta, - HtlcMinimumMsat: p.MinHTLC, - BaseFee: uint32(p.FeeBaseMSat), - FeeRate: uint32(p.FeeProportionalMillionths), + const pruneInterval = time.Hour * 24 * 14 + const broadcastInterval = time.Hour * 24 * 13 + + timeElapsed := time.Since(edge.LastUpdate) + + // Prune the edge if it is has not been updated for the past 2 weeks. + // Rebroadcast edge if its last update is close to the 2-week interval. + if timeElapsed >= pruneInterval { + err := d.cfg.Router.DeleteEdge(info) + if err != nil { + log.Errorf("unable to prune stale edge: %v", err) + return err + } + } else if timeElapsed >= broadcastInterval { + // Re-sign and update the channel on disk and retrieve our + // ChannelUpdate to broadcast. + chanUpdate, err := d.updateChannel(info, edge) + if err != nil { + log.Errorf("unable to update channel: %v", err) + return err + } + selfChans = append(selfChans, chanUpdate) } - selfChans = append(selfChans, c) + return nil }) if err != nil { - log.Errorf("unable to retrieve outgoing channels: %v", err) + log.Errorf("error while retrieving outgoing channels: %v", err) continue } + // If we don't have any channels to re-broadcast, then continue. if len(selfChans) == 0 { continue } @@ -487,7 +506,7 @@ func (d *AuthenticatedGossiper) networkHandler() { len(selfChans)) // With all the wire announcements properly crafted, - // we'll broadcast our known outgoing channel to all + // we'll broadcast our known outgoing channels to all // our immediate peers. if err := d.cfg.Broadcast(nil, selfChans...); err != nil { log.Errorf("unable to re-broadcast "+ @@ -532,9 +551,7 @@ func (d *AuthenticatedGossiper) processFeeChanUpdate(feeUpdate *feeUpdateRequest haveChanFilter := len(chansToUpdate) != 0 - var chanUpdates []*lnwire.ChannelUpdate - chanEdges := make(map[lnwire.ShortChannelID]*channeldb.ChannelEdgePolicy) - + var signedAnns []lnwire.Message // Next, we'll loop over all the outgoing channels the router knows of. // If we have a filter then we'll only collected those channels, // otherwise we'll collect them all. @@ -547,76 +564,25 @@ func (d *AuthenticatedGossiper) processFeeChanUpdate(feeUpdate *feeUpdateRequest return nil } - // Otherwise, add the channel update to our batch to be - // updated, as we'll be re-signing it shortly. - c := &lnwire.ChannelUpdate{ - Signature: edge.Signature, - ChainHash: info.ChainHash, - ShortChannelID: lnwire.NewShortChanIDFromInt(edge.ChannelID), - Timestamp: uint32(edge.LastUpdate.Unix()), - Flags: edge.Flags, - TimeLockDelta: edge.TimeLockDelta, - HtlcMinimumMsat: edge.MinHTLC, - BaseFee: uint32(edge.FeeBaseMSat), - FeeRate: uint32(edge.FeeProportionalMillionths), - } - chanUpdates = append(chanUpdates, c) - - // We'll also add it to our edge map so we can find it easily - // later to update the state within the database. - chanEdges[c.ShortChannelID] = edge - return nil - }) - if err != nil { - return nil, err - } - - // With the set of channel updates we need to sign obtained, we'll not - // generate new signatures for each of them using applying the new fee - // schema before signing. - signedAnns := make([]lnwire.Message, len(chanUpdates)) - for i, chanUpdate := range chanUpdates { - edge := chanEdges[chanUpdate.ShortChannelID] - now := time.Now() - - // First, we'll apply the new few schema update to the channel - // update and also the backing database struct. - chanUpdate.BaseFee = uint32(feeUpdate.newSchema.BaseFee) - chanUpdate.FeeRate = feeUpdate.newSchema.FeeRate - chanUpdate.Timestamp = uint32(now.Unix()) + // Apply the new fee schema to the edge. edge.FeeBaseMSat = feeUpdate.newSchema.BaseFee edge.FeeProportionalMillionths = lnwire.MilliSatoshi( feeUpdate.newSchema.FeeRate, ) - edge.LastUpdate = now - // With the update applied, we'll generate a new signature over - // a digest of the channel announcement itself. - sig, err := SignAnnouncement(d.cfg.AnnSigner, d.selfKey, - chanUpdate) + // Re-sign and update the backing ChannelGraphSource, and retrieve our + // ChannelUpdate to broadcast. + chanUpdate, err := d.updateChannel(info, edge) if err != nil { - return nil, err + return err } - // Next, we'll set the new signature in place, and update the - // reference in the backing slice. - edge.Signature = sig - chanUpdate.Signature = sig - signedAnns[i] = chanUpdate + signedAnns = append(signedAnns, chanUpdate) - // To ensure that our signature is valid, we'll verify it - // ourself before committing it to the slice returned. - err = d.validateChannelUpdateAnn(d.selfKey, chanUpdate) - if err != nil { - return nil, fmt.Errorf("generated invalid channel update "+ - "sig: %v", err) - } - - // Finally, we'll update the fee schema for this edge on disk. - edge.Node.PubKey.Curve = nil - if err := d.cfg.Router.UpdateEdge(edge); err != nil { - return nil, err - } + return nil + }) + if err != nil { + return nil, err } return signedAnns, nil @@ -1202,3 +1168,51 @@ func (d *AuthenticatedGossiper) synchronizeWithNode(syncReq *syncRequest) error // single batch to the target peer. return d.cfg.SendToPeer(targetNode, announceMessages...) } + +// updateChannel creates a new fully signed update for the channel, +// and updates the underlying graph with the new state. +func (d *AuthenticatedGossiper) updateChannel( + info *channeldb.ChannelEdgeInfo, + edge *channeldb.ChannelEdgePolicy) (*lnwire.ChannelUpdate, error) { + + edge.LastUpdate = time.Now() + chanUpdate := &lnwire.ChannelUpdate{ + Signature: edge.Signature, + ChainHash: info.ChainHash, + ShortChannelID: lnwire.NewShortChanIDFromInt(edge.ChannelID), + Timestamp: uint32(edge.LastUpdate.Unix()), + Flags: edge.Flags, + TimeLockDelta: edge.TimeLockDelta, + HtlcMinimumMsat: edge.MinHTLC, + BaseFee: uint32(edge.FeeBaseMSat), + FeeRate: uint32(edge.FeeProportionalMillionths), + } + + // With the update applied, we'll generate a new signature over + // a digest of the channel announcement itself. + sig, err := SignAnnouncement(d.cfg.AnnSigner, d.selfKey, chanUpdate) + if err != nil { + return nil, err + } + + // Next, we'll set the new signature in place, and update the + // reference in the backing slice. + edge.Signature = sig + chanUpdate.Signature = sig + + // To ensure that our signature is valid, we'll verify it + // ourself before committing it to the slice returned. + err = d.validateChannelUpdateAnn(d.selfKey, chanUpdate) + if err != nil { + return nil, fmt.Errorf("generated invalid channel update "+ + "sig: %v", err) + } + + // Finally, we'll write the new edge policy to disk. + edge.Node.PubKey.Curve = nil + if err := d.cfg.Router.UpdateEdge(edge); err != nil { + return nil, err + } + + return chanUpdate, err +} diff --git a/routing/router.go b/routing/router.go index e588b04ad..c2551ddf9 100644 --- a/routing/router.go +++ b/routing/router.go @@ -39,6 +39,9 @@ type ChannelGraphSource interface { // edge/channel might be used in construction of payment path. AddEdge(edge *channeldb.ChannelEdgeInfo) error + // DeleteEdge is used to delete an edge from the router database. + DeleteEdge(edge *channeldb.ChannelEdgeInfo) error + // AddProof updates the channel edge info with proof which is needed to // properly announce the edge to the rest of the network. AddProof(chanID lnwire.ShortChannelID, proof *channeldb.ChannelAuthProof) error @@ -1294,6 +1297,13 @@ func (r *ChannelRouter) AddEdge(edge *channeldb.ChannelEdgeInfo) error { } } +// DeleteEdge is used to delete an edge from the router database. +// +// NOTE: This method is part of the ChannelGraphSource interface. +func (r *ChannelRouter) DeleteEdge(edge *channeldb.ChannelEdgeInfo) error { + return r.cfg.Graph.DeleteChannelEdge(&edge.ChannelPoint) +} + // UpdateEdge is used to update edge information, without this message edge // considered as not fully constructed. // diff --git a/server.go b/server.go index 27acbbbeb..472eec58e 100644 --- a/server.go +++ b/server.go @@ -279,6 +279,7 @@ func newServer(listenAddrs []string, chanDB *channeldb.DB, cc *chainControl, SendToPeer: s.SendToPeer, ProofMatureDelta: 0, TrickleDelay: time.Millisecond * 300, + RetransmitDelay: time.Minute * 30, DB: chanDB, AnnSigner: s.nodeSigner, },