mirror of
https://github.com/ACINQ/eclair.git
synced 2024-11-20 02:27:32 +01:00
Connect immediately on restart, then wait (#1040)
* connect immediately on restart, then wait This is to allow herd effect when we restart the app and have numerous peers. Also removed the unnecessary transition and cleaned up delay computation. * always reconnect immediately when disconnected Whether we go to this state from startup, or after getting disconnected. It makes the transition logic simpler, and the potential herd effect at startup is inevitable anyway since our peers will try to reconnect too. * add randomization when reconnecting * randomize delay for first reconnection attempt after startup * make some parameters configurable
This commit is contained in:
parent
f724efaa76
commit
32145e8d6a
@ -86,6 +86,8 @@ eclair {
|
||||
ping-timeout = 10 seconds // will disconnect if peer takes longer than that to respond
|
||||
ping-disconnect = true // disconnect if no answer to our pings
|
||||
auto-reconnect = true
|
||||
initial-random-reconnect-delay = 5 seconds // we add a random delay before the first reconnection attempt, capped by this value
|
||||
max-reconnect-interval = 1 hour // max interval between two reconnection attempts, after the exponential backoff period
|
||||
|
||||
payment-handler = "local"
|
||||
payment-request-expiry = 1 hour // default expiry for payment requests generated by this node
|
||||
|
@ -19,7 +19,6 @@ package fr.acinq.eclair
|
||||
import java.io.File
|
||||
import java.net.InetSocketAddress
|
||||
import java.nio.file.Files
|
||||
import java.sql.DriverManager
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import com.typesafe.config.{Config, ConfigFactory}
|
||||
@ -29,7 +28,6 @@ import fr.acinq.eclair.NodeParams.WatcherType
|
||||
import fr.acinq.eclair.channel.Channel
|
||||
import fr.acinq.eclair.crypto.KeyManager
|
||||
import fr.acinq.eclair.db._
|
||||
import fr.acinq.eclair.db.sqlite._
|
||||
import fr.acinq.eclair.router.RouterConf
|
||||
import fr.acinq.eclair.tor.Socks5ProxyParams
|
||||
import fr.acinq.eclair.wire.{Color, NodeAddress}
|
||||
@ -69,6 +67,8 @@ case class NodeParams(keyManager: KeyManager,
|
||||
maxFeerateMismatch: Double,
|
||||
updateFeeMinDiffRatio: Double,
|
||||
autoReconnect: Boolean,
|
||||
initialRandomReconnectDelay: FiniteDuration,
|
||||
maxReconnectInterval: FiniteDuration,
|
||||
chainHash: ByteVector32,
|
||||
channelFlags: Byte,
|
||||
watcherType: WatcherType,
|
||||
@ -205,6 +205,8 @@ object NodeParams {
|
||||
maxFeerateMismatch = config.getDouble("max-feerate-mismatch"),
|
||||
updateFeeMinDiffRatio = config.getDouble("update-fee_min-diff-ratio"),
|
||||
autoReconnect = config.getBoolean("auto-reconnect"),
|
||||
initialRandomReconnectDelay = FiniteDuration(config.getDuration("initial-random-reconnect-delay").getSeconds, TimeUnit.SECONDS),
|
||||
maxReconnectInterval = FiniteDuration(config.getDuration("max-reconnect-interval").getSeconds, TimeUnit.SECONDS),
|
||||
chainHash = chainHash,
|
||||
channelFlags = config.getInt("channel-flags").toByte,
|
||||
watcherType = watcherType,
|
||||
|
@ -1813,7 +1813,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
|
||||
d.commitments.remoteNextCommitInfo match {
|
||||
case Left(waitingForRevocation) if revocationTimeout.remoteCommitNumber + 1 == waitingForRevocation.nextRemoteCommit.index =>
|
||||
log.warning(s"waited for too long for a revocation to remoteCommitNumber=${revocationTimeout.remoteCommitNumber}, disconnecting")
|
||||
revocationTimeout.peer ! Peer.Disconnect
|
||||
revocationTimeout.peer ! Peer.Disconnect(remoteNodeId)
|
||||
case _ => ()
|
||||
}
|
||||
stay
|
||||
|
@ -34,6 +34,7 @@ import fr.acinq.eclair.wire._
|
||||
import fr.acinq.eclair.{secureRandom, wire, _}
|
||||
import scodec.Attempt
|
||||
import scodec.bits.ByteVector
|
||||
|
||||
import scala.compat.Platform
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.Random
|
||||
@ -41,7 +42,7 @@ import scala.util.Random
|
||||
/**
|
||||
* Created by PM on 26/08/2016.
|
||||
*/
|
||||
class Peer(nodeParams: NodeParams, remoteNodeId: PublicKey, authenticator: ActorRef, watcher: ActorRef, router: ActorRef, relayer: ActorRef, wallet: EclairWallet) extends FSMDiagnosticActorLogging[Peer.State, Peer.Data] {
|
||||
class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, authenticator: ActorRef, watcher: ActorRef, router: ActorRef, relayer: ActorRef, wallet: EclairWallet) extends FSMDiagnosticActorLogging[Peer.State, Peer.Data] {
|
||||
|
||||
import Peer._
|
||||
|
||||
@ -54,7 +55,13 @@ class Peer(nodeParams: NodeParams, remoteNodeId: PublicKey, authenticator: Actor
|
||||
channel ! INPUT_RESTORED(state)
|
||||
FinalChannelId(state.channelId) -> channel
|
||||
}.toMap
|
||||
goto(DISCONNECTED) using DisconnectedData(previousKnownAddress, channels)
|
||||
// When restarting, we will immediately reconnect, but then:
|
||||
// - we don't want all the subsequent reconnection attempts to be synchronized (herd effect)
|
||||
// - we don't want to go through the exponential backoff delay, because we were offline, not them, so there is no
|
||||
// reason to eagerly retry
|
||||
// That's why we set the next reconnection delay to a random value between MAX_RECONNECT_INTERVAL/2 and MAX_RECONNECT_INTERVAL.
|
||||
val firstNextReconnectionDelay = nodeParams.maxReconnectInterval.minus(Random.nextInt(nodeParams.maxReconnectInterval.toSeconds.toInt / 2).seconds)
|
||||
goto(DISCONNECTED) using DisconnectedData(previousKnownAddress, channels, firstNextReconnectionDelay) // when we restart, we will attempt to reconnect right away, but then we'll wait
|
||||
}
|
||||
|
||||
when(DISCONNECTED) {
|
||||
@ -80,13 +87,12 @@ class Peer(nodeParams: NodeParams, remoteNodeId: PublicKey, authenticator: Actor
|
||||
case Event(Reconnect, d: DisconnectedData) =>
|
||||
d.address_opt.orElse(getPeerAddressFromNodeAnnouncement) match {
|
||||
case _ if d.channels.isEmpty => stay // no-op, no more channels with this peer
|
||||
case None => stay // no-op, we don't know any address to this peer and we won't try reconnecting again
|
||||
case None => stay // no-op, we don't know any address to this peer and we won't try reconnecting again
|
||||
case Some(address) =>
|
||||
context.actorOf(Client.props(nodeParams, authenticator, address, remoteNodeId, origin_opt = None))
|
||||
log.info(s"reconnecting to $address")
|
||||
// exponential backoff retry with a finite max
|
||||
setTimer(RECONNECT_TIMER, Reconnect, Math.min(10 + Math.pow(2, d.attempts), 3600) seconds, repeat = false)
|
||||
stay using d.copy(attempts = d.attempts + 1)
|
||||
log.info(s"reconnecting to $address (next reconnection in ${d.nextReconnectionDelay.toSeconds} seconds)")
|
||||
setTimer(RECONNECT_TIMER, Reconnect, d.nextReconnectionDelay, repeat = false)
|
||||
stay using d.copy(nextReconnectionDelay = nextReconnectionDelay(d.nextReconnectionDelay, nodeParams.maxReconnectInterval))
|
||||
}
|
||||
|
||||
case Event(Authenticator.Authenticated(_, transport, remoteNodeId1, address, outgoing, origin_opt), d: DisconnectedData) =>
|
||||
@ -503,9 +509,17 @@ class Peer(nodeParams: NodeParams, remoteNodeId: PublicKey, authenticator: Actor
|
||||
case Event(_: BadMessage, _) => stay // we got disconnected while syncing
|
||||
}
|
||||
|
||||
/**
|
||||
* The transition INSTANTIATING -> DISCONNECTED happens in 2 scenarios
|
||||
* - Manual connection to a new peer: then when(DISCONNECTED) we expect a Peer.Connect from the switchboard
|
||||
* - Eclair restart: The switchboard creates the peers and sends Init and then Peer.Reconnect to trigger reconnection attempts
|
||||
*
|
||||
* So when we see this transition we NO-OP because we don't want to start a Reconnect timer but the peer will receive the trigger
|
||||
* (Connect/Reconnect) messages from the switchboard.
|
||||
*/
|
||||
onTransition {
|
||||
case INSTANTIATING -> DISCONNECTED if nodeParams.autoReconnect && nextStateData.address_opt.isDefined => self ! Reconnect // we reconnect right away if we just started the peer
|
||||
case _ -> DISCONNECTED if nodeParams.autoReconnect => setTimer(RECONNECT_TIMER, Reconnect, 1 second, repeat = false)
|
||||
case INSTANTIATING -> DISCONNECTED => ()
|
||||
case _ -> DISCONNECTED if nodeParams.autoReconnect => setTimer(RECONNECT_TIMER, Reconnect, Random.nextInt(nodeParams.initialRandomReconnectDelay.toMillis.toInt).millis, repeat = false) // we add some randomization to not have peers reconnect to each other exactly at the same time
|
||||
case DISCONNECTED -> _ if nodeParams.autoReconnect => cancelTimer(RECONNECT_TIMER)
|
||||
}
|
||||
|
||||
@ -568,7 +582,7 @@ object Peer {
|
||||
def channels: Map[_ <: ChannelId, ActorRef] // will be overridden by Map[FinalChannelId, ActorRef] or Map[ChannelId, ActorRef]
|
||||
}
|
||||
case class Nothing() extends Data { override def address_opt = None; override def channels = Map.empty }
|
||||
case class DisconnectedData(address_opt: Option[InetSocketAddress], channels: Map[FinalChannelId, ActorRef], attempts: Int = 0) extends Data
|
||||
case class DisconnectedData(address_opt: Option[InetSocketAddress], channels: Map[FinalChannelId, ActorRef], nextReconnectionDelay: FiniteDuration = 10 seconds) extends Data
|
||||
case class InitializingData(address_opt: Option[InetSocketAddress], transport: ActorRef, channels: Map[FinalChannelId, ActorRef], origin_opt: Option[ActorRef], localInit: wire.Init) extends Data
|
||||
case class ConnectedData(address_opt: Option[InetSocketAddress], transport: ActorRef, localInit: wire.Init, remoteInit: wire.Init, channels: Map[ChannelId, ActorRef], rebroadcastDelay: FiniteDuration, gossipTimestampFilter: Option[GossipTimestampFilter] = None, behavior: Behavior = Behavior(), expectedPong_opt: Option[ExpectedPong] = None) extends Data
|
||||
case class ExpectedPong(ping: Ping, timestamp: Long = Platform.currentTime)
|
||||
@ -656,4 +670,9 @@ object Peer {
|
||||
}
|
||||
|
||||
def hostAndPort2InetSocketAddress(hostAndPort: HostAndPort): InetSocketAddress = new InetSocketAddress(hostAndPort.getHost, hostAndPort.getPort)
|
||||
|
||||
/**
|
||||
* Exponential backoff retry with a finite max
|
||||
*/
|
||||
def nextReconnectionDelay(currentDelay: FiniteDuration, maxReconnectInterval: FiniteDuration): FiniteDuration = (2 * currentDelay).min(maxReconnectInterval)
|
||||
}
|
||||
|
@ -69,17 +69,14 @@ class Switchboard(nodeParams: NodeParams, authenticator: ActorRef, watcher: Acto
|
||||
channels
|
||||
.groupBy(_.commitments.remoteParams.nodeId)
|
||||
.map {
|
||||
case (remoteNodeId, states) =>
|
||||
val address_opt = peers.get(remoteNodeId).orElse {
|
||||
nodeParams.db.network.getNode(remoteNodeId).flatMap(_.addresses.headOption) // gets the first of the list! TODO improve selection?
|
||||
}
|
||||
(remoteNodeId, states, address_opt)
|
||||
case (remoteNodeId, states) => (remoteNodeId, states, peers.get(remoteNodeId))
|
||||
}
|
||||
.foreach {
|
||||
case (remoteNodeId, states, nodeaddress_opt) =>
|
||||
// we might not have an address if we didn't initiate the connection in the first place
|
||||
val address_opt = nodeaddress_opt.map(_.socketAddress)
|
||||
createOrGetPeer(remoteNodeId, previousKnownAddress = address_opt, offlineChannels = states.toSet)
|
||||
val peer = createOrGetPeer(remoteNodeId, previousKnownAddress = address_opt, offlineChannels = states.toSet)
|
||||
peer ! Peer.Reconnect
|
||||
}
|
||||
}
|
||||
|
||||
@ -116,8 +113,6 @@ class Switchboard(nodeParams: NodeParams, authenticator: ActorRef, watcher: Acto
|
||||
|
||||
}
|
||||
|
||||
def peerActorName(remoteNodeId: PublicKey): String = s"peer-$remoteNodeId"
|
||||
|
||||
/**
|
||||
* Retrieves a peer based on its public key.
|
||||
*
|
||||
@ -159,6 +154,8 @@ object Switchboard extends Logging {
|
||||
|
||||
def props(nodeParams: NodeParams, authenticator: ActorRef, watcher: ActorRef, router: ActorRef, relayer: ActorRef, wallet: EclairWallet) = Props(new Switchboard(nodeParams, authenticator, watcher, router, relayer, wallet))
|
||||
|
||||
def peerActorName(remoteNodeId: PublicKey): String = s"peer-$remoteNodeId"
|
||||
|
||||
/**
|
||||
* If we have stopped eclair while it was forwarding HTLCs, it is possible that we are in a state were an incoming HTLC
|
||||
* was committed by both sides, but we didn't have time to send and/or sign the corresponding HTLC to the downstream node.
|
||||
|
@ -77,6 +77,8 @@ object TestConstants {
|
||||
maxFeerateMismatch = 1.5,
|
||||
updateFeeMinDiffRatio = 0.1,
|
||||
autoReconnect = false,
|
||||
initialRandomReconnectDelay = 5 seconds,
|
||||
maxReconnectInterval = 1 hour,
|
||||
chainHash = Block.RegtestGenesisBlock.hash,
|
||||
channelFlags = 1,
|
||||
watcherType = BITCOIND,
|
||||
@ -141,6 +143,8 @@ object TestConstants {
|
||||
maxFeerateMismatch = 1.0,
|
||||
updateFeeMinDiffRatio = 0.1,
|
||||
autoReconnect = false,
|
||||
initialRandomReconnectDelay = 5 seconds,
|
||||
maxReconnectInterval = 1 hour,
|
||||
chainHash = Block.RegtestGenesisBlock.hash,
|
||||
channelFlags = 1,
|
||||
watcherType = BITCOIND,
|
||||
|
@ -1003,7 +1003,7 @@ class NormalStateSpec extends TestkitBaseClass with StateTestsHelperMethods {
|
||||
awaitCond(alice.stateData.asInstanceOf[DATA_NORMAL].commitments.remoteNextCommitInfo.isLeft)
|
||||
val peer = TestProbe()
|
||||
sender.send(alice, RevocationTimeout(alice.stateData.asInstanceOf[DATA_NORMAL].commitments.remoteCommit.index, peer.ref))
|
||||
peer.expectMsg(Peer.Disconnect)
|
||||
peer.expectMsg(Peer.Disconnect(alice.stateData.asInstanceOf[DATA_NORMAL].commitments.remoteParams.nodeId))
|
||||
}
|
||||
|
||||
test("recv CMD_FULFILL_HTLC") { f =>
|
||||
|
@ -18,9 +18,11 @@ package fr.acinq.eclair.io
|
||||
|
||||
import java.net.{Inet4Address, InetSocketAddress}
|
||||
|
||||
import akka.actor.{ActorRef, PoisonPill, Terminated}
|
||||
import java.net.{Inet4Address, InetAddress, InetSocketAddress, ServerSocket}
|
||||
import akka.actor.{ActorRef, ActorSystem, PoisonPill}
|
||||
import akka.actor.FSM.{CurrentState, SubscribeTransitionCallBack, Transition}
|
||||
import akka.testkit.{EventFilter, TestFSMRef, TestKit, TestProbe}
|
||||
import akka.testkit.{TestFSMRef, TestProbe}
|
||||
import fr.acinq.bitcoin.Crypto.PublicKey
|
||||
import fr.acinq.eclair.TestConstants._
|
||||
import fr.acinq.eclair._
|
||||
@ -34,7 +36,6 @@ import fr.acinq.eclair.router.{ChannelRangeQueries, ChannelRangeQueriesSpec, Reb
|
||||
import fr.acinq.eclair.wire.{Color, Error, IPv4, NodeAddress, NodeAnnouncement, Ping, Pong}
|
||||
import org.scalatest.{Outcome, Tag}
|
||||
import scodec.bits.ByteVector
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
class PeerSpec extends TestkitBaseClass {
|
||||
@ -54,8 +55,8 @@ class PeerSpec extends TestkitBaseClass {
|
||||
val aParams = Alice.nodeParams
|
||||
val aliceParams = test.tags.contains("with_node_announcements") match {
|
||||
case true =>
|
||||
val aliceAnnouncement = NodeAnnouncement(randomBytes64, ByteVector.empty, 1, Bob.nodeParams.nodeId, Color(100.toByte, 200.toByte, 300.toByte), "node-alias", fakeIPAddress :: Nil)
|
||||
aParams.db.network.addNode(aliceAnnouncement)
|
||||
val bobAnnouncement = NodeAnnouncement(randomBytes64, ByteVector.empty, 1, Bob.nodeParams.nodeId, Color(100.toByte, 200.toByte, 300.toByte), "node-alias", fakeIPAddress :: Nil)
|
||||
aParams.db.network.addNode(bobAnnouncement)
|
||||
aParams
|
||||
case false => aParams
|
||||
}
|
||||
@ -151,17 +152,50 @@ class PeerSpec extends TestkitBaseClass {
|
||||
probe.expectNoMsg()
|
||||
}
|
||||
|
||||
test("count reconnections") { f =>
|
||||
test("reconnect using the address from node_announcement") { f =>
|
||||
import f._
|
||||
|
||||
// we create a dummy tcp server and update bob's announcement to point to it
|
||||
val mockServer = new ServerSocket(0, 1, InetAddress.getLocalHost) // port will be assigned automatically
|
||||
val mockAddress = NodeAddress.fromParts(mockServer.getInetAddress.getHostAddress, mockServer.getLocalPort).get
|
||||
val bobAnnouncement = NodeAnnouncement(randomBytes64, ByteVector.empty, 1, Bob.nodeParams.nodeId, Color(100.toByte, 200.toByte, 300.toByte), "node-alias", mockAddress :: Nil)
|
||||
peer.underlyingActor.nodeParams.db.network.addNode(bobAnnouncement)
|
||||
|
||||
val probe = TestProbe()
|
||||
awaitCond(peer.stateName == INSTANTIATING)
|
||||
probe.send(peer, Peer.Init(None, Set(ChannelStateSpec.normal)))
|
||||
awaitCond(peer.stateName == DISCONNECTED)
|
||||
|
||||
// we have auto-reconnect=false so we need to manually tell the peer to reconnect
|
||||
probe.send(peer, Reconnect)
|
||||
|
||||
// assert our mock server got an incoming connection (the client was spawned with the address from node_announcement)
|
||||
within(30 seconds) {
|
||||
mockServer.accept()
|
||||
}
|
||||
}
|
||||
|
||||
test("only reconnect once with a randomized delay after startup") { f =>
|
||||
import f._
|
||||
val probe = TestProbe()
|
||||
val previouslyKnownAddress = new InetSocketAddress("1.2.3.4", 9735)
|
||||
probe.send(peer, Peer.Init(Some(previouslyKnownAddress), Set(ChannelStateSpec.normal)))
|
||||
probe.send(peer, Peer.Reconnect)
|
||||
awaitCond(peer.stateData.asInstanceOf[DisconnectedData].attempts == 1)
|
||||
probe.send(peer, Peer.Reconnect)
|
||||
awaitCond(peer.stateData.asInstanceOf[DisconnectedData].attempts == 2)
|
||||
probe.send(peer, Peer.Reconnect)
|
||||
awaitCond(peer.stateData.asInstanceOf[DisconnectedData].attempts == 3)
|
||||
val interval = (peer.underlyingActor.nodeParams.maxReconnectInterval.toSeconds / 2) to peer.underlyingActor.nodeParams.maxReconnectInterval.toSeconds
|
||||
awaitCond(interval contains peer.stateData.asInstanceOf[DisconnectedData].nextReconnectionDelay.toSeconds)
|
||||
}
|
||||
|
||||
test("reconnect with increasing delays") { f =>
|
||||
import f._
|
||||
val probe = TestProbe()
|
||||
connect(remoteNodeId, authenticator, watcher, router, relayer, connection, transport, peer, channels = Set(ChannelStateSpec.normal))
|
||||
probe.send(transport.ref, PoisonPill)
|
||||
awaitCond(peer.stateName === DISCONNECTED)
|
||||
assert(peer.stateData.asInstanceOf[DisconnectedData].nextReconnectionDelay === (10 seconds))
|
||||
probe.send(peer, Reconnect)
|
||||
assert(peer.stateData.asInstanceOf[DisconnectedData].nextReconnectionDelay === (20 seconds))
|
||||
probe.send(peer, Reconnect)
|
||||
assert(peer.stateData.asInstanceOf[DisconnectedData].nextReconnectionDelay === (40 seconds))
|
||||
}
|
||||
|
||||
test("disconnect if incompatible features") { f =>
|
||||
|
@ -1,44 +0,0 @@
|
||||
package fr.acinq.eclair.io
|
||||
|
||||
import akka.actor.{ActorRef, ActorSystem}
|
||||
import akka.testkit.{EventFilter, TestFSMRef, TestKit, TestProbe}
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import fr.acinq.eclair.db.ChannelStateSpec
|
||||
import org.scalatest.{FunSuiteLike, Outcome, Tag}
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import akka.testkit.{TestFSMRef, TestProbe}
|
||||
import fr.acinq.eclair.TestConstants.{Alice, Bob}
|
||||
import fr.acinq.eclair.blockchain.EclairWallet
|
||||
import fr.acinq.eclair.randomBytes64
|
||||
import fr.acinq.eclair.wire.{Color, IPv4, NodeAddress, NodeAnnouncement}
|
||||
import scodec.bits.ByteVector
|
||||
|
||||
class PeerSpecWithLogging extends TestKit(ActorSystem("test", ConfigFactory.parseString("""akka.loggers = ["akka.testkit.TestEventListener"]"""))) with FunSuiteLike {
|
||||
|
||||
val fakeIPAddress = NodeAddress.fromParts("1.2.3.4", 42000).get
|
||||
|
||||
test("reconnect using the address from node_announcement") {
|
||||
val aliceParams = Alice.nodeParams
|
||||
val aliceAnnouncement = NodeAnnouncement(randomBytes64, ByteVector.empty, 1, Bob.nodeParams.nodeId, Color(100.toByte, 200.toByte, 300.toByte), "node-alias", fakeIPAddress :: Nil)
|
||||
aliceParams.db.network.addNode(aliceAnnouncement)
|
||||
val authenticator = TestProbe()
|
||||
val watcher = TestProbe()
|
||||
val router = TestProbe()
|
||||
val relayer = TestProbe()
|
||||
val wallet: EclairWallet = null // unused
|
||||
val remoteNodeId = Bob.nodeParams.nodeId
|
||||
val peer: TestFSMRef[Peer.State, Peer.Data, Peer] = TestFSMRef(new Peer(aliceParams, remoteNodeId, authenticator.ref, watcher.ref, router.ref, relayer.ref, wallet))
|
||||
|
||||
|
||||
val probe = TestProbe()
|
||||
awaitCond({peer.stateName.toString == "INSTANTIATING"}, 10 seconds)
|
||||
probe.send(peer, Peer.Init(None, Set(ChannelStateSpec.normal)))
|
||||
awaitCond({peer.stateName.toString == "DISCONNECTED" && peer.stateData.address_opt.isEmpty}, 10 seconds)
|
||||
EventFilter.info(message = s"reconnecting to ${fakeIPAddress.socketAddress}", occurrences = 1) intercept {
|
||||
probe.send(peer, Peer.Reconnect)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
@ -0,0 +1,95 @@
|
||||
package fr.acinq.eclair.io
|
||||
|
||||
import java.io.File
|
||||
|
||||
import akka.actor.{ActorRef, ActorSystem, Props}
|
||||
import akka.testkit.{TestKit, TestProbe}
|
||||
import fr.acinq.bitcoin.ByteVector64
|
||||
import fr.acinq.bitcoin.Crypto.PublicKey
|
||||
import fr.acinq.eclair.TestConstants._
|
||||
import fr.acinq.eclair.blockchain.TestWallet
|
||||
import fr.acinq.eclair.db._
|
||||
import fr.acinq.eclair.wire.{Color, NodeAddress, NodeAnnouncement}
|
||||
import org.mockito.scalatest.IdiomaticMockito
|
||||
import org.scalatest.FunSuiteLike
|
||||
import scodec.bits._
|
||||
|
||||
class SwitchboardSpec extends TestKit(ActorSystem("test")) with FunSuiteLike with IdiomaticMockito {
|
||||
|
||||
test("on initialization create peers and send Reconnect to them") {
|
||||
|
||||
val mockNetworkDb = mock[NetworkDb]
|
||||
val nodeParams = Alice.nodeParams.copy(
|
||||
db = new Databases {
|
||||
override val network: NetworkDb = mockNetworkDb
|
||||
override val audit: AuditDb = Alice.nodeParams.db.audit
|
||||
override val channels: ChannelsDb = Alice.nodeParams.db.channels
|
||||
override val peers: PeersDb = Alice.nodeParams.db.peers
|
||||
override val payments: PaymentsDb = Alice.nodeParams.db.payments
|
||||
override val pendingRelay: PendingRelayDb = Alice.nodeParams.db.pendingRelay
|
||||
override def backup(file: File): Unit = ()
|
||||
}
|
||||
)
|
||||
|
||||
val remoteNodeId = ChannelStateSpec.normal.commitments.remoteParams.nodeId
|
||||
val authenticator = TestProbe()
|
||||
val watcher = TestProbe()
|
||||
val router = TestProbe()
|
||||
val relayer = TestProbe()
|
||||
val wallet = new TestWallet()
|
||||
val probe = TestProbe()
|
||||
|
||||
// mock the call that will be done by the peer once it receives Peer.Reconnect
|
||||
mockNetworkDb.getNode(remoteNodeId) returns Some(
|
||||
NodeAnnouncement(ByteVector64.Zeroes, ByteVector.empty, 0, remoteNodeId, Color(0,0,0), "alias", List(NodeAddress.fromParts("127.0.0.1", 9735).get))
|
||||
)
|
||||
|
||||
// add a channel to the db
|
||||
nodeParams.db.channels.addOrUpdateChannel(ChannelStateSpec.normal)
|
||||
|
||||
val switchboard = system.actorOf(Switchboard.props(nodeParams, authenticator.ref, watcher.ref, router.ref, relayer.ref, wallet))
|
||||
|
||||
probe.send(switchboard, 'peers)
|
||||
val List(peer) = probe.expectMsgType[Iterable[ActorRef]].toList
|
||||
assert(peer.path.name == Switchboard.peerActorName(remoteNodeId))
|
||||
|
||||
// assert that the peer called `networkDb.getNode` - because it received a Peer.Reconnect
|
||||
awaitAssert(mockNetworkDb.getNode(remoteNodeId).wasCalled(once))
|
||||
}
|
||||
|
||||
test("when connecting to a new peer forward Peer.Connect to it") {
|
||||
val mockNetworkDb = mock[NetworkDb]
|
||||
val nodeParams = Alice.nodeParams.copy(
|
||||
db = new Databases {
|
||||
override val network: NetworkDb = mockNetworkDb
|
||||
override val audit: AuditDb = Alice.nodeParams.db.audit
|
||||
override val channels: ChannelsDb = Alice.nodeParams.db.channels
|
||||
override val peers: PeersDb = Alice.nodeParams.db.peers
|
||||
override val payments: PaymentsDb = Alice.nodeParams.db.payments
|
||||
override val pendingRelay: PendingRelayDb = Alice.nodeParams.db.pendingRelay
|
||||
override def backup(file: File): Unit = ()
|
||||
}
|
||||
)
|
||||
|
||||
val remoteNodeId = PublicKey(hex"03864ef025fde8fb587d989186ce6a4a186895ee44a926bfc370e2c366597a3f8f")
|
||||
val authenticator = TestProbe()
|
||||
val watcher = TestProbe()
|
||||
val router = TestProbe()
|
||||
val relayer = TestProbe()
|
||||
val wallet = new TestWallet()
|
||||
val probe = TestProbe()
|
||||
|
||||
// mock the call that will be done by the peer once it receives Peer.Connect(remoteNodeId)
|
||||
mockNetworkDb.getNode(remoteNodeId) returns Some(
|
||||
NodeAnnouncement(ByteVector64.Zeroes, ByteVector.empty, 0, remoteNodeId, Color(0,0,0), "alias", List(NodeAddress.fromParts("127.0.0.1", 9735).get))
|
||||
)
|
||||
|
||||
val switchboard = system.actorOf(Switchboard.props(nodeParams, authenticator.ref, watcher.ref, router.ref, relayer.ref, wallet))
|
||||
|
||||
// send Peer.Connect to switchboard, it will forward to the Peer and the peer will look up the address on the db
|
||||
probe.send(switchboard, Peer.Connect(remoteNodeId, None))
|
||||
|
||||
// assert that the peer called `networkDb.getNode` - because it received a Peer.Connect(remoteNodeId, None)
|
||||
awaitAssert(mockNetworkDb.getNode(remoteNodeId).wasCalled(once))
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user