1
0
mirror of https://github.com/ACINQ/eclair.git synced 2025-01-19 05:33:59 +01:00

added a supervision strategy to all actors (#31)

This commit is contained in:
Pierre-Marie Padiou 2017-03-17 18:00:43 +01:00 committed by Fabrice Drouin
parent 6797f5e3e9
commit f7e8d531fa
10 changed files with 72 additions and 29 deletions

View File

@ -4,7 +4,7 @@ import java.io.File
import java.net.InetSocketAddress
import javafx.application.Platform
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.actor.{Actor, ActorRef, ActorSystem, Props, SupervisorStrategy}
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.util.Timeout
@ -104,18 +104,18 @@ class Setup(datadir: String) extends Logging {
}))
val fatalEventFuture = fatalEventPromise.future
val peer = system.actorOf(PeerClient.props(config.getConfig("bitcoind")), "bitcoin-peer")
val watcher = system.actorOf(PeerWatcher.props(nodeParams, bitcoin_client), name = "watcher")
val paymentHandler = config.getString("payment-handler") match {
case "local" => system.actorOf(Props[LocalPaymentHandler], name = "payment-handler")
case "noop" => system.actorOf(Props[NoopPaymentHandler], name = "payment-handler")
}
val register = system.actorOf(Props(new Register), name = "register")
val relayer = system.actorOf(Relayer.props(nodeParams.privateKey, paymentHandler), name = "relayer")
val router = system.actorOf(Router.props(nodeParams, watcher), name = "router")
val switchboard = system.actorOf(Switchboard.props(nodeParams, watcher, router, relayer, finalScriptPubKey), name = "switchboard")
val paymentInitiator = system.actorOf(PaymentInitiator.props(nodeParams.privateKey.publicKey, router, register), "payment-initiator")
val server = system.actorOf(Server.props(nodeParams, switchboard, new InetSocketAddress(config.getString("server.binding-ip"), config.getInt("server.port"))), "server")
val peer = system.actorOf(SimpleSupervisor.props(PeerClient.props(config.getConfig("bitcoind")), "bitcoin-peer", SupervisorStrategy.Restart))
val watcher = system.actorOf(SimpleSupervisor.props(PeerWatcher.props(nodeParams, bitcoin_client), "watcher", SupervisorStrategy.Resume))
val paymentHandler = system.actorOf(SimpleSupervisor.props(config.getString("payment-handler") match {
case "local" => Props[LocalPaymentHandler]
case "noop" => Props[NoopPaymentHandler]
}, "payment-handler", SupervisorStrategy.Resume))
val register = system.actorOf(SimpleSupervisor.props(Props(new Register), "register", SupervisorStrategy.Resume))
val relayer = system.actorOf(SimpleSupervisor.props(Relayer.props(nodeParams.privateKey, paymentHandler), "relayer", SupervisorStrategy.Resume))
val router = system.actorOf(SimpleSupervisor.props(Router.props(nodeParams, watcher), "router", SupervisorStrategy.Resume))
val switchboard = system.actorOf(SimpleSupervisor.props(Switchboard.props(nodeParams, watcher, router, relayer, finalScriptPubKey), "switchboard", SupervisorStrategy.Resume))
val paymentInitiator = system.actorOf(SimpleSupervisor.props(PaymentInitiator.props(nodeParams.privateKey.publicKey, router, register), "payment-initiator", SupervisorStrategy.Restart))
val server = system.actorOf(SimpleSupervisor.props(Server.props(nodeParams, switchboard, new InetSocketAddress(config.getString("server.binding-ip"), config.getInt("server.port"))), "server", SupervisorStrategy.Restart))
val _setup = this
val api = new Service {

View File

@ -0,0 +1,26 @@
package fr.acinq.eclair
import akka.actor.{Actor, ActorLogging, OneForOneStrategy, Props, SupervisorStrategy}
/**
* This supervisor will supervise a single child actor using the provided [[SupervisorStrategy.Directive]]
* All incoming messages will be forwarded to the child actor.
*
* Created by PM on 17/03/2017.
*/
class SimpleSupervisor(childProps: Props, childName: String, strategy: SupervisorStrategy.Directive) extends Actor with ActorLogging {
val child = context.actorOf(childProps, childName)
override def receive: Receive = {
case msg => child forward msg
}
override val supervisorStrategy = OneForOneStrategy(loggingEnabled = true) { case _ => strategy }
}
object SimpleSupervisor {
def props(childProps: Props, childName: String, strategy: SupervisorStrategy.Directive) = Props(new SimpleSupervisor(childProps, childName, strategy))
}

View File

@ -1,6 +1,6 @@
package fr.acinq.eclair.channel
import akka.actor.{ActorRef, FSM, LoggingFSM, Props}
import akka.actor.{ActorRef, FSM, LoggingFSM, OneForOneStrategy, Props, SupervisorStrategy}
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin._
import fr.acinq.eclair._
@ -1147,6 +1147,9 @@ class Channel(nodeParams: NodeParams, remoteNodeId: PublicKey, blockchain: Actor
}
}
// we let the peer decide what to do
override val supervisorStrategy = OneForOneStrategy(loggingEnabled = true) { case _ => SupervisorStrategy.Escalate }
}

View File

@ -57,12 +57,8 @@ class Register extends Actor with ActorLogging {
}
object Register {
def actorPathToPeers()(implicit context: ActorContext): ActorPath =
context.system / "switchboard" / "peer-*"
// @formatter:off
case class Forward(channelId: BinaryData, message: Any)
case class ForwardShortId(shortChannelId: Long, message: Any)
// @formatter:on
}

View File

@ -9,9 +9,9 @@ import javafx.scene.image.Image
import javafx.scene.{Parent, Scene}
import javafx.stage.{Popup, Screen, Stage, WindowEvent}
import akka.actor.Props
import akka.actor.{Props, SupervisorStrategy}
import akka.stream.StreamTcpException
import fr.acinq.eclair.Setup
import fr.acinq.eclair.{Setup, SimpleSupervisor}
import fr.acinq.eclair.channel.ChannelEvent
import fr.acinq.eclair.gui.controllers.{MainController, NotificationsController}
import fr.acinq.eclair.router.NetworkEvent
@ -40,7 +40,7 @@ class FxApp extends Application with Logging {
handlers = Option(new Handlers(setup.get))
val controller = new MainController(handlers.get, setup.get, getHostServices)
val guiUpdater = setup.get.system.actorOf(Props(classOf[GUIUpdater], controller, setup.get), "gui-updater")
val guiUpdater = setup.get.system.actorOf(SimpleSupervisor.props(Props(classOf[GUIUpdater], controller, setup.get), "gui-updater", SupervisorStrategy.Resume))
setup.get.system.eventStream.subscribe(guiUpdater, classOf[ChannelEvent])
setup.get.system.eventStream.subscribe(guiUpdater, classOf[NetworkEvent])

View File

@ -49,6 +49,9 @@ class Client(nodeParams: NodeParams, switchboard: ActorRef, address: InetSocketA
origin ! "connected"
switchboard ! h
}
// we should not restart a failing transport
override val supervisorStrategy = OneForOneStrategy(loggingEnabled = true) { case _ => SupervisorStrategy.Stop }
}
object Client extends App {

View File

@ -2,12 +2,13 @@ package fr.acinq.eclair.io
import java.net.InetSocketAddress
import akka.actor.{ActorRef, LoggingFSM, PoisonPill, Props, Terminated}
import akka.actor.{ActorRef, LoggingFSM, OneForOneStrategy, PoisonPill, Props, SupervisorStrategy, Terminated}
import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey}
import fr.acinq.bitcoin.{BinaryData, Crypto, DeterministicWallet}
import fr.acinq.eclair.channel._
import fr.acinq.eclair.crypto.TransportHandler.{HandshakeCompleted, Listener}
import fr.acinq.eclair.io.Switchboard.{NewChannel, NewConnection}
import fr.acinq.eclair.router.Router.Rebroadcast
import fr.acinq.eclair.router.SendRoutingState
import fr.acinq.eclair.wire._
import fr.acinq.eclair.{Features, Globals, NodeParams}
@ -151,8 +152,8 @@ class Peer(nodeParams: NodeParams, remoteNodeId: PublicKey, address_opt: Option[
channel ! msg
stay using d.copy(channels = channels + (temporaryChannelId -> channel))
case Event(msg: RoutingMessage, ConnectedData(transport, _, _)) if sender == router =>
transport forward msg
case Event(Rebroadcast(announcements), ConnectedData(transport, _, _)) =>
announcements.foreach(transport forward _)
stay
case Event(msg: RoutingMessage, _) =>
@ -189,6 +190,9 @@ class Peer(nodeParams: NodeParams, remoteNodeId: PublicKey, address_opt: Option[
channel
}
// a failing channel won't be restarted, it should handle its states
override val supervisorStrategy = OneForOneStrategy(loggingEnabled = true) { case _ => SupervisorStrategy.Stop }
}
object Peer {

View File

@ -2,7 +2,7 @@ package fr.acinq.eclair.io
import java.net.InetSocketAddress
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.actor.{Actor, ActorLogging, ActorRef, OneForOneStrategy, Props, SupervisorStrategy}
import akka.io.Tcp.SO.KeepAlive
import akka.io.{IO, Tcp}
import fr.acinq.eclair.crypto.Noise.KeyPair
@ -46,6 +46,9 @@ class Server(nodeParams: NodeParams, switchboard: ActorRef, address: InetSocketA
case h: HandshakeCompleted =>
switchboard ! h
}
// we should not restart a failing transport
override val supervisorStrategy = OneForOneStrategy(loggingEnabled = true) { case _ => SupervisorStrategy.Stop }
}
object Server {

View File

@ -2,12 +2,13 @@ package fr.acinq.eclair.io
import java.net.InetSocketAddress
import akka.actor.{Actor, ActorLogging, ActorRef, Props, Status, Terminated}
import akka.actor.{Actor, ActorLogging, ActorRef, OneForOneStrategy, Props, Status, SupervisorStrategy, Terminated}
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin.{BinaryData, MilliSatoshi, Satoshi}
import fr.acinq.eclair.NodeParams
import fr.acinq.eclair.channel.HasCommitments
import fr.acinq.eclair.crypto.TransportHandler.HandshakeCompleted
import fr.acinq.eclair.router.Router.Rebroadcast
/**
* Ties network connections to peers.
@ -66,6 +67,8 @@ class Switchboard(nodeParams: NodeParams, watcher: ActorRef, router: ActorRef, r
peer forward h
context become main(peers + (remoteNodeId -> peer), connections)
case r: Rebroadcast => peers.values.foreach(_ forward r)
case 'peers =>
sender ! peers.keys
@ -80,6 +83,9 @@ class Switchboard(nodeParams: NodeParams, watcher: ActorRef, router: ActorRef, r
peer
}
}
// we resume failing peers because they may have open channels that we don't want to close abruptly
override val supervisorStrategy = OneForOneStrategy(loggingEnabled = true) { case _ => SupervisorStrategy.Resume }
}
object Switchboard {

View File

@ -173,7 +173,7 @@ class Router(nodeParams: NodeParams, watcher: ActorRef) extends Actor with Actor
case 'tick_broadcast =>
log.info(s"broadcasting ${rebroadcast.size} routing messages")
rebroadcast.foreach(context.actorSelection(Register.actorPathToPeers) ! _)
context.actorSelection(context.system / "*" / "switchboard") ! Rebroadcast(rebroadcast)
context become main(nodes, channels, updates, Nil, awaiting, stash)
case 'nodes => sender ! nodes.values
@ -200,6 +200,8 @@ object Router {
def channelUpdateKey(shortChannelId: Long, flags: BinaryData) = s"ann-update-$shortChannelId-$flags"
// @formatter:on
case class Rebroadcast(ann: Seq[RoutingMessage])
def props(nodeParams: NodeParams, watcher: ActorRef) = Props(new Router(nodeParams, watcher))
def getDesc(u: ChannelUpdate, channel: ChannelAnnouncement): ChannelDesc = {