mirror of
https://github.com/ACINQ/eclair.git
synced 2025-03-13 19:37:35 +01:00
Router: prune channel that our peer does not have (#647)
* router: prune channel that our peer does not have when we receive a reply to a channel query, we check whether channels that we have but they don't can are stale and can be pruned. * router: cap the number of channels that we would prune We limit the number of channels that we will prune to avoid freezing the app. * pruning: simplify our process and address change requests we restore the computation of missing channels that we had before, remove new `isStale` and `isInRange` method and simplify short channel ids range check. * pruning: improve test, don't prune nodes (they are not handled on android)
This commit is contained in:
parent
4deb67600a
commit
6d5ed22c4d
4 changed files with 163 additions and 25 deletions
|
@ -28,7 +28,8 @@ case class ShortChannelId(private val id: Long) extends Ordered[ShortChannelId]
|
|||
|
||||
override def toString: String = id.toHexString
|
||||
|
||||
override def compare(that: ShortChannelId): Int = this.id.compareTo(that.id)
|
||||
// we use an unsigned long comparison here
|
||||
override def compare(that: ShortChannelId): Int = (this.id + Long.MinValue).compareTo(that.id + Long.MinValue)
|
||||
}
|
||||
|
||||
object ShortChannelId {
|
||||
|
|
|
@ -199,8 +199,6 @@ class Router(nodeParams: NodeParams, watcher: ActorRef) extends FSMDiagnosticAct
|
|||
// finally we remove nodes that aren't tied to any channels anymore (and deduplicate them)
|
||||
val potentialStaleNodes = staleChannels.map(d.channels).flatMap(c => Set(c.nodeId1, c.nodeId2)).toSet
|
||||
val channels1 = d.channels -- staleChannels
|
||||
// no need to iterate on all nodes, just on those that are affected by current pruning
|
||||
val staleNodes = potentialStaleNodes.filterNot(nodeId => hasChannels(nodeId, channels1.values))
|
||||
|
||||
// let's clean the db and send the events
|
||||
staleChannels.foreach { shortChannelId =>
|
||||
|
@ -213,13 +211,7 @@ class Router(nodeParams: NodeParams, watcher: ActorRef) extends FSMDiagnosticAct
|
|||
removeEdge(d.graph, ChannelDesc(c.shortChannelId, c.nodeId1, c.nodeId2))
|
||||
removeEdge(d.graph, ChannelDesc(c.shortChannelId, c.nodeId2, c.nodeId1))
|
||||
}
|
||||
staleNodes.foreach {
|
||||
case nodeId =>
|
||||
log.info("pruning nodeId={} (stale)", nodeId)
|
||||
db.removeNode(nodeId)
|
||||
context.system.eventStream.publish(NodeLost(nodeId))
|
||||
}
|
||||
stay using d.copy(nodes = d.nodes -- staleNodes, channels = channels1, updates = d.updates -- staleUpdates)
|
||||
stay using d.copy(channels = channels1, updates = d.updates -- staleUpdates)
|
||||
|
||||
case Event(ExcludeChannel(desc@ChannelDesc(shortChannelId, nodeId, _)), d) =>
|
||||
val banDuration = nodeParams.channelExcludeDuration
|
||||
|
@ -338,12 +330,43 @@ class Router(nodeParams: NodeParams, watcher: ActorRef) extends FSMDiagnosticAct
|
|||
case Event(PeerRoutingMessage(_, routingMessage@ReplyChannelRange(chainHash, firstBlockNum, numberOfBlocks, _, data)), d) =>
|
||||
sender ! TransportHandler.ReadAck(routingMessage)
|
||||
val (format, theirShortChannelIds, useGzip) = ChannelRangeQueries.decodeShortChannelIds(data)
|
||||
// keep our channel ids that are in [firstBlockNum, firstBlockNum + numberOfBlocks]
|
||||
val ourShortChannelIds: SortedSet[ShortChannelId] = d.channels.keySet.filter(keep(firstBlockNum, numberOfBlocks, _, d.channels, d.updates))
|
||||
val missing: SortedSet[ShortChannelId] = theirShortChannelIds -- ourShortChannelIds
|
||||
log.info("received reply_channel_range, we're missing {} channel announcements/updates, format={} useGzip={}", missing.size, format, useGzip)
|
||||
val blocks = ChannelRangeQueries.encodeShortChannelIds(firstBlockNum, numberOfBlocks, missing, format, useGzip)
|
||||
blocks.foreach(block => sender ! QueryShortChannelIds(chainHash, block.shortChannelIds))
|
||||
stay
|
||||
|
||||
// we have channel announcement that they don't have: check if we can prune them
|
||||
val pruningCandidates = {
|
||||
val first = ShortChannelId(firstBlockNum.toInt, 0, 0)
|
||||
val last = ShortChannelId((firstBlockNum + numberOfBlocks).toInt, 0xFFFFFFFF, 0xFFFF)
|
||||
// channel ids are sorted so we can simplify our range check
|
||||
val shortChannelIds = d.channels.keySet.dropWhile(_ < first).takeWhile(_ <= last) -- theirShortChannelIds
|
||||
log.info("we have {} channel that they do not have", shortChannelIds.size)
|
||||
d.channels.filterKeys(id => shortChannelIds.contains(id))
|
||||
}
|
||||
|
||||
// we limit the maximum number of channels that we will prune in one go to avoid "freezing" the app
|
||||
// we first check which candidates are stale, then cap the result. We could also cap the candidate list first, there
|
||||
// would be less calls to getStaleChannels but it would be less efficient from a "pruning" p.o.v
|
||||
val staleChannels = getStaleChannels(pruningCandidates.values, d.updates).take(MAX_PRUNE_COUNT)
|
||||
// then we clean up the related channel updates
|
||||
val staleUpdates = staleChannels.map(d.channels).flatMap(c => Seq(ChannelDesc(c.shortChannelId, c.nodeId1, c.nodeId2), ChannelDesc(c.shortChannelId, c.nodeId2, c.nodeId1)))
|
||||
val channels1 = d.channels -- staleChannels
|
||||
|
||||
// let's clean the db and send the events
|
||||
staleChannels.foreach { shortChannelId =>
|
||||
log.info("pruning shortChannelId={} (stale)", shortChannelId)
|
||||
db.removeChannel(shortChannelId) // NB: this also removes channel updates
|
||||
context.system.eventStream.publish(ChannelLost(shortChannelId))
|
||||
}
|
||||
// we also need to remove updates from the graph
|
||||
staleChannels.map(d.channels).foreach { c =>
|
||||
removeEdge(d.graph, ChannelDesc(c.shortChannelId, c.nodeId1, c.nodeId2))
|
||||
removeEdge(d.graph, ChannelDesc(c.shortChannelId, c.nodeId2, c.nodeId1))
|
||||
}
|
||||
stay using d.copy(channels = channels1, updates = d.updates -- staleUpdates)
|
||||
|
||||
case Event(PeerRoutingMessage(_, routingMessage@QueryShortChannelIds(chainHash, data)), d) =>
|
||||
sender ! TransportHandler.ReadAck(routingMessage)
|
||||
|
@ -514,6 +537,9 @@ object Router {
|
|||
u.timestamp < staleThresholdSeconds
|
||||
}
|
||||
|
||||
// maximum number of stale channels that we will prune on startup
|
||||
val MAX_PRUNE_COUNT = 200
|
||||
|
||||
/**
|
||||
* Is stale a channel that:
|
||||
* (1) is older than 2 weeks (2*7*144 = 2016 blocks)
|
||||
|
@ -553,6 +579,7 @@ object Router {
|
|||
height >= firstBlockNum && height <= (firstBlockNum + numberOfBlocks) && !isStale(c, u1, u2)
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Filters announcements that we want to send to nodes asking an `initial_routing_sync`
|
||||
*
|
||||
|
|
|
@ -0,0 +1,110 @@
|
|||
/*
|
||||
* Copyright 2018 ACINQ SAS
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package fr.acinq.eclair.router
|
||||
|
||||
import akka.actor.{Actor, ActorRef, Props}
|
||||
import akka.testkit.TestProbe
|
||||
import fr.acinq.bitcoin.Crypto.PrivateKey
|
||||
import fr.acinq.bitcoin.{BinaryData, Satoshi}
|
||||
import fr.acinq.eclair.TestConstants.Alice
|
||||
import fr.acinq.eclair.crypto.TransportHandler
|
||||
import fr.acinq.eclair.io.Peer.PeerRoutingMessage
|
||||
import fr.acinq.eclair.router.RoutingSyncSpec.{FakeWatcher, makeFakeRoutingInfo}
|
||||
import fr.acinq.eclair.wire._
|
||||
import fr.acinq.eclair.{ShortChannelId, TestkitBaseClass}
|
||||
import org.junit.runner.RunWith
|
||||
import org.scalatest.junit.JUnitRunner
|
||||
import org.scalatest.{BeforeAndAfterAll, Outcome}
|
||||
|
||||
import scala.collection.{SortedSet, immutable}
|
||||
import scala.concurrent.duration._
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class PruningSpec extends TestkitBaseClass with BeforeAndAfterAll {
|
||||
|
||||
val txid = BinaryData("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
|
||||
val remoteNodeId = PrivateKey(BinaryData("01" * 32), true).publicKey
|
||||
|
||||
val startHeight = 400000 - 25 * 2016
|
||||
val shortChannelIds: immutable.SortedSet[ShortChannelId] = (for {
|
||||
block <- startHeight to startHeight + 50 * 50 by 50
|
||||
txindex <- 0 to 3
|
||||
outputIndex <- 0 to 1
|
||||
} yield ShortChannelId(block, txindex, outputIndex)).foldLeft(SortedSet.empty[ShortChannelId])(_ + _)
|
||||
|
||||
val fakeRoutingInfo = shortChannelIds.map(makeFakeRoutingInfo)
|
||||
|
||||
override type FixtureParam = ActorRef
|
||||
|
||||
override protected def withFixture(test: OneArgTest): Outcome = {
|
||||
val watcherA = system.actorOf(Props(new FakeWatcher()))
|
||||
val paramsA = Alice.nodeParams
|
||||
val routingInfoA = fakeRoutingInfo
|
||||
routingInfoA.map {
|
||||
case (a, u1, u2, n1, n2) =>
|
||||
paramsA.networkDb.addChannel(a, txid, Satoshi(100000))
|
||||
paramsA.networkDb.addChannelUpdate(u1)
|
||||
paramsA.networkDb.addChannelUpdate(u2)
|
||||
paramsA.networkDb.addNode(n1)
|
||||
paramsA.networkDb.addNode(n2)
|
||||
}
|
||||
val probe = TestProbe()
|
||||
val switchboard = system.actorOf(Props(new Actor {
|
||||
override def receive: Receive = {
|
||||
case msg => probe.ref forward msg
|
||||
}
|
||||
}), "switchboard")
|
||||
|
||||
val routerA = system.actorOf(Props(new Router(paramsA, watcherA)), "routerA")
|
||||
|
||||
val sender = TestProbe()
|
||||
awaitCond({
|
||||
sender.send(routerA, 'channels)
|
||||
val channelsA = sender.expectMsgType[Iterable[ChannelAnnouncement]]
|
||||
channelsA.size == routingInfoA.size
|
||||
}, max = 30 seconds)
|
||||
|
||||
test(routerA)
|
||||
}
|
||||
|
||||
test("prune stale channel") {
|
||||
router => {
|
||||
val probe = TestProbe()
|
||||
probe.ignoreMsg { case TransportHandler.ReadAck(_) => true }
|
||||
val remoteNodeId = PrivateKey("01" * 32, true).publicKey
|
||||
|
||||
// tell router to ask for our channel ids
|
||||
probe.send(router, SendChannelQuery(remoteNodeId, probe.ref))
|
||||
val QueryChannelRange(chainHash, firstBlockNum, numberOfBlocks) = probe.expectMsgType[QueryChannelRange]
|
||||
probe.expectMsgType[GossipTimestampFilter]
|
||||
|
||||
// we don't send the first 10 channels, which are stale
|
||||
val shortChannelIds1 = shortChannelIds.drop(10)
|
||||
val reply = ReplyChannelRange(chainHash, firstBlockNum, numberOfBlocks, 1.toByte, ChannelRangeQueries.encodeShortChannelIdsSingle(shortChannelIds1, ChannelRangeQueries.ZLIB_FORMAT, false))
|
||||
probe.send(router, PeerRoutingMessage(remoteNodeId, reply))
|
||||
probe.expectMsgType[QueryShortChannelIds]
|
||||
|
||||
// router should see that it has 10 channels that we don't have, check if they're stale, and prune them
|
||||
awaitCond({
|
||||
probe.send(router, 'channels)
|
||||
val channels = probe.expectMsgType[Iterable[ChannelAnnouncement]]
|
||||
val ourIds = channels.map(_.shortChannelId).toSet
|
||||
ourIds == shortChannelIds1
|
||||
}, max = 30 seconds)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -43,24 +43,13 @@ class RoutingSyncSpec extends TestkitBaseClass {
|
|||
|
||||
type FixtureParam = Tuple3[ActorRef, ActorRef, ActorRef]
|
||||
|
||||
val shortChannelIds = ChannelRangeQueriesSpec.shortChannelIds.take(500)
|
||||
val shortChannelIds = ChannelRangeQueriesSpec.shortChannelIds.take(100)
|
||||
|
||||
val fakeRoutingInfo = shortChannelIds.map(makeFakeRoutingInfo)
|
||||
// A will be missing the last 1000 items
|
||||
val routingInfoA = fakeRoutingInfo.dropRight(100)
|
||||
val routingInfoA = fakeRoutingInfo.dropRight(10)
|
||||
// and B will be missing the first 1000 items
|
||||
val routingInfoB = fakeRoutingInfo.drop(100)
|
||||
|
||||
class FakeWatcher extends Actor {
|
||||
def receive = {
|
||||
case _: WatchSpentBasic => ()
|
||||
case ValidateRequest(ann) =>
|
||||
val txOut = TxOut(Satoshi(1000000), Script.pay2wsh(Scripts.multiSig2of2(ann.bitcoinKey1, ann.bitcoinKey2)))
|
||||
val TxCoordinates(_, _, outputIndex) = ShortChannelId.coordinates(ann.shortChannelId)
|
||||
sender ! ValidateResult(ann, Some(Transaction(version = 0, txIn = Nil, txOut = List.fill(outputIndex + 1)(txOut), lockTime = 0)), true, None)
|
||||
case unexpected => println(s"unexpected : $unexpected")
|
||||
}
|
||||
}
|
||||
val routingInfoB = fakeRoutingInfo.drop(10)
|
||||
|
||||
override def withFixture(test: OneArgTest) = {
|
||||
val watcherA = system.actorOf(Props(new FakeWatcher()))
|
||||
|
@ -127,6 +116,17 @@ class RoutingSyncSpec extends TestkitBaseClass {
|
|||
}
|
||||
|
||||
object RoutingSyncSpec {
|
||||
class FakeWatcher extends Actor {
|
||||
def receive = {
|
||||
case _: WatchSpentBasic => ()
|
||||
case ValidateRequest(ann) =>
|
||||
val txOut = TxOut(Satoshi(1000000), Script.pay2wsh(Scripts.multiSig2of2(ann.bitcoinKey1, ann.bitcoinKey2)))
|
||||
val TxCoordinates(_, _, outputIndex) = ShortChannelId.coordinates(ann.shortChannelId)
|
||||
sender ! ValidateResult(ann, Some(Transaction(version = 0, txIn = Nil, txOut = List.fill(outputIndex + 1)(txOut), lockTime = 0)), true, None)
|
||||
case unexpected => println(s"unexpected : $unexpected")
|
||||
}
|
||||
}
|
||||
|
||||
def makeFakeRoutingInfo(shortChannelId: ShortChannelId): (ChannelAnnouncement, ChannelUpdate, ChannelUpdate, NodeAnnouncement, NodeAnnouncement) = {
|
||||
val (priv_a, priv_b, priv_funding_a, priv_funding_b) = (randomKey, randomKey, randomKey, randomKey)
|
||||
val channelAnn_ab = channelAnnouncement(shortChannelId, priv_a, priv_b, priv_funding_a, priv_funding_b)
|
||||
|
|
Loading…
Add table
Reference in a new issue