mirror of
https://github.com/ACINQ/eclair.git
synced 2025-02-22 06:21:42 +01:00
Add random delay to rebroadcast (#925)
* add random delay to rebroadcast * ignore `BadMessage` while disconnected * ignore `DelayedRebroadcast` while disconnected
This commit is contained in:
parent
9c37448ebf
commit
933913de08
1 changed files with 17 additions and 3 deletions
|
@ -27,6 +27,7 @@ import fr.acinq.bitcoin.{ByteVector32, DeterministicWallet, MilliSatoshi, Protoc
|
|||
import fr.acinq.eclair.blockchain.EclairWallet
|
||||
import fr.acinq.eclair.channel._
|
||||
import fr.acinq.eclair.crypto.TransportHandler
|
||||
import fr.acinq.eclair.secureRandom
|
||||
import fr.acinq.eclair.router._
|
||||
import fr.acinq.eclair.wire._
|
||||
import fr.acinq.eclair.{wire, _}
|
||||
|
@ -142,7 +143,10 @@ class Peer(nodeParams: NodeParams, remoteNodeId: PublicKey, authenticator: Actor
|
|||
|
||||
// let's bring existing/requested channels online
|
||||
d.channels.values.toSet[ActorRef].foreach(_ ! INPUT_RECONNECTED(d.transport, d.localInit, remoteInit)) // we deduplicate with toSet because there might be two entries per channel (tmp id and final id)
|
||||
goto(CONNECTED) using ConnectedData(d.address_opt, d.transport, d.localInit, remoteInit, d.channels.map { case (k: ChannelId, v) => (k, v) }) forMax (30 seconds) // forMax will trigger a StateTimeout
|
||||
// we will delay all rebroadcasts with this value in order to prevent herd effects (each peer has a different delay)
|
||||
val rebroadcastDelay = Random.nextInt(nodeParams.routerConf.routerBroadcastInterval.toSeconds.toInt).seconds
|
||||
log.info(s"rebroadcast will be delayed by $rebroadcastDelay")
|
||||
goto(CONNECTED) using ConnectedData(d.address_opt, d.transport, d.localInit, remoteInit, d.channels.map { case (k: ChannelId, v) => (k, v) }, rebroadcastDelay) forMax (30 seconds) // forMax will trigger a StateTimeout
|
||||
} else {
|
||||
log.warning(s"incompatible features, disconnecting")
|
||||
d.origin_opt.foreach(origin => origin ! Status.Failure(new RuntimeException("incompatible features")))
|
||||
|
@ -232,7 +236,7 @@ class Peer(nodeParams: NodeParams, remoteNodeId: PublicKey, authenticator: Actor
|
|||
log.debug(s"received pong with latency=$latency")
|
||||
cancelTimer(PingTimeout.toString())
|
||||
// pings are sent periodically with some randomization
|
||||
val nextDelay = nodeParams.pingInterval + secureRandom.nextInt(10).seconds
|
||||
val nextDelay = nodeParams.pingInterval + Random.nextInt(10).seconds
|
||||
setTimer(SendPing.toString, SendPing, nextDelay, repeat = false)
|
||||
case None =>
|
||||
log.debug(s"received unexpected pong with size=${data.length}")
|
||||
|
@ -317,6 +321,10 @@ class Peer(nodeParams: NodeParams, remoteNodeId: PublicKey, authenticator: Actor
|
|||
stay
|
||||
|
||||
case Event(rebroadcast: Rebroadcast, d: ConnectedData) =>
|
||||
context.system.scheduler.scheduleOnce(d.rebroadcastDelay, self, DelayedRebroadcast(rebroadcast))(context.dispatcher)
|
||||
stay
|
||||
|
||||
case Event(DelayedRebroadcast(rebroadcast), d: ConnectedData) =>
|
||||
|
||||
/**
|
||||
* Send and count in a single iteration
|
||||
|
@ -455,6 +463,8 @@ class Peer(nodeParams: NodeParams, remoteNodeId: PublicKey, authenticator: Actor
|
|||
|
||||
case Event(_: Rebroadcast, _) => stay // ignored
|
||||
|
||||
case Event(_: DelayedRebroadcast, _) => stay // ignored
|
||||
|
||||
case Event(_: RoutingState, _) => stay // ignored
|
||||
|
||||
case Event(_: TransportHandler.ReadAck, _) => stay // ignored
|
||||
|
@ -466,6 +476,8 @@ class Peer(nodeParams: NodeParams, remoteNodeId: PublicKey, authenticator: Actor
|
|||
case Event(_: Pong, _) => stay // we got disconnected before receiving the pong
|
||||
|
||||
case Event(_: PingTimeout, _) => stay // we got disconnected after sending a ping
|
||||
|
||||
case Event(_: BadMessage, _) => stay // we got disconnected while syncing
|
||||
}
|
||||
|
||||
onTransition {
|
||||
|
@ -530,7 +542,7 @@ object Peer {
|
|||
case class Nothing() extends Data { override def address_opt = None; override def channels = Map.empty }
|
||||
case class DisconnectedData(address_opt: Option[InetSocketAddress], channels: Map[FinalChannelId, ActorRef], attempts: Int = 0) extends Data
|
||||
case class InitializingData(address_opt: Option[InetSocketAddress], transport: ActorRef, channels: Map[FinalChannelId, ActorRef], origin_opt: Option[ActorRef], localInit: wire.Init) extends Data
|
||||
case class ConnectedData(address_opt: Option[InetSocketAddress], transport: ActorRef, localInit: wire.Init, remoteInit: wire.Init, channels: Map[ChannelId, ActorRef], gossipTimestampFilter: Option[GossipTimestampFilter] = None, behavior: Behavior = Behavior(), expectedPong_opt: Option[ExpectedPong] = None) extends Data
|
||||
case class ConnectedData(address_opt: Option[InetSocketAddress], transport: ActorRef, localInit: wire.Init, remoteInit: wire.Init, channels: Map[ChannelId, ActorRef], rebroadcastDelay: FiniteDuration, gossipTimestampFilter: Option[GossipTimestampFilter] = None, behavior: Behavior = Behavior(), expectedPong_opt: Option[ExpectedPong] = None) extends Data
|
||||
case class ExpectedPong(ping: Ping, timestamp: Long = Platform.currentTime)
|
||||
case class PingTimeout(ping: Ping)
|
||||
|
||||
|
@ -558,6 +570,8 @@ object Peer {
|
|||
|
||||
case class PeerRoutingMessage(transport: ActorRef, remoteNodeId: PublicKey, message: RoutingMessage)
|
||||
|
||||
case class DelayedRebroadcast(rebroadcast: Rebroadcast)
|
||||
|
||||
sealed trait BadMessage
|
||||
case class InvalidSignature(r: RoutingMessage) extends BadMessage
|
||||
case class InvalidAnnouncement(c: ChannelAnnouncement) extends BadMessage
|
||||
|
|
Loading…
Add table
Reference in a new issue