From 7d79f65bf206369519a5d393d69b8f74bb06e767 Mon Sep 17 00:00:00 2001 From: pm47 Date: Wed, 15 Feb 2017 18:08:44 +0100 Subject: [PATCH] channels are now multiplexed over single connections between nodes --- .../src/main/resources/logback_colors.xml | 13 ++ .../src/main/scala/fr/acinq/eclair/Boot.scala | 13 +- .../scala/fr/acinq/eclair/api/Service.scala | 26 +-- .../fr/acinq/eclair/channel/Channel.scala | 124 +++++------- .../acinq/eclair/channel/ChannelTypes.scala | 12 +- .../fr/acinq/eclair/channel/Register.scala | 92 ++------- .../eclair/crypto/TransportHandler.scala | 110 +++++------ .../scala/fr/acinq/eclair/gui/Handlers.scala | 14 +- .../scala/fr/acinq/eclair/io/Client.scala | 47 +++-- .../io/LightningMessageSerializer.scala | 2 +- .../main/scala/fr/acinq/eclair/io/Peer.scala | 182 ++++++++++++++++++ .../scala/fr/acinq/eclair/io/Server.scala | 39 ++-- .../fr/acinq/eclair/io/Switchboard.scala | 68 +++++++ .../eclair/payment/PaymentLifecycle.scala | 10 +- .../scala/fr/acinq/eclair/router/Router.scala | 2 +- .../scala/fr/acinq/eclair/wire/Types.scala | 32 +-- .../acinq/eclair/channel/ThroughputSpec.scala | 11 +- .../states/StateTestsHelperMethods.scala | 11 +- .../a/WaitForAcceptChannelStateSpec.scala | 14 +- .../a/WaitForOpenChannelStateSpec.scala | 16 +- ...itForFundingCreatedInternalStateSpec.scala | 14 +- .../b/WaitForFundingCreatedStateSpec.scala | 16 +- .../b/WaitForFundingSignedStateSpec.scala | 14 +- .../c/WaitForAnnSignaturesStateSpec.scala | 17 +- ...aitForFundingLockedInternalStateSpec.scala | 14 +- .../c/WaitForFundingLockedStateSpec.scala | 21 +- .../channel/states/e/NormalStateSpec.scala | 4 +- .../channel/states/f/ShutdownStateSpec.scala | 4 +- .../states/g/NegotiatingStateSpec.scala | 4 +- .../channel/states/h/ClosingStateSpec.scala | 4 +- .../eclair/crypto/TransportHandlerSpec.scala | 36 +++- .../eclair/interop/InteroperabilitySpec.scala | 8 +- .../interop/rustytests/RustyTestsSpec.scala | 15 +- 33 files changed, 618 insertions(+), 391 deletions(-) create mode 100644 eclair-node/src/main/scala/fr/acinq/eclair/io/Peer.scala create mode 100644 eclair-node/src/main/scala/fr/acinq/eclair/io/Switchboard.scala diff --git a/eclair-node/src/main/resources/logback_colors.xml b/eclair-node/src/main/resources/logback_colors.xml index 126ac7bc4..413e0b56d 100644 --- a/eclair-node/src/main/resources/logback_colors.xml +++ b/eclair-node/src/main/resources/logback_colors.xml @@ -18,6 +18,15 @@ + + System.out + false + + %yellow(${HOSTNAME} %d) %highlight(%-5level) %logger{36} %X{akkaSource} - %red(%msg) %ex{12}%n + + + + System.out false @@ -45,6 +54,10 @@ + + + + diff --git a/eclair-node/src/main/scala/fr/acinq/eclair/Boot.scala b/eclair-node/src/main/scala/fr/acinq/eclair/Boot.scala index 75da63b24..a7c2b310a 100644 --- a/eclair-node/src/main/scala/fr/acinq/eclair/Boot.scala +++ b/eclair-node/src/main/scala/fr/acinq/eclair/Boot.scala @@ -8,14 +8,13 @@ import akka.http.scaladsl.Http import akka.stream.ActorMaterializer import akka.util.Timeout import com.typesafe.config.ConfigFactory -import fr.acinq.bitcoin.{Base58Check, BinaryData, MilliSatoshi, OP_CHECKSIG, OP_DUP, OP_EQUALVERIFY, OP_HASH160, OP_PUSHDATA, Satoshi} +import fr.acinq.bitcoin.{Base58Check, OP_CHECKSIG, OP_DUP, OP_EQUALVERIFY, OP_HASH160, OP_PUSHDATA} import fr.acinq.eclair.api.Service import fr.acinq.eclair.blockchain.peer.PeerClient import fr.acinq.eclair.blockchain.rpc.BitcoinJsonRPCClient import fr.acinq.eclair.blockchain.{ExtendedBitcoinClient, PeerWatcher} -import fr.acinq.eclair.channel._ import fr.acinq.eclair.gui.FxApp -import fr.acinq.eclair.io.{Client, Server} +import fr.acinq.eclair.io.{Server, Switchboard} import fr.acinq.eclair.payment._ import fr.acinq.eclair.router._ import grizzled.slf4j.Logging @@ -91,17 +90,15 @@ class Setup() extends Logging { val relayer = system.actorOf(Relayer.props(Globals.Node.privateKey, paymentHandler), name = "relayer") val router = system.actorOf(Router.props(watcher), name = "router") val paymentInitiator = system.actorOf(PaymentInitiator.props(Globals.Node.publicKey, router), "payment-initiator") - val register = system.actorOf(Register.props(watcher, router, relayer, finalScriptPubKey), name = "register") - val server = system.actorOf(Server.props(config.getString("eclair.server.host"), config.getInt("eclair.server.port"), register), "server") + val switchboard = system.actorOf(Switchboard.props(watcher, router, relayer, finalScriptPubKey), name = "switchboard") + val server = system.actorOf(Server.props(switchboard, new InetSocketAddress(config.getString("eclair.server.host"), config.getInt("eclair.server.port"))), "server") val _setup = this val api = new Service { - override val register: ActorRef = _setup.register + override val switchboard: ActorRef = _setup.switchboard override val router: ActorRef = _setup.router override val paymentHandler: ActorRef = _setup.paymentHandler override val paymentInitiator: ActorRef = _setup.paymentInitiator - - override def connect(host: String, port: Int, pubkey: BinaryData, fundingSatoshis: Satoshi, pushMsat: MilliSatoshi): Unit = system.actorOf(Client.props(host, port, pubkey, fundingSatoshis, pushMsat, register)) } Http().bindAndHandle(api.route, config.getString("eclair.api.host"), config.getInt("eclair.api.port")) onFailure { case t: Throwable => system.eventStream.publish(HTTPBindError) diff --git a/eclair-node/src/main/scala/fr/acinq/eclair/api/Service.scala b/eclair-node/src/main/scala/fr/acinq/eclair/api/Service.scala index fe8c63249..69e1ae16a 100644 --- a/eclair-node/src/main/scala/fr/acinq/eclair/api/Service.scala +++ b/eclair-node/src/main/scala/fr/acinq/eclair/api/Service.scala @@ -1,5 +1,7 @@ package fr.acinq.eclair.api +import java.net.InetSocketAddress + import akka.actor.ActorRef import akka.http.scaladsl.model.HttpMethods._ import akka.http.scaladsl.model.StatusCodes @@ -11,14 +13,15 @@ import akka.pattern.ask import akka.util.Timeout import de.heikoseeberger.akkahttpjson4s.Json4sSupport import de.heikoseeberger.akkahttpjson4s.Json4sSupport.ShouldWritePretty +import fr.acinq.bitcoin.Crypto.PublicKey import fr.acinq.bitcoin.{BinaryData, MilliSatoshi, Satoshi} import fr.acinq.eclair._ -import fr.acinq.eclair.channel.Register.{ListChannels, SendCommand} import fr.acinq.eclair.channel._ +import fr.acinq.eclair.io.Switchboard.{NewChannel, NewConnection} import fr.acinq.eclair.payment.CreatePayment import fr.acinq.eclair.router.ChannelDesc import grizzled.slf4j.Logging -import org.json4s.JsonAST.{JDouble, JInt, JString} +import org.json4s.JsonAST.{JInt, JString} import org.json4s.{JValue, jackson} import scala.concurrent.duration._ @@ -47,9 +50,7 @@ trait Service extends Logging { import Json4sSupport.{json4sMarshaller, json4sUnmarshaller} - def connect(host: String, port: Int, pubkey: BinaryData, fundingSatoshis: Satoshi, pushMsat: MilliSatoshi): Unit - - def register: ActorRef + def switchboard: ActorRef def router: ActorRef @@ -70,28 +71,31 @@ trait Service extends Logging { entity(as[JsonRPCBody]) { req => val f_res: Future[AnyRef] = req match { - case JsonRPCBody(_, _, "connect", JString(host) :: JInt(port) :: JString(pubkey) :: JInt(anchor_amount) :: Nil) => - connect(host, port.toInt, BinaryData(pubkey), Satoshi(anchor_amount.toLong), MilliSatoshi(0)) + case JsonRPCBody(_, _, "connect", JString(host) :: JInt(port) :: JString(pubkey) :: Nil) => + switchboard ! NewConnection(PublicKey(pubkey), new InetSocketAddress(host, port.toInt), None) + Future.successful("ok") + case JsonRPCBody(_, _, "open", JString(host) :: JInt(port) :: JString(pubkey) :: JInt(funding_amount) :: Nil) => + switchboard ! NewConnection(PublicKey(pubkey), new InetSocketAddress(host, port.toInt), Some(NewChannel(Satoshi(funding_amount.toLong), MilliSatoshi(0)))) Future.successful("ok") case JsonRPCBody(_, _, "info", _) => Future.successful(Status(Globals.Node.id)) - case JsonRPCBody(_, _, "list", _) => + /*case JsonRPCBody(_, _, "list", _) => (register ? ListChannels).mapTo[Iterable[ActorRef]] - .flatMap(l => Future.sequence(l.map(c => c ? CMD_GETINFO))) + .flatMap(l => Future.sequence(l.map(c => c ? CMD_GETINFO)))*/ case JsonRPCBody(_, _, "network", _) => (router ? 'channels).mapTo[Iterable[ChannelDesc]] case JsonRPCBody(_, _, "addhtlc", JInt(amount) :: JString(rhash) :: JString(nodeId) :: Nil) => (paymentInitiator ? CreatePayment(amount.toLong, BinaryData(rhash), BinaryData(nodeId))).mapTo[ChannelEvent] case JsonRPCBody(_, _, "genh", _) => (paymentHandler ? 'genh).mapTo[BinaryData] - case JsonRPCBody(_, _, "sign", JInt(channel) :: Nil) => + /*case JsonRPCBody(_, _, "sign", JInt(channel) :: Nil) => (register ? SendCommand(channel.toLong, CMD_SIGN)).mapTo[ActorRef].map(_ => "ok") case JsonRPCBody(_, _, "fulfillhtlc", JString(channel) :: JDouble(id) :: JString(r) :: Nil) => (register ? SendCommand(channel.toLong, CMD_FULFILL_HTLC(id.toLong, BinaryData(r), commit = true))).mapTo[ActorRef].map(_ => "ok") case JsonRPCBody(_, _, "close", JString(channel) :: JString(scriptPubKey) :: Nil) => (register ? SendCommand(channel.toLong, CMD_CLOSE(Some(scriptPubKey)))).mapTo[ActorRef].map(_ => "ok") case JsonRPCBody(_, _, "close", JString(channel) :: Nil) => - (register ? SendCommand(channel.toLong, CMD_CLOSE(None))).mapTo[ActorRef].map(_ => "ok") + (register ? SendCommand(channel.toLong, CMD_CLOSE(None))).mapTo[ActorRef].map(_ => "ok")*/ case JsonRPCBody(_, _, "help", _) => Future.successful(List( "info: display basic node information", diff --git a/eclair-node/src/main/scala/fr/acinq/eclair/channel/Channel.scala b/eclair-node/src/main/scala/fr/acinq/eclair/channel/Channel.scala index 3f482ae28..627ecce92 100644 --- a/eclair-node/src/main/scala/fr/acinq/eclair/channel/Channel.scala +++ b/eclair-node/src/main/scala/fr/acinq/eclair/channel/Channel.scala @@ -9,7 +9,7 @@ import fr.acinq.eclair.blockchain.peer.CurrentBlockCount import fr.acinq.eclair.channel.Helpers.{Closing, Funding} import fr.acinq.eclair.crypto.{Generators, ShaChain} import fr.acinq.eclair.payment.Binding -import fr.acinq.eclair.router.{Announcements, SendRoutingState} +import fr.acinq.eclair.router.Announcements import fr.acinq.eclair.transactions._ import fr.acinq.eclair.wire._ @@ -24,12 +24,12 @@ import scala.util.{Failure, Success, Try} */ object Channel { - def props(them: ActorRef, blockchain: ActorRef, router: ActorRef, relayer: ActorRef, localParams: LocalParams, theirNodeId: PublicKey, autoSignInterval: Option[FiniteDuration] = None) = Props(new Channel(them, blockchain, router, relayer, localParams, theirNodeId, autoSignInterval)) + def props(remote: ActorRef, blockchain: ActorRef, router: ActorRef, relayer: ActorRef, autoSignInterval: Option[FiniteDuration] = None) = Props(new Channel(remote, blockchain, router, relayer, autoSignInterval)) } -class Channel(val them: ActorRef, val blockchain: ActorRef, router: ActorRef, relayer: ActorRef, val localParams: LocalParams, remoteNodeId: PublicKey, autoSignInterval: Option[FiniteDuration] = None)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global) extends LoggingFSM[State, Data] { +class Channel(val remote: ActorRef, val blockchain: ActorRef, router: ActorRef, relayer: ActorRef, autoSignInterval: Option[FiniteDuration] = None)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global) extends LoggingFSM[State, Data] { - context.system.eventStream.publish(ChannelCreated(self, localParams, remoteNodeId)) + var remoteNodeId: PublicKey = null /* 8888888 888b 888 8888888 88888888888 @@ -69,25 +69,13 @@ class Channel(val them: ActorRef, val blockchain: ActorRef, router: ActorRef, re startWith(WAIT_FOR_INIT_INTERNAL, Nothing) when(WAIT_FOR_INIT_INTERNAL)(handleExceptions { - case Event(i@INPUT_INIT_FUNDER(fundingSatoshis, pushMsat), Nothing) if localParams.isFunder => - them ! Init(globalFeatures = localParams.globalFeatures, localFeatures = localParams.localFeatures) - goto(WAIT_FOR_INIT) using DATA_WAIT_FOR_INIT(localParams, Left(i), autoSignInterval) - - case Event(i@INPUT_INIT_FUNDEE(), Nothing) if !localParams.isFunder => - them ! Init(globalFeatures = localParams.globalFeatures, localFeatures = localParams.localFeatures) - goto(WAIT_FOR_INIT) using DATA_WAIT_FOR_INIT(localParams, Right(i), autoSignInterval) - }) - - when(WAIT_FOR_INIT)(handleExceptions { - case Event(Init(remoteGlobalFeatures, remoteLocalFeatures), DATA_WAIT_FOR_INIT(localParams, Left(initFunder), autoSignInterval)) => - val temporaryChannelId = Platform.currentTime + case Event(initFunder@INPUT_INIT_FUNDER(remoteNodeId, temporaryChannelId, fundingSatoshis, pushMsat, localParams, remoteInit), Nothing) => + this.remoteNodeId = remoteNodeId + context.system.eventStream.publish(ChannelCreated(self, localParams, remoteNodeId)) val firstPerCommitmentPoint = Generators.perCommitPoint(localParams.shaSeed, 0) - if (Features.requiresInitialRoutingSync(remoteLocalFeatures)) { - router ! SendRoutingState(them) - } - them ! OpenChannel(temporaryChannelId = temporaryChannelId, - fundingSatoshis = initFunder.fundingSatoshis, - pushMsat = initFunder.pushMsat, + remote ! OpenChannel(temporaryChannelId = temporaryChannelId, + fundingSatoshis = fundingSatoshis, + pushMsat = pushMsat, dustLimitSatoshis = localParams.dustLimitSatoshis, maxHtlcValueInFlightMsat = localParams.maxHtlcValueInFlightMsat, channelReserveSatoshis = localParams.channelReserveSatoshis, @@ -100,27 +88,26 @@ class Channel(val them: ActorRef, val blockchain: ActorRef, router: ActorRef, re paymentBasepoint = localParams.paymentKey.toPoint, delayedPaymentBasepoint = localParams.delayedPaymentKey.toPoint, firstPerCommitmentPoint = firstPerCommitmentPoint) - goto(WAIT_FOR_ACCEPT_CHANNEL) using DATA_WAIT_FOR_ACCEPT_CHANNEL(temporaryChannelId, localParams, fundingSatoshis = initFunder.fundingSatoshis, pushMsat = initFunder.pushMsat, remoteGlobalFeatures = remoteGlobalFeatures, remoteLocalFeatures = remoteLocalFeatures, autoSignInterval = autoSignInterval) + goto(WAIT_FOR_ACCEPT_CHANNEL) using DATA_WAIT_FOR_ACCEPT_CHANNEL(initFunder, autoSignInterval = autoSignInterval) - case Event(Init(remoteGlobalFeatures, remoteLocalFeatures), DATA_WAIT_FOR_INIT(localParams, Right(initFundee), autoSignInterval)) => - if (Features.requiresInitialRoutingSync(remoteLocalFeatures)) { - router ! SendRoutingState(them) - } - goto(WAIT_FOR_OPEN_CHANNEL) using DATA_WAIT_FOR_OPEN_CHANNEL(localParams, remoteGlobalFeatures = remoteGlobalFeatures, remoteLocalFeatures = remoteLocalFeatures, autoSignInterval = autoSignInterval) + case Event(inputFundee@INPUT_INIT_FUNDEE(remoteNodeId, _, localParams, _), Nothing) if !localParams.isFunder => + this.remoteNodeId = remoteNodeId + context.system.eventStream.publish(ChannelCreated(self, localParams, remoteNodeId)) + goto(WAIT_FOR_OPEN_CHANNEL) using DATA_WAIT_FOR_OPEN_CHANNEL(inputFundee, autoSignInterval = autoSignInterval) }) when(WAIT_FOR_OPEN_CHANNEL)(handleExceptions { - case Event(open: OpenChannel, DATA_WAIT_FOR_OPEN_CHANNEL(localParams, remoteGlobalFeatures, remoteLocalFeatures, autoSignInterval)) => + case Event(open: OpenChannel, DATA_WAIT_FOR_OPEN_CHANNEL(INPUT_INIT_FUNDEE(_, _, localParams, remoteInit), autoSignInterval)) => Try(Funding.validateParams(open.channelReserveSatoshis, open.fundingSatoshis)) match { case Failure(t) => log.warning(t.getMessage) - them ! Error(open.temporaryChannelId, t.getMessage.getBytes) + remote ! Error(open.temporaryChannelId, t.getMessage.getBytes) goto(CLOSED) case Success(_) => // TODO: maybe also check uniqueness of temporary channel id val minimumDepth = Globals.mindepth_blocks val firstPerCommitmentPoint = Generators.perCommitPoint(localParams.shaSeed, 0) - them ! AcceptChannel(temporaryChannelId = Platform.currentTime, + remote ! AcceptChannel(temporaryChannelId = open.temporaryChannelId, dustLimitSatoshis = localParams.dustLimitSatoshis, maxHtlcValueInFlightMsat = localParams.maxHtlcValueInFlightMsat, channelReserveSatoshis = localParams.channelReserveSatoshis, @@ -145,8 +132,8 @@ class Channel(val them: ActorRef, val blockchain: ActorRef, router: ActorRef, re revocationBasepoint = open.revocationBasepoint, paymentBasepoint = open.paymentBasepoint, delayedPaymentBasepoint = open.delayedPaymentBasepoint, - globalFeatures = remoteGlobalFeatures, - localFeatures = remoteLocalFeatures) + globalFeatures = remoteInit.globalFeatures, + localFeatures = remoteInit.localFeatures) log.debug(s"remote params: $remoteParams") val params = ChannelParams( localParams = localParams.copy(feeratePerKw = open.feeratePerKw), // funder gets to choose the first feerate @@ -165,11 +152,11 @@ class Channel(val them: ActorRef, val blockchain: ActorRef, router: ActorRef, re }) when(WAIT_FOR_ACCEPT_CHANNEL)(handleExceptions { - case Event(accept: AcceptChannel, DATA_WAIT_FOR_ACCEPT_CHANNEL(temporaryChannelId, localParams, fundingSatoshis, pushMsat, remoteGlobalFeatures, remoteLocalFeatures, autoSignInterval)) => + case Event(accept: AcceptChannel, DATA_WAIT_FOR_ACCEPT_CHANNEL(INPUT_INIT_FUNDER(_, temporaryChannelId, fundingSatoshis, pushMsat, localParams, remoteInit), autoSignInterval)) => Try(Funding.validateParams(accept.channelReserveSatoshis, fundingSatoshis)) match { case Failure(t) => log.warning(t.getMessage) - them ! Error(temporaryChannelId, t.getMessage.getBytes) + remote ! Error(temporaryChannelId, t.getMessage.getBytes) goto(CLOSED) case _ => // TODO: check equality of temporaryChannelId? or should be done upstream @@ -185,9 +172,8 @@ class Channel(val them: ActorRef, val blockchain: ActorRef, router: ActorRef, re revocationBasepoint = accept.revocationBasepoint, paymentBasepoint = accept.paymentBasepoint, delayedPaymentBasepoint = accept.delayedPaymentBasepoint, - globalFeatures = remoteGlobalFeatures, - localFeatures = remoteLocalFeatures - ) + globalFeatures = remoteInit.globalFeatures, + localFeatures = remoteInit.localFeatures) log.debug(s"remote params: $remoteParams") val params = ChannelParams( localParams = localParams, @@ -215,8 +201,8 @@ class Channel(val them: ActorRef, val blockchain: ActorRef, router: ActorRef, re // let's create the first commitment tx that spends the yet uncommitted funding tx val (localSpec, localCommitTx, remoteSpec, remoteCommitTx) = Funding.makeFirstCommitTxs(params, pushMsat, fundingTx.hash, fundingTxOutputIndex, remoteFirstPerCommitmentPoint) - val localSigOfRemoteTx = Transactions.sign(remoteCommitTx, localParams.fundingPrivKey) // signature of their initial commitment tx that pays them pushMsat - them ! FundingCreated( + val localSigOfRemoteTx = Transactions.sign(remoteCommitTx, params.localParams.fundingPrivKey) // signature of their initial commitment tx that pays remote pushMsat + remote ! FundingCreated( temporaryChannelId = temporaryChannelId, txid = fundingTx.hash, outputIndex = fundingTxOutputIndex, @@ -237,18 +223,18 @@ class Channel(val them: ActorRef, val blockchain: ActorRef, router: ActorRef, re val (localSpec, localCommitTx, remoteSpec, remoteCommitTx) = Funding.makeFirstCommitTxs(params, pushMsat, fundingTxHash, fundingTxOutputIndex, remoteFirstPerCommitmentPoint) // check remote signature validity - val localSigOfLocalTx = Transactions.sign(localCommitTx, localParams.fundingPrivKey) + val localSigOfLocalTx = Transactions.sign(localCommitTx, params.localParams.fundingPrivKey) val signedLocalCommitTx = Transactions.addSigs(localCommitTx, params.localParams.fundingPrivKey.publicKey, params.remoteParams.fundingPubKey, localSigOfLocalTx, remoteSig) Transactions.checkSpendable(signedLocalCommitTx) match { case Failure(cause) => log.error(cause, "their FundingCreated message contains an invalid signature") - them ! Error(temporaryChannelId, cause.getMessage.getBytes) + remote ! Error(temporaryChannelId, cause.getMessage.getBytes) // we haven't anything at stake yet, we can just stop goto(CLOSED) case Success(_) => log.info(s"signing remote tx: $remoteCommitTx") - val localSigOfRemoteTx = Transactions.sign(remoteCommitTx, localParams.fundingPrivKey) - them ! FundingSigned( + val localSigOfRemoteTx = Transactions.sign(remoteCommitTx, params.localParams.fundingPrivKey) + remote ! FundingSigned( temporaryChannelId = temporaryChannelId, signature = localSigOfRemoteTx ) @@ -278,12 +264,12 @@ class Channel(val them: ActorRef, val blockchain: ActorRef, router: ActorRef, re when(WAIT_FOR_FUNDING_SIGNED)(handleExceptions { case Event(FundingSigned(_, remoteSig), DATA_WAIT_FOR_FUNDING_SIGNED(temporaryChannelId, params, fundingTx, localSpec, localCommitTx, remoteCommit)) => // we make sure that their sig checks out and that our first commit tx is spendable - val localSigOfLocalTx = Transactions.sign(localCommitTx, localParams.fundingPrivKey) + val localSigOfLocalTx = Transactions.sign(localCommitTx, params.localParams.fundingPrivKey) val signedLocalCommitTx = Transactions.addSigs(localCommitTx, params.localParams.fundingPrivKey.publicKey, params.remoteParams.fundingPubKey, localSigOfLocalTx, remoteSig) Transactions.checkSpendable(signedLocalCommitTx) match { case Failure(cause) => log.error(cause, "their FundingSigned message contains an invalid signature") - them ! Error(temporaryChannelId, cause.getMessage.getBytes) + remote ! Error(temporaryChannelId, cause.getMessage.getBytes) // we haven't published anything yet, we can just stop goto(CLOSED) case Success(_) => @@ -318,15 +304,15 @@ class Channel(val them: ActorRef, val blockchain: ActorRef, router: ActorRef, re case Event(WatchEventConfirmed(BITCOIN_FUNDING_DEPTHOK, blockHeight, txIndex), d@DATA_WAIT_FOR_FUNDING_LOCKED_INTERNAL(temporaryChannelId, params, commitments, deferred)) => val channelId = toShortId(blockHeight, txIndex, commitments.commitInput.outPoint.index.toInt) blockchain ! WatchLost(self, commitments.anchorId, params.minimumDepth, BITCOIN_FUNDING_LOST) - val nextPerCommitmentPoint = Generators.perCommitPoint(localParams.shaSeed, 1) - them ! FundingLocked(temporaryChannelId, channelId, nextPerCommitmentPoint) + val nextPerCommitmentPoint = Generators.perCommitPoint(params.localParams.shaSeed, 1) + remote ! FundingLocked(temporaryChannelId, channelId, nextPerCommitmentPoint) deferred.map(self ! _) // TODO: htlcIdx should not be 0 when resuming connection goto(WAIT_FOR_FUNDING_LOCKED) using DATA_NORMAL(params, commitments.copy(channelId = channelId), None) // TODO: not implemented, maybe should be done with a state timer and not a blockchain watch? case Event(BITCOIN_FUNDING_TIMEOUT, _) => - them ! Error(0, "Funding tx timed out".getBytes) + remote ! Error(0, "Funding tx timed out".getBytes) goto(CLOSED) case Event(WatchEventSpent(BITCOIN_FUNDING_SPENT, tx: Transaction), d: DATA_WAIT_FOR_FUNDING_LOCKED_INTERNAL) if tx.txid == d.commitments.remoteCommit.txid => handleRemoteSpentCurrent(tx, d) @@ -365,7 +351,7 @@ class Channel(val them: ActorRef, val blockchain: ActorRef, router: ActorRef, re context.system.eventStream.subscribe(self, classOf[CurrentBlockCount]) if (Features.isChannelPublic(d.params.localParams.localFeatures) && Features.isChannelPublic(d.params.remoteParams.localFeatures)) { val (localNodeSig, localBitcoinSig) = Announcements.signChannelAnnouncement(d.channelId, Globals.Node.privateKey, remoteNodeId, d.params.localParams.fundingPrivKey, d.params.remoteParams.fundingPubKey) - them ! AnnouncementSignatures(d.channelId, localNodeSig, localBitcoinSig) + remote ! AnnouncementSignatures(d.channelId, localNodeSig, localBitcoinSig) goto(WAIT_FOR_ANN_SIGNATURES) using d.copy(commitments = d.commitments.copy(remoteNextCommitInfo = Right(nextPerCommitmentPoint))) } else { log.info(s"channel ${d.channelId} won't be announced") @@ -509,7 +495,7 @@ class Channel(val them: ActorRef, val blockchain: ActorRef, router: ActorRef, re case Event(msg@CommitSig(_, theirSig, theirHtlcSigs), d: DATA_NORMAL) => Try(Commitments.receiveCommit(d.commitments, msg)) match { case Success((commitments1, revocation)) => - them ! revocation + remote ! revocation // now that we have their sig, we should propagate the htlcs newly received (commitments1.localCommit.spec.htlcs -- d.commitments.localCommit.spec.htlcs) .filter(_.direction == IN) @@ -552,18 +538,18 @@ class Channel(val them: ActorRef, val blockchain: ActorRef, router: ActorRef, re // first if we have pending changes, we need to commit them val commitments2 = if (Commitments.localHasChanges(commitments)) { val (commitments1, commit) = Commitments.sendCommit(d.commitments) - them ! commit + remote ! commit commitments1 } else commitments val shutdown = Shutdown(d.channelId, Script.write(params.localParams.defaultFinalScriptPubKey)) - them ! shutdown + remote ! shutdown (shutdown, commitments2) }) match { case Success((localShutdown, commitments3)) if (commitments3.remoteNextCommitInfo.isRight && commitments3.localCommit.spec.htlcs.size == 0 && commitments3.localCommit.spec.htlcs.size == 0) || (commitments3.remoteNextCommitInfo.isLeft && commitments3.localCommit.spec.htlcs.size == 0 && commitments3.remoteNextCommitInfo.left.get.spec.htlcs.size == 0) => val closingSigned = Closing.makeFirstClosingTx(params, commitments3, localShutdown.scriptPubKey, remoteShutdown.scriptPubKey) - them ! closingSigned + remote ! closingSigned goto(NEGOTIATING) using DATA_NEGOTIATING(params, commitments3, localShutdown, remoteShutdown, closingSigned) case Success((localShutdown, commitments3)) => goto(SHUTDOWN) using DATA_SHUTDOWN(params, commitments3, localShutdown, remoteShutdown) @@ -643,12 +629,12 @@ class Channel(val them: ActorRef, val blockchain: ActorRef, router: ActorRef, re // TODO: we might have to propagate htlcs upstream depending on the outcome of https://github.com/ElementsProject/lightning/issues/29 Try(Commitments.receiveCommit(d.commitments, msg)) match { case Success((commitments1, revocation)) if commitments1.hasNoPendingHtlcs => - them ! revocation + remote ! revocation val closingSigned = Closing.makeFirstClosingTx(params, commitments1, localShutdown.scriptPubKey, remoteShutdown.scriptPubKey) - them ! closingSigned + remote ! closingSigned goto(NEGOTIATING) using DATA_NEGOTIATING(params, commitments1, localShutdown, remoteShutdown, closingSigned) case Success((commitments1, revocation)) => - them ! revocation + remote ! revocation stay using d.copy(commitments = commitments1) case Failure(cause) => handleLocalError(cause, d) } @@ -659,7 +645,7 @@ class Channel(val them: ActorRef, val blockchain: ActorRef, router: ActorRef, re Try(Commitments.receiveRevocation(d.commitments, msg)) match { case Success(commitments1) if commitments1.hasNoPendingHtlcs => val closingSigned = Closing.makeFirstClosingTx(params, commitments, localShutdown.scriptPubKey, remoteShutdown.scriptPubKey) - them ! closingSigned + remote ! closingSigned goto(NEGOTIATING) using DATA_NEGOTIATING(params, commitments1, localShutdown, remoteShutdown, closingSigned) case Success(commitments1) => stay using d.copy(commitments = commitments1) @@ -694,7 +680,7 @@ class Channel(val them: ActorRef, val blockchain: ActorRef, router: ActorRef, re case Success(signedClosingTx) => val nextClosingFee = Closing.nextClosingFee(Satoshi(d.localClosingSigned.feeSatoshis), Satoshi(remoteClosingFee)) val (_, closingSigned) = Closing.makeClosingTx(d.params, d.commitments, d.localShutdown.scriptPubKey, d.remoteShutdown.scriptPubKey, nextClosingFee) - them ! closingSigned + remote ! closingSigned if (nextClosingFee == Satoshi(remoteClosingFee)) { publishMutualClosing(signedClosingTx) goto(CLOSING) using DATA_CLOSING(d.commitments, ourSignature = Some(closingSigned), mutualClosePublished = Some(signedClosingTx)) @@ -747,13 +733,13 @@ class Channel(val them: ActorRef, val blockchain: ActorRef, router: ActorRef, re case Event(e: Error, d: DATA_CLOSING) => stay // nothing to do, there is already a spending tx published } - when(CLOSED, stateTimeout = 30 seconds) { + when(CLOSED, stateTimeout = 10 seconds) { case Event(StateTimeout, _) => log.info("shutting down") stop(FSM.Normal) } - when(ERR_INFORMATION_LEAK, stateTimeout = 30 seconds) { + when(ERR_INFORMATION_LEAK, stateTimeout = 10 seconds) { case Event(StateTimeout, _) => log.info("shutting down") stop(FSM.Normal) @@ -761,10 +747,6 @@ class Channel(val them: ActorRef, val blockchain: ActorRef, router: ActorRef, re whenUnhandled { - case Event(msg: RoutingMessage, _) => - router forward msg - stay - case Event(WatchEventLost(BITCOIN_FUNDING_LOST), _) => goto(ERR_FUNDING_LOST) case Event(CMD_GETSTATE, _) => @@ -778,8 +760,8 @@ class Channel(val them: ActorRef, val blockchain: ActorRef, router: ActorRef, re case Event(CMD_GETINFO, _) => sender ! RES_GETINFO(remoteNodeId, stateData match { // TODO - case c: DATA_WAIT_FOR_OPEN_CHANNEL => 0L - case c: DATA_WAIT_FOR_ACCEPT_CHANNEL => c.temporaryChannelId + case c: DATA_WAIT_FOR_OPEN_CHANNEL => c.initFundee.temporaryChannelId + case c: DATA_WAIT_FOR_ACCEPT_CHANNEL => c.initFunder.temporaryChannelId case c: DATA_WAIT_FOR_FUNDING_CREATED => c.temporaryChannelId case c: DATA_WAIT_FOR_FUNDING_LOCKED_INTERNAL => c.temporaryChannelId case c: DATA_NORMAL => c.commitments.channelId @@ -810,7 +792,7 @@ class Channel(val them: ActorRef, val blockchain: ActorRef, router: ActorRef, re */ def handleCommandSuccess(sender: ActorRef, msg: LightningMessage, newData: Data) = { - them ! msg + remote ! msg if (sender != self) { sender ! "ok" } @@ -825,7 +807,7 @@ class Channel(val them: ActorRef, val blockchain: ActorRef, router: ActorRef, re def handleLocalError(cause: Throwable, d: HasCommitments) = { log.error(cause, "") - them ! Error(0, cause.getMessage.getBytes) + remote ! Error(0, cause.getMessage.getBytes) spendLocalCurrent(d) } @@ -925,7 +907,7 @@ class Channel(val them: ActorRef, val blockchain: ActorRef, router: ActorRef, re log.warning(s"txid=${ tx.txid } was a revoked commitment, publishing the penalty tx") - them ! Error(0, "Funding tx has been spent".getBytes) + remote ! Error(0, "Funding tx has been spent".getBytes) // TODO hardcoded mindepth + shouldn't we watch the claim tx instead? blockchain ! WatchConfirmed(self, tx.txid, 3, BITCOIN_PENALTY_DONE) @@ -956,7 +938,7 @@ class Channel(val them: ActorRef, val blockchain: ActorRef, router: ActorRef, re d.commitments.anchorId } was spent !!") // TODO! channel id - them ! Error(0, "Funding tx has been spent".getBytes) + remote ! Error(0, "Funding tx has been spent".getBytes) // TODO: not enough val commitTx = d.commitments.localCommit.publishableTxs.commitTx.tx blockchain ! PublishAsap(commitTx) diff --git a/eclair-node/src/main/scala/fr/acinq/eclair/channel/ChannelTypes.scala b/eclair-node/src/main/scala/fr/acinq/eclair/channel/ChannelTypes.scala index 07f7adff0..73665ceb2 100644 --- a/eclair-node/src/main/scala/fr/acinq/eclair/channel/ChannelTypes.scala +++ b/eclair-node/src/main/scala/fr/acinq/eclair/channel/ChannelTypes.scala @@ -5,7 +5,7 @@ import fr.acinq.bitcoin.{BinaryData, ScriptElt, Transaction} import fr.acinq.eclair.payment.{Local, Origin} import fr.acinq.eclair.transactions.CommitmentSpec import fr.acinq.eclair.transactions.Transactions.CommitTx -import fr.acinq.eclair.wire.{ClosingSigned, FundingLocked, Shutdown, UpdateAddHtlc} +import fr.acinq.eclair.wire.{ClosingSigned, FundingLocked, Init, Shutdown, UpdateAddHtlc} import scala.concurrent.duration.FiniteDuration @@ -28,7 +28,6 @@ import scala.concurrent.duration.FiniteDuration */ sealed trait State case object WAIT_FOR_INIT_INTERNAL extends State -case object WAIT_FOR_INIT extends State case object WAIT_FOR_OPEN_CHANNEL extends State case object WAIT_FOR_ACCEPT_CHANNEL extends State case object WAIT_FOR_FUNDING_CREATED_INTERNAL extends State @@ -57,8 +56,8 @@ case object ERR_INFORMATION_LEAK extends State 8888888888 Y8P 8888888888 888 Y888 888 "Y8888P" */ -case class INPUT_INIT_FUNDER(fundingSatoshis: Long, pushMsat: Long) -case class INPUT_INIT_FUNDEE() +case class INPUT_INIT_FUNDER(remoteNodeId: PublicKey, temporaryChannelId: Long, fundingSatoshis: Long, pushMsat: Long, localParams: LocalParams, remoteInit: Init) +case class INPUT_INIT_FUNDEE(remoteNodeId: PublicKey, temporaryChannelId: Long, localParams: LocalParams, remoteInit: Init) case object INPUT_NO_MORE_HTLCS // when requesting a mutual close, we wait for as much as this timeout, then unilateral close case object INPUT_CLOSE_COMPLETE_TIMEOUT @@ -121,9 +120,8 @@ case class LocalCommitPublished(commitTx: Transaction, claimMainDelayedOutputTx: case class RemoteCommitPublished(commitTx: Transaction, claimMainOutputTx: Option[Transaction], claimHtlcSuccessTxs: Seq[Transaction], claimHtlcTimeoutTxs: Seq[Transaction]) case class RevokedCommitPublished(commitTx: Transaction, claimMainOutputTx: Option[Transaction], mainPenaltyTx: Option[Transaction], claimHtlcTimeoutTxs: Seq[Transaction], htlcTimeoutTxs: Seq[Transaction], htlcPenaltyTxs: Seq[Transaction]) -final case class DATA_WAIT_FOR_INIT(localParams: LocalParams, internalInit: Either[INPUT_INIT_FUNDER, INPUT_INIT_FUNDEE], autoSignInterval: Option[FiniteDuration]) extends Data -final case class DATA_WAIT_FOR_OPEN_CHANNEL(localParams: LocalParams, remoteGlobalFeatures: BinaryData, remoteLocalFeatures: BinaryData, autoSignInterval: Option[FiniteDuration]) extends Data -final case class DATA_WAIT_FOR_ACCEPT_CHANNEL(temporaryChannelId: Long, localParams: LocalParams, fundingSatoshis: Long, pushMsat: Long, remoteGlobalFeatures: BinaryData, remoteLocalFeatures: BinaryData, autoSignInterval: Option[FiniteDuration]) extends Data +final case class DATA_WAIT_FOR_OPEN_CHANNEL(initFundee: INPUT_INIT_FUNDEE, autoSignInterval: Option[FiniteDuration]) extends Data +final case class DATA_WAIT_FOR_ACCEPT_CHANNEL(initFunder: INPUT_INIT_FUNDER, autoSignInterval: Option[FiniteDuration]) extends Data final case class DATA_WAIT_FOR_FUNDING_INTERNAL(temporaryChannelId: Long, params: ChannelParams, pushMsat: Long, remoteFirstPerCommitmentPoint: Point) extends Data final case class DATA_WAIT_FOR_FUNDING_CREATED(temporaryChannelId: Long, params: ChannelParams, pushMsat: Long, remoteFirstPerCommitmentPoint: Point) extends Data final case class DATA_WAIT_FOR_FUNDING_SIGNED(temporaryChannelId: Long, params: ChannelParams, fundingTx: Transaction, localSpec: CommitmentSpec, localCommitTx: CommitTx, remoteCommit: RemoteCommit) extends Data diff --git a/eclair-node/src/main/scala/fr/acinq/eclair/channel/Register.scala b/eclair-node/src/main/scala/fr/acinq/eclair/channel/Register.scala index 8cd2b8b4f..19124a0c2 100644 --- a/eclair-node/src/main/scala/fr/acinq/eclair/channel/Register.scala +++ b/eclair-node/src/main/scala/fr/acinq/eclair/channel/Register.scala @@ -1,16 +1,7 @@ package fr.acinq.eclair.channel -import akka.actor.{Props, _} -import akka.util.Timeout -import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey} -import fr.acinq.bitcoin.{BinaryData, DeterministicWallet, MilliSatoshi, Satoshi, ScriptElt} -import fr.acinq.eclair.Globals -import fr.acinq.eclair.crypto.Noise.KeyPair -import fr.acinq.eclair.crypto.TransportHandler -import fr.acinq.eclair.io.LightningMessageSerializer -import fr.acinq.eclair.wire.LightningMessage - -import scala.concurrent.duration._ +import akka.actor.{ActorContext, ActorPath, ActorSystem, Props} +import fr.acinq.bitcoin.BinaryData /** * Created by PM on 26/01/2016. @@ -32,62 +23,16 @@ import scala.concurrent.duration._ * ├── client (0..m, transient) * └── api */ -class Register(watcher: ActorRef, router: ActorRef, relayer: ActorRef, defaultFinalScriptPubKey: Seq[ScriptElt]) extends Actor with ActorLogging { +/*class Register(watcher: ActorRef, router: ActorRef, relayer: ActorRef, defaultFinalScriptPubKey: Seq[ScriptElt]) extends Actor with ActorLogging { import Register._ def receive: Receive = main(0L) def main(counter: Long): Receive = { - case CreateChannel(connection, pubkey, funding_opt, pushmsat_opt) => - def generateKey(index: Long): PrivateKey = DeterministicWallet.derivePrivateKey(Globals.Node.extendedPrivateKey, index :: counter :: Nil).privateKey - - val localParams = LocalParams( - dustLimitSatoshis = 542, - maxHtlcValueInFlightMsat = Long.MaxValue, - channelReserveSatoshis = 0, - htlcMinimumMsat = 0, - feeratePerKw = Globals.feeratePerKw, - toSelfDelay = Globals.delay_blocks, - maxAcceptedHtlcs = 100, - fundingPrivKey = generateKey(0), - revocationSecret = generateKey(1), - paymentKey = generateKey(2), - delayedPaymentKey = generateKey(3), - defaultFinalScriptPubKey = defaultFinalScriptPubKey, - shaSeed = Globals.Node.seed, - isFunder = funding_opt.isDefined, - globalFeatures = Globals.global_features, - localFeatures = Globals.local_features - ) - - def makeChannel(conn: ActorRef, publicKey: PublicKey, ctx: ActorContext): ActorRef = { - // note that we use transport's context and not register's context - val channel = ctx.actorOf(Channel.props(conn, watcher, router, relayer, localParams, publicKey, Some(Globals.autosign_interval)), "channel") - funding_opt match { - case Some(funding) => pushmsat_opt match { - case Some(pushmsat) => channel ! INPUT_INIT_FUNDER(funding.amount, pushmsat.amount) - case None => channel ! INPUT_INIT_FUNDER(funding.amount, 0) - } - case None => channel ! INPUT_INIT_FUNDEE() - } - channel - } - - val transportHandler = context.actorOf(Props( - new TransportHandler[LightningMessage]( - KeyPair(Globals.Node.publicKey.toBin, Globals.Node.privateKey.toBin), - pubkey, - isWriter = funding_opt.isDefined, - them = connection, - listenerFactory = makeChannel, - serializer = LightningMessageSerializer)), - name = s"transport-handler-${counter}") - - connection ! akka.io.Tcp.Register(transportHandler) - context.become(main(counter + 1)) case ListChannels => sender ! context.children + case SendCommand(channelId, cmd) => val s = sender implicit val timeout = Timeout(30 seconds) @@ -97,21 +42,20 @@ class Register(watcher: ActorRef, router: ActorRef, relayer: ActorRef, defaultFi actor }) } -} +}*/ object Register { - def props(blockchain: ActorRef, router: ActorRef, relayer: ActorRef, defaultFinalScriptPubKey: Seq[ScriptElt]) = Props(classOf[Register], blockchain, router, relayer, defaultFinalScriptPubKey) + /*def props(blockchain: ActorRef, router: ActorRef, relayer: ActorRef, defaultFinalScriptPubKey: Seq[ScriptElt]) = Props(classOf[Register], blockchain, router, relayer, defaultFinalScriptPubKey) // @formatter:off - case class CreateChannel(connection: ActorRef, pubkey: Option[BinaryData], fundingSatoshis: Option[Satoshi], pushMsat: Option[MilliSatoshi]) - + case class CreateChannel(remoteNodeId: PublicKey, fundingSatoshis: Satoshi, pushMsat: MilliSatoshi) + case class ConnectionEstablished(connection: ActorRef, createChannel_opt: Option[CreateChannel]) + case class HandshakeCompleted(transport: ActorRef, remoteNodeId: PublicKey) case class ListChannels() - case class SendCommand(channelId: Long, cmd: Command) - // @formatter:on - +*/ /** * Once it reaches NORMAL state, channel creates a [[fr.acinq.eclair.channel.AliasActor]] * which name is counterparty_id-anchor_id @@ -119,19 +63,21 @@ object Register { def createAlias(nodeAddress: BinaryData, channelId: Long)(implicit context: ActorContext) = context.actorOf(Props(new AliasActor(context.self)), name = s"$nodeAddress-${java.lang.Long.toUnsignedString(channelId)}") - def actorPathToNodeAddress(system: ActorSystem, nodeAddress: BinaryData): ActorPath = + /*def actorPathToNodeAddress(system: ActorSystem, nodeAddress: BinaryData): ActorPath = system / "register" / "transport-handler-*" / "channel" / s"$nodeAddress-*" def actorPathToNodeAddress(nodeAddress: BinaryData)(implicit context: ActorContext): ActorPath = actorPathToNodeAddress(context.system, nodeAddress) - +*/ def actorPathToChannelId(system: ActorSystem, channelId: Long): ActorPath = - system / "register" / "transport-handler-*" / "channel" / s"*-${channelId}" + system / "switchboard" / "peer-*" / "*" / s"*-${channelId}" def actorPathToChannelId(channelId: Long)(implicit context: ActorContext): ActorPath = actorPathToChannelId(context.system, channelId) - def actorPathToChannels()(implicit context: ActorContext): ActorPath = - context.system / "register" / "transport-handler-*" / "channel" + /*def actorPathToChannels()(implicit context: ActorContext): ActorPath = + context.system / "register" / "transport-handler-*" / "channel"*/ + + def actorPathToPeers()(implicit context: ActorContext): ActorPath = + context.system / "switchboard" / "peer-*" + - def actorPathToTransportHandlers()(implicit context: ActorContext): ActorPath = - context.system / "register" / "transport-handler-*" } \ No newline at end of file diff --git a/eclair-node/src/main/scala/fr/acinq/eclair/crypto/TransportHandler.scala b/eclair-node/src/main/scala/fr/acinq/eclair/crypto/TransportHandler.scala index ffe24c849..64358a68f 100644 --- a/eclair-node/src/main/scala/fr/acinq/eclair/crypto/TransportHandler.scala +++ b/eclair-node/src/main/scala/fr/acinq/eclair/crypto/TransportHandler.scala @@ -2,13 +2,11 @@ package fr.acinq.eclair.crypto import java.nio.ByteOrder -import akka.actor.{Actor, ActorContext, ActorRef, LoggingFSM, Terminated} -import akka.actor.{Actor, ActorRef, LoggingFSM, OneForOneStrategy, SupervisorStrategy, Terminated} +import akka.actor.{Actor, ActorRef, FSM, LoggingFSM, Terminated} import akka.io.Tcp.{PeerClosed, _} import akka.util.ByteString import fr.acinq.bitcoin.Crypto.PublicKey import fr.acinq.bitcoin.{BinaryData, Protocol} -import fr.acinq.eclair.channel.{CMD_CLOSE, Command} import fr.acinq.eclair.crypto.Noise._ import scala.annotation.tailrec @@ -24,26 +22,24 @@ import scala.util.{Failure, Success, Try} * Once the initial handshake has been completed successfully, the handler will create a listener actor with the * provided factory, and will forward it all decrypted messages * - * @param keyPair private/public key pair for this node - * @param rs remote node static public key (which must be known before we initiate communication) - * @param them actor htat represents the other node's - * @param isWriter if true we initiate the dialog - * @param listenerFactory factory that will be used to create the listener that will receive decrypted messages once the - * handshake phase as been completed. Its parameters are a tuple (transport handler, remote public key) + * @param keyPair private/public key pair for this node + * @param rs remote node static public key (which must be known before we initiate communication) + * @param connection actor that represents the other node's */ -class TransportHandler[T: ClassTag](keyPair: KeyPair, rs: Option[BinaryData], them: ActorRef, isWriter: Boolean, listenerFactory: (ActorRef, PublicKey, ActorContext) => ActorRef, serializer: TransportHandler.Serializer[T]) extends Actor with LoggingFSM[TransportHandler.State, TransportHandler.Data] { +class TransportHandler[T: ClassTag](keyPair: KeyPair, rs: Option[BinaryData], connection: ActorRef, serializer: TransportHandler.Serializer[T]) extends Actor with LoggingFSM[TransportHandler.State, TransportHandler.Data] { import TransportHandler._ - if (isWriter) require(rs.isDefined) + // it means we initiate the dialog + val isWriter = rs.isDefined - context.watch(them) + context.watch(connection) val reader = if (isWriter) { val state = makeWriter(keyPair, rs.get) val (state1, message, None) = state.write(BinaryData.empty) log.debug(s"sending prefix + $message") - them ! Write(TransportHandler.prefix +: message) + connection ! Write(TransportHandler.prefix +: message) state1 } else { makeReader(keyPair) @@ -72,11 +68,10 @@ class TransportHandler[T: ClassTag](keyPair: KeyPair, rs: Option[BinaryData], th reader.read(payload) match { case (writer, _, Some((dec, enc, ck))) => - val listener = listenerFactory(self, PublicKey(writer.rs), context) - context.watch(listener) - val (nextStateData, plaintextMessages) = WaitingForCyphertextData(ExtendedCipherState(enc, ck), ExtendedCipherState(dec, ck), None, remainder, listener).decrypt - sendToListener(listener, plaintextMessages) - goto(WaitingForCyphertext) using nextStateData + val remoteNodeId = PublicKey(writer.rs) + context.parent ! HandshakeCompleted(self, remoteNodeId) + val nextStateData = WaitingForListenerData(ExtendedCipherState(enc, ck), ExtendedCipherState(dec, ck), remainder) + goto(WaitingForListener) using nextStateData case (writer, _, None) => { writer.write(BinaryData.empty) match { @@ -84,68 +79,61 @@ class TransportHandler[T: ClassTag](keyPair: KeyPair, rs: Option[BinaryData], th // we're still in the middle of the handshake process and the other end must first received our next // message before they can reply require(remainder.isEmpty, "unexpected additional data received during handshake") - them ! Write(TransportHandler.prefix +: message) + connection ! Write(TransportHandler.prefix +: message) stay using HandshakeData(reader1, remainder) } case (_, message, Some((enc, dec, ck))) => { - them ! Write(TransportHandler.prefix +: message) - val listener = listenerFactory(self, PublicKey(writer.rs), context) - context.watch(listener) - val (nextStateData, plaintextMessages) = WaitingForCyphertextData(ExtendedCipherState(enc, ck), ExtendedCipherState(dec, ck), None, remainder, listener).decrypt - sendToListener(listener, plaintextMessages) - goto(WaitingForCyphertext) using nextStateData + connection ! Write(TransportHandler.prefix +: message) + val remoteNodeId = PublicKey(writer.rs) + context.parent ! HandshakeCompleted(self, remoteNodeId) + val nextStateData = WaitingForListenerData(ExtendedCipherState(enc, ck), ExtendedCipherState(dec, ck), remainder) + goto(WaitingForListener) using nextStateData } } } } } + } + + when(WaitingForListener) { + + case Event(Received(data), currentStateData@WaitingForListenerData(enc, dec, buffer)) => + stay using currentStateData.copy(buffer = buffer ++ data) + + case Event(Listener(listener), WaitingForListenerData(enc, dec, buffer)) => + val (nextStateData, plaintextMessages) = WaitingForCyphertextData(enc, dec, None, buffer, listener).decrypt + context.watch(listener) + sendToListener(listener, plaintextMessages) + goto(WaitingForCyphertext) using nextStateData - case Event(ErrorClosed(cause), _) => - log.warning(s"connection closed: $cause") - context.stop(self) - stay() } when(WaitingForCyphertext) { - case Event(Terminated(actor), WaitingForCyphertextData(_, _, _, _, listener)) if actor == listener => - context.stop(self) - stay() - case Event(Received(data), currentStateData@WaitingForCyphertextData(enc, dec, length, buffer, listener)) => val (nextStateData, plaintextMessages) = WaitingForCyphertextData.decrypt(currentStateData.copy(buffer = buffer ++ data)) sendToListener(listener, plaintextMessages) stay using nextStateData - case Event(plaintext: BinaryData, WaitingForCyphertextData(enc, dec, length, buffer, listener)) => - val (enc1, ciphertext) = TransportHandler.encrypt(enc, plaintext) - them ! Write(ByteString.fromArray(ciphertext.toArray)) - stay using WaitingForCyphertextData(enc1, dec, length, buffer, listener) - case Event(t: T, WaitingForCyphertextData(enc, dec, length, buffer, listener)) => val blob = serializer.serialize(t) val (enc1, ciphertext) = TransportHandler.encrypt(enc, blob) - them ! Write(ByteString.fromArray(ciphertext.toArray)) + connection ! Write(ByteString.fromArray(ciphertext.toArray)) stay using WaitingForCyphertextData(enc1, dec, length, buffer, listener) - - case Event(cmd: Command, WaitingForCyphertextData(_, _, _, _, listener)) => - listener forward cmd - stay - - case Event(ErrorClosed(cause), WaitingForCyphertextData(_, _, _, _, listener)) => - // we transform connection closed events into application error so that it triggers a uniclose - log.error(s"tcp connection error: $cause") - listener ! fr.acinq.eclair.wire.Error(0, cause.getBytes("UTF-8")) - stay - - case Event(PeerClosed, WaitingForCyphertextData(_, _, _, _, listener)) => - listener ! fr.acinq.eclair.wire.Error(0, "peer closed".getBytes("UTF-8")) - stay } whenUnhandled { - case Event(Terminated(actor), _) if actor == them => - log.warning("peer closed") - stay() + case Event(ErrorClosed(cause), _) => + // we transform connection closed events into application error so that it triggers a uniclose + log.warning(s"tcp connection error: $cause") + stop(FSM.Normal) + + case Event(PeerClosed, _) => + log.warning(s"connection closed") + stop(FSM.Normal) + + case Event(Terminated(actor), _) if actor == connection => + // this can be the connection or the listener, either way it is a cause of death + stop(FSM.Normal) case Event(message, _) => log.warning(s"unhandled $message") @@ -209,11 +197,15 @@ object TransportHandler { localStatic, KeyPair(BinaryData.empty, BinaryData.empty), BinaryData.empty, BinaryData.empty, Noise.Secp256k1DHFunctions, Noise.Chacha20Poly1305CipherFunctions, Noise.SHA256HashFunctions) + // @formatter:off sealed trait State - case object Handshake extends State - + case object WaitingForListener extends State case object WaitingForCyphertext extends State + // @formatter:on + + case class Listener(listener: ActorRef) + case class HandshakeCompleted(transport: ActorRef, remoteNodeId: PublicKey) sealed trait Data @@ -261,6 +253,8 @@ object TransportHandler { } } + case class WaitingForListenerData(enc: CipherState, dec: CipherState, buffer: ByteString) extends Data + case class WaitingForCyphertextData(enc: CipherState, dec: CipherState, ciphertextLength: Option[Int], buffer: ByteString, listener: ActorRef) extends Data { def decrypt: (WaitingForCyphertextData, Seq[BinaryData]) = WaitingForCyphertextData.decrypt(this) } diff --git a/eclair-node/src/main/scala/fr/acinq/eclair/gui/Handlers.scala b/eclair-node/src/main/scala/fr/acinq/eclair/gui/Handlers.scala index b6a5a86b4..4762973e4 100644 --- a/eclair-node/src/main/scala/fr/acinq/eclair/gui/Handlers.scala +++ b/eclair-node/src/main/scala/fr/acinq/eclair/gui/Handlers.scala @@ -4,16 +4,19 @@ package fr.acinq.eclair.gui import java.awt.TrayIcon.MessageType import java.awt.{SystemTray, TrayIcon} import java.io.{File, FileWriter} +import java.net.InetSocketAddress import java.time.LocalDateTime import java.time.format.{DateTimeFormatter, FormatStyle} import javafx.application.Platform import javafx.scene.control.TextArea import akka.pattern.ask +import fr.acinq.bitcoin.Crypto.PublicKey import fr.acinq.bitcoin.{BinaryData, MilliSatoshi, Satoshi} import fr.acinq.eclair._ import fr.acinq.eclair.gui.utils.GUIValidators import fr.acinq.eclair.io.Client +import fr.acinq.eclair.io.Switchboard.{NewChannel, NewConnection} import fr.acinq.eclair.payment.CreatePayment import grizzled.slf4j.Logging @@ -28,9 +31,14 @@ class Handlers(setup: Setup, trayIcon: TrayIcon) extends Logging { def open(hostPort: String, fundingSatoshis: Satoshi, pushMsat: MilliSatoshi) = { hostPort match { - case GUIValidators.hostRegex(pubkey, host, port) => - logger.info(s"connecting to $host:$port") - system.actorOf(Client.props(host, port.toInt, pubkey, fundingSatoshis, pushMsat, register)) + case GUIValidators.hostRegex(remoteNodeId, host, port) => + logger.info(s"opening a channel with remoteNodeId=$remoteNodeId") + (setup.switchboard ? NewConnection(PublicKey(remoteNodeId), new InetSocketAddress(host, port.toInt), Some(NewChannel(fundingSatoshis, pushMsat)))).mapTo[String].onComplete { + case Success(s) => {} + case Failure(t) => + val message = s"$host:$port" + notification("Connection failed", message, TrayIcon.MessageType.WARNING) + } case _ => {} } } diff --git a/eclair-node/src/main/scala/fr/acinq/eclair/io/Client.scala b/eclair-node/src/main/scala/fr/acinq/eclair/io/Client.scala index 4947b5321..6487d1f0b 100644 --- a/eclair-node/src/main/scala/fr/acinq/eclair/io/Client.scala +++ b/eclair-node/src/main/scala/fr/acinq/eclair/io/Client.scala @@ -2,36 +2,57 @@ package fr.acinq.eclair.io import java.net.InetSocketAddress -import akka.actor._ +import akka.actor.{Props, _} import akka.io.{IO, Tcp} -import fr.acinq.bitcoin.{BinaryData, MilliSatoshi, Satoshi} -import fr.acinq.eclair.channel.Register.CreateChannel +import fr.acinq.bitcoin.Crypto.PublicKey +import fr.acinq.eclair.Globals +import fr.acinq.eclair.crypto.Noise.KeyPair +import fr.acinq.eclair.crypto.TransportHandler +import fr.acinq.eclair.crypto.TransportHandler.HandshakeCompleted +import fr.acinq.eclair.wire.LightningMessage /** * Created by PM on 27/10/2015. */ -class Client(remote: InetSocketAddress, pubkey: BinaryData, fundingSatoshis: Satoshi, pushMsat: MilliSatoshi, register: ActorRef) extends Actor with ActorLogging { +class Client(switchboard: ActorRef, address: InetSocketAddress, remoteNodeId: PublicKey, origin: ActorRef) extends Actor with ActorLogging { import Tcp._ import context.system - IO(Tcp) ! Connect(remote) + IO(Tcp) ! Connect(address) def receive = { - case CommandFailed(_: Connect) => context stop self + case CommandFailed(_: Connect) => + origin ! Status.Failure(new RuntimeException("connection failed")) + context stop self - case c@Connected(remote, local) => + case Connected(remote, _) => log.info(s"connected to $remote") - val connection = sender() - register ! CreateChannel(connection, Some(pubkey), Some(fundingSatoshis), Some(pushMsat)) - // TODO: kill this actor ? + val connection = sender + val transport = context.actorOf(Props( + new TransportHandler[LightningMessage]( + KeyPair(Globals.Node.publicKey.toBin, Globals.Node.privateKey.toBin), + Some(remoteNodeId), + connection = connection, + serializer = LightningMessageSerializer))) + connection ! akka.io.Tcp.Register(transport) + context watch transport + context become connected(transport) + } + + def connected(transport: ActorRef): Receive = { + case Terminated(actor) if actor == transport => + origin ! Status.Failure(new RuntimeException("authentication failed")) + context stop self + + case h: HandshakeCompleted => + origin ! "connected" + switchboard ! h } } object Client extends App { - def props(address: InetSocketAddress, pubkey: BinaryData, fundingSatoshis: Satoshi, pushMsat: MilliSatoshi, register: ActorRef): Props = Props(classOf[Client], address, pubkey, fundingSatoshis, pushMsat, register) - - def props(host: String, port: Int, pubkey: BinaryData, fundingSatoshis: Satoshi, pushMsat: MilliSatoshi, register: ActorRef): Props = Props(classOf[Client], new InetSocketAddress(host, port), pubkey, fundingSatoshis, pushMsat, register) + def props(switchboard: ActorRef, address: InetSocketAddress, remoteNodeId: PublicKey, origin: ActorRef): Props = Props(classOf[Client], switchboard, address, remoteNodeId, origin) } diff --git a/eclair-node/src/main/scala/fr/acinq/eclair/io/LightningMessageSerializer.scala b/eclair-node/src/main/scala/fr/acinq/eclair/io/LightningMessageSerializer.scala index c89248168..c54084ea0 100644 --- a/eclair-node/src/main/scala/fr/acinq/eclair/io/LightningMessageSerializer.scala +++ b/eclair-node/src/main/scala/fr/acinq/eclair/io/LightningMessageSerializer.scala @@ -22,4 +22,4 @@ object LightningMessageSerializer extends TransportHandler.Serializer[LightningM case Attempt.Successful(DecodeResult(msg, _)) => msg case Attempt.Failure(cause) => throw new RuntimeException(s"deserialization error: $cause") } -} +} \ No newline at end of file diff --git a/eclair-node/src/main/scala/fr/acinq/eclair/io/Peer.scala b/eclair-node/src/main/scala/fr/acinq/eclair/io/Peer.scala new file mode 100644 index 000000000..dd43762fe --- /dev/null +++ b/eclair-node/src/main/scala/fr/acinq/eclair/io/Peer.scala @@ -0,0 +1,182 @@ +package fr.acinq.eclair.io + +import java.io.File +import java.net.InetSocketAddress + +import akka.actor.{ActorRef, LoggingFSM, PoisonPill, Props, Terminated} +import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey} +import fr.acinq.bitcoin.{DeterministicWallet, ScriptElt} +import fr.acinq.eclair.channel.{Channel, INPUT_INIT_FUNDEE, INPUT_INIT_FUNDER, LocalParams} +import fr.acinq.eclair.crypto.TransportHandler.{HandshakeCompleted, Listener} +import fr.acinq.eclair.io.Switchboard.{NewChannel, NewConnection} +import fr.acinq.eclair.router.SendRoutingState +import fr.acinq.eclair.wire._ +import fr.acinq.eclair.{Features, Globals} + +import scala.compat.Platform + +// @formatter:off + +case object Reconnect +//case class ChannelIdSwitch(previousId: Long, nextId: Long) + +sealed trait OfflineChannel +case class BrandNewChannel(c: NewChannel) extends OfflineChannel +case class ColdChannel(f: File) extends OfflineChannel +case class HotChannel(a: ActorRef, channelId: Long) extends OfflineChannel + +sealed trait Data +case class DisconnectedData(offlineChannels: Seq[OfflineChannel]) extends Data +case class InitializingData(transport: ActorRef, offlineChannels: Seq[OfflineChannel]) extends Data +case class ConnectedData(transport: ActorRef, remoteInit: Init, channels: Map[Long, ActorRef]) extends Data + +sealed trait State +case object DISCONNECTED extends State +case object INITIALIZING extends State +case object CONNECTED extends State + +// @formatter:on + +/** + * Created by PM on 26/08/2016. + */ +class Peer(remoteNodeId: PublicKey, address_opt: Option[InetSocketAddress], watcher: ActorRef, router: ActorRef, relayer: ActorRef, defaultFinalScriptPubKey: Seq[ScriptElt]) extends LoggingFSM[State, Data] { + + import Peer._ + + startWith(DISCONNECTED, DisconnectedData(Nil)) + + when(DISCONNECTED) { + case Event(c: NewChannel, d@DisconnectedData(offlineChannels)) => + stay using d.copy(offlineChannels = offlineChannels :+ BrandNewChannel(c)) + + case Event(Reconnect, _) if address_opt.isDefined => + context.parent ! NewConnection(remoteNodeId, address_opt.get, None) + stay + + case Event(HandshakeCompleted(transport, _), DisconnectedData(channels)) => + log.info(s"registering as a listener to $transport") + transport ! Listener(self) + context watch transport + transport ! Init(globalFeatures = Globals.global_features, localFeatures = Globals.local_features) + goto(INITIALIZING) using InitializingData(transport, channels) + } + + when(INITIALIZING) { + case Event(Reconnect, _) => stay + + case Event(c: NewChannel, d@InitializingData(_, offlineChannels)) => + stay using d.copy(offlineChannels = offlineChannels :+ BrandNewChannel(c)) + + case Event(remoteInit: Init, InitializingData(transport, channels)) => + import fr.acinq.eclair.Features._ + log.info(s"$remoteNodeId has features: channelPublic=${isChannelPublic(remoteInit.localFeatures)} initialRoutingSync=${requiresInitialRoutingSync(remoteInit.localFeatures)}") + if (Features.requiresInitialRoutingSync(remoteInit.localFeatures)) { + router ! SendRoutingState(transport) + } + // let's bring existing/requested channels online + channels.map { + case BrandNewChannel(c) => self ! c + } + goto(CONNECTED) using ConnectedData(transport, remoteInit, Map()) + + case Event(Terminated(actor), InitializingData(transport, channels)) if actor == transport => + log.warning(s"lost connection to $remoteNodeId") + goto(DISCONNECTED) using DisconnectedData(channels) + } + + when(CONNECTED) { + case Event(Reconnect, _) => stay + + case Event(c: NewChannel, d@ConnectedData(transport, remoteInit, channels)) => + log.info(s"requesting a new channel to $remoteNodeId with fundingSatoshis=${c.fundingSatoshis} and pushMsat=${c.pushMsat}") + val temporaryChannelId = Platform.currentTime + val (channel, localParams) = createChannel(transport, temporaryChannelId, funder = true) + channel ! INPUT_INIT_FUNDER(remoteNodeId, temporaryChannelId, c.fundingSatoshis.amount, c.pushMsat.amount, localParams, remoteInit) + stay using d.copy(channels = channels + (temporaryChannelId -> channel)) + + case Event(msg: OpenChannel, d@ConnectedData(transport, remoteInit, channels)) => + log.info(s"accepting a new channel to $remoteNodeId") + val temporaryChannelId = msg.temporaryChannelId + val (channel, localParams) = createChannel(transport, temporaryChannelId, funder = false) + channel ! INPUT_INIT_FUNDEE(remoteNodeId, temporaryChannelId, localParams, remoteInit) + channel ! msg + stay using d.copy(channels = channels + (temporaryChannelId -> channel)) + + case Event(msg@FundingLocked(previousId, nextId, _), d@ConnectedData(_, _, channels)) if channels.contains(previousId) => + log.info(s"channel id switch: previousId=$previousId nextId=$nextId") + val channel = channels(previousId) + channel forward msg + //TODO: what if nextIds are different + stay using d.copy(channels = channels - previousId + (nextId -> channel)) + + case Event(msg: HasTemporaryChannelId, ConnectedData(_, _, channels)) if channels.contains(msg.temporaryChannelId) => + val channel = channels(msg.temporaryChannelId) + channel forward msg + stay + + case Event(msg: HasChannelId, ConnectedData(_, _, channels)) if channels.contains(msg.channelId) => + val channel = channels(msg.channelId) + channel forward msg + stay + + case Event(msg: RoutingMessage, ConnectedData(transport, _, _)) if sender == router=> + transport forward msg + stay + + case Event(msg: RoutingMessage, _) => + router forward msg + stay + + case Event(Terminated(actor), ConnectedData(transport, _, channels)) if actor == transport => + log.warning(s"lost connection to $remoteNodeId") + channels.values.foreach(_ ! Error(0, "peer disconnected".getBytes("UTF-8"))) + // TODO: no persistence yet, all channels are lost + goto(DISCONNECTED) using DisconnectedData(Nil) + + case Event(Terminated(actor), d@ConnectedData(transport, _, channels)) if channels.values.toSet.contains(actor) => + val channelId = channels.find(_._2 == actor).get._1 + log.info(s"channel closed: channelId=$channelId") + if (channels.size == 1) { + log.info(s"that was the last channel open, closing the connection") + transport ! PoisonPill + } + stay using d.copy(channels = channels - channelId) + } + + def createChannel(transport: ActorRef, temporaryChannelId: Long, funder: Boolean): (ActorRef, LocalParams) = { + val localParams = makeChannelParams(temporaryChannelId, defaultFinalScriptPubKey, funder) + val channel = context.actorOf(Channel.props(transport, watcher, router, relayer, Some(Globals.autosign_interval)), s"channel-$temporaryChannelId") + context watch channel + (channel, localParams) + } + +} + +object Peer { + + def props(remoteNodeId: PublicKey, address_opt: Option[InetSocketAddress], watcher: ActorRef, router: ActorRef, relayer: ActorRef, defaultFinalScriptPubKey: Seq[ScriptElt]) = Props(classOf[Peer], remoteNodeId, address_opt, watcher, router, relayer, defaultFinalScriptPubKey) + + def generateKey(keyPath: Seq[Long]): PrivateKey = DeterministicWallet.derivePrivateKey(Globals.Node.extendedPrivateKey, keyPath).privateKey + + def makeChannelParams(keyIndex: Long, defaultFinalScriptPubKey: Seq[ScriptElt], isFunder: Boolean): LocalParams = + LocalParams( + dustLimitSatoshis = 542, + maxHtlcValueInFlightMsat = Long.MaxValue, + channelReserveSatoshis = 0, + htlcMinimumMsat = 0, + feeratePerKw = Globals.feeratePerKw, + toSelfDelay = Globals.delay_blocks, + maxAcceptedHtlcs = 100, + fundingPrivKey = generateKey(keyIndex :: 0L :: Nil), + revocationSecret = generateKey(keyIndex :: 1L :: Nil), + paymentKey = generateKey(keyIndex :: 2L :: Nil), + delayedPaymentKey = generateKey(keyIndex :: 3L :: Nil), + defaultFinalScriptPubKey = defaultFinalScriptPubKey, + shaSeed = Globals.Node.seed, + isFunder = isFunder, + globalFeatures = Globals.global_features, + localFeatures = Globals.local_features + ) + +} diff --git a/eclair-node/src/main/scala/fr/acinq/eclair/io/Server.scala b/eclair-node/src/main/scala/fr/acinq/eclair/io/Server.scala index e84ebf0e4..4f244e1d5 100644 --- a/eclair-node/src/main/scala/fr/acinq/eclair/io/Server.scala +++ b/eclair-node/src/main/scala/fr/acinq/eclair/io/Server.scala @@ -4,39 +4,52 @@ import java.net.InetSocketAddress import akka.actor.{Actor, ActorLogging, ActorRef, Props} import akka.io.{IO, Tcp} -import fr.acinq.eclair.TCPBindError -import fr.acinq.eclair.channel.Register.CreateChannel +import fr.acinq.eclair.crypto.Noise.KeyPair +import fr.acinq.eclair.crypto.TransportHandler +import fr.acinq.eclair.crypto.TransportHandler.HandshakeCompleted +import fr.acinq.eclair.wire.LightningMessage +import fr.acinq.eclair.{Globals, TCPBindError} /** * Created by PM on 27/10/2015. */ -class Server(address: InetSocketAddress, register: ActorRef) extends Actor with ActorLogging { +class Server(switchboard: ActorRef, address: InetSocketAddress) extends Actor with ActorLogging { import Tcp._ import context.system IO(Tcp) ! Bind(self, address) - def receive = { - case b@Bound(localAddress) => - log.info(s"bound on $b") + def receive() = main(Set()) + + def main(transports: Set[ActorRef]): Receive = { + case Bound(localAddress) => + log.info(s"bound on $localAddress") case CommandFailed(_: Bind) => system.eventStream.publish(TCPBindError) context stop self - case c@Connected(remote, local) => + case Connected(remote, _) => log.info(s"connected to $remote") - val connection = sender() - register ! CreateChannel(connection, None, None, None) + val connection = sender + val transport = context.actorOf(Props( + new TransportHandler[LightningMessage]( + KeyPair(Globals.Node.publicKey.toBin, Globals.Node.privateKey.toBin), + None, + connection = connection, + serializer = LightningMessageSerializer))) + connection ! akka.io.Tcp.Register(transport) + context become main(transports + transport) + + case h: HandshakeCompleted => + switchboard ! h } } -object Server extends App { +object Server { - def props(address: InetSocketAddress, register: ActorRef): Props = Props(classOf[Server], address, register) - - def props(host: String, port: Int, register: ActorRef): Props = Props(classOf[Server], new InetSocketAddress(host, port), register) + def props(switchboard: ActorRef, address: InetSocketAddress): Props = Props(classOf[Server], switchboard, address) } diff --git a/eclair-node/src/main/scala/fr/acinq/eclair/io/Switchboard.scala b/eclair-node/src/main/scala/fr/acinq/eclair/io/Switchboard.scala new file mode 100644 index 000000000..8b9123e9c --- /dev/null +++ b/eclair-node/src/main/scala/fr/acinq/eclair/io/Switchboard.scala @@ -0,0 +1,68 @@ +package fr.acinq.eclair.io + +import java.net.InetSocketAddress + +import akka.actor.{Actor, ActorLogging, ActorRef, Props, Status, Terminated} +import fr.acinq.bitcoin.Crypto.PublicKey +import fr.acinq.bitcoin.{MilliSatoshi, Satoshi, ScriptElt} +import fr.acinq.eclair.Globals +import fr.acinq.eclair.crypto.TransportHandler.HandshakeCompleted + +/** + * Ties network connections to peers. + * Created by PM on 14/02/2017. + */ +class Switchboard(watcher: ActorRef, router: ActorRef, relayer: ActorRef, defaultFinalScriptPubKey: Seq[ScriptElt]) extends Actor with ActorLogging { + + import Switchboard._ + + def receive: Receive = main(Map(), Map()) + + def main(peers: Map[PublicKey, ActorRef], connections: Map[PublicKey, ActorRef]): Receive = { + + case NewConnection(Globals.Node.publicKey, _, _) => + sender ! Status.Failure(new RuntimeException("cannot open connection with oneself")) + + case NewConnection(remoteNodeId, address, newChannel_opt) => + val connection = connections.get(remoteNodeId) match { + case Some(connection) => + log.info(s"connection to $remoteNodeId is already in progress") + connection + case None => + log.info(s"connecting to $remoteNodeId @ $address") + val connection = context.actorOf(Client.props(self, address, remoteNodeId, sender)) + context watch(connection) + connection + } + val peer = peers.get(remoteNodeId) match { + case Some(peer) => peer + case None => createPeer(remoteNodeId, Some(address)) + } + newChannel_opt.map (peer forward _) + context become main(peers + (remoteNodeId -> peer), connections + (remoteNodeId -> connection)) + + case Terminated(actor) if connections.values.toSet.contains(actor) => + log.info(s"$actor is dead, removing from connections") + val remoteNodeId = connections.find(_._2 == actor).get._1 + context become main(peers, connections - remoteNodeId) + + case h@HandshakeCompleted(_, remoteNodeId) => + val peer = peers.getOrElse(remoteNodeId, createPeer(remoteNodeId, None)) + peer forward h + context become main(peers + (remoteNodeId -> peer), connections) + + } + + def createPeer(remoteNodeId: PublicKey, address_opt: Option[InetSocketAddress]) = context.actorOf(Peer.props(remoteNodeId, address_opt, watcher, router, relayer, defaultFinalScriptPubKey), name = s"peer-$remoteNodeId") +} + +object Switchboard { + + def props(watcher: ActorRef, router: ActorRef, relayer: ActorRef, defaultFinalScriptPubKey: Seq[ScriptElt]) = Props(classOf[Switchboard], watcher, router, relayer, defaultFinalScriptPubKey) + + // @formatter:off + case class NewChannel(fundingSatoshis: Satoshi, pushMsat: MilliSatoshi) + case class NewConnection(remoteNodeId: PublicKey, address: InetSocketAddress, newChannel_opt: Option[NewChannel]) + // @formatter:on + +} \ No newline at end of file diff --git a/eclair-node/src/main/scala/fr/acinq/eclair/payment/PaymentLifecycle.scala b/eclair-node/src/main/scala/fr/acinq/eclair/payment/PaymentLifecycle.scala index 1896f0ab6..ef6565770 100644 --- a/eclair-node/src/main/scala/fr/acinq/eclair/payment/PaymentLifecycle.scala +++ b/eclair-node/src/main/scala/fr/acinq/eclair/payment/PaymentLifecycle.scala @@ -95,10 +95,10 @@ object PaymentLifecycle { val payloadsbin: Seq[BinaryData] = payloads .map(Codecs.perHopPayloadCodec.encode(_)) - .map { - case Attempt.Successful(bitVector) => BinaryData(bitVector.toByteArray) - case Attempt.Failure(cause) => throw new RuntimeException(s"serialization error: $cause") - } :+ BinaryData("00" * 20) + .map { + case Attempt.Successful(bitVector) => BinaryData(bitVector.toByteArray) + case Attempt.Failure(cause) => throw new RuntimeException(s"serialization error: $cause") + } :+ BinaryData("00" * 20) Sphinx.makePacket(sessionKey, pubkeys, payloadsbin, associatedData) } @@ -106,7 +106,7 @@ object PaymentLifecycle { /** * * @param finalAmountMsat the final htlc amount in millisatoshis - * @param hops the hops as computed by the router + * @param hops the hops as computed by the router * @return a (firstAmountMsat, firstExpiry, payloads) tuple where: * - firstAmountMsat is the amount for the first htlc in the route * - firstExpiry is the cltv expiry for the first htlc in the route diff --git a/eclair-node/src/main/scala/fr/acinq/eclair/router/Router.scala b/eclair-node/src/main/scala/fr/acinq/eclair/router/Router.scala index 384e6ec35..ee5eae86e 100644 --- a/eclair-node/src/main/scala/fr/acinq/eclair/router/Router.scala +++ b/eclair-node/src/main/scala/fr/acinq/eclair/router/Router.scala @@ -170,7 +170,7 @@ class Router(watcher: ActorRef) extends Actor with ActorLogging { case 'tick_broadcast => log.info(s"broadcasting ${rebroadcast.size} routing messages") - rebroadcast.foreach(context.actorSelection(Register.actorPathToTransportHandlers) ! _) + rebroadcast.foreach(context.actorSelection(Register.actorPathToPeers) ! _) context become main(nodes, channels, updates, Nil, awaiting, stash) case 'nodes => sender ! nodes.values diff --git a/eclair-node/src/main/scala/fr/acinq/eclair/wire/Types.scala b/eclair-node/src/main/scala/fr/acinq/eclair/wire/Types.scala index 339c6b460..791316717 100644 --- a/eclair-node/src/main/scala/fr/acinq/eclair/wire/Types.scala +++ b/eclair-node/src/main/scala/fr/acinq/eclair/wire/Types.scala @@ -14,8 +14,10 @@ sealed trait LightningMessage sealed trait SetupMessage extends LightningMessage sealed trait ChannelMessage extends LightningMessage sealed trait HtlcMessage extends LightningMessage -sealed trait UpdateMessage extends HtlcMessage // <- not in the spec sealed trait RoutingMessage extends LightningMessage +sealed trait HasTemporaryChannelId extends LightningMessage { def temporaryChannelId: Long } // <- not in the spec +sealed trait HasChannelId extends LightningMessage { def channelId: Long } // <- not in the spec +sealed trait UpdateMessage extends HtlcMessage // <- not in the spec // @formatter:on case class Init(globalFeatures: BinaryData, @@ -38,7 +40,7 @@ case class OpenChannel(temporaryChannelId: Long, revocationBasepoint: Point, paymentBasepoint: Point, delayedPaymentBasepoint: Point, - firstPerCommitmentPoint: Point) extends ChannelMessage + firstPerCommitmentPoint: Point) extends ChannelMessage with HasTemporaryChannelId case class AcceptChannel(temporaryChannelId: Long, dustLimitSatoshis: Long, @@ -52,53 +54,53 @@ case class AcceptChannel(temporaryChannelId: Long, revocationBasepoint: Point, paymentBasepoint: Point, delayedPaymentBasepoint: Point, - firstPerCommitmentPoint: Point) extends ChannelMessage + firstPerCommitmentPoint: Point) extends ChannelMessage with HasTemporaryChannelId case class FundingCreated(temporaryChannelId: Long, txid: BinaryData, outputIndex: Int, - signature: BinaryData) extends ChannelMessage + signature: BinaryData) extends ChannelMessage with HasTemporaryChannelId case class FundingSigned(temporaryChannelId: Long, - signature: BinaryData) extends ChannelMessage + signature: BinaryData) extends ChannelMessage with HasTemporaryChannelId case class FundingLocked(temporaryChannelId: Long, channelId: Long, - nextPerCommitmentPoint: Point) extends ChannelMessage + nextPerCommitmentPoint: Point) extends ChannelMessage with HasTemporaryChannelId case class Shutdown(channelId: Long, - scriptPubKey: BinaryData) extends ChannelMessage + scriptPubKey: BinaryData) extends ChannelMessage with HasChannelId case class ClosingSigned(channelId: Long, feeSatoshis: Long, - signature: BinaryData) extends ChannelMessage + signature: BinaryData) extends ChannelMessage with HasChannelId case class UpdateAddHtlc(channelId: Long, id: Long, amountMsat: Long, expiry: Long, paymentHash: BinaryData, - onionRoutingPacket: BinaryData) extends HtlcMessage with UpdateMessage + onionRoutingPacket: BinaryData) extends HtlcMessage with UpdateMessage with HasChannelId case class UpdateFulfillHtlc(channelId: Long, id: Long, - paymentPreimage: BinaryData) extends HtlcMessage with UpdateMessage + paymentPreimage: BinaryData) extends HtlcMessage with UpdateMessage with HasChannelId case class UpdateFailHtlc(channelId: Long, id: Long, - reason: BinaryData) extends HtlcMessage with UpdateMessage + reason: BinaryData) extends HtlcMessage with UpdateMessage with HasChannelId case class CommitSig(channelId: Long, signature: BinaryData, - htlcSignatures: List[BinaryData]) extends HtlcMessage + htlcSignatures: List[BinaryData]) extends HtlcMessage with HasChannelId case class RevokeAndAck(channelId: Long, perCommitmentSecret: Scalar, nextPerCommitmentPoint: Point, - htlcTimeoutSignatures: List[BinaryData]) extends HtlcMessage + htlcTimeoutSignatures: List[BinaryData]) extends HtlcMessage with HasChannelId case class UpdateFee(channelId: Long, - feeratePerKw: Long) extends ChannelMessage with UpdateMessage + feeratePerKw: Long) extends ChannelMessage with UpdateMessage with HasChannelId case class ChannelAnnouncement(nodeSignature1: BinaryData, nodeSignature2: BinaryData, @@ -130,7 +132,7 @@ case class ChannelUpdate(signature: BinaryData, case class AnnouncementSignatures(channelId: Long, nodeSignature: BinaryData, - bitcoinSignature: BinaryData) extends RoutingMessage + bitcoinSignature: BinaryData) extends RoutingMessage with HasChannelId case class PerHopPayload(amt_to_forward: Long, outgoing_cltv_value: Int) \ No newline at end of file diff --git a/eclair-node/src/test/scala/fr/acinq/eclair/channel/ThroughputSpec.scala b/eclair-node/src/test/scala/fr/acinq/eclair/channel/ThroughputSpec.scala index cd9b38b3d..774321b5a 100644 --- a/eclair-node/src/test/scala/fr/acinq/eclair/channel/ThroughputSpec.scala +++ b/eclair-node/src/test/scala/fr/acinq/eclair/channel/ThroughputSpec.scala @@ -9,7 +9,7 @@ import fr.acinq.eclair.TestConstants.{Alice, Bob} import fr.acinq.eclair._ import fr.acinq.eclair.blockchain._ import fr.acinq.eclair.payment.Relayer -import fr.acinq.eclair.wire.UpdateAddHtlc +import fr.acinq.eclair.wire.{Init, UpdateAddHtlc} import org.junit.runner.RunWith import org.scalatest.FunSuite import org.scalatest.junit.JUnitRunner @@ -52,10 +52,13 @@ class ThroughputSpec extends FunSuite { context.become(run(h2r - htlc.paymentHash)) } }), "payment-handler") - val router: ActorRef = ??? val relayer = system.actorOf(Relayer.props(Globals.Node.privateKey, paymentHandler)) - val alice = system.actorOf(Channel.props(pipe, blockchain, ???, relayer, Alice.channelParams, Bob.id), "a") - val bob = system.actorOf(Channel.props(pipe, blockchain, ???, relayer, Bob.channelParams, Alice.id), "b") + val alice = system.actorOf(Channel.props(pipe, blockchain, ???, relayer), "a") + val bob = system.actorOf(Channel.props(pipe, blockchain, ???, relayer), "b") + val aliceInit = Init(Alice.channelParams.globalFeatures, Alice.channelParams.localFeatures) + val bobInit = Init(Bob.channelParams.globalFeatures, Bob.channelParams.localFeatures) + alice ! INPUT_INIT_FUNDER(Bob.id, 0, TestConstants.fundingSatoshis, TestConstants.pushMsat, Alice.channelParams, bobInit) + bob ! INPUT_INIT_FUNDEE(Alice.id, 0, Bob.channelParams, aliceInit) val latch = new CountDownLatch(2) val listener = system.actorOf(Props(new Actor { diff --git a/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/StateTestsHelperMethods.scala b/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/StateTestsHelperMethods.scala index bb82fc238..7514b185d 100644 --- a/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/StateTestsHelperMethods.scala +++ b/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/StateTestsHelperMethods.scala @@ -4,6 +4,7 @@ import akka.actor.ActorRef import akka.testkit.{TestFSMRef, TestKitBase, TestProbe} import fr.acinq.bitcoin.{BinaryData, Crypto} import fr.acinq.eclair.TestConstants +import fr.acinq.eclair.TestConstants.{Alice, Bob} import fr.acinq.eclair.blockchain._ import fr.acinq.eclair.channel._ import fr.acinq.eclair.wire._ @@ -22,12 +23,10 @@ trait StateTestsHelperMethods extends TestKitBase { blockchainA: ActorRef, alice2blockchain: TestProbe, bob2blockchain: TestProbe): Unit = { - alice ! INPUT_INIT_FUNDER(TestConstants.fundingSatoshis, TestConstants.pushMsat) - bob ! INPUT_INIT_FUNDEE() - alice2bob.expectMsgType[Init] - alice2bob.forward(bob) - bob2alice.expectMsgType[Init] - bob2alice.forward(alice) + val aliceInit = Init(Alice.channelParams.globalFeatures, Alice.channelParams.localFeatures) + val bobInit = Init(Bob.channelParams.globalFeatures, Bob.channelParams.localFeatures) + alice ! INPUT_INIT_FUNDER(Bob.id, 0, TestConstants.fundingSatoshis, TestConstants.pushMsat, Alice.channelParams, bobInit) + bob ! INPUT_INIT_FUNDEE(Alice.id, 0, Bob.channelParams, aliceInit) alice2bob.expectMsgType[OpenChannel] alice2bob.forward(bob) bob2alice.expectMsgType[AcceptChannel] diff --git a/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/a/WaitForAcceptChannelStateSpec.scala b/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/a/WaitForAcceptChannelStateSpec.scala index e3313d34a..f212b6e96 100644 --- a/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/a/WaitForAcceptChannelStateSpec.scala +++ b/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/a/WaitForAcceptChannelStateSpec.scala @@ -28,15 +28,13 @@ class WaitForAcceptChannelStateSpec extends TestkitBaseClass { val bob2blockchain = TestProbe() val relayer = TestProbe() val router = TestProbe() - val alice: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(alice2bob.ref, alice2blockchain.ref, router.ref, relayer.ref, Alice.channelParams, Bob.id)) - val bob: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(bob2alice.ref, bob2blockchain.ref, router.ref, relayer.ref, Bob.channelParams, Alice.id)) - alice ! INPUT_INIT_FUNDER(TestConstants.fundingSatoshis, TestConstants.pushMsat) - bob ! INPUT_INIT_FUNDEE() + val alice: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(alice2bob.ref, alice2blockchain.ref, router.ref, relayer.ref)) + val bob: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(bob2alice.ref, bob2blockchain.ref, router.ref, relayer.ref)) + val aliceInit = Init(Alice.channelParams.globalFeatures, Alice.channelParams.localFeatures) + val bobInit = Init(Bob.channelParams.globalFeatures, Bob.channelParams.localFeatures) within(30 seconds) { - alice2bob.expectMsgType[Init] - alice2bob.forward(bob) - bob2alice.expectMsgType[Init] - bob2alice.forward(alice) + alice ! INPUT_INIT_FUNDER(Bob.id, 0, TestConstants.fundingSatoshis, TestConstants.pushMsat, Alice.channelParams, bobInit) + bob ! INPUT_INIT_FUNDEE(Alice.id, 0, Bob.channelParams, aliceInit) alice2bob.expectMsgType[OpenChannel] alice2bob.forward(bob) awaitCond(alice.stateName == WAIT_FOR_ACCEPT_CHANNEL) diff --git a/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/a/WaitForOpenChannelStateSpec.scala b/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/a/WaitForOpenChannelStateSpec.scala index 3c4e7eb2a..b3f8c60bd 100644 --- a/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/a/WaitForOpenChannelStateSpec.scala +++ b/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/a/WaitForOpenChannelStateSpec.scala @@ -1,10 +1,10 @@ package fr.acinq.eclair.channel.states.a import akka.testkit.{TestFSMRef, TestProbe} -import fr.acinq.eclair.{TestConstants, TestkitBaseClass} import fr.acinq.eclair.TestConstants.{Alice, Bob} import fr.acinq.eclair.channel._ import fr.acinq.eclair.wire.{Error, Init, OpenChannel} +import fr.acinq.eclair.{TestConstants, TestkitBaseClass} import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner @@ -25,15 +25,13 @@ class WaitForOpenChannelStateSpec extends TestkitBaseClass { val bob2blockchain = TestProbe() val relayer = TestProbe() val router = TestProbe() - val alice: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(alice2bob.ref, alice2blockchain.ref, router.ref, relayer.ref, Alice.channelParams, Bob.id)) - val bob: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(bob2alice.ref, bob2blockchain.ref, router.ref, relayer.ref, Bob.channelParams, Alice.id)) - alice ! INPUT_INIT_FUNDER(TestConstants.fundingSatoshis, TestConstants.pushMsat) - bob ! INPUT_INIT_FUNDEE() + val alice: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(alice2bob.ref, alice2blockchain.ref, router.ref, relayer.ref)) + val bob: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(bob2alice.ref, bob2blockchain.ref, router.ref, relayer.ref)) + val aliceInit = Init(Alice.channelParams.globalFeatures, Alice.channelParams.localFeatures) + val bobInit = Init(Bob.channelParams.globalFeatures, Bob.channelParams.localFeatures) within(30 seconds) { - alice2bob.expectMsgType[Init] - alice2bob.forward(bob) - bob2alice.expectMsgType[Init] - bob2alice.forward(alice) + alice ! INPUT_INIT_FUNDER(Bob.id, 0, TestConstants.fundingSatoshis, TestConstants.pushMsat, Alice.channelParams, bobInit) + bob ! INPUT_INIT_FUNDEE(Alice.id, 0, Bob.channelParams, aliceInit) awaitCond(bob.stateName == WAIT_FOR_OPEN_CHANNEL) } test((bob, alice2bob, bob2alice, bob2blockchain)) diff --git a/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/b/WaitForFundingCreatedInternalStateSpec.scala b/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/b/WaitForFundingCreatedInternalStateSpec.scala index 757414dcb..86cc4e21f 100644 --- a/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/b/WaitForFundingCreatedInternalStateSpec.scala +++ b/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/b/WaitForFundingCreatedInternalStateSpec.scala @@ -28,15 +28,13 @@ class WaitForFundingCreatedInternalStateSpec extends TestkitBaseClass { val bob2blockchain = TestProbe() val relayer = TestProbe() val router = TestProbe() - val alice: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(alice2bob.ref, alice2blockchain.ref, router.ref, relayer.ref, Alice.channelParams, Bob.id)) - val bob: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(bob2alice.ref, bob2blockchain.ref, router.ref, relayer.ref, Bob.channelParams, Alice.id)) - alice ! INPUT_INIT_FUNDER(TestConstants.fundingSatoshis, TestConstants.pushMsat) - bob ! INPUT_INIT_FUNDEE() + val alice: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(alice2bob.ref, alice2blockchain.ref, router.ref, relayer.ref)) + val bob: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(bob2alice.ref, bob2blockchain.ref, router.ref, relayer.ref)) + val aliceInit = Init(Alice.channelParams.globalFeatures, Alice.channelParams.localFeatures) + val bobInit = Init(Bob.channelParams.globalFeatures, Bob.channelParams.localFeatures) within(30 seconds) { - alice2bob.expectMsgType[Init] - alice2bob.forward(bob) - bob2alice.expectMsgType[Init] - bob2alice.forward(alice) + alice ! INPUT_INIT_FUNDER(Bob.id, 0, TestConstants.fundingSatoshis, TestConstants.pushMsat, Alice.channelParams, bobInit) + bob ! INPUT_INIT_FUNDEE(Alice.id, 0, Bob.channelParams, aliceInit) alice2bob.expectMsgType[OpenChannel] alice2bob.forward(bob) bob2alice.expectMsgType[AcceptChannel] diff --git a/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/b/WaitForFundingCreatedStateSpec.scala b/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/b/WaitForFundingCreatedStateSpec.scala index d0629064d..915b58253 100644 --- a/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/b/WaitForFundingCreatedStateSpec.scala +++ b/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/b/WaitForFundingCreatedStateSpec.scala @@ -6,7 +6,7 @@ import fr.acinq.eclair.TestConstants.{Alice, Bob} import fr.acinq.eclair.blockchain.{PeerWatcher, WatchConfirmed, WatchSpent} import fr.acinq.eclair.channel._ import fr.acinq.eclair.wire._ -import fr.acinq.eclair.{TestkitBaseClass, TestBitcoinClient, TestConstants} +import fr.acinq.eclair.{TestBitcoinClient, TestConstants, TestkitBaseClass} import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner @@ -27,15 +27,13 @@ class WaitForFundingCreatedStateSpec extends TestkitBaseClass { val bob2blockchain = TestProbe() val relayer = TestProbe() val router = TestProbe() - val alice: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(alice2bob.ref, blockchainA, router.ref, relayer.ref, Alice.channelParams, Bob.id)) - val bob: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(bob2alice.ref, bob2blockchain.ref, router.ref, relayer.ref, Bob.channelParams, Alice.id)) - alice ! INPUT_INIT_FUNDER(TestConstants.fundingSatoshis, TestConstants.pushMsat) - bob ! INPUT_INIT_FUNDEE() + val alice: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(alice2bob.ref, blockchainA, router.ref, relayer.ref)) + val bob: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(bob2alice.ref, bob2blockchain.ref, router.ref, relayer.ref)) + val aliceInit = Init(Alice.channelParams.globalFeatures, Alice.channelParams.localFeatures) + val bobInit = Init(Bob.channelParams.globalFeatures, Bob.channelParams.localFeatures) within(30 seconds) { - alice2bob.expectMsgType[Init] - alice2bob.forward(bob) - bob2alice.expectMsgType[Init] - bob2alice.forward(alice) + alice ! INPUT_INIT_FUNDER(Bob.id, 0, TestConstants.fundingSatoshis, TestConstants.pushMsat, Alice.channelParams, bobInit) + bob ! INPUT_INIT_FUNDEE(Alice.id, 0, Bob.channelParams, aliceInit) alice2bob.expectMsgType[OpenChannel] alice2bob.forward(bob) bob2alice.expectMsgType[AcceptChannel] diff --git a/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/b/WaitForFundingSignedStateSpec.scala b/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/b/WaitForFundingSignedStateSpec.scala index c5747e5de..7a8e70f42 100644 --- a/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/b/WaitForFundingSignedStateSpec.scala +++ b/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/b/WaitForFundingSignedStateSpec.scala @@ -29,15 +29,13 @@ class WaitForFundingSignedStateSpec extends TestkitBaseClass { val bob2blockchain = TestProbe() val relayer = TestProbe() val router = TestProbe() - val alice: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(alice2bob.ref, alice2blockchain.ref, router.ref, relayer.ref, Alice.channelParams, Bob.id)) - val bob: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(bob2alice.ref, bob2blockchain.ref, router.ref, relayer.ref, Bob.channelParams, Alice.id)) - alice ! INPUT_INIT_FUNDER(TestConstants.fundingSatoshis, TestConstants.pushMsat) - bob ! INPUT_INIT_FUNDEE() + val alice: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(alice2bob.ref, alice2blockchain.ref, router.ref, relayer.ref)) + val bob: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(bob2alice.ref, bob2blockchain.ref, router.ref, relayer.ref)) + val aliceInit = Init(Alice.channelParams.globalFeatures, Alice.channelParams.localFeatures) + val bobInit = Init(Bob.channelParams.globalFeatures, Bob.channelParams.localFeatures) within(30 seconds) { - alice2bob.expectMsgType[Init] - alice2bob.forward(bob) - bob2alice.expectMsgType[Init] - bob2alice.forward(alice) + alice ! INPUT_INIT_FUNDER(Bob.id, 0, TestConstants.fundingSatoshis, TestConstants.pushMsat, Alice.channelParams, bobInit) + bob ! INPUT_INIT_FUNDEE(Alice.id, 0, Bob.channelParams, aliceInit) alice2bob.expectMsgType[OpenChannel] alice2bob.forward(bob) bob2alice.expectMsgType[AcceptChannel] diff --git a/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/c/WaitForAnnSignaturesStateSpec.scala b/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/c/WaitForAnnSignaturesStateSpec.scala index a47354bce..6ec4c8970 100644 --- a/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/c/WaitForAnnSignaturesStateSpec.scala +++ b/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/c/WaitForAnnSignaturesStateSpec.scala @@ -8,7 +8,6 @@ import fr.acinq.eclair.channel._ import fr.acinq.eclair.wire._ import fr.acinq.eclair.{TestBitcoinClient, TestConstants, TestkitBaseClass} import org.junit.runner.RunWith -import org.scalatest.Tag import org.scalatest.junit.JUnitRunner import scala.concurrent.duration._ @@ -29,18 +28,14 @@ class WaitForAnnSignaturesStateSpec extends TestkitBaseClass { val bob2blockchain = TestProbe() val relayer = TestProbe() val router = TestProbe() - + val alice: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(alice2bob.ref, alice2blockchain.ref, router.ref, relayer.ref)) + val bob: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(bob2alice.ref, bob2blockchain.ref, router.ref, relayer.ref)) val (aliceParams, bobParams) = (Alice.channelParams.copy(localFeatures = "01"), Bob.channelParams.copy(localFeatures = "01")) - - val alice: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(alice2bob.ref, alice2blockchain.ref, router.ref, relayer.ref, aliceParams, Bob.id)) - val bob: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(bob2alice.ref, bob2blockchain.ref, router.ref, relayer.ref, bobParams, Alice.id)) - alice ! INPUT_INIT_FUNDER(TestConstants.fundingSatoshis, TestConstants.pushMsat) - bob ! INPUT_INIT_FUNDEE() + val aliceInit = Init(aliceParams.globalFeatures, aliceParams.localFeatures) + val bobInit = Init(bobParams.globalFeatures, bobParams.localFeatures) within(30 seconds) { - alice2bob.expectMsgType[Init] - alice2bob.forward(bob) - bob2alice.expectMsgType[Init] - bob2alice.forward(alice) + alice ! INPUT_INIT_FUNDER(Bob.id, 0, TestConstants.fundingSatoshis, TestConstants.pushMsat, aliceParams, bobInit) + bob ! INPUT_INIT_FUNDEE(Alice.id, 0, bobParams, aliceInit) alice2bob.expectMsgType[OpenChannel] alice2bob.forward(bob) bob2alice.expectMsgType[AcceptChannel] diff --git a/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/c/WaitForFundingLockedInternalStateSpec.scala b/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/c/WaitForFundingLockedInternalStateSpec.scala index 0e898c5a1..ba366c046 100644 --- a/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/c/WaitForFundingLockedInternalStateSpec.scala +++ b/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/c/WaitForFundingLockedInternalStateSpec.scala @@ -28,15 +28,13 @@ class WaitForFundingLockedInternalStateSpec extends TestkitBaseClass { val bob2blockchain = TestProbe() val relayer = TestProbe() val router = TestProbe() - val alice: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(alice2bob.ref, alice2blockchain.ref, router.ref, relayer.ref, Alice.channelParams, Bob.id)) - val bob: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(bob2alice.ref, bob2blockchain.ref, router.ref, relayer.ref, Bob.channelParams, Alice.id)) - alice ! INPUT_INIT_FUNDER(TestConstants.fundingSatoshis, TestConstants.pushMsat) - bob ! INPUT_INIT_FUNDEE() + val alice: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(alice2bob.ref, alice2blockchain.ref, router.ref, relayer.ref)) + val bob: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(bob2alice.ref, bob2blockchain.ref, router.ref, relayer.ref)) + val aliceInit = Init(Alice.channelParams.globalFeatures, Alice.channelParams.localFeatures) + val bobInit = Init(Bob.channelParams.globalFeatures, Bob.channelParams.localFeatures) within(30 seconds) { - alice2bob.expectMsgType[Init] - alice2bob.forward(bob) - bob2alice.expectMsgType[Init] - bob2alice.forward(alice) + alice ! INPUT_INIT_FUNDER(Bob.id, 0, TestConstants.fundingSatoshis, TestConstants.pushMsat, Alice.channelParams, bobInit) + bob ! INPUT_INIT_FUNDEE(Alice.id, 0, Bob.channelParams, aliceInit) alice2bob.expectMsgType[OpenChannel] alice2bob.forward(bob) bob2alice.expectMsgType[AcceptChannel] diff --git a/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/c/WaitForFundingLockedStateSpec.scala b/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/c/WaitForFundingLockedStateSpec.scala index 13eb88710..541abb3dd 100644 --- a/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/c/WaitForFundingLockedStateSpec.scala +++ b/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/c/WaitForFundingLockedStateSpec.scala @@ -2,15 +2,13 @@ package fr.acinq.eclair.channel.states.c import akka.actor.{ActorRef, Props} import akka.testkit.{TestFSMRef, TestProbe} -import fr.acinq.bitcoin.{BinaryData, Crypto} import fr.acinq.eclair.TestConstants.{Alice, Bob} import fr.acinq.eclair.blockchain._ import fr.acinq.eclair.channel._ -import fr.acinq.eclair.router.{Announcements, Router} import fr.acinq.eclair.wire._ import fr.acinq.eclair.{TestBitcoinClient, TestConstants, TestkitBaseClass} import org.junit.runner.RunWith -import org.scalatest.{ConfigMap, Tag, TestData} +import org.scalatest.Tag import org.scalatest.junit.JUnitRunner import scala.concurrent.duration._ @@ -31,22 +29,18 @@ class WaitForFundingLockedStateSpec extends TestkitBaseClass { val bob2blockchain = TestProbe() val relayer = TestProbe() val router = TestProbe() - + val alice: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(alice2bob.ref, alice2blockchain.ref, router.ref, relayer.ref)) + val bob: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(bob2alice.ref, bob2blockchain.ref, router.ref, relayer.ref)) val (aliceParams, bobParams) = if (test.tags.contains("public")) { (Alice.channelParams.copy(localFeatures = "01"), Bob.channelParams.copy(localFeatures = "01")) } else { (Alice.channelParams, Bob.channelParams) } - - val alice: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(alice2bob.ref, alice2blockchain.ref, router.ref, relayer.ref, aliceParams, Bob.id)) - val bob: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(bob2alice.ref, bob2blockchain.ref, router.ref, relayer.ref, bobParams, Alice.id)) - alice ! INPUT_INIT_FUNDER(TestConstants.fundingSatoshis, TestConstants.pushMsat) - bob ! INPUT_INIT_FUNDEE() + val aliceInit = Init(aliceParams.globalFeatures, aliceParams.localFeatures) + val bobInit = Init(bobParams.globalFeatures, bobParams.localFeatures) within(30 seconds) { - alice2bob.expectMsgType[Init] - alice2bob.forward(bob) - bob2alice.expectMsgType[Init] - bob2alice.forward(alice) + alice ! INPUT_INIT_FUNDER(Bob.id, 0, TestConstants.fundingSatoshis, TestConstants.pushMsat, aliceParams, bobInit) + bob ! INPUT_INIT_FUNDEE(Alice.id, 0, bobParams, aliceInit) alice2bob.expectMsgType[OpenChannel] alice2bob.forward(bob) bob2alice.expectMsgType[AcceptChannel] @@ -79,6 +73,7 @@ class WaitForFundingLockedStateSpec extends TestkitBaseClass { bob2alice.expectMsgType[FundingLocked] bob2alice.forward(alice) awaitCond(alice.stateName == NORMAL) + bob2alice.expectNoMsg(200 millis) } } diff --git a/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/e/NormalStateSpec.scala b/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/e/NormalStateSpec.scala index 74630b11a..311ffd2d2 100644 --- a/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/e/NormalStateSpec.scala +++ b/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/e/NormalStateSpec.scala @@ -34,8 +34,8 @@ class NormalStateSpec extends TestkitBaseClass with StateTestsHelperMethods { val bob2blockchain = TestProbe() val relayer = TestProbe() val router = TestProbe() - val alice: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(alice2bob.ref, alice2blockchain.ref, router.ref, relayer.ref, Alice.channelParams, Bob.id)) - val bob: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(bob2alice.ref, bob2blockchain.ref, router.ref, relayer.ref, Bob.channelParams, Alice.id)) + val alice: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(alice2bob.ref, alice2blockchain.ref, router.ref, relayer.ref)) + val bob: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(bob2alice.ref, bob2blockchain.ref, router.ref, relayer.ref)) within(30 seconds) { reachNormal(alice, bob, alice2bob, bob2alice, blockchainA, alice2blockchain, bob2blockchain) awaitCond(alice.stateName == NORMAL) diff --git a/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/f/ShutdownStateSpec.scala b/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/f/ShutdownStateSpec.scala index c19274e7f..0532e5e8b 100644 --- a/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/f/ShutdownStateSpec.scala +++ b/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/f/ShutdownStateSpec.scala @@ -32,8 +32,8 @@ class ShutdownStateSpec extends TestkitBaseClass with StateTestsHelperMethods { val bob2blockchain = TestProbe() val relayer = TestProbe() val router = TestProbe() - val alice: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(alice2bob.ref, alice2blockchain.ref, router.ref, relayer.ref, Alice.channelParams, Bob.id)) - val bob: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(bob2alice.ref, bob2blockchain.ref, router.ref, relayer.ref, Bob.channelParams, Alice.id)) + val alice: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(alice2bob.ref, alice2blockchain.ref, router.ref, relayer.ref)) + val bob: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(bob2alice.ref, bob2blockchain.ref, router.ref, relayer.ref)) within(30 seconds) { reachNormal(alice, bob, alice2bob, bob2alice, blockchainA, alice2blockchain, bob2blockchain) val sender = TestProbe() diff --git a/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/g/NegotiatingStateSpec.scala b/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/g/NegotiatingStateSpec.scala index 97609ec7d..62233defc 100644 --- a/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/g/NegotiatingStateSpec.scala +++ b/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/g/NegotiatingStateSpec.scala @@ -30,8 +30,8 @@ class NegotiatingStateSpec extends TestkitBaseClass with StateTestsHelperMethods val relayer = TestProbe() // note that alice.initialFeeRate != bob.initialFeeRate val router = TestProbe() - val alice: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(alice2bob.ref, alice2blockchain.ref, router.ref, relayer.ref, Alice.channelParams, Bob.id)) - val bob: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(bob2alice.ref, bob2blockchain.ref, router.ref, relayer.ref, Bob.channelParams, Alice.id)) + val alice: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(alice2bob.ref, alice2blockchain.ref, router.ref, relayer.ref)) + val bob: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(bob2alice.ref, bob2blockchain.ref, router.ref, relayer.ref)) within(30 seconds) { reachNormal(alice, bob, alice2bob, bob2alice, blockchainA, alice2blockchain, bob2blockchain) val sender = TestProbe() diff --git a/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/h/ClosingStateSpec.scala b/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/h/ClosingStateSpec.scala index 8e4e416a6..820f4ab3a 100644 --- a/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/h/ClosingStateSpec.scala +++ b/eclair-node/src/test/scala/fr/acinq/eclair/channel/states/h/ClosingStateSpec.scala @@ -30,8 +30,8 @@ class ClosingStateSpec extends TestkitBaseClass with StateTestsHelperMethods { val bob2blockchain = TestProbe() val relayer = TestProbe() val router = TestProbe() - val alice: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(alice2bob.ref, alice2blockchain.ref, router.ref, relayer.ref, Alice.channelParams, Bob.id)) - val bob: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(bob2alice.ref, bob2blockchain.ref, router.ref, relayer.ref, Bob.channelParams, Alice.id)) + val alice: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(alice2bob.ref, alice2blockchain.ref, router.ref, relayer.ref)) + val bob: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(bob2alice.ref, bob2blockchain.ref, router.ref, relayer.ref)) within(30 seconds) { reachNormal(alice, bob, alice2bob, bob2alice, blockchainA, alice2blockchain, bob2blockchain) diff --git a/eclair-node/src/test/scala/fr/acinq/eclair/crypto/TransportHandlerSpec.scala b/eclair-node/src/test/scala/fr/acinq/eclair/crypto/TransportHandlerSpec.scala index a2efddb69..76c6a3cd1 100644 --- a/eclair-node/src/test/scala/fr/acinq/eclair/crypto/TransportHandlerSpec.scala +++ b/eclair-node/src/test/scala/fr/acinq/eclair/crypto/TransportHandlerSpec.scala @@ -7,7 +7,7 @@ import akka.io.Tcp.{Received, Write} import akka.testkit.{TestActorRef, TestFSMRef, TestKit, TestProbe} import fr.acinq.bitcoin.{Base58Check, BinaryData} import fr.acinq.eclair.crypto.Noise.{Chacha20Poly1305CipherFunctions, CipherState} -import fr.acinq.eclair.crypto.TransportHandler.ExtendedCipherState +import fr.acinq.eclair.crypto.TransportHandler.{ExtendedCipherState, Listener} import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner import org.scalatest.{BeforeAndAfterAll, FunSuiteLike} @@ -32,10 +32,16 @@ class TransportHandlerSpec extends TestKit(ActorSystem("test")) with FunSuiteLik val pipe = system.actorOf(Props[MyPipe]) val probe1 = TestProbe() val probe2 = TestProbe() - val initiator = TestFSMRef(new TransportHandler(Initiator.s, Some(Responder.s.pub), pipe, true, (conn, _, _) => probe1.ref, TransportHandler.Noop)) - val responder = TestFSMRef(new TransportHandler(Responder.s, None, pipe, false, (conn, _, _) => probe2.ref, TransportHandler.Noop)) + val initiator = TestFSMRef(new TransportHandler(Initiator.s, Some(Responder.s.pub), pipe, TransportHandler.Noop)) + val responder = TestFSMRef(new TransportHandler(Responder.s, None, pipe, TransportHandler.Noop)) pipe ! (initiator, responder) + awaitCond(initiator.stateName == TransportHandler.WaitingForListener) + awaitCond(responder.stateName == TransportHandler.WaitingForListener) + + initiator ! Listener(probe1.ref) + responder ! Listener(probe2.ref) + awaitCond(initiator.stateName == TransportHandler.WaitingForCyphertext) awaitCond(responder.stateName == TransportHandler.WaitingForCyphertext) @@ -68,10 +74,16 @@ class TransportHandlerSpec extends TestKit(ActorSystem("test")) with FunSuiteLik val pipe = system.actorOf(Props[MyPipe]) val probe1 = TestProbe() val probe2 = TestProbe() - val initiator = TestFSMRef(new TransportHandler(Initiator.s, Some(Responder.s.pub), pipe, true, (conn, _, _) => probe1.ref, mySerializer)) - val responder = TestFSMRef(new TransportHandler(Responder.s, None, pipe, false, (conn, _, _) => probe2.ref, mySerializer)) + val initiator = TestFSMRef(new TransportHandler(Initiator.s, Some(Responder.s.pub), pipe, mySerializer)) + val responder = TestFSMRef(new TransportHandler(Responder.s, None, pipe, mySerializer)) pipe ! (initiator, responder) + awaitCond(initiator.stateName == TransportHandler.WaitingForListener) + awaitCond(responder.stateName == TransportHandler.WaitingForListener) + + initiator ! Listener(probe1.ref) + responder ! Listener(probe2.ref) + awaitCond(initiator.stateName == TransportHandler.WaitingForCyphertext) awaitCond(responder.stateName == TransportHandler.WaitingForCyphertext) @@ -92,10 +104,16 @@ class TransportHandlerSpec extends TestKit(ActorSystem("test")) with FunSuiteLik val pipe = system.actorOf(Props[MyPipeSplitter]) val probe1 = TestProbe() val probe2 = TestProbe() - val initiator = TestFSMRef(new TransportHandler(Initiator.s, Some(Responder.s.pub), pipe, true, (conn, _, _) => probe1.ref, TransportHandler.Noop)) - val responder = TestFSMRef(new TransportHandler(Responder.s, None, pipe, false, (conn, _, _) => probe2.ref, TransportHandler.Noop)) + val initiator = TestFSMRef(new TransportHandler(Initiator.s, Some(Responder.s.pub), pipe, TransportHandler.Noop)) + val responder = TestFSMRef(new TransportHandler(Responder.s, None, pipe, TransportHandler.Noop)) pipe ! (initiator, responder) + awaitCond(initiator.stateName == TransportHandler.WaitingForListener) + awaitCond(responder.stateName == TransportHandler.WaitingForListener) + + initiator ! Listener(probe1.ref) + responder ! Listener(probe2.ref) + awaitCond(initiator.stateName == TransportHandler.WaitingForCyphertext) awaitCond(responder.stateName == TransportHandler.WaitingForCyphertext) @@ -117,8 +135,8 @@ class TransportHandlerSpec extends TestKit(ActorSystem("test")) with FunSuiteLik val probe1 = TestProbe() val probe2 = TestProbe() val supervisor = TestActorRef(Props(new MySupervisor())) - val initiator = TestFSMRef(new TransportHandler(Initiator.s, Some(Initiator.s.pub), pipe, true, (conn, _, _) => probe1.ref, TransportHandler.Noop), supervisor, "ini") - val responder = TestFSMRef(new TransportHandler(Responder.s, None, pipe, false, (conn, _, _) => probe2.ref, TransportHandler.Noop), supervisor, "res") + val initiator = TestFSMRef(new TransportHandler(Initiator.s, Some(Initiator.s.pub), pipe, TransportHandler.Noop), supervisor, "ini") + val responder = TestFSMRef(new TransportHandler(Responder.s, None, pipe, TransportHandler.Noop), supervisor, "res") probe1.watch(responder) pipe ! (initiator, responder) diff --git a/eclair-node/src/test/scala/fr/acinq/eclair/interop/InteroperabilitySpec.scala b/eclair-node/src/test/scala/fr/acinq/eclair/interop/InteroperabilitySpec.scala index ee552af4f..ff5cd1604 100644 --- a/eclair-node/src/test/scala/fr/acinq/eclair/interop/InteroperabilitySpec.scala +++ b/eclair-node/src/test/scala/fr/acinq/eclair/interop/InteroperabilitySpec.scala @@ -3,14 +3,12 @@ package fr.acinq.eclair.interop import java.nio.file.{Files, Paths} import akka.actor.{ActorRef, ActorSystem} -import akka.pattern.ask import akka.util.Timeout import com.typesafe.config.ConfigFactory import fr.acinq.bitcoin.BinaryData import fr.acinq.eclair._ import fr.acinq.eclair.blockchain.ExtendedBitcoinClient import fr.acinq.eclair.blockchain.rpc.BitcoinJsonRPCClient -import fr.acinq.eclair.channel.Register.ListChannels import fr.acinq.eclair.channel.{CLOSED, CLOSING, CMD_ADD_HTLC, _} import org.json4s.JsonAST.JString import org.json4s.jackson.JsonMethods._ @@ -79,7 +77,7 @@ class InteroperabilitySpec extends FunSuite with BeforeAndAfterAll { val setup = new Setup() implicit val system = setup.system - val register = setup.register + //val register = setup.register implicit val timeout = Timeout(30 seconds) @@ -106,11 +104,11 @@ class InteroperabilitySpec extends FunSuite with BeforeAndAfterAll { future } - def listChannels: Future[Iterable[RES_GETINFO]] = { + def listChannels: Future[Iterable[RES_GETINFO]] = ???/*{ implicit val timeout = Timeout(5 seconds) (register ? ListChannels).mapTo[Iterable[ActorRef]] .flatMap(l => Future.sequence(l.map(c => (c ? CMD_GETINFO).mapTo[RES_GETINFO]))) - } + }*/ def waitForState(state: State): Future[Unit] = { listChannels.map(_.map(_.state)).flatMap(current => diff --git a/eclair-node/src/test/scala/fr/acinq/eclair/interop/rustytests/RustyTestsSpec.scala b/eclair-node/src/test/scala/fr/acinq/eclair/interop/rustytests/RustyTestsSpec.scala index 3ee19ec28..998a225e3 100644 --- a/eclair-node/src/test/scala/fr/acinq/eclair/interop/rustytests/RustyTestsSpec.scala +++ b/eclair-node/src/test/scala/fr/acinq/eclair/interop/rustytests/RustyTestsSpec.scala @@ -8,8 +8,9 @@ import akka.testkit.{TestFSMRef, TestKit, TestProbe} import fr.acinq.eclair.TestConstants.{Alice, Bob} import fr.acinq.eclair.blockchain.PeerWatcher import fr.acinq.eclair.channel._ -import fr.acinq.eclair.payment.{NoopPaymentHandler, Relayer} -import fr.acinq.eclair.{Globals, TestBitcoinClient, TestConstants} +import fr.acinq.eclair.payment.NoopPaymentHandler +import fr.acinq.eclair.wire.Init +import fr.acinq.eclair.{Globals, TestBitcoinClient} import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner import org.scalatest.{BeforeAndAfterAll, Matchers, fixture} @@ -35,11 +36,13 @@ class RustyTestsSpec extends TestKit(ActorSystem("test")) with Matchers with fix // we just bypass the relayer for this test val relayer = paymentHandler val router = TestProbe() - val alice: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(pipe, blockchainA, router.ref, relayer, Alice.channelParams, Bob.id)) - val bob: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(pipe, blockchainB, router.ref, relayer, Bob.channelParams, Alice.id)) + val alice: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(pipe, blockchainA, router.ref, relayer)) + val bob: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(pipe, blockchainB, router.ref, relayer)) + val aliceInit = Init(Alice.channelParams.globalFeatures, Alice.channelParams.localFeatures) + val bobInit = Init(Bob.channelParams.globalFeatures, Bob.channelParams.localFeatures) // alice and bob will both have 1 000 000 sat - alice ! INPUT_INIT_FUNDER(2000000, 1000000000) - bob ! INPUT_INIT_FUNDEE() + alice ! INPUT_INIT_FUNDER(Bob.id, 0, 2000000, 1000000000, Alice.channelParams, bobInit) + bob ! INPUT_INIT_FUNDEE(Alice.id, 0, Bob.channelParams, aliceInit) pipe ! (alice, bob) within(30 seconds) { awaitCond(alice.stateName == NORMAL)