diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index aff104005..ea78912db 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -32,7 +32,7 @@ jobs: # NB: we exclude external API tests from the CI, because we don't want our build to fail because a dependency is failing. # This means we won't automatically catch changes in external APIs, but developers should regularly run the test suite locally so in practice it shouldn't be a problem. - name: Build with Maven - run: mvn compile && mvn scoverage:report -DtagsToExclude=external-api + run: mvn test-compile && mvn scoverage:report -DtagsToExclude=external-api - name: Upload coverage to Codecov uses: codecov/codecov-action@v1 diff --git a/eclair-core/pom.xml b/eclair-core/pom.xml index c0b595999..bab47ed49 100644 --- a/eclair-core/pom.xml +++ b/eclair-core/pom.xml @@ -74,6 +74,7 @@ test-jar + test-compile @@ -142,6 +143,21 @@ akka-slf4j_${scala.version.short} ${akka.version} + + com.typesafe.akka + akka-cluster_${scala.version.short} + ${akka.version} + + + com.typesafe.akka + akka-cluster-typed_${scala.version.short} + ${akka.version} + + + com.typesafe.akka + akka-cluster-tools_${scala.version.short} + ${akka.version} + com.softwaremill.sttp diff --git a/eclair-core/src/main/resources/reference.conf b/eclair-core/src/main/resources/reference.conf index b343c430f..4baae434d 100644 --- a/eclair-core/src/main/resources/reference.conf +++ b/eclair-core/src/main/resources/reference.conf @@ -1,4 +1,5 @@ eclair { + datadir = ${user.home}"/.eclair" chain = "mainnet" // "regtest" for regtest, "testnet" for testnet, "mainnet" for mainnet @@ -254,10 +255,63 @@ akka { # configured receive buffer size. When using value 'unlimited' it will # try to read all from the receive buffer. # As per BOLT#8 lightning messages are at most 2 + 16 + 65535 + 16 = 65569bytes - # Currently the largest message is update_add_htlc (~1500b). # As a tradeoff to reduce the RAM consumption, in conjunction with tcp pull mode, # the default value is chosen to allow for a decent number of messages to be prefetched. - max-received-message-size = 16384b + max-received-message-size = 300000b } } + + actor { + warn-about-java-serializer-usage = on + allow-java-serialization = off + + serializers { + lightning = "fr.acinq.eclair.remote.LightningMessageSerializer" + eclair-internals = "fr.acinq.eclair.remote.EclairInternalsSerializer" + } + + serialization-bindings { + "fr.acinq.eclair.wire.LightningMessage" = lightning + "fr.acinq.eclair.remote.EclairInternalsSerializer$RemoteTypes" = eclair-internals + } + } + + remote.artery { + transport = "tcp" // switching to tls-tcp is highly recommended in a production environment + + // We are using a simple setup (https://doc.akka.io/docs/akka/current/remoting-artery.html#remote-security): + // > Have a single set of keys and a single certificate for all nodes and disable hostname checking + // > - The single set of keys and the single certificate is distributed to all nodes. The certificate can be self-signed as it is distributed both as a certificate for authentication but also as the trusted certificate. + // > - If the keys/certificate are lost, someone else can connect to your cluster. + // > - Adding nodes to the cluster is simple as the key material can be deployed / distributed to the new node. + // Command line used to generate the self-signed certificate: + // keytool -genkeypair -v \ + // -keystore akka-cluster-tls.jks \ + // -dname "O=ACINQ, C=FR" \ + // -keypass:env PW \ + // -storepass:env PW \ + // -keyalg RSA \ + // -keysize 4096 \ + // -validity 9999 + ssl.config-ssl-engine { + key-store = ${eclair.datadir}"/akka-cluster-tls.jks" + trust-store = ${eclair.datadir}"/akka-cluster-tls.jks" + + key-store-password = ${?AKKA_TLS_PASSWORD} + key-password = ${?AKKA_TLS_PASSWORD} + trust-store-password = ${?AKKA_TLS_PASSWORD} + + protocol = "TLSv1.2" + + enabled-algorithms = [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256] + } + } + + cluster { + role { + backend.min-nr-of-members = 1 + frontend.min-nr-of-members = 0 + } + seed-nodes = [ "akka://eclair-node@127.0.0.1:25520" ] + } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelTypes.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelTypes.scala index 7add102c5..7393c6ba9 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelTypes.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelTypes.scala @@ -18,7 +18,7 @@ package fr.acinq.eclair.channel import java.util.UUID -import akka.actor.ActorRef +import akka.actor.{ActorRef, PossiblyHarmful} import fr.acinq.bitcoin.Crypto.PublicKey import fr.acinq.bitcoin.{ByteVector32, DeterministicWallet, OutPoint, Satoshi, Transaction} import fr.acinq.eclair.blockchain.fee.FeeratePerKw @@ -94,7 +94,7 @@ case object INPUT_DISCONNECTED case class INPUT_RECONNECTED(remote: ActorRef, localInit: Init, remoteInit: Init) case class INPUT_RESTORED(data: HasCommitments) -sealed trait BitcoinEvent +sealed trait BitcoinEvent extends PossiblyHarmful case object BITCOIN_FUNDING_PUBLISH_FAILED extends BitcoinEvent case object BITCOIN_FUNDING_DEPTHOK extends BitcoinEvent case object BITCOIN_FUNDING_DEEPLYBURIED extends BitcoinEvent @@ -166,7 +166,7 @@ object Origin { } /** should not be used directly */ -sealed trait Command +sealed trait Command extends PossiblyHarmful sealed trait HasReplyToCommand extends Command { def replyTo: ActorRef } sealed trait HasOptionalReplyToCommand extends Command { def replyTo_opt: Option[ActorRef] } @@ -254,7 +254,7 @@ object ChannelOpenResponse { 8888888P" d88P 888 888 d88P 888 */ -sealed trait Data { +sealed trait Data extends PossiblyHarmful { def channelId: ByteVector32 } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/crypto/TransportHandler.scala b/eclair-core/src/main/scala/fr/acinq/eclair/crypto/TransportHandler.scala index 8443d602a..6b77d2615 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/crypto/TransportHandler.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/crypto/TransportHandler.scala @@ -17,7 +17,6 @@ package fr.acinq.eclair.crypto import java.nio.ByteOrder - import akka.actor.{Actor, ActorRef, ExtendedActorSystem, FSM, PoisonPill, Props, Terminated} import akka.event.Logging.MDC import akka.event._ @@ -28,11 +27,15 @@ import fr.acinq.bitcoin.Protocol import fr.acinq.eclair.Logs.LogCategory import fr.acinq.eclair.crypto.ChaCha20Poly1305.ChaCha20Poly1305Error import fr.acinq.eclair.crypto.Noise._ -import fr.acinq.eclair.wire.{AnnouncementSignatures, ChannelAnnouncement, ChannelUpdate, NodeAnnouncement, ReplyShortChannelIdsEnd, RoutingMessage} +import fr.acinq.eclair.remote.EclairInternalsSerializer.RemoteTypes +import fr.acinq.eclair.wire.{AnnouncementSignatures, RoutingMessage} import fr.acinq.eclair.{Diagnostics, FSMDiagnosticActorLogging, Logs} +import fr.acinq.eclair.wire.{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement} +import fr.acinq.eclair.{Diagnostics, FSMDiagnosticActorLogging, Logs, getSimpleClassName} import scodec.bits.ByteVector import scodec.{Attempt, Codec, DecodeResult} +import java.nio.ByteOrder import scala.annotation.tailrec import scala.collection.immutable.Queue import scala.reflect.ClassTag @@ -256,7 +259,7 @@ class TransportHandler[T: ClassTag](keyPair: KeyPair, rs: Option[ByteVector], co stop(FSM.Normal) case Event(Terminated(actor), _) if actor == connection => - log.info(s"connection terminated, stopping the transport") + log.info("connection actor died") // this can be the connection or the listener, either way it is a cause of death stop(FSM.Normal) @@ -272,6 +275,13 @@ class TransportHandler[T: ClassTag](keyPair: KeyPair, rs: Option[ByteVector], co onTermination { case _: StopEvent => connection ! Tcp.Close // attempts to gracefully close the connection when dying + stateData match { + case normal: NormalData[_] => + // NB: we deduplicate on the class name: each class will appear once but there may be many instances (less verbose and gives debug hints) + log.info("stopping (unackedReceived={} unackedSent={})", normal.unackedReceived.keys.map(getSimpleClassName).toSet.mkString(","), normal.unackedSent.map(getSimpleClassName)) + case _ => + log.info("stopping") + } } initialize() @@ -443,7 +453,7 @@ object TransportHandler { case class HandshakeCompleted(remoteNodeId: PublicKey) - case class ReadAck(msg: Any) + case class ReadAck(msg: Any) extends RemoteTypes case object WriteAck extends Tcp.Event // @formatter:on diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/ClientSpawner.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/ClientSpawner.scala index c1a2abeb4..505a64e2a 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/io/ClientSpawner.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/ClientSpawner.scala @@ -18,19 +18,46 @@ package fr.acinq.eclair.io import java.net.InetSocketAddress -import akka.actor.{Actor, ActorLogging, ActorRef, Props} +import akka.actor.{Actor, ActorLogging, ActorRef, DeadLetter, Props} +import akka.cluster.Cluster +import akka.cluster.pubsub.DistributedPubSub +import akka.cluster.pubsub.DistributedPubSubMediator.Put import fr.acinq.bitcoin.Crypto.PublicKey import fr.acinq.eclair.crypto.Noise.KeyPair +import fr.acinq.eclair.remote.EclairInternalsSerializer.RemoteTypes import fr.acinq.eclair.tor.Socks5ProxyParams class ClientSpawner(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyParams], peerConnectionConf: PeerConnection.Conf, switchboard: ActorRef, router: ActorRef) extends Actor with ActorLogging { context.system.eventStream.subscribe(self, classOf[ClientSpawner.ConnectionRequest]) + if (context.system.hasExtension(Cluster)) { + val roles = context.system.extension(Cluster).selfRoles + if (roles.contains("frontend")) { + val mediator = DistributedPubSub(context.system).mediator + mediator ! Put(self) + } else if (roles.contains("backend")) { + // When the cluster is enabled, the backend will handle outgoing connections when there are no front available, by + // registering to the dead letters. + // Another option would have been to register the backend along with the front to the regular distributed pubsub, + // but, even with affinity=false at sending, it would have resulted in outgoing connections being randomly assigned + // to all listeners equally (front and back). What we really want is to always spawn connections on the frontend + // when possible + context.system.eventStream.subscribe(self, classOf[DeadLetter]) + } + } + override def receive: Receive = { case req: ClientSpawner.ConnectionRequest => log.info("initiating new connection to nodeId={} origin={}", req.remoteNodeId, sender) context.actorOf(Client.props(keyPair, socks5ProxyParams_opt, peerConnectionConf, switchboard, router, req.address, req.remoteNodeId, origin_opt = Some(req.origin))) + case DeadLetter(req: ClientSpawner.ConnectionRequest, _, _) => + // we only subscribe to the deadletters event stream when in cluster mode + // in that case we want to be warned when connections are spawned by the backend + log.warning("handling outgoing connection request locally") + self forward req + case _: DeadLetter => + // we don't care about other dead letters } } @@ -40,6 +67,5 @@ object ClientSpawner { case class ConnectionRequest(address: InetSocketAddress, remoteNodeId: PublicKey, - origin: ActorRef) - + origin: ActorRef) extends RemoteTypes } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala index dfaaee4a6..e62526a8c 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala @@ -16,9 +16,7 @@ package fr.acinq.eclair.io -import java.net.InetSocketAddress - -import akka.actor.{Actor, ActorRef, ExtendedActorSystem, FSM, OneForOneStrategy, PoisonPill, Props, Status, SupervisorStrategy, Terminated} +import akka.actor.{Actor, ActorRef, ExtendedActorSystem, FSM, OneForOneStrategy, PossiblyHarmful, Props, Status, SupervisorStrategy, Terminated} import akka.event.Logging.MDC import akka.event.{BusLogging, DiagnosticLoggingAdapter} import akka.util.Timeout @@ -31,10 +29,14 @@ import fr.acinq.eclair.blockchain.EclairWallet import fr.acinq.eclair.blockchain.fee.FeeratePerKw import fr.acinq.eclair.channel._ import fr.acinq.eclair.io.Monitoring.Metrics +import fr.acinq.eclair.io.PeerConnection.KillReason +import fr.acinq.eclair.remote.EclairInternalsSerializer.RemoteTypes import fr.acinq.eclair.wire._ import fr.acinq.eclair.{wire, _} import scodec.bits.ByteVector +import java.net.InetSocketAddress + /** * This actor represents a logical peer. There is one [[Peer]] per unique remote node id at all time. * @@ -98,7 +100,7 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, watcher: ActorRe case Event(err@wire.Error(channelId, reason), d: ConnectedData) if channelId == CHANNELID_ZERO => log.error(s"connection-level error, failing all channels! reason=${new String(reason.toArray)}") d.channels.values.toSet[ActorRef].foreach(_ forward err) // we deduplicate with toSet because there might be two entries per channel (tmp id and final id) - d.peerConnection ! PoisonPill + d.peerConnection ! PeerConnection.Kill(KillReason.AllChannelsFail) stay case Event(err: wire.Error, d: ConnectedData) => @@ -169,7 +171,7 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, watcher: ActorRe case Event(Disconnect(nodeId), d: ConnectedData) if nodeId == remoteNodeId => log.info("disconnecting") sender ! "disconnecting" - d.peerConnection ! PoisonPill + d.peerConnection ! PeerConnection.Kill(KillReason.UserRequest) stay case Event(ConnectionDown(peerConnection), d: ConnectedData) if peerConnection == d.peerConnection => @@ -190,13 +192,13 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, watcher: ActorRe log.info(s"channel closed: channelId=${channelIds.mkString("/")}") if (d.channels.values.toSet - actor == Set.empty) { log.info(s"that was the last open channel, closing the connection") - d.peerConnection ! PoisonPill + d.peerConnection ! PeerConnection.Kill(KillReason.NoRemainingChannel) } stay using d.copy(channels = d.channels -- channelIds) case Event(connectionReady: PeerConnection.ConnectionReady, d: ConnectedData) => log.info(s"got new connection, killing current one and switching") - d.peerConnection ! PoisonPill + d.peerConnection ! PeerConnection.Kill(KillReason.ConnectionReplaced) d.channels.values.toSet[ActorRef].foreach(_ ! INPUT_DISCONNECTED) // we deduplicate with toSet because there might be two entries per channel (tmp id and final id) gotoConnected(connectionReady, d.channels) @@ -380,8 +382,8 @@ object Peer { def apply(uri: NodeURI): Connect = new Connect(uri.nodeId, Some(uri.address)) } - case class Disconnect(nodeId: PublicKey) - case class OpenChannel(remoteNodeId: PublicKey, fundingSatoshis: Satoshi, pushMsat: MilliSatoshi, fundingTxFeeratePerKw_opt: Option[FeeratePerKw], initialRelayFees_opt: Option[(MilliSatoshi, Int)], channelFlags: Option[Byte], timeout_opt: Option[Timeout]) { + case class Disconnect(nodeId: PublicKey) extends PossiblyHarmful + case class OpenChannel(remoteNodeId: PublicKey, fundingSatoshis: Satoshi, pushMsat: MilliSatoshi, fundingTxFeeratePerKw_opt: Option[FeeratePerKw], initialRelayFees_opt: Option[(MilliSatoshi, Int)], channelFlags: Option[Byte], timeout_opt: Option[Timeout]) extends PossiblyHarmful { require(pushMsat <= fundingSatoshis, s"pushMsat must be less or equal to fundingSatoshis") require(fundingSatoshis >= 0.sat, s"fundingSatoshis must be positive") require(pushMsat >= 0.msat, s"pushMsat must be positive") @@ -390,7 +392,7 @@ object Peer { case object GetPeerInfo case class PeerInfo(nodeId: PublicKey, state: String, address: Option[InetSocketAddress], channels: Int) - case class PeerRoutingMessage(peerConnection: ActorRef, remoteNodeId: PublicKey, message: RoutingMessage) + case class PeerRoutingMessage(peerConnection: ActorRef, remoteNodeId: PublicKey, message: RoutingMessage) extends RemoteTypes case class Transition(previousData: Peer.Data, nextData: Peer.Data) @@ -398,7 +400,7 @@ object Peer { * Sent by the peer-connection to notify the peer that the connection is down. * We could use watchWith on the peer-connection but it doesn't work with akka cluster when untrusted mode is enabled */ - case class ConnectionDown(peerConnection: ActorRef) + case class ConnectionDown(peerConnection: ActorRef) extends RemoteTypes // @formatter:on diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/PeerConnection.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/PeerConnection.scala index 6d6fdf050..d35d316eb 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/io/PeerConnection.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/PeerConnection.scala @@ -27,6 +27,7 @@ import fr.acinq.eclair.crypto.Noise.KeyPair import fr.acinq.eclair.crypto.TransportHandler import fr.acinq.eclair.io.Monitoring.{Metrics, Tags} import fr.acinq.eclair.io.Peer.CHANNELID_ZERO +import fr.acinq.eclair.remote.EclairInternalsSerializer.RemoteTypes import fr.acinq.eclair.router.Router._ import fr.acinq.eclair.wire._ import fr.acinq.eclair.{wire, _} @@ -271,7 +272,7 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A val nodesSent = sendAndCount(rebroadcast.nodes) if (channelsSent > 0 || updatesSent > 0 || nodesSent > 0) { - log.info(s"sent announcements: channels={} updates={} nodes={}", channelsSent, updatesSent, nodesSent) + log.debug("sent announcements: channels={} updates={} nodes={}", channelsSent, updatesSent, nodesSent) } stay @@ -382,6 +383,12 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A } stop(FSM.Normal) + case Event(Kill(reason), _) => + Logs.withMdc(diagLog)(Logs.mdc(category_opt = Some(Logs.LogCategory.CONNECTION))) { + log.info(s"stopping with reason=$reason") + stop(FSM.Normal) + } + case Event(_: GossipDecision.Accepted, _) => stay // for now we don't do anything with those events case Event(_: GossipDecision.Rejected, _) => stay // we got disconnected while syncing @@ -505,16 +512,16 @@ object PeerConnection { case class PendingAuth(connection: ActorRef, remoteNodeId_opt: Option[PublicKey], address: InetSocketAddress, origin_opt: Option[ActorRef], transport_opt: Option[ActorRef] = None) { def outgoing: Boolean = remoteNodeId_opt.isDefined // if this is an outgoing connection, we know the node id in advance } - case class Authenticated(peerConnection: ActorRef, remoteNodeId: PublicKey) - case class InitializeConnection(peer: ActorRef, chainHash: ByteVector32, features: Features, doSync: Boolean) - case class ConnectionReady(peerConnection: ActorRef, remoteNodeId: PublicKey, address: InetSocketAddress, outgoing: Boolean, localInit: wire.Init, remoteInit: wire.Init) + case class Authenticated(peerConnection: ActorRef, remoteNodeId: PublicKey) extends RemoteTypes + case class InitializeConnection(peer: ActorRef, chainHash: ByteVector32, features: Features, doSync: Boolean) extends RemoteTypes + case class ConnectionReady(peerConnection: ActorRef, remoteNodeId: PublicKey, address: InetSocketAddress, outgoing: Boolean, localInit: wire.Init, remoteInit: wire.Init) extends RemoteTypes - sealed trait ConnectionResult + sealed trait ConnectionResult extends RemoteTypes object ConnectionResult { sealed trait Success extends ConnectionResult sealed trait Failure extends ConnectionResult - case class NoAddressFound(remoteNodeId: PublicKey) extends ConnectionResult.Failure { override def toString: String = "no address found" } + case object NoAddressFound extends ConnectionResult.Failure { override def toString: String = "no address found" } case class ConnectionFailed(address: InetSocketAddress) extends ConnectionResult.Failure { override def toString: String = s"connection failed to $address" } case class AuthenticationFailed(reason: String) extends ConnectionResult.Failure { override def toString: String = reason } case class InitializationFailed(reason: String) extends ConnectionResult.Failure { override def toString: String = reason } @@ -526,6 +533,19 @@ object PeerConnection { case class Behavior(fundingTxAlreadySpentCount: Int = 0, ignoreNetworkAnnouncement: Boolean = false) + /** + * Kill the actor and the underlying connection. We can't use a [[PoisonPill]] because it would be dropped when using + * akka cluster with unstrusted mode enabled. + */ + case class Kill(reason: KillReason) extends RemoteTypes + + sealed trait KillReason + object KillReason { + case object UserRequest extends KillReason + case object NoRemainingChannel extends KillReason + case object AllChannelsFail extends KillReason + case object ConnectionReplaced extends KillReason + } // @formatter:on /** diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/ReconnectionTask.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/ReconnectionTask.scala index 0c1b08eba..f11b53896 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/io/ReconnectionTask.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/ReconnectionTask.scala @@ -18,7 +18,10 @@ package fr.acinq.eclair.io import java.net.InetSocketAddress -import akka.actor.{ActorRef, Props, Status} +import akka.actor.{ActorRef, Props} +import akka.cluster.Cluster +import akka.cluster.pubsub.DistributedPubSub +import akka.cluster.pubsub.DistributedPubSubMediator.Send import akka.event.Logging.MDC import com.google.common.net.HostAndPort import fr.acinq.bitcoin.Crypto.PublicKey @@ -84,7 +87,8 @@ class ReconnectionTask(nodeParams: NodeParams, remoteNodeId: PublicKey) extends val (initialDelay, firstNextReconnectionDelay) = (previousPeerData, d.previousData) match { case (Peer.Nothing, _) => // When restarting, we add some randomization before the first reconnection attempt to avoid herd effect - val initialDelay = randomizeDelay(nodeParams.initialRandomReconnectDelay) + // We also add a fixed delay to give time to the front to boot up + val initialDelay = nodeParams.initialRandomReconnectDelay + randomizeDelay(nodeParams.initialRandomReconnectDelay) // When restarting, we will ~immediately reconnect, but then: // - we don't want all the subsequent reconnection attempts to be synchronized (herd effect) // - we don't want to go through the exponential backoff delay, because we were offline, not them, so there is no @@ -105,7 +109,8 @@ class ReconnectionTask(nodeParams: NodeParams, remoteNodeId: PublicKey) extends // Randomizing the initial delay is important in the case of a reconnection. If both peers have a public // address, they may attempt to simultaneously connect back to each other, which could result in reconnection loop, // given that each new connection will cause the previous one to be killed. - val initialDelay = randomizeDelay(nodeParams.initialRandomReconnectDelay) + // We also add a fixed delay to give time to the front to boot up + val initialDelay = nodeParams.initialRandomReconnectDelay + randomizeDelay(nodeParams.initialRandomReconnectDelay) val firstNextReconnectionDelay = nextReconnectionDelay(initialDelay, nodeParams.maxReconnectInterval) log.info("peer is disconnected, next reconnection in {}", initialDelay) (initialDelay, firstNextReconnectionDelay) @@ -134,16 +139,24 @@ class ReconnectionTask(nodeParams: NodeParams, remoteNodeId: PublicKey) extends .map(hostAndPort2InetSocketAddress) .orElse(getPeerAddressFromDb(nodeParams.db.peers, nodeParams.db.network, remoteNodeId)) match { case Some(address) => connect(address, origin = sender) - case None => sender ! PeerConnection.ConnectionResult.NoAddressFound(remoteNodeId) + case None => sender ! PeerConnection.ConnectionResult.NoAddressFound } stay } private def setReconnectTimer(delay: FiniteDuration): Unit = setTimer(RECONNECT_TIMER, TickReconnect, delay, repeat = false) + // activate the extension only on demand, so that tests pass + lazy val mediator = DistributedPubSub(context.system).mediator + private def connect(address: InetSocketAddress, origin: ActorRef): Unit = { log.info(s"connecting to $address") - context.system.eventStream.publish(ClientSpawner.ConnectionRequest(address, remoteNodeId, origin)) + val req = ClientSpawner.ConnectionRequest(address, remoteNodeId, origin) + if (context.system.hasExtension(Cluster) && !address.getHostName.endsWith("onion")) { + mediator ! Send(path = "/user/client-spawner", msg = req, localAffinity = false) + } else { + context.system.eventStream.publish(req) + } Metrics.ReconnectionsAttempts.withoutTags().increment() } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala index 822d2a990..d00675668 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala @@ -22,6 +22,8 @@ import fr.acinq.eclair.NodeParams import fr.acinq.eclair.blockchain.EclairWallet import fr.acinq.eclair.channel.Helpers.Closing import fr.acinq.eclair.channel._ +import fr.acinq.eclair.remote.EclairInternalsSerializer.RemoteTypes +import fr.acinq.eclair.router.Router.RouterConf /** * Ties network connections to peers. @@ -78,6 +80,7 @@ class Switchboard(nodeParams: NodeParams, watcher: ActorRef, relayer: ActorRef, case Symbol("peers") => sender ! context.children + case GetRouterPeerConf => sender ! RouterPeerConf(nodeParams.routerConf, nodeParams.peerConnectionConf) } /** @@ -115,4 +118,7 @@ object Switchboard { def peerActorName(remoteNodeId: PublicKey): String = s"peer-$remoteNodeId" + case object GetRouterPeerConf extends RemoteTypes + case class RouterPeerConf(routerConf: RouterConf, peerConf: PeerConnection.Conf) extends RemoteTypes + } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/remote/EclairInternalsSerializer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/remote/EclairInternalsSerializer.scala new file mode 100644 index 000000000..5fae8c217 --- /dev/null +++ b/eclair-core/src/main/scala/fr/acinq/eclair/remote/EclairInternalsSerializer.scala @@ -0,0 +1,182 @@ +/* + * Copyright 2020 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.remote + +import akka.actor.{ActorRef, ExtendedActorSystem} +import akka.serialization.Serialization +import fr.acinq.bitcoin.Crypto.PublicKey +import fr.acinq.eclair.crypto.TransportHandler +import fr.acinq.eclair.io.Peer.PeerRoutingMessage +import fr.acinq.eclair.io.Switchboard.RouterPeerConf +import fr.acinq.eclair.io.{ClientSpawner, Peer, PeerConnection, Switchboard} +import fr.acinq.eclair.router.Router.{GossipDecision, RouterConf, SendChannelQuery} +import fr.acinq.eclair.router._ +import fr.acinq.eclair.wire.CommonCodecs._ +import fr.acinq.eclair.wire.LightningMessageCodecs._ +import fr.acinq.eclair.wire.QueryChannelRangeTlv.queryFlagsCodec +import fr.acinq.eclair.wire._ +import fr.acinq.eclair.{CltvExpiryDelta, Features} +import scodec._ +import scodec.codecs._ + +import java.net.{InetAddress, InetSocketAddress} +import scala.concurrent.duration._ + +class EclairInternalsSerializer(val system: ExtendedActorSystem) extends ScodecSerializer(43, EclairInternalsSerializer.codec(system)) + +object EclairInternalsSerializer { + + trait RemoteTypes extends Serializable + + def finiteDurationCodec: Codec[FiniteDuration] = int64.xmap(_.milliseconds, _.toMillis) + + def iterable[A](codec: Codec[A]): Codec[Iterable[A]] = listOfN(uint16, codec).xmap(_.toIterable, _.toList) + + val routerConfCodec: Codec[RouterConf] = ( + ("randomizeRouteSelection" | bool(8)) :: + ("channelExcludeDuration" | finiteDurationCodec) :: + ("routerBroadcastInterval" | finiteDurationCodec) :: + ("networkStatsRefreshInterval" | finiteDurationCodec) :: + ("requestNodeAnnouncements" | bool(8)) :: + ("encodingType" | discriminated[EncodingType].by(uint8) + .typecase(0, provide(EncodingType.UNCOMPRESSED)) + .typecase(1, provide(EncodingType.COMPRESSED_ZLIB))) :: + ("channelRangeChunkSize" | int32) :: + ("channelQueryChunkSize" | int32) :: + ("searchMaxFeeBase" | satoshi) :: + ("searchMaxFeePct" | double) :: + ("searchMaxRouteLength" | int32) :: + ("searchMaxCltv" | int32.as[CltvExpiryDelta]) :: + ("searchHeuristicsEnabled" | bool(8)) :: + ("searchRatioCltv" | double) :: + ("searchRatioChannelAge" | double) :: + ("searchRatioChannelCapacity" | double) :: + ("mppMinPartAmount" | millisatoshi) :: + ("mppMaxParts" | int32)).as[RouterConf] + + val overrideFeaturesListCodec: Codec[List[(PublicKey, Features)]] = listOfN(uint16, publicKey ~ variableSizeBytes(uint16, featuresCodec)) + + val peerConnectionConfCodec: Codec[PeerConnection.Conf] = ( + ("authTimeout" | finiteDurationCodec) :: + ("initTimeout" | finiteDurationCodec) :: + ("pingInterval" | finiteDurationCodec) :: + ("pingTimeout" | finiteDurationCodec) :: + ("pingDisconnect" | bool(8)) :: + ("maxRebroadcastDelay" | finiteDurationCodec)).as[PeerConnection.Conf] + + val peerConnectionKillReasonCodec: Codec[PeerConnection.KillReason] = discriminated[PeerConnection.KillReason].by(uint16) + .typecase(0, provide(PeerConnection.KillReason.UserRequest)) + .typecase(1, provide(PeerConnection.KillReason.NoRemainingChannel)) + .typecase(2, provide(PeerConnection.KillReason.AllChannelsFail)) + .typecase(3, provide(PeerConnection.KillReason.ConnectionReplaced)) + + val peerConnectionKillCodec: Codec[PeerConnection.Kill] = peerConnectionKillReasonCodec.as[PeerConnection.Kill] + + val lengthPrefixedInitCodec: Codec[Init] = variableSizeBytes(uint16, initCodec) + val lengthPrefixedNodeAnnouncementCodec: Codec[NodeAnnouncement] = variableSizeBytes(uint16, nodeAnnouncementCodec) + val lengthPrefixedChannelAnnouncementCodec: Codec[ChannelAnnouncement] = variableSizeBytes(uint16, channelAnnouncementCodec) + val lengthPrefixedChannelUpdateCodec: Codec[ChannelUpdate] = variableSizeBytes(uint16, channelUpdateCodec) + val lengthPrefixedAnnouncementCodec: Codec[AnnouncementMessage] = variableSizeBytes(uint16, lightningMessageCodec.downcast[AnnouncementMessage]) + val lengthPrefixedLightningMessageCodec: Codec[LightningMessage] = variableSizeBytes(uint16, lightningMessageCodec) + + def actorRefCodec(system: ExtendedActorSystem): Codec[ActorRef] = variableSizeBytes(uint16, utf8).xmap( + (path: String) => system.provider.resolveActorRef(path), + (actor: ActorRef) => Serialization.serializedActorPath(actor)) + + val inetAddressCodec: Codec[InetAddress] = discriminated[InetAddress].by(uint8) + .typecase(0, ipv4address) + .typecase(1, ipv6address) + + val inetSocketAddressCodec: Codec[InetSocketAddress] = (inetAddressCodec ~ uint16).xmap({ case (addr, port) => new InetSocketAddress(addr, port) }, socketAddr => (socketAddr.getAddress, socketAddr.getPort)) + + def connectionRequestCodec(system: ExtendedActorSystem): Codec[ClientSpawner.ConnectionRequest] = ( + ("address" | inetSocketAddressCodec) :: + ("remoteNodeId" | publicKey) :: + ("origin" | actorRefCodec(system))).as[ClientSpawner.ConnectionRequest] + + def initializeConnectionCodec(system: ExtendedActorSystem): Codec[PeerConnection.InitializeConnection] = ( + ("peer" | actorRefCodec(system)) :: + ("chainHash" | bytes32) :: + ("features" | variableSizeBytes(uint16, featuresCodec)) :: + ("doSync" | bool(8))).as[PeerConnection.InitializeConnection] + + def connectionReadyCodec(system: ExtendedActorSystem): Codec[PeerConnection.ConnectionReady] = ( + ("peerConnection" | actorRefCodec(system)) :: + ("remoteNodeId" | publicKey) :: + ("address" | inetSocketAddressCodec) :: + ("outgoing" | bool(8)) :: + ("localInit" | lengthPrefixedInitCodec) :: + ("remoteInit" | lengthPrefixedInitCodec)).as[PeerConnection.ConnectionReady] + + val optionQueryChannelRangeTlv: Codec[Option[QueryChannelRangeTlv]] = variableSizeBytes(uint16, optional(bool(8), variableSizeBytesLong(varintoverflow, queryFlagsCodec.upcast[QueryChannelRangeTlv]))) + + def sendChannelQueryCodec(system: ExtendedActorSystem): Codec[SendChannelQuery] = ( + ("chainsHash" | bytes32) :: + ("remoteNodeId" | publicKey) :: + ("to" | actorRefCodec(system)) :: + ("flags_opt" | optionQueryChannelRangeTlv)).as[SendChannelQuery] + + def peerRoutingMessageCodec(system: ExtendedActorSystem): Codec[PeerRoutingMessage] = ( + ("peerConnection" | actorRefCodec(system)) :: + ("remoteNodeId" | publicKey) :: + ("msg" | lengthPrefixedLightningMessageCodec.downcast[RoutingMessage])).as[PeerRoutingMessage] + + val singleChannelDiscoveredCodec: Codec[SingleChannelDiscovered] = (lengthPrefixedChannelAnnouncementCodec :: satoshi :: optional(bool(8), lengthPrefixedChannelUpdateCodec) :: optional(bool(8), lengthPrefixedChannelUpdateCodec)).as[SingleChannelDiscovered] + + val readAckCodec: Codec[TransportHandler.ReadAck] = lightningMessageCodec.upcast[Any].as[TransportHandler.ReadAck] + + def codec(system: ExtendedActorSystem): Codec[RemoteTypes] = discriminated[RemoteTypes].by(uint16) + .typecase(0, provide(Switchboard.GetRouterPeerConf)) + .typecase(1, (routerConfCodec :: peerConnectionConfCodec).as[RouterPeerConf]) + .typecase(5, readAckCodec) + .typecase(7, connectionRequestCodec(system)) + .typecase(10, (actorRefCodec(system) :: publicKey).as[PeerConnection.Authenticated]) + .typecase(11, initializeConnectionCodec(system)) + .typecase(12, connectionReadyCodec(system)) + .typecase(13, provide(PeerConnection.ConnectionResult.NoAddressFound)) + .typecase(14, inetSocketAddressCodec.as[PeerConnection.ConnectionResult.ConnectionFailed]) + .typecase(15, variableSizeBytes(uint16, utf8).as[PeerConnection.ConnectionResult.AuthenticationFailed]) + .typecase(16, variableSizeBytes(uint16, utf8).as[PeerConnection.ConnectionResult.InitializationFailed]) + .typecase(17, provide(PeerConnection.ConnectionResult.AlreadyConnected)) + .typecase(18, provide(PeerConnection.ConnectionResult.Connected)) + .typecase(19, actorRefCodec(system).as[Peer.ConnectionDown]) + .typecase(20, provide(Router.GetRoutingStateStreaming)) + .typecase(21, provide(Router.RoutingStateStreamingUpToDate)) + .typecase(22, sendChannelQueryCodec(system)) + .typecase(23, peerRoutingMessageCodec(system)) + .typecase(30, iterable(lengthPrefixedNodeAnnouncementCodec).as[NodesDiscovered]) + .typecase(31, lengthPrefixedNodeAnnouncementCodec.as[NodeUpdated]) + .typecase(32, publicKey.as[NodeLost]) + .typecase(33, iterable(singleChannelDiscoveredCodec).as[ChannelsDiscovered]) + .typecase(34, shortchannelid.as[ChannelLost]) + .typecase(35, iterable(lengthPrefixedChannelUpdateCodec).as[ChannelUpdatesReceived]) + .typecase(36, double.as[SyncProgress]) + .typecase(40, lengthPrefixedAnnouncementCodec.as[GossipDecision.Accepted]) + .typecase(41, lengthPrefixedAnnouncementCodec.as[GossipDecision.Duplicate]) + .typecase(42, lengthPrefixedAnnouncementCodec.as[GossipDecision.InvalidSignature]) + .typecase(43, lengthPrefixedNodeAnnouncementCodec.as[GossipDecision.NoKnownChannel]) + .typecase(44, lengthPrefixedChannelAnnouncementCodec.as[GossipDecision.ValidationFailure]) + .typecase(45, lengthPrefixedChannelAnnouncementCodec.as[GossipDecision.InvalidAnnouncement]) + .typecase(46, lengthPrefixedChannelAnnouncementCodec.as[GossipDecision.ChannelPruned]) + .typecase(47, lengthPrefixedChannelAnnouncementCodec.as[GossipDecision.ChannelClosing]) + .typecase(48, lengthPrefixedChannelUpdateCodec.as[GossipDecision.Stale]) + .typecase(49, lengthPrefixedChannelUpdateCodec.as[GossipDecision.NoRelatedChannel]) + .typecase(50, lengthPrefixedChannelUpdateCodec.as[GossipDecision.RelatedChannelPruned]) + .typecase(51, lengthPrefixedChannelAnnouncementCodec.as[GossipDecision.ChannelClosed]) + .typecase(52, peerConnectionKillCodec) + +} \ No newline at end of file diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/remote/LightningMessageSerializer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/remote/LightningMessageSerializer.scala new file mode 100644 index 000000000..9910707f5 --- /dev/null +++ b/eclair-core/src/main/scala/fr/acinq/eclair/remote/LightningMessageSerializer.scala @@ -0,0 +1,21 @@ +/* + * Copyright 2019 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.remote + +import fr.acinq.eclair.wire.LightningMessageCodecs.lightningMessageCodec + +class LightningMessageSerializer extends ScodecSerializer(42, lightningMessageCodec) \ No newline at end of file diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/remote/ScodecSerializer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/remote/ScodecSerializer.scala new file mode 100644 index 000000000..d5ce91743 --- /dev/null +++ b/eclair-core/src/main/scala/fr/acinq/eclair/remote/ScodecSerializer.scala @@ -0,0 +1,41 @@ +/* + * Copyright 2020 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.remote + +import java.nio.ByteBuffer + +import akka.serialization.{ByteBufferSerializer, SerializerWithStringManifest} +import scodec.Codec +import scodec.bits.BitVector + +class ScodecSerializer[T <: AnyRef](override val identifier: Int, codec: Codec[T]) extends SerializerWithStringManifest with ByteBufferSerializer { + + override def toBinary(o: AnyRef, buf: ByteBuffer): Unit = buf.put(toBinary(o)) + + override def fromBinary(buf: ByteBuffer, manifest: String): AnyRef = { + val bytes = Array.ofDim[Byte](buf.remaining()) + buf.get(bytes) + fromBinary(bytes, manifest) + } + + /** we don't rely on the manifest to provide backward compatibility, we will use dedicated codecs instead */ + override def manifest(o: AnyRef): String = fr.acinq.eclair.getSimpleClassName(o) + + override def toBinary(o: AnyRef): Array[Byte] = codec.encode(o.asInstanceOf[T]).require.toByteArray + + override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = codec.decode(BitVector(bytes)).require.value +} diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/NetworkEvents.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/NetworkEvents.scala index 46b797af6..9c3ca9ec9 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/NetworkEvents.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/NetworkEvents.scala @@ -19,12 +19,13 @@ package fr.acinq.eclair.router import fr.acinq.bitcoin.Crypto.PublicKey import fr.acinq.bitcoin.Satoshi import fr.acinq.eclair.ShortChannelId +import fr.acinq.eclair.remote.EclairInternalsSerializer.RemoteTypes import fr.acinq.eclair.wire.{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement} /** * Created by PM on 02/02/2017. */ -trait NetworkEvent +trait NetworkEvent extends RemoteTypes case class NodesDiscovered(ann: Iterable[NodeAnnouncement]) extends NetworkEvent diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala index 2411bc878..97a9f8948 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala @@ -19,7 +19,7 @@ package fr.acinq.eclair.router import java.util.UUID import akka.Done -import akka.actor.{ActorRef, Props} +import akka.actor.{Actor, ActorLogging, ActorRef, Props, Terminated} import akka.event.DiagnosticLoggingAdapter import akka.event.Logging.MDC import fr.acinq.bitcoin.Crypto.PublicKey @@ -33,6 +33,7 @@ import fr.acinq.eclair.crypto.TransportHandler import fr.acinq.eclair.db.NetworkDb import fr.acinq.eclair.io.Peer.PeerRoutingMessage import fr.acinq.eclair.payment.PaymentRequest.ExtraHop +import fr.acinq.eclair.remote.EclairInternalsSerializer.RemoteTypes import fr.acinq.eclair.router.Graph.GraphStructure.DirectedGraph import fr.acinq.eclair.router.Graph.WeightRatios import fr.acinq.eclair.router.Monitoring.{Metrics, Tags} @@ -127,6 +128,33 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[ sender ! GetNetworkStatsResponse(d.stats) stay + case Event(GetRoutingStateStreaming, d) => + val listener = sender + d.nodes + .values + .sliding(100, 100) + .map(NodesDiscovered) + .foreach(listener ! _) + d.channels + .values + .map(pc => SingleChannelDiscovered(pc.ann, pc.capacity, pc.update_1_opt, pc.update_2_opt)) + .sliding(100, 100) + .map(ChannelsDiscovered) + .foreach(listener ! _) + listener ! RoutingStateStreamingUpToDate + context.actorOf(Props(new Actor with ActorLogging { + log.info(s"subscribing listener=$listener to network events") + context.system.eventStream.subscribe(listener, classOf[NetworkEvent]) + context.watch(listener) + override def receive: Receive = { + case Terminated(actor) if actor == listener=> + log.warning(s"unsubscribing listener=$listener to network events") + context.system.eventStream.unsubscribe(listener) + context stop self + } + })) + stay + case Event(TickBroadcast, d) => if (d.rebroadcast.channels.isEmpty && d.rebroadcast.updates.isEmpty && d.rebroadcast.nodes.isEmpty) { stay @@ -491,13 +519,13 @@ object Router { // @formatter:on // @formatter:off - case class SendChannelQuery(chainHash: ByteVector32, remoteNodeId: PublicKey, to: ActorRef, flags_opt: Option[QueryChannelRangeTlv]) + case class SendChannelQuery(chainHash: ByteVector32, remoteNodeId: PublicKey, to: ActorRef, flags_opt: Option[QueryChannelRangeTlv]) extends RemoteTypes case object GetNetworkStats case class GetNetworkStatsResponse(stats: Option[NetworkStats]) case object GetRoutingState case class RoutingState(channels: Iterable[PublicChannel], nodes: Iterable[NodeAnnouncement]) - case object GetRoutingStateStreaming - case object RoutingStateStreamingUpToDate + case object GetRoutingStateStreaming extends RemoteTypes + case object RoutingStateStreamingUpToDate extends RemoteTypes case object GetRouterData case object GetNodes case object GetLocalChannels @@ -513,7 +541,7 @@ object Router { /** Gossip that was generated by our node. */ case object LocalGossip extends GossipOrigin - sealed trait GossipDecision { def ann: AnnouncementMessage } + sealed trait GossipDecision extends RemoteTypes { def ann: AnnouncementMessage } object GossipDecision { case class Accepted(ann: AnnouncementMessage) extends GossipDecision diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/wire/LightningMessageTypes.scala b/eclair-core/src/main/scala/fr/acinq/eclair/wire/LightningMessageTypes.scala index 323c4579e..51d5560b3 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/wire/LightningMessageTypes.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/wire/LightningMessageTypes.scala @@ -34,7 +34,7 @@ import scala.util.Try */ // @formatter:off -sealed trait LightningMessage +sealed trait LightningMessage extends Serializable sealed trait SetupMessage extends LightningMessage sealed trait ChannelMessage extends LightningMessage sealed trait HtlcMessage extends LightningMessage diff --git a/eclair-core/src/test/resources/logback-test.xml b/eclair-core/src/test/resources/logback-test.xml index c302676a4..61d69722c 100644 --- a/eclair-core/src/test/resources/logback-test.xml +++ b/eclair-core/src/test/resources/logback-test.xml @@ -15,7 +15,7 @@ ~ limitations under the License. --> - + System.out diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala index 622d0a447..547ca85a4 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala @@ -16,9 +16,6 @@ package fr.acinq.eclair.io -import java.net.{InetAddress, ServerSocket, Socket} -import java.util.concurrent.Executors - import akka.actor.FSM import akka.actor.Status.Failure import akka.testkit.{TestFSMRef, TestProbe} @@ -38,6 +35,8 @@ import org.scalatest.funsuite.FixtureAnyFunSuiteLike import org.scalatest.{Outcome, Tag} import scodec.bits.ByteVector +import java.net.{InetAddress, ServerSocket, Socket} +import java.util.concurrent.Executors import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future} @@ -95,7 +94,7 @@ class PeerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with StateTe val probe = TestProbe() probe.send(peer, Peer.Init(Set.empty)) probe.send(peer, Peer.Connect(remoteNodeId, address_opt = None)) - probe.expectMsg(PeerConnection.ConnectionResult.NoAddressFound(remoteNodeId)) + probe.expectMsg(PeerConnection.ConnectionResult.NoAddressFound) } test("successfully connect to peer at user request") { f => @@ -191,28 +190,26 @@ class PeerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with StateTe test("handle new connection in state CONNECTED") { f => import f._ - connect(remoteNodeId, peer, peerConnection, channels = Set(ChannelCodecsSpec.normal)) - // this is just to extract inits - val Peer.ConnectedData(_, _, localInit, remoteInit, _) = peer.stateData - val peerConnection1 = peerConnection val peerConnection2 = TestProbe() val peerConnection3 = TestProbe() - val deathWatch = TestProbe() - deathWatch.watch(peerConnection1.ref) - deathWatch.watch(peerConnection2.ref) - deathWatch.watch(peerConnection3.ref) + connect(remoteNodeId, peer, peerConnection, channels = Set(ChannelCodecsSpec.normal)) + peerConnection1.expectMsgType[ChannelReestablish] + // this is just to extract inits + val Peer.ConnectedData(_, _, localInit, remoteInit, _) = peer.stateData peerConnection2.send(peer, PeerConnection.ConnectionReady(peerConnection2.ref, remoteNodeId, fakeIPAddress.socketAddress, outgoing = false, localInit, remoteInit)) // peer should kill previous connection - deathWatch.expectTerminated(peerConnection1.ref) + peerConnection1.expectMsg(PeerConnection.Kill(PeerConnection.KillReason.ConnectionReplaced)) awaitCond(peer.stateData.asInstanceOf[Peer.ConnectedData].peerConnection === peerConnection2.ref) + peerConnection2.expectMsgType[ChannelReestablish] peerConnection3.send(peer, PeerConnection.ConnectionReady(peerConnection3.ref, remoteNodeId, fakeIPAddress.socketAddress, outgoing = false, localInit, remoteInit)) // peer should kill previous connection - deathWatch.expectTerminated(peerConnection2.ref) + peerConnection2.expectMsg(PeerConnection.Kill(PeerConnection.KillReason.ConnectionReplaced)) awaitCond(peer.stateData.asInstanceOf[Peer.ConnectedData].peerConnection === peerConnection3.ref) + peerConnection3.expectMsgType[ChannelReestablish] } test("send state transitions to child reconnection actor", Tag("auto_reconnect"), Tag("with_node_announcement")) { f => diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/io/ReconnectionTaskSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/io/ReconnectionTaskSpec.scala index 980a25a2e..45b99c5b1 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/io/ReconnectionTaskSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/io/ReconnectionTaskSpec.scala @@ -144,7 +144,7 @@ class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike // auto reconnect val TransitionWithData(ReconnectionTask.IDLE, ReconnectionTask.WAITING, _, waitingData0: WaitingData) = monitor.expectMsgType[TransitionWithData] assert(waitingData0.nextReconnectionDelay >= (200 milliseconds)) - assert(waitingData0.nextReconnectionDelay <= (10 seconds)) + assert(waitingData0.nextReconnectionDelay <= (20 seconds)) probe.send(reconnectionTask, ReconnectionTask.TickReconnect) // we send it manually in order to not have to actually wait (duplicates don't matter since we look at transitions sequentially) val TransitionWithData(ReconnectionTask.WAITING, ReconnectionTask.CONNECTING, _, _) = monitor.expectMsgType[TransitionWithData] diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/remote/EclairInternalsSerializerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/remote/EclairInternalsSerializerSpec.scala new file mode 100644 index 000000000..842e84e64 --- /dev/null +++ b/eclair-core/src/test/scala/fr/acinq/eclair/remote/EclairInternalsSerializerSpec.scala @@ -0,0 +1,45 @@ +/* + * Copyright 2020 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.remote + +import fr.acinq.eclair.router.Router.GossipDecision +import org.scalatest.funsuite.AnyFunSuite + +class EclairInternalsSerializerSpec extends AnyFunSuite { + + test("canary test codec gossip decision") { + + def codec(d: GossipDecision) = d match { + case _: GossipDecision.Accepted => () + case _: GossipDecision.Duplicate => () + case _: GossipDecision.InvalidSignature => () + case _: GossipDecision.NoKnownChannel => () + case _: GossipDecision.ValidationFailure => () + case _: GossipDecision.InvalidAnnouncement => () + case _: GossipDecision.ChannelPruned => () + case _: GossipDecision.ChannelClosing => () + case _: GossipDecision.ChannelClosed => () + case _: GossipDecision.Stale => () + case _: GossipDecision.NoRelatedChannel => () + case _: GossipDecision.RelatedChannelPruned => () + // NB: if a new gossip decision is added, this pattern matching will fail + // this serves as a reminder that a new codec is to be added in EclairInternalsSerializer.codec + } + + } + +} diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala index b93ee7fbe..291430394 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala @@ -32,7 +32,7 @@ import fr.acinq.eclair.router.BaseRouterSpec.channelAnnouncement import fr.acinq.eclair.router.RouteCalculationSpec.{DEFAULT_AMOUNT_MSAT, DEFAULT_MAX_FEE} import fr.acinq.eclair.router.Router._ import fr.acinq.eclair.transactions.Scripts -import fr.acinq.eclair.wire.{Color, QueryShortChannelIds} +import fr.acinq.eclair.wire.{ChannelAnnouncement, ChannelUpdate, Color, NodeAnnouncement, QueryShortChannelIds} import fr.acinq.eclair.{CltvExpiryDelta, Features, LongToBtcAmount, MilliSatoshi, ShortChannelId, TestConstants, randomKey} import scodec.bits._ @@ -659,4 +659,33 @@ class RouterSpec extends BaseRouterSpec { } } + test("stream updates to front") { fixture => + import fixture._ + + val sender = TestProbe() + sender.send(router, GetRoutingStateStreaming) + + // initial sync + var nodes = Set.empty[NodeAnnouncement] + var channels = Set.empty[ChannelAnnouncement] + var updates = Set.empty[ChannelUpdate] + sender.fishForMessage() { + case nd: NodesDiscovered => + nodes = nodes ++ nd.ann + false + case cd: ChannelsDiscovered => + channels = channels ++ cd.c.map(_.ann) + updates = updates ++ cd.c.flatMap(sc => sc.u1_opt.toSeq ++ sc.u2_opt.toSeq) + false + case RoutingStateStreamingUpToDate => true + } + assert(nodes.size === 8 && channels.size === 5 && updates.size === 10) // public channels only + + // new announcements + val update_ab_2 = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_a, b, channelId_ab, CltvExpiryDelta(7), htlcMinimumMsat = 0 msat, feeBaseMsat = 10 msat, feeProportionalMillionths = 10, htlcMaximumMsat = htlcMaximum, timestamp = update_ab.timestamp + 1) + val peerConnection = TestProbe() + router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, update_ab_2) + sender.expectMsg(ChannelUpdatesReceived(List(update_ab_2))) + } + } diff --git a/eclair-front/modules/assembly.xml b/eclair-front/modules/assembly.xml new file mode 100644 index 000000000..36aec98d2 --- /dev/null +++ b/eclair-front/modules/assembly.xml @@ -0,0 +1,60 @@ + + ${git.commit.id.abbrev}-awseb_bundle + + zip + + false + + + lib + true + false + runtime + + + + + ../ + + README.md + LICENSE* + + + + ${project.basedir}/modules/awseb + . + + Procfile + + unix + + + + + ${project.basedir}/modules/awseb + . + + run.sh + + 0755 + unix + + + ${project.build.directory} + . + + application.jar + + + + \ No newline at end of file diff --git a/eclair-front/modules/awseb/Procfile b/eclair-front/modules/awseb/Procfile new file mode 100644 index 000000000..42eaa06ef --- /dev/null +++ b/eclair-front/modules/awseb/Procfile @@ -0,0 +1 @@ +web: ./run.sh \ No newline at end of file diff --git a/eclair-front/modules/awseb/run.sh b/eclair-front/modules/awseb/run.sh new file mode 100644 index 000000000..f6c34109a --- /dev/null +++ b/eclair-front/modules/awseb/run.sh @@ -0,0 +1,3 @@ +export LOCAL_IP=$(curl -s 169.254.169.254/latest/meta-data/local-ipv4) +export HOSTNAME=$(hostname) +exec java -javaagent:lib/kanela-agent-1.0.5.jar -jar application.jar \ No newline at end of file diff --git a/eclair-front/pom.xml b/eclair-front/pom.xml new file mode 100644 index 000000000..e5aafa1f4 --- /dev/null +++ b/eclair-front/pom.xml @@ -0,0 +1,153 @@ + + + + + 4.0.0 + + fr.acinq.eclair + eclair_2.13 + 0.4.3-SNAPSHOT + + + eclair-front_2.13 + jar + + eclair-front + + + + + org.apache.maven.plugins + maven-jar-plugin + + application + + + true + lib/ + fr.acinq.eclair.Boot + true + + + + ${git.commit.id} + ${project.parent.url} + + + + + + maven-assembly-plugin + 3.2.0 + + + modules/assembly.xml + + + + + package + + single + + + + + + + + + + + false + + bintray-kamon-io-releases + bintray + https://dl.bintray.com/kamon-io/releases + + + + + + fr.acinq.eclair + eclair-core_${scala.version.short} + ${project.version} + + + + ch.qos.logback + logback-classic + 1.2.3 + + + org.codehaus.janino + janino + 3.1.2 + + + + com.amazonaws + aws-java-sdk-secretsmanager + 1.11.776 + + + + io.kamon + kamon-apm-reporter_${scala.version.short} + ${kamon.version} + + + io.kamon + kamon-system-metrics_${scala.version.short} + ${kamon.version} + + + + io.kamon + kanela-agent + 1.0.5 + + + + fr.acinq.eclair + eclair-core_${scala.version.short} + tests + ${project.version} + test-jar + test + + + com.typesafe.akka + akka-testkit_${scala.version.short} + ${akka.version} + test + + + com.typesafe.akka + akka-actor-testkit-typed_${scala.version.short} + ${akka.version} + test + + + com.opentable.components + otj-pg-embedded + 0.13.3 + test + + + diff --git a/eclair-front/src/main/resources/application.conf b/eclair-front/src/main/resources/application.conf new file mode 100644 index 000000000..53eee4a50 --- /dev/null +++ b/eclair-front/src/main/resources/application.conf @@ -0,0 +1,70 @@ +eclair { + enable-kamon = false + + front { + // To be overriden with the same key as the backend, so that the front has the same nodeid + priv-key-provider = "aws-sm" // aws-sm (AWS Secrets Manager) or env (environment variable) + priv-key = ${?NODE_PRIV_KEY} // used if priv-key-provider = env + aws-sm.priv-key-name = "node-priv-key" // used if priv-key-provider = aws-sm + // As a security measure, we also require the pub key, which will be matched against the priv key to make sure the + // front is really using the expected key + pub = ${NODE_PUB_KEY} + } +} + +akka { + actor.provider = cluster + remote.artery { + canonical.hostname = "127.0.0.1" + canonical.hostname = ${?LOCAL_IP} // this will override the default value with the env variable if set + canonical.port = 25520 + + untrusted-mode = on + trusted-selection-paths = [ + "/system/cluster/core/daemon", + "/system/cluster/heartbeatReceiver", + "/system/distributedPubSubMediator", + "/system/clusterReceptionist/replicator" + ] + + advanced { + outbound-message-queue-size = 30720 // 10x default because of sync + } + } + cluster { + shutdown-after-unsuccessful-join-seed-nodes = 10s # front won't start if back is offline + roles = [frontend] + seed-nodes = ["akka://eclair-node@"${BACKEND_IP}":25520"] + } + coordinated-shutdown.terminate-actor-system = on + coordinated-shutdown.exit-jvm = on + //It is recommended to load the extension when the actor system is started by defining it in akka.extensions + //configuration property. Otherwise it will be activated when first used and then it takes a while for it to be populated. + extensions = ["akka.cluster.pubsub.DistributedPubSub"] + +} + +kamon { + environment.host = ${HOSTNAME} + instrumentation.akka { + filters { + actors { + # Decides which actors generate Spans for the messages they process, given that there is already an ongoing trace + # in the Context of the processed message (i.e. there is a Sampled Span in the Context). + # + trace { + includes = [] + excludes = ["**"] # we don't want automatically generated spans because they conflict with the ones we define + } + } + } + } +} + +akka { + + loggers = ["akka.event.slf4j.Slf4jLogger"] + logger-startup-timeout = 30s + loglevel = "DEBUG" # akka doc: You can enable DEBUG level for akka.loglevel and control the actual level in the SLF4J backend without any significant overhead, also for production. + logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" +} diff --git a/eclair-front/src/main/resources/logback.xml b/eclair-front/src/main/resources/logback.xml new file mode 100644 index 000000000..fd539b69e --- /dev/null +++ b/eclair-front/src/main/resources/logback.xml @@ -0,0 +1,44 @@ + + + + + + + + + + + return formattedMessage.contains("connected to /10.") || + (formattedMessage.contains("connection closed") && !mdc.containsKey("nodeId")) || + (formattedMessage.contains("transport died") && !mdc.containsKey("nodeId")); + + + NEUTRAL + DENY + + System.out + false + + ${HOSTNAME} %d %-5level %logger{24}%X{category}%X{nodeId}%X{channelId}%X{paymentHash}%.-11X{parentPaymentId}%.-11X{paymentId} - %msg%ex{12}%n + + + + + + + + \ No newline at end of file diff --git a/eclair-front/src/main/scala/fr/acinq/eclair/Boot.scala b/eclair-front/src/main/scala/fr/acinq/eclair/Boot.scala new file mode 100644 index 000000000..7c2d63540 --- /dev/null +++ b/eclair-front/src/main/scala/fr/acinq/eclair/Boot.scala @@ -0,0 +1,61 @@ +/* + * Copyright 2019 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair + +import java.io.File + +import akka.actor.ActorSystem +import com.typesafe.config.{ConfigFactory, ConfigParseOptions, ConfigSyntax} +import grizzled.slf4j.Logging +import kamon.Kamon + +import scala.concurrent.ExecutionContext +import scala.util.{Failure, Success} + +object Boot extends App with Logging { + try { + val datadir = new File(System.getProperty("eclair.datadir", System.getProperty("user.home") + "/.eclair")) + val config = ConfigFactory.parseString( + Option(System.getenv("AKKA_CONF")).getOrElse("").replace(";", "\n"), + ConfigParseOptions.defaults().setSyntax(ConfigSyntax.PROPERTIES)) + .withFallback(ConfigFactory.load()) + + // the actor system name needs to be the same for all members of the cluster + implicit val system: ActorSystem = ActorSystem("eclair-node", config) + implicit val ec: ExecutionContext = system.dispatcher + + val setup = new FrontSetup(datadir) + + if (config.getBoolean("eclair.enable-kamon")) { + Kamon.init(config) + } + + setup.bootstrap onComplete { + case Success(_) => () + case Failure(t) => onError(t) + } + } catch { + case t: Throwable => onError(t) + } + + def onError(t: Throwable): Unit = { + val errorMsg = if (t.getMessage != null) t.getMessage else t.getClass.getSimpleName + System.err.println(s"fatal error: $errorMsg") + logger.error(s"fatal error: $errorMsg", t) + System.exit(1) + } +} diff --git a/eclair-front/src/main/scala/fr/acinq/eclair/ClusterListener.scala b/eclair-front/src/main/scala/fr/acinq/eclair/ClusterListener.scala new file mode 100644 index 000000000..d17b2be59 --- /dev/null +++ b/eclair-front/src/main/scala/fr/acinq/eclair/ClusterListener.scala @@ -0,0 +1,53 @@ +/* + * Copyright 2020 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair + +import akka.Done +import akka.actor.{Actor, ActorLogging, Address} +import akka.cluster.ClusterEvent.{InitialStateAsEvents, MemberDowned, MemberLeft, MemberUp, UnreachableMember} +import akka.cluster.{Cluster, Member} + +import scala.concurrent.Promise + +class ClusterListener(frontJoinedCluster: Promise[Done], backendAddressFound: Promise[Address]) extends Actor with ActorLogging { + + val cluster = Cluster(context.system) + cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberUp], classOf[MemberLeft], classOf[MemberDowned], classOf[UnreachableMember]) + val BackendRole = "backend" + + override def receive: Receive = { + case MemberUp(member) => + if (member.roles.contains(BackendRole)) { + log.info(s"found backend=$member") + backendAddressFound.success(member.address) + } else if (member == cluster.selfMember) { + log.info("we have joined the cluster") + frontJoinedCluster.success(Done) + } + case MemberLeft(member) => maybeShutdown(member) + case MemberDowned(member) => maybeShutdown(member) + case UnreachableMember(member) => maybeShutdown(member) + } + + private def maybeShutdown(member: Member): Unit = { + if (member.roles.contains(BackendRole)) { + log.info(s"backend is down, stopping...") + System.exit(0) + } + } + +} diff --git a/eclair-front/src/main/scala/fr/acinq/eclair/FrontSetup.scala b/eclair-front/src/main/scala/fr/acinq/eclair/FrontSetup.scala new file mode 100644 index 000000000..29f778329 --- /dev/null +++ b/eclair-front/src/main/scala/fr/acinq/eclair/FrontSetup.scala @@ -0,0 +1,113 @@ +/* + * Copyright 2019 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair + +import java.io.File +import java.net.InetSocketAddress + +import akka.Done +import akka.actor.{ActorSystem, Address, Props, RootActorPath, SupervisorStrategy} +import akka.pattern.ask +import akka.util.Timeout +import com.amazonaws.services.secretsmanager.AWSSecretsManagerClient +import com.amazonaws.services.secretsmanager.model.GetSecretValueRequest +import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey} +import fr.acinq.eclair.crypto.Noise.KeyPair +import fr.acinq.eclair.io.Switchboard.{GetRouterPeerConf, RouterPeerConf} +import fr.acinq.eclair.io.{ClientSpawner, Server} +import fr.acinq.eclair.router.FrontRouter +import grizzled.slf4j.Logging +import scodec.bits.ByteVector + +import scala.concurrent.duration._ +import scala.concurrent.{ExecutionContext, Future, Promise} + +class FrontSetup(datadir: File)(implicit system: ActorSystem) extends Logging { + + implicit val timeout = Timeout(30 seconds) + implicit val ec: ExecutionContext = system.dispatcher + + logger.info(s"hello!") + logger.info(s"version=${getClass.getPackage.getImplementationVersion} commit=${getClass.getPackage.getSpecificationVersion}") + logger.info(s"datadir=${datadir.getCanonicalPath}") + logger.info(s"initializing secure random generator") + // this will force the secure random instance to initialize itself right now, making sure it doesn't hang later (see comment in package.scala) + secureRandom.nextInt() + + datadir.mkdirs() + + val config = system.settings.config.getConfig("eclair") + + val keyPair = { + val pub = ByteVector.fromValidHex(config.getString("front.pub")) + val priv = config.getString("front.priv-key-provider") match { + case "aws-sm" => + val sm = AWSSecretsManagerClient.builder().build() + try { + // we retrieve the node key from AWS secrets manager and we compare the corresponding pub key with the expected one + val secretId = config.getString("front.aws-sm.priv-key-name") + ByteVector.fromValidHex(sm.getSecretValue(new GetSecretValueRequest().withSecretId(secretId)).getSecretString) + } finally { + sm.shutdown() + } + case "env" => ByteVector.fromValidHex(config.getString("front.priv-key")) + } + val keyPair = KeyPair(pub, priv) + require(PrivateKey(priv).publicKey == PublicKey(pub), "priv/pub keys mismatch") + keyPair + } + + logger.info(s"nodeid=${keyPair.pub.toHex}") + + val serverBindingAddress = new InetSocketAddress( + config.getString("server.binding-ip"), + config.getInt("server.port")) + + def bootstrap: Future[Unit] = { + + val frontJoinedCluster = Promise[Done]() + val backendAddressFound = Promise[Address]() + val tcpBound = Promise[Done]() + + for { + _ <- Future.successful(0) + + _ = system.actorOf(Props(new ClusterListener(frontJoinedCluster, backendAddressFound)), name = "cluster-listener") + _ <- frontJoinedCluster.future + backendAddress <- backendAddressFound.future + + // we give time for the cluster to be ready + _ <- akka.pattern.after(5.seconds)(Future.successful()) + + switchBoardSelection = system.actorSelection(RootActorPath(backendAddress) / "user" / "*" / "switchboard") + remoteSwitchboard <- switchBoardSelection.resolveOne() + routerSelection = system.actorSelection(RootActorPath(backendAddress) / "user" / "*" / "router") + remoteRouter <- routerSelection.resolveOne() + + RouterPeerConf(routerConf, peerConnectionConf) <- (remoteSwitchboard ? GetRouterPeerConf).mapTo[RouterPeerConf] + + frontRouterInitialized = Promise[Done]() + frontRouter = system.actorOf(SimpleSupervisor.props(FrontRouter.props(routerConf, remoteRouter, Some(frontRouterInitialized)), "front-router", SupervisorStrategy.Resume)) + _ <- frontRouterInitialized.future + + clientSpawner = system.actorOf(Props(new ClientSpawner(keyPair, None, peerConnectionConf, remoteSwitchboard, frontRouter)), name = "client-spawner") + + server = system.actorOf(SimpleSupervisor.props(Server.props(keyPair, peerConnectionConf, remoteSwitchboard, frontRouter, serverBindingAddress, Some(tcpBound)), "server", SupervisorStrategy.Restart)) + } yield () + } + +} diff --git a/eclair-front/src/main/scala/fr/acinq/eclair/router/FrontRouter.scala b/eclair-front/src/main/scala/fr/acinq/eclair/router/FrontRouter.scala new file mode 100644 index 000000000..6af03de06 --- /dev/null +++ b/eclair-front/src/main/scala/fr/acinq/eclair/router/FrontRouter.scala @@ -0,0 +1,320 @@ +/* + * Copyright 2019 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.router + +import akka.Done +import akka.actor.{ActorRef, Props} +import akka.event.Logging.MDC +import akka.event.LoggingAdapter +import fr.acinq.bitcoin.ByteVector32 +import fr.acinq.bitcoin.Crypto.PublicKey +import fr.acinq.eclair.Logs.LogCategory +import fr.acinq.eclair.crypto.TransportHandler +import fr.acinq.eclair.io.Peer.PeerRoutingMessage +import fr.acinq.eclair.router.Router._ +import fr.acinq.eclair.wire._ +import fr.acinq.eclair.{FSMDiagnosticActorLogging, Logs, ShortChannelId, getSimpleClassName} +import kamon.Kamon +import kamon.metric.Counter + +import scala.collection.immutable.SortedMap +import scala.concurrent.Promise + +class FrontRouter(routerConf: RouterConf, remoteRouter: ActorRef, initialized: Option[Promise[Done]] = None) extends FSMDiagnosticActorLogging[FrontRouter.State, FrontRouter.Data] { + + import FrontRouter._ + + // we pass these to helpers classes so that they have the logging context + implicit def implicitLog: LoggingAdapter = log + + remoteRouter ! GetRoutingStateStreaming + + startWith(SYNCING, Data(Map.empty, SortedMap.empty, Map.empty, Map.empty, rebroadcast = Rebroadcast(channels = Map.empty, updates = Map.empty, nodes = Map.empty))) + + when(SYNCING) { + case Event(networkEvent: NetworkEvent, d) => + stay using FrontRouter.updateTable(d, networkEvent, doRebroadcast = false) + + case Event(RoutingStateStreamingUpToDate, d) => + log.info("sync done nodes={} channels={}", d.nodes.size, d.channels.size) + initialized.map(_.success(Done)) + setTimer(TickBroadcast.toString, TickBroadcast, routerConf.routerBroadcastInterval, repeat = true) + goto(NORMAL) using d + } + + when(NORMAL) { + case Event(GetRoutingState, d) => + log.info(s"getting valid announcements for $sender") + sender ! RoutingState(d.channels.values, d.nodes.values) + stay + + case Event(s: SendChannelQuery, _) => + remoteRouter forward s + stay + + case Event(PeerRoutingMessage(peerConnection, remoteNodeId, q: QueryChannelRange), d) => + Sync.handleQueryChannelRange(d.channels, routerConf, RemoteGossip(peerConnection, remoteNodeId), q) + stay + + case Event(PeerRoutingMessage(peerConnection, remoteNodeId, q: QueryShortChannelIds), d) => + Sync.handleQueryShortChannelIds(d.nodes, d.channels, RemoteGossip(peerConnection, remoteNodeId), q) + stay + + case Event(PeerRoutingMessage(peerConnection, remoteNodeId, ann: AnnouncementMessage), d) => + val origin = RemoteGossip(peerConnection, remoteNodeId) + val d1 = d.processing.get(ann) match { + case Some(origins) if origins.contains(origin) => + log.warning("acking duplicate msg={}", ann) + origin.peerConnection ! TransportHandler.ReadAck(ann) + d + case Some(origins) => + log.debug("message is already in processing={} origins.size={}", ann, origins.size) + Metrics.gossipStashed(ann).increment() + // we have already forwarded that message to the router + // we could just acknowledge the message now, but then we would lose the information that we did receive this + // announcement from that peer, and would send it back to that same peer if our master router accepts it + // in the general case, we should fairly often receive the same gossip from several peers almost at the same time + val origins1 = origins + origin + d.copy(processing = d.processing + (ann -> origins1)) + case None => + d.accepted.get(ann) match { + case Some(origins) if origins.contains(origin) => + log.warning("acking duplicate msg={}", ann) + origin.peerConnection ! TransportHandler.ReadAck(ann) + d + case Some(origins) => + log.debug("message is already in accepted={} origins.size={}", ann, origins.size) + val origins1 = origins + origin + d.copy(accepted = d.accepted + (ann -> origins1)) + case None => + ann match { + case n: NodeAnnouncement if d.nodes.contains(n.nodeId) => + origin.peerConnection ! TransportHandler.ReadAck(ann) + Metrics.gossipDropped(ann).increment() + d + case c: ChannelAnnouncement if d.channels.contains(c.shortChannelId) => + origin.peerConnection ! TransportHandler.ReadAck(ann) + Metrics.gossipDropped(ann).increment() + d + case u: ChannelUpdate if d.channels.contains(u.shortChannelId) && d.channels(u.shortChannelId).getChannelUpdateSameSideAs(u).contains(u) => + origin.peerConnection ! TransportHandler.ReadAck(ann) + Metrics.gossipDropped(ann).increment() + d + case n: NodeAnnouncement if d.rebroadcast.nodes.contains(n) => + origin.peerConnection ! TransportHandler.ReadAck(ann) + Metrics.gossipStashedRebroadcast(ann).increment() + d.copy(rebroadcast = d.rebroadcast.copy(nodes = d.rebroadcast.nodes + (n -> (d.rebroadcast.nodes(n) + origin)))) + case c: ChannelAnnouncement if d.rebroadcast.channels.contains(c) => + origin.peerConnection ! TransportHandler.ReadAck(ann) + Metrics.gossipStashedRebroadcast(ann).increment() + d.copy(rebroadcast = d.rebroadcast.copy(channels = d.rebroadcast.channels + (c -> (d.rebroadcast.channels(c) + origin)))) + case u: ChannelUpdate if d.rebroadcast.updates.contains(u) => + origin.peerConnection ! TransportHandler.ReadAck(ann) + Metrics.gossipStashedRebroadcast(ann).increment() + d.copy(rebroadcast = d.rebroadcast.copy(updates = d.rebroadcast.updates + (u -> (d.rebroadcast.updates(u) + origin)))) + case _ => + Metrics.gossipForwarded(ann).increment() + log.debug("sending announcement class={} to master router", ann.getClass.getSimpleName) + remoteRouter ! PeerRoutingMessage(self, remoteNodeId, ann) // nb: we set ourselves as the origin + d.copy(processing = d.processing + (ann -> Set(origin))) + } + } + } + stay using d1 + + case Event(accepted: GossipDecision.Accepted, d) => + log.debug("message has been accepted by router: {}", accepted) + Metrics.gossipAccepted(accepted.ann).increment() + d.processing.get(accepted.ann) match { + case Some(origins) => origins.foreach { origin => + log.debug("acking msg={} for origin={}", accepted.ann, origin) + origin.peerConnection ! TransportHandler.ReadAck(accepted.ann) + origin.peerConnection ! accepted + } + case None => () + } + // NB: we will also shortly receive a NetworkEvent from the router for this announcement, where we put it in the + // rebroadcast map. We keep the origin peers in the accepted map, so we don't send back the same announcement to + // the peers that sent us in the first place. + // Why do we not just leave the announcement in the processing map? Because we would have a race between other + // peers that send us that same announcement, and the NetworkEvent. If the other peers win the race, we will defer + // acknowledging their message (because the announcement is still in the processing map) and we will + // wait forever for the very gossip decision that we are processing now, resulting in a stuck connection + val origins1 = d.processing.getOrElse(accepted.ann, Set.empty[RemoteGossip]) + stay using d.copy(processing = d.processing - accepted.ann, accepted = d.accepted + (accepted.ann -> origins1)) + + case Event(rejected: GossipDecision.Rejected, d) => + log.debug("message has been rejected by router: {}", rejected) + Metrics.gossipRejected(rejected.ann, rejected).increment() + d.processing.get(rejected.ann) match { + case Some(origins) => origins.foreach { origin => + log.debug("acking msg={} for origin={}", rejected.ann, origin) + origin.peerConnection ! TransportHandler.ReadAck(rejected.ann) + origin.peerConnection ! rejected + } + case None => () + } + stay using d.copy(processing = d.processing - rejected.ann) + + case Event(networkEvent: NetworkEvent, d) => + log.debug("received event={}", networkEvent) + Metrics.routerEvent(networkEvent).increment() + stay using FrontRouter.updateTable(d, networkEvent, doRebroadcast = true) + + case Event(TickBroadcast, d) => + if (d.rebroadcast.channels.isEmpty && d.rebroadcast.updates.isEmpty && d.rebroadcast.nodes.isEmpty) { + stay + } else { + log.debug("broadcasting routing messages") + log.debug("staggered broadcast details: channels={} updates={} nodes={}", d.rebroadcast.channels.size, d.rebroadcast.updates.size, d.rebroadcast.nodes.size) + context.system.eventStream.publish(d.rebroadcast) + stay using d.copy(rebroadcast = Rebroadcast(channels = Map.empty, updates = Map.empty, nodes = Map.empty)) + } + + case Event(msg: PeerRoutingMessage, _) => + log.debug("forwarding peer routing message class={}", msg.message.getClass.getSimpleName) + remoteRouter forward msg + stay + + case Event(_: TransportHandler.ReadAck, _) => stay // acks from remote router + } + + override def mdc(currentMessage: Any): MDC = { + val category_opt = LogCategory(currentMessage) + currentMessage match { + case PeerRoutingMessage(_, remoteNodeId, _) => Logs.mdc(category_opt, remoteNodeId_opt = Some(remoteNodeId)) + case _ => Logs.mdc(category_opt) + } + } +} + +object FrontRouter { + + def props(routerConf: RouterConf, remoteRouter: ActorRef, initialized: Option[Promise[Done]] = None): Props = Props(new FrontRouter(routerConf: RouterConf, remoteRouter: ActorRef, initialized)) + + // @formatter:off + sealed trait State + case object SYNCING extends State + case object NORMAL extends State + // @formatter:on + + case class Data(nodes: Map[PublicKey, NodeAnnouncement], + channels: SortedMap[ShortChannelId, PublicChannel], + processing: Map[AnnouncementMessage, Set[RemoteGossip]], + accepted: Map[AnnouncementMessage, Set[RemoteGossip]], + rebroadcast: Rebroadcast) + + object Metrics { + private val Gossip = Kamon.counter("front.router.gossip") + private val GossipResult = Kamon.counter("front.router.gossip.result") + private val RouterEvent = Kamon.counter("front.router.event") + + // @formatter:off + def gossipDropped(ann: AnnouncementMessage): Counter = Gossip.withTag("status", "dropped").withTag("type", getSimpleClassName(ann)) + def gossipStashed(ann: AnnouncementMessage): Counter = Gossip.withTag("status", "stashed").withTag("type", getSimpleClassName(ann)) + def gossipStashedRebroadcast(ann: AnnouncementMessage): Counter = Gossip.withTag("status", "stashed-rebroadcast").withTag("type", getSimpleClassName(ann)) + def gossipForwarded(ann: AnnouncementMessage): Counter = Gossip.withTag("status", "forwarded").withTag("type", getSimpleClassName(ann)) + + def gossipAccepted(ann: AnnouncementMessage): Counter = GossipResult.withTag("result", "accepted").withTag("type", getSimpleClassName(ann)) + def gossipRejected(ann: AnnouncementMessage, reason: GossipDecision.Rejected): Counter = GossipResult.withTag("result", "rejected").withTag("reason", getSimpleClassName(reason)).withTag("type", getSimpleClassName(ann)) + + def routerEvent(event: NetworkEvent): Counter = RouterEvent.withTag("type", getSimpleClassName(event)) + // @formatter:on + } + + def updateTable(d: Data, event: NetworkEvent, doRebroadcast: Boolean)(implicit log: LoggingAdapter): Data = { + event match { + case NodesDiscovered(nodes) => + log.debug("adding {} nodes", nodes.size) + val nodes1 = nodes.map(n => n.nodeId -> n).toMap + val d1 = d.copy(nodes = d.nodes ++ nodes1) + if (doRebroadcast) { + nodes.foldLeft(d1) { case (d, ann) => FrontRouter.rebroadcast(d, ann) } + } else { + d1 + } + + case NodeUpdated(n) => + log.debug("updating {} nodes", 1) + val d1 = d.copy(nodes = d.nodes + (n.nodeId -> n)) + if (doRebroadcast) { + FrontRouter.rebroadcast(d1, n) + } else { + d1 + } + + case NodeLost(nodeId) => + log.debug("removing {} nodes", 1) + d.copy(nodes = d.nodes - nodeId) + + case ChannelsDiscovered(channels) => + log.debug("adding {} channels", channels.size) + val channels1 = channels.foldLeft(SortedMap.empty[ShortChannelId, PublicChannel]) { + case (channels, sc) => channels + (sc.ann.shortChannelId -> PublicChannel(sc.ann, ByteVector32.Zeroes, sc.capacity, sc.u1_opt, sc.u2_opt, None)) + } + val d1 = d.copy(channels = d.channels ++ channels1) + if (doRebroadcast) { + channels.foldLeft(d1) { case (d, sc) => FrontRouter.rebroadcast(d, sc.ann) } + } else { + d1 + } + + case ChannelLost(channelId) => + log.debug("removing {} channels", 1) + d.copy(channels = d.channels - channelId) + + case ChannelUpdatesReceived(updates) => + log.debug("adding/updating {} channel_updates", updates.size) + val channels1 = updates.foldLeft(d.channels) { + case (channels, u) => channels.get(u.shortChannelId) match { + case Some(c) => channels + (c.ann.shortChannelId -> c.updateChannelUpdateSameSideAs(u)) + case None => channels + } + } + val d1 = d.copy(channels = channels1) + if (doRebroadcast) { + updates.foldLeft(d1) { case (d, ann) => FrontRouter.rebroadcast(d, ann) } + } else { + d1 + } + + case _: SyncProgress => + // we receive this as part of network events but it's useless + d + } + } + + /** + * Schedule accepted announcements for rebroadcasting to our peers. + */ + def rebroadcast(d: Data, ann: AnnouncementMessage)(implicit log: LoggingAdapter): Data = { + // We don't want to send back the announcement to the peer(s) that sent it to us in the first place. Announcements + // that came from our peers are in the [[d.accepted]] map. + val origins = d.accepted.getOrElse(ann, Set.empty[RemoteGossip]).map(o => o: GossipOrigin) + val rebroadcast1 = ann match { + case n: NodeAnnouncement => d.rebroadcast.copy(nodes = d.rebroadcast.nodes + (n -> origins)) + case c: ChannelAnnouncement => d.rebroadcast.copy(channels = d.rebroadcast.channels + (c -> origins)) + case u: ChannelUpdate => + if (d.channels.contains(u.shortChannelId)) { + d.rebroadcast.copy(updates = d.rebroadcast.updates + (u -> origins)) + } else { + d.rebroadcast // private channel, we don't rebroadcast the channel_update + } + } + d.copy(accepted = d.accepted - ann, rebroadcast = rebroadcast1) + } +} \ No newline at end of file diff --git a/eclair-front/src/test/resources/application.conf b/eclair-front/src/test/resources/application.conf new file mode 100644 index 000000000..acf2225b7 --- /dev/null +++ b/eclair-front/src/test/resources/application.conf @@ -0,0 +1,12 @@ +eclair.front { + // just placeholders because this conf key is mandatory + pub = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa01" +} + +kamon.environment.host = "auto" + +akka { + actor.provider = local + cluster.seed-nodes = [] + extensions = [] +} \ No newline at end of file diff --git a/eclair-front/src/test/scala/fr/acinq/eclair/router/FrontRouterSpec.scala b/eclair-front/src/test/scala/fr/acinq/eclair/router/FrontRouterSpec.scala new file mode 100644 index 000000000..0e646b8d7 --- /dev/null +++ b/eclair-front/src/test/scala/fr/acinq/eclair/router/FrontRouterSpec.scala @@ -0,0 +1,357 @@ +/* + * Copyright 2019 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.router + +import akka.actor.ActorSystem +import akka.testkit.{TestKit, TestProbe} +import fr.acinq.bitcoin.Crypto.PrivateKey +import fr.acinq.bitcoin.Script.{pay2wsh, write} +import fr.acinq.bitcoin.{Block, Transaction, TxOut} +import fr.acinq.eclair.TestConstants.Alice +import fr.acinq.eclair.blockchain.{UtxoStatus, ValidateRequest, ValidateResult} +import fr.acinq.eclair.crypto.TransportHandler +import fr.acinq.eclair.io.Peer.PeerRoutingMessage +import fr.acinq.eclair.router.Announcements.{makeChannelAnnouncement, makeChannelUpdate, makeNodeAnnouncement, signChannelAnnouncement} +import fr.acinq.eclair.router.Router._ +import fr.acinq.eclair.transactions.Scripts +import fr.acinq.eclair.wire.Color +import fr.acinq.eclair.{CltvExpiryDelta, ShortChannelId, randomKey, _} +import org.scalatest.funsuite.AnyFunSuiteLike +import scodec.bits._ + +import scala.concurrent.duration._ + +class FrontRouterSpec extends TestKit(ActorSystem("test")) with AnyFunSuiteLike { + + import FrontRouterSpec._ + + test("correctly dispatch valid gossip") { + val nodeParams = Alice.nodeParams + + val watcher = TestProbe() + val router = system.actorOf(Router.props(nodeParams, watcher.ref)) + + val system1 = ActorSystem("front-system-1") + val system2 = ActorSystem("front-system-2") + val system3 = ActorSystem("front-system-3") + + // we use those to control messages exchanged between front and back routers + val pipe1 = TestProbe() + val pipe2 = TestProbe() + val pipe3 = TestProbe() + + val front1 = system1.actorOf(FrontRouter.props(nodeParams.routerConf, pipe1.ref)) + val front2 = system2.actorOf(FrontRouter.props(nodeParams.routerConf, pipe2.ref)) + val front3 = system3.actorOf(FrontRouter.props(nodeParams.routerConf, pipe3.ref)) + + pipe1.expectMsg(GetRoutingStateStreaming) + pipe1.send(router, GetRoutingStateStreaming) + pipe1.expectMsg(RoutingStateStreamingUpToDate) + pipe1.forward(front1) + + pipe2.expectMsg(GetRoutingStateStreaming) + pipe2.send(router, GetRoutingStateStreaming) + pipe2.expectMsg(RoutingStateStreamingUpToDate) + pipe2.forward(front2) + + pipe3.expectMsg(GetRoutingStateStreaming) + pipe3.send(router, GetRoutingStateStreaming) + pipe3.expectMsg(RoutingStateStreamingUpToDate) + pipe3.forward(front3) + + val peerConnection1a = TestProbe() + val peerConnection1b = TestProbe() + val peerConnection2a = TestProbe() + val peerConnection3a = TestProbe() + + system1.eventStream.subscribe(peerConnection1a.ref, classOf[Rebroadcast]) + system1.eventStream.subscribe(peerConnection1b.ref, classOf[Rebroadcast]) + system2.eventStream.subscribe(peerConnection2a.ref, classOf[Rebroadcast]) + system3.eventStream.subscribe(peerConnection3a.ref, classOf[Rebroadcast]) + + val origin1a = RemoteGossip(peerConnection1a.ref, randomKey.publicKey) + val origin1b = RemoteGossip(peerConnection1b.ref, randomKey.publicKey) + val origin2a = RemoteGossip(peerConnection2a.ref, randomKey.publicKey) + + peerConnection1a.send(front1, PeerRoutingMessage(peerConnection1a.ref, origin1a.nodeId, chan_ab)) + pipe1.expectMsg(PeerRoutingMessage(front1, origin1a.nodeId, chan_ab)) + pipe1.send(router, PeerRoutingMessage(pipe1.ref, origin1a.nodeId, chan_ab)) + + watcher.expectMsg(ValidateRequest(chan_ab)) + + peerConnection1b.send(front1, PeerRoutingMessage(peerConnection1b.ref, origin1b.nodeId, chan_ab)) + pipe1.expectNoMessage() + + peerConnection2a.send(front2, PeerRoutingMessage(peerConnection2a.ref, origin2a.nodeId, chan_ab)) + pipe2.expectMsg(PeerRoutingMessage(front2, origin2a.nodeId, chan_ab)) + pipe2.send(router, PeerRoutingMessage(pipe2.ref, origin2a.nodeId, chan_ab)) + pipe2.expectMsg(TransportHandler.ReadAck(chan_ab)) + + pipe1.expectNoMessage() + pipe2.expectNoMessage() + + watcher.send(router, ValidateResult(chan_ab, Right((Transaction(version = 0, txIn = Nil, txOut = TxOut(1000000 sat, write(pay2wsh(Scripts.multiSig2of2(funding_a, funding_b)))) :: Nil, lockTime = 0), UtxoStatus.Unspent)))) + pipe1.expectMsg(TransportHandler.ReadAck(chan_ab)) + + pipe1.expectMsg(GossipDecision.Accepted(chan_ab)) + pipe1.forward(front1) + pipe1.expectMsg(ChannelsDiscovered(Seq(SingleChannelDiscovered(chan_ab, 1000000 sat, None, None)))) + pipe1.forward(front1) + pipe2.expectMsg(GossipDecision.Accepted(chan_ab)) + pipe2.forward(front2) + pipe2.expectMsg(ChannelsDiscovered(Seq(SingleChannelDiscovered(chan_ab, 1000000 sat, None, None)))) + pipe2.forward(front2) + pipe3.expectMsg(ChannelsDiscovered(Seq(SingleChannelDiscovered(chan_ab, 1000000 sat, None, None)))) + pipe3.forward(front3) + + pipe1.expectNoMessage() + pipe2.expectNoMessage() + pipe3.expectNoMessage() + + peerConnection1a.expectMsg(TransportHandler.ReadAck(chan_ab)) + peerConnection1b.expectMsg(TransportHandler.ReadAck(chan_ab)) + peerConnection2a.expectMsg(TransportHandler.ReadAck(chan_ab)) + + peerConnection1a.expectMsg(GossipDecision.Accepted(chan_ab)) + peerConnection1b.expectMsg(GossipDecision.Accepted(chan_ab)) + peerConnection2a.expectMsg(GossipDecision.Accepted(chan_ab)) + + // we have to wait 2 times the broadcast interval because there is an additional per-peer delay + val maxBroadcastDelay = 2 * nodeParams.routerConf.routerBroadcastInterval + 1.second + peerConnection1a.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set(origin1a, origin1b)), updates = Map.empty, nodes = Map.empty)) + peerConnection1b.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set(origin1a, origin1b)), updates = Map.empty, nodes = Map.empty)) + peerConnection2a.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set(origin2a)), updates = Map.empty, nodes = Map.empty)) + peerConnection3a.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set.empty), updates = Map.empty, nodes = Map.empty)) + } + + test("aggregate gossip") { + val nodeParams = Alice.nodeParams + + val watcher = TestProbe() + val router = system.actorOf(Router.props(nodeParams, watcher.ref)) + + val system1 = ActorSystem("front-system-1") + val system2 = ActorSystem("front-system-2") + val system3 = ActorSystem("front-system-3") + + val front1 = system1.actorOf(FrontRouter.props(nodeParams.routerConf, router)) + val front2 = system2.actorOf(FrontRouter.props(nodeParams.routerConf, router)) + val front3 = system3.actorOf(FrontRouter.props(nodeParams.routerConf, router)) + + val peerConnection1a = TestProbe("peerconn-1a") + val peerConnection1b = TestProbe("peerconn-1b") + val peerConnection2a = TestProbe("peerconn-2a") + val peerConnection3a = TestProbe("peerconn-3a") + + system1.eventStream.subscribe(peerConnection1a.ref, classOf[Rebroadcast]) + system1.eventStream.subscribe(peerConnection1b.ref, classOf[Rebroadcast]) + system2.eventStream.subscribe(peerConnection2a.ref, classOf[Rebroadcast]) + system3.eventStream.subscribe(peerConnection3a.ref, classOf[Rebroadcast]) + + val origin1a = RemoteGossip(peerConnection1a.ref, randomKey.publicKey) + val origin1b = RemoteGossip(peerConnection1b.ref, randomKey.publicKey) + val origin2a = RemoteGossip(peerConnection2a.ref, randomKey.publicKey) + val origin3a = RemoteGossip(peerConnection3a.ref, randomKey.publicKey) + + peerConnection1a.send(front1, PeerRoutingMessage(peerConnection1a.ref, origin1a.nodeId, chan_ab)) + watcher.expectMsg(ValidateRequest(chan_ab)) + peerConnection1b.send(front1, PeerRoutingMessage(peerConnection1b.ref, origin1b.nodeId, chan_ab)) + peerConnection2a.send(front2, PeerRoutingMessage(peerConnection2a.ref, origin2a.nodeId, chan_ab)) + + peerConnection1a.send(front1, PeerRoutingMessage(peerConnection1a.ref, origin1a.nodeId, ann_c)) + peerConnection1a.expectMsg(TransportHandler.ReadAck(ann_c)) + peerConnection1a.expectMsg(GossipDecision.NoKnownChannel(ann_c)) + peerConnection3a.send(front3, PeerRoutingMessage(peerConnection3a.ref, origin3a.nodeId, ann_a)) + peerConnection3a.send(front3, PeerRoutingMessage(peerConnection3a.ref, origin3a.nodeId, channelUpdate_ba)) + peerConnection3a.send(front3, PeerRoutingMessage(peerConnection3a.ref, origin3a.nodeId, channelUpdate_bc)) + peerConnection3a.expectMsg(TransportHandler.ReadAck(channelUpdate_bc)) + peerConnection3a.expectMsg(GossipDecision.NoRelatedChannel(channelUpdate_bc)) + + + watcher.send(router, ValidateResult(chan_ab, Right((Transaction(version = 0, txIn = Nil, txOut = TxOut(1000000 sat, write(pay2wsh(Scripts.multiSig2of2(funding_a, funding_b)))) :: Nil, lockTime = 0), UtxoStatus.Unspent)))) + + peerConnection1a.expectMsg(TransportHandler.ReadAck(chan_ab)) + peerConnection1b.expectMsg(TransportHandler.ReadAck(chan_ab)) + peerConnection2a.expectMsg(TransportHandler.ReadAck(chan_ab)) + + peerConnection1a.expectMsg(GossipDecision.Accepted(chan_ab)) + peerConnection1b.expectMsg(GossipDecision.Accepted(chan_ab)) + peerConnection2a.expectMsg(GossipDecision.Accepted(chan_ab)) + + peerConnection3a.expectMsg(TransportHandler.ReadAck(channelUpdate_ba)) + peerConnection3a.expectMsg(GossipDecision.Accepted(channelUpdate_ba)) + + peerConnection3a.expectMsg(TransportHandler.ReadAck(ann_a)) + peerConnection3a.expectMsg(GossipDecision.Accepted(ann_a)) + + peerConnection1b.send(front1, PeerRoutingMessage(peerConnection1b.ref, origin1b.nodeId, channelUpdate_ab)) + peerConnection1b.expectMsg(TransportHandler.ReadAck(channelUpdate_ab)) + peerConnection1b.expectMsg(GossipDecision.Accepted(channelUpdate_ab)) + + peerConnection3a.send(front3, PeerRoutingMessage(peerConnection3a.ref, origin3a.nodeId, ann_b)) + peerConnection3a.expectMsg(TransportHandler.ReadAck(ann_b)) + peerConnection3a.expectMsg(GossipDecision.Accepted(ann_b)) + + // we have to wait 2 times the broadcast interval because there is an additional per-peer delay + val maxBroadcastDelay = 2 * nodeParams.routerConf.routerBroadcastInterval + 1.second + peerConnection1a.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set(origin1a, origin1b)), updates = Map(channelUpdate_ab -> Set(origin1b), channelUpdate_ba -> Set.empty), nodes = Map(ann_a -> Set.empty, ann_b -> Set.empty))) + peerConnection1b.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set(origin1a, origin1b)), updates = Map(channelUpdate_ab -> Set(origin1b), channelUpdate_ba -> Set.empty), nodes = Map(ann_a -> Set.empty, ann_b -> Set.empty))) + peerConnection2a.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set(origin2a)), updates = Map(channelUpdate_ab -> Set.empty, channelUpdate_ba -> Set.empty), nodes = Map(ann_a -> Set.empty, ann_b -> Set.empty))) + peerConnection3a.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set.empty), updates = Map(channelUpdate_ab -> Set.empty, channelUpdate_ba -> Set(origin3a)), nodes = Map(ann_a -> Set(origin3a), ann_b -> Set(origin3a)))) + } + + test("do not forward duplicate gossip") { + val nodeParams = Alice.nodeParams + val router = TestProbe() + val system1 = ActorSystem("front-system-1") + val front1 = system1.actorOf(FrontRouter.props(nodeParams.routerConf, router.ref)) + router.expectMsg(GetRoutingStateStreaming) + router.send(front1, RoutingStateStreamingUpToDate) + + val peerConnection1 = TestProbe() + system1.eventStream.subscribe(peerConnection1.ref, classOf[Rebroadcast]) + + val origin1 = RemoteGossip(peerConnection1.ref, randomKey.publicKey) + + peerConnection1.send(front1, PeerRoutingMessage(peerConnection1.ref, origin1.nodeId, chan_ab)) + router.expectMsg(PeerRoutingMessage(front1, origin1.nodeId, chan_ab)) + router.send(front1, TransportHandler.ReadAck(chan_ab)) + peerConnection1.expectNoMessage() + router.send(front1, GossipDecision.Accepted(chan_ab)) + peerConnection1.expectMsg(TransportHandler.ReadAck(chan_ab)) + peerConnection1.expectMsg(GossipDecision.Accepted(chan_ab)) + router.send(front1, ChannelsDiscovered(SingleChannelDiscovered(chan_ab, 0.sat, None, None) :: Nil)) + + peerConnection1.send(front1, PeerRoutingMessage(peerConnection1.ref, origin1.nodeId, chan_ab)) + router.expectNoMessage // announcement is pending rebroadcast + peerConnection1.expectMsg(TransportHandler.ReadAck(chan_ab)) + + router.send(front1, TickBroadcast) + peerConnection1.expectMsg(Rebroadcast(channels = Map(chan_ab -> Set(origin1)), updates = Map.empty, nodes = Map.empty)) + + peerConnection1.send(front1, PeerRoutingMessage(peerConnection1.ref, origin1.nodeId, chan_ab)) + router.expectNoMessage // announcement is already known + peerConnection1.expectMsg(TransportHandler.ReadAck(chan_ab)) + } + + test("acknowledge duplicate gossip") { + val nodeParams = Alice.nodeParams + val router = TestProbe() + val system1 = ActorSystem("front-system-1") + val front1 = system1.actorOf(FrontRouter.props(nodeParams.routerConf, router.ref)) + router.expectMsg(GetRoutingStateStreaming) + router.send(front1, RoutingStateStreamingUpToDate) + + val peerConnection1 = TestProbe() + system1.eventStream.subscribe(peerConnection1.ref, classOf[Rebroadcast]) + + val origin1 = RemoteGossip(peerConnection1.ref, randomKey.publicKey) + + // first message arrives and is forwarded to router + peerConnection1.send(front1, PeerRoutingMessage(peerConnection1.ref, origin1.nodeId, chan_ab)) + router.expectMsg(PeerRoutingMessage(front1, origin1.nodeId, chan_ab)) + peerConnection1.expectNoMessage() + // duplicate message is immediately acknowledged + peerConnection1.send(front1, PeerRoutingMessage(peerConnection1.ref, origin1.nodeId, chan_ab)) + peerConnection1.expectMsg(TransportHandler.ReadAck(chan_ab)) + // router acknowledges the first message + router.send(front1, TransportHandler.ReadAck(chan_ab)) + // but we still wait for the decision before acking the original message + peerConnection1.expectNoMessage() + // decision arrives, message is acknowledged + router.send(front1, GossipDecision.Accepted(chan_ab)) + peerConnection1.expectMsg(TransportHandler.ReadAck(chan_ab)) + peerConnection1.expectMsg(GossipDecision.Accepted(chan_ab)) + } + + test("do not rebroadcast channel_update for private channels") { + val nodeParams = Alice.nodeParams + val router = TestProbe() + val system1 = ActorSystem("front-system-1") + val front1 = system1.actorOf(FrontRouter.props(nodeParams.routerConf, router.ref)) + router.expectMsg(GetRoutingStateStreaming) + router.send(front1, RoutingStateStreamingUpToDate) + + val peerConnection1 = TestProbe() + system1.eventStream.subscribe(peerConnection1.ref, classOf[Rebroadcast]) + + val origin1 = RemoteGossip(peerConnection1.ref, randomKey.publicKey) + + // channel_update arrives and is forwarded to router (there is no associated channel, because it is private) + peerConnection1.send(front1, PeerRoutingMessage(peerConnection1.ref, origin1.nodeId, channelUpdate_ab)) + router.expectMsg(PeerRoutingMessage(front1, origin1.nodeId, channelUpdate_ab)) + peerConnection1.expectNoMessage() + // router acknowledges the message + router.send(front1, TransportHandler.ReadAck(channelUpdate_ab)) + // but we still wait for the decision before acking the original message + peerConnection1.expectNoMessage() + // decision arrives, message is acknowledged + router.send(front1, GossipDecision.Accepted(channelUpdate_ab)) + peerConnection1.expectMsg(TransportHandler.ReadAck(channelUpdate_ab)) + peerConnection1.expectMsg(GossipDecision.Accepted(channelUpdate_ab)) + // then the event arrives + front1 ! ChannelUpdatesReceived(channelUpdate_ab :: Nil) + // rebroadcast + front1 ! TickBroadcast + peerConnection1.expectNoMessage() + } + + +} + +object FrontRouterSpec { + val (priv_a, priv_b, priv_c, priv_d, priv_e, priv_f) = (randomKey, randomKey, randomKey, randomKey, randomKey, randomKey) + val (a, b, c, d, e, f) = (priv_a.publicKey, priv_b.publicKey, priv_c.publicKey, priv_d.publicKey, priv_e.publicKey, priv_f.publicKey) + + val (priv_funding_a, priv_funding_b, priv_funding_c, priv_funding_d, priv_funding_e, priv_funding_f) = (randomKey, randomKey, randomKey, randomKey, randomKey, randomKey) + val (funding_a, funding_b, funding_c, funding_d, funding_e, funding_f) = (priv_funding_a.publicKey, priv_funding_b.publicKey, priv_funding_c.publicKey, priv_funding_d.publicKey, priv_funding_e.publicKey, priv_funding_f.publicKey) + + val ann_a = makeNodeAnnouncement(priv_a, "node-A", Color(15, 10, -70), Nil, Features(hex"0200")) + val ann_b = makeNodeAnnouncement(priv_b, "node-B", Color(50, 99, -80), Nil, Features(hex"")) + val ann_c = makeNodeAnnouncement(priv_c, "node-C", Color(123, 100, -40), Nil, Features(hex"0200")) + val ann_d = makeNodeAnnouncement(priv_d, "node-D", Color(-120, -20, 60), Nil, Features(hex"00")) + val ann_e = makeNodeAnnouncement(priv_e, "node-E", Color(-50, 0, 10), Nil, Features(hex"00")) + val ann_f = makeNodeAnnouncement(priv_f, "node-F", Color(30, 10, -50), Nil, Features(hex"00")) + + val channelId_ab = ShortChannelId(420000, 1, 0) + val channelId_bc = ShortChannelId(420000, 2, 0) + val channelId_cd = ShortChannelId(420000, 3, 0) + val channelId_ef = ShortChannelId(420000, 4, 0) + + def channelAnnouncement(shortChannelId: ShortChannelId, node1_priv: PrivateKey, node2_priv: PrivateKey, funding1_priv: PrivateKey, funding2_priv: PrivateKey) = { + val witness = Announcements.generateChannelAnnouncementWitness(Block.RegtestGenesisBlock.hash, shortChannelId, node1_priv.publicKey, node2_priv.publicKey, funding1_priv.publicKey, funding2_priv.publicKey, Features.empty) + val node1_sig = Announcements.signChannelAnnouncement(witness, node1_priv) + val funding1_sig = Announcements.signChannelAnnouncement(witness, funding1_priv) + val node2_sig = Announcements.signChannelAnnouncement(witness, node2_priv) + val funding2_sig = Announcements.signChannelAnnouncement(witness, funding2_priv) + makeChannelAnnouncement(Block.RegtestGenesisBlock.hash, shortChannelId, node1_priv.publicKey, node2_priv.publicKey, funding1_priv.publicKey, funding2_priv.publicKey, node1_sig, node2_sig, funding1_sig, funding2_sig) + } + + val chan_ab = channelAnnouncement(channelId_ab, priv_a, priv_b, priv_funding_a, priv_funding_b) + val chan_bc = channelAnnouncement(channelId_bc, priv_b, priv_c, priv_funding_b, priv_funding_c) + val chan_cd = channelAnnouncement(channelId_cd, priv_c, priv_d, priv_funding_c, priv_funding_d) + val chan_ef = channelAnnouncement(channelId_ef, priv_e, priv_f, priv_funding_e, priv_funding_f) + + val channelUpdate_ab = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_a, b, channelId_ab, CltvExpiryDelta(7), htlcMinimumMsat = 0 msat, feeBaseMsat = 10 msat, feeProportionalMillionths = 10, htlcMaximumMsat = 500000000 msat) + val channelUpdate_ba = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_b, a, channelId_ab, CltvExpiryDelta(7), htlcMinimumMsat = 0 msat, feeBaseMsat = 10 msat, feeProportionalMillionths = 10, htlcMaximumMsat = 500000000 msat) + val channelUpdate_bc = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_b, c, channelId_bc, CltvExpiryDelta(5), htlcMinimumMsat = 0 msat, feeBaseMsat = 10 msat, feeProportionalMillionths = 1, htlcMaximumMsat = 500000000 msat) + val channelUpdate_cb = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_c, b, channelId_bc, CltvExpiryDelta(5), htlcMinimumMsat = 0 msat, feeBaseMsat = 10 msat, feeProportionalMillionths = 1, htlcMaximumMsat = 500000000 msat) + val channelUpdate_cd = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_c, d, channelId_cd, CltvExpiryDelta(3), htlcMinimumMsat = 0 msat, feeBaseMsat = 10 msat, feeProportionalMillionths = 4, htlcMaximumMsat = 500000000 msat) + val channelUpdate_dc = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_d, c, channelId_cd, CltvExpiryDelta(3), htlcMinimumMsat = 0 msat, feeBaseMsat = 10 msat, feeProportionalMillionths = 4, htlcMaximumMsat = 500000000 msat) + val channelUpdate_ef = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_e, f, channelId_ef, CltvExpiryDelta(9), htlcMinimumMsat = 0 msat, feeBaseMsat = 10 msat, feeProportionalMillionths = 8, htlcMaximumMsat = 500000000 msat) + val channelUpdate_fe = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_f, e, channelId_ef, CltvExpiryDelta(9), htlcMinimumMsat = 0 msat, feeBaseMsat = 10 msat, feeProportionalMillionths = 8, htlcMaximumMsat = 500000000 msat) +} diff --git a/eclair-node/src/main/resources/application.conf b/eclair-node/src/main/resources/application.conf index a55e28644..270ae3d42 100644 --- a/eclair-node/src/main/resources/application.conf +++ b/eclair-node/src/main/resources/application.conf @@ -2,6 +2,36 @@ eclair { enable-kamon = false } +akka { + actor.provider = cluster + remote { + artery { + canonical.hostname = "127.0.0.1" + canonical.port = 25520 + + untrusted-mode = on + trusted-selection-paths = [ + "/user/*/switchboard", + "/user/*/router", + "/system/cluster/core/daemon", + "/system/cluster/heartbeatReceiver", + "/system/distributedPubSubMediator", + "/system/clusterReceptionist/replicator" + ] + } + deployment { + enable-whitelist = on + whitelist = [] // no remote deployment + } + } + cluster.roles = [backend] + coordinated-shutdown.terminate-actor-system = on + coordinated-shutdown.exit-jvm = on + //It is recommended to load the extension when the actor system is started by defining it in akka.extensions + //configuration property. Otherwise it will be activated when first used and then it takes a while for it to be populated. + extensions = ["akka.cluster.pubsub.DistributedPubSub"] +} + kamon.instrumentation.akka { filters { actors { diff --git a/eclair-node/src/main/scala/fr/acinq/eclair/Boot.scala b/eclair-node/src/main/scala/fr/acinq/eclair/Boot.scala index 3c834d249..4c8b64766 100644 --- a/eclair-node/src/main/scala/fr/acinq/eclair/Boot.scala +++ b/eclair-node/src/main/scala/fr/acinq/eclair/Boot.scala @@ -29,8 +29,8 @@ import scala.concurrent.ExecutionContext import scala.util.{Failure, Success} /** - * Created by PM on 25/01/2016. - */ + * Created by PM on 25/01/2016. + */ object Boot extends App with Logging { try { val datadir = new File(System.getProperty("eclair.datadir", System.getProperty("user.home") + "/.eclair")) @@ -59,15 +59,15 @@ object Boot extends App with Logging { } /** - * Starts the http APIs service if enabled in the configuration - * - * @param kit - * @param system - * @param ec - */ + * Starts the http APIs service if enabled in the configuration + * + * @param kit + * @param system + * @param ec + */ def startApiServiceIfEnabled(kit: Kit)(implicit system: ActorSystem, ec: ExecutionContext) = { val config = system.settings.config.getConfig("eclair") - if(config.getBoolean("api.enabled")){ + if (config.getBoolean("api.enabled")) { logger.info(s"json API enabled on port=${config.getInt("api.port")}") implicit val materializer = ActorMaterializer() val apiPassword = config.getString("api.password") match { diff --git a/pom.xml b/pom.xml index 274fddb19..f47e5f946 100644 --- a/pom.xml +++ b/pom.xml @@ -25,6 +25,7 @@ eclair-core + eclair-front eclair-node eclair-node-gui