From 5b4f8b6b29836901ad6d67bfcea400881f5902cb Mon Sep 17 00:00:00 2001 From: pm47 Date: Sun, 20 Mar 2016 15:44:29 +0100 Subject: [PATCH] put path logic in register actor and use resolveOne --- eclair-demo/src/main/resources/logback.xml | 2 +- .../src/main/scala/fr/acinq/eclair/Boot.scala | 8 ++- .../scala/fr/acinq/eclair/RegisterActor.scala | 69 ------------------- .../scala/fr/acinq/eclair/api/Service.scala | 21 +++--- .../fr/acinq/eclair/channel/Channel.scala | 51 +++++++------- .../fr/acinq/eclair/channel/Register.scala | 67 ++++++++++++++++++ .../fr/acinq/eclair/io/AuthHandler.scala | 4 -- .../scala/fr/acinq/eclair/io/Client.scala | 4 +- .../scala/fr/acinq/eclair/io/Server.scala | 2 +- 9 files changed, 113 insertions(+), 115 deletions(-) delete mode 100644 eclair-demo/src/main/scala/fr/acinq/eclair/RegisterActor.scala create mode 100644 eclair-demo/src/main/scala/fr/acinq/eclair/channel/Register.scala diff --git a/eclair-demo/src/main/resources/logback.xml b/eclair-demo/src/main/resources/logback.xml index 97a7f0a73..c66df95ab 100644 --- a/eclair-demo/src/main/resources/logback.xml +++ b/eclair-demo/src/main/resources/logback.xml @@ -9,7 +9,7 @@ - + diff --git a/eclair-demo/src/main/scala/fr/acinq/eclair/Boot.scala b/eclair-demo/src/main/scala/fr/acinq/eclair/Boot.scala index 6be2343dc..87d6c1651 100644 --- a/eclair-demo/src/main/scala/fr/acinq/eclair/Boot.scala +++ b/eclair-demo/src/main/scala/fr/acinq/eclair/Boot.scala @@ -2,16 +2,18 @@ package fr.acinq.eclair import java.net.InetSocketAddress -import akka.actor.{ActorRef, Props, ActorSystem} +import akka.actor.{ActorRef, ActorSystem, Props} import akka.io.IO import akka.util.Timeout import com.typesafe.config.ConfigFactory import fr.acinq.eclair.api.ServiceActor import fr.acinq.eclair.blockchain.PollingWatcher +import fr.acinq.eclair.channel.Register import fr.acinq.eclair.io.{Client, Server} import grizzled.slf4j.Logging import spray.can.Http -import scala.concurrent.{ExecutionContext, Await} + +import scala.concurrent.{Await, ExecutionContext} import scala.concurrent.duration._ import Globals._ @@ -33,7 +35,7 @@ object Boot extends App with Logging { assert(chain == "testnet" || chain == "regtest", "you should be on testnet or regtest") val blockchain = system.actorOf(Props(new PollingWatcher(bitcoin_client)), name = "blockchain") - val register = system.actorOf(Props[RegisterActor], name = "register") + val register = system.actorOf(Props[Register], name = "register") val server = system.actorOf(Server.props(config.getString("eclair.server.address"), config.getInt("eclair.server.port")), "server") val api = system.actorOf(Props(new ServiceActor { diff --git a/eclair-demo/src/main/scala/fr/acinq/eclair/RegisterActor.scala b/eclair-demo/src/main/scala/fr/acinq/eclair/RegisterActor.scala deleted file mode 100644 index 866aaa794..000000000 --- a/eclair-demo/src/main/scala/fr/acinq/eclair/RegisterActor.scala +++ /dev/null @@ -1,69 +0,0 @@ -package fr.acinq.eclair - -import akka.actor._ -import akka.util.Timeout -import fr.acinq.bitcoin.{BinaryData, DeterministicWallet} -import fr.acinq.eclair.channel._ -import fr.acinq.eclair.io.AuthHandler -import akka.pattern.ask - -import scala.concurrent.{ExecutionContext, Future} -import scala.concurrent.duration._ - -object RegisterActor { - - // @formatter:off - case class CreateChannel(connection: ActorRef, anchorAmount: Option[Long]) - case class GetChannels() - case class Entry(nodeId: String, channelId: String, channel: ActorRef, state: ChannelState) - case class UpdateState(state: ChannelState) - // @formatter:on -} - -/** - * Created by PM on 26/01/2016. - */ - -/** - * Actor hierarchy: - * system - * ├── blockchain - * ├── register - * │ ├── handler-0 - * │ │ └── channel - * │ │ └── remote_node_id-anchor_id (alias to parent) - * │ ... - * │ └── handler-n - * │ └── channel - * │ └── remote_node_id-anchor_id (alias to parent) - * ├── server - * ├── client (0..m, transient) - * └── api - */ -class RegisterActor extends Actor with ActorLogging { - - import RegisterActor._ - - implicit val timeout = Timeout(30 seconds) - - import ExecutionContext.Implicits.global - - def receive: Receive = main(0L) - - def main(counter: Long): Receive = { - case CreateChannel(connection, amount) => - val commit_priv = DeterministicWallet.derivePrivateKey(Globals.Node.extendedPrivateKey, 0L :: counter :: Nil) - val final_priv = DeterministicWallet.derivePrivateKey(Globals.Node.extendedPrivateKey, 1L :: counter :: Nil) - val params = OurChannelParams(Globals.default_locktime, commit_priv.secretkey :+ 1.toByte, final_priv.secretkey :+ 1.toByte, Globals.default_mindepth, Globals.commit_fee, "sha-seed".getBytes(), amount) - context.actorOf(Props(classOf[AuthHandler], connection, Boot.blockchain, params), name = s"handler-${counter}") - context.become(main(counter + 1)) - case GetChannels => - val s = sender() - Future.sequence(context.children.map(c => c ? CMD_GETINFO)).map(s ! _) - } - - override def unhandled(message: Any): Unit = { - log.warning(s"unhandled message $message") - super.unhandled(message) - } -} diff --git a/eclair-demo/src/main/scala/fr/acinq/eclair/api/Service.scala b/eclair-demo/src/main/scala/fr/acinq/eclair/api/Service.scala index 73bae4a13..773fae88e 100644 --- a/eclair-demo/src/main/scala/fr/acinq/eclair/api/Service.scala +++ b/eclair-demo/src/main/scala/fr/acinq/eclair/api/Service.scala @@ -5,14 +5,13 @@ import java.net.InetSocketAddress import akka.actor.{Actor, ActorRef} import akka.util.Timeout import fr.acinq.bitcoin.BinaryData -import fr.acinq.eclair.RegisterActor.GetChannels import fr.acinq.eclair._ import fr.acinq.eclair.channel._ -import fr.acinq.eclair.{Boot, channel} +import fr.acinq.eclair.Boot import grizzled.slf4j.Logging import lightning.locktime import lightning.locktime.Locktime.Seconds -import org.json4s.JsonAST.{JBool, JDouble, JObject, JString} +import org.json4s.JsonAST.JString import org.json4s._ import org.json4s.jackson.JsonMethods._ import org.json4s.jackson.Serialization @@ -23,6 +22,7 @@ import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration._ import scala.util.{Failure, Success} import akka.pattern.ask +import fr.acinq.eclair.channel.Register.ListChannels /** * Created by PM on 25/01/2016. @@ -57,8 +57,8 @@ trait Service extends HttpService with Logging { def connect(addr: InetSocketAddress, amount: Long): Unit // amount in satoshis def register: ActorRef - def sendCommand(channel: String, cmd: Command): Future[String] = { - Boot.system.actorSelection(s"/user/register/handler-$channel/channel").resolveOne().map(actor => { + def sendCommand(channel_id: String, cmd: Command): Future[String] = { + Boot.system.actorSelection(Register.actorPathToChannelId(channel_id)).resolveOne().map(actor => { actor ! cmd "ok" }) @@ -75,14 +75,13 @@ trait Service extends HttpService with Logging { connect(new InetSocketAddress(host, port.toInt), anchor_amount.toLong) Future.successful("") case JsonRPCBody(_, _, "list", _) => - (register ? GetChannels).mapTo[Iterable[RES_GETINFO]] - case JsonRPCBody(_, _, "addhtlc", JString(channel) :: JInt(amount) :: JString(rhash) :: JInt(expiry) :: Nil) => - sendCommand(channel, CMD_SEND_HTLC_UPDATE(amount.toInt, BinaryData(rhash), locktime(Seconds(expiry.toInt)))) - case JsonRPCBody(_, _, "addhtlc_r", JInt(amount) :: JString(rhash) :: JInt(expiry) :: tail) => - val nodeIds = tail.toSeq.map { + (register ? ListChannels).mapTo[Iterable[ActorRef]] + .flatMap(l => Future.sequence(l.map(c => c ? CMD_GETINFO))) + case JsonRPCBody(_, _, "addhtlc", JInt(amount) :: JString(rhash) :: JInt(expiry) :: tail) => + val nodeIds = tail.map { case JString(nodeId) => nodeId } - Boot.system.actorSelection(s"*/register/handler-*/channel/${nodeIds.head}-*") + Boot.system.actorSelection(Register.actorPathToNodeId(nodeIds.head)) .resolveOne(2 seconds) .map { channel => channel ! CMD_SEND_HTLC_UPDATE(amount.toInt, BinaryData(rhash), locktime(Seconds(expiry.toInt)), nodeIds.drop(1)) diff --git a/eclair-demo/src/main/scala/fr/acinq/eclair/channel/Channel.scala b/eclair-demo/src/main/scala/fr/acinq/eclair/channel/Channel.scala index f24e6d2f1..52790657b 100644 --- a/eclair-demo/src/main/scala/fr/acinq/eclair/channel/Channel.scala +++ b/eclair-demo/src/main/scala/fr/acinq/eclair/channel/Channel.scala @@ -13,6 +13,7 @@ import lightning.update_decline_htlc.Reason.{CannotRoute, InsufficientFunds} import org.bouncycastle.util.encoders.Hex import scala.util.{Failure, Success, Try} +import scala.concurrent.duration._ /** * Created by PM on 20/08/2015. @@ -138,7 +139,7 @@ final case class CMD_SEND_HTLC_TIMEDOUT(h: sha256_hash) extends Command case object CMD_GETSTATE extends Command case object CMD_GETSTATEDATA extends Command case object CMD_GETINFO extends Command -final case class RES_GETINFO(name: String, nodeid: BinaryData, channelid: BinaryData, state: State, data: Data) +final case class RES_GETINFO(nodeid: BinaryData, channelid: BinaryData, state: State, data: Data) /* 8888888b. d8888 88888888888 d8888 @@ -436,7 +437,7 @@ class Channel(val them: ActorRef, val blockchain: ActorRef, val params: OurChann when(OPEN_WAIT_FOR_COMPLETE_THEIRANCHOR) { case Event(open_complete(blockid_opt), d: DATA_NORMAL) => - create_alias(theirNodeId, d.commitment.anchorId) + Register.create_alias(theirNodeId, d.commitment.anchorId) goto(NORMAL_LOWPRIO) using d case Event(pkt: close_channel, d: CurrentCommitment) => @@ -463,7 +464,7 @@ class Channel(val them: ActorRef, val blockchain: ActorRef, val params: OurChann when(OPEN_WAIT_FOR_COMPLETE_OURANCHOR) { case Event(open_complete(blockid_opt), d: DATA_NORMAL) => - create_alias(theirNodeId, d.commitment.anchorId) + Register.create_alias(theirNodeId, d.commitment.anchorId) goto(NORMAL_HIGHPRIO) using d case Event(pkt: close_channel, d: CurrentCommitment) => @@ -575,10 +576,16 @@ class Channel(val them: ActorRef, val blockchain: ActorRef, val params: OurChann commitment.state.them.htlcs_received.find(_.rHash == bin2sha256(Crypto.sha256(r))) .map(htlc => htlc.previousChannelId match { case Some(previousChannelId) => - val downstreamActorPath = s"*/register/handler-*/channel/*-$previousChannelId" - val downstream = Boot.system.actorSelection(downstreamActorPath) - log.info(s"forwarding r value to downstream=$downstream using path=$downstreamActorPath") - downstream ! CMD_SEND_HTLC_FULFILL(r) + log.info(s"resolving channelId=$previousChannelId") + Boot.system.actorSelection(Register.actorPathToChannelId(previousChannelId)) + .resolveOne(3 seconds) + .onComplete { + case Success(downstream) => + log.info(s"forwarding r value to downstream=$downstream") + downstream ! CMD_SEND_HTLC_FULFILL(r) + case Failure(t: Throwable) => + log.warning(s"couldn't resolve downstream node, htlc will timeout", t) + } case None => log.info(s"looks like I was the origin payer for htlc $htlc") }) @@ -714,11 +721,17 @@ class Channel(val them: ActorRef, val blockchain: ActorRef, val params: OurChann newCommitment.state.us.htlcs_received.lastOption .map(htlc => htlc.nextNodeIds.headOption match { case Some(nextNodeId) => - val upstreamActorPath = s"*/register/handler-*/channel/$nextNodeId-*" - val upstream = Boot.system.actorSelection(upstreamActorPath) - log.info(s"forwarding htlc to upstream=$upstream using path=$upstreamActorPath") - //FIXME : what if the next channel is in LOW state ? it will ignore CMD_SEND_HTLC_UPDATE msg - upstream ! CMD_SEND_HTLC_UPDATE(htlc.amountMsat, htlc.rHash, htlc.expiry, htlc.nextNodeIds.drop(1), originChannelId = Some(previousCommitment.anchorId)) + log.info(s"resolving next nodeId=$nextNodeId") + Boot.system.actorSelection(Register.actorPathToNodeId(nextNodeId)) + .resolveOne(3 seconds) + .onComplete { + case Success(upstream) => + log.info(s"forwarding htlc to upstream=$upstream") + //FIXME : what if the next channel is in LOW state ? will it ignore CMD_SEND_HTLC_UPDATE msg ? + upstream ! CMD_SEND_HTLC_UPDATE(htlc.amountMsat, htlc.rHash, htlc.expiry, htlc.nextNodeIds.drop(1), originChannelId = Some(previousCommitment.anchorId)) + case Failure(t: Throwable) => + log.warning(s"couldn't resolve upstream node, htlc will timeout", t) + } case None => log.info(s"looks like I was the final payee for htlc $htlc") }) @@ -948,13 +961,10 @@ class Channel(val them: ActorRef, val blockchain: ActorRef, val params: OurChann stay case Event(CMD_GETINFO, _) => - //TODO - val r = """/user/register/handler-(\d+)/channel""".r - val r(id) = self.path.toStringWithoutAddress - sender ! RES_GETINFO(id, theirNodeId, stateData match { + sender ! RES_GETINFO(theirNodeId, stateData match { case c: CurrentCommitment => c.commitment.anchorId case _ => "unknown" - } ,stateName, stateData) + }, stateName, stateData) stay // TODO : them ! error(Some("Unexpected message")) ? @@ -972,13 +982,6 @@ class Channel(val them: ActorRef, val blockchain: ActorRef, val params: OurChann 888 888 8888888888 88888888 888 8888888888 888 T88b "Y8888P" */ - /** - * Once it reaches NORMAL state, channel creates a [[fr.acinq.eclair.channel.AliasActor]] which name is counterparty_id-anchor_id - */ - def create_alias(node_id: BinaryData, anchor_id: BinaryData) { - context.actorOf(Props(new AliasActor(self)), name = s"$node_id-$anchor_id") - } - /** * Something went wrong, we publish the current commitment transaction */ diff --git a/eclair-demo/src/main/scala/fr/acinq/eclair/channel/Register.scala b/eclair-demo/src/main/scala/fr/acinq/eclair/channel/Register.scala new file mode 100644 index 000000000..793c6727d --- /dev/null +++ b/eclair-demo/src/main/scala/fr/acinq/eclair/channel/Register.scala @@ -0,0 +1,67 @@ +package fr.acinq.eclair.channel + +import akka.actor._ +import fr.acinq.bitcoin.{BinaryData, DeterministicWallet} +import fr.acinq.eclair.io.AuthHandler +import fr.acinq.eclair.{Boot, Globals} + +/** + * Created by PM on 26/01/2016. + */ + +/** + * Actor hierarchy: + * system + * ├── blockchain + * ├── register + * │ ├── handler-0 + * │ │ └── channel + * │ │ └── remote_node_id-anchor_id (alias to parent) + * │ ... + * │ └── handler-n + * │ └── channel + * │ └── remote_node_id-anchor_id (alias to parent) + * ├── server + * ├── client (0..m, transient) + * └── api + */ +class Register extends Actor with ActorLogging { + + import Register._ + + def receive: Receive = main(0L) + + def main(counter: Long): Receive = { + case CreateChannel(connection, amount) => + val commit_priv = DeterministicWallet.derivePrivateKey(Globals.Node.extendedPrivateKey, 0L :: counter :: Nil) + val final_priv = DeterministicWallet.derivePrivateKey(Globals.Node.extendedPrivateKey, 1L :: counter :: Nil) + val params = OurChannelParams(Globals.default_locktime, commit_priv.secretkey :+ 1.toByte, final_priv.secretkey :+ 1.toByte, Globals.default_mindepth, Globals.commit_fee, "sha-seed".getBytes(), amount) + context.actorOf(Props(classOf[AuthHandler], connection, Boot.blockchain, params), name = s"handler-${counter}") + context.become(main(counter + 1)) + case ListChannels => sender ! context.children + } +} + +object Register { + + // @formatter:off + case class CreateChannel(connection: ActorRef, anchorAmount: Option[Long]) + case class ListChannels() + // @formatter:on + + /** + * Once it reaches NORMAL state, channel creates a [[fr.acinq.eclair.channel.AliasActor]] + * which name is counterparty_id-anchor_id + */ + def create_alias(node_id: BinaryData, anchor_id: BinaryData)(implicit context: ActorContext) = + context.actorOf(Props(new AliasActor(context.self)), name = s"$node_id-$anchor_id") + + def actorPathToNodeId(nodeId: BinaryData): ActorPath = + Boot.system / "register" / "handler-*" / "channel" / s"${nodeId}-*" + + def actorPathToChannelId(channelId: BinaryData): ActorPath = + Boot.system / "register" / "handler-*" / "channel" / s"*-${channelId}" + + def actorPathToChannels(): ActorPath = + Boot.system / "register" / "handler-*" / "channel" +} \ No newline at end of file diff --git a/eclair-demo/src/main/scala/fr/acinq/eclair/io/AuthHandler.scala b/eclair-demo/src/main/scala/fr/acinq/eclair/io/AuthHandler.scala index 4a884043d..e3b6cf3c7 100644 --- a/eclair-demo/src/main/scala/fr/acinq/eclair/io/AuthHandler.scala +++ b/eclair-demo/src/main/scala/fr/acinq/eclair/io/AuthHandler.scala @@ -128,10 +128,6 @@ class AuthHandler(them: ActorRef, blockchain: ActorRef, our_params: OurChannelPa them ! Write(ByteString.fromArray(data)) stay using n.copy(sessionData = s.copy(totlen_out = new_totlen_out)) - case Event(msg: RegisterActor.UpdateState, _) => - context.parent forward msg - stay - case Event(cmd: Command, n@Normal(channel, _)) => channel forward cmd stay diff --git a/eclair-demo/src/main/scala/fr/acinq/eclair/io/Client.scala b/eclair-demo/src/main/scala/fr/acinq/eclair/io/Client.scala index fdfa1d7a4..86e2c6b15 100644 --- a/eclair-demo/src/main/scala/fr/acinq/eclair/io/Client.scala +++ b/eclair-demo/src/main/scala/fr/acinq/eclair/io/Client.scala @@ -4,8 +4,8 @@ import java.net.InetSocketAddress import akka.actor._ import akka.io.{IO, Tcp} -import fr.acinq.eclair.RegisterActor.CreateChannel -import fr.acinq.eclair.{Globals, Boot} +import fr.acinq.eclair.Boot +import fr.acinq.eclair.channel.Register.CreateChannel /** * Created by PM on 27/10/2015. diff --git a/eclair-demo/src/main/scala/fr/acinq/eclair/io/Server.scala b/eclair-demo/src/main/scala/fr/acinq/eclair/io/Server.scala index 900b021d0..0878ec29a 100644 --- a/eclair-demo/src/main/scala/fr/acinq/eclair/io/Server.scala +++ b/eclair-demo/src/main/scala/fr/acinq/eclair/io/Server.scala @@ -6,7 +6,7 @@ import akka.actor.{Actor, ActorLogging, ActorSystem, Props} import akka.io.{IO, Tcp} import com.typesafe.config.ConfigFactory import fr.acinq.eclair.Boot -import fr.acinq.eclair.RegisterActor.CreateChannel +import fr.acinq.eclair.channel.Register.CreateChannel /** * Created by PM on 27/10/2015.