1
0
Fork 0
mirror of https://github.com/ACINQ/eclair.git synced 2025-02-23 14:40:34 +01:00

put path logic in register actor and use resolveOne

This commit is contained in:
pm47 2016-03-20 15:44:29 +01:00
parent 5ef8de485f
commit 5b4f8b6b29
9 changed files with 113 additions and 115 deletions

View file

@ -9,7 +9,7 @@
</appender>
<logger name="fr.acinq.eclair.channel" level="DEBUG" />
<logger name="fr.acinq.eclair.RegisterActor" level="DEBUG" />
<logger name="fr.acinq.eclair.channel.Register" level="DEBUG" />
<root level="INFO">
<appender-ref ref="CONSOLE"/>

View file

@ -2,16 +2,18 @@ package fr.acinq.eclair
import java.net.InetSocketAddress
import akka.actor.{ActorRef, Props, ActorSystem}
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.io.IO
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import fr.acinq.eclair.api.ServiceActor
import fr.acinq.eclair.blockchain.PollingWatcher
import fr.acinq.eclair.channel.Register
import fr.acinq.eclair.io.{Client, Server}
import grizzled.slf4j.Logging
import spray.can.Http
import scala.concurrent.{ExecutionContext, Await}
import scala.concurrent.{Await, ExecutionContext}
import scala.concurrent.duration._
import Globals._
@ -33,7 +35,7 @@ object Boot extends App with Logging {
assert(chain == "testnet" || chain == "regtest", "you should be on testnet or regtest")
val blockchain = system.actorOf(Props(new PollingWatcher(bitcoin_client)), name = "blockchain")
val register = system.actorOf(Props[RegisterActor], name = "register")
val register = system.actorOf(Props[Register], name = "register")
val server = system.actorOf(Server.props(config.getString("eclair.server.address"), config.getInt("eclair.server.port")), "server")
val api = system.actorOf(Props(new ServiceActor {

View file

@ -1,69 +0,0 @@
package fr.acinq.eclair
import akka.actor._
import akka.util.Timeout
import fr.acinq.bitcoin.{BinaryData, DeterministicWallet}
import fr.acinq.eclair.channel._
import fr.acinq.eclair.io.AuthHandler
import akka.pattern.ask
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
object RegisterActor {
// @formatter:off
case class CreateChannel(connection: ActorRef, anchorAmount: Option[Long])
case class GetChannels()
case class Entry(nodeId: String, channelId: String, channel: ActorRef, state: ChannelState)
case class UpdateState(state: ChannelState)
// @formatter:on
}
/**
* Created by PM on 26/01/2016.
*/
/**
* Actor hierarchy:
* system
* ├── blockchain
* ├── register
* ├── handler-0
* └── channel
* └── remote_node_id-anchor_id (alias to parent)
* ...
* └── handler-n
* └── channel
* └── remote_node_id-anchor_id (alias to parent)
* ├── server
* ├── client (0..m, transient)
* └── api
*/
class RegisterActor extends Actor with ActorLogging {
import RegisterActor._
implicit val timeout = Timeout(30 seconds)
import ExecutionContext.Implicits.global
def receive: Receive = main(0L)
def main(counter: Long): Receive = {
case CreateChannel(connection, amount) =>
val commit_priv = DeterministicWallet.derivePrivateKey(Globals.Node.extendedPrivateKey, 0L :: counter :: Nil)
val final_priv = DeterministicWallet.derivePrivateKey(Globals.Node.extendedPrivateKey, 1L :: counter :: Nil)
val params = OurChannelParams(Globals.default_locktime, commit_priv.secretkey :+ 1.toByte, final_priv.secretkey :+ 1.toByte, Globals.default_mindepth, Globals.commit_fee, "sha-seed".getBytes(), amount)
context.actorOf(Props(classOf[AuthHandler], connection, Boot.blockchain, params), name = s"handler-${counter}")
context.become(main(counter + 1))
case GetChannels =>
val s = sender()
Future.sequence(context.children.map(c => c ? CMD_GETINFO)).map(s ! _)
}
override def unhandled(message: Any): Unit = {
log.warning(s"unhandled message $message")
super.unhandled(message)
}
}

View file

@ -5,14 +5,13 @@ import java.net.InetSocketAddress
import akka.actor.{Actor, ActorRef}
import akka.util.Timeout
import fr.acinq.bitcoin.BinaryData
import fr.acinq.eclair.RegisterActor.GetChannels
import fr.acinq.eclair._
import fr.acinq.eclair.channel._
import fr.acinq.eclair.{Boot, channel}
import fr.acinq.eclair.Boot
import grizzled.slf4j.Logging
import lightning.locktime
import lightning.locktime.Locktime.Seconds
import org.json4s.JsonAST.{JBool, JDouble, JObject, JString}
import org.json4s.JsonAST.JString
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization
@ -23,6 +22,7 @@ import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}
import akka.pattern.ask
import fr.acinq.eclair.channel.Register.ListChannels
/**
* Created by PM on 25/01/2016.
@ -57,8 +57,8 @@ trait Service extends HttpService with Logging {
def connect(addr: InetSocketAddress, amount: Long): Unit // amount in satoshis
def register: ActorRef
def sendCommand(channel: String, cmd: Command): Future[String] = {
Boot.system.actorSelection(s"/user/register/handler-$channel/channel").resolveOne().map(actor => {
def sendCommand(channel_id: String, cmd: Command): Future[String] = {
Boot.system.actorSelection(Register.actorPathToChannelId(channel_id)).resolveOne().map(actor => {
actor ! cmd
"ok"
})
@ -75,14 +75,13 @@ trait Service extends HttpService with Logging {
connect(new InetSocketAddress(host, port.toInt), anchor_amount.toLong)
Future.successful("")
case JsonRPCBody(_, _, "list", _) =>
(register ? GetChannels).mapTo[Iterable[RES_GETINFO]]
case JsonRPCBody(_, _, "addhtlc", JString(channel) :: JInt(amount) :: JString(rhash) :: JInt(expiry) :: Nil) =>
sendCommand(channel, CMD_SEND_HTLC_UPDATE(amount.toInt, BinaryData(rhash), locktime(Seconds(expiry.toInt))))
case JsonRPCBody(_, _, "addhtlc_r", JInt(amount) :: JString(rhash) :: JInt(expiry) :: tail) =>
val nodeIds = tail.toSeq.map {
(register ? ListChannels).mapTo[Iterable[ActorRef]]
.flatMap(l => Future.sequence(l.map(c => c ? CMD_GETINFO)))
case JsonRPCBody(_, _, "addhtlc", JInt(amount) :: JString(rhash) :: JInt(expiry) :: tail) =>
val nodeIds = tail.map {
case JString(nodeId) => nodeId
}
Boot.system.actorSelection(s"*/register/handler-*/channel/${nodeIds.head}-*")
Boot.system.actorSelection(Register.actorPathToNodeId(nodeIds.head))
.resolveOne(2 seconds)
.map { channel =>
channel ! CMD_SEND_HTLC_UPDATE(amount.toInt, BinaryData(rhash), locktime(Seconds(expiry.toInt)), nodeIds.drop(1))

View file

@ -13,6 +13,7 @@ import lightning.update_decline_htlc.Reason.{CannotRoute, InsufficientFunds}
import org.bouncycastle.util.encoders.Hex
import scala.util.{Failure, Success, Try}
import scala.concurrent.duration._
/**
* Created by PM on 20/08/2015.
@ -138,7 +139,7 @@ final case class CMD_SEND_HTLC_TIMEDOUT(h: sha256_hash) extends Command
case object CMD_GETSTATE extends Command
case object CMD_GETSTATEDATA extends Command
case object CMD_GETINFO extends Command
final case class RES_GETINFO(name: String, nodeid: BinaryData, channelid: BinaryData, state: State, data: Data)
final case class RES_GETINFO(nodeid: BinaryData, channelid: BinaryData, state: State, data: Data)
/*
8888888b. d8888 88888888888 d8888
@ -436,7 +437,7 @@ class Channel(val them: ActorRef, val blockchain: ActorRef, val params: OurChann
when(OPEN_WAIT_FOR_COMPLETE_THEIRANCHOR) {
case Event(open_complete(blockid_opt), d: DATA_NORMAL) =>
create_alias(theirNodeId, d.commitment.anchorId)
Register.create_alias(theirNodeId, d.commitment.anchorId)
goto(NORMAL_LOWPRIO) using d
case Event(pkt: close_channel, d: CurrentCommitment) =>
@ -463,7 +464,7 @@ class Channel(val them: ActorRef, val blockchain: ActorRef, val params: OurChann
when(OPEN_WAIT_FOR_COMPLETE_OURANCHOR) {
case Event(open_complete(blockid_opt), d: DATA_NORMAL) =>
create_alias(theirNodeId, d.commitment.anchorId)
Register.create_alias(theirNodeId, d.commitment.anchorId)
goto(NORMAL_HIGHPRIO) using d
case Event(pkt: close_channel, d: CurrentCommitment) =>
@ -575,10 +576,16 @@ class Channel(val them: ActorRef, val blockchain: ActorRef, val params: OurChann
commitment.state.them.htlcs_received.find(_.rHash == bin2sha256(Crypto.sha256(r)))
.map(htlc => htlc.previousChannelId match {
case Some(previousChannelId) =>
val downstreamActorPath = s"*/register/handler-*/channel/*-$previousChannelId"
val downstream = Boot.system.actorSelection(downstreamActorPath)
log.info(s"forwarding r value to downstream=$downstream using path=$downstreamActorPath")
downstream ! CMD_SEND_HTLC_FULFILL(r)
log.info(s"resolving channelId=$previousChannelId")
Boot.system.actorSelection(Register.actorPathToChannelId(previousChannelId))
.resolveOne(3 seconds)
.onComplete {
case Success(downstream) =>
log.info(s"forwarding r value to downstream=$downstream")
downstream ! CMD_SEND_HTLC_FULFILL(r)
case Failure(t: Throwable) =>
log.warning(s"couldn't resolve downstream node, htlc will timeout", t)
}
case None =>
log.info(s"looks like I was the origin payer for htlc $htlc")
})
@ -714,11 +721,17 @@ class Channel(val them: ActorRef, val blockchain: ActorRef, val params: OurChann
newCommitment.state.us.htlcs_received.lastOption
.map(htlc => htlc.nextNodeIds.headOption match {
case Some(nextNodeId) =>
val upstreamActorPath = s"*/register/handler-*/channel/$nextNodeId-*"
val upstream = Boot.system.actorSelection(upstreamActorPath)
log.info(s"forwarding htlc to upstream=$upstream using path=$upstreamActorPath")
//FIXME : what if the next channel is in LOW state ? it will ignore CMD_SEND_HTLC_UPDATE msg
upstream ! CMD_SEND_HTLC_UPDATE(htlc.amountMsat, htlc.rHash, htlc.expiry, htlc.nextNodeIds.drop(1), originChannelId = Some(previousCommitment.anchorId))
log.info(s"resolving next nodeId=$nextNodeId")
Boot.system.actorSelection(Register.actorPathToNodeId(nextNodeId))
.resolveOne(3 seconds)
.onComplete {
case Success(upstream) =>
log.info(s"forwarding htlc to upstream=$upstream")
//FIXME : what if the next channel is in LOW state ? will it ignore CMD_SEND_HTLC_UPDATE msg ?
upstream ! CMD_SEND_HTLC_UPDATE(htlc.amountMsat, htlc.rHash, htlc.expiry, htlc.nextNodeIds.drop(1), originChannelId = Some(previousCommitment.anchorId))
case Failure(t: Throwable) =>
log.warning(s"couldn't resolve upstream node, htlc will timeout", t)
}
case None =>
log.info(s"looks like I was the final payee for htlc $htlc")
})
@ -948,13 +961,10 @@ class Channel(val them: ActorRef, val blockchain: ActorRef, val params: OurChann
stay
case Event(CMD_GETINFO, _) =>
//TODO
val r = """/user/register/handler-(\d+)/channel""".r
val r(id) = self.path.toStringWithoutAddress
sender ! RES_GETINFO(id, theirNodeId, stateData match {
sender ! RES_GETINFO(theirNodeId, stateData match {
case c: CurrentCommitment => c.commitment.anchorId
case _ => "unknown"
} ,stateName, stateData)
}, stateName, stateData)
stay
// TODO : them ! error(Some("Unexpected message")) ?
@ -972,13 +982,6 @@ class Channel(val them: ActorRef, val blockchain: ActorRef, val params: OurChann
888 888 8888888888 88888888 888 8888888888 888 T88b "Y8888P"
*/
/**
* Once it reaches NORMAL state, channel creates a [[fr.acinq.eclair.channel.AliasActor]] which name is counterparty_id-anchor_id
*/
def create_alias(node_id: BinaryData, anchor_id: BinaryData) {
context.actorOf(Props(new AliasActor(self)), name = s"$node_id-$anchor_id")
}
/**
* Something went wrong, we publish the current commitment transaction
*/

View file

@ -0,0 +1,67 @@
package fr.acinq.eclair.channel
import akka.actor._
import fr.acinq.bitcoin.{BinaryData, DeterministicWallet}
import fr.acinq.eclair.io.AuthHandler
import fr.acinq.eclair.{Boot, Globals}
/**
* Created by PM on 26/01/2016.
*/
/**
* Actor hierarchy:
* system
* ├── blockchain
* ├── register
* ├── handler-0
* └── channel
* └── remote_node_id-anchor_id (alias to parent)
* ...
* └── handler-n
* └── channel
* └── remote_node_id-anchor_id (alias to parent)
* ├── server
* ├── client (0..m, transient)
* └── api
*/
class Register extends Actor with ActorLogging {
import Register._
def receive: Receive = main(0L)
def main(counter: Long): Receive = {
case CreateChannel(connection, amount) =>
val commit_priv = DeterministicWallet.derivePrivateKey(Globals.Node.extendedPrivateKey, 0L :: counter :: Nil)
val final_priv = DeterministicWallet.derivePrivateKey(Globals.Node.extendedPrivateKey, 1L :: counter :: Nil)
val params = OurChannelParams(Globals.default_locktime, commit_priv.secretkey :+ 1.toByte, final_priv.secretkey :+ 1.toByte, Globals.default_mindepth, Globals.commit_fee, "sha-seed".getBytes(), amount)
context.actorOf(Props(classOf[AuthHandler], connection, Boot.blockchain, params), name = s"handler-${counter}")
context.become(main(counter + 1))
case ListChannels => sender ! context.children
}
}
object Register {
// @formatter:off
case class CreateChannel(connection: ActorRef, anchorAmount: Option[Long])
case class ListChannels()
// @formatter:on
/**
* Once it reaches NORMAL state, channel creates a [[fr.acinq.eclair.channel.AliasActor]]
* which name is counterparty_id-anchor_id
*/
def create_alias(node_id: BinaryData, anchor_id: BinaryData)(implicit context: ActorContext) =
context.actorOf(Props(new AliasActor(context.self)), name = s"$node_id-$anchor_id")
def actorPathToNodeId(nodeId: BinaryData): ActorPath =
Boot.system / "register" / "handler-*" / "channel" / s"${nodeId}-*"
def actorPathToChannelId(channelId: BinaryData): ActorPath =
Boot.system / "register" / "handler-*" / "channel" / s"*-${channelId}"
def actorPathToChannels(): ActorPath =
Boot.system / "register" / "handler-*" / "channel"
}

View file

@ -128,10 +128,6 @@ class AuthHandler(them: ActorRef, blockchain: ActorRef, our_params: OurChannelPa
them ! Write(ByteString.fromArray(data))
stay using n.copy(sessionData = s.copy(totlen_out = new_totlen_out))
case Event(msg: RegisterActor.UpdateState, _) =>
context.parent forward msg
stay
case Event(cmd: Command, n@Normal(channel, _)) =>
channel forward cmd
stay

View file

@ -4,8 +4,8 @@ import java.net.InetSocketAddress
import akka.actor._
import akka.io.{IO, Tcp}
import fr.acinq.eclair.RegisterActor.CreateChannel
import fr.acinq.eclair.{Globals, Boot}
import fr.acinq.eclair.Boot
import fr.acinq.eclair.channel.Register.CreateChannel
/**
* Created by PM on 27/10/2015.

View file

@ -6,7 +6,7 @@ import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.io.{IO, Tcp}
import com.typesafe.config.ConfigFactory
import fr.acinq.eclair.Boot
import fr.acinq.eclair.RegisterActor.CreateChannel
import fr.acinq.eclair.channel.Register.CreateChannel
/**
* Created by PM on 27/10/2015.