mirror of
https://github.com/ACINQ/eclair.git
synced 2025-02-22 22:25:26 +01:00
Cancel previous LiftChannelExclusion
(#2510)
Keep at most one scheduled `LiftChannelExclusion` by cancelling the previous one if the new ban lasts longer
This commit is contained in:
parent
e32697e798
commit
6569eaf07d
3 changed files with 59 additions and 6 deletions
|
@ -99,7 +99,7 @@ object RouteCalculation {
|
|||
implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors
|
||||
|
||||
val extraEdges = r.extraEdges.map(GraphEdge(_)).filterNot(_.desc.a == r.source).toSet // we ignore routing hints for our own channels, we have more accurate information
|
||||
val ignoredEdges = r.ignore.channels ++ d.excludedChannels
|
||||
val ignoredEdges = r.ignore.channels ++ d.excludedChannels.keySet
|
||||
val params = r.routeParams
|
||||
val routesToFind = if (params.randomize) DEFAULT_ROUTES_COUNT else 1
|
||||
|
||||
|
|
|
@ -18,7 +18,7 @@ package fr.acinq.eclair.router
|
|||
|
||||
import akka.Done
|
||||
import akka.actor.typed.scaladsl.adapter.actorRefAdapter
|
||||
import akka.actor.{Actor, ActorLogging, ActorRef, Props, Terminated, typed}
|
||||
import akka.actor.{Actor, ActorLogging, ActorRef, Cancellable, Props, Terminated, typed}
|
||||
import akka.event.DiagnosticLoggingAdapter
|
||||
import akka.event.Logging.MDC
|
||||
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
|
||||
|
@ -109,7 +109,7 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
|
|||
awaiting = Map.empty,
|
||||
privateChannels = Map.empty,
|
||||
scid2PrivateChannels = Map.empty,
|
||||
excludedChannels = Set.empty,
|
||||
excludedChannels = Map.empty,
|
||||
graphWithBalances = GraphWithBalanceEstimates(graph, nodeParams.routerConf.balanceEstimateHalfLife),
|
||||
sync = Map.empty)
|
||||
startWith(NORMAL, data)
|
||||
|
@ -172,13 +172,34 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
|
|||
|
||||
case Event(ExcludeChannel(desc, duration_opt), d) =>
|
||||
log.info("excluding shortChannelId={} from nodeId={} for duration={}", desc.shortChannelId, desc.a, duration_opt.getOrElse("n/a"))
|
||||
duration_opt.foreach(banDuration => context.system.scheduler.scheduleOnce(banDuration, self, LiftChannelExclusion(desc)))
|
||||
stay() using d.copy(excludedChannels = d.excludedChannels + desc)
|
||||
val (excludedUntil, oldTimer) = d.excludedChannels.get(desc) match {
|
||||
case Some(ExcludedForever) => (TimestampSecond.max, None)
|
||||
case Some(ExcludedUntil(liftExclusionAt, timer)) => (liftExclusionAt, Some(timer))
|
||||
case None => (TimestampSecond.min, None)
|
||||
}
|
||||
val newState = duration_opt match {
|
||||
case Some(banDuration) =>
|
||||
if (TimestampSecond.now() + banDuration > excludedUntil) {
|
||||
oldTimer.foreach(_.cancel())
|
||||
val newTimer = context.system.scheduler.scheduleOnce(banDuration, self, LiftChannelExclusion(desc))
|
||||
ExcludedUntil(TimestampSecond.now() + banDuration, newTimer)
|
||||
} else {
|
||||
d.excludedChannels(desc)
|
||||
}
|
||||
case None =>
|
||||
oldTimer.foreach(_.cancel())
|
||||
ExcludedForever
|
||||
}
|
||||
stay() using d.copy(excludedChannels = d.excludedChannels + (desc -> newState))
|
||||
|
||||
case Event(LiftChannelExclusion(desc@ChannelDesc(shortChannelId, nodeId, _)), d) =>
|
||||
log.info("reinstating shortChannelId={} from nodeId={}", shortChannelId, nodeId)
|
||||
stay() using d.copy(excludedChannels = d.excludedChannels - desc)
|
||||
|
||||
case Event(GetExcludedChannels, d) =>
|
||||
sender() ! d.excludedChannels
|
||||
stay()
|
||||
|
||||
case Event(GetNodes, d) =>
|
||||
sender() ! d.nodes.values
|
||||
stay()
|
||||
|
@ -579,6 +600,12 @@ object Router {
|
|||
/** This is used when we get a TemporaryChannelFailure, to give time for the channel to recover (note that exclusions are directed) */
|
||||
case class ExcludeChannel(desc: ChannelDesc, duration_opt: Option[FiniteDuration])
|
||||
case class LiftChannelExclusion(desc: ChannelDesc)
|
||||
|
||||
sealed trait ExcludedChannelStatus
|
||||
case object ExcludedForever extends ExcludedChannelStatus
|
||||
case class ExcludedUntil(liftExclusionAt: TimestampSecond, timer: Cancellable) extends ExcludedChannelStatus
|
||||
|
||||
case object GetExcludedChannels
|
||||
// @formatter:on
|
||||
|
||||
// @formatter:off
|
||||
|
@ -643,7 +670,7 @@ object Router {
|
|||
awaiting: Map[ChannelAnnouncement, Seq[GossipOrigin]], // note: this is a seq because we want to preserve order: first actor is the one who we need to send a tcp-ack when validation is done
|
||||
privateChannels: Map[ByteVector32, PrivateChannel], // indexed by channel id
|
||||
scid2PrivateChannels: Map[Long, ByteVector32], // real scid or alias to channel_id, only to be used for private channels
|
||||
excludedChannels: Set[ChannelDesc], // those channels are temporarily excluded from route calculation, because their node returned a TemporaryChannelFailure
|
||||
excludedChannels: Map[ChannelDesc, ExcludedChannelStatus], // those channels are temporarily excluded from route calculation, because their node returned a TemporaryChannelFailure
|
||||
graphWithBalances: GraphWithBalanceEstimates,
|
||||
sync: Map[PublicKey, Syncing] // keep tracks of channel range queries sent to each peer. If there is an entry in the map, it means that there is an ongoing query for which we have not yet received an 'end' message
|
||||
) {
|
||||
|
|
|
@ -504,6 +504,32 @@ class RouterSpec extends BaseRouterSpec {
|
|||
sender.expectMsgType[RouteResponse]
|
||||
}
|
||||
|
||||
test("concurrent channel exclusions") { fixture =>
|
||||
import fixture._
|
||||
val sender = TestProbe()
|
||||
sender.send(router, RouteRequest(a, d, DEFAULT_AMOUNT_MSAT, DEFAULT_MAX_FEE, routeParams = DEFAULT_ROUTE_PARAMS))
|
||||
sender.expectMsgType[RouteResponse]
|
||||
val bc = ChannelDesc(scid_bc, b, c)
|
||||
sender.send(router, ExcludeChannel(bc, Some(1 second)))
|
||||
sender.send(router, ExcludeChannel(bc, Some(10 minute)))
|
||||
sender.send(router, ExcludeChannel(bc, Some(1 second)))
|
||||
sender.send(router, RouteRequest(a, d, DEFAULT_AMOUNT_MSAT, DEFAULT_MAX_FEE, routeParams = DEFAULT_ROUTE_PARAMS))
|
||||
sender.expectMsg(Failure(RouteNotFound))
|
||||
sender.send(router, GetExcludedChannels)
|
||||
val excludedChannels1 = sender.expectMsgType[Map[ChannelDesc, ExcludedChannelStatus]]
|
||||
assert(excludedChannels1.size == 1)
|
||||
assert(excludedChannels1(bc).isInstanceOf[ExcludedUntil])
|
||||
assert(excludedChannels1(bc).asInstanceOf[ExcludedUntil].liftExclusionAt > TimestampSecond.now() + 9.minute)
|
||||
sender.send(router, LiftChannelExclusion(bc))
|
||||
sender.send(router, RouteRequest(a, d, DEFAULT_AMOUNT_MSAT, DEFAULT_MAX_FEE, routeParams = DEFAULT_ROUTE_PARAMS))
|
||||
sender.expectMsgType[RouteResponse]
|
||||
sender.send(router, ExcludeChannel(bc, None))
|
||||
sender.send(router, GetExcludedChannels)
|
||||
val excludedChannels2 = sender.expectMsgType[Map[ChannelDesc, ExcludedChannelStatus]]
|
||||
assert(excludedChannels2.size == 1)
|
||||
assert(excludedChannels2(bc) == ExcludedForever)
|
||||
}
|
||||
|
||||
test("send routing state") { fixture =>
|
||||
import fixture._
|
||||
val sender = TestProbe()
|
||||
|
|
Loading…
Add table
Reference in a new issue