Merge pull request #2777 from wpaulino/reject-zombie-anns

channeldb+routing+discovery: reject zombie announcements
This commit is contained in:
Olaoluwa Osuntokun 2019-03-27 19:22:15 -07:00 committed by GitHub
commit 43ba4a5a35
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 822 additions and 125 deletions

View file

@ -268,6 +268,9 @@ func createChannelDB(dbPath string) error {
if _, err := edges.CreateBucket(channelPointBucket); err != nil { if _, err := edges.CreateBucket(channelPointBucket); err != nil {
return err return err
} }
if _, err := edges.CreateBucket(zombieBucket); err != nil {
return err
}
graphMeta, err := tx.CreateBucket(graphMetaBucket) graphMeta, err := tx.CreateBucket(graphMetaBucket)
if err != nil { if err != nil {

View file

@ -1,6 +1,9 @@
package channeldb package channeldb
import "fmt" import (
"errors"
"fmt"
)
var ( var (
// ErrNoChanDBExists is returned when a channel bucket hasn't been // ErrNoChanDBExists is returned when a channel bucket hasn't been
@ -79,6 +82,10 @@ var (
// can't be found. // can't be found.
ErrEdgeNotFound = fmt.Errorf("edge not found") ErrEdgeNotFound = fmt.Errorf("edge not found")
// ErrZombieEdge is an error returned when we attempt to look up an edge
// but it is marked as a zombie within the zombie index.
ErrZombieEdge = errors.New("edge marked as zombie")
// ErrEdgeAlreadyExist is returned when edge with specific // ErrEdgeAlreadyExist is returned when edge with specific
// channel id can't be added because it already exist. // channel id can't be added because it already exist.
ErrEdgeAlreadyExist = fmt.Errorf("edge already exist") ErrEdgeAlreadyExist = fmt.Errorf("edge already exist")

View file

@ -106,6 +106,17 @@ var (
// maps: outPoint -> chanID // maps: outPoint -> chanID
channelPointBucket = []byte("chan-index") channelPointBucket = []byte("chan-index")
// zombieBucket is a sub-bucket of the main edgeBucket bucket
// responsible for maintaining an index of zombie channels. Each entry
// exists within the bucket as follows:
//
// maps: chanID -> pubKey1 || pubKey2
//
// The chanID represents the channel ID of the edge that is marked as a
// zombie and is used as the key, which maps to the public keys of the
// edge's participants.
zombieBucket = []byte("zombie-index")
// graphMetaBucket is a top-level bucket which stores various meta-deta // graphMetaBucket is a top-level bucket which stores various meta-deta
// related to the on-disk channel graph. Data stored in this bucket // related to the on-disk channel graph. Data stored in this bucket
// includes the block to which the graph has been synced to, the total // includes the block to which the graph has been synced to, the total
@ -123,9 +134,6 @@ var (
// case we'll remove all entries from the prune log with a block height // case we'll remove all entries from the prune log with a block height
// that no longer exists. // that no longer exists.
pruneLogBucket = []byte("prune-log") pruneLogBucket = []byte("prune-log")
edgeBloomKey = []byte("edge-bloom")
nodeBloomKey = []byte("node-bloom")
) )
const ( const (
@ -587,17 +595,20 @@ func (c *ChannelGraph) addChannelEdge(tx *bbolt.Tx, edge *ChannelEdgeInfo) error
// HasChannelEdge returns true if the database knows of a channel edge with the // HasChannelEdge returns true if the database knows of a channel edge with the
// passed channel ID, and false otherwise. If an edge with that ID is found // passed channel ID, and false otherwise. If an edge with that ID is found
// within the graph, then two time stamps representing the last time the edge // within the graph, then two time stamps representing the last time the edge
// was updated for both directed edges are returned along with the boolean. // was updated for both directed edges are returned along with the boolean. If
func (c *ChannelGraph) HasChannelEdge(chanID uint64) (time.Time, time.Time, bool, error) { // it is not found, then the zombie index is checked and its result is returned
// TODO(roasbeef): check internal bloom filter first // as the second boolean.
func (c *ChannelGraph) HasChannelEdge(chanID uint64,
) (time.Time, time.Time, bool, bool, error) {
var ( var (
node1UpdateTime time.Time node1UpdateTime time.Time
node2UpdateTime time.Time node2UpdateTime time.Time
exists bool exists bool
isZombie bool
) )
if err := c.db.View(func(tx *bbolt.Tx) error { err := c.db.View(func(tx *bbolt.Tx) error {
edges := tx.Bucket(edgeBucket) edges := tx.Bucket(edgeBucket)
if edges == nil { if edges == nil {
return ErrGraphNoEdgesFound return ErrGraphNoEdgesFound
@ -609,12 +620,21 @@ func (c *ChannelGraph) HasChannelEdge(chanID uint64) (time.Time, time.Time, bool
var channelID [8]byte var channelID [8]byte
byteOrder.PutUint64(channelID[:], chanID) byteOrder.PutUint64(channelID[:], chanID)
// If the edge doesn't exist, then we'll also check our zombie
// index.
if edgeIndex.Get(channelID[:]) == nil { if edgeIndex.Get(channelID[:]) == nil {
exists = false exists = false
zombieIndex := edges.Bucket(zombieBucket)
if zombieIndex != nil {
isZombie, _, _ = isZombieEdge(zombieIndex, chanID)
}
return nil return nil
} }
exists = true exists = true
isZombie = false
// If the channel has been found in the graph, then retrieve // If the channel has been found in the graph, then retrieve
// the edges itself so we can return the last updated // the edges itself so we can return the last updated
@ -640,11 +660,9 @@ func (c *ChannelGraph) HasChannelEdge(chanID uint64) (time.Time, time.Time, bool
} }
return nil return nil
}); err != nil { })
return time.Time{}, time.Time{}, exists, err
}
return node1UpdateTime, node2UpdateTime, exists, nil return node1UpdateTime, node2UpdateTime, exists, isZombie, err
} }
// UpdateChannelEdge retrieves and update edge of the graph database. Method // UpdateChannelEdge retrieves and update edge of the graph database. Method
@ -720,6 +738,10 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint,
if nodes == nil { if nodes == nil {
return ErrSourceNodeNotSet return ErrSourceNodeNotSet
} }
zombieIndex, err := edges.CreateBucketIfNotExists(zombieBucket)
if err != nil {
return err
}
// For each of the outpoints that have been spent within the // For each of the outpoints that have been spent within the
// block, we attempt to delete them from the graph as if that // block, we attempt to delete them from the graph as if that
@ -753,7 +775,8 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint,
// a channel. If no error is returned, then a channel // a channel. If no error is returned, then a channel
// was successfully pruned. // was successfully pruned.
err = delChannelByEdge( err = delChannelByEdge(
edges, edgeIndex, chanIndex, nodes, chanPoint, edges, edgeIndex, chanIndex, zombieIndex, nodes,
chanPoint, false,
) )
if err != nil && err != ErrEdgeNotFound { if err != nil && err != ErrEdgeNotFound {
return err return err
@ -963,6 +986,10 @@ func (c *ChannelGraph) DisconnectBlockAtHeight(height uint32) ([]*ChannelEdgeInf
if err != nil { if err != nil {
return err return err
} }
zombieIndex, err := edges.CreateBucketIfNotExists(zombieBucket)
if err != nil {
return err
}
nodes, err := tx.CreateBucketIfNotExists(nodeBucket) nodes, err := tx.CreateBucketIfNotExists(nodeBucket)
if err != nil { if err != nil {
return err return err
@ -980,7 +1007,8 @@ func (c *ChannelGraph) DisconnectBlockAtHeight(height uint32) ([]*ChannelEdgeInf
return err return err
} }
err = delChannelByEdge( err = delChannelByEdge(
edges, edgeIndex, chanIndex, nodes, &edgeInfo.ChannelPoint, edges, edgeIndex, chanIndex, zombieIndex, nodes,
&edgeInfo.ChannelPoint, false,
) )
if err != nil && err != ErrEdgeNotFound { if err != nil && err != ErrEdgeNotFound {
return err return err
@ -1067,8 +1095,9 @@ func (c *ChannelGraph) PruneTip() (*chainhash.Hash, uint32, error) {
} }
// DeleteChannelEdge removes an edge from the database as identified by its // DeleteChannelEdge removes an edge from the database as identified by its
// funding outpoint. If the edge does not exist within the database, then // funding outpoint and also marks it as a zombie. This ensures that we're
// ErrEdgeNotFound will be returned. // unable to re-add this to our database once again. If the edge does not exist
// within the database, then ErrEdgeNotFound will be returned.
func (c *ChannelGraph) DeleteChannelEdge(chanPoint *wire.OutPoint) error { func (c *ChannelGraph) DeleteChannelEdge(chanPoint *wire.OutPoint) error {
// TODO(roasbeef): possibly delete from node bucket if node has no more // TODO(roasbeef): possibly delete from node bucket if node has no more
// channels // channels
@ -1088,19 +1117,22 @@ func (c *ChannelGraph) DeleteChannelEdge(chanPoint *wire.OutPoint) error {
if edgeIndex == nil { if edgeIndex == nil {
return ErrEdgeNotFound return ErrEdgeNotFound
} }
chanIndex := edges.Bucket(channelPointBucket) chanIndex := edges.Bucket(channelPointBucket)
if chanIndex == nil { if chanIndex == nil {
return ErrEdgeNotFound return ErrEdgeNotFound
} }
nodes := tx.Bucket(nodeBucket) nodes := tx.Bucket(nodeBucket)
if nodes == nil { if nodes == nil {
return ErrGraphNodeNotFound return ErrGraphNodeNotFound
} }
zombieIndex, err := edges.CreateBucketIfNotExists(zombieBucket)
if err != nil {
return err
}
return delChannelByEdge( return delChannelByEdge(
edges, edgeIndex, chanIndex, nodes, chanPoint, edges, edgeIndex, chanIndex, zombieIndex, nodes,
chanPoint, true,
) )
}) })
} }
@ -1571,8 +1603,9 @@ func delEdgeUpdateIndexEntry(edgesBucket *bbolt.Bucket, chanID uint64,
return nil return nil
} }
func delChannelByEdge(edges *bbolt.Bucket, edgeIndex *bbolt.Bucket, func delChannelByEdge(edges, edgeIndex, chanIndex, zombieIndex,
chanIndex *bbolt.Bucket, nodes *bbolt.Bucket, chanPoint *wire.OutPoint) error { nodes *bbolt.Bucket, chanPoint *wire.OutPoint, isZombie bool) error {
var b bytes.Buffer var b bytes.Buffer
if err := writeOutpoint(&b, chanPoint); err != nil { if err := writeOutpoint(&b, chanPoint); err != nil {
return err return err
@ -1630,12 +1663,29 @@ func delChannelByEdge(edges *bbolt.Bucket, edgeIndex *bbolt.Bucket,
} }
} }
// Finally, with the edge data deleted, we can purge the information // With the edge data deleted, we can purge the information from the two
// from the two edge indexes. // edge indexes.
if err := edgeIndex.Delete(chanID); err != nil { if err := edgeIndex.Delete(chanID); err != nil {
return err return err
} }
return chanIndex.Delete(b.Bytes()) if err := chanIndex.Delete(b.Bytes()); err != nil {
return err
}
// Finally, we'll mark the edge as a zombie within our index if it's
// being removed due to the channel becoming a zombie. We do this to
// ensure we don't store unnecessary data for spent channels.
if !isZombie {
return nil
}
var pubKey1, pubKey2 [33]byte
copy(pubKey1[:], nodeKeys[:33])
copy(pubKey2[:], nodeKeys[33:])
return markEdgeZombie(
zombieIndex, byteOrder.Uint64(chanID), pubKey1, pubKey2,
)
} }
// UpdateEdgePolicy updates the edge routing policy for a single directed edge // UpdateEdgePolicy updates the edge routing policy for a single directed edge
@ -2497,7 +2547,8 @@ func (c *ChannelEdgePolicy) Signature() (*btcec.Signature, error) {
// found, then ErrEdgeNotFound is returned. A struct which houses the general // found, then ErrEdgeNotFound is returned. A struct which houses the general
// information for the channel itself is returned as well as two structs that // information for the channel itself is returned as well as two structs that
// contain the routing policies for the channel in either direction. // contain the routing policies for the channel in either direction.
func (c *ChannelGraph) FetchChannelEdgesByOutpoint(op *wire.OutPoint) (*ChannelEdgeInfo, *ChannelEdgePolicy, *ChannelEdgePolicy, error) { func (c *ChannelGraph) FetchChannelEdgesByOutpoint(op *wire.OutPoint,
) (*ChannelEdgeInfo, *ChannelEdgePolicy, *ChannelEdgePolicy, error) {
var ( var (
edgeInfo *ChannelEdgeInfo edgeInfo *ChannelEdgeInfo
@ -2575,7 +2626,12 @@ func (c *ChannelGraph) FetchChannelEdgesByOutpoint(op *wire.OutPoint) (*ChannelE
// ErrEdgeNotFound is returned. A struct which houses the general information // ErrEdgeNotFound is returned. A struct which houses the general information
// for the channel itself is returned as well as two structs that contain the // for the channel itself is returned as well as two structs that contain the
// routing policies for the channel in either direction. // routing policies for the channel in either direction.
func (c *ChannelGraph) FetchChannelEdgesByID(chanID uint64) (*ChannelEdgeInfo, *ChannelEdgePolicy, *ChannelEdgePolicy, error) { //
// ErrZombieEdge an be returned if the edge is currently marked as a zombie
// within the database. In this case, the ChannelEdgePolicy's will be nil, and
// the ChannelEdgeInfo will only include the public keys of each node.
func (c *ChannelGraph) FetchChannelEdgesByID(chanID uint64,
) (*ChannelEdgeInfo, *ChannelEdgePolicy, *ChannelEdgePolicy, error) {
var ( var (
edgeInfo *ChannelEdgeInfo edgeInfo *ChannelEdgeInfo
@ -2606,13 +2662,48 @@ func (c *ChannelGraph) FetchChannelEdgesByID(chanID uint64) (*ChannelEdgeInfo, *
byteOrder.PutUint64(channelID[:], chanID) byteOrder.PutUint64(channelID[:], chanID)
// Now, attempt to fetch edge.
edge, err := fetchChanEdgeInfo(edgeIndex, channelID[:]) edge, err := fetchChanEdgeInfo(edgeIndex, channelID[:])
// If it doesn't exist, we'll quickly check our zombie index to
// see if we've previously marked it as so.
if err == ErrEdgeNotFound {
// If the zombie index doesn't exist, or the edge is not
// marked as a zombie within it, then we'll return the
// original ErrEdgeNotFound error.
zombieIndex := edges.Bucket(zombieBucket)
if zombieIndex == nil {
return ErrEdgeNotFound
}
isZombie, pubKey1, pubKey2 := isZombieEdge(
zombieIndex, chanID,
)
if !isZombie {
return ErrEdgeNotFound
}
// Otherwise, the edge is marked as a zombie, so we'll
// populate the edge info with the public keys of each
// party as this is the only information we have about
// it and return an error signaling so.
edgeInfo = &ChannelEdgeInfo{
NodeKey1Bytes: pubKey1,
NodeKey2Bytes: pubKey2,
}
return ErrZombieEdge
}
// Otherwise, we'll just return the error if any.
if err != nil { if err != nil {
return err return err
} }
edgeInfo = &edge edgeInfo = &edge
edgeInfo.db = c.db edgeInfo.db = c.db
// Then we'll attempt to fetch the accompanying policies of this
// edge.
e1, e2, err := fetchChanEdgePolicies( e1, e2, err := fetchChanEdgePolicies(
edgeIndex, edges, nodes, channelID[:], c.db, edgeIndex, edges, nodes, channelID[:], c.db,
) )
@ -2624,6 +2715,9 @@ func (c *ChannelGraph) FetchChannelEdgesByID(chanID uint64) (*ChannelEdgeInfo, *
policy2 = e2 policy2 = e2
return nil return nil
}) })
if err == ErrZombieEdge {
return edgeInfo, nil, nil, err
}
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, nil, err
} }
@ -2785,6 +2879,109 @@ func (c *ChannelGraph) NewChannelEdgePolicy() *ChannelEdgePolicy {
return &ChannelEdgePolicy{db: c.db} return &ChannelEdgePolicy{db: c.db}
} }
// MarkEdgeZombie marks an edge as a zombie within the graph's zombie index.
// The public keys should represent the node public keys of the two parties
// involved in the edge.
func (c *ChannelGraph) MarkEdgeZombie(chanID uint64, pubKey1,
pubKey2 [33]byte) error {
return c.db.Batch(func(tx *bbolt.Tx) error {
edges := tx.Bucket(edgeBucket)
if edges == nil {
return ErrGraphNoEdgesFound
}
zombieIndex, err := edges.CreateBucketIfNotExists(zombieBucket)
if err != nil {
return err
}
return markEdgeZombie(zombieIndex, chanID, pubKey1, pubKey2)
})
}
// markEdgeZombie marks an edge as a zombie within our zombie index. The public
// keys should represent the node public keys of the two parties involved in the
// edge.
func markEdgeZombie(zombieIndex *bbolt.Bucket, chanID uint64, pubKey1,
pubKey2 [33]byte) error {
var k [8]byte
byteOrder.PutUint64(k[:], chanID)
var v [66]byte
copy(v[:33], pubKey1[:])
copy(v[33:], pubKey2[:])
return zombieIndex.Put(k[:], v[:])
}
// MarkEdgeLive clears an edge from our zombie index, deeming it as live.
func (c *ChannelGraph) MarkEdgeLive(chanID uint64) error {
return c.db.Batch(func(tx *bbolt.Tx) error {
edges := tx.Bucket(edgeBucket)
if edges == nil {
return ErrGraphNoEdgesFound
}
zombieIndex := edges.Bucket(zombieBucket)
if zombieIndex == nil {
return nil
}
var k [8]byte
byteOrder.PutUint64(k[:], chanID)
return zombieIndex.Delete(k[:])
})
}
// IsZombieEdge returns whether the edge is considered zombie. If it is a
// zombie, then the two node public keys corresponding to this edge are also
// returned.
func (c *ChannelGraph) IsZombieEdge(chanID uint64) (bool, [33]byte, [33]byte) {
var (
isZombie bool
pubKey1, pubKey2 [33]byte
)
err := c.db.View(func(tx *bbolt.Tx) error {
edges := tx.Bucket(edgeBucket)
if edges == nil {
return ErrGraphNoEdgesFound
}
zombieIndex := edges.Bucket(zombieBucket)
if zombieIndex == nil {
return nil
}
isZombie, pubKey1, pubKey2 = isZombieEdge(zombieIndex, chanID)
return nil
})
if err != nil {
return false, [33]byte{}, [33]byte{}
}
return isZombie, pubKey1, pubKey2
}
// isZombieEdge returns whether an entry exists for the given channel in the
// zombie index. If an entry exists, then the two node public keys corresponding
// to this edge are also returned.
func isZombieEdge(zombieIndex *bbolt.Bucket,
chanID uint64) (bool, [33]byte, [33]byte) {
var k [8]byte
byteOrder.PutUint64(k[:], chanID)
v := zombieIndex.Get(k[:])
if v == nil {
return false, [33]byte{}, [33]byte{}
}
var pubKey1, pubKey2 [33]byte
copy(pubKey1[:], v[:33])
copy(pubKey2[:], v[33:])
return true, pubKey1, pubKey2
}
func putLightningNode(nodeBucket *bbolt.Bucket, aliasBucket *bbolt.Bucket, func putLightningNode(nodeBucket *bbolt.Bucket, aliasBucket *bbolt.Bucket,
updateIndex *bbolt.Bucket, node *LightningNode) error { updateIndex *bbolt.Bucket, node *LightningNode) error {

View file

@ -374,6 +374,10 @@ func TestEdgeInsertionDeletion(t *testing.T) {
if _, _, _, err := graph.FetchChannelEdgesByID(chanID); err == nil { if _, _, _, err := graph.FetchChannelEdgesByID(chanID); err == nil {
t.Fatalf("channel edge not deleted") t.Fatalf("channel edge not deleted")
} }
isZombie, _, _ := graph.IsZombieEdge(chanID)
if !isZombie {
t.Fatal("channel edge not marked as zombie")
}
// Finally, attempt to delete a (now) non-existent edge within the // Finally, attempt to delete a (now) non-existent edge within the
// database, this should result in an error. // database, this should result in an error.
@ -522,29 +526,38 @@ func TestDisconnectBlockAtHeight(t *testing.T) {
} }
// The two first edges should be removed from the db. // The two first edges should be removed from the db.
_, _, has, err := graph.HasChannelEdge(edgeInfo.ChannelID) _, _, has, isZombie, err := graph.HasChannelEdge(edgeInfo.ChannelID)
if err != nil { if err != nil {
t.Fatalf("unable to query for edge: %v", err) t.Fatalf("unable to query for edge: %v", err)
} }
if has { if has {
t.Fatalf("edge1 was not pruned from the graph") t.Fatalf("edge1 was not pruned from the graph")
} }
_, _, has, err = graph.HasChannelEdge(edgeInfo2.ChannelID) if isZombie {
t.Fatal("reorged edge1 should not be marked as zombie")
}
_, _, has, isZombie, err = graph.HasChannelEdge(edgeInfo2.ChannelID)
if err != nil { if err != nil {
t.Fatalf("unable to query for edge: %v", err) t.Fatalf("unable to query for edge: %v", err)
} }
if has { if has {
t.Fatalf("edge2 was not pruned from the graph") t.Fatalf("edge2 was not pruned from the graph")
} }
if isZombie {
t.Fatal("reorged edge2 should not be marked as zombie")
}
// Edge 3 should not be removed. // Edge 3 should not be removed.
_, _, has, err = graph.HasChannelEdge(edgeInfo3.ChannelID) _, _, has, isZombie, err = graph.HasChannelEdge(edgeInfo3.ChannelID)
if err != nil { if err != nil {
t.Fatalf("unable to query for edge: %v", err) t.Fatalf("unable to query for edge: %v", err)
} }
if !has { if !has {
t.Fatalf("edge3 was pruned from the graph") t.Fatalf("edge3 was pruned from the graph")
} }
if isZombie {
t.Fatal("edge3 was marked as zombie")
}
// PruneTip should be set to the blockHash we specified for the block // PruneTip should be set to the blockHash we specified for the block
// at height 155. // at height 155.
@ -755,12 +768,16 @@ func TestEdgeInfoUpdates(t *testing.T) {
// Check for existence of the edge within the database, it should be // Check for existence of the edge within the database, it should be
// found. // found.
_, _, found, err := graph.HasChannelEdge(chanID) _, _, found, isZombie, err := graph.HasChannelEdge(chanID)
if err != nil { if err != nil {
t.Fatalf("unable to query for edge: %v", err) t.Fatalf("unable to query for edge: %v", err)
} else if !found { }
if !found {
t.Fatalf("graph should have of inserted edge") t.Fatalf("graph should have of inserted edge")
} }
if isZombie {
t.Fatal("live edge should not be marked as zombie")
}
// We should also be able to retrieve the channelID only knowing the // We should also be able to retrieve the channelID only knowing the
// channel point of the channel. // channel point of the channel.
@ -2786,6 +2803,67 @@ func TestEdgePolicyMissingMaxHtcl(t *testing.T) {
assertEdgeInfoEqual(t, dbEdgeInfo, edgeInfo) assertEdgeInfoEqual(t, dbEdgeInfo, edgeInfo)
} }
// TestGraphZombieIndex ensures that we can mark edges correctly as zombie/live.
func TestGraphZombieIndex(t *testing.T) {
t.Parallel()
// We'll start by creating our test graph along with a test edge.
db, cleanUp, err := makeTestDB()
defer cleanUp()
if err != nil {
t.Fatalf("unable to create test database: %v", err)
}
graph := db.ChannelGraph()
node1, err := createTestVertex(db)
if err != nil {
t.Fatalf("unable to create test vertex: %v", err)
}
node2, err := createTestVertex(db)
if err != nil {
t.Fatalf("unable to create test vertex: %v", err)
}
edge, _, _ := createChannelEdge(db, node1, node2)
// If the graph is not aware of the edge, then it should not be a
// zombie.
isZombie, _, _ := graph.IsZombieEdge(edge.ChannelID)
if isZombie {
t.Fatal("expected edge to not be marked as zombie")
}
// If we mark the edge as a zombie, then we should expect to see it
// within the index.
err = graph.MarkEdgeZombie(
edge.ChannelID, node1.PubKeyBytes, node2.PubKeyBytes,
)
if err != nil {
t.Fatalf("unable to mark edge as zombie: %v", err)
}
isZombie, pubKey1, pubKey2 := graph.IsZombieEdge(edge.ChannelID)
if !isZombie {
t.Fatal("expected edge to be marked as zombie")
}
if pubKey1 != node1.PubKeyBytes {
t.Fatalf("expected pubKey1 %x, got %x", node1.PubKeyBytes,
pubKey1)
}
if pubKey2 != node2.PubKeyBytes {
t.Fatalf("expected pubKey2 %x, got %x", node2.PubKeyBytes,
pubKey2)
}
// Similarly, if we mark the same edge as live, we should no longer see
// it within the index.
if err := graph.MarkEdgeLive(edge.ChannelID); err != nil {
t.Fatalf("unable to mark edge as live: %v", err)
}
isZombie, _, _ = graph.IsZombieEdge(edge.ChannelID)
if isZombie {
t.Fatal("expected edge to not be marked as zombie")
}
}
// compareNodes is used to compare two LightningNodes while excluding the // compareNodes is used to compare two LightningNodes while excluding the
// Features struct, which cannot be compared as the semantics for reserializing // Features struct, which cannot be compared as the semantics for reserializing
// the featuresMap have not been defined. // the featuresMap have not been defined.

View file

@ -1571,8 +1571,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
return nil return nil
} }
// At this point, we'll now ask the router if this is a stale // At this point, we'll now ask the router if this is a
// update. If so we can skip all the processing below. // zombie/known edge. If so we can skip all the processing
// below.
if d.cfg.Router.IsKnownEdge(msg.ShortChannelID) { if d.cfg.Router.IsKnownEdge(msg.ShortChannelID) {
nMsg.err <- nil nMsg.err <- nil
return nil return nil
@ -1787,13 +1788,12 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
} }
// Before we perform any of the expensive checks below, we'll // Before we perform any of the expensive checks below, we'll
// make sure that the router doesn't already have a fresher // check whether this update is stale or is for a zombie
// announcement for this edge. // channel in order to quickly reject it.
timestamp := time.Unix(int64(msg.Timestamp), 0) timestamp := time.Unix(int64(msg.Timestamp), 0)
if d.cfg.Router.IsStaleEdgePolicy( if d.cfg.Router.IsStaleEdgePolicy(
msg.ShortChannelID, timestamp, msg.ChannelFlags, msg.ShortChannelID, timestamp, msg.ChannelFlags,
) { ) {
nMsg.err <- nil nMsg.err <- nil
return nil return nil
} }
@ -1809,48 +1809,92 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
d.channelMtx.Lock(msg.ShortChannelID.ToUint64()) d.channelMtx.Lock(msg.ShortChannelID.ToUint64())
defer d.channelMtx.Unlock(msg.ShortChannelID.ToUint64()) defer d.channelMtx.Unlock(msg.ShortChannelID.ToUint64())
chanInfo, _, _, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID) chanInfo, _, _, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID)
if err != nil {
switch err { switch err {
// No error, break.
case nil:
break
case channeldb.ErrZombieEdge:
// Since we've deemed the update as not stale above,
// before marking it live, we'll make sure it has been
// signed by the correct party. The least-significant
// bit in the flag on the channel update tells us which
// edge is being updated.
var pubKey *btcec.PublicKey
switch {
case msg.ChannelFlags&lnwire.ChanUpdateDirection == 0:
pubKey, _ = chanInfo.NodeKey1()
case msg.ChannelFlags&lnwire.ChanUpdateDirection == 1:
pubKey, _ = chanInfo.NodeKey2()
}
err := routing.VerifyChannelUpdateSignature(msg, pubKey)
if err != nil {
err := fmt.Errorf("unable to verify channel "+
"update signature: %v", err)
log.Error(err)
nMsg.err <- err
return nil
}
// With the signature valid, we'll proceed to mark the
// edge as live and wait for the channel announcement to
// come through again.
err = d.cfg.Router.MarkEdgeLive(msg.ShortChannelID)
if err != nil {
err := fmt.Errorf("unable to remove edge with "+
"chan_id=%v from zombie index: %v",
msg.ShortChannelID, err)
log.Error(err)
nMsg.err <- err
return nil
}
log.Debugf("Removed edge with chan_id=%v from zombie "+
"index", msg.ShortChannelID)
// We'll fallthrough to ensure we stash the update until
// we receive its corresponding ChannelAnnouncement.
// This is needed to ensure the edge exists in the graph
// before applying the update.
fallthrough
case channeldb.ErrGraphNotFound: case channeldb.ErrGraphNotFound:
fallthrough fallthrough
case channeldb.ErrGraphNoEdgesFound: case channeldb.ErrGraphNoEdgesFound:
fallthrough fallthrough
case channeldb.ErrEdgeNotFound: case channeldb.ErrEdgeNotFound:
// If the edge corresponding to this // If the edge corresponding to this ChannelUpdate was
// ChannelUpdate was not found in the graph, // not found in the graph, this might be a channel in
// this might be a channel in the process of // the process of being opened, and we haven't processed
// being opened, and we haven't processed our // our own ChannelAnnouncement yet, hence it is not
// own ChannelAnnouncement yet, hence it is not // found in the graph. This usually gets resolved after
// found in the graph. This usually gets // the channel proofs are exchanged and the channel is
// resolved after the channel proofs are // broadcasted to the rest of the network, but in case
// exchanged and the channel is broadcasted to // this is a private channel this won't ever happen.
// the rest of the network, but in case this // This can also happen in the case of a zombie channel
// is a private channel this won't ever happen. // with a fresh update for which we don't have a
// Because of this, we temporarily add it to a // ChannelAnnouncement for since we reject them. Because
// map, and reprocess it after our own // of this, we temporarily add it to a map, and
// ChannelAnnouncement has been processed. // reprocess it after our own ChannelAnnouncement has
// been processed.
d.pChanUpdMtx.Lock() d.pChanUpdMtx.Lock()
d.prematureChannelUpdates[shortChanID] = append( d.prematureChannelUpdates[shortChanID] = append(
d.prematureChannelUpdates[shortChanID], d.prematureChannelUpdates[shortChanID], nMsg,
nMsg,
) )
d.pChanUpdMtx.Unlock() d.pChanUpdMtx.Unlock()
log.Debugf("Got ChannelUpdate for edge not "+ log.Debugf("Got ChannelUpdate for edge not found in "+
"found in graph(shortChanID=%v), "+ "graph(shortChanID=%v), saving for "+
"saving for reprocessing later", "reprocessing later", shortChanID)
shortChanID)
// NOTE: We don't return anything on the error // NOTE: We don't return anything on the error channel
// channel for this message, as we expect that // for this message, as we expect that will be done when
// will be done when this ChannelUpdate is // this ChannelUpdate is later reprocessed.
// later reprocessed.
return nil return nil
default: default:
err := fmt.Errorf("unable to validate "+ err := fmt.Errorf("unable to validate channel update "+
"channel update short_chan_id=%v: %v", "short_chan_id=%v: %v", shortChanID, err)
shortChanID, err)
log.Error(err) log.Error(err)
nMsg.err <- err nMsg.err <- err
@ -1859,7 +1903,6 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
d.rejectMtx.Unlock() d.rejectMtx.Unlock()
return nil return nil
} }
}
// The least-significant bit in the flag on the channel update // The least-significant bit in the flag on the channel update
// announcement tells us "which" side of the channels directed // announcement tells us "which" side of the channels directed

View file

@ -114,6 +114,7 @@ type mockGraphSource struct {
nodes []channeldb.LightningNode nodes []channeldb.LightningNode
infos map[uint64]channeldb.ChannelEdgeInfo infos map[uint64]channeldb.ChannelEdgeInfo
edges map[uint64][]channeldb.ChannelEdgePolicy edges map[uint64][]channeldb.ChannelEdgePolicy
zombies map[uint64][][33]byte
} }
func newMockRouter(height uint32) *mockGraphSource { func newMockRouter(height uint32) *mockGraphSource {
@ -121,6 +122,7 @@ func newMockRouter(height uint32) *mockGraphSource {
bestHeight: height, bestHeight: height,
infos: make(map[uint64]channeldb.ChannelEdgeInfo), infos: make(map[uint64]channeldb.ChannelEdgeInfo),
edges: make(map[uint64][]channeldb.ChannelEdgePolicy), edges: make(map[uint64][]channeldb.ChannelEdgePolicy),
zombies: make(map[uint64][][33]byte),
} }
} }
@ -205,11 +207,20 @@ func (r *mockGraphSource) GetChannelByID(chanID lnwire.ShortChannelID) (
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
chanInfo, ok := r.infos[chanID.ToUint64()] chanIDInt := chanID.ToUint64()
chanInfo, ok := r.infos[chanIDInt]
if !ok { if !ok {
pubKeys, isZombie := r.zombies[chanIDInt]
if !isZombie {
return nil, nil, nil, channeldb.ErrEdgeNotFound return nil, nil, nil, channeldb.ErrEdgeNotFound
} }
return &channeldb.ChannelEdgeInfo{
NodeKey1Bytes: pubKeys[0],
NodeKey2Bytes: pubKeys[1],
}, nil, nil, channeldb.ErrZombieEdge
}
edges := r.edges[chanID.ToUint64()] edges := r.edges[chanID.ToUint64()]
if len(edges) == 0 { if len(edges) == 0 {
return &chanInfo, nil, nil, nil return &chanInfo, nil, nil, nil
@ -280,13 +291,15 @@ func (r *mockGraphSource) IsPublicNode(node routing.Vertex) (bool, error) {
} }
// IsKnownEdge returns true if the graph source already knows of the passed // IsKnownEdge returns true if the graph source already knows of the passed
// channel ID. // channel ID either as a live or zombie channel.
func (r *mockGraphSource) IsKnownEdge(chanID lnwire.ShortChannelID) bool { func (r *mockGraphSource) IsKnownEdge(chanID lnwire.ShortChannelID) bool {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
_, ok := r.infos[chanID.ToUint64()] chanIDInt := chanID.ToUint64()
return ok _, exists := r.infos[chanIDInt]
_, isZombie := r.zombies[chanIDInt]
return exists || isZombie
} }
// IsStaleEdgePolicy returns true if the graph source has a channel edge for // IsStaleEdgePolicy returns true if the graph source has a channel edge for
@ -297,13 +310,23 @@ func (r *mockGraphSource) IsStaleEdgePolicy(chanID lnwire.ShortChannelID,
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
edges, ok := r.edges[chanID.ToUint64()] chanIDInt := chanID.ToUint64()
edges, ok := r.edges[chanIDInt]
if !ok { if !ok {
// Since the edge doesn't exist, we'll check our zombie index as
// well.
_, isZombie := r.zombies[chanIDInt]
if !isZombie {
return false return false
} }
switch { // Since it exists within our zombie index, we'll check that it
// respects the router's live edge horizon to determine whether
// it is stale or not.
return time.Since(timestamp) > routing.DefaultChannelPruneExpiry
}
switch {
case len(edges) >= 1 && edges[0].ChannelFlags == flags: case len(edges) >= 1 && edges[0].ChannelFlags == flags:
return !edges[0].LastUpdate.Before(timestamp) return !edges[0].LastUpdate.Before(timestamp)
@ -315,6 +338,26 @@ func (r *mockGraphSource) IsStaleEdgePolicy(chanID lnwire.ShortChannelID,
} }
} }
// MarkEdgeLive clears an edge from our zombie index, deeming it as live.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *mockGraphSource) MarkEdgeLive(chanID lnwire.ShortChannelID) error {
r.mu.Lock()
defer r.mu.Unlock()
delete(r.zombies, chanID.ToUint64())
return nil
}
// MarkEdgeZombie marks an edge as a zombie within our zombie index.
func (r *mockGraphSource) MarkEdgeZombie(chanID lnwire.ShortChannelID, pubKey1,
pubKey2 [33]byte) error {
r.mu.Lock()
defer r.mu.Unlock()
r.zombies[chanID.ToUint64()] = [][33]byte{pubKey1, pubKey2}
return nil
}
type mockNotifier struct { type mockNotifier struct {
clientCounter uint32 clientCounter uint32
epochClients map[uint32]chan *chainntnfs.BlockEpoch epochClients map[uint32]chan *chainntnfs.BlockEpoch
@ -2158,6 +2201,259 @@ func TestForwardPrivateNodeAnnouncement(t *testing.T) {
} }
} }
// TestRejectZombieEdge ensures that we properly reject any announcements for
// zombie edges.
func TestRejectZombieEdge(t *testing.T) {
t.Parallel()
// We'll start by creating our test context with a batch of
// announcements.
ctx, cleanup, err := createTestCtx(0)
if err != nil {
t.Fatalf("unable to create test context: %v", err)
}
defer cleanup()
batch, err := createAnnouncements(0)
if err != nil {
t.Fatalf("unable to create announcements: %v", err)
}
remotePeer := &mockPeer{pk: nodeKeyPriv2.PubKey()}
// processAnnouncements is a helper closure we'll use to test that we
// properly process/reject announcements based on whether they're for a
// zombie edge or not.
processAnnouncements := func(isZombie bool) {
t.Helper()
errChan := ctx.gossiper.ProcessRemoteAnnouncement(
batch.remoteChanAnn, remotePeer,
)
select {
case err := <-errChan:
if isZombie && err != nil {
t.Fatalf("expected to reject live channel "+
"announcement with nil error: %v", err)
}
if !isZombie && err != nil {
t.Fatalf("expected to process live channel "+
"announcement: %v", err)
}
case <-time.After(time.Second):
t.Fatal("expected to process channel announcement")
}
select {
case <-ctx.broadcastedMessage:
if isZombie {
t.Fatal("expected to not broadcast zombie " +
"channel announcement")
}
case <-time.After(2 * trickleDelay):
if !isZombie {
t.Fatal("expected to broadcast live channel " +
"announcement")
}
}
errChan = ctx.gossiper.ProcessRemoteAnnouncement(
batch.chanUpdAnn2, remotePeer,
)
select {
case err := <-errChan:
if isZombie && err != nil {
t.Fatalf("expected to reject zombie channel "+
"update with nil error: %v", err)
}
if !isZombie && err != nil {
t.Fatalf("expected to process live channel "+
"update: %v", err)
}
case <-time.After(time.Second):
t.Fatal("expected to process channel update")
}
select {
case <-ctx.broadcastedMessage:
if isZombie {
t.Fatal("expected to not broadcast zombie " +
"channel update")
}
case <-time.After(2 * trickleDelay):
if !isZombie {
t.Fatal("expected to broadcast live channel " +
"update")
}
}
}
// We'll mark the edge for which we'll process announcements for as a
// zombie within the router. This should reject any announcements for
// this edge while it remains as a zombie.
chanID := batch.remoteChanAnn.ShortChannelID
err = ctx.router.MarkEdgeZombie(
chanID, batch.remoteChanAnn.NodeID1, batch.remoteChanAnn.NodeID2,
)
if err != nil {
t.Fatalf("unable to mark channel %v as zombie: %v", chanID, err)
}
processAnnouncements(true)
// If we then mark the edge as live, the edge's zombie status should be
// overridden and the announcements should be processed.
if err := ctx.router.MarkEdgeLive(chanID); err != nil {
t.Fatalf("unable mark channel %v as zombie: %v", chanID, err)
}
processAnnouncements(false)
}
// TestProcessZombieEdgeNowLive ensures that we can detect when a zombie edge
// becomes live by receiving a fresh update.
func TestProcessZombieEdgeNowLive(t *testing.T) {
t.Parallel()
// We'll start by creating our test context with a batch of
// announcements.
ctx, cleanup, err := createTestCtx(0)
if err != nil {
t.Fatalf("unable to create test context: %v", err)
}
defer cleanup()
batch, err := createAnnouncements(0)
if err != nil {
t.Fatalf("unable to create announcements: %v", err)
}
localPrivKey := nodeKeyPriv1
remotePrivKey := nodeKeyPriv2
remotePeer := &mockPeer{pk: remotePrivKey.PubKey()}
// processAnnouncement is a helper closure we'll use to ensure an
// announcement is properly processed/rejected based on whether the edge
// is a zombie or not. The expectsErr boolean can be used to determine
// whether we should expect an error when processing the message, while
// the isZombie boolean can be used to determine whether the
// announcement should be or not be broadcast.
processAnnouncement := func(ann lnwire.Message, isZombie, expectsErr bool) {
t.Helper()
errChan := ctx.gossiper.ProcessRemoteAnnouncement(
ann, remotePeer,
)
var err error
select {
case err = <-errChan:
case <-time.After(time.Second):
t.Fatal("expected to process announcement")
}
if expectsErr && err == nil {
t.Fatal("expected error when processing announcement")
}
if !expectsErr && err != nil {
t.Fatalf("received unexpected error when processing "+
"announcement: %v", err)
}
select {
case msgWithSenders := <-ctx.broadcastedMessage:
if isZombie {
t.Fatal("expected to not broadcast zombie " +
"channel message")
}
assertMessage(t, ann, msgWithSenders.msg)
case <-time.After(2 * trickleDelay):
if !isZombie {
t.Fatal("expected to broadcast live channel " +
"message")
}
}
}
// We'll generate a channel update with a timestamp far enough in the
// past to consider it a zombie.
zombieTimestamp := time.Now().Add(-routing.DefaultChannelPruneExpiry)
batch.chanUpdAnn2.Timestamp = uint32(zombieTimestamp.Unix())
if err := signUpdate(remotePrivKey, batch.chanUpdAnn2); err != nil {
t.Fatalf("unable to sign update with new timestamp: %v", err)
}
// We'll also add the edge to our zombie index.
chanID := batch.remoteChanAnn.ShortChannelID
err = ctx.router.MarkEdgeZombie(
chanID, batch.remoteChanAnn.NodeID1, batch.remoteChanAnn.NodeID2,
)
if err != nil {
t.Fatalf("unable mark channel %v as zombie: %v", chanID, err)
}
// Attempting to process the current channel update should fail due to
// its edge being considered a zombie and its timestamp not being within
// the live horizon. We should not expect an error here since it is just
// a stale update.
processAnnouncement(batch.chanUpdAnn2, true, false)
// Now we'll generate a new update with a fresh timestamp. This should
// allow the channel update to be processed even though it is still
// marked as a zombie within the index, since it is a fresh new update.
// This won't work however since we'll sign it with the wrong private
// key (local rather than remote).
batch.chanUpdAnn2.Timestamp = uint32(time.Now().Unix())
if err := signUpdate(localPrivKey, batch.chanUpdAnn2); err != nil {
t.Fatalf("unable to sign update with new timestamp: %v", err)
}
// We should expect an error due to the signature being invalid.
processAnnouncement(batch.chanUpdAnn2, true, true)
// Signing it with the correct private key should allow it to be
// processed.
if err := signUpdate(remotePrivKey, batch.chanUpdAnn2); err != nil {
t.Fatalf("unable to sign update with new timestamp: %v", err)
}
// The channel update cannot be successfully processed and broadcast
// until the channel announcement is. Since the channel update indicates
// a fresh new update, the gossiper should stash it until it sees the
// corresponding channel announcement.
updateErrChan := ctx.gossiper.ProcessRemoteAnnouncement(
batch.chanUpdAnn2, remotePeer,
)
select {
case <-ctx.broadcastedMessage:
t.Fatal("expected to not broadcast live channel update " +
"without announcement")
case <-time.After(2 * trickleDelay):
}
// We'll go ahead and process the channel announcement to ensure the
// channel update is processed thereafter.
processAnnouncement(batch.remoteChanAnn, false, false)
// After successfully processing the announcement, the channel update
// should have been processed and broadcast successfully as well.
select {
case err := <-updateErrChan:
if err != nil {
t.Fatalf("expected to process live channel update: %v",
err)
}
case <-time.After(time.Second):
t.Fatal("expected to process announcement")
}
select {
case msgWithSenders := <-ctx.broadcastedMessage:
assertMessage(t, batch.chanUpdAnn2, msgWithSenders.msg)
case <-time.After(2 * trickleDelay):
t.Fatal("expected to broadcast live channel update")
}
}
// TestReceiveRemoteChannelUpdateFirst tests that if we receive a ChannelUpdate // TestReceiveRemoteChannelUpdateFirst tests that if we receive a ChannelUpdate
// from the remote before we have processed our own ChannelAnnouncement, it will // from the remote before we have processed our own ChannelAnnouncement, it will
// be reprocessed later, after our ChannelAnnouncement. // be reprocessed later, after our ChannelAnnouncement.

View file

@ -2,6 +2,7 @@ package routing
import ( import (
"bytes" "bytes"
"fmt"
"github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/chaincfg/chainhash"
@ -132,20 +133,28 @@ func ValidateChannelUpdateAnn(pubKey *btcec.PublicKey, capacity btcutil.Amount,
return err return err
} }
data, err := a.DataToSign() return VerifyChannelUpdateSignature(a, pubKey)
}
// VerifyChannelUpdateSignature verifies that the channel update message was
// signed by the party with the given node public key.
func VerifyChannelUpdateSignature(msg *lnwire.ChannelUpdate,
pubKey *btcec.PublicKey) error {
data, err := msg.DataToSign()
if err != nil { if err != nil {
return errors.Errorf("unable to reconstruct message: %v", err) return fmt.Errorf("unable to reconstruct message data: %v", err)
} }
dataHash := chainhash.DoubleHashB(data) dataHash := chainhash.DoubleHashB(data)
nodeSig, err := a.Signature.ToSignature() nodeSig, err := msg.Signature.ToSignature()
if err != nil { if err != nil {
return err return err
} }
if !nodeSig.Verify(dataHash, pubKey) { if !nodeSig.Verify(dataHash, pubKey) {
return errors.Errorf("invalid signature for channel "+ return fmt.Errorf("invalid signature for channel update %v",
"update %v", spew.Sdump(a)) spew.Sdump(msg))
} }
return nil return nil

View file

@ -33,6 +33,10 @@ const (
// if we should give up on a payment attempt. This will be used if a // if we should give up on a payment attempt. This will be used if a
// value isn't specified in the LightningNode struct. // value isn't specified in the LightningNode struct.
defaultPayAttemptTimeout = time.Duration(time.Second * 60) defaultPayAttemptTimeout = time.Duration(time.Second * 60)
// DefaultChannelPruneExpiry is the default duration used to determine
// if a channel should be pruned or not.
DefaultChannelPruneExpiry = time.Duration(time.Hour * 24 * 14)
) )
var ( var (
@ -76,7 +80,7 @@ type ChannelGraphSource interface {
IsPublicNode(node Vertex) (bool, error) IsPublicNode(node Vertex) (bool, error)
// IsKnownEdge returns true if the graph source already knows of the // IsKnownEdge returns true if the graph source already knows of the
// passed channel ID. // passed channel ID either as a live or zombie edge.
IsKnownEdge(chanID lnwire.ShortChannelID) bool IsKnownEdge(chanID lnwire.ShortChannelID) bool
// IsStaleEdgePolicy returns true if the graph source has a channel // IsStaleEdgePolicy returns true if the graph source has a channel
@ -85,6 +89,10 @@ type ChannelGraphSource interface {
IsStaleEdgePolicy(chanID lnwire.ShortChannelID, timestamp time.Time, IsStaleEdgePolicy(chanID lnwire.ShortChannelID, timestamp time.Time,
flags lnwire.ChanUpdateChanFlags) bool flags lnwire.ChanUpdateChanFlags) bool
// MarkEdgeLive clears an edge from our zombie index, deeming it as
// live.
MarkEdgeLive(chanID lnwire.ShortChannelID) error
// ForAllOutgoingChannels is used to iterate over all channels // ForAllOutgoingChannels is used to iterate over all channels
// emanating from the "source" node which is the center of the // emanating from the "source" node which is the center of the
// star-graph. // star-graph.
@ -1009,12 +1017,19 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error {
// Prior to processing the announcement we first check if we // Prior to processing the announcement we first check if we
// already know of this channel, if so, then we can exit early. // already know of this channel, if so, then we can exit early.
_, _, exists, err := r.cfg.Graph.HasChannelEdge(msg.ChannelID) _, _, exists, isZombie, err := r.cfg.Graph.HasChannelEdge(
msg.ChannelID,
)
if err != nil && err != channeldb.ErrGraphNoEdgesFound { if err != nil && err != channeldb.ErrGraphNoEdgesFound {
return errors.Errorf("unable to check for edge "+ return errors.Errorf("unable to check for edge "+
"existence: %v", err) "existence: %v", err)
} else if exists { }
return newErrf(ErrIgnored, "Ignoring msg for known "+ if isZombie {
return newErrf(ErrIgnored, "ignoring msg for zombie "+
"chan_id=%v", msg.ChannelID)
}
if exists {
return newErrf(ErrIgnored, "ignoring msg for known "+
"chan_id=%v", msg.ChannelID) "chan_id=%v", msg.ChannelID)
} }
@ -1130,19 +1145,29 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error {
r.channelEdgeMtx.Lock(msg.ChannelID) r.channelEdgeMtx.Lock(msg.ChannelID)
defer r.channelEdgeMtx.Unlock(msg.ChannelID) defer r.channelEdgeMtx.Unlock(msg.ChannelID)
edge1Timestamp, edge2Timestamp, exists, err := r.cfg.Graph.HasChannelEdge( edge1Timestamp, edge2Timestamp, exists, isZombie, err :=
msg.ChannelID, r.cfg.Graph.HasChannelEdge(msg.ChannelID)
)
if err != nil && err != channeldb.ErrGraphNoEdgesFound { if err != nil && err != channeldb.ErrGraphNoEdgesFound {
return errors.Errorf("unable to check for edge "+ return errors.Errorf("unable to check for edge "+
"existence: %v", err) "existence: %v", err)
} }
// If the channel is marked as a zombie in our database, and
// we consider this a stale update, then we should not apply the
// policy.
isStaleUpdate := time.Since(msg.LastUpdate) > r.cfg.ChannelPruneExpiry
if isZombie && isStaleUpdate {
return newErrf(ErrIgnored, "ignoring stale update "+
"(flags=%v|%v) for zombie chan_id=%v",
msg.MessageFlags, msg.ChannelFlags,
msg.ChannelID)
}
// If the channel doesn't exist in our database, we cannot // If the channel doesn't exist in our database, we cannot
// apply the updated policy. // apply the updated policy.
if !exists { if !exists {
return newErrf(ErrIgnored, "Ignoring update "+ return newErrf(ErrIgnored, "ignoring update "+
"(flags=%v|%v) for unknown chan_id=%v", "(flags=%v|%v) for unknown chan_id=%v",
msg.MessageFlags, msg.ChannelFlags, msg.MessageFlags, msg.ChannelFlags,
msg.ChannelID) msg.ChannelID)
@ -2241,12 +2266,12 @@ func (r *ChannelRouter) IsPublicNode(node Vertex) (bool, error) {
} }
// IsKnownEdge returns true if the graph source already knows of the passed // IsKnownEdge returns true if the graph source already knows of the passed
// channel ID. // channel ID either as a live or zombie edge.
// //
// NOTE: This method is part of the ChannelGraphSource interface. // NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) IsKnownEdge(chanID lnwire.ShortChannelID) bool { func (r *ChannelRouter) IsKnownEdge(chanID lnwire.ShortChannelID) bool {
_, _, exists, _ := r.cfg.Graph.HasChannelEdge(chanID.ToUint64()) _, _, exists, isZombie, _ := r.cfg.Graph.HasChannelEdge(chanID.ToUint64())
return exists return exists || isZombie
} }
// IsStaleEdgePolicy returns true if the graph soruce has a channel edge for // IsStaleEdgePolicy returns true if the graph soruce has a channel edge for
@ -2256,14 +2281,19 @@ func (r *ChannelRouter) IsKnownEdge(chanID lnwire.ShortChannelID) bool {
func (r *ChannelRouter) IsStaleEdgePolicy(chanID lnwire.ShortChannelID, func (r *ChannelRouter) IsStaleEdgePolicy(chanID lnwire.ShortChannelID,
timestamp time.Time, flags lnwire.ChanUpdateChanFlags) bool { timestamp time.Time, flags lnwire.ChanUpdateChanFlags) bool {
edge1Timestamp, edge2Timestamp, exists, err := r.cfg.Graph.HasChannelEdge( edge1Timestamp, edge2Timestamp, exists, isZombie, err :=
chanID.ToUint64(), r.cfg.Graph.HasChannelEdge(chanID.ToUint64())
)
if err != nil { if err != nil {
return false return false
} }
// If we know of the edge as a zombie, then we'll check the timestamp of
// this message to determine whether it's fresh.
if isZombie {
return time.Since(timestamp) > r.cfg.ChannelPruneExpiry
}
// If we don't know of the edge, then it means it's fresh (thus not // If we don't know of the edge, then it means it's fresh (thus not
// stale). // stale).
if !exists { if !exists {
@ -2275,7 +2305,6 @@ func (r *ChannelRouter) IsStaleEdgePolicy(chanID lnwire.ShortChannelID,
// already have the most up to date information for that edge. If so, // already have the most up to date information for that edge. If so,
// then we can exit early. // then we can exit early.
switch { switch {
// A flag set of 0 indicates this is an announcement for the "first" // A flag set of 0 indicates this is an announcement for the "first"
// node in the channel. // node in the channel.
case flags&lnwire.ChanUpdateDirection == 0: case flags&lnwire.ChanUpdateDirection == 0:
@ -2289,3 +2318,10 @@ func (r *ChannelRouter) IsStaleEdgePolicy(chanID lnwire.ShortChannelID,
return false return false
} }
// MarkEdgeLive clears an edge from our zombie index, deeming it as live.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) MarkEdgeLive(chanID lnwire.ShortChannelID) error {
return r.cfg.Graph.MarkEdgeLive(chanID.ToUint64())
}

View file

@ -1549,21 +1549,27 @@ func TestWakeUpOnStaleBranch(t *testing.T) {
} }
// Check that the fundingTxs are in the graph db. // Check that the fundingTxs are in the graph db.
_, _, has, err := ctx.graph.HasChannelEdge(chanID1) _, _, has, isZombie, err := ctx.graph.HasChannelEdge(chanID1)
if err != nil { if err != nil {
t.Fatalf("error looking for edge: %v", chanID1) t.Fatalf("error looking for edge: %v", chanID1)
} }
if !has { if !has {
t.Fatalf("could not find edge in graph") t.Fatalf("could not find edge in graph")
} }
if isZombie {
t.Fatal("edge was marked as zombie")
}
_, _, has, err = ctx.graph.HasChannelEdge(chanID2) _, _, has, isZombie, err = ctx.graph.HasChannelEdge(chanID2)
if err != nil { if err != nil {
t.Fatalf("error looking for edge: %v", chanID2) t.Fatalf("error looking for edge: %v", chanID2)
} }
if !has { if !has {
t.Fatalf("could not find edge in graph") t.Fatalf("could not find edge in graph")
} }
if isZombie {
t.Fatal("edge was marked as zombie")
}
// Stop the router, so we can reorg the chain while its offline. // Stop the router, so we can reorg the chain while its offline.
if err := ctx.router.Stop(); err != nil { if err := ctx.router.Stop(); err != nil {
@ -1607,22 +1613,27 @@ func TestWakeUpOnStaleBranch(t *testing.T) {
// The channel with chanID2 should not be in the database anymore, // The channel with chanID2 should not be in the database anymore,
// since it is not confirmed on the longest chain. chanID1 should // since it is not confirmed on the longest chain. chanID1 should
// still be. // still be.
_, _, has, err = ctx.graph.HasChannelEdge(chanID1) _, _, has, isZombie, err = ctx.graph.HasChannelEdge(chanID1)
if err != nil { if err != nil {
t.Fatalf("error looking for edge: %v", chanID1) t.Fatalf("error looking for edge: %v", chanID1)
} }
if !has { if !has {
t.Fatalf("did not find edge in graph") t.Fatalf("did not find edge in graph")
} }
if isZombie {
t.Fatal("edge was marked as zombie")
}
_, _, has, err = ctx.graph.HasChannelEdge(chanID2) _, _, has, isZombie, err = ctx.graph.HasChannelEdge(chanID2)
if err != nil { if err != nil {
t.Fatalf("error looking for edge: %v", chanID2) t.Fatalf("error looking for edge: %v", chanID2)
} }
if has { if has {
t.Fatalf("found edge in graph") t.Fatalf("found edge in graph")
} }
if isZombie {
t.Fatal("reorged edge should not be marked as zombie")
}
} }
// TestDisconnectedBlocks checks that the router handles a reorg happening when // TestDisconnectedBlocks checks that the router handles a reorg happening when
@ -1755,21 +1766,27 @@ func TestDisconnectedBlocks(t *testing.T) {
} }
// Check that the fundingTxs are in the graph db. // Check that the fundingTxs are in the graph db.
_, _, has, err := ctx.graph.HasChannelEdge(chanID1) _, _, has, isZombie, err := ctx.graph.HasChannelEdge(chanID1)
if err != nil { if err != nil {
t.Fatalf("error looking for edge: %v", chanID1) t.Fatalf("error looking for edge: %v", chanID1)
} }
if !has { if !has {
t.Fatalf("could not find edge in graph") t.Fatalf("could not find edge in graph")
} }
if isZombie {
t.Fatal("edge was marked as zombie")
}
_, _, has, err = ctx.graph.HasChannelEdge(chanID2) _, _, has, isZombie, err = ctx.graph.HasChannelEdge(chanID2)
if err != nil { if err != nil {
t.Fatalf("error looking for edge: %v", chanID2) t.Fatalf("error looking for edge: %v", chanID2)
} }
if !has { if !has {
t.Fatalf("could not find edge in graph") t.Fatalf("could not find edge in graph")
} }
if isZombie {
t.Fatal("edge was marked as zombie")
}
// Create a 15 block fork. We first let the chainView notify the router // Create a 15 block fork. We first let the chainView notify the router
// about stale blocks, before sending the now connected blocks. We do // about stale blocks, before sending the now connected blocks. We do
@ -1796,22 +1813,27 @@ func TestDisconnectedBlocks(t *testing.T) {
// chanID2 should not be in the database anymore, since it is not // chanID2 should not be in the database anymore, since it is not
// confirmed on the longest chain. chanID1 should still be. // confirmed on the longest chain. chanID1 should still be.
_, _, has, err = ctx.graph.HasChannelEdge(chanID1) _, _, has, isZombie, err = ctx.graph.HasChannelEdge(chanID1)
if err != nil { if err != nil {
t.Fatalf("error looking for edge: %v", chanID1) t.Fatalf("error looking for edge: %v", chanID1)
} }
if !has { if !has {
t.Fatalf("did not find edge in graph") t.Fatalf("did not find edge in graph")
} }
if isZombie {
t.Fatal("edge was marked as zombie")
}
_, _, has, err = ctx.graph.HasChannelEdge(chanID2) _, _, has, isZombie, err = ctx.graph.HasChannelEdge(chanID2)
if err != nil { if err != nil {
t.Fatalf("error looking for edge: %v", chanID2) t.Fatalf("error looking for edge: %v", chanID2)
} }
if has { if has {
t.Fatalf("found edge in graph") t.Fatalf("found edge in graph")
} }
if isZombie {
t.Fatal("reorged edge should not be marked as zombie")
}
} }
// TestChansClosedOfflinePruneGraph tests that if channels we know of are // TestChansClosedOfflinePruneGraph tests that if channels we know of are
@ -1876,13 +1898,16 @@ func TestRouterChansClosedOfflinePruneGraph(t *testing.T) {
} }
// The router should now be aware of the channel we created above. // The router should now be aware of the channel we created above.
_, _, hasChan, err := ctx.graph.HasChannelEdge(chanID1.ToUint64()) _, _, hasChan, isZombie, err := ctx.graph.HasChannelEdge(chanID1.ToUint64())
if err != nil { if err != nil {
t.Fatalf("error looking for edge: %v", chanID1) t.Fatalf("error looking for edge: %v", chanID1)
} }
if !hasChan { if !hasChan {
t.Fatalf("could not find edge in graph") t.Fatalf("could not find edge in graph")
} }
if isZombie {
t.Fatal("edge was marked as zombie")
}
// With the transaction included, and the router's database state // With the transaction included, and the router's database state
// updated, we'll now mine 5 additional blocks on top of it. // updated, we'll now mine 5 additional blocks on top of it.
@ -1957,13 +1982,16 @@ func TestRouterChansClosedOfflinePruneGraph(t *testing.T) {
// At this point, the channel that was pruned should no longer be known // At this point, the channel that was pruned should no longer be known
// by the router. // by the router.
_, _, hasChan, err = ctx.graph.HasChannelEdge(chanID1.ToUint64()) _, _, hasChan, isZombie, err = ctx.graph.HasChannelEdge(chanID1.ToUint64())
if err != nil { if err != nil {
t.Fatalf("error looking for edge: %v", chanID1) t.Fatalf("error looking for edge: %v", chanID1)
} }
if hasChan { if hasChan {
t.Fatalf("channel was found in graph but shouldn't have been") t.Fatalf("channel was found in graph but shouldn't have been")
} }
if isZombie {
t.Fatal("closed channel should not be marked as zombie")
}
} }
// TestFindPathFeeWeighting tests that the findPath method will properly prefer // TestFindPathFeeWeighting tests that the findPath method will properly prefer

View file

@ -583,7 +583,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
firstHop, htlcAdd, errorDecryptor, firstHop, htlcAdd, errorDecryptor,
) )
}, },
ChannelPruneExpiry: time.Duration(time.Hour * 24 * 14), ChannelPruneExpiry: routing.DefaultChannelPruneExpiry,
GraphPruneInterval: time.Duration(time.Hour), GraphPruneInterval: time.Duration(time.Hour),
QueryBandwidth: func(edge *channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi { QueryBandwidth: func(edge *channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi {
// If we aren't on either side of this edge, then we'll // If we aren't on either side of this edge, then we'll