mirror of
https://github.com/ACINQ/eclair.git
synced 2025-02-20 13:34:35 +01:00
Distribute connection-handling on multiple machines using akka-cluster (#1566)
The goal is to offload the back from everything connection-related: - incoming connections - outgoing connections - gossip queries + pings - incoming gossip aggregation - outgoing gossip dispatch (rebroadcast) Co-authored-by: Bastien Teinturier <31281497+t-bast@users.noreply.github.com>
This commit is contained in:
parent
4e567053ab
commit
08c21fa5e9
36 changed files with 1838 additions and 69 deletions
2
.github/workflows/main.yml
vendored
2
.github/workflows/main.yml
vendored
|
@ -32,7 +32,7 @@ jobs:
|
|||
# NB: we exclude external API tests from the CI, because we don't want our build to fail because a dependency is failing.
|
||||
# This means we won't automatically catch changes in external APIs, but developers should regularly run the test suite locally so in practice it shouldn't be a problem.
|
||||
- name: Build with Maven
|
||||
run: mvn compile && mvn scoverage:report -DtagsToExclude=external-api
|
||||
run: mvn test-compile && mvn scoverage:report -DtagsToExclude=external-api
|
||||
|
||||
- name: Upload coverage to Codecov
|
||||
uses: codecov/codecov-action@v1
|
||||
|
|
|
@ -74,6 +74,7 @@
|
|||
<goals>
|
||||
<goal>test-jar</goal>
|
||||
</goals>
|
||||
<phase>test-compile</phase>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
@ -142,6 +143,21 @@
|
|||
<artifactId>akka-slf4j_${scala.version.short}</artifactId>
|
||||
<version>${akka.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.typesafe.akka</groupId>
|
||||
<artifactId>akka-cluster_${scala.version.short}</artifactId>
|
||||
<version>${akka.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.typesafe.akka</groupId>
|
||||
<artifactId>akka-cluster-typed_${scala.version.short}</artifactId>
|
||||
<version>${akka.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.typesafe.akka</groupId>
|
||||
<artifactId>akka-cluster-tools_${scala.version.short}</artifactId>
|
||||
<version>${akka.version}</version>
|
||||
</dependency>
|
||||
<!-- HTTP CLIENT -->
|
||||
<dependency>
|
||||
<groupId>com.softwaremill.sttp</groupId>
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
eclair {
|
||||
datadir = ${user.home}"/.eclair"
|
||||
|
||||
chain = "mainnet" // "regtest" for regtest, "testnet" for testnet, "mainnet" for mainnet
|
||||
|
||||
|
@ -254,10 +255,63 @@ akka {
|
|||
# configured receive buffer size. When using value 'unlimited' it will
|
||||
# try to read all from the receive buffer.
|
||||
# As per BOLT#8 lightning messages are at most 2 + 16 + 65535 + 16 = 65569bytes
|
||||
# Currently the largest message is update_add_htlc (~1500b).
|
||||
# As a tradeoff to reduce the RAM consumption, in conjunction with tcp pull mode,
|
||||
# the default value is chosen to allow for a decent number of messages to be prefetched.
|
||||
max-received-message-size = 16384b
|
||||
max-received-message-size = 300000b
|
||||
}
|
||||
}
|
||||
|
||||
actor {
|
||||
warn-about-java-serializer-usage = on
|
||||
allow-java-serialization = off
|
||||
|
||||
serializers {
|
||||
lightning = "fr.acinq.eclair.remote.LightningMessageSerializer"
|
||||
eclair-internals = "fr.acinq.eclair.remote.EclairInternalsSerializer"
|
||||
}
|
||||
|
||||
serialization-bindings {
|
||||
"fr.acinq.eclair.wire.LightningMessage" = lightning
|
||||
"fr.acinq.eclair.remote.EclairInternalsSerializer$RemoteTypes" = eclair-internals
|
||||
}
|
||||
}
|
||||
|
||||
remote.artery {
|
||||
transport = "tcp" // switching to tls-tcp is highly recommended in a production environment
|
||||
|
||||
// We are using a simple setup (https://doc.akka.io/docs/akka/current/remoting-artery.html#remote-security):
|
||||
// > Have a single set of keys and a single certificate for all nodes and disable hostname checking
|
||||
// > - The single set of keys and the single certificate is distributed to all nodes. The certificate can be self-signed as it is distributed both as a certificate for authentication but also as the trusted certificate.
|
||||
// > - If the keys/certificate are lost, someone else can connect to your cluster.
|
||||
// > - Adding nodes to the cluster is simple as the key material can be deployed / distributed to the new node.
|
||||
// Command line used to generate the self-signed certificate:
|
||||
// keytool -genkeypair -v \
|
||||
// -keystore akka-cluster-tls.jks \
|
||||
// -dname "O=ACINQ, C=FR" \
|
||||
// -keypass:env PW \
|
||||
// -storepass:env PW \
|
||||
// -keyalg RSA \
|
||||
// -keysize 4096 \
|
||||
// -validity 9999
|
||||
ssl.config-ssl-engine {
|
||||
key-store = ${eclair.datadir}"/akka-cluster-tls.jks"
|
||||
trust-store = ${eclair.datadir}"/akka-cluster-tls.jks"
|
||||
|
||||
key-store-password = ${?AKKA_TLS_PASSWORD}
|
||||
key-password = ${?AKKA_TLS_PASSWORD}
|
||||
trust-store-password = ${?AKKA_TLS_PASSWORD}
|
||||
|
||||
protocol = "TLSv1.2"
|
||||
|
||||
enabled-algorithms = [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]
|
||||
}
|
||||
}
|
||||
|
||||
cluster {
|
||||
role {
|
||||
backend.min-nr-of-members = 1
|
||||
frontend.min-nr-of-members = 0
|
||||
}
|
||||
seed-nodes = [ "akka://eclair-node@127.0.0.1:25520" ]
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,7 @@ package fr.acinq.eclair.channel
|
|||
|
||||
import java.util.UUID
|
||||
|
||||
import akka.actor.ActorRef
|
||||
import akka.actor.{ActorRef, PossiblyHarmful}
|
||||
import fr.acinq.bitcoin.Crypto.PublicKey
|
||||
import fr.acinq.bitcoin.{ByteVector32, DeterministicWallet, OutPoint, Satoshi, Transaction}
|
||||
import fr.acinq.eclair.blockchain.fee.FeeratePerKw
|
||||
|
@ -94,7 +94,7 @@ case object INPUT_DISCONNECTED
|
|||
case class INPUT_RECONNECTED(remote: ActorRef, localInit: Init, remoteInit: Init)
|
||||
case class INPUT_RESTORED(data: HasCommitments)
|
||||
|
||||
sealed trait BitcoinEvent
|
||||
sealed trait BitcoinEvent extends PossiblyHarmful
|
||||
case object BITCOIN_FUNDING_PUBLISH_FAILED extends BitcoinEvent
|
||||
case object BITCOIN_FUNDING_DEPTHOK extends BitcoinEvent
|
||||
case object BITCOIN_FUNDING_DEEPLYBURIED extends BitcoinEvent
|
||||
|
@ -166,7 +166,7 @@ object Origin {
|
|||
}
|
||||
|
||||
/** should not be used directly */
|
||||
sealed trait Command
|
||||
sealed trait Command extends PossiblyHarmful
|
||||
sealed trait HasReplyToCommand extends Command { def replyTo: ActorRef }
|
||||
sealed trait HasOptionalReplyToCommand extends Command { def replyTo_opt: Option[ActorRef] }
|
||||
|
||||
|
@ -254,7 +254,7 @@ object ChannelOpenResponse {
|
|||
8888888P" d88P 888 888 d88P 888
|
||||
*/
|
||||
|
||||
sealed trait Data {
|
||||
sealed trait Data extends PossiblyHarmful {
|
||||
def channelId: ByteVector32
|
||||
}
|
||||
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
package fr.acinq.eclair.crypto
|
||||
|
||||
import java.nio.ByteOrder
|
||||
|
||||
import akka.actor.{Actor, ActorRef, ExtendedActorSystem, FSM, PoisonPill, Props, Terminated}
|
||||
import akka.event.Logging.MDC
|
||||
import akka.event._
|
||||
|
@ -28,11 +27,15 @@ import fr.acinq.bitcoin.Protocol
|
|||
import fr.acinq.eclair.Logs.LogCategory
|
||||
import fr.acinq.eclair.crypto.ChaCha20Poly1305.ChaCha20Poly1305Error
|
||||
import fr.acinq.eclair.crypto.Noise._
|
||||
import fr.acinq.eclair.wire.{AnnouncementSignatures, ChannelAnnouncement, ChannelUpdate, NodeAnnouncement, ReplyShortChannelIdsEnd, RoutingMessage}
|
||||
import fr.acinq.eclair.remote.EclairInternalsSerializer.RemoteTypes
|
||||
import fr.acinq.eclair.wire.{AnnouncementSignatures, RoutingMessage}
|
||||
import fr.acinq.eclair.{Diagnostics, FSMDiagnosticActorLogging, Logs}
|
||||
import fr.acinq.eclair.wire.{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement}
|
||||
import fr.acinq.eclair.{Diagnostics, FSMDiagnosticActorLogging, Logs, getSimpleClassName}
|
||||
import scodec.bits.ByteVector
|
||||
import scodec.{Attempt, Codec, DecodeResult}
|
||||
|
||||
import java.nio.ByteOrder
|
||||
import scala.annotation.tailrec
|
||||
import scala.collection.immutable.Queue
|
||||
import scala.reflect.ClassTag
|
||||
|
@ -256,7 +259,7 @@ class TransportHandler[T: ClassTag](keyPair: KeyPair, rs: Option[ByteVector], co
|
|||
stop(FSM.Normal)
|
||||
|
||||
case Event(Terminated(actor), _) if actor == connection =>
|
||||
log.info(s"connection terminated, stopping the transport")
|
||||
log.info("connection actor died")
|
||||
// this can be the connection or the listener, either way it is a cause of death
|
||||
stop(FSM.Normal)
|
||||
|
||||
|
@ -272,6 +275,13 @@ class TransportHandler[T: ClassTag](keyPair: KeyPair, rs: Option[ByteVector], co
|
|||
onTermination {
|
||||
case _: StopEvent =>
|
||||
connection ! Tcp.Close // attempts to gracefully close the connection when dying
|
||||
stateData match {
|
||||
case normal: NormalData[_] =>
|
||||
// NB: we deduplicate on the class name: each class will appear once but there may be many instances (less verbose and gives debug hints)
|
||||
log.info("stopping (unackedReceived={} unackedSent={})", normal.unackedReceived.keys.map(getSimpleClassName).toSet.mkString(","), normal.unackedSent.map(getSimpleClassName))
|
||||
case _ =>
|
||||
log.info("stopping")
|
||||
}
|
||||
}
|
||||
|
||||
initialize()
|
||||
|
@ -443,7 +453,7 @@ object TransportHandler {
|
|||
|
||||
case class HandshakeCompleted(remoteNodeId: PublicKey)
|
||||
|
||||
case class ReadAck(msg: Any)
|
||||
case class ReadAck(msg: Any) extends RemoteTypes
|
||||
case object WriteAck extends Tcp.Event
|
||||
// @formatter:on
|
||||
|
||||
|
|
|
@ -18,19 +18,46 @@ package fr.acinq.eclair.io
|
|||
|
||||
import java.net.InetSocketAddress
|
||||
|
||||
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
|
||||
import akka.actor.{Actor, ActorLogging, ActorRef, DeadLetter, Props}
|
||||
import akka.cluster.Cluster
|
||||
import akka.cluster.pubsub.DistributedPubSub
|
||||
import akka.cluster.pubsub.DistributedPubSubMediator.Put
|
||||
import fr.acinq.bitcoin.Crypto.PublicKey
|
||||
import fr.acinq.eclair.crypto.Noise.KeyPair
|
||||
import fr.acinq.eclair.remote.EclairInternalsSerializer.RemoteTypes
|
||||
import fr.acinq.eclair.tor.Socks5ProxyParams
|
||||
|
||||
class ClientSpawner(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyParams], peerConnectionConf: PeerConnection.Conf, switchboard: ActorRef, router: ActorRef) extends Actor with ActorLogging {
|
||||
|
||||
context.system.eventStream.subscribe(self, classOf[ClientSpawner.ConnectionRequest])
|
||||
|
||||
if (context.system.hasExtension(Cluster)) {
|
||||
val roles = context.system.extension(Cluster).selfRoles
|
||||
if (roles.contains("frontend")) {
|
||||
val mediator = DistributedPubSub(context.system).mediator
|
||||
mediator ! Put(self)
|
||||
} else if (roles.contains("backend")) {
|
||||
// When the cluster is enabled, the backend will handle outgoing connections when there are no front available, by
|
||||
// registering to the dead letters.
|
||||
// Another option would have been to register the backend along with the front to the regular distributed pubsub,
|
||||
// but, even with affinity=false at sending, it would have resulted in outgoing connections being randomly assigned
|
||||
// to all listeners equally (front and back). What we really want is to always spawn connections on the frontend
|
||||
// when possible
|
||||
context.system.eventStream.subscribe(self, classOf[DeadLetter])
|
||||
}
|
||||
}
|
||||
|
||||
override def receive: Receive = {
|
||||
case req: ClientSpawner.ConnectionRequest =>
|
||||
log.info("initiating new connection to nodeId={} origin={}", req.remoteNodeId, sender)
|
||||
context.actorOf(Client.props(keyPair, socks5ProxyParams_opt, peerConnectionConf, switchboard, router, req.address, req.remoteNodeId, origin_opt = Some(req.origin)))
|
||||
case DeadLetter(req: ClientSpawner.ConnectionRequest, _, _) =>
|
||||
// we only subscribe to the deadletters event stream when in cluster mode
|
||||
// in that case we want to be warned when connections are spawned by the backend
|
||||
log.warning("handling outgoing connection request locally")
|
||||
self forward req
|
||||
case _: DeadLetter =>
|
||||
// we don't care about other dead letters
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -40,6 +67,5 @@ object ClientSpawner {
|
|||
|
||||
case class ConnectionRequest(address: InetSocketAddress,
|
||||
remoteNodeId: PublicKey,
|
||||
origin: ActorRef)
|
||||
|
||||
origin: ActorRef) extends RemoteTypes
|
||||
}
|
||||
|
|
|
@ -16,9 +16,7 @@
|
|||
|
||||
package fr.acinq.eclair.io
|
||||
|
||||
import java.net.InetSocketAddress
|
||||
|
||||
import akka.actor.{Actor, ActorRef, ExtendedActorSystem, FSM, OneForOneStrategy, PoisonPill, Props, Status, SupervisorStrategy, Terminated}
|
||||
import akka.actor.{Actor, ActorRef, ExtendedActorSystem, FSM, OneForOneStrategy, PossiblyHarmful, Props, Status, SupervisorStrategy, Terminated}
|
||||
import akka.event.Logging.MDC
|
||||
import akka.event.{BusLogging, DiagnosticLoggingAdapter}
|
||||
import akka.util.Timeout
|
||||
|
@ -31,10 +29,14 @@ import fr.acinq.eclair.blockchain.EclairWallet
|
|||
import fr.acinq.eclair.blockchain.fee.FeeratePerKw
|
||||
import fr.acinq.eclair.channel._
|
||||
import fr.acinq.eclair.io.Monitoring.Metrics
|
||||
import fr.acinq.eclair.io.PeerConnection.KillReason
|
||||
import fr.acinq.eclair.remote.EclairInternalsSerializer.RemoteTypes
|
||||
import fr.acinq.eclair.wire._
|
||||
import fr.acinq.eclair.{wire, _}
|
||||
import scodec.bits.ByteVector
|
||||
|
||||
import java.net.InetSocketAddress
|
||||
|
||||
/**
|
||||
* This actor represents a logical peer. There is one [[Peer]] per unique remote node id at all time.
|
||||
*
|
||||
|
@ -98,7 +100,7 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, watcher: ActorRe
|
|||
case Event(err@wire.Error(channelId, reason), d: ConnectedData) if channelId == CHANNELID_ZERO =>
|
||||
log.error(s"connection-level error, failing all channels! reason=${new String(reason.toArray)}")
|
||||
d.channels.values.toSet[ActorRef].foreach(_ forward err) // we deduplicate with toSet because there might be two entries per channel (tmp id and final id)
|
||||
d.peerConnection ! PoisonPill
|
||||
d.peerConnection ! PeerConnection.Kill(KillReason.AllChannelsFail)
|
||||
stay
|
||||
|
||||
case Event(err: wire.Error, d: ConnectedData) =>
|
||||
|
@ -169,7 +171,7 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, watcher: ActorRe
|
|||
case Event(Disconnect(nodeId), d: ConnectedData) if nodeId == remoteNodeId =>
|
||||
log.info("disconnecting")
|
||||
sender ! "disconnecting"
|
||||
d.peerConnection ! PoisonPill
|
||||
d.peerConnection ! PeerConnection.Kill(KillReason.UserRequest)
|
||||
stay
|
||||
|
||||
case Event(ConnectionDown(peerConnection), d: ConnectedData) if peerConnection == d.peerConnection =>
|
||||
|
@ -190,13 +192,13 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, watcher: ActorRe
|
|||
log.info(s"channel closed: channelId=${channelIds.mkString("/")}")
|
||||
if (d.channels.values.toSet - actor == Set.empty) {
|
||||
log.info(s"that was the last open channel, closing the connection")
|
||||
d.peerConnection ! PoisonPill
|
||||
d.peerConnection ! PeerConnection.Kill(KillReason.NoRemainingChannel)
|
||||
}
|
||||
stay using d.copy(channels = d.channels -- channelIds)
|
||||
|
||||
case Event(connectionReady: PeerConnection.ConnectionReady, d: ConnectedData) =>
|
||||
log.info(s"got new connection, killing current one and switching")
|
||||
d.peerConnection ! PoisonPill
|
||||
d.peerConnection ! PeerConnection.Kill(KillReason.ConnectionReplaced)
|
||||
d.channels.values.toSet[ActorRef].foreach(_ ! INPUT_DISCONNECTED) // we deduplicate with toSet because there might be two entries per channel (tmp id and final id)
|
||||
gotoConnected(connectionReady, d.channels)
|
||||
|
||||
|
@ -380,8 +382,8 @@ object Peer {
|
|||
def apply(uri: NodeURI): Connect = new Connect(uri.nodeId, Some(uri.address))
|
||||
}
|
||||
|
||||
case class Disconnect(nodeId: PublicKey)
|
||||
case class OpenChannel(remoteNodeId: PublicKey, fundingSatoshis: Satoshi, pushMsat: MilliSatoshi, fundingTxFeeratePerKw_opt: Option[FeeratePerKw], initialRelayFees_opt: Option[(MilliSatoshi, Int)], channelFlags: Option[Byte], timeout_opt: Option[Timeout]) {
|
||||
case class Disconnect(nodeId: PublicKey) extends PossiblyHarmful
|
||||
case class OpenChannel(remoteNodeId: PublicKey, fundingSatoshis: Satoshi, pushMsat: MilliSatoshi, fundingTxFeeratePerKw_opt: Option[FeeratePerKw], initialRelayFees_opt: Option[(MilliSatoshi, Int)], channelFlags: Option[Byte], timeout_opt: Option[Timeout]) extends PossiblyHarmful {
|
||||
require(pushMsat <= fundingSatoshis, s"pushMsat must be less or equal to fundingSatoshis")
|
||||
require(fundingSatoshis >= 0.sat, s"fundingSatoshis must be positive")
|
||||
require(pushMsat >= 0.msat, s"pushMsat must be positive")
|
||||
|
@ -390,7 +392,7 @@ object Peer {
|
|||
case object GetPeerInfo
|
||||
case class PeerInfo(nodeId: PublicKey, state: String, address: Option[InetSocketAddress], channels: Int)
|
||||
|
||||
case class PeerRoutingMessage(peerConnection: ActorRef, remoteNodeId: PublicKey, message: RoutingMessage)
|
||||
case class PeerRoutingMessage(peerConnection: ActorRef, remoteNodeId: PublicKey, message: RoutingMessage) extends RemoteTypes
|
||||
|
||||
case class Transition(previousData: Peer.Data, nextData: Peer.Data)
|
||||
|
||||
|
@ -398,7 +400,7 @@ object Peer {
|
|||
* Sent by the peer-connection to notify the peer that the connection is down.
|
||||
* We could use watchWith on the peer-connection but it doesn't work with akka cluster when untrusted mode is enabled
|
||||
*/
|
||||
case class ConnectionDown(peerConnection: ActorRef)
|
||||
case class ConnectionDown(peerConnection: ActorRef) extends RemoteTypes
|
||||
|
||||
// @formatter:on
|
||||
|
||||
|
|
|
@ -27,6 +27,7 @@ import fr.acinq.eclair.crypto.Noise.KeyPair
|
|||
import fr.acinq.eclair.crypto.TransportHandler
|
||||
import fr.acinq.eclair.io.Monitoring.{Metrics, Tags}
|
||||
import fr.acinq.eclair.io.Peer.CHANNELID_ZERO
|
||||
import fr.acinq.eclair.remote.EclairInternalsSerializer.RemoteTypes
|
||||
import fr.acinq.eclair.router.Router._
|
||||
import fr.acinq.eclair.wire._
|
||||
import fr.acinq.eclair.{wire, _}
|
||||
|
@ -271,7 +272,7 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
|
|||
val nodesSent = sendAndCount(rebroadcast.nodes)
|
||||
|
||||
if (channelsSent > 0 || updatesSent > 0 || nodesSent > 0) {
|
||||
log.info(s"sent announcements: channels={} updates={} nodes={}", channelsSent, updatesSent, nodesSent)
|
||||
log.debug("sent announcements: channels={} updates={} nodes={}", channelsSent, updatesSent, nodesSent)
|
||||
}
|
||||
stay
|
||||
|
||||
|
@ -382,6 +383,12 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
|
|||
}
|
||||
stop(FSM.Normal)
|
||||
|
||||
case Event(Kill(reason), _) =>
|
||||
Logs.withMdc(diagLog)(Logs.mdc(category_opt = Some(Logs.LogCategory.CONNECTION))) {
|
||||
log.info(s"stopping with reason=$reason")
|
||||
stop(FSM.Normal)
|
||||
}
|
||||
|
||||
case Event(_: GossipDecision.Accepted, _) => stay // for now we don't do anything with those events
|
||||
|
||||
case Event(_: GossipDecision.Rejected, _) => stay // we got disconnected while syncing
|
||||
|
@ -505,16 +512,16 @@ object PeerConnection {
|
|||
case class PendingAuth(connection: ActorRef, remoteNodeId_opt: Option[PublicKey], address: InetSocketAddress, origin_opt: Option[ActorRef], transport_opt: Option[ActorRef] = None) {
|
||||
def outgoing: Boolean = remoteNodeId_opt.isDefined // if this is an outgoing connection, we know the node id in advance
|
||||
}
|
||||
case class Authenticated(peerConnection: ActorRef, remoteNodeId: PublicKey)
|
||||
case class InitializeConnection(peer: ActorRef, chainHash: ByteVector32, features: Features, doSync: Boolean)
|
||||
case class ConnectionReady(peerConnection: ActorRef, remoteNodeId: PublicKey, address: InetSocketAddress, outgoing: Boolean, localInit: wire.Init, remoteInit: wire.Init)
|
||||
case class Authenticated(peerConnection: ActorRef, remoteNodeId: PublicKey) extends RemoteTypes
|
||||
case class InitializeConnection(peer: ActorRef, chainHash: ByteVector32, features: Features, doSync: Boolean) extends RemoteTypes
|
||||
case class ConnectionReady(peerConnection: ActorRef, remoteNodeId: PublicKey, address: InetSocketAddress, outgoing: Boolean, localInit: wire.Init, remoteInit: wire.Init) extends RemoteTypes
|
||||
|
||||
sealed trait ConnectionResult
|
||||
sealed trait ConnectionResult extends RemoteTypes
|
||||
object ConnectionResult {
|
||||
sealed trait Success extends ConnectionResult
|
||||
sealed trait Failure extends ConnectionResult
|
||||
|
||||
case class NoAddressFound(remoteNodeId: PublicKey) extends ConnectionResult.Failure { override def toString: String = "no address found" }
|
||||
case object NoAddressFound extends ConnectionResult.Failure { override def toString: String = "no address found" }
|
||||
case class ConnectionFailed(address: InetSocketAddress) extends ConnectionResult.Failure { override def toString: String = s"connection failed to $address" }
|
||||
case class AuthenticationFailed(reason: String) extends ConnectionResult.Failure { override def toString: String = reason }
|
||||
case class InitializationFailed(reason: String) extends ConnectionResult.Failure { override def toString: String = reason }
|
||||
|
@ -526,6 +533,19 @@ object PeerConnection {
|
|||
|
||||
case class Behavior(fundingTxAlreadySpentCount: Int = 0, ignoreNetworkAnnouncement: Boolean = false)
|
||||
|
||||
/**
|
||||
* Kill the actor and the underlying connection. We can't use a [[PoisonPill]] because it would be dropped when using
|
||||
* akka cluster with unstrusted mode enabled.
|
||||
*/
|
||||
case class Kill(reason: KillReason) extends RemoteTypes
|
||||
|
||||
sealed trait KillReason
|
||||
object KillReason {
|
||||
case object UserRequest extends KillReason
|
||||
case object NoRemainingChannel extends KillReason
|
||||
case object AllChannelsFail extends KillReason
|
||||
case object ConnectionReplaced extends KillReason
|
||||
}
|
||||
// @formatter:on
|
||||
|
||||
/**
|
||||
|
|
|
@ -18,7 +18,10 @@ package fr.acinq.eclair.io
|
|||
|
||||
import java.net.InetSocketAddress
|
||||
|
||||
import akka.actor.{ActorRef, Props, Status}
|
||||
import akka.actor.{ActorRef, Props}
|
||||
import akka.cluster.Cluster
|
||||
import akka.cluster.pubsub.DistributedPubSub
|
||||
import akka.cluster.pubsub.DistributedPubSubMediator.Send
|
||||
import akka.event.Logging.MDC
|
||||
import com.google.common.net.HostAndPort
|
||||
import fr.acinq.bitcoin.Crypto.PublicKey
|
||||
|
@ -84,7 +87,8 @@ class ReconnectionTask(nodeParams: NodeParams, remoteNodeId: PublicKey) extends
|
|||
val (initialDelay, firstNextReconnectionDelay) = (previousPeerData, d.previousData) match {
|
||||
case (Peer.Nothing, _) =>
|
||||
// When restarting, we add some randomization before the first reconnection attempt to avoid herd effect
|
||||
val initialDelay = randomizeDelay(nodeParams.initialRandomReconnectDelay)
|
||||
// We also add a fixed delay to give time to the front to boot up
|
||||
val initialDelay = nodeParams.initialRandomReconnectDelay + randomizeDelay(nodeParams.initialRandomReconnectDelay)
|
||||
// When restarting, we will ~immediately reconnect, but then:
|
||||
// - we don't want all the subsequent reconnection attempts to be synchronized (herd effect)
|
||||
// - we don't want to go through the exponential backoff delay, because we were offline, not them, so there is no
|
||||
|
@ -105,7 +109,8 @@ class ReconnectionTask(nodeParams: NodeParams, remoteNodeId: PublicKey) extends
|
|||
// Randomizing the initial delay is important in the case of a reconnection. If both peers have a public
|
||||
// address, they may attempt to simultaneously connect back to each other, which could result in reconnection loop,
|
||||
// given that each new connection will cause the previous one to be killed.
|
||||
val initialDelay = randomizeDelay(nodeParams.initialRandomReconnectDelay)
|
||||
// We also add a fixed delay to give time to the front to boot up
|
||||
val initialDelay = nodeParams.initialRandomReconnectDelay + randomizeDelay(nodeParams.initialRandomReconnectDelay)
|
||||
val firstNextReconnectionDelay = nextReconnectionDelay(initialDelay, nodeParams.maxReconnectInterval)
|
||||
log.info("peer is disconnected, next reconnection in {}", initialDelay)
|
||||
(initialDelay, firstNextReconnectionDelay)
|
||||
|
@ -134,16 +139,24 @@ class ReconnectionTask(nodeParams: NodeParams, remoteNodeId: PublicKey) extends
|
|||
.map(hostAndPort2InetSocketAddress)
|
||||
.orElse(getPeerAddressFromDb(nodeParams.db.peers, nodeParams.db.network, remoteNodeId)) match {
|
||||
case Some(address) => connect(address, origin = sender)
|
||||
case None => sender ! PeerConnection.ConnectionResult.NoAddressFound(remoteNodeId)
|
||||
case None => sender ! PeerConnection.ConnectionResult.NoAddressFound
|
||||
}
|
||||
stay
|
||||
}
|
||||
|
||||
private def setReconnectTimer(delay: FiniteDuration): Unit = setTimer(RECONNECT_TIMER, TickReconnect, delay, repeat = false)
|
||||
|
||||
// activate the extension only on demand, so that tests pass
|
||||
lazy val mediator = DistributedPubSub(context.system).mediator
|
||||
|
||||
private def connect(address: InetSocketAddress, origin: ActorRef): Unit = {
|
||||
log.info(s"connecting to $address")
|
||||
context.system.eventStream.publish(ClientSpawner.ConnectionRequest(address, remoteNodeId, origin))
|
||||
val req = ClientSpawner.ConnectionRequest(address, remoteNodeId, origin)
|
||||
if (context.system.hasExtension(Cluster) && !address.getHostName.endsWith("onion")) {
|
||||
mediator ! Send(path = "/user/client-spawner", msg = req, localAffinity = false)
|
||||
} else {
|
||||
context.system.eventStream.publish(req)
|
||||
}
|
||||
Metrics.ReconnectionsAttempts.withoutTags().increment()
|
||||
}
|
||||
|
||||
|
|
|
@ -22,6 +22,8 @@ import fr.acinq.eclair.NodeParams
|
|||
import fr.acinq.eclair.blockchain.EclairWallet
|
||||
import fr.acinq.eclair.channel.Helpers.Closing
|
||||
import fr.acinq.eclair.channel._
|
||||
import fr.acinq.eclair.remote.EclairInternalsSerializer.RemoteTypes
|
||||
import fr.acinq.eclair.router.Router.RouterConf
|
||||
|
||||
/**
|
||||
* Ties network connections to peers.
|
||||
|
@ -78,6 +80,7 @@ class Switchboard(nodeParams: NodeParams, watcher: ActorRef, relayer: ActorRef,
|
|||
|
||||
case Symbol("peers") => sender ! context.children
|
||||
|
||||
case GetRouterPeerConf => sender ! RouterPeerConf(nodeParams.routerConf, nodeParams.peerConnectionConf)
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -115,4 +118,7 @@ object Switchboard {
|
|||
|
||||
def peerActorName(remoteNodeId: PublicKey): String = s"peer-$remoteNodeId"
|
||||
|
||||
case object GetRouterPeerConf extends RemoteTypes
|
||||
case class RouterPeerConf(routerConf: RouterConf, peerConf: PeerConnection.Conf) extends RemoteTypes
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,182 @@
|
|||
/*
|
||||
* Copyright 2020 ACINQ SAS
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package fr.acinq.eclair.remote
|
||||
|
||||
import akka.actor.{ActorRef, ExtendedActorSystem}
|
||||
import akka.serialization.Serialization
|
||||
import fr.acinq.bitcoin.Crypto.PublicKey
|
||||
import fr.acinq.eclair.crypto.TransportHandler
|
||||
import fr.acinq.eclair.io.Peer.PeerRoutingMessage
|
||||
import fr.acinq.eclair.io.Switchboard.RouterPeerConf
|
||||
import fr.acinq.eclair.io.{ClientSpawner, Peer, PeerConnection, Switchboard}
|
||||
import fr.acinq.eclair.router.Router.{GossipDecision, RouterConf, SendChannelQuery}
|
||||
import fr.acinq.eclair.router._
|
||||
import fr.acinq.eclair.wire.CommonCodecs._
|
||||
import fr.acinq.eclair.wire.LightningMessageCodecs._
|
||||
import fr.acinq.eclair.wire.QueryChannelRangeTlv.queryFlagsCodec
|
||||
import fr.acinq.eclair.wire._
|
||||
import fr.acinq.eclair.{CltvExpiryDelta, Features}
|
||||
import scodec._
|
||||
import scodec.codecs._
|
||||
|
||||
import java.net.{InetAddress, InetSocketAddress}
|
||||
import scala.concurrent.duration._
|
||||
|
||||
class EclairInternalsSerializer(val system: ExtendedActorSystem) extends ScodecSerializer(43, EclairInternalsSerializer.codec(system))
|
||||
|
||||
object EclairInternalsSerializer {
|
||||
|
||||
trait RemoteTypes extends Serializable
|
||||
|
||||
def finiteDurationCodec: Codec[FiniteDuration] = int64.xmap(_.milliseconds, _.toMillis)
|
||||
|
||||
def iterable[A](codec: Codec[A]): Codec[Iterable[A]] = listOfN(uint16, codec).xmap(_.toIterable, _.toList)
|
||||
|
||||
val routerConfCodec: Codec[RouterConf] = (
|
||||
("randomizeRouteSelection" | bool(8)) ::
|
||||
("channelExcludeDuration" | finiteDurationCodec) ::
|
||||
("routerBroadcastInterval" | finiteDurationCodec) ::
|
||||
("networkStatsRefreshInterval" | finiteDurationCodec) ::
|
||||
("requestNodeAnnouncements" | bool(8)) ::
|
||||
("encodingType" | discriminated[EncodingType].by(uint8)
|
||||
.typecase(0, provide(EncodingType.UNCOMPRESSED))
|
||||
.typecase(1, provide(EncodingType.COMPRESSED_ZLIB))) ::
|
||||
("channelRangeChunkSize" | int32) ::
|
||||
("channelQueryChunkSize" | int32) ::
|
||||
("searchMaxFeeBase" | satoshi) ::
|
||||
("searchMaxFeePct" | double) ::
|
||||
("searchMaxRouteLength" | int32) ::
|
||||
("searchMaxCltv" | int32.as[CltvExpiryDelta]) ::
|
||||
("searchHeuristicsEnabled" | bool(8)) ::
|
||||
("searchRatioCltv" | double) ::
|
||||
("searchRatioChannelAge" | double) ::
|
||||
("searchRatioChannelCapacity" | double) ::
|
||||
("mppMinPartAmount" | millisatoshi) ::
|
||||
("mppMaxParts" | int32)).as[RouterConf]
|
||||
|
||||
val overrideFeaturesListCodec: Codec[List[(PublicKey, Features)]] = listOfN(uint16, publicKey ~ variableSizeBytes(uint16, featuresCodec))
|
||||
|
||||
val peerConnectionConfCodec: Codec[PeerConnection.Conf] = (
|
||||
("authTimeout" | finiteDurationCodec) ::
|
||||
("initTimeout" | finiteDurationCodec) ::
|
||||
("pingInterval" | finiteDurationCodec) ::
|
||||
("pingTimeout" | finiteDurationCodec) ::
|
||||
("pingDisconnect" | bool(8)) ::
|
||||
("maxRebroadcastDelay" | finiteDurationCodec)).as[PeerConnection.Conf]
|
||||
|
||||
val peerConnectionKillReasonCodec: Codec[PeerConnection.KillReason] = discriminated[PeerConnection.KillReason].by(uint16)
|
||||
.typecase(0, provide(PeerConnection.KillReason.UserRequest))
|
||||
.typecase(1, provide(PeerConnection.KillReason.NoRemainingChannel))
|
||||
.typecase(2, provide(PeerConnection.KillReason.AllChannelsFail))
|
||||
.typecase(3, provide(PeerConnection.KillReason.ConnectionReplaced))
|
||||
|
||||
val peerConnectionKillCodec: Codec[PeerConnection.Kill] = peerConnectionKillReasonCodec.as[PeerConnection.Kill]
|
||||
|
||||
val lengthPrefixedInitCodec: Codec[Init] = variableSizeBytes(uint16, initCodec)
|
||||
val lengthPrefixedNodeAnnouncementCodec: Codec[NodeAnnouncement] = variableSizeBytes(uint16, nodeAnnouncementCodec)
|
||||
val lengthPrefixedChannelAnnouncementCodec: Codec[ChannelAnnouncement] = variableSizeBytes(uint16, channelAnnouncementCodec)
|
||||
val lengthPrefixedChannelUpdateCodec: Codec[ChannelUpdate] = variableSizeBytes(uint16, channelUpdateCodec)
|
||||
val lengthPrefixedAnnouncementCodec: Codec[AnnouncementMessage] = variableSizeBytes(uint16, lightningMessageCodec.downcast[AnnouncementMessage])
|
||||
val lengthPrefixedLightningMessageCodec: Codec[LightningMessage] = variableSizeBytes(uint16, lightningMessageCodec)
|
||||
|
||||
def actorRefCodec(system: ExtendedActorSystem): Codec[ActorRef] = variableSizeBytes(uint16, utf8).xmap(
|
||||
(path: String) => system.provider.resolveActorRef(path),
|
||||
(actor: ActorRef) => Serialization.serializedActorPath(actor))
|
||||
|
||||
val inetAddressCodec: Codec[InetAddress] = discriminated[InetAddress].by(uint8)
|
||||
.typecase(0, ipv4address)
|
||||
.typecase(1, ipv6address)
|
||||
|
||||
val inetSocketAddressCodec: Codec[InetSocketAddress] = (inetAddressCodec ~ uint16).xmap({ case (addr, port) => new InetSocketAddress(addr, port) }, socketAddr => (socketAddr.getAddress, socketAddr.getPort))
|
||||
|
||||
def connectionRequestCodec(system: ExtendedActorSystem): Codec[ClientSpawner.ConnectionRequest] = (
|
||||
("address" | inetSocketAddressCodec) ::
|
||||
("remoteNodeId" | publicKey) ::
|
||||
("origin" | actorRefCodec(system))).as[ClientSpawner.ConnectionRequest]
|
||||
|
||||
def initializeConnectionCodec(system: ExtendedActorSystem): Codec[PeerConnection.InitializeConnection] = (
|
||||
("peer" | actorRefCodec(system)) ::
|
||||
("chainHash" | bytes32) ::
|
||||
("features" | variableSizeBytes(uint16, featuresCodec)) ::
|
||||
("doSync" | bool(8))).as[PeerConnection.InitializeConnection]
|
||||
|
||||
def connectionReadyCodec(system: ExtendedActorSystem): Codec[PeerConnection.ConnectionReady] = (
|
||||
("peerConnection" | actorRefCodec(system)) ::
|
||||
("remoteNodeId" | publicKey) ::
|
||||
("address" | inetSocketAddressCodec) ::
|
||||
("outgoing" | bool(8)) ::
|
||||
("localInit" | lengthPrefixedInitCodec) ::
|
||||
("remoteInit" | lengthPrefixedInitCodec)).as[PeerConnection.ConnectionReady]
|
||||
|
||||
val optionQueryChannelRangeTlv: Codec[Option[QueryChannelRangeTlv]] = variableSizeBytes(uint16, optional(bool(8), variableSizeBytesLong(varintoverflow, queryFlagsCodec.upcast[QueryChannelRangeTlv])))
|
||||
|
||||
def sendChannelQueryCodec(system: ExtendedActorSystem): Codec[SendChannelQuery] = (
|
||||
("chainsHash" | bytes32) ::
|
||||
("remoteNodeId" | publicKey) ::
|
||||
("to" | actorRefCodec(system)) ::
|
||||
("flags_opt" | optionQueryChannelRangeTlv)).as[SendChannelQuery]
|
||||
|
||||
def peerRoutingMessageCodec(system: ExtendedActorSystem): Codec[PeerRoutingMessage] = (
|
||||
("peerConnection" | actorRefCodec(system)) ::
|
||||
("remoteNodeId" | publicKey) ::
|
||||
("msg" | lengthPrefixedLightningMessageCodec.downcast[RoutingMessage])).as[PeerRoutingMessage]
|
||||
|
||||
val singleChannelDiscoveredCodec: Codec[SingleChannelDiscovered] = (lengthPrefixedChannelAnnouncementCodec :: satoshi :: optional(bool(8), lengthPrefixedChannelUpdateCodec) :: optional(bool(8), lengthPrefixedChannelUpdateCodec)).as[SingleChannelDiscovered]
|
||||
|
||||
val readAckCodec: Codec[TransportHandler.ReadAck] = lightningMessageCodec.upcast[Any].as[TransportHandler.ReadAck]
|
||||
|
||||
def codec(system: ExtendedActorSystem): Codec[RemoteTypes] = discriminated[RemoteTypes].by(uint16)
|
||||
.typecase(0, provide(Switchboard.GetRouterPeerConf))
|
||||
.typecase(1, (routerConfCodec :: peerConnectionConfCodec).as[RouterPeerConf])
|
||||
.typecase(5, readAckCodec)
|
||||
.typecase(7, connectionRequestCodec(system))
|
||||
.typecase(10, (actorRefCodec(system) :: publicKey).as[PeerConnection.Authenticated])
|
||||
.typecase(11, initializeConnectionCodec(system))
|
||||
.typecase(12, connectionReadyCodec(system))
|
||||
.typecase(13, provide(PeerConnection.ConnectionResult.NoAddressFound))
|
||||
.typecase(14, inetSocketAddressCodec.as[PeerConnection.ConnectionResult.ConnectionFailed])
|
||||
.typecase(15, variableSizeBytes(uint16, utf8).as[PeerConnection.ConnectionResult.AuthenticationFailed])
|
||||
.typecase(16, variableSizeBytes(uint16, utf8).as[PeerConnection.ConnectionResult.InitializationFailed])
|
||||
.typecase(17, provide(PeerConnection.ConnectionResult.AlreadyConnected))
|
||||
.typecase(18, provide(PeerConnection.ConnectionResult.Connected))
|
||||
.typecase(19, actorRefCodec(system).as[Peer.ConnectionDown])
|
||||
.typecase(20, provide(Router.GetRoutingStateStreaming))
|
||||
.typecase(21, provide(Router.RoutingStateStreamingUpToDate))
|
||||
.typecase(22, sendChannelQueryCodec(system))
|
||||
.typecase(23, peerRoutingMessageCodec(system))
|
||||
.typecase(30, iterable(lengthPrefixedNodeAnnouncementCodec).as[NodesDiscovered])
|
||||
.typecase(31, lengthPrefixedNodeAnnouncementCodec.as[NodeUpdated])
|
||||
.typecase(32, publicKey.as[NodeLost])
|
||||
.typecase(33, iterable(singleChannelDiscoveredCodec).as[ChannelsDiscovered])
|
||||
.typecase(34, shortchannelid.as[ChannelLost])
|
||||
.typecase(35, iterable(lengthPrefixedChannelUpdateCodec).as[ChannelUpdatesReceived])
|
||||
.typecase(36, double.as[SyncProgress])
|
||||
.typecase(40, lengthPrefixedAnnouncementCodec.as[GossipDecision.Accepted])
|
||||
.typecase(41, lengthPrefixedAnnouncementCodec.as[GossipDecision.Duplicate])
|
||||
.typecase(42, lengthPrefixedAnnouncementCodec.as[GossipDecision.InvalidSignature])
|
||||
.typecase(43, lengthPrefixedNodeAnnouncementCodec.as[GossipDecision.NoKnownChannel])
|
||||
.typecase(44, lengthPrefixedChannelAnnouncementCodec.as[GossipDecision.ValidationFailure])
|
||||
.typecase(45, lengthPrefixedChannelAnnouncementCodec.as[GossipDecision.InvalidAnnouncement])
|
||||
.typecase(46, lengthPrefixedChannelAnnouncementCodec.as[GossipDecision.ChannelPruned])
|
||||
.typecase(47, lengthPrefixedChannelAnnouncementCodec.as[GossipDecision.ChannelClosing])
|
||||
.typecase(48, lengthPrefixedChannelUpdateCodec.as[GossipDecision.Stale])
|
||||
.typecase(49, lengthPrefixedChannelUpdateCodec.as[GossipDecision.NoRelatedChannel])
|
||||
.typecase(50, lengthPrefixedChannelUpdateCodec.as[GossipDecision.RelatedChannelPruned])
|
||||
.typecase(51, lengthPrefixedChannelAnnouncementCodec.as[GossipDecision.ChannelClosed])
|
||||
.typecase(52, peerConnectionKillCodec)
|
||||
|
||||
}
|
|
@ -0,0 +1,21 @@
|
|||
/*
|
||||
* Copyright 2019 ACINQ SAS
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package fr.acinq.eclair.remote
|
||||
|
||||
import fr.acinq.eclair.wire.LightningMessageCodecs.lightningMessageCodec
|
||||
|
||||
class LightningMessageSerializer extends ScodecSerializer(42, lightningMessageCodec)
|
|
@ -0,0 +1,41 @@
|
|||
/*
|
||||
* Copyright 2020 ACINQ SAS
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package fr.acinq.eclair.remote
|
||||
|
||||
import java.nio.ByteBuffer
|
||||
|
||||
import akka.serialization.{ByteBufferSerializer, SerializerWithStringManifest}
|
||||
import scodec.Codec
|
||||
import scodec.bits.BitVector
|
||||
|
||||
class ScodecSerializer[T <: AnyRef](override val identifier: Int, codec: Codec[T]) extends SerializerWithStringManifest with ByteBufferSerializer {
|
||||
|
||||
override def toBinary(o: AnyRef, buf: ByteBuffer): Unit = buf.put(toBinary(o))
|
||||
|
||||
override def fromBinary(buf: ByteBuffer, manifest: String): AnyRef = {
|
||||
val bytes = Array.ofDim[Byte](buf.remaining())
|
||||
buf.get(bytes)
|
||||
fromBinary(bytes, manifest)
|
||||
}
|
||||
|
||||
/** we don't rely on the manifest to provide backward compatibility, we will use dedicated codecs instead */
|
||||
override def manifest(o: AnyRef): String = fr.acinq.eclair.getSimpleClassName(o)
|
||||
|
||||
override def toBinary(o: AnyRef): Array[Byte] = codec.encode(o.asInstanceOf[T]).require.toByteArray
|
||||
|
||||
override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = codec.decode(BitVector(bytes)).require.value
|
||||
}
|
|
@ -19,12 +19,13 @@ package fr.acinq.eclair.router
|
|||
import fr.acinq.bitcoin.Crypto.PublicKey
|
||||
import fr.acinq.bitcoin.Satoshi
|
||||
import fr.acinq.eclair.ShortChannelId
|
||||
import fr.acinq.eclair.remote.EclairInternalsSerializer.RemoteTypes
|
||||
import fr.acinq.eclair.wire.{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement}
|
||||
|
||||
/**
|
||||
* Created by PM on 02/02/2017.
|
||||
*/
|
||||
trait NetworkEvent
|
||||
trait NetworkEvent extends RemoteTypes
|
||||
|
||||
case class NodesDiscovered(ann: Iterable[NodeAnnouncement]) extends NetworkEvent
|
||||
|
||||
|
|
|
@ -19,7 +19,7 @@ package fr.acinq.eclair.router
|
|||
import java.util.UUID
|
||||
|
||||
import akka.Done
|
||||
import akka.actor.{ActorRef, Props}
|
||||
import akka.actor.{Actor, ActorLogging, ActorRef, Props, Terminated}
|
||||
import akka.event.DiagnosticLoggingAdapter
|
||||
import akka.event.Logging.MDC
|
||||
import fr.acinq.bitcoin.Crypto.PublicKey
|
||||
|
@ -33,6 +33,7 @@ import fr.acinq.eclair.crypto.TransportHandler
|
|||
import fr.acinq.eclair.db.NetworkDb
|
||||
import fr.acinq.eclair.io.Peer.PeerRoutingMessage
|
||||
import fr.acinq.eclair.payment.PaymentRequest.ExtraHop
|
||||
import fr.acinq.eclair.remote.EclairInternalsSerializer.RemoteTypes
|
||||
import fr.acinq.eclair.router.Graph.GraphStructure.DirectedGraph
|
||||
import fr.acinq.eclair.router.Graph.WeightRatios
|
||||
import fr.acinq.eclair.router.Monitoring.{Metrics, Tags}
|
||||
|
@ -127,6 +128,33 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[
|
|||
sender ! GetNetworkStatsResponse(d.stats)
|
||||
stay
|
||||
|
||||
case Event(GetRoutingStateStreaming, d) =>
|
||||
val listener = sender
|
||||
d.nodes
|
||||
.values
|
||||
.sliding(100, 100)
|
||||
.map(NodesDiscovered)
|
||||
.foreach(listener ! _)
|
||||
d.channels
|
||||
.values
|
||||
.map(pc => SingleChannelDiscovered(pc.ann, pc.capacity, pc.update_1_opt, pc.update_2_opt))
|
||||
.sliding(100, 100)
|
||||
.map(ChannelsDiscovered)
|
||||
.foreach(listener ! _)
|
||||
listener ! RoutingStateStreamingUpToDate
|
||||
context.actorOf(Props(new Actor with ActorLogging {
|
||||
log.info(s"subscribing listener=$listener to network events")
|
||||
context.system.eventStream.subscribe(listener, classOf[NetworkEvent])
|
||||
context.watch(listener)
|
||||
override def receive: Receive = {
|
||||
case Terminated(actor) if actor == listener=>
|
||||
log.warning(s"unsubscribing listener=$listener to network events")
|
||||
context.system.eventStream.unsubscribe(listener)
|
||||
context stop self
|
||||
}
|
||||
}))
|
||||
stay
|
||||
|
||||
case Event(TickBroadcast, d) =>
|
||||
if (d.rebroadcast.channels.isEmpty && d.rebroadcast.updates.isEmpty && d.rebroadcast.nodes.isEmpty) {
|
||||
stay
|
||||
|
@ -491,13 +519,13 @@ object Router {
|
|||
// @formatter:on
|
||||
|
||||
// @formatter:off
|
||||
case class SendChannelQuery(chainHash: ByteVector32, remoteNodeId: PublicKey, to: ActorRef, flags_opt: Option[QueryChannelRangeTlv])
|
||||
case class SendChannelQuery(chainHash: ByteVector32, remoteNodeId: PublicKey, to: ActorRef, flags_opt: Option[QueryChannelRangeTlv]) extends RemoteTypes
|
||||
case object GetNetworkStats
|
||||
case class GetNetworkStatsResponse(stats: Option[NetworkStats])
|
||||
case object GetRoutingState
|
||||
case class RoutingState(channels: Iterable[PublicChannel], nodes: Iterable[NodeAnnouncement])
|
||||
case object GetRoutingStateStreaming
|
||||
case object RoutingStateStreamingUpToDate
|
||||
case object GetRoutingStateStreaming extends RemoteTypes
|
||||
case object RoutingStateStreamingUpToDate extends RemoteTypes
|
||||
case object GetRouterData
|
||||
case object GetNodes
|
||||
case object GetLocalChannels
|
||||
|
@ -513,7 +541,7 @@ object Router {
|
|||
/** Gossip that was generated by our node. */
|
||||
case object LocalGossip extends GossipOrigin
|
||||
|
||||
sealed trait GossipDecision { def ann: AnnouncementMessage }
|
||||
sealed trait GossipDecision extends RemoteTypes { def ann: AnnouncementMessage }
|
||||
object GossipDecision {
|
||||
case class Accepted(ann: AnnouncementMessage) extends GossipDecision
|
||||
|
||||
|
|
|
@ -34,7 +34,7 @@ import scala.util.Try
|
|||
*/
|
||||
|
||||
// @formatter:off
|
||||
sealed trait LightningMessage
|
||||
sealed trait LightningMessage extends Serializable
|
||||
sealed trait SetupMessage extends LightningMessage
|
||||
sealed trait ChannelMessage extends LightningMessage
|
||||
sealed trait HtlcMessage extends LightningMessage
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
~ limitations under the License.
|
||||
-->
|
||||
|
||||
<configuration scan="true" debug="false">
|
||||
<configuration>
|
||||
|
||||
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<target>System.out</target>
|
||||
|
|
|
@ -16,9 +16,6 @@
|
|||
|
||||
package fr.acinq.eclair.io
|
||||
|
||||
import java.net.{InetAddress, ServerSocket, Socket}
|
||||
import java.util.concurrent.Executors
|
||||
|
||||
import akka.actor.FSM
|
||||
import akka.actor.Status.Failure
|
||||
import akka.testkit.{TestFSMRef, TestProbe}
|
||||
|
@ -38,6 +35,8 @@ import org.scalatest.funsuite.FixtureAnyFunSuiteLike
|
|||
import org.scalatest.{Outcome, Tag}
|
||||
import scodec.bits.ByteVector
|
||||
|
||||
import java.net.{InetAddress, ServerSocket, Socket}
|
||||
import java.util.concurrent.Executors
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
|
||||
|
@ -95,7 +94,7 @@ class PeerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with StateTe
|
|||
val probe = TestProbe()
|
||||
probe.send(peer, Peer.Init(Set.empty))
|
||||
probe.send(peer, Peer.Connect(remoteNodeId, address_opt = None))
|
||||
probe.expectMsg(PeerConnection.ConnectionResult.NoAddressFound(remoteNodeId))
|
||||
probe.expectMsg(PeerConnection.ConnectionResult.NoAddressFound)
|
||||
}
|
||||
|
||||
test("successfully connect to peer at user request") { f =>
|
||||
|
@ -191,28 +190,26 @@ class PeerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with StateTe
|
|||
test("handle new connection in state CONNECTED") { f =>
|
||||
import f._
|
||||
|
||||
connect(remoteNodeId, peer, peerConnection, channels = Set(ChannelCodecsSpec.normal))
|
||||
// this is just to extract inits
|
||||
val Peer.ConnectedData(_, _, localInit, remoteInit, _) = peer.stateData
|
||||
|
||||
val peerConnection1 = peerConnection
|
||||
val peerConnection2 = TestProbe()
|
||||
val peerConnection3 = TestProbe()
|
||||
|
||||
val deathWatch = TestProbe()
|
||||
deathWatch.watch(peerConnection1.ref)
|
||||
deathWatch.watch(peerConnection2.ref)
|
||||
deathWatch.watch(peerConnection3.ref)
|
||||
connect(remoteNodeId, peer, peerConnection, channels = Set(ChannelCodecsSpec.normal))
|
||||
peerConnection1.expectMsgType[ChannelReestablish]
|
||||
// this is just to extract inits
|
||||
val Peer.ConnectedData(_, _, localInit, remoteInit, _) = peer.stateData
|
||||
|
||||
peerConnection2.send(peer, PeerConnection.ConnectionReady(peerConnection2.ref, remoteNodeId, fakeIPAddress.socketAddress, outgoing = false, localInit, remoteInit))
|
||||
// peer should kill previous connection
|
||||
deathWatch.expectTerminated(peerConnection1.ref)
|
||||
peerConnection1.expectMsg(PeerConnection.Kill(PeerConnection.KillReason.ConnectionReplaced))
|
||||
awaitCond(peer.stateData.asInstanceOf[Peer.ConnectedData].peerConnection === peerConnection2.ref)
|
||||
peerConnection2.expectMsgType[ChannelReestablish]
|
||||
|
||||
peerConnection3.send(peer, PeerConnection.ConnectionReady(peerConnection3.ref, remoteNodeId, fakeIPAddress.socketAddress, outgoing = false, localInit, remoteInit))
|
||||
// peer should kill previous connection
|
||||
deathWatch.expectTerminated(peerConnection2.ref)
|
||||
peerConnection2.expectMsg(PeerConnection.Kill(PeerConnection.KillReason.ConnectionReplaced))
|
||||
awaitCond(peer.stateData.asInstanceOf[Peer.ConnectedData].peerConnection === peerConnection3.ref)
|
||||
peerConnection3.expectMsgType[ChannelReestablish]
|
||||
}
|
||||
|
||||
test("send state transitions to child reconnection actor", Tag("auto_reconnect"), Tag("with_node_announcement")) { f =>
|
||||
|
|
|
@ -144,7 +144,7 @@ class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
|
|||
// auto reconnect
|
||||
val TransitionWithData(ReconnectionTask.IDLE, ReconnectionTask.WAITING, _, waitingData0: WaitingData) = monitor.expectMsgType[TransitionWithData]
|
||||
assert(waitingData0.nextReconnectionDelay >= (200 milliseconds))
|
||||
assert(waitingData0.nextReconnectionDelay <= (10 seconds))
|
||||
assert(waitingData0.nextReconnectionDelay <= (20 seconds))
|
||||
probe.send(reconnectionTask, ReconnectionTask.TickReconnect) // we send it manually in order to not have to actually wait (duplicates don't matter since we look at transitions sequentially)
|
||||
val TransitionWithData(ReconnectionTask.WAITING, ReconnectionTask.CONNECTING, _, _) = monitor.expectMsgType[TransitionWithData]
|
||||
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
/*
|
||||
* Copyright 2020 ACINQ SAS
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package fr.acinq.eclair.remote
|
||||
|
||||
import fr.acinq.eclair.router.Router.GossipDecision
|
||||
import org.scalatest.funsuite.AnyFunSuite
|
||||
|
||||
class EclairInternalsSerializerSpec extends AnyFunSuite {
|
||||
|
||||
test("canary test codec gossip decision") {
|
||||
|
||||
def codec(d: GossipDecision) = d match {
|
||||
case _: GossipDecision.Accepted => ()
|
||||
case _: GossipDecision.Duplicate => ()
|
||||
case _: GossipDecision.InvalidSignature => ()
|
||||
case _: GossipDecision.NoKnownChannel => ()
|
||||
case _: GossipDecision.ValidationFailure => ()
|
||||
case _: GossipDecision.InvalidAnnouncement => ()
|
||||
case _: GossipDecision.ChannelPruned => ()
|
||||
case _: GossipDecision.ChannelClosing => ()
|
||||
case _: GossipDecision.ChannelClosed => ()
|
||||
case _: GossipDecision.Stale => ()
|
||||
case _: GossipDecision.NoRelatedChannel => ()
|
||||
case _: GossipDecision.RelatedChannelPruned => ()
|
||||
// NB: if a new gossip decision is added, this pattern matching will fail
|
||||
// this serves as a reminder that a new codec is to be added in EclairInternalsSerializer.codec
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -32,7 +32,7 @@ import fr.acinq.eclair.router.BaseRouterSpec.channelAnnouncement
|
|||
import fr.acinq.eclair.router.RouteCalculationSpec.{DEFAULT_AMOUNT_MSAT, DEFAULT_MAX_FEE}
|
||||
import fr.acinq.eclair.router.Router._
|
||||
import fr.acinq.eclair.transactions.Scripts
|
||||
import fr.acinq.eclair.wire.{Color, QueryShortChannelIds}
|
||||
import fr.acinq.eclair.wire.{ChannelAnnouncement, ChannelUpdate, Color, NodeAnnouncement, QueryShortChannelIds}
|
||||
import fr.acinq.eclair.{CltvExpiryDelta, Features, LongToBtcAmount, MilliSatoshi, ShortChannelId, TestConstants, randomKey}
|
||||
import scodec.bits._
|
||||
|
||||
|
@ -659,4 +659,33 @@ class RouterSpec extends BaseRouterSpec {
|
|||
}
|
||||
}
|
||||
|
||||
test("stream updates to front") { fixture =>
|
||||
import fixture._
|
||||
|
||||
val sender = TestProbe()
|
||||
sender.send(router, GetRoutingStateStreaming)
|
||||
|
||||
// initial sync
|
||||
var nodes = Set.empty[NodeAnnouncement]
|
||||
var channels = Set.empty[ChannelAnnouncement]
|
||||
var updates = Set.empty[ChannelUpdate]
|
||||
sender.fishForMessage() {
|
||||
case nd: NodesDiscovered =>
|
||||
nodes = nodes ++ nd.ann
|
||||
false
|
||||
case cd: ChannelsDiscovered =>
|
||||
channels = channels ++ cd.c.map(_.ann)
|
||||
updates = updates ++ cd.c.flatMap(sc => sc.u1_opt.toSeq ++ sc.u2_opt.toSeq)
|
||||
false
|
||||
case RoutingStateStreamingUpToDate => true
|
||||
}
|
||||
assert(nodes.size === 8 && channels.size === 5 && updates.size === 10) // public channels only
|
||||
|
||||
// new announcements
|
||||
val update_ab_2 = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_a, b, channelId_ab, CltvExpiryDelta(7), htlcMinimumMsat = 0 msat, feeBaseMsat = 10 msat, feeProportionalMillionths = 10, htlcMaximumMsat = htlcMaximum, timestamp = update_ab.timestamp + 1)
|
||||
val peerConnection = TestProbe()
|
||||
router ! PeerRoutingMessage(peerConnection.ref, remoteNodeId, update_ab_2)
|
||||
sender.expectMsg(ChannelUpdatesReceived(List(update_ab_2)))
|
||||
}
|
||||
|
||||
}
|
||||
|
|
60
eclair-front/modules/assembly.xml
Normal file
60
eclair-front/modules/assembly.xml
Normal file
|
@ -0,0 +1,60 @@
|
|||
<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd">
|
||||
<id>${git.commit.id.abbrev}-awseb_bundle</id>
|
||||
<formats>
|
||||
<format>zip</format>
|
||||
</formats>
|
||||
<includeBaseDirectory>false</includeBaseDirectory>
|
||||
<dependencySets> <!-- include dependencies -->
|
||||
<dependencySet>
|
||||
<outputDirectory>lib</outputDirectory>
|
||||
<useProjectArtifact>true</useProjectArtifact> <!-- include eclair-core -->
|
||||
<unpack>false</unpack>
|
||||
<scope>runtime</scope>
|
||||
</dependencySet>
|
||||
</dependencySets>
|
||||
<fileSets> <!-- Include readme and license -->
|
||||
<fileSet>
|
||||
<directory>../</directory>
|
||||
<includes>
|
||||
<include>README.md</include>
|
||||
<include>LICENSE*</include>
|
||||
</includes>
|
||||
</fileSet>
|
||||
<fileSet>
|
||||
<directory>${project.basedir}/modules/awseb</directory>
|
||||
<outputDirectory>.</outputDirectory>
|
||||
<includes>
|
||||
<include>Procfile</include>
|
||||
</includes>
|
||||
<lineEnding>unix</lineEnding>
|
||||
</fileSet>
|
||||
<!-- uncomment if you want to include a keystore in the awseb package for TLS -->
|
||||
<!--fileSet>
|
||||
<directory>${project.basedir}/modules/awseb</directory>
|
||||
<outputDirectory>.</outputDirectory>
|
||||
<includes>
|
||||
<include>*.jks</include>
|
||||
</includes>
|
||||
<fileMode>0400</fileMode>
|
||||
<lineEnding>keep</lineEnding>
|
||||
</fileSet-->
|
||||
<fileSet>
|
||||
<directory>${project.basedir}/modules/awseb</directory>
|
||||
<outputDirectory>.</outputDirectory>
|
||||
<includes>
|
||||
<include>run.sh</include>
|
||||
</includes>
|
||||
<fileMode>0755</fileMode>
|
||||
<lineEnding>unix</lineEnding>
|
||||
</fileSet>
|
||||
<fileSet>
|
||||
<directory>${project.build.directory}</directory>
|
||||
<outputDirectory>.</outputDirectory>
|
||||
<includes>
|
||||
<include>application.jar</include>
|
||||
</includes>
|
||||
</fileSet>
|
||||
</fileSets>
|
||||
</assembly>
|
1
eclair-front/modules/awseb/Procfile
Normal file
1
eclair-front/modules/awseb/Procfile
Normal file
|
@ -0,0 +1 @@
|
|||
web: ./run.sh
|
3
eclair-front/modules/awseb/run.sh
Normal file
3
eclair-front/modules/awseb/run.sh
Normal file
|
@ -0,0 +1,3 @@
|
|||
export LOCAL_IP=$(curl -s 169.254.169.254/latest/meta-data/local-ipv4)
|
||||
export HOSTNAME=$(hostname)
|
||||
exec java -javaagent:lib/kanela-agent-1.0.5.jar -jar application.jar
|
153
eclair-front/pom.xml
Normal file
153
eclair-front/pom.xml
Normal file
|
@ -0,0 +1,153 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
~ Copyright 2020 ACINQ SAS
|
||||
~
|
||||
~ Licensed under the Apache License, Version 2.0 (the "License");
|
||||
~ you may not use this file except in compliance with the License.
|
||||
~ You may obtain a copy of the License at
|
||||
~
|
||||
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||
~
|
||||
~ Unless required by applicable law or agreed to in writing, software
|
||||
~ distributed under the License is distributed on an "AS IS" BASIS,
|
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
~ See the License for the specific language governing permissions and
|
||||
~ limitations under the License.
|
||||
-->
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>fr.acinq.eclair</groupId>
|
||||
<artifactId>eclair_2.13</artifactId>
|
||||
<version>0.4.3-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>eclair-front_2.13</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<name>eclair-front</name>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-jar-plugin</artifactId>
|
||||
<configuration>
|
||||
<finalName>application</finalName>
|
||||
<archive>
|
||||
<manifest>
|
||||
<addClasspath>true</addClasspath>
|
||||
<classpathPrefix>lib/</classpathPrefix>
|
||||
<mainClass>fr.acinq.eclair.Boot</mainClass>
|
||||
<addDefaultImplementationEntries>true</addDefaultImplementationEntries>
|
||||
</manifest>
|
||||
<manifestEntries>
|
||||
<!-- we hide the git commit in the Specification-Version standard field-->
|
||||
<Specification-Version>${git.commit.id}</Specification-Version>
|
||||
<Url>${project.parent.url}</Url>
|
||||
</manifestEntries>
|
||||
</archive>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<artifactId>maven-assembly-plugin</artifactId>
|
||||
<version>3.2.0</version>
|
||||
<configuration>
|
||||
<descriptors>
|
||||
<descriptor>modules/assembly.xml</descriptor>
|
||||
</descriptors>
|
||||
</configuration>
|
||||
<executions>
|
||||
<execution>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>single</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
<repositories>
|
||||
<repository>
|
||||
<snapshots>
|
||||
<enabled>false</enabled>
|
||||
</snapshots>
|
||||
<id>bintray-kamon-io-releases</id>
|
||||
<name>bintray</name>
|
||||
<url>https://dl.bintray.com/kamon-io/releases</url>
|
||||
</repository>
|
||||
</repositories>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>fr.acinq.eclair</groupId>
|
||||
<artifactId>eclair-core_${scala.version.short}</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<!-- logging -->
|
||||
<dependency>
|
||||
<groupId>ch.qos.logback</groupId>
|
||||
<artifactId>logback-classic</artifactId>
|
||||
<version>1.2.3</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.codehaus.janino</groupId>
|
||||
<artifactId>janino</artifactId>
|
||||
<version>3.1.2</version>
|
||||
</dependency>
|
||||
<!-- key management -->
|
||||
<dependency>
|
||||
<groupId>com.amazonaws</groupId>
|
||||
<artifactId>aws-java-sdk-secretsmanager</artifactId>
|
||||
<version>1.11.776</version>
|
||||
</dependency>
|
||||
<!-- metrics -->
|
||||
<dependency>
|
||||
<groupId>io.kamon</groupId>
|
||||
<artifactId>kamon-apm-reporter_${scala.version.short}</artifactId>
|
||||
<version>${kamon.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.kamon</groupId>
|
||||
<artifactId>kamon-system-metrics_${scala.version.short}</artifactId>
|
||||
<version>${kamon.version}</version>
|
||||
</dependency>
|
||||
<!-- agents -->
|
||||
<dependency>
|
||||
<groupId>io.kamon</groupId>
|
||||
<artifactId>kanela-agent</artifactId>
|
||||
<version>1.0.5</version>
|
||||
</dependency>
|
||||
<!-- tests -->
|
||||
<dependency>
|
||||
<groupId>fr.acinq.eclair</groupId>
|
||||
<artifactId>eclair-core_${scala.version.short}</artifactId>
|
||||
<classifier>tests</classifier>
|
||||
<version>${project.version}</version>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.typesafe.akka</groupId>
|
||||
<artifactId>akka-testkit_${scala.version.short}</artifactId>
|
||||
<version>${akka.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.typesafe.akka</groupId>
|
||||
<artifactId>akka-actor-testkit-typed_${scala.version.short}</artifactId>
|
||||
<version>${akka.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.opentable.components</groupId>
|
||||
<artifactId>otj-pg-embedded</artifactId>
|
||||
<version>0.13.3</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
70
eclair-front/src/main/resources/application.conf
Normal file
70
eclair-front/src/main/resources/application.conf
Normal file
|
@ -0,0 +1,70 @@
|
|||
eclair {
|
||||
enable-kamon = false
|
||||
|
||||
front {
|
||||
// To be overriden with the same key as the backend, so that the front has the same nodeid
|
||||
priv-key-provider = "aws-sm" // aws-sm (AWS Secrets Manager) or env (environment variable)
|
||||
priv-key = ${?NODE_PRIV_KEY} // used if priv-key-provider = env
|
||||
aws-sm.priv-key-name = "node-priv-key" // used if priv-key-provider = aws-sm
|
||||
// As a security measure, we also require the pub key, which will be matched against the priv key to make sure the
|
||||
// front is really using the expected key
|
||||
pub = ${NODE_PUB_KEY}
|
||||
}
|
||||
}
|
||||
|
||||
akka {
|
||||
actor.provider = cluster
|
||||
remote.artery {
|
||||
canonical.hostname = "127.0.0.1"
|
||||
canonical.hostname = ${?LOCAL_IP} // this will override the default value with the env variable if set
|
||||
canonical.port = 25520
|
||||
|
||||
untrusted-mode = on
|
||||
trusted-selection-paths = [
|
||||
"/system/cluster/core/daemon",
|
||||
"/system/cluster/heartbeatReceiver",
|
||||
"/system/distributedPubSubMediator",
|
||||
"/system/clusterReceptionist/replicator"
|
||||
]
|
||||
|
||||
advanced {
|
||||
outbound-message-queue-size = 30720 // 10x default because of sync
|
||||
}
|
||||
}
|
||||
cluster {
|
||||
shutdown-after-unsuccessful-join-seed-nodes = 10s # front won't start if back is offline
|
||||
roles = [frontend]
|
||||
seed-nodes = ["akka://eclair-node@"${BACKEND_IP}":25520"]
|
||||
}
|
||||
coordinated-shutdown.terminate-actor-system = on
|
||||
coordinated-shutdown.exit-jvm = on
|
||||
//It is recommended to load the extension when the actor system is started by defining it in akka.extensions
|
||||
//configuration property. Otherwise it will be activated when first used and then it takes a while for it to be populated.
|
||||
extensions = ["akka.cluster.pubsub.DistributedPubSub"]
|
||||
|
||||
}
|
||||
|
||||
kamon {
|
||||
environment.host = ${HOSTNAME}
|
||||
instrumentation.akka {
|
||||
filters {
|
||||
actors {
|
||||
# Decides which actors generate Spans for the messages they process, given that there is already an ongoing trace
|
||||
# in the Context of the processed message (i.e. there is a Sampled Span in the Context).
|
||||
#
|
||||
trace {
|
||||
includes = []
|
||||
excludes = ["**"] # we don't want automatically generated spans because they conflict with the ones we define
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
akka {
|
||||
|
||||
loggers = ["akka.event.slf4j.Slf4jLogger"]
|
||||
logger-startup-timeout = 30s
|
||||
loglevel = "DEBUG" # akka doc: You can enable DEBUG level for akka.loglevel and control the actual level in the SLF4J backend without any significant overhead, also for production.
|
||||
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
|
||||
}
|
44
eclair-front/src/main/resources/logback.xml
Normal file
44
eclair-front/src/main/resources/logback.xml
Normal file
|
@ -0,0 +1,44 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
~ Copyright 2019 ACINQ SAS
|
||||
~
|
||||
~ Licensed under the Apache License, Version 2.0 (the "License");
|
||||
~ you may not use this file except in compliance with the License.
|
||||
~ You may obtain a copy of the License at
|
||||
~
|
||||
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||
~
|
||||
~ Unless required by applicable law or agreed to in writing, software
|
||||
~ distributed under the License is distributed on an "AS IS" BASIS,
|
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
~ See the License for the specific language governing permissions and
|
||||
~ limitations under the License.
|
||||
-->
|
||||
|
||||
<configuration>
|
||||
|
||||
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<filter class="ch.qos.logback.core.filter.EvaluatorFilter">
|
||||
<!-- this filters out AWS ELB probes -->
|
||||
<evaluator>
|
||||
<expression>
|
||||
return formattedMessage.contains("connected to /10.") ||
|
||||
(formattedMessage.contains("connection closed") && !mdc.containsKey("nodeId")) ||
|
||||
(formattedMessage.contains("transport died") && !mdc.containsKey("nodeId"));
|
||||
</expression>
|
||||
</evaluator>
|
||||
<OnMismatch>NEUTRAL</OnMismatch>
|
||||
<OnMatch>DENY</OnMatch>
|
||||
</filter>
|
||||
<target>System.out</target>
|
||||
<withJansi>false</withJansi>
|
||||
<encoder>
|
||||
<pattern>${HOSTNAME} %d %-5level %logger{24}%X{category}%X{nodeId}%X{channelId}%X{paymentHash}%.-11X{parentPaymentId}%.-11X{paymentId} - %msg%ex{12}%n</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<root level="INFO">
|
||||
<appender-ref ref="CONSOLE"/>
|
||||
</root>
|
||||
|
||||
</configuration>
|
61
eclair-front/src/main/scala/fr/acinq/eclair/Boot.scala
Normal file
61
eclair-front/src/main/scala/fr/acinq/eclair/Boot.scala
Normal file
|
@ -0,0 +1,61 @@
|
|||
/*
|
||||
* Copyright 2019 ACINQ SAS
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package fr.acinq.eclair
|
||||
|
||||
import java.io.File
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import com.typesafe.config.{ConfigFactory, ConfigParseOptions, ConfigSyntax}
|
||||
import grizzled.slf4j.Logging
|
||||
import kamon.Kamon
|
||||
|
||||
import scala.concurrent.ExecutionContext
|
||||
import scala.util.{Failure, Success}
|
||||
|
||||
object Boot extends App with Logging {
|
||||
try {
|
||||
val datadir = new File(System.getProperty("eclair.datadir", System.getProperty("user.home") + "/.eclair"))
|
||||
val config = ConfigFactory.parseString(
|
||||
Option(System.getenv("AKKA_CONF")).getOrElse("").replace(";", "\n"),
|
||||
ConfigParseOptions.defaults().setSyntax(ConfigSyntax.PROPERTIES))
|
||||
.withFallback(ConfigFactory.load())
|
||||
|
||||
// the actor system name needs to be the same for all members of the cluster
|
||||
implicit val system: ActorSystem = ActorSystem("eclair-node", config)
|
||||
implicit val ec: ExecutionContext = system.dispatcher
|
||||
|
||||
val setup = new FrontSetup(datadir)
|
||||
|
||||
if (config.getBoolean("eclair.enable-kamon")) {
|
||||
Kamon.init(config)
|
||||
}
|
||||
|
||||
setup.bootstrap onComplete {
|
||||
case Success(_) => ()
|
||||
case Failure(t) => onError(t)
|
||||
}
|
||||
} catch {
|
||||
case t: Throwable => onError(t)
|
||||
}
|
||||
|
||||
def onError(t: Throwable): Unit = {
|
||||
val errorMsg = if (t.getMessage != null) t.getMessage else t.getClass.getSimpleName
|
||||
System.err.println(s"fatal error: $errorMsg")
|
||||
logger.error(s"fatal error: $errorMsg", t)
|
||||
System.exit(1)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,53 @@
|
|||
/*
|
||||
* Copyright 2020 ACINQ SAS
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package fr.acinq.eclair
|
||||
|
||||
import akka.Done
|
||||
import akka.actor.{Actor, ActorLogging, Address}
|
||||
import akka.cluster.ClusterEvent.{InitialStateAsEvents, MemberDowned, MemberLeft, MemberUp, UnreachableMember}
|
||||
import akka.cluster.{Cluster, Member}
|
||||
|
||||
import scala.concurrent.Promise
|
||||
|
||||
class ClusterListener(frontJoinedCluster: Promise[Done], backendAddressFound: Promise[Address]) extends Actor with ActorLogging {
|
||||
|
||||
val cluster = Cluster(context.system)
|
||||
cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberUp], classOf[MemberLeft], classOf[MemberDowned], classOf[UnreachableMember])
|
||||
val BackendRole = "backend"
|
||||
|
||||
override def receive: Receive = {
|
||||
case MemberUp(member) =>
|
||||
if (member.roles.contains(BackendRole)) {
|
||||
log.info(s"found backend=$member")
|
||||
backendAddressFound.success(member.address)
|
||||
} else if (member == cluster.selfMember) {
|
||||
log.info("we have joined the cluster")
|
||||
frontJoinedCluster.success(Done)
|
||||
}
|
||||
case MemberLeft(member) => maybeShutdown(member)
|
||||
case MemberDowned(member) => maybeShutdown(member)
|
||||
case UnreachableMember(member) => maybeShutdown(member)
|
||||
}
|
||||
|
||||
private def maybeShutdown(member: Member): Unit = {
|
||||
if (member.roles.contains(BackendRole)) {
|
||||
log.info(s"backend is down, stopping...")
|
||||
System.exit(0)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
113
eclair-front/src/main/scala/fr/acinq/eclair/FrontSetup.scala
Normal file
113
eclair-front/src/main/scala/fr/acinq/eclair/FrontSetup.scala
Normal file
|
@ -0,0 +1,113 @@
|
|||
/*
|
||||
* Copyright 2019 ACINQ SAS
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package fr.acinq.eclair
|
||||
|
||||
import java.io.File
|
||||
import java.net.InetSocketAddress
|
||||
|
||||
import akka.Done
|
||||
import akka.actor.{ActorSystem, Address, Props, RootActorPath, SupervisorStrategy}
|
||||
import akka.pattern.ask
|
||||
import akka.util.Timeout
|
||||
import com.amazonaws.services.secretsmanager.AWSSecretsManagerClient
|
||||
import com.amazonaws.services.secretsmanager.model.GetSecretValueRequest
|
||||
import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey}
|
||||
import fr.acinq.eclair.crypto.Noise.KeyPair
|
||||
import fr.acinq.eclair.io.Switchboard.{GetRouterPeerConf, RouterPeerConf}
|
||||
import fr.acinq.eclair.io.{ClientSpawner, Server}
|
||||
import fr.acinq.eclair.router.FrontRouter
|
||||
import grizzled.slf4j.Logging
|
||||
import scodec.bits.ByteVector
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.{ExecutionContext, Future, Promise}
|
||||
|
||||
class FrontSetup(datadir: File)(implicit system: ActorSystem) extends Logging {
|
||||
|
||||
implicit val timeout = Timeout(30 seconds)
|
||||
implicit val ec: ExecutionContext = system.dispatcher
|
||||
|
||||
logger.info(s"hello!")
|
||||
logger.info(s"version=${getClass.getPackage.getImplementationVersion} commit=${getClass.getPackage.getSpecificationVersion}")
|
||||
logger.info(s"datadir=${datadir.getCanonicalPath}")
|
||||
logger.info(s"initializing secure random generator")
|
||||
// this will force the secure random instance to initialize itself right now, making sure it doesn't hang later (see comment in package.scala)
|
||||
secureRandom.nextInt()
|
||||
|
||||
datadir.mkdirs()
|
||||
|
||||
val config = system.settings.config.getConfig("eclair")
|
||||
|
||||
val keyPair = {
|
||||
val pub = ByteVector.fromValidHex(config.getString("front.pub"))
|
||||
val priv = config.getString("front.priv-key-provider") match {
|
||||
case "aws-sm" =>
|
||||
val sm = AWSSecretsManagerClient.builder().build()
|
||||
try {
|
||||
// we retrieve the node key from AWS secrets manager and we compare the corresponding pub key with the expected one
|
||||
val secretId = config.getString("front.aws-sm.priv-key-name")
|
||||
ByteVector.fromValidHex(sm.getSecretValue(new GetSecretValueRequest().withSecretId(secretId)).getSecretString)
|
||||
} finally {
|
||||
sm.shutdown()
|
||||
}
|
||||
case "env" => ByteVector.fromValidHex(config.getString("front.priv-key"))
|
||||
}
|
||||
val keyPair = KeyPair(pub, priv)
|
||||
require(PrivateKey(priv).publicKey == PublicKey(pub), "priv/pub keys mismatch")
|
||||
keyPair
|
||||
}
|
||||
|
||||
logger.info(s"nodeid=${keyPair.pub.toHex}")
|
||||
|
||||
val serverBindingAddress = new InetSocketAddress(
|
||||
config.getString("server.binding-ip"),
|
||||
config.getInt("server.port"))
|
||||
|
||||
def bootstrap: Future[Unit] = {
|
||||
|
||||
val frontJoinedCluster = Promise[Done]()
|
||||
val backendAddressFound = Promise[Address]()
|
||||
val tcpBound = Promise[Done]()
|
||||
|
||||
for {
|
||||
_ <- Future.successful(0)
|
||||
|
||||
_ = system.actorOf(Props(new ClusterListener(frontJoinedCluster, backendAddressFound)), name = "cluster-listener")
|
||||
_ <- frontJoinedCluster.future
|
||||
backendAddress <- backendAddressFound.future
|
||||
|
||||
// we give time for the cluster to be ready
|
||||
_ <- akka.pattern.after(5.seconds)(Future.successful())
|
||||
|
||||
switchBoardSelection = system.actorSelection(RootActorPath(backendAddress) / "user" / "*" / "switchboard")
|
||||
remoteSwitchboard <- switchBoardSelection.resolveOne()
|
||||
routerSelection = system.actorSelection(RootActorPath(backendAddress) / "user" / "*" / "router")
|
||||
remoteRouter <- routerSelection.resolveOne()
|
||||
|
||||
RouterPeerConf(routerConf, peerConnectionConf) <- (remoteSwitchboard ? GetRouterPeerConf).mapTo[RouterPeerConf]
|
||||
|
||||
frontRouterInitialized = Promise[Done]()
|
||||
frontRouter = system.actorOf(SimpleSupervisor.props(FrontRouter.props(routerConf, remoteRouter, Some(frontRouterInitialized)), "front-router", SupervisorStrategy.Resume))
|
||||
_ <- frontRouterInitialized.future
|
||||
|
||||
clientSpawner = system.actorOf(Props(new ClientSpawner(keyPair, None, peerConnectionConf, remoteSwitchboard, frontRouter)), name = "client-spawner")
|
||||
|
||||
server = system.actorOf(SimpleSupervisor.props(Server.props(keyPair, peerConnectionConf, remoteSwitchboard, frontRouter, serverBindingAddress, Some(tcpBound)), "server", SupervisorStrategy.Restart))
|
||||
} yield ()
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,320 @@
|
|||
/*
|
||||
* Copyright 2019 ACINQ SAS
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package fr.acinq.eclair.router
|
||||
|
||||
import akka.Done
|
||||
import akka.actor.{ActorRef, Props}
|
||||
import akka.event.Logging.MDC
|
||||
import akka.event.LoggingAdapter
|
||||
import fr.acinq.bitcoin.ByteVector32
|
||||
import fr.acinq.bitcoin.Crypto.PublicKey
|
||||
import fr.acinq.eclair.Logs.LogCategory
|
||||
import fr.acinq.eclair.crypto.TransportHandler
|
||||
import fr.acinq.eclair.io.Peer.PeerRoutingMessage
|
||||
import fr.acinq.eclair.router.Router._
|
||||
import fr.acinq.eclair.wire._
|
||||
import fr.acinq.eclair.{FSMDiagnosticActorLogging, Logs, ShortChannelId, getSimpleClassName}
|
||||
import kamon.Kamon
|
||||
import kamon.metric.Counter
|
||||
|
||||
import scala.collection.immutable.SortedMap
|
||||
import scala.concurrent.Promise
|
||||
|
||||
class FrontRouter(routerConf: RouterConf, remoteRouter: ActorRef, initialized: Option[Promise[Done]] = None) extends FSMDiagnosticActorLogging[FrontRouter.State, FrontRouter.Data] {
|
||||
|
||||
import FrontRouter._
|
||||
|
||||
// we pass these to helpers classes so that they have the logging context
|
||||
implicit def implicitLog: LoggingAdapter = log
|
||||
|
||||
remoteRouter ! GetRoutingStateStreaming
|
||||
|
||||
startWith(SYNCING, Data(Map.empty, SortedMap.empty, Map.empty, Map.empty, rebroadcast = Rebroadcast(channels = Map.empty, updates = Map.empty, nodes = Map.empty)))
|
||||
|
||||
when(SYNCING) {
|
||||
case Event(networkEvent: NetworkEvent, d) =>
|
||||
stay using FrontRouter.updateTable(d, networkEvent, doRebroadcast = false)
|
||||
|
||||
case Event(RoutingStateStreamingUpToDate, d) =>
|
||||
log.info("sync done nodes={} channels={}", d.nodes.size, d.channels.size)
|
||||
initialized.map(_.success(Done))
|
||||
setTimer(TickBroadcast.toString, TickBroadcast, routerConf.routerBroadcastInterval, repeat = true)
|
||||
goto(NORMAL) using d
|
||||
}
|
||||
|
||||
when(NORMAL) {
|
||||
case Event(GetRoutingState, d) =>
|
||||
log.info(s"getting valid announcements for $sender")
|
||||
sender ! RoutingState(d.channels.values, d.nodes.values)
|
||||
stay
|
||||
|
||||
case Event(s: SendChannelQuery, _) =>
|
||||
remoteRouter forward s
|
||||
stay
|
||||
|
||||
case Event(PeerRoutingMessage(peerConnection, remoteNodeId, q: QueryChannelRange), d) =>
|
||||
Sync.handleQueryChannelRange(d.channels, routerConf, RemoteGossip(peerConnection, remoteNodeId), q)
|
||||
stay
|
||||
|
||||
case Event(PeerRoutingMessage(peerConnection, remoteNodeId, q: QueryShortChannelIds), d) =>
|
||||
Sync.handleQueryShortChannelIds(d.nodes, d.channels, RemoteGossip(peerConnection, remoteNodeId), q)
|
||||
stay
|
||||
|
||||
case Event(PeerRoutingMessage(peerConnection, remoteNodeId, ann: AnnouncementMessage), d) =>
|
||||
val origin = RemoteGossip(peerConnection, remoteNodeId)
|
||||
val d1 = d.processing.get(ann) match {
|
||||
case Some(origins) if origins.contains(origin) =>
|
||||
log.warning("acking duplicate msg={}", ann)
|
||||
origin.peerConnection ! TransportHandler.ReadAck(ann)
|
||||
d
|
||||
case Some(origins) =>
|
||||
log.debug("message is already in processing={} origins.size={}", ann, origins.size)
|
||||
Metrics.gossipStashed(ann).increment()
|
||||
// we have already forwarded that message to the router
|
||||
// we could just acknowledge the message now, but then we would lose the information that we did receive this
|
||||
// announcement from that peer, and would send it back to that same peer if our master router accepts it
|
||||
// in the general case, we should fairly often receive the same gossip from several peers almost at the same time
|
||||
val origins1 = origins + origin
|
||||
d.copy(processing = d.processing + (ann -> origins1))
|
||||
case None =>
|
||||
d.accepted.get(ann) match {
|
||||
case Some(origins) if origins.contains(origin) =>
|
||||
log.warning("acking duplicate msg={}", ann)
|
||||
origin.peerConnection ! TransportHandler.ReadAck(ann)
|
||||
d
|
||||
case Some(origins) =>
|
||||
log.debug("message is already in accepted={} origins.size={}", ann, origins.size)
|
||||
val origins1 = origins + origin
|
||||
d.copy(accepted = d.accepted + (ann -> origins1))
|
||||
case None =>
|
||||
ann match {
|
||||
case n: NodeAnnouncement if d.nodes.contains(n.nodeId) =>
|
||||
origin.peerConnection ! TransportHandler.ReadAck(ann)
|
||||
Metrics.gossipDropped(ann).increment()
|
||||
d
|
||||
case c: ChannelAnnouncement if d.channels.contains(c.shortChannelId) =>
|
||||
origin.peerConnection ! TransportHandler.ReadAck(ann)
|
||||
Metrics.gossipDropped(ann).increment()
|
||||
d
|
||||
case u: ChannelUpdate if d.channels.contains(u.shortChannelId) && d.channels(u.shortChannelId).getChannelUpdateSameSideAs(u).contains(u) =>
|
||||
origin.peerConnection ! TransportHandler.ReadAck(ann)
|
||||
Metrics.gossipDropped(ann).increment()
|
||||
d
|
||||
case n: NodeAnnouncement if d.rebroadcast.nodes.contains(n) =>
|
||||
origin.peerConnection ! TransportHandler.ReadAck(ann)
|
||||
Metrics.gossipStashedRebroadcast(ann).increment()
|
||||
d.copy(rebroadcast = d.rebroadcast.copy(nodes = d.rebroadcast.nodes + (n -> (d.rebroadcast.nodes(n) + origin))))
|
||||
case c: ChannelAnnouncement if d.rebroadcast.channels.contains(c) =>
|
||||
origin.peerConnection ! TransportHandler.ReadAck(ann)
|
||||
Metrics.gossipStashedRebroadcast(ann).increment()
|
||||
d.copy(rebroadcast = d.rebroadcast.copy(channels = d.rebroadcast.channels + (c -> (d.rebroadcast.channels(c) + origin))))
|
||||
case u: ChannelUpdate if d.rebroadcast.updates.contains(u) =>
|
||||
origin.peerConnection ! TransportHandler.ReadAck(ann)
|
||||
Metrics.gossipStashedRebroadcast(ann).increment()
|
||||
d.copy(rebroadcast = d.rebroadcast.copy(updates = d.rebroadcast.updates + (u -> (d.rebroadcast.updates(u) + origin))))
|
||||
case _ =>
|
||||
Metrics.gossipForwarded(ann).increment()
|
||||
log.debug("sending announcement class={} to master router", ann.getClass.getSimpleName)
|
||||
remoteRouter ! PeerRoutingMessage(self, remoteNodeId, ann) // nb: we set ourselves as the origin
|
||||
d.copy(processing = d.processing + (ann -> Set(origin)))
|
||||
}
|
||||
}
|
||||
}
|
||||
stay using d1
|
||||
|
||||
case Event(accepted: GossipDecision.Accepted, d) =>
|
||||
log.debug("message has been accepted by router: {}", accepted)
|
||||
Metrics.gossipAccepted(accepted.ann).increment()
|
||||
d.processing.get(accepted.ann) match {
|
||||
case Some(origins) => origins.foreach { origin =>
|
||||
log.debug("acking msg={} for origin={}", accepted.ann, origin)
|
||||
origin.peerConnection ! TransportHandler.ReadAck(accepted.ann)
|
||||
origin.peerConnection ! accepted
|
||||
}
|
||||
case None => ()
|
||||
}
|
||||
// NB: we will also shortly receive a NetworkEvent from the router for this announcement, where we put it in the
|
||||
// rebroadcast map. We keep the origin peers in the accepted map, so we don't send back the same announcement to
|
||||
// the peers that sent us in the first place.
|
||||
// Why do we not just leave the announcement in the processing map? Because we would have a race between other
|
||||
// peers that send us that same announcement, and the NetworkEvent. If the other peers win the race, we will defer
|
||||
// acknowledging their message (because the announcement is still in the processing map) and we will
|
||||
// wait forever for the very gossip decision that we are processing now, resulting in a stuck connection
|
||||
val origins1 = d.processing.getOrElse(accepted.ann, Set.empty[RemoteGossip])
|
||||
stay using d.copy(processing = d.processing - accepted.ann, accepted = d.accepted + (accepted.ann -> origins1))
|
||||
|
||||
case Event(rejected: GossipDecision.Rejected, d) =>
|
||||
log.debug("message has been rejected by router: {}", rejected)
|
||||
Metrics.gossipRejected(rejected.ann, rejected).increment()
|
||||
d.processing.get(rejected.ann) match {
|
||||
case Some(origins) => origins.foreach { origin =>
|
||||
log.debug("acking msg={} for origin={}", rejected.ann, origin)
|
||||
origin.peerConnection ! TransportHandler.ReadAck(rejected.ann)
|
||||
origin.peerConnection ! rejected
|
||||
}
|
||||
case None => ()
|
||||
}
|
||||
stay using d.copy(processing = d.processing - rejected.ann)
|
||||
|
||||
case Event(networkEvent: NetworkEvent, d) =>
|
||||
log.debug("received event={}", networkEvent)
|
||||
Metrics.routerEvent(networkEvent).increment()
|
||||
stay using FrontRouter.updateTable(d, networkEvent, doRebroadcast = true)
|
||||
|
||||
case Event(TickBroadcast, d) =>
|
||||
if (d.rebroadcast.channels.isEmpty && d.rebroadcast.updates.isEmpty && d.rebroadcast.nodes.isEmpty) {
|
||||
stay
|
||||
} else {
|
||||
log.debug("broadcasting routing messages")
|
||||
log.debug("staggered broadcast details: channels={} updates={} nodes={}", d.rebroadcast.channels.size, d.rebroadcast.updates.size, d.rebroadcast.nodes.size)
|
||||
context.system.eventStream.publish(d.rebroadcast)
|
||||
stay using d.copy(rebroadcast = Rebroadcast(channels = Map.empty, updates = Map.empty, nodes = Map.empty))
|
||||
}
|
||||
|
||||
case Event(msg: PeerRoutingMessage, _) =>
|
||||
log.debug("forwarding peer routing message class={}", msg.message.getClass.getSimpleName)
|
||||
remoteRouter forward msg
|
||||
stay
|
||||
|
||||
case Event(_: TransportHandler.ReadAck, _) => stay // acks from remote router
|
||||
}
|
||||
|
||||
override def mdc(currentMessage: Any): MDC = {
|
||||
val category_opt = LogCategory(currentMessage)
|
||||
currentMessage match {
|
||||
case PeerRoutingMessage(_, remoteNodeId, _) => Logs.mdc(category_opt, remoteNodeId_opt = Some(remoteNodeId))
|
||||
case _ => Logs.mdc(category_opt)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
object FrontRouter {
|
||||
|
||||
def props(routerConf: RouterConf, remoteRouter: ActorRef, initialized: Option[Promise[Done]] = None): Props = Props(new FrontRouter(routerConf: RouterConf, remoteRouter: ActorRef, initialized))
|
||||
|
||||
// @formatter:off
|
||||
sealed trait State
|
||||
case object SYNCING extends State
|
||||
case object NORMAL extends State
|
||||
// @formatter:on
|
||||
|
||||
case class Data(nodes: Map[PublicKey, NodeAnnouncement],
|
||||
channels: SortedMap[ShortChannelId, PublicChannel],
|
||||
processing: Map[AnnouncementMessage, Set[RemoteGossip]],
|
||||
accepted: Map[AnnouncementMessage, Set[RemoteGossip]],
|
||||
rebroadcast: Rebroadcast)
|
||||
|
||||
object Metrics {
|
||||
private val Gossip = Kamon.counter("front.router.gossip")
|
||||
private val GossipResult = Kamon.counter("front.router.gossip.result")
|
||||
private val RouterEvent = Kamon.counter("front.router.event")
|
||||
|
||||
// @formatter:off
|
||||
def gossipDropped(ann: AnnouncementMessage): Counter = Gossip.withTag("status", "dropped").withTag("type", getSimpleClassName(ann))
|
||||
def gossipStashed(ann: AnnouncementMessage): Counter = Gossip.withTag("status", "stashed").withTag("type", getSimpleClassName(ann))
|
||||
def gossipStashedRebroadcast(ann: AnnouncementMessage): Counter = Gossip.withTag("status", "stashed-rebroadcast").withTag("type", getSimpleClassName(ann))
|
||||
def gossipForwarded(ann: AnnouncementMessage): Counter = Gossip.withTag("status", "forwarded").withTag("type", getSimpleClassName(ann))
|
||||
|
||||
def gossipAccepted(ann: AnnouncementMessage): Counter = GossipResult.withTag("result", "accepted").withTag("type", getSimpleClassName(ann))
|
||||
def gossipRejected(ann: AnnouncementMessage, reason: GossipDecision.Rejected): Counter = GossipResult.withTag("result", "rejected").withTag("reason", getSimpleClassName(reason)).withTag("type", getSimpleClassName(ann))
|
||||
|
||||
def routerEvent(event: NetworkEvent): Counter = RouterEvent.withTag("type", getSimpleClassName(event))
|
||||
// @formatter:on
|
||||
}
|
||||
|
||||
def updateTable(d: Data, event: NetworkEvent, doRebroadcast: Boolean)(implicit log: LoggingAdapter): Data = {
|
||||
event match {
|
||||
case NodesDiscovered(nodes) =>
|
||||
log.debug("adding {} nodes", nodes.size)
|
||||
val nodes1 = nodes.map(n => n.nodeId -> n).toMap
|
||||
val d1 = d.copy(nodes = d.nodes ++ nodes1)
|
||||
if (doRebroadcast) {
|
||||
nodes.foldLeft(d1) { case (d, ann) => FrontRouter.rebroadcast(d, ann) }
|
||||
} else {
|
||||
d1
|
||||
}
|
||||
|
||||
case NodeUpdated(n) =>
|
||||
log.debug("updating {} nodes", 1)
|
||||
val d1 = d.copy(nodes = d.nodes + (n.nodeId -> n))
|
||||
if (doRebroadcast) {
|
||||
FrontRouter.rebroadcast(d1, n)
|
||||
} else {
|
||||
d1
|
||||
}
|
||||
|
||||
case NodeLost(nodeId) =>
|
||||
log.debug("removing {} nodes", 1)
|
||||
d.copy(nodes = d.nodes - nodeId)
|
||||
|
||||
case ChannelsDiscovered(channels) =>
|
||||
log.debug("adding {} channels", channels.size)
|
||||
val channels1 = channels.foldLeft(SortedMap.empty[ShortChannelId, PublicChannel]) {
|
||||
case (channels, sc) => channels + (sc.ann.shortChannelId -> PublicChannel(sc.ann, ByteVector32.Zeroes, sc.capacity, sc.u1_opt, sc.u2_opt, None))
|
||||
}
|
||||
val d1 = d.copy(channels = d.channels ++ channels1)
|
||||
if (doRebroadcast) {
|
||||
channels.foldLeft(d1) { case (d, sc) => FrontRouter.rebroadcast(d, sc.ann) }
|
||||
} else {
|
||||
d1
|
||||
}
|
||||
|
||||
case ChannelLost(channelId) =>
|
||||
log.debug("removing {} channels", 1)
|
||||
d.copy(channels = d.channels - channelId)
|
||||
|
||||
case ChannelUpdatesReceived(updates) =>
|
||||
log.debug("adding/updating {} channel_updates", updates.size)
|
||||
val channels1 = updates.foldLeft(d.channels) {
|
||||
case (channels, u) => channels.get(u.shortChannelId) match {
|
||||
case Some(c) => channels + (c.ann.shortChannelId -> c.updateChannelUpdateSameSideAs(u))
|
||||
case None => channels
|
||||
}
|
||||
}
|
||||
val d1 = d.copy(channels = channels1)
|
||||
if (doRebroadcast) {
|
||||
updates.foldLeft(d1) { case (d, ann) => FrontRouter.rebroadcast(d, ann) }
|
||||
} else {
|
||||
d1
|
||||
}
|
||||
|
||||
case _: SyncProgress =>
|
||||
// we receive this as part of network events but it's useless
|
||||
d
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Schedule accepted announcements for rebroadcasting to our peers.
|
||||
*/
|
||||
def rebroadcast(d: Data, ann: AnnouncementMessage)(implicit log: LoggingAdapter): Data = {
|
||||
// We don't want to send back the announcement to the peer(s) that sent it to us in the first place. Announcements
|
||||
// that came from our peers are in the [[d.accepted]] map.
|
||||
val origins = d.accepted.getOrElse(ann, Set.empty[RemoteGossip]).map(o => o: GossipOrigin)
|
||||
val rebroadcast1 = ann match {
|
||||
case n: NodeAnnouncement => d.rebroadcast.copy(nodes = d.rebroadcast.nodes + (n -> origins))
|
||||
case c: ChannelAnnouncement => d.rebroadcast.copy(channels = d.rebroadcast.channels + (c -> origins))
|
||||
case u: ChannelUpdate =>
|
||||
if (d.channels.contains(u.shortChannelId)) {
|
||||
d.rebroadcast.copy(updates = d.rebroadcast.updates + (u -> origins))
|
||||
} else {
|
||||
d.rebroadcast // private channel, we don't rebroadcast the channel_update
|
||||
}
|
||||
}
|
||||
d.copy(accepted = d.accepted - ann, rebroadcast = rebroadcast1)
|
||||
}
|
||||
}
|
12
eclair-front/src/test/resources/application.conf
Normal file
12
eclair-front/src/test/resources/application.conf
Normal file
|
@ -0,0 +1,12 @@
|
|||
eclair.front {
|
||||
// just placeholders because this conf key is mandatory
|
||||
pub = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa01"
|
||||
}
|
||||
|
||||
kamon.environment.host = "auto"
|
||||
|
||||
akka {
|
||||
actor.provider = local
|
||||
cluster.seed-nodes = []
|
||||
extensions = []
|
||||
}
|
|
@ -0,0 +1,357 @@
|
|||
/*
|
||||
* Copyright 2019 ACINQ SAS
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package fr.acinq.eclair.router
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import akka.testkit.{TestKit, TestProbe}
|
||||
import fr.acinq.bitcoin.Crypto.PrivateKey
|
||||
import fr.acinq.bitcoin.Script.{pay2wsh, write}
|
||||
import fr.acinq.bitcoin.{Block, Transaction, TxOut}
|
||||
import fr.acinq.eclair.TestConstants.Alice
|
||||
import fr.acinq.eclair.blockchain.{UtxoStatus, ValidateRequest, ValidateResult}
|
||||
import fr.acinq.eclair.crypto.TransportHandler
|
||||
import fr.acinq.eclair.io.Peer.PeerRoutingMessage
|
||||
import fr.acinq.eclair.router.Announcements.{makeChannelAnnouncement, makeChannelUpdate, makeNodeAnnouncement, signChannelAnnouncement}
|
||||
import fr.acinq.eclair.router.Router._
|
||||
import fr.acinq.eclair.transactions.Scripts
|
||||
import fr.acinq.eclair.wire.Color
|
||||
import fr.acinq.eclair.{CltvExpiryDelta, ShortChannelId, randomKey, _}
|
||||
import org.scalatest.funsuite.AnyFunSuiteLike
|
||||
import scodec.bits._
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
class FrontRouterSpec extends TestKit(ActorSystem("test")) with AnyFunSuiteLike {
|
||||
|
||||
import FrontRouterSpec._
|
||||
|
||||
test("correctly dispatch valid gossip") {
|
||||
val nodeParams = Alice.nodeParams
|
||||
|
||||
val watcher = TestProbe()
|
||||
val router = system.actorOf(Router.props(nodeParams, watcher.ref))
|
||||
|
||||
val system1 = ActorSystem("front-system-1")
|
||||
val system2 = ActorSystem("front-system-2")
|
||||
val system3 = ActorSystem("front-system-3")
|
||||
|
||||
// we use those to control messages exchanged between front and back routers
|
||||
val pipe1 = TestProbe()
|
||||
val pipe2 = TestProbe()
|
||||
val pipe3 = TestProbe()
|
||||
|
||||
val front1 = system1.actorOf(FrontRouter.props(nodeParams.routerConf, pipe1.ref))
|
||||
val front2 = system2.actorOf(FrontRouter.props(nodeParams.routerConf, pipe2.ref))
|
||||
val front3 = system3.actorOf(FrontRouter.props(nodeParams.routerConf, pipe3.ref))
|
||||
|
||||
pipe1.expectMsg(GetRoutingStateStreaming)
|
||||
pipe1.send(router, GetRoutingStateStreaming)
|
||||
pipe1.expectMsg(RoutingStateStreamingUpToDate)
|
||||
pipe1.forward(front1)
|
||||
|
||||
pipe2.expectMsg(GetRoutingStateStreaming)
|
||||
pipe2.send(router, GetRoutingStateStreaming)
|
||||
pipe2.expectMsg(RoutingStateStreamingUpToDate)
|
||||
pipe2.forward(front2)
|
||||
|
||||
pipe3.expectMsg(GetRoutingStateStreaming)
|
||||
pipe3.send(router, GetRoutingStateStreaming)
|
||||
pipe3.expectMsg(RoutingStateStreamingUpToDate)
|
||||
pipe3.forward(front3)
|
||||
|
||||
val peerConnection1a = TestProbe()
|
||||
val peerConnection1b = TestProbe()
|
||||
val peerConnection2a = TestProbe()
|
||||
val peerConnection3a = TestProbe()
|
||||
|
||||
system1.eventStream.subscribe(peerConnection1a.ref, classOf[Rebroadcast])
|
||||
system1.eventStream.subscribe(peerConnection1b.ref, classOf[Rebroadcast])
|
||||
system2.eventStream.subscribe(peerConnection2a.ref, classOf[Rebroadcast])
|
||||
system3.eventStream.subscribe(peerConnection3a.ref, classOf[Rebroadcast])
|
||||
|
||||
val origin1a = RemoteGossip(peerConnection1a.ref, randomKey.publicKey)
|
||||
val origin1b = RemoteGossip(peerConnection1b.ref, randomKey.publicKey)
|
||||
val origin2a = RemoteGossip(peerConnection2a.ref, randomKey.publicKey)
|
||||
|
||||
peerConnection1a.send(front1, PeerRoutingMessage(peerConnection1a.ref, origin1a.nodeId, chan_ab))
|
||||
pipe1.expectMsg(PeerRoutingMessage(front1, origin1a.nodeId, chan_ab))
|
||||
pipe1.send(router, PeerRoutingMessage(pipe1.ref, origin1a.nodeId, chan_ab))
|
||||
|
||||
watcher.expectMsg(ValidateRequest(chan_ab))
|
||||
|
||||
peerConnection1b.send(front1, PeerRoutingMessage(peerConnection1b.ref, origin1b.nodeId, chan_ab))
|
||||
pipe1.expectNoMessage()
|
||||
|
||||
peerConnection2a.send(front2, PeerRoutingMessage(peerConnection2a.ref, origin2a.nodeId, chan_ab))
|
||||
pipe2.expectMsg(PeerRoutingMessage(front2, origin2a.nodeId, chan_ab))
|
||||
pipe2.send(router, PeerRoutingMessage(pipe2.ref, origin2a.nodeId, chan_ab))
|
||||
pipe2.expectMsg(TransportHandler.ReadAck(chan_ab))
|
||||
|
||||
pipe1.expectNoMessage()
|
||||
pipe2.expectNoMessage()
|
||||
|
||||
watcher.send(router, ValidateResult(chan_ab, Right((Transaction(version = 0, txIn = Nil, txOut = TxOut(1000000 sat, write(pay2wsh(Scripts.multiSig2of2(funding_a, funding_b)))) :: Nil, lockTime = 0), UtxoStatus.Unspent))))
|
||||
pipe1.expectMsg(TransportHandler.ReadAck(chan_ab))
|
||||
|
||||
pipe1.expectMsg(GossipDecision.Accepted(chan_ab))
|
||||
pipe1.forward(front1)
|
||||
pipe1.expectMsg(ChannelsDiscovered(Seq(SingleChannelDiscovered(chan_ab, 1000000 sat, None, None))))
|
||||
pipe1.forward(front1)
|
||||
pipe2.expectMsg(GossipDecision.Accepted(chan_ab))
|
||||
pipe2.forward(front2)
|
||||
pipe2.expectMsg(ChannelsDiscovered(Seq(SingleChannelDiscovered(chan_ab, 1000000 sat, None, None))))
|
||||
pipe2.forward(front2)
|
||||
pipe3.expectMsg(ChannelsDiscovered(Seq(SingleChannelDiscovered(chan_ab, 1000000 sat, None, None))))
|
||||
pipe3.forward(front3)
|
||||
|
||||
pipe1.expectNoMessage()
|
||||
pipe2.expectNoMessage()
|
||||
pipe3.expectNoMessage()
|
||||
|
||||
peerConnection1a.expectMsg(TransportHandler.ReadAck(chan_ab))
|
||||
peerConnection1b.expectMsg(TransportHandler.ReadAck(chan_ab))
|
||||
peerConnection2a.expectMsg(TransportHandler.ReadAck(chan_ab))
|
||||
|
||||
peerConnection1a.expectMsg(GossipDecision.Accepted(chan_ab))
|
||||
peerConnection1b.expectMsg(GossipDecision.Accepted(chan_ab))
|
||||
peerConnection2a.expectMsg(GossipDecision.Accepted(chan_ab))
|
||||
|
||||
// we have to wait 2 times the broadcast interval because there is an additional per-peer delay
|
||||
val maxBroadcastDelay = 2 * nodeParams.routerConf.routerBroadcastInterval + 1.second
|
||||
peerConnection1a.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set(origin1a, origin1b)), updates = Map.empty, nodes = Map.empty))
|
||||
peerConnection1b.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set(origin1a, origin1b)), updates = Map.empty, nodes = Map.empty))
|
||||
peerConnection2a.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set(origin2a)), updates = Map.empty, nodes = Map.empty))
|
||||
peerConnection3a.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set.empty), updates = Map.empty, nodes = Map.empty))
|
||||
}
|
||||
|
||||
test("aggregate gossip") {
|
||||
val nodeParams = Alice.nodeParams
|
||||
|
||||
val watcher = TestProbe()
|
||||
val router = system.actorOf(Router.props(nodeParams, watcher.ref))
|
||||
|
||||
val system1 = ActorSystem("front-system-1")
|
||||
val system2 = ActorSystem("front-system-2")
|
||||
val system3 = ActorSystem("front-system-3")
|
||||
|
||||
val front1 = system1.actorOf(FrontRouter.props(nodeParams.routerConf, router))
|
||||
val front2 = system2.actorOf(FrontRouter.props(nodeParams.routerConf, router))
|
||||
val front3 = system3.actorOf(FrontRouter.props(nodeParams.routerConf, router))
|
||||
|
||||
val peerConnection1a = TestProbe("peerconn-1a")
|
||||
val peerConnection1b = TestProbe("peerconn-1b")
|
||||
val peerConnection2a = TestProbe("peerconn-2a")
|
||||
val peerConnection3a = TestProbe("peerconn-3a")
|
||||
|
||||
system1.eventStream.subscribe(peerConnection1a.ref, classOf[Rebroadcast])
|
||||
system1.eventStream.subscribe(peerConnection1b.ref, classOf[Rebroadcast])
|
||||
system2.eventStream.subscribe(peerConnection2a.ref, classOf[Rebroadcast])
|
||||
system3.eventStream.subscribe(peerConnection3a.ref, classOf[Rebroadcast])
|
||||
|
||||
val origin1a = RemoteGossip(peerConnection1a.ref, randomKey.publicKey)
|
||||
val origin1b = RemoteGossip(peerConnection1b.ref, randomKey.publicKey)
|
||||
val origin2a = RemoteGossip(peerConnection2a.ref, randomKey.publicKey)
|
||||
val origin3a = RemoteGossip(peerConnection3a.ref, randomKey.publicKey)
|
||||
|
||||
peerConnection1a.send(front1, PeerRoutingMessage(peerConnection1a.ref, origin1a.nodeId, chan_ab))
|
||||
watcher.expectMsg(ValidateRequest(chan_ab))
|
||||
peerConnection1b.send(front1, PeerRoutingMessage(peerConnection1b.ref, origin1b.nodeId, chan_ab))
|
||||
peerConnection2a.send(front2, PeerRoutingMessage(peerConnection2a.ref, origin2a.nodeId, chan_ab))
|
||||
|
||||
peerConnection1a.send(front1, PeerRoutingMessage(peerConnection1a.ref, origin1a.nodeId, ann_c))
|
||||
peerConnection1a.expectMsg(TransportHandler.ReadAck(ann_c))
|
||||
peerConnection1a.expectMsg(GossipDecision.NoKnownChannel(ann_c))
|
||||
peerConnection3a.send(front3, PeerRoutingMessage(peerConnection3a.ref, origin3a.nodeId, ann_a))
|
||||
peerConnection3a.send(front3, PeerRoutingMessage(peerConnection3a.ref, origin3a.nodeId, channelUpdate_ba))
|
||||
peerConnection3a.send(front3, PeerRoutingMessage(peerConnection3a.ref, origin3a.nodeId, channelUpdate_bc))
|
||||
peerConnection3a.expectMsg(TransportHandler.ReadAck(channelUpdate_bc))
|
||||
peerConnection3a.expectMsg(GossipDecision.NoRelatedChannel(channelUpdate_bc))
|
||||
|
||||
|
||||
watcher.send(router, ValidateResult(chan_ab, Right((Transaction(version = 0, txIn = Nil, txOut = TxOut(1000000 sat, write(pay2wsh(Scripts.multiSig2of2(funding_a, funding_b)))) :: Nil, lockTime = 0), UtxoStatus.Unspent))))
|
||||
|
||||
peerConnection1a.expectMsg(TransportHandler.ReadAck(chan_ab))
|
||||
peerConnection1b.expectMsg(TransportHandler.ReadAck(chan_ab))
|
||||
peerConnection2a.expectMsg(TransportHandler.ReadAck(chan_ab))
|
||||
|
||||
peerConnection1a.expectMsg(GossipDecision.Accepted(chan_ab))
|
||||
peerConnection1b.expectMsg(GossipDecision.Accepted(chan_ab))
|
||||
peerConnection2a.expectMsg(GossipDecision.Accepted(chan_ab))
|
||||
|
||||
peerConnection3a.expectMsg(TransportHandler.ReadAck(channelUpdate_ba))
|
||||
peerConnection3a.expectMsg(GossipDecision.Accepted(channelUpdate_ba))
|
||||
|
||||
peerConnection3a.expectMsg(TransportHandler.ReadAck(ann_a))
|
||||
peerConnection3a.expectMsg(GossipDecision.Accepted(ann_a))
|
||||
|
||||
peerConnection1b.send(front1, PeerRoutingMessage(peerConnection1b.ref, origin1b.nodeId, channelUpdate_ab))
|
||||
peerConnection1b.expectMsg(TransportHandler.ReadAck(channelUpdate_ab))
|
||||
peerConnection1b.expectMsg(GossipDecision.Accepted(channelUpdate_ab))
|
||||
|
||||
peerConnection3a.send(front3, PeerRoutingMessage(peerConnection3a.ref, origin3a.nodeId, ann_b))
|
||||
peerConnection3a.expectMsg(TransportHandler.ReadAck(ann_b))
|
||||
peerConnection3a.expectMsg(GossipDecision.Accepted(ann_b))
|
||||
|
||||
// we have to wait 2 times the broadcast interval because there is an additional per-peer delay
|
||||
val maxBroadcastDelay = 2 * nodeParams.routerConf.routerBroadcastInterval + 1.second
|
||||
peerConnection1a.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set(origin1a, origin1b)), updates = Map(channelUpdate_ab -> Set(origin1b), channelUpdate_ba -> Set.empty), nodes = Map(ann_a -> Set.empty, ann_b -> Set.empty)))
|
||||
peerConnection1b.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set(origin1a, origin1b)), updates = Map(channelUpdate_ab -> Set(origin1b), channelUpdate_ba -> Set.empty), nodes = Map(ann_a -> Set.empty, ann_b -> Set.empty)))
|
||||
peerConnection2a.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set(origin2a)), updates = Map(channelUpdate_ab -> Set.empty, channelUpdate_ba -> Set.empty), nodes = Map(ann_a -> Set.empty, ann_b -> Set.empty)))
|
||||
peerConnection3a.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set.empty), updates = Map(channelUpdate_ab -> Set.empty, channelUpdate_ba -> Set(origin3a)), nodes = Map(ann_a -> Set(origin3a), ann_b -> Set(origin3a))))
|
||||
}
|
||||
|
||||
test("do not forward duplicate gossip") {
|
||||
val nodeParams = Alice.nodeParams
|
||||
val router = TestProbe()
|
||||
val system1 = ActorSystem("front-system-1")
|
||||
val front1 = system1.actorOf(FrontRouter.props(nodeParams.routerConf, router.ref))
|
||||
router.expectMsg(GetRoutingStateStreaming)
|
||||
router.send(front1, RoutingStateStreamingUpToDate)
|
||||
|
||||
val peerConnection1 = TestProbe()
|
||||
system1.eventStream.subscribe(peerConnection1.ref, classOf[Rebroadcast])
|
||||
|
||||
val origin1 = RemoteGossip(peerConnection1.ref, randomKey.publicKey)
|
||||
|
||||
peerConnection1.send(front1, PeerRoutingMessage(peerConnection1.ref, origin1.nodeId, chan_ab))
|
||||
router.expectMsg(PeerRoutingMessage(front1, origin1.nodeId, chan_ab))
|
||||
router.send(front1, TransportHandler.ReadAck(chan_ab))
|
||||
peerConnection1.expectNoMessage()
|
||||
router.send(front1, GossipDecision.Accepted(chan_ab))
|
||||
peerConnection1.expectMsg(TransportHandler.ReadAck(chan_ab))
|
||||
peerConnection1.expectMsg(GossipDecision.Accepted(chan_ab))
|
||||
router.send(front1, ChannelsDiscovered(SingleChannelDiscovered(chan_ab, 0.sat, None, None) :: Nil))
|
||||
|
||||
peerConnection1.send(front1, PeerRoutingMessage(peerConnection1.ref, origin1.nodeId, chan_ab))
|
||||
router.expectNoMessage // announcement is pending rebroadcast
|
||||
peerConnection1.expectMsg(TransportHandler.ReadAck(chan_ab))
|
||||
|
||||
router.send(front1, TickBroadcast)
|
||||
peerConnection1.expectMsg(Rebroadcast(channels = Map(chan_ab -> Set(origin1)), updates = Map.empty, nodes = Map.empty))
|
||||
|
||||
peerConnection1.send(front1, PeerRoutingMessage(peerConnection1.ref, origin1.nodeId, chan_ab))
|
||||
router.expectNoMessage // announcement is already known
|
||||
peerConnection1.expectMsg(TransportHandler.ReadAck(chan_ab))
|
||||
}
|
||||
|
||||
test("acknowledge duplicate gossip") {
|
||||
val nodeParams = Alice.nodeParams
|
||||
val router = TestProbe()
|
||||
val system1 = ActorSystem("front-system-1")
|
||||
val front1 = system1.actorOf(FrontRouter.props(nodeParams.routerConf, router.ref))
|
||||
router.expectMsg(GetRoutingStateStreaming)
|
||||
router.send(front1, RoutingStateStreamingUpToDate)
|
||||
|
||||
val peerConnection1 = TestProbe()
|
||||
system1.eventStream.subscribe(peerConnection1.ref, classOf[Rebroadcast])
|
||||
|
||||
val origin1 = RemoteGossip(peerConnection1.ref, randomKey.publicKey)
|
||||
|
||||
// first message arrives and is forwarded to router
|
||||
peerConnection1.send(front1, PeerRoutingMessage(peerConnection1.ref, origin1.nodeId, chan_ab))
|
||||
router.expectMsg(PeerRoutingMessage(front1, origin1.nodeId, chan_ab))
|
||||
peerConnection1.expectNoMessage()
|
||||
// duplicate message is immediately acknowledged
|
||||
peerConnection1.send(front1, PeerRoutingMessage(peerConnection1.ref, origin1.nodeId, chan_ab))
|
||||
peerConnection1.expectMsg(TransportHandler.ReadAck(chan_ab))
|
||||
// router acknowledges the first message
|
||||
router.send(front1, TransportHandler.ReadAck(chan_ab))
|
||||
// but we still wait for the decision before acking the original message
|
||||
peerConnection1.expectNoMessage()
|
||||
// decision arrives, message is acknowledged
|
||||
router.send(front1, GossipDecision.Accepted(chan_ab))
|
||||
peerConnection1.expectMsg(TransportHandler.ReadAck(chan_ab))
|
||||
peerConnection1.expectMsg(GossipDecision.Accepted(chan_ab))
|
||||
}
|
||||
|
||||
test("do not rebroadcast channel_update for private channels") {
|
||||
val nodeParams = Alice.nodeParams
|
||||
val router = TestProbe()
|
||||
val system1 = ActorSystem("front-system-1")
|
||||
val front1 = system1.actorOf(FrontRouter.props(nodeParams.routerConf, router.ref))
|
||||
router.expectMsg(GetRoutingStateStreaming)
|
||||
router.send(front1, RoutingStateStreamingUpToDate)
|
||||
|
||||
val peerConnection1 = TestProbe()
|
||||
system1.eventStream.subscribe(peerConnection1.ref, classOf[Rebroadcast])
|
||||
|
||||
val origin1 = RemoteGossip(peerConnection1.ref, randomKey.publicKey)
|
||||
|
||||
// channel_update arrives and is forwarded to router (there is no associated channel, because it is private)
|
||||
peerConnection1.send(front1, PeerRoutingMessage(peerConnection1.ref, origin1.nodeId, channelUpdate_ab))
|
||||
router.expectMsg(PeerRoutingMessage(front1, origin1.nodeId, channelUpdate_ab))
|
||||
peerConnection1.expectNoMessage()
|
||||
// router acknowledges the message
|
||||
router.send(front1, TransportHandler.ReadAck(channelUpdate_ab))
|
||||
// but we still wait for the decision before acking the original message
|
||||
peerConnection1.expectNoMessage()
|
||||
// decision arrives, message is acknowledged
|
||||
router.send(front1, GossipDecision.Accepted(channelUpdate_ab))
|
||||
peerConnection1.expectMsg(TransportHandler.ReadAck(channelUpdate_ab))
|
||||
peerConnection1.expectMsg(GossipDecision.Accepted(channelUpdate_ab))
|
||||
// then the event arrives
|
||||
front1 ! ChannelUpdatesReceived(channelUpdate_ab :: Nil)
|
||||
// rebroadcast
|
||||
front1 ! TickBroadcast
|
||||
peerConnection1.expectNoMessage()
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
object FrontRouterSpec {
|
||||
val (priv_a, priv_b, priv_c, priv_d, priv_e, priv_f) = (randomKey, randomKey, randomKey, randomKey, randomKey, randomKey)
|
||||
val (a, b, c, d, e, f) = (priv_a.publicKey, priv_b.publicKey, priv_c.publicKey, priv_d.publicKey, priv_e.publicKey, priv_f.publicKey)
|
||||
|
||||
val (priv_funding_a, priv_funding_b, priv_funding_c, priv_funding_d, priv_funding_e, priv_funding_f) = (randomKey, randomKey, randomKey, randomKey, randomKey, randomKey)
|
||||
val (funding_a, funding_b, funding_c, funding_d, funding_e, funding_f) = (priv_funding_a.publicKey, priv_funding_b.publicKey, priv_funding_c.publicKey, priv_funding_d.publicKey, priv_funding_e.publicKey, priv_funding_f.publicKey)
|
||||
|
||||
val ann_a = makeNodeAnnouncement(priv_a, "node-A", Color(15, 10, -70), Nil, Features(hex"0200"))
|
||||
val ann_b = makeNodeAnnouncement(priv_b, "node-B", Color(50, 99, -80), Nil, Features(hex""))
|
||||
val ann_c = makeNodeAnnouncement(priv_c, "node-C", Color(123, 100, -40), Nil, Features(hex"0200"))
|
||||
val ann_d = makeNodeAnnouncement(priv_d, "node-D", Color(-120, -20, 60), Nil, Features(hex"00"))
|
||||
val ann_e = makeNodeAnnouncement(priv_e, "node-E", Color(-50, 0, 10), Nil, Features(hex"00"))
|
||||
val ann_f = makeNodeAnnouncement(priv_f, "node-F", Color(30, 10, -50), Nil, Features(hex"00"))
|
||||
|
||||
val channelId_ab = ShortChannelId(420000, 1, 0)
|
||||
val channelId_bc = ShortChannelId(420000, 2, 0)
|
||||
val channelId_cd = ShortChannelId(420000, 3, 0)
|
||||
val channelId_ef = ShortChannelId(420000, 4, 0)
|
||||
|
||||
def channelAnnouncement(shortChannelId: ShortChannelId, node1_priv: PrivateKey, node2_priv: PrivateKey, funding1_priv: PrivateKey, funding2_priv: PrivateKey) = {
|
||||
val witness = Announcements.generateChannelAnnouncementWitness(Block.RegtestGenesisBlock.hash, shortChannelId, node1_priv.publicKey, node2_priv.publicKey, funding1_priv.publicKey, funding2_priv.publicKey, Features.empty)
|
||||
val node1_sig = Announcements.signChannelAnnouncement(witness, node1_priv)
|
||||
val funding1_sig = Announcements.signChannelAnnouncement(witness, funding1_priv)
|
||||
val node2_sig = Announcements.signChannelAnnouncement(witness, node2_priv)
|
||||
val funding2_sig = Announcements.signChannelAnnouncement(witness, funding2_priv)
|
||||
makeChannelAnnouncement(Block.RegtestGenesisBlock.hash, shortChannelId, node1_priv.publicKey, node2_priv.publicKey, funding1_priv.publicKey, funding2_priv.publicKey, node1_sig, node2_sig, funding1_sig, funding2_sig)
|
||||
}
|
||||
|
||||
val chan_ab = channelAnnouncement(channelId_ab, priv_a, priv_b, priv_funding_a, priv_funding_b)
|
||||
val chan_bc = channelAnnouncement(channelId_bc, priv_b, priv_c, priv_funding_b, priv_funding_c)
|
||||
val chan_cd = channelAnnouncement(channelId_cd, priv_c, priv_d, priv_funding_c, priv_funding_d)
|
||||
val chan_ef = channelAnnouncement(channelId_ef, priv_e, priv_f, priv_funding_e, priv_funding_f)
|
||||
|
||||
val channelUpdate_ab = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_a, b, channelId_ab, CltvExpiryDelta(7), htlcMinimumMsat = 0 msat, feeBaseMsat = 10 msat, feeProportionalMillionths = 10, htlcMaximumMsat = 500000000 msat)
|
||||
val channelUpdate_ba = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_b, a, channelId_ab, CltvExpiryDelta(7), htlcMinimumMsat = 0 msat, feeBaseMsat = 10 msat, feeProportionalMillionths = 10, htlcMaximumMsat = 500000000 msat)
|
||||
val channelUpdate_bc = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_b, c, channelId_bc, CltvExpiryDelta(5), htlcMinimumMsat = 0 msat, feeBaseMsat = 10 msat, feeProportionalMillionths = 1, htlcMaximumMsat = 500000000 msat)
|
||||
val channelUpdate_cb = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_c, b, channelId_bc, CltvExpiryDelta(5), htlcMinimumMsat = 0 msat, feeBaseMsat = 10 msat, feeProportionalMillionths = 1, htlcMaximumMsat = 500000000 msat)
|
||||
val channelUpdate_cd = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_c, d, channelId_cd, CltvExpiryDelta(3), htlcMinimumMsat = 0 msat, feeBaseMsat = 10 msat, feeProportionalMillionths = 4, htlcMaximumMsat = 500000000 msat)
|
||||
val channelUpdate_dc = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_d, c, channelId_cd, CltvExpiryDelta(3), htlcMinimumMsat = 0 msat, feeBaseMsat = 10 msat, feeProportionalMillionths = 4, htlcMaximumMsat = 500000000 msat)
|
||||
val channelUpdate_ef = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_e, f, channelId_ef, CltvExpiryDelta(9), htlcMinimumMsat = 0 msat, feeBaseMsat = 10 msat, feeProportionalMillionths = 8, htlcMaximumMsat = 500000000 msat)
|
||||
val channelUpdate_fe = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_f, e, channelId_ef, CltvExpiryDelta(9), htlcMinimumMsat = 0 msat, feeBaseMsat = 10 msat, feeProportionalMillionths = 8, htlcMaximumMsat = 500000000 msat)
|
||||
}
|
|
@ -2,6 +2,36 @@ eclair {
|
|||
enable-kamon = false
|
||||
}
|
||||
|
||||
akka {
|
||||
actor.provider = cluster
|
||||
remote {
|
||||
artery {
|
||||
canonical.hostname = "127.0.0.1"
|
||||
canonical.port = 25520
|
||||
|
||||
untrusted-mode = on
|
||||
trusted-selection-paths = [
|
||||
"/user/*/switchboard",
|
||||
"/user/*/router",
|
||||
"/system/cluster/core/daemon",
|
||||
"/system/cluster/heartbeatReceiver",
|
||||
"/system/distributedPubSubMediator",
|
||||
"/system/clusterReceptionist/replicator"
|
||||
]
|
||||
}
|
||||
deployment {
|
||||
enable-whitelist = on
|
||||
whitelist = [] // no remote deployment
|
||||
}
|
||||
}
|
||||
cluster.roles = [backend]
|
||||
coordinated-shutdown.terminate-actor-system = on
|
||||
coordinated-shutdown.exit-jvm = on
|
||||
//It is recommended to load the extension when the actor system is started by defining it in akka.extensions
|
||||
//configuration property. Otherwise it will be activated when first used and then it takes a while for it to be populated.
|
||||
extensions = ["akka.cluster.pubsub.DistributedPubSub"]
|
||||
}
|
||||
|
||||
kamon.instrumentation.akka {
|
||||
filters {
|
||||
actors {
|
||||
|
|
|
@ -29,8 +29,8 @@ import scala.concurrent.ExecutionContext
|
|||
import scala.util.{Failure, Success}
|
||||
|
||||
/**
|
||||
* Created by PM on 25/01/2016.
|
||||
*/
|
||||
* Created by PM on 25/01/2016.
|
||||
*/
|
||||
object Boot extends App with Logging {
|
||||
try {
|
||||
val datadir = new File(System.getProperty("eclair.datadir", System.getProperty("user.home") + "/.eclair"))
|
||||
|
@ -59,15 +59,15 @@ object Boot extends App with Logging {
|
|||
}
|
||||
|
||||
/**
|
||||
* Starts the http APIs service if enabled in the configuration
|
||||
*
|
||||
* @param kit
|
||||
* @param system
|
||||
* @param ec
|
||||
*/
|
||||
* Starts the http APIs service if enabled in the configuration
|
||||
*
|
||||
* @param kit
|
||||
* @param system
|
||||
* @param ec
|
||||
*/
|
||||
def startApiServiceIfEnabled(kit: Kit)(implicit system: ActorSystem, ec: ExecutionContext) = {
|
||||
val config = system.settings.config.getConfig("eclair")
|
||||
if(config.getBoolean("api.enabled")){
|
||||
if (config.getBoolean("api.enabled")) {
|
||||
logger.info(s"json API enabled on port=${config.getInt("api.port")}")
|
||||
implicit val materializer = ActorMaterializer()
|
||||
val apiPassword = config.getString("api.password") match {
|
||||
|
|
1
pom.xml
1
pom.xml
|
@ -25,6 +25,7 @@
|
|||
|
||||
<modules>
|
||||
<module>eclair-core</module>
|
||||
<module>eclair-front</module>
|
||||
<module>eclair-node</module>
|
||||
<module>eclair-node-gui</module>
|
||||
</modules>
|
||||
|
|
Loading…
Add table
Reference in a new issue