diff --git a/eclair-core/src/main/resources/reference.conf b/eclair-core/src/main/resources/reference.conf index 6ee4aba88..815d1dca7 100644 --- a/eclair-core/src/main/resources/reference.conf +++ b/eclair-core/src/main/resources/reference.conf @@ -353,6 +353,9 @@ eclair { # - channels-only: Only relay messages from peers with which we have a channel to peers with which we have a channel. # - relay-all: Relay everything and create new connections if necessary relay-policy = "channels-only" + + # Transient connections opened to relay messages will be closed after this delay of inactivity + kill-transient-connection-after = 30 seconds } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala b/eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala index 515d4d1ab..82e96f358 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala @@ -165,8 +165,8 @@ class EclairImpl(appKit: Kit) extends Eclair with Logging { private val externalIdMaxLength = 66 override def connect(target: Either[NodeURI, PublicKey])(implicit timeout: Timeout): Future[String] = target match { - case Left(uri) => (appKit.switchboard ? Peer.Connect(uri, ActorRef.noSender)).mapTo[PeerConnection.ConnectionResult].map(_.toString) - case Right(pubKey) => (appKit.switchboard ? Peer.Connect(pubKey, None, ActorRef.noSender)).mapTo[PeerConnection.ConnectionResult].map(_.toString) + case Left(uri) => (appKit.switchboard ? Peer.Connect(uri, ActorRef.noSender, isPersistent = true)).mapTo[PeerConnection.ConnectionResult].map(_.toString) + case Right(pubKey) => (appKit.switchboard ? Peer.Connect(pubKey, None, ActorRef.noSender, isPersistent = true)).mapTo[PeerConnection.ConnectionResult].map(_.toString) } override def disconnect(nodeId: PublicKey)(implicit timeout: Timeout): Future[String] = { diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala index 17f180a12..31035f7ae 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala @@ -458,7 +458,8 @@ object NodeParams extends Logging { pingInterval = FiniteDuration(config.getDuration("peer-connection.ping-interval").getSeconds, TimeUnit.SECONDS), pingTimeout = FiniteDuration(config.getDuration("peer-connection.ping-timeout").getSeconds, TimeUnit.SECONDS), pingDisconnect = config.getBoolean("peer-connection.ping-disconnect"), - maxRebroadcastDelay = FiniteDuration(config.getDuration("router.broadcast-interval").getSeconds, TimeUnit.SECONDS) // it makes sense to not delay rebroadcast by more than the rebroadcast period + maxRebroadcastDelay = FiniteDuration(config.getDuration("router.broadcast-interval").getSeconds, TimeUnit.SECONDS), // it makes sense to not delay rebroadcast by more than the rebroadcast period + killIdleDelay = FiniteDuration(config.getDuration("onion-messages.kill-transient-connection-after").getSeconds, TimeUnit.SECONDS) ), routerConf = RouterConf( channelExcludeDuration = FiniteDuration(config.getDuration("router.channel-exclude-duration").getSeconds, TimeUnit.SECONDS), diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/Client.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/Client.scala index 1be7efcde..6fdc36afd 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/io/Client.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/Client.scala @@ -35,7 +35,7 @@ import scala.concurrent.duration._ * Created by PM on 27/10/2015. * */ -class Client(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyParams], peerConnectionConf: PeerConnection.Conf, switchboard: ActorRef, router: ActorRef, remoteAddress: InetSocketAddress, remoteNodeId: PublicKey, origin_opt: Option[ActorRef]) extends Actor with DiagnosticActorLogging { +class Client(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyParams], peerConnectionConf: PeerConnection.Conf, switchboard: ActorRef, router: ActorRef, remoteAddress: InetSocketAddress, remoteNodeId: PublicKey, origin_opt: Option[ActorRef], isPersistent: Boolean) extends Actor with DiagnosticActorLogging { import context.system @@ -127,13 +127,13 @@ class Client(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyParams], switchboard = switchboard, router = router )) - peerConnection ! PeerConnection.PendingAuth(connection, remoteNodeId_opt = Some(remoteNodeId), address = remoteAddress, origin_opt = origin_opt) + peerConnection ! PeerConnection.PendingAuth(connection, remoteNodeId_opt = Some(remoteNodeId), address = remoteAddress, origin_opt = origin_opt, isPersistent = isPersistent) peerConnection } } object Client { - def props(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyParams], peerConnectionConf: PeerConnection.Conf, switchboard: ActorRef, router: ActorRef, address: InetSocketAddress, remoteNodeId: PublicKey, origin_opt: Option[ActorRef]): Props = Props(new Client(keyPair, socks5ProxyParams_opt, peerConnectionConf, switchboard, router, address, remoteNodeId, origin_opt)) + def props(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyParams], peerConnectionConf: PeerConnection.Conf, switchboard: ActorRef, router: ActorRef, address: InetSocketAddress, remoteNodeId: PublicKey, origin_opt: Option[ActorRef], isPersistent: Boolean): Props = Props(new Client(keyPair, socks5ProxyParams_opt, peerConnectionConf, switchboard, router, address, remoteNodeId, origin_opt, isPersistent)) } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/ClientSpawner.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/ClientSpawner.scala index 2c7862ea5..efcb6601d 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/io/ClientSpawner.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/ClientSpawner.scala @@ -50,7 +50,7 @@ class ClientSpawner(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyP override def receive: Receive = { case req: ClientSpawner.ConnectionRequest => log.info("initiating new connection to nodeId={} origin={}", req.remoteNodeId, sender()) - context.actorOf(Client.props(keyPair, socks5ProxyParams_opt, peerConnectionConf, switchboard, router, req.address, req.remoteNodeId, origin_opt = Some(req.origin))) + context.actorOf(Client.props(keyPair, socks5ProxyParams_opt, peerConnectionConf, switchboard, router, req.address, req.remoteNodeId, origin_opt = Some(req.origin), req.isPersistent)) case DeadLetter(req: ClientSpawner.ConnectionRequest, _, _) => // we only subscribe to the deadletters event stream when in cluster mode // in that case we want to be warned when connections are spawned by the backend @@ -67,5 +67,6 @@ object ClientSpawner { case class ConnectionRequest(address: InetSocketAddress, remoteNodeId: PublicKey, - origin: ActorRef) extends RemoteTypes + origin: ActorRef, + isPersistent: Boolean) extends RemoteTypes } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/MessageRelay.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/MessageRelay.scala index 03dae3437..9295ebded 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/io/MessageRelay.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/MessageRelay.scala @@ -62,7 +62,7 @@ object MessageRelay { switchboard ! GetPeerInfo(context.messageAdapter(WrappedPeerInfo), prevNodeId) waitForPreviousPeer(switchboard, nextNodeId, msg, replyTo) case RelayAll => - switchboard ! Peer.Connect(nextNodeId, None, context.messageAdapter(WrappedConnectionResult).toClassic) + switchboard ! Peer.Connect(nextNodeId, None, context.messageAdapter(WrappedConnectionResult).toClassic, isPersistent = false) waitForConnection(msg, replyTo) } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala index d6fa197a9..ade475fd7 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala @@ -457,11 +457,11 @@ object Peer { case object CONNECTED extends State case class Init(storedChannels: Set[HasCommitments]) - case class Connect(nodeId: PublicKey, address_opt: Option[HostAndPort], replyTo: ActorRef) { + case class Connect(nodeId: PublicKey, address_opt: Option[HostAndPort], replyTo: ActorRef, isPersistent: Boolean) { def uri: Option[NodeURI] = address_opt.map(NodeURI(nodeId, _)) } object Connect { - def apply(uri: NodeURI, replyTo: ActorRef): Connect = new Connect(uri.nodeId, Some(uri.address), replyTo) + def apply(uri: NodeURI, replyTo: ActorRef, isPersistent: Boolean): Connect = new Connect(uri.nodeId, Some(uri.address), replyTo, isPersistent) } case class Disconnect(nodeId: PublicKey) extends PossiblyHarmful diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/PeerConnection.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/PeerConnection.scala index b0fd0dab3..273eaf436 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/io/PeerConnection.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/PeerConnection.scala @@ -78,7 +78,7 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A } context.watch(transport) startSingleTimer(AUTH_TIMER, AuthTimeout, conf.authTimeout) - goto(AUTHENTICATING) using AuthenticatingData(p, transport) + goto(AUTHENTICATING) using AuthenticatingData(p, transport, p.isPersistent) } when(AUTHENTICATING) { @@ -88,7 +88,7 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A log.info(s"connection authenticated with $remoteNodeId@${address.getHostString}:${address.getPort} direction=${if (d.pendingAuth.outgoing) "outgoing" else "incoming"}") Metrics.PeerConnectionsConnecting.withTag(Tags.ConnectionState, Tags.ConnectionStates.Authenticated).increment() switchboard ! Authenticated(self, remoteNodeId) - goto(BEFORE_INIT) using BeforeInitData(remoteNodeId, d.pendingAuth, d.transport) + goto(BEFORE_INIT) using BeforeInitData(remoteNodeId, d.pendingAuth, d.transport, d.isPersistent) case Event(AuthTimeout, d: AuthenticatingData) => log.warning(s"authentication timed out after ${conf.authTimeout}") @@ -104,7 +104,7 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A val localInit = protocol.Init(localFeatures, TlvStream(InitTlv.Networks(chainHash :: Nil))) d.transport ! localInit startSingleTimer(INIT_TIMER, InitTimeout, conf.initTimeout) - goto(INITIALIZING) using InitializingData(chainHash, d.pendingAuth, d.remoteNodeId, d.transport, peer, localInit, doSync) + goto(INITIALIZING) using InitializingData(chainHash, d.pendingAuth, d.remoteNodeId, d.transport, peer, localInit, doSync, d.isPersistent) } when(INITIALIZING) { @@ -148,7 +148,7 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A log.info(s"rebroadcast will be delayed by $rebroadcastDelay") context.system.eventStream.subscribe(self, classOf[Rebroadcast]) - goto(CONNECTED) using ConnectedData(d.chainHash, d.remoteNodeId, d.transport, d.peer, d.localInit, remoteInit, rebroadcastDelay) + goto(CONNECTED) using ConnectedData(d.chainHash, d.remoteNodeId, d.transport, d.peer, d.localInit, remoteInit, rebroadcastDelay, isPersistent = d.isPersistent) } case Event(InitTimeout, d: InitializingData) => @@ -162,7 +162,15 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A heartbeat { case Event(msg: LightningMessage, d: ConnectedData) if sender() != d.transport => // if the message doesn't originate from the transport, it is an outgoing message d.transport forward msg - stay() + msg match { + case _: OnionMessage if !d.isPersistent => + startSingleTimer(KILL_IDLE_TIMER, KillIdle, conf.killIdleDelay) + stay() + // If we send any channel management message to this peer, the connection should be persistent. + case _: ChannelMessage if !d.isPersistent => + stay() using d.copy(isPersistent = true) + case _ => stay() + } case Event(SendPing, d: ConnectedData) => if (d.expectedPong_opt.isEmpty) { @@ -279,7 +287,6 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A // this is actually for the channel d.transport ! TransportHandler.ReadAck(msg) d.peer ! msg - stay() case _: ChannelAnnouncement | _: ChannelUpdate | _: NodeAnnouncement if d.behavior.ignoreNetworkAnnouncement => // this peer is currently under embargo! d.transport ! TransportHandler.ReadAck(msg) @@ -357,6 +364,14 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A case Event(ResumeAnnouncements, d: ConnectedData) => log.info(s"resuming processing of network announcements for peer") stay() using d.copy(behavior = d.behavior.copy(fundingTxAlreadySpentCount = 0, ignoreNetworkAnnouncement = false)) + + case Event(KillIdle, d: ConnectedData) => + if (!d.isPersistent) { + log.info("stopping idle transient connection") + stop(FSM.Normal) + } else { + stay() + } } } @@ -461,12 +476,14 @@ object PeerConnection { val AUTH_TIMER = "auth" val INIT_TIMER = "init" val SEND_PING_TIMER = "send_ping" + val KILL_IDLE_TIMER = "kill_idle" // @formatter:on // @formatter:off case object AuthTimeout case object InitTimeout case object SendPing + case object KillIdle case object ResumeAnnouncements case class DoSync(replacePrevious: Boolean) extends RemoteTypes // @formatter:on @@ -484,17 +501,18 @@ object PeerConnection { pingInterval: FiniteDuration, pingTimeout: FiniteDuration, pingDisconnect: Boolean, - maxRebroadcastDelay: FiniteDuration) + maxRebroadcastDelay: FiniteDuration, + killIdleDelay: FiniteDuration) // @formatter:off sealed trait Data sealed trait HasTransport { this: Data => def transport: ActorRef } case object Nothing extends Data - case class AuthenticatingData(pendingAuth: PendingAuth, transport: ActorRef) extends Data with HasTransport - case class BeforeInitData(remoteNodeId: PublicKey, pendingAuth: PendingAuth, transport: ActorRef) extends Data with HasTransport - case class InitializingData(chainHash: ByteVector32, pendingAuth: PendingAuth, remoteNodeId: PublicKey, transport: ActorRef, peer: ActorRef, localInit: protocol.Init, doSync: Boolean) extends Data with HasTransport - case class ConnectedData(chainHash: ByteVector32, remoteNodeId: PublicKey, transport: ActorRef, peer: ActorRef, localInit: protocol.Init, remoteInit: protocol.Init, rebroadcastDelay: FiniteDuration, gossipTimestampFilter: Option[GossipTimestampFilter] = None, behavior: Behavior = Behavior(), expectedPong_opt: Option[ExpectedPong] = None) extends Data with HasTransport + case class AuthenticatingData(pendingAuth: PendingAuth, transport: ActorRef, isPersistent: Boolean) extends Data with HasTransport + case class BeforeInitData(remoteNodeId: PublicKey, pendingAuth: PendingAuth, transport: ActorRef, isPersistent: Boolean) extends Data with HasTransport + case class InitializingData(chainHash: ByteVector32, pendingAuth: PendingAuth, remoteNodeId: PublicKey, transport: ActorRef, peer: ActorRef, localInit: protocol.Init, doSync: Boolean, isPersistent: Boolean) extends Data with HasTransport + case class ConnectedData(chainHash: ByteVector32, remoteNodeId: PublicKey, transport: ActorRef, peer: ActorRef, localInit: protocol.Init, remoteInit: protocol.Init, rebroadcastDelay: FiniteDuration, gossipTimestampFilter: Option[GossipTimestampFilter] = None, behavior: Behavior = Behavior(), expectedPong_opt: Option[ExpectedPong] = None, isPersistent: Boolean) extends Data with HasTransport case class ExpectedPong(ping: Ping, timestamp: TimestampMilli = TimestampMilli.now()) case class PingTimeout(ping: Ping) @@ -506,7 +524,7 @@ object PeerConnection { case object INITIALIZING extends State case object CONNECTED extends State - case class PendingAuth(connection: ActorRef, remoteNodeId_opt: Option[PublicKey], address: InetSocketAddress, origin_opt: Option[ActorRef], transport_opt: Option[ActorRef] = None) { + case class PendingAuth(connection: ActorRef, remoteNodeId_opt: Option[PublicKey], address: InetSocketAddress, origin_opt: Option[ActorRef], transport_opt: Option[ActorRef] = None, isPersistent: Boolean) { def outgoing: Boolean = remoteNodeId_opt.isDefined // if this is an outgoing connection, we know the node id in advance } case class Authenticated(peerConnection: ActorRef, remoteNodeId: PublicKey) extends RemoteTypes diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/ReconnectionTask.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/ReconnectionTask.scala index a44aea74d..cab7a57f0 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/io/ReconnectionTask.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/ReconnectionTask.scala @@ -67,7 +67,7 @@ class ReconnectionTask(nodeParams: NodeParams, remoteNodeId: PublicKey) extends // we query the db every time because it may have been updated in the meantime (e.g. with network announcements) getPeerAddressFromDb(nodeParams.db.peers, nodeParams.db.network, remoteNodeId) match { case Some(address) => - connect(address, origin = self) + connect(address, origin = self, isPersistent = true) goto(CONNECTING) using ConnectingData(address, d.nextReconnectionDelay) case None => // we don't have an address for that peer, nothing to do @@ -130,14 +130,14 @@ class ReconnectionTask(nodeParams: NodeParams, remoteNodeId: PublicKey) extends case Event(TickReconnect, _) => stay() - case Event(Peer.Connect(_, hostAndPort_opt, replyTo), _) => + case Event(Peer.Connect(_, hostAndPort_opt, replyTo, isPersistent), _) => // manual connection requests happen completely independently of the automated reconnection process; // we initiate a connection but don't modify our state. // if we are already connecting/connected, the peer will kill any duplicate connections hostAndPort_opt .map(hostAndPort2InetSocketAddress) .orElse(getPeerAddressFromDb(nodeParams.db.peers, nodeParams.db.network, remoteNodeId)) match { - case Some(address) => connect(address, origin = replyTo) + case Some(address) => connect(address, origin = replyTo, isPersistent) case None => replyTo ! PeerConnection.ConnectionResult.NoAddressFound } stay() @@ -148,9 +148,9 @@ class ReconnectionTask(nodeParams: NodeParams, remoteNodeId: PublicKey) extends // activate the extension only on demand, so that tests pass lazy val mediator = DistributedPubSub(context.system).mediator - private def connect(address: InetSocketAddress, origin: ActorRef): Unit = { + private def connect(address: InetSocketAddress, origin: ActorRef, isPersistent: Boolean): Unit = { log.info(s"connecting to $address") - val req = ClientSpawner.ConnectionRequest(address, remoteNodeId, origin) + val req = ClientSpawner.ConnectionRequest(address, remoteNodeId, origin, isPersistent) if (context.system.hasExtension(Cluster) && !address.getHostName.endsWith("onion")) { mediator ! Send(path = "/user/client-spawner", msg = req, localAffinity = false) } else { diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/Server.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/Server.scala index 96a72acca..b4d354275 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/io/Server.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/Server.scala @@ -62,7 +62,7 @@ class Server(keyPair: KeyPair, peerConnectionConf: PeerConnection.Conf, switchbo switchboard = switchboard, router = router )) - peerConnection ! PeerConnection.PendingAuth(connection, remoteNodeId_opt = None, address = remote, origin_opt = None) + peerConnection ! PeerConnection.PendingAuth(connection, remoteNodeId_opt = None, address = remote, origin_opt = None, isPersistent = true) listener ! ResumeAccepting(batchSize = 1) } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala index 45f1419ba..d12c5d15c 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala @@ -61,17 +61,17 @@ class Switchboard(nodeParams: NodeParams, peerFactory: Switchboard.PeerFactory) def normal(peersWithChannels: Set[PublicKey]): Receive = { - case Peer.Connect(publicKey, _, _) if publicKey == nodeParams.nodeId => + case Peer.Connect(publicKey, _, _, _) if publicKey == nodeParams.nodeId => sender() ! Status.Failure(new RuntimeException("cannot open connection with oneself")) - case Peer.Connect(nodeId, address_opt, replyTo) => - // we create a peer if it doesn't exist: when the peer doesn't exist, we can be sure that we don't have channels, - // otherwise the peer would have been created during the initialization step. + case Peer.Connect(nodeId, address_opt, replyTo, isPersistent) => + // we create a peer if it doesn't exist: when the peer doesn't exist, we can be sure that we don't have channels, + // otherwise the peer would have been created during the initialization step. val peer = createOrGetPeer(nodeId, offlineChannels = Set.empty) - val c = if (replyTo == ActorRef.noSender) { - Peer.Connect(nodeId, address_opt, sender()) - } else { - Peer.Connect(nodeId, address_opt, replyTo) + val c = if (replyTo == ActorRef.noSender){ + Peer.Connect(nodeId, address_opt, sender(), isPersistent) + }else{ + Peer.Connect(nodeId, address_opt, replyTo, isPersistent) } peer forward c diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/remote/EclairInternalsSerializer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/remote/EclairInternalsSerializer.scala index 909019f84..5c240d951 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/remote/EclairInternalsSerializer.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/remote/EclairInternalsSerializer.scala @@ -105,7 +105,8 @@ object EclairInternalsSerializer { ("pingInterval" | finiteDurationCodec) :: ("pingTimeout" | finiteDurationCodec) :: ("pingDisconnect" | bool(8)) :: - ("maxRebroadcastDelay" | finiteDurationCodec)).as[PeerConnection.Conf] + ("maxRebroadcastDelay" | finiteDurationCodec) :: + ("killIdleDelay" | finiteDurationCodec)).as[PeerConnection.Conf] val peerConnectionDoSyncCodec: Codec[PeerConnection.DoSync] = bool(8).as[PeerConnection.DoSync] @@ -137,7 +138,8 @@ object EclairInternalsSerializer { def connectionRequestCodec(system: ExtendedActorSystem): Codec[ClientSpawner.ConnectionRequest] = ( ("address" | inetSocketAddressCodec) :: ("remoteNodeId" | publicKey) :: - ("origin" | actorRefCodec(system))).as[ClientSpawner.ConnectionRequest] + ("origin" | actorRefCodec(system)) :: + ("isPersistent" | bool8)).as[ClientSpawner.ConnectionRequest] def initializeConnectionCodec(system: ExtendedActorSystem): Codec[PeerConnection.InitializeConnection] = ( ("peer" | actorRefCodec(system)) :: diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala index a5415cb4f..19f9a30cd 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala @@ -164,7 +164,8 @@ object TestConstants { pingInterval = 30 seconds, pingTimeout = 10 seconds, pingDisconnect = true, - maxRebroadcastDelay = 5 seconds + maxRebroadcastDelay = 5 seconds, + killIdleDelay = 1 seconds ), routerConf = RouterConf( channelExcludeDuration = 60 seconds, @@ -291,7 +292,8 @@ object TestConstants { pingInterval = 30 seconds, pingTimeout = 10 seconds, pingDisconnect = true, - maxRebroadcastDelay = 5 seconds + maxRebroadcastDelay = 5 seconds, + killIdleDelay = 10 seconds ), routerConf = RouterConf( channelExcludeDuration = 60 seconds, diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/integration/ChannelIntegrationSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/integration/ChannelIntegrationSpec.scala index 759c7775e..4b343da23 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/integration/ChannelIntegrationSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/integration/ChannelIntegrationSpec.scala @@ -531,7 +531,8 @@ class StandardChannelIntegrationSpec extends ChannelIntegrationSpec { sender.send(fundee.switchboard, Peer.Connect( nodeId = funder.nodeParams.nodeId, address_opt = Some(HostAndPort.fromParts(funder.nodeParams.publicAddresses.head.socketAddress.getHostString, funder.nodeParams.publicAddresses.head.socketAddress.getPort)), - sender.ref + sender.ref, + isPersistent = true )) sender.expectMsgType[PeerConnection.ConnectionResult.HasConnection](30 seconds) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/integration/IntegrationSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/integration/IntegrationSpec.scala index ce22de12f..582f1bf33 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/integration/IntegrationSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/integration/IntegrationSpec.scala @@ -155,7 +155,8 @@ abstract class IntegrationSpec extends TestKitBaseClass with BitcoindService wit sender.send(node1.switchboard, Peer.Connect( nodeId = node2.nodeParams.nodeId, address_opt = Some(HostAndPort.fromParts(address.socketAddress.getHostString, address.socketAddress.getPort)), - sender.ref + sender.ref, + isPersistent = true )) sender.expectMsgType[PeerConnection.ConnectionResult.HasConnection](10 seconds) } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerConnectionSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerConnectionSpec.scala index 39f4f6f41..c65a6a9cb 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerConnectionSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerConnectionSpec.scala @@ -18,13 +18,14 @@ package fr.acinq.eclair.io import akka.actor.PoisonPill import akka.testkit.{TestFSMRef, TestProbe} -import fr.acinq.bitcoin.Block +import fr.acinq.bitcoin.{Block, ByteVector32} import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey} import fr.acinq.eclair.FeatureSupport.{Mandatory, Optional} import fr.acinq.eclair.Features.{BasicMultiPartPayment, ChannelRangeQueries, PaymentSecret, VariableLengthOnion} import fr.acinq.eclair.TestConstants._ import fr.acinq.eclair._ import fr.acinq.eclair.crypto.TransportHandler +import fr.acinq.eclair.message.OnionMessages.{IntermediateNode, Recipient, buildMessage} import fr.acinq.eclair.router.Router._ import fr.acinq.eclair.router.RoutingSyncSpec import fr.acinq.eclair.wire.protocol @@ -66,10 +67,10 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi withFixture(test.toNoArgTest(FixtureParam(aliceParams, remoteNodeId, switchboard, router, connection, transport, peerConnection, peer))) } - def connect(aliceParams: NodeParams, remoteNodeId: PublicKey, switchboard: TestProbe, router: TestProbe, connection: TestProbe, transport: TestProbe, peerConnection: TestFSMRef[PeerConnection.State, PeerConnection.Data, PeerConnection], peer: TestProbe, remoteInit: protocol.Init = protocol.Init(Bob.nodeParams.features), doSync: Boolean = false): Unit = { + def connect(aliceParams: NodeParams, remoteNodeId: PublicKey, switchboard: TestProbe, router: TestProbe, connection: TestProbe, transport: TestProbe, peerConnection: TestFSMRef[PeerConnection.State, PeerConnection.Data, PeerConnection], peer: TestProbe, remoteInit: protocol.Init = protocol.Init(Bob.nodeParams.features), doSync: Boolean = false, isPersistent: Boolean = true): Unit = { // let's simulate a connection val probe = TestProbe() - probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = None, transport_opt = Some(transport.ref))) + probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = None, transport_opt = Some(transport.ref), isPersistent = isPersistent)) transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId)) switchboard.expectMsg(PeerConnection.Authenticated(peerConnection, remoteNodeId)) probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, aliceParams.chainHash, aliceParams.features, doSync)) @@ -96,7 +97,7 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi import f._ val probe = TestProbe() probe.watch(peerConnection) - probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = None, transport_opt = Some(transport.ref))) + probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = None, transport_opt = Some(transport.ref), isPersistent = true)) transport.ref ! PoisonPill probe.expectTerminated(peerConnection, 100 millis) } @@ -106,7 +107,7 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi val probe = TestProbe() val origin = TestProbe() probe.watch(peerConnection) - probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref), transport_opt = Some(transport.ref))) + probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref), transport_opt = Some(transport.ref), isPersistent = true)) probe.expectTerminated(peerConnection, nodeParams.peerConnectionConf.authTimeout / transport.testKitSettings.TestTimeFactor + 1.second) // we don't want dilated time here origin.expectMsg(PeerConnection.ConnectionResult.AuthenticationFailed("authentication timed out")) } @@ -116,7 +117,7 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi val probe = TestProbe() val origin = TestProbe() probe.watch(peerConnection) - probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref), transport_opt = Some(transport.ref))) + probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref), transport_opt = Some(transport.ref), isPersistent = true)) transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId)) probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features, doSync = true)) probe.expectTerminated(peerConnection, nodeParams.peerConnectionConf.initTimeout / transport.testKitSettings.TestTimeFactor + 1.second) // we don't want dilated time here @@ -128,7 +129,7 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi val probe = TestProbe() val origin = TestProbe() probe.watch(transport.ref) - probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref), transport_opt = Some(transport.ref))) + probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref), transport_opt = Some(transport.ref), isPersistent = true)) transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId)) probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features, doSync = true)) transport.expectMsgType[TransportHandler.Listener] @@ -144,7 +145,7 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi val probe = TestProbe() val origin = TestProbe() probe.watch(transport.ref) - probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref), transport_opt = Some(transport.ref))) + probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref), transport_opt = Some(transport.ref), isPersistent = true)) transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId)) probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features, doSync = true)) transport.expectMsgType[TransportHandler.Listener] @@ -160,7 +161,7 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi val probe = TestProbe() val origin = TestProbe() probe.watch(transport.ref) - probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref), transport_opt = Some(transport.ref))) + probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref), transport_opt = Some(transport.ref), isPersistent = true)) transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId)) probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features, doSync = true)) transport.expectMsgType[TransportHandler.Listener] @@ -177,7 +178,7 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi val probe = TestProbe() val origin = TestProbe() probe.watch(transport.ref) - probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref), transport_opt = Some(transport.ref))) + probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref), transport_opt = Some(transport.ref), isPersistent = true)) transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId)) probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features, doSync = true)) transport.expectMsgType[TransportHandler.Listener] @@ -372,5 +373,53 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi assert(new String(warn2.data.toArray).startsWith("invalid announcement sig")) } + test("establish transient connection") { f => + import f._ + connect(nodeParams, remoteNodeId, switchboard, router, connection, transport, peerConnection, peer, isPersistent = false) + val probe = TestProbe() + val (_, message) = buildMessage(randomKey(), randomKey(), Nil, Left(Recipient(remoteNodeId, None)), Nil) + probe.send(peerConnection, message) + probe watch peerConnection + probe.expectTerminated(peerConnection, max = Duration(1500, MILLISECONDS)) + } + + def sleep(duration: FiniteDuration): Unit = { + val probe = TestProbe() + system.scheduler.scheduleOnce(duration, probe.ref, ())(system.dispatcher) + probe.expectMsg(()) + } + + test("keep using transient connection") { f => + import f._ + connect(nodeParams, remoteNodeId, switchboard, router, connection, transport, peerConnection, peer, isPersistent = false) + val probe = TestProbe() + val (_, message) = buildMessage(randomKey(), randomKey(), Nil, Left(Recipient(remoteNodeId, None)), Nil) + probe.send(peerConnection, message) + probe watch peerConnection + sleep(900 millis) + assert(peerConnection.stateName === PeerConnection.CONNECTED) + probe.send(peerConnection, message) + sleep(900 millis) + assert(peerConnection.stateName === PeerConnection.CONNECTED) + probe.send(peerConnection, message) + sleep(900 millis) + assert(peerConnection.stateName === PeerConnection.CONNECTED) + sleep(200 millis) + probe.expectTerminated(peerConnection, max = Duration.Zero) + } + + test("convert transient connection to persistent") { f => + import f._ + connect(nodeParams, remoteNodeId, switchboard, router, connection, transport, peerConnection, peer, isPersistent = false) + val probe = TestProbe() + val (_, message) = buildMessage(randomKey(), randomKey(), Nil, Left(Recipient(remoteNodeId, None)), Nil) + probe.send(peerConnection, message) + assert(peerConnection.stateName === PeerConnection.CONNECTED) + probe.send(peerConnection, FundingLocked(ByteVector32(hex"0000000000000000000000000000000000000000000000000000000000000000"), randomKey().publicKey)) + peerConnection.stateData match { + case d : PeerConnection.ConnectedData => assert(d.isPersistent) + case _ => fail() + } + } } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala index 0a6e89576..29a428dcd 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala @@ -112,7 +112,7 @@ class PeerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with Paralle val probe = TestProbe() probe.send(peer, Peer.Init(Set.empty)) - probe.send(peer, Peer.Connect(remoteNodeId, address_opt = None, probe.ref)) + probe.send(peer, Peer.Connect(remoteNodeId, address_opt = None, probe.ref, isPersistent = true)) probe.expectMsg(PeerConnection.ConnectionResult.NoAddressFound) } @@ -129,7 +129,7 @@ class PeerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with Paralle val probe = TestProbe() probe.send(peer, Peer.Init(Set.empty)) // we have auto-reconnect=false so we need to manually tell the peer to reconnect - probe.send(peer, Peer.Connect(remoteNodeId, Some(mockAddress), probe.ref)) + probe.send(peer, Peer.Connect(remoteNodeId, Some(mockAddress), probe.ref, isPersistent = true)) // assert our mock server got an incoming connection (the client was spawned with the address from node_announcement) awaitCond(mockServer.accept() != null, max = 30 seconds, interval = 1 second) @@ -164,7 +164,7 @@ class PeerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with Paralle val probe = TestProbe() connect(remoteNodeId, peer, peerConnection, switchboard, channels = Set(ChannelCodecsSpec.normal)) - probe.send(peer, Peer.Connect(remoteNodeId, None, probe.ref)) + probe.send(peer, Peer.Connect(remoteNodeId, None, probe.ref, isPersistent = true)) probe.expectMsgType[PeerConnection.ConnectionResult.AlreadyConnected] } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/io/ReconnectionTaskSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/io/ReconnectionTaskSpec.scala index 35761b8b5..f722da980 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/io/ReconnectionTaskSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/io/ReconnectionTaskSpec.scala @@ -214,7 +214,7 @@ class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike val peer = TestProbe() // we have auto-reconnect=false so we need to manually tell the peer to reconnect - peer.send(reconnectionTask, Peer.Connect(remoteNodeId, None, peer.ref)) + peer.send(reconnectionTask, Peer.Connect(remoteNodeId, None, peer.ref, isPersistent = true)) // assert our mock server got an incoming connection (the client was spawned with the address from node_announcement) awaitCond(mockServer.accept() != null, max = 60 seconds, interval = 1 second) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/io/SwitchboardSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/io/SwitchboardSpec.scala index db238c9f7..723e8abe5 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/io/SwitchboardSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/io/SwitchboardSpec.scala @@ -38,7 +38,7 @@ class SwitchboardSpec extends TestKitBaseClass with AnyFunSuiteLike { nodeParams.db.network.addNode(NodeAnnouncement(ByteVector64.Zeroes, Features.empty, 0 unixsec, remoteNodeId, Color(0, 0, 0), "alias", remoteNodeAddress :: Nil)) val switchboard = TestActorRef(new Switchboard(nodeParams, FakePeerFactory(probe, peer))) - probe.send(switchboard, Peer.Connect(remoteNodeId, None, probe.ref)) + probe.send(switchboard, Peer.Connect(remoteNodeId, None, probe.ref, isPersistent = true)) probe.expectMsg(remoteNodeId) peer.expectMsg(Peer.Init(Set.empty)) val connect = peer.expectMsgType[Peer.Connect] @@ -51,7 +51,7 @@ class SwitchboardSpec extends TestKitBaseClass with AnyFunSuiteLike { val (probe, peer) = (TestProbe(), TestProbe()) val remoteNodeId = randomKey().publicKey val switchboard = TestActorRef(new Switchboard(nodeParams, FakePeerFactory(probe, peer))) - probe.send(switchboard, Peer.Connect(remoteNodeId, None, probe.ref)) + probe.send(switchboard, Peer.Connect(remoteNodeId, None, probe.ref, isPersistent = true)) probe.expectMsg(remoteNodeId) peer.expectMsg(Peer.Init(Set.empty)) peer.expectMsgType[Peer.Connect] @@ -125,7 +125,7 @@ class SwitchboardSpec extends TestKitBaseClass with AnyFunSuiteLike { val (probe, peer) = (TestProbe(), TestProbe()) val switchboard = TestActorRef(new Switchboard(Alice.nodeParams, FakePeerFactory(probe, peer))) val knownPeerNodeId = randomKey().publicKey - probe.send(switchboard, Peer.Connect(knownPeerNodeId, None, probe.ref)) + probe.send(switchboard, Peer.Connect(knownPeerNodeId, None, probe.ref, isPersistent = true)) probe.expectMsg(knownPeerNodeId) peer.expectMsgType[Peer.Init] peer.expectMsgType[Peer.Connect]