1
0
Fork 0
mirror of https://github.com/ACINQ/eclair.git synced 2025-02-23 06:35:11 +01:00

Remember pruned channels (#706)

Currently we don't remember channels that we have pruned, so we will happily revalidate the same channels again if a node re-sends them to us, and prune them again, a.k.a. the "zombie churn".

Before channel queries, rejecting a stale channel without validating it wasn't trivial,  because nodes sent us the `channel_announcement` before `channel_update`s, and only after receiving the `channel_update` could we know if the channel was still stale. Since we had no way of requesting the `channel_announcement` for one particular channel, we would have to buffer it, which would lead to potential DOS issues. 

But now that we have channel queries, we can now be much more efficient. Process goes like this:
(1) channel x is detected as stale gets pruned, and its id is added to the pruned db
(2) later on we receive a `channel_announcement` from Eve, we ignore it because the channel is in the pruned db
(3) we also receive old `channel_update`s from Eve nodes, just ignore them because we don't know the channel
(4) then one day some other node George sends us the `channel_announcement`, we still ignore it because the channel is still in the pruned db
(5) but then George sends us recent `channel_update`s, and we know that the channel is back from the dead. We ignore those `channel_update`s, but we aldo remove the channel id from the pruned db, and we request announcements for that node from George
(6) George sends us the `channel_announcement` again, we validate it
(7) George then sends us the `channel_update`s again, we process them
(8) done!

This also allows removing the pruning code that we were doing on-the-fly when answering to routing table sync requests.

Needless to say that this leads to a huge reduction in CPU/bandwidth usage on well-connected nodes.

Fixes #623, #624.
This commit is contained in:
Pierre-Marie Padiou 2018-09-17 18:30:10 +02:00 committed by GitHub
parent 52b161f5e9
commit 2c1811d18f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 83 additions and 116 deletions

View file

@ -49,4 +49,10 @@ trait NetworkDb {
def listChannelUpdates(): Seq[ChannelUpdate]
def addToPruned(shortChannelId: ShortChannelId)
def removeFromPruned(shortChannelId: ShortChannelId)
def isPruned(shortChannelId: ShortChannelId): Boolean
}

View file

@ -20,7 +20,7 @@ import java.sql.Connection
import fr.acinq.bitcoin.{BinaryData, Crypto, Satoshi}
import fr.acinq.eclair.ShortChannelId
import fr.acinq.eclair.db.NetworkDb
import fr.acinq.eclair.db.{NetworkDb, Payment}
import fr.acinq.eclair.router.Announcements
import fr.acinq.eclair.wire.LightningMessageCodecs.{channelAnnouncementCodec, channelUpdateCodec, nodeAnnouncementCodec}
import fr.acinq.eclair.wire.{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement}
@ -40,6 +40,7 @@ class SqliteNetworkDb(sqlite: Connection) extends NetworkDb {
statement.executeUpdate("CREATE TABLE IF NOT EXISTS channels (short_channel_id INTEGER NOT NULL PRIMARY KEY, txid STRING NOT NULL, data BLOB NOT NULL, capacity_sat INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS channel_updates (short_channel_id INTEGER NOT NULL, node_flag INTEGER NOT NULL, data BLOB NOT NULL, PRIMARY KEY(short_channel_id, node_flag), FOREIGN KEY(short_channel_id) REFERENCES channels(short_channel_id))")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS channel_updates_idx ON channel_updates(short_channel_id)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS pruned (short_channel_id INTEGER NOT NULL PRIMARY KEY)")
}
override def addNode(n: NodeAnnouncement): Unit = {
@ -128,4 +129,24 @@ class SqliteNetworkDb(sqlite: Connection) extends NetworkDb {
}
}
override def addToPruned(shortChannelId: ShortChannelId): Unit = {
using(sqlite.prepareStatement("INSERT OR IGNORE INTO pruned VALUES (?)")) { statement =>
statement.setLong(1, shortChannelId.toLong)
statement.executeUpdate()
}
}
override def removeFromPruned(shortChannelId: ShortChannelId): Unit = {
using(sqlite.createStatement) { statement =>
statement.executeUpdate(s"DELETE FROM pruned WHERE short_channel_id=${shortChannelId.toLong}")
}
}
override def isPruned(shortChannelId: ShortChannelId): Boolean = {
using(sqlite.prepareStatement("SELECT short_channel_id from pruned WHERE short_channel_id=?")) { statement =>
statement.setLong(1, shortChannelId.toLong)
val rs = statement.executeQuery()
rs.next()
}
}
}

View file

@ -162,6 +162,8 @@ class Router(nodeParams: NodeParams, watcher: ActorRef) extends FSMDiagnosticAct
// channel wasn't announced but here is the announcement, we will process it *before* the channel_update
watcher ! ValidateRequest(c)
val d1 = d.copy(awaiting = d.awaiting + (c -> Nil)) // no origin
// maybe the local channel was pruned (can happen if we were disconnected for more than 2 weeks)
db.removeFromPruned(c.shortChannelId)
stay using handle(u, self, d1)
case None if d.privateChannels.contains(shortChannelId) =>
// channel isn't announced but we already know about it, we can process the channel_update
@ -196,8 +198,7 @@ class Router(nodeParams: NodeParams, watcher: ActorRef) extends FSMDiagnosticAct
case Event(GetRoutingState, d: Data) =>
log.info(s"getting valid announcements for $sender")
val (validChannels, validNodes, validUpdates) = getValidAnnouncements(d.channels, d.nodes, d.updates)
sender ! RoutingState(validChannels, validUpdates, validNodes)
sender ! RoutingState(d.channels.values, d.updates.values, d.nodes.values)
stay
case Event(v@ValidateResult(c, _, _, _), d0) =>
@ -309,29 +310,8 @@ class Router(nodeParams: NodeParams, watcher: ActorRef) extends FSMDiagnosticAct
stay
} else {
log.info("broadcasting routing messages")
// we don't want to rebroadcast old channels if we don't have a recent channel_update, otherwise we will keep sending zombies channels back and forth
// instead, we base the rebroadcast on updates, and only keep the ones that are younger than 2 weeks
val rebroadcastUpdates1 = d.rebroadcast.updates.filterKeys(u => !isStale(u))
// for each update, we rebroadcast the corresponding channel announcement (suboptimal)
// we have to do this because we didn't broadcast old channels for which we didn't yet receive the channel_update
val rebroadcastChannels1 = rebroadcastUpdates1.foldLeft(Map.empty[ChannelAnnouncement, Set[ActorRef]]) {
case (channels, (u, updateOrigins)) =>
d.channels.get(u.shortChannelId) match {
case Some(c) => d.rebroadcast.channels.get(c) match {
case Some(channelOrigins) => channels + (c -> channelOrigins) // we have origin peers for this channel
case None => channels + (c -> updateOrigins) // we don't have origin peers for this channel, let's use the same origin list as corresponding update (they must know the channel)
}
case None => channels // weird, we don't know this channel, that should never happen and we can ignore it anyway
}
}
// and we only keep nodes that have at least one valid channel
val staleChannels = getStaleChannels(d.channels.values, d.updates)
val validChannels = (d.channels -- staleChannels).values
val rebroadcastNodes1 = d.rebroadcast.nodes.filterKeys(n => hasChannels(n.nodeId, validChannels))
// then we're ready to broadcast
val rebroadcast1 = d.rebroadcast.copy(channels = rebroadcastChannels1, updates = rebroadcastUpdates1, nodes = rebroadcastNodes1)
log.debug("staggered broadcast details: channels={} updates={} nodes={}", rebroadcast1.channels.size, rebroadcast1.updates.size, rebroadcast1.nodes.size)
context.actorSelection(context.system / "*" / "switchboard") ! rebroadcast1
log.debug("staggered broadcast details: channels={} updates={} nodes={}", d.rebroadcast.channels.size, d.rebroadcast.updates.size, d.rebroadcast.nodes.size)
context.actorSelection(context.system / "*" / "switchboard") ! d.rebroadcast
stay using d.copy(rebroadcast = Rebroadcast(channels = Map.empty, updates = Map.empty, nodes = Map.empty))
}
@ -350,6 +330,8 @@ class Router(nodeParams: NodeParams, watcher: ActorRef) extends FSMDiagnosticAct
staleChannels.foreach { shortChannelId =>
log.info("pruning shortChannelId={} (stale)", shortChannelId)
db.removeChannel(shortChannelId) // NB: this also removes channel updates
// we keep track of recently pruned channels so we don't revalidate them (zombie churn)
db.addToPruned(shortChannelId)
context.system.eventStream.publish(ChannelLost(shortChannelId))
}
// we also need to remove updates from the graph
@ -432,10 +414,10 @@ class Router(nodeParams: NodeParams, watcher: ActorRef) extends FSMDiagnosticAct
log.debug("received channel update from {}", sender)
stay using handle(u, sender, d)
case Event(PeerRoutingMessage(_, _, u: ChannelUpdate), d) =>
case Event(PeerRoutingMessage(_, remoteNodeId, u: ChannelUpdate), d) =>
sender ! TransportHandler.ReadAck(u)
log.debug("received channel update for shortChannelId={}", u.shortChannelId)
stay using handle(u, sender, d)
stay using handle(u, sender, d, remoteNodeId_opt = Some(remoteNodeId))
case Event(PeerRoutingMessage(_, _, c: ChannelAnnouncement), d) =>
log.debug("received channel announcement for shortChannelId={} nodeId1={} nodeId2={}", c.shortChannelId, c.nodeId1, c.nodeId2)
@ -449,6 +431,11 @@ class Router(nodeParams: NodeParams, watcher: ActorRef) extends FSMDiagnosticAct
// adding the sender to the list of origins so that we don't send back the same announcement to this peer later
val origins = d.awaiting(c) :+ sender
stay using d.copy(awaiting = d.awaiting + (c -> origins))
} else if (db.isPruned(c.shortChannelId)) {
sender ! TransportHandler.ReadAck(c)
// channel was pruned and we haven't received a recent channel_update, so we have no reason to revalidate it
log.debug("ignoring {} (was pruned)", c)
stay
} else if (!Announcements.checkSigs(c)) {
sender ! TransportHandler.ReadAck(c)
log.warning("bad signature for announcement {}", c)
@ -584,7 +571,7 @@ class Router(nodeParams: NodeParams, watcher: ActorRef) extends FSMDiagnosticAct
d
}
def handle(u: ChannelUpdate, origin: ActorRef, d: Data): Data =
def handle(u: ChannelUpdate, origin: ActorRef, d: Data, remoteNodeId_opt: Option[PublicKey] = None): Data =
if (d.channels.contains(u.shortChannelId)) {
// related channel is already known (note: this means no related channel_update is in the stash)
val publicChannel = true
@ -659,6 +646,30 @@ class Router(nodeParams: NodeParams, watcher: ActorRef) extends FSMDiagnosticAct
addEdge(d.graph, desc, u)
d.copy(privateUpdates = d.privateUpdates + (desc -> u))
}
} else if (db.isPruned(u.shortChannelId) && !isStale(u)) {
// the channel was recently pruned, but if we are here, it means that the update is not stale so this is the case
// of a zombie channel coming back from the dead. they probably sent us a channel_announcement right before this update,
// but we ignored it because the channel was in the 'pruned' list. Now that we know that the channel is alive again,
// let's remove the channel from the zombie list and ask the sender to re-send announcements (channel_announcement + updates)
// about that channel. We can ignore this update since we will receive it again
log.info(s"channel shortChannelId=${u.shortChannelId} is back from the dead! requesting announcements about this channel")
db.removeFromPruned(u.shortChannelId)
remoteNodeId_opt match {
case Some(remoteNodeId) =>
d.sync.get(remoteNodeId) match {
case Some(sync) =>
// we already have a pending request to that node, let's add this channel to the list and we'll get it later
d.copy(sync = d.sync + (remoteNodeId -> sync.copy(missing = sync.missing + u.shortChannelId, totalMissingCount = sync.totalMissingCount + 1)))
case None =>
// we send the query right away
origin ! QueryShortChannelIds(u.chainHash, ChannelRangeQueries.encodeShortChannelIdsSingle(Seq(u.shortChannelId), ChannelRangeQueries.UNCOMPRESSED_FORMAT, useGzip = false))
d.copy(sync = d.sync + (remoteNodeId -> Sync(missing = SortedSet(u.shortChannelId), totalMissingCount = 1)))
}
case None =>
// we don't know which node this update came from (maybe it was stashed and the channel got pruned in the meantime or some other corner case)
// anyway, that's not really a big deal because we have removed the channel from the pruned db so next time it shows up we will revalidate it
d
}
} else {
log.debug("ignoring announcement {} (unknown channel)", u)
d
@ -738,27 +749,7 @@ object Router {
*/
def keep(firstBlockNum: Long, numberOfBlocks: Long, id: ShortChannelId, channels: Map[ShortChannelId, ChannelAnnouncement], updates: Map[ChannelDesc, ChannelUpdate]): Boolean = {
val TxCoordinates(height, _, _) = ShortChannelId.coordinates(id)
val c = channels(id)
val u1 = updates.get(ChannelDesc(c.shortChannelId, c.nodeId1, c.nodeId2))
val u2 = updates.get(ChannelDesc(c.shortChannelId, c.nodeId2, c.nodeId1))
height >= firstBlockNum && height <= (firstBlockNum + numberOfBlocks) && !isStale(c, u1, u2)
}
/**
* Filters announcements that we want to send to nodes asking an `initial_routing_sync`
*
* @param channels
* @param nodes
* @param updates
* @return
*/
def getValidAnnouncements(channels: Map[ShortChannelId, ChannelAnnouncement], nodes: Map[PublicKey, NodeAnnouncement], updates: Map[ChannelDesc, ChannelUpdate]): (Iterable[ChannelAnnouncement], Iterable[NodeAnnouncement], Iterable[ChannelUpdate]) = {
val staleChannels = getStaleChannels(channels.values, updates)
val validChannels = (channels -- staleChannels).values
val staleUpdates = staleChannels.map(channels).flatMap(c => Seq(ChannelDesc(c.shortChannelId, c.nodeId1, c.nodeId2), ChannelDesc(c.shortChannelId, c.nodeId2, c.nodeId1)))
val validUpdates = (updates -- staleUpdates).values
val validNodes = validChannels.foldLeft(Set.empty[NodeAnnouncement]) { case (nodesAcc, c) => nodesAcc ++ nodes.get(c.nodeId1) ++ nodes.get(c.nodeId2) } // using a set deduplicates nodes
(validChannels, validNodes, validUpdates)
height >= firstBlockNum && height <= (firstBlockNum + numberOfBlocks)
}
def syncProgress(d: Data): SyncProgress =

View file

@ -101,4 +101,19 @@ class SqliteNetworkDbSpec extends FunSuite {
db.updateChannelUpdate(channel_update_1)
}
test("add/remove/test pruned channels") {
val sqlite = inmem
val db = new SqliteNetworkDb(sqlite)
db.addToPruned(ShortChannelId(1))
db.addToPruned(ShortChannelId(5))
assert(db.isPruned(ShortChannelId(1)))
assert(!db.isPruned(ShortChannelId(3)))
assert(db.isPruned(ShortChannelId(1)))
db.removeFromPruned(ShortChannelId(5))
assert(!db.isPruned(ShortChannelId(5)))
}
}

View file

@ -218,72 +218,6 @@ class RouteCalculationSpec extends FunSuite {
assert(hops === Hop(a, b, uab) :: Hop(b, c, ubc) :: Hop(c, d, ucd) :: Hop(d, e, ude) :: Nil)
}
test("stale channels pruning") {
// set current block height
Globals.blockCount.set(500000)
def nodeAnnouncement(nodeId: PublicKey) = NodeAnnouncement("", "", 0, nodeId, Color(0, 0, 0), "", Nil)
// we only care about timestamps and nodes ids
def channelAnnouncement(shortChannelId: ShortChannelId, node1: PublicKey, node2: PublicKey) = ChannelAnnouncement("", "", "", "", "", "", shortChannelId, node1, node2, randomKey.publicKey, randomKey.publicKey)
def channelUpdate(shortChannelId: ShortChannelId, timestamp: Long) = ChannelUpdate("", "", shortChannelId, timestamp, "", 0, 0, 0, 0)
def daysAgoInBlocks(daysAgo: Int): Int = Globals.blockCount.get().toInt - 144 * daysAgo
def daysAgoInSeconds(daysAgo: Int): Long = Platform.currentTime / 1000 - daysAgo * 24 * 3600
// note: those are *ordered*
val (node_1, node_2, node_3, node_4) = (nodeAnnouncement(PublicKey("02ca1f8792292fd2ad4001b578e962861cc1120f0140d050e87ce1d143f7179031")), nodeAnnouncement(PublicKey("028689a991673e0888580fc7cd3fb3e8a1b62e7e7f65a5fc9899f44b88307331d8")), nodeAnnouncement(PublicKey("036eee3325d246a54e32aa5c215777493e4867b2b22570c307283f5e160c1997cd")), nodeAnnouncement(PublicKey("039311b2ee0e47fe40e9d35a72416e7c3b6263abb12bce15d250e0e5e20f11029d")))
// a is an old channel with an old channel update => PRUNED
val id_a = ShortChannelId(daysAgoInBlocks(16), 0, 0)
val chan_a = channelAnnouncement(id_a, node_1.nodeId, node_2.nodeId)
val upd_a = channelUpdate(id_a, daysAgoInSeconds(30))
// b is an old channel with no channel update => PRUNED
val id_b = ShortChannelId(daysAgoInBlocks(16), 1, 0)
val chan_b = channelAnnouncement(id_b, node_2.nodeId, node_3.nodeId)
// c is an old channel with a recent channel update => KEPT
val id_c = ShortChannelId(daysAgoInBlocks(16), 2, 0)
val chan_c = channelAnnouncement(id_c, node_1.nodeId, node_3.nodeId)
val upd_c = channelUpdate(id_c, daysAgoInSeconds(2))
// d is a recent channel with a recent channel update => KEPT
val id_d = ShortChannelId(daysAgoInBlocks(2), 0, 0)
val chan_d = channelAnnouncement(id_d, node_3.nodeId, node_4.nodeId)
val upd_d = channelUpdate(id_d, daysAgoInSeconds(2))
// e is a recent channel with no channel update => KEPT
val id_e = ShortChannelId(daysAgoInBlocks(1), 0, 0)
val chan_e = channelAnnouncement(id_e, node_1.nodeId, randomKey.publicKey)
val nodes = Map(
node_1.nodeId -> node_1,
node_2.nodeId -> node_2,
node_3.nodeId -> node_3,
node_4.nodeId -> node_4
)
val channels = Map(
chan_a.shortChannelId -> chan_a,
chan_b.shortChannelId -> chan_b,
chan_c.shortChannelId -> chan_c,
chan_d.shortChannelId -> chan_d,
chan_e.shortChannelId -> chan_e
)
val updates = Map(
ChannelDesc(chan_a.shortChannelId, chan_a.nodeId1, chan_a.nodeId2) -> upd_a,
ChannelDesc(chan_c.shortChannelId, chan_c.nodeId1, chan_c.nodeId2) -> upd_c,
ChannelDesc(chan_d.shortChannelId, chan_d.nodeId1, chan_d.nodeId2) -> upd_d
)
val staleChannels = Router.getStaleChannels(channels.values, updates).toSet
assert(staleChannels === Set(id_a, id_b))
val (validChannels, validNodes, validUpdates) = Router.getValidAnnouncements(channels, nodes, updates)
assert(validChannels.toSet === Set(chan_c, chan_d, chan_e))
assert(validNodes.toSet === Set(node_1, node_3, node_4)) // node 2 has been pruned because its only channel was pruned
assert(validUpdates.toSet === Set(upd_c, upd_d))
}
test("convert extra hops to channel_update") {
val a = randomKey.publicKey
val b = randomKey.publicKey