1
0
mirror of https://github.com/ACINQ/eclair.git synced 2025-01-19 13:43:43 +01:00

Kill idle peers (#2096)

We may create connections to new peers for relaying onion messages but we don't want to keep them open for long.
This commit is contained in:
Thomas HUET 2021-12-15 14:05:51 +01:00 committed by GitHub
parent 576c0f6e39
commit 3d88c43b12
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 138 additions and 60 deletions

View File

@ -353,6 +353,9 @@ eclair {
# - channels-only: Only relay messages from peers with which we have a channel to peers with which we have a channel.
# - relay-all: Relay everything and create new connections if necessary
relay-policy = "channels-only"
# Transient connections opened to relay messages will be closed after this delay of inactivity
kill-transient-connection-after = 30 seconds
}
}

View File

@ -165,8 +165,8 @@ class EclairImpl(appKit: Kit) extends Eclair with Logging {
private val externalIdMaxLength = 66
override def connect(target: Either[NodeURI, PublicKey])(implicit timeout: Timeout): Future[String] = target match {
case Left(uri) => (appKit.switchboard ? Peer.Connect(uri, ActorRef.noSender)).mapTo[PeerConnection.ConnectionResult].map(_.toString)
case Right(pubKey) => (appKit.switchboard ? Peer.Connect(pubKey, None, ActorRef.noSender)).mapTo[PeerConnection.ConnectionResult].map(_.toString)
case Left(uri) => (appKit.switchboard ? Peer.Connect(uri, ActorRef.noSender, isPersistent = true)).mapTo[PeerConnection.ConnectionResult].map(_.toString)
case Right(pubKey) => (appKit.switchboard ? Peer.Connect(pubKey, None, ActorRef.noSender, isPersistent = true)).mapTo[PeerConnection.ConnectionResult].map(_.toString)
}
override def disconnect(nodeId: PublicKey)(implicit timeout: Timeout): Future[String] = {

View File

@ -458,7 +458,8 @@ object NodeParams extends Logging {
pingInterval = FiniteDuration(config.getDuration("peer-connection.ping-interval").getSeconds, TimeUnit.SECONDS),
pingTimeout = FiniteDuration(config.getDuration("peer-connection.ping-timeout").getSeconds, TimeUnit.SECONDS),
pingDisconnect = config.getBoolean("peer-connection.ping-disconnect"),
maxRebroadcastDelay = FiniteDuration(config.getDuration("router.broadcast-interval").getSeconds, TimeUnit.SECONDS) // it makes sense to not delay rebroadcast by more than the rebroadcast period
maxRebroadcastDelay = FiniteDuration(config.getDuration("router.broadcast-interval").getSeconds, TimeUnit.SECONDS), // it makes sense to not delay rebroadcast by more than the rebroadcast period
killIdleDelay = FiniteDuration(config.getDuration("onion-messages.kill-transient-connection-after").getSeconds, TimeUnit.SECONDS)
),
routerConf = RouterConf(
channelExcludeDuration = FiniteDuration(config.getDuration("router.channel-exclude-duration").getSeconds, TimeUnit.SECONDS),

View File

@ -35,7 +35,7 @@ import scala.concurrent.duration._
* Created by PM on 27/10/2015.
*
*/
class Client(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyParams], peerConnectionConf: PeerConnection.Conf, switchboard: ActorRef, router: ActorRef, remoteAddress: InetSocketAddress, remoteNodeId: PublicKey, origin_opt: Option[ActorRef]) extends Actor with DiagnosticActorLogging {
class Client(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyParams], peerConnectionConf: PeerConnection.Conf, switchboard: ActorRef, router: ActorRef, remoteAddress: InetSocketAddress, remoteNodeId: PublicKey, origin_opt: Option[ActorRef], isPersistent: Boolean) extends Actor with DiagnosticActorLogging {
import context.system
@ -127,13 +127,13 @@ class Client(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyParams],
switchboard = switchboard,
router = router
))
peerConnection ! PeerConnection.PendingAuth(connection, remoteNodeId_opt = Some(remoteNodeId), address = remoteAddress, origin_opt = origin_opt)
peerConnection ! PeerConnection.PendingAuth(connection, remoteNodeId_opt = Some(remoteNodeId), address = remoteAddress, origin_opt = origin_opt, isPersistent = isPersistent)
peerConnection
}
}
object Client {
def props(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyParams], peerConnectionConf: PeerConnection.Conf, switchboard: ActorRef, router: ActorRef, address: InetSocketAddress, remoteNodeId: PublicKey, origin_opt: Option[ActorRef]): Props = Props(new Client(keyPair, socks5ProxyParams_opt, peerConnectionConf, switchboard, router, address, remoteNodeId, origin_opt))
def props(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyParams], peerConnectionConf: PeerConnection.Conf, switchboard: ActorRef, router: ActorRef, address: InetSocketAddress, remoteNodeId: PublicKey, origin_opt: Option[ActorRef], isPersistent: Boolean): Props = Props(new Client(keyPair, socks5ProxyParams_opt, peerConnectionConf, switchboard, router, address, remoteNodeId, origin_opt, isPersistent))
}

View File

@ -50,7 +50,7 @@ class ClientSpawner(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyP
override def receive: Receive = {
case req: ClientSpawner.ConnectionRequest =>
log.info("initiating new connection to nodeId={} origin={}", req.remoteNodeId, sender())
context.actorOf(Client.props(keyPair, socks5ProxyParams_opt, peerConnectionConf, switchboard, router, req.address, req.remoteNodeId, origin_opt = Some(req.origin)))
context.actorOf(Client.props(keyPair, socks5ProxyParams_opt, peerConnectionConf, switchboard, router, req.address, req.remoteNodeId, origin_opt = Some(req.origin), req.isPersistent))
case DeadLetter(req: ClientSpawner.ConnectionRequest, _, _) =>
// we only subscribe to the deadletters event stream when in cluster mode
// in that case we want to be warned when connections are spawned by the backend
@ -67,5 +67,6 @@ object ClientSpawner {
case class ConnectionRequest(address: InetSocketAddress,
remoteNodeId: PublicKey,
origin: ActorRef) extends RemoteTypes
origin: ActorRef,
isPersistent: Boolean) extends RemoteTypes
}

View File

@ -62,7 +62,7 @@ object MessageRelay {
switchboard ! GetPeerInfo(context.messageAdapter(WrappedPeerInfo), prevNodeId)
waitForPreviousPeer(switchboard, nextNodeId, msg, replyTo)
case RelayAll =>
switchboard ! Peer.Connect(nextNodeId, None, context.messageAdapter(WrappedConnectionResult).toClassic)
switchboard ! Peer.Connect(nextNodeId, None, context.messageAdapter(WrappedConnectionResult).toClassic, isPersistent = false)
waitForConnection(msg, replyTo)
}
}

View File

@ -457,11 +457,11 @@ object Peer {
case object CONNECTED extends State
case class Init(storedChannels: Set[HasCommitments])
case class Connect(nodeId: PublicKey, address_opt: Option[HostAndPort], replyTo: ActorRef) {
case class Connect(nodeId: PublicKey, address_opt: Option[HostAndPort], replyTo: ActorRef, isPersistent: Boolean) {
def uri: Option[NodeURI] = address_opt.map(NodeURI(nodeId, _))
}
object Connect {
def apply(uri: NodeURI, replyTo: ActorRef): Connect = new Connect(uri.nodeId, Some(uri.address), replyTo)
def apply(uri: NodeURI, replyTo: ActorRef, isPersistent: Boolean): Connect = new Connect(uri.nodeId, Some(uri.address), replyTo, isPersistent)
}
case class Disconnect(nodeId: PublicKey) extends PossiblyHarmful

View File

@ -78,7 +78,7 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
}
context.watch(transport)
startSingleTimer(AUTH_TIMER, AuthTimeout, conf.authTimeout)
goto(AUTHENTICATING) using AuthenticatingData(p, transport)
goto(AUTHENTICATING) using AuthenticatingData(p, transport, p.isPersistent)
}
when(AUTHENTICATING) {
@ -88,7 +88,7 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
log.info(s"connection authenticated with $remoteNodeId@${address.getHostString}:${address.getPort} direction=${if (d.pendingAuth.outgoing) "outgoing" else "incoming"}")
Metrics.PeerConnectionsConnecting.withTag(Tags.ConnectionState, Tags.ConnectionStates.Authenticated).increment()
switchboard ! Authenticated(self, remoteNodeId)
goto(BEFORE_INIT) using BeforeInitData(remoteNodeId, d.pendingAuth, d.transport)
goto(BEFORE_INIT) using BeforeInitData(remoteNodeId, d.pendingAuth, d.transport, d.isPersistent)
case Event(AuthTimeout, d: AuthenticatingData) =>
log.warning(s"authentication timed out after ${conf.authTimeout}")
@ -104,7 +104,7 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
val localInit = protocol.Init(localFeatures, TlvStream(InitTlv.Networks(chainHash :: Nil)))
d.transport ! localInit
startSingleTimer(INIT_TIMER, InitTimeout, conf.initTimeout)
goto(INITIALIZING) using InitializingData(chainHash, d.pendingAuth, d.remoteNodeId, d.transport, peer, localInit, doSync)
goto(INITIALIZING) using InitializingData(chainHash, d.pendingAuth, d.remoteNodeId, d.transport, peer, localInit, doSync, d.isPersistent)
}
when(INITIALIZING) {
@ -148,7 +148,7 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
log.info(s"rebroadcast will be delayed by $rebroadcastDelay")
context.system.eventStream.subscribe(self, classOf[Rebroadcast])
goto(CONNECTED) using ConnectedData(d.chainHash, d.remoteNodeId, d.transport, d.peer, d.localInit, remoteInit, rebroadcastDelay)
goto(CONNECTED) using ConnectedData(d.chainHash, d.remoteNodeId, d.transport, d.peer, d.localInit, remoteInit, rebroadcastDelay, isPersistent = d.isPersistent)
}
case Event(InitTimeout, d: InitializingData) =>
@ -162,7 +162,15 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
heartbeat {
case Event(msg: LightningMessage, d: ConnectedData) if sender() != d.transport => // if the message doesn't originate from the transport, it is an outgoing message
d.transport forward msg
stay()
msg match {
case _: OnionMessage if !d.isPersistent =>
startSingleTimer(KILL_IDLE_TIMER, KillIdle, conf.killIdleDelay)
stay()
// If we send any channel management message to this peer, the connection should be persistent.
case _: ChannelMessage if !d.isPersistent =>
stay() using d.copy(isPersistent = true)
case _ => stay()
}
case Event(SendPing, d: ConnectedData) =>
if (d.expectedPong_opt.isEmpty) {
@ -279,7 +287,6 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
// this is actually for the channel
d.transport ! TransportHandler.ReadAck(msg)
d.peer ! msg
stay()
case _: ChannelAnnouncement | _: ChannelUpdate | _: NodeAnnouncement if d.behavior.ignoreNetworkAnnouncement =>
// this peer is currently under embargo!
d.transport ! TransportHandler.ReadAck(msg)
@ -357,6 +364,14 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
case Event(ResumeAnnouncements, d: ConnectedData) =>
log.info(s"resuming processing of network announcements for peer")
stay() using d.copy(behavior = d.behavior.copy(fundingTxAlreadySpentCount = 0, ignoreNetworkAnnouncement = false))
case Event(KillIdle, d: ConnectedData) =>
if (!d.isPersistent) {
log.info("stopping idle transient connection")
stop(FSM.Normal)
} else {
stay()
}
}
}
@ -461,12 +476,14 @@ object PeerConnection {
val AUTH_TIMER = "auth"
val INIT_TIMER = "init"
val SEND_PING_TIMER = "send_ping"
val KILL_IDLE_TIMER = "kill_idle"
// @formatter:on
// @formatter:off
case object AuthTimeout
case object InitTimeout
case object SendPing
case object KillIdle
case object ResumeAnnouncements
case class DoSync(replacePrevious: Boolean) extends RemoteTypes
// @formatter:on
@ -484,17 +501,18 @@ object PeerConnection {
pingInterval: FiniteDuration,
pingTimeout: FiniteDuration,
pingDisconnect: Boolean,
maxRebroadcastDelay: FiniteDuration)
maxRebroadcastDelay: FiniteDuration,
killIdleDelay: FiniteDuration)
// @formatter:off
sealed trait Data
sealed trait HasTransport { this: Data => def transport: ActorRef }
case object Nothing extends Data
case class AuthenticatingData(pendingAuth: PendingAuth, transport: ActorRef) extends Data with HasTransport
case class BeforeInitData(remoteNodeId: PublicKey, pendingAuth: PendingAuth, transport: ActorRef) extends Data with HasTransport
case class InitializingData(chainHash: ByteVector32, pendingAuth: PendingAuth, remoteNodeId: PublicKey, transport: ActorRef, peer: ActorRef, localInit: protocol.Init, doSync: Boolean) extends Data with HasTransport
case class ConnectedData(chainHash: ByteVector32, remoteNodeId: PublicKey, transport: ActorRef, peer: ActorRef, localInit: protocol.Init, remoteInit: protocol.Init, rebroadcastDelay: FiniteDuration, gossipTimestampFilter: Option[GossipTimestampFilter] = None, behavior: Behavior = Behavior(), expectedPong_opt: Option[ExpectedPong] = None) extends Data with HasTransport
case class AuthenticatingData(pendingAuth: PendingAuth, transport: ActorRef, isPersistent: Boolean) extends Data with HasTransport
case class BeforeInitData(remoteNodeId: PublicKey, pendingAuth: PendingAuth, transport: ActorRef, isPersistent: Boolean) extends Data with HasTransport
case class InitializingData(chainHash: ByteVector32, pendingAuth: PendingAuth, remoteNodeId: PublicKey, transport: ActorRef, peer: ActorRef, localInit: protocol.Init, doSync: Boolean, isPersistent: Boolean) extends Data with HasTransport
case class ConnectedData(chainHash: ByteVector32, remoteNodeId: PublicKey, transport: ActorRef, peer: ActorRef, localInit: protocol.Init, remoteInit: protocol.Init, rebroadcastDelay: FiniteDuration, gossipTimestampFilter: Option[GossipTimestampFilter] = None, behavior: Behavior = Behavior(), expectedPong_opt: Option[ExpectedPong] = None, isPersistent: Boolean) extends Data with HasTransport
case class ExpectedPong(ping: Ping, timestamp: TimestampMilli = TimestampMilli.now())
case class PingTimeout(ping: Ping)
@ -506,7 +524,7 @@ object PeerConnection {
case object INITIALIZING extends State
case object CONNECTED extends State
case class PendingAuth(connection: ActorRef, remoteNodeId_opt: Option[PublicKey], address: InetSocketAddress, origin_opt: Option[ActorRef], transport_opt: Option[ActorRef] = None) {
case class PendingAuth(connection: ActorRef, remoteNodeId_opt: Option[PublicKey], address: InetSocketAddress, origin_opt: Option[ActorRef], transport_opt: Option[ActorRef] = None, isPersistent: Boolean) {
def outgoing: Boolean = remoteNodeId_opt.isDefined // if this is an outgoing connection, we know the node id in advance
}
case class Authenticated(peerConnection: ActorRef, remoteNodeId: PublicKey) extends RemoteTypes

View File

@ -67,7 +67,7 @@ class ReconnectionTask(nodeParams: NodeParams, remoteNodeId: PublicKey) extends
// we query the db every time because it may have been updated in the meantime (e.g. with network announcements)
getPeerAddressFromDb(nodeParams.db.peers, nodeParams.db.network, remoteNodeId) match {
case Some(address) =>
connect(address, origin = self)
connect(address, origin = self, isPersistent = true)
goto(CONNECTING) using ConnectingData(address, d.nextReconnectionDelay)
case None =>
// we don't have an address for that peer, nothing to do
@ -130,14 +130,14 @@ class ReconnectionTask(nodeParams: NodeParams, remoteNodeId: PublicKey) extends
case Event(TickReconnect, _) => stay()
case Event(Peer.Connect(_, hostAndPort_opt, replyTo), _) =>
case Event(Peer.Connect(_, hostAndPort_opt, replyTo, isPersistent), _) =>
// manual connection requests happen completely independently of the automated reconnection process;
// we initiate a connection but don't modify our state.
// if we are already connecting/connected, the peer will kill any duplicate connections
hostAndPort_opt
.map(hostAndPort2InetSocketAddress)
.orElse(getPeerAddressFromDb(nodeParams.db.peers, nodeParams.db.network, remoteNodeId)) match {
case Some(address) => connect(address, origin = replyTo)
case Some(address) => connect(address, origin = replyTo, isPersistent)
case None => replyTo ! PeerConnection.ConnectionResult.NoAddressFound
}
stay()
@ -148,9 +148,9 @@ class ReconnectionTask(nodeParams: NodeParams, remoteNodeId: PublicKey) extends
// activate the extension only on demand, so that tests pass
lazy val mediator = DistributedPubSub(context.system).mediator
private def connect(address: InetSocketAddress, origin: ActorRef): Unit = {
private def connect(address: InetSocketAddress, origin: ActorRef, isPersistent: Boolean): Unit = {
log.info(s"connecting to $address")
val req = ClientSpawner.ConnectionRequest(address, remoteNodeId, origin)
val req = ClientSpawner.ConnectionRequest(address, remoteNodeId, origin, isPersistent)
if (context.system.hasExtension(Cluster) && !address.getHostName.endsWith("onion")) {
mediator ! Send(path = "/user/client-spawner", msg = req, localAffinity = false)
} else {

View File

@ -62,7 +62,7 @@ class Server(keyPair: KeyPair, peerConnectionConf: PeerConnection.Conf, switchbo
switchboard = switchboard,
router = router
))
peerConnection ! PeerConnection.PendingAuth(connection, remoteNodeId_opt = None, address = remote, origin_opt = None)
peerConnection ! PeerConnection.PendingAuth(connection, remoteNodeId_opt = None, address = remote, origin_opt = None, isPersistent = true)
listener ! ResumeAccepting(batchSize = 1)
}

View File

@ -61,17 +61,17 @@ class Switchboard(nodeParams: NodeParams, peerFactory: Switchboard.PeerFactory)
def normal(peersWithChannels: Set[PublicKey]): Receive = {
case Peer.Connect(publicKey, _, _) if publicKey == nodeParams.nodeId =>
case Peer.Connect(publicKey, _, _, _) if publicKey == nodeParams.nodeId =>
sender() ! Status.Failure(new RuntimeException("cannot open connection with oneself"))
case Peer.Connect(nodeId, address_opt, replyTo) =>
// we create a peer if it doesn't exist: when the peer doesn't exist, we can be sure that we don't have channels,
// otherwise the peer would have been created during the initialization step.
case Peer.Connect(nodeId, address_opt, replyTo, isPersistent) =>
// we create a peer if it doesn't exist: when the peer doesn't exist, we can be sure that we don't have channels,
// otherwise the peer would have been created during the initialization step.
val peer = createOrGetPeer(nodeId, offlineChannels = Set.empty)
val c = if (replyTo == ActorRef.noSender) {
Peer.Connect(nodeId, address_opt, sender())
} else {
Peer.Connect(nodeId, address_opt, replyTo)
val c = if (replyTo == ActorRef.noSender){
Peer.Connect(nodeId, address_opt, sender(), isPersistent)
}else{
Peer.Connect(nodeId, address_opt, replyTo, isPersistent)
}
peer forward c

View File

@ -105,7 +105,8 @@ object EclairInternalsSerializer {
("pingInterval" | finiteDurationCodec) ::
("pingTimeout" | finiteDurationCodec) ::
("pingDisconnect" | bool(8)) ::
("maxRebroadcastDelay" | finiteDurationCodec)).as[PeerConnection.Conf]
("maxRebroadcastDelay" | finiteDurationCodec) ::
("killIdleDelay" | finiteDurationCodec)).as[PeerConnection.Conf]
val peerConnectionDoSyncCodec: Codec[PeerConnection.DoSync] = bool(8).as[PeerConnection.DoSync]
@ -137,7 +138,8 @@ object EclairInternalsSerializer {
def connectionRequestCodec(system: ExtendedActorSystem): Codec[ClientSpawner.ConnectionRequest] = (
("address" | inetSocketAddressCodec) ::
("remoteNodeId" | publicKey) ::
("origin" | actorRefCodec(system))).as[ClientSpawner.ConnectionRequest]
("origin" | actorRefCodec(system)) ::
("isPersistent" | bool8)).as[ClientSpawner.ConnectionRequest]
def initializeConnectionCodec(system: ExtendedActorSystem): Codec[PeerConnection.InitializeConnection] = (
("peer" | actorRefCodec(system)) ::

View File

@ -164,7 +164,8 @@ object TestConstants {
pingInterval = 30 seconds,
pingTimeout = 10 seconds,
pingDisconnect = true,
maxRebroadcastDelay = 5 seconds
maxRebroadcastDelay = 5 seconds,
killIdleDelay = 1 seconds
),
routerConf = RouterConf(
channelExcludeDuration = 60 seconds,
@ -291,7 +292,8 @@ object TestConstants {
pingInterval = 30 seconds,
pingTimeout = 10 seconds,
pingDisconnect = true,
maxRebroadcastDelay = 5 seconds
maxRebroadcastDelay = 5 seconds,
killIdleDelay = 10 seconds
),
routerConf = RouterConf(
channelExcludeDuration = 60 seconds,

View File

@ -531,7 +531,8 @@ class StandardChannelIntegrationSpec extends ChannelIntegrationSpec {
sender.send(fundee.switchboard, Peer.Connect(
nodeId = funder.nodeParams.nodeId,
address_opt = Some(HostAndPort.fromParts(funder.nodeParams.publicAddresses.head.socketAddress.getHostString, funder.nodeParams.publicAddresses.head.socketAddress.getPort)),
sender.ref
sender.ref,
isPersistent = true
))
sender.expectMsgType[PeerConnection.ConnectionResult.HasConnection](30 seconds)

View File

@ -155,7 +155,8 @@ abstract class IntegrationSpec extends TestKitBaseClass with BitcoindService wit
sender.send(node1.switchboard, Peer.Connect(
nodeId = node2.nodeParams.nodeId,
address_opt = Some(HostAndPort.fromParts(address.socketAddress.getHostString, address.socketAddress.getPort)),
sender.ref
sender.ref,
isPersistent = true
))
sender.expectMsgType[PeerConnection.ConnectionResult.HasConnection](10 seconds)
}

View File

@ -18,13 +18,14 @@ package fr.acinq.eclair.io
import akka.actor.PoisonPill
import akka.testkit.{TestFSMRef, TestProbe}
import fr.acinq.bitcoin.Block
import fr.acinq.bitcoin.{Block, ByteVector32}
import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey}
import fr.acinq.eclair.FeatureSupport.{Mandatory, Optional}
import fr.acinq.eclair.Features.{BasicMultiPartPayment, ChannelRangeQueries, PaymentSecret, VariableLengthOnion}
import fr.acinq.eclair.TestConstants._
import fr.acinq.eclair._
import fr.acinq.eclair.crypto.TransportHandler
import fr.acinq.eclair.message.OnionMessages.{IntermediateNode, Recipient, buildMessage}
import fr.acinq.eclair.router.Router._
import fr.acinq.eclair.router.RoutingSyncSpec
import fr.acinq.eclair.wire.protocol
@ -66,10 +67,10 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi
withFixture(test.toNoArgTest(FixtureParam(aliceParams, remoteNodeId, switchboard, router, connection, transport, peerConnection, peer)))
}
def connect(aliceParams: NodeParams, remoteNodeId: PublicKey, switchboard: TestProbe, router: TestProbe, connection: TestProbe, transport: TestProbe, peerConnection: TestFSMRef[PeerConnection.State, PeerConnection.Data, PeerConnection], peer: TestProbe, remoteInit: protocol.Init = protocol.Init(Bob.nodeParams.features), doSync: Boolean = false): Unit = {
def connect(aliceParams: NodeParams, remoteNodeId: PublicKey, switchboard: TestProbe, router: TestProbe, connection: TestProbe, transport: TestProbe, peerConnection: TestFSMRef[PeerConnection.State, PeerConnection.Data, PeerConnection], peer: TestProbe, remoteInit: protocol.Init = protocol.Init(Bob.nodeParams.features), doSync: Boolean = false, isPersistent: Boolean = true): Unit = {
// let's simulate a connection
val probe = TestProbe()
probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = None, transport_opt = Some(transport.ref)))
probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = None, transport_opt = Some(transport.ref), isPersistent = isPersistent))
transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId))
switchboard.expectMsg(PeerConnection.Authenticated(peerConnection, remoteNodeId))
probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, aliceParams.chainHash, aliceParams.features, doSync))
@ -96,7 +97,7 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi
import f._
val probe = TestProbe()
probe.watch(peerConnection)
probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = None, transport_opt = Some(transport.ref)))
probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = None, transport_opt = Some(transport.ref), isPersistent = true))
transport.ref ! PoisonPill
probe.expectTerminated(peerConnection, 100 millis)
}
@ -106,7 +107,7 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi
val probe = TestProbe()
val origin = TestProbe()
probe.watch(peerConnection)
probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref), transport_opt = Some(transport.ref)))
probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref), transport_opt = Some(transport.ref), isPersistent = true))
probe.expectTerminated(peerConnection, nodeParams.peerConnectionConf.authTimeout / transport.testKitSettings.TestTimeFactor + 1.second) // we don't want dilated time here
origin.expectMsg(PeerConnection.ConnectionResult.AuthenticationFailed("authentication timed out"))
}
@ -116,7 +117,7 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi
val probe = TestProbe()
val origin = TestProbe()
probe.watch(peerConnection)
probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref), transport_opt = Some(transport.ref)))
probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref), transport_opt = Some(transport.ref), isPersistent = true))
transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId))
probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features, doSync = true))
probe.expectTerminated(peerConnection, nodeParams.peerConnectionConf.initTimeout / transport.testKitSettings.TestTimeFactor + 1.second) // we don't want dilated time here
@ -128,7 +129,7 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi
val probe = TestProbe()
val origin = TestProbe()
probe.watch(transport.ref)
probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref), transport_opt = Some(transport.ref)))
probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref), transport_opt = Some(transport.ref), isPersistent = true))
transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId))
probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features, doSync = true))
transport.expectMsgType[TransportHandler.Listener]
@ -144,7 +145,7 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi
val probe = TestProbe()
val origin = TestProbe()
probe.watch(transport.ref)
probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref), transport_opt = Some(transport.ref)))
probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref), transport_opt = Some(transport.ref), isPersistent = true))
transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId))
probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features, doSync = true))
transport.expectMsgType[TransportHandler.Listener]
@ -160,7 +161,7 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi
val probe = TestProbe()
val origin = TestProbe()
probe.watch(transport.ref)
probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref), transport_opt = Some(transport.ref)))
probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref), transport_opt = Some(transport.ref), isPersistent = true))
transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId))
probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features, doSync = true))
transport.expectMsgType[TransportHandler.Listener]
@ -177,7 +178,7 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi
val probe = TestProbe()
val origin = TestProbe()
probe.watch(transport.ref)
probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref), transport_opt = Some(transport.ref)))
probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref), transport_opt = Some(transport.ref), isPersistent = true))
transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId))
probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features, doSync = true))
transport.expectMsgType[TransportHandler.Listener]
@ -372,5 +373,53 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi
assert(new String(warn2.data.toArray).startsWith("invalid announcement sig"))
}
test("establish transient connection") { f =>
import f._
connect(nodeParams, remoteNodeId, switchboard, router, connection, transport, peerConnection, peer, isPersistent = false)
val probe = TestProbe()
val (_, message) = buildMessage(randomKey(), randomKey(), Nil, Left(Recipient(remoteNodeId, None)), Nil)
probe.send(peerConnection, message)
probe watch peerConnection
probe.expectTerminated(peerConnection, max = Duration(1500, MILLISECONDS))
}
def sleep(duration: FiniteDuration): Unit = {
val probe = TestProbe()
system.scheduler.scheduleOnce(duration, probe.ref, ())(system.dispatcher)
probe.expectMsg(())
}
test("keep using transient connection") { f =>
import f._
connect(nodeParams, remoteNodeId, switchboard, router, connection, transport, peerConnection, peer, isPersistent = false)
val probe = TestProbe()
val (_, message) = buildMessage(randomKey(), randomKey(), Nil, Left(Recipient(remoteNodeId, None)), Nil)
probe.send(peerConnection, message)
probe watch peerConnection
sleep(900 millis)
assert(peerConnection.stateName === PeerConnection.CONNECTED)
probe.send(peerConnection, message)
sleep(900 millis)
assert(peerConnection.stateName === PeerConnection.CONNECTED)
probe.send(peerConnection, message)
sleep(900 millis)
assert(peerConnection.stateName === PeerConnection.CONNECTED)
sleep(200 millis)
probe.expectTerminated(peerConnection, max = Duration.Zero)
}
test("convert transient connection to persistent") { f =>
import f._
connect(nodeParams, remoteNodeId, switchboard, router, connection, transport, peerConnection, peer, isPersistent = false)
val probe = TestProbe()
val (_, message) = buildMessage(randomKey(), randomKey(), Nil, Left(Recipient(remoteNodeId, None)), Nil)
probe.send(peerConnection, message)
assert(peerConnection.stateName === PeerConnection.CONNECTED)
probe.send(peerConnection, FundingLocked(ByteVector32(hex"0000000000000000000000000000000000000000000000000000000000000000"), randomKey().publicKey))
peerConnection.stateData match {
case d : PeerConnection.ConnectedData => assert(d.isPersistent)
case _ => fail()
}
}
}

View File

@ -112,7 +112,7 @@ class PeerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with Paralle
val probe = TestProbe()
probe.send(peer, Peer.Init(Set.empty))
probe.send(peer, Peer.Connect(remoteNodeId, address_opt = None, probe.ref))
probe.send(peer, Peer.Connect(remoteNodeId, address_opt = None, probe.ref, isPersistent = true))
probe.expectMsg(PeerConnection.ConnectionResult.NoAddressFound)
}
@ -129,7 +129,7 @@ class PeerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with Paralle
val probe = TestProbe()
probe.send(peer, Peer.Init(Set.empty))
// we have auto-reconnect=false so we need to manually tell the peer to reconnect
probe.send(peer, Peer.Connect(remoteNodeId, Some(mockAddress), probe.ref))
probe.send(peer, Peer.Connect(remoteNodeId, Some(mockAddress), probe.ref, isPersistent = true))
// assert our mock server got an incoming connection (the client was spawned with the address from node_announcement)
awaitCond(mockServer.accept() != null, max = 30 seconds, interval = 1 second)
@ -164,7 +164,7 @@ class PeerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with Paralle
val probe = TestProbe()
connect(remoteNodeId, peer, peerConnection, switchboard, channels = Set(ChannelCodecsSpec.normal))
probe.send(peer, Peer.Connect(remoteNodeId, None, probe.ref))
probe.send(peer, Peer.Connect(remoteNodeId, None, probe.ref, isPersistent = true))
probe.expectMsgType[PeerConnection.ConnectionResult.AlreadyConnected]
}

View File

@ -214,7 +214,7 @@ class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
val peer = TestProbe()
// we have auto-reconnect=false so we need to manually tell the peer to reconnect
peer.send(reconnectionTask, Peer.Connect(remoteNodeId, None, peer.ref))
peer.send(reconnectionTask, Peer.Connect(remoteNodeId, None, peer.ref, isPersistent = true))
// assert our mock server got an incoming connection (the client was spawned with the address from node_announcement)
awaitCond(mockServer.accept() != null, max = 60 seconds, interval = 1 second)

View File

@ -38,7 +38,7 @@ class SwitchboardSpec extends TestKitBaseClass with AnyFunSuiteLike {
nodeParams.db.network.addNode(NodeAnnouncement(ByteVector64.Zeroes, Features.empty, 0 unixsec, remoteNodeId, Color(0, 0, 0), "alias", remoteNodeAddress :: Nil))
val switchboard = TestActorRef(new Switchboard(nodeParams, FakePeerFactory(probe, peer)))
probe.send(switchboard, Peer.Connect(remoteNodeId, None, probe.ref))
probe.send(switchboard, Peer.Connect(remoteNodeId, None, probe.ref, isPersistent = true))
probe.expectMsg(remoteNodeId)
peer.expectMsg(Peer.Init(Set.empty))
val connect = peer.expectMsgType[Peer.Connect]
@ -51,7 +51,7 @@ class SwitchboardSpec extends TestKitBaseClass with AnyFunSuiteLike {
val (probe, peer) = (TestProbe(), TestProbe())
val remoteNodeId = randomKey().publicKey
val switchboard = TestActorRef(new Switchboard(nodeParams, FakePeerFactory(probe, peer)))
probe.send(switchboard, Peer.Connect(remoteNodeId, None, probe.ref))
probe.send(switchboard, Peer.Connect(remoteNodeId, None, probe.ref, isPersistent = true))
probe.expectMsg(remoteNodeId)
peer.expectMsg(Peer.Init(Set.empty))
peer.expectMsgType[Peer.Connect]
@ -125,7 +125,7 @@ class SwitchboardSpec extends TestKitBaseClass with AnyFunSuiteLike {
val (probe, peer) = (TestProbe(), TestProbe())
val switchboard = TestActorRef(new Switchboard(Alice.nodeParams, FakePeerFactory(probe, peer)))
val knownPeerNodeId = randomKey().publicKey
probe.send(switchboard, Peer.Connect(knownPeerNodeId, None, probe.ref))
probe.send(switchboard, Peer.Connect(knownPeerNodeId, None, probe.ref, isPersistent = true))
probe.expectMsg(knownPeerNodeId)
peer.expectMsgType[Peer.Init]
peer.expectMsgType[Peer.Connect]