1
0
mirror of https://github.com/ACINQ/eclair.git synced 2024-11-20 10:39:19 +01:00

Replace initialization futures Future[Boolean] by Future[Done] (#836)

Fixes #772.
This commit is contained in:
rorp 2019-01-29 01:25:11 -08:00 committed by Pierre-Marie Padiou
parent 3954e39bf8
commit a91cd631e8
4 changed files with 21 additions and 16 deletions

View File

@ -21,6 +21,7 @@ import java.net.InetSocketAddress
import java.sql.DriverManager import java.sql.DriverManager
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import akka.Done
import akka.actor.{ActorRef, ActorSystem, Props, SupervisorStrategy} import akka.actor.{ActorRef, ActorSystem, Props, SupervisorStrategy}
import akka.http.scaladsl.Http import akka.http.scaladsl.Http
import akka.pattern.after import akka.pattern.after
@ -155,11 +156,11 @@ class Setup(datadir: File,
def bootstrap: Future[Kit] = { def bootstrap: Future[Kit] = {
for { for {
_ <- Future.successful(true) _ <- Future.successful(true)
feeratesRetrieved = Promise[Boolean]() feeratesRetrieved = Promise[Done]()
zmqBlockConnected = Promise[Boolean]() zmqBlockConnected = Promise[Done]()
zmqTxConnected = Promise[Boolean]() zmqTxConnected = Promise[Done]()
tcpBound = Promise[Unit]() tcpBound = Promise[Done]()
routerInitialized = Promise[Unit]() routerInitialized = Promise[Done]()
defaultFeerates = FeeratesPerKB( defaultFeerates = FeeratesPerKB(
block_1 = config.getLong("default-feerates.delay-blocks.1"), block_1 = config.getLong("default-feerates.delay-blocks.1"),
@ -184,7 +185,7 @@ class Setup(datadir: File,
Globals.feeratesPerKw.set(FeeratesPerKw(feerates)) Globals.feeratesPerKw.set(FeeratesPerKw(feerates))
system.eventStream.publish(CurrentFeerates(Globals.feeratesPerKw.get)) system.eventStream.publish(CurrentFeerates(Globals.feeratesPerKw.get))
logger.info(s"current feeratesPerKB=${Globals.feeratesPerKB.get()} feeratesPerKw=${Globals.feeratesPerKw.get()}") logger.info(s"current feeratesPerKB=${Globals.feeratesPerKB.get()} feeratesPerKw=${Globals.feeratesPerKw.get()}")
feeratesRetrieved.trySuccess(true) feeratesRetrieved.trySuccess(Done)
}) })
_ <- feeratesRetrieved.future _ <- feeratesRetrieved.future
@ -194,8 +195,8 @@ class Setup(datadir: File,
system.actorOf(SimpleSupervisor.props(Props(new ZMQActor(config.getString("bitcoind.zmqtx"), Some(zmqTxConnected))), "zmqtx", SupervisorStrategy.Restart)) system.actorOf(SimpleSupervisor.props(Props(new ZMQActor(config.getString("bitcoind.zmqtx"), Some(zmqTxConnected))), "zmqtx", SupervisorStrategy.Restart))
system.actorOf(SimpleSupervisor.props(ZmqWatcher.props(new ExtendedBitcoinClient(new BatchingBitcoinJsonRPCClient(bitcoinClient))), "watcher", SupervisorStrategy.Resume)) system.actorOf(SimpleSupervisor.props(ZmqWatcher.props(new ExtendedBitcoinClient(new BatchingBitcoinJsonRPCClient(bitcoinClient))), "watcher", SupervisorStrategy.Resume))
case Electrum(electrumClient) => case Electrum(electrumClient) =>
zmqBlockConnected.success(true) zmqBlockConnected.success(Done)
zmqTxConnected.success(true) zmqTxConnected.success(Done)
system.actorOf(SimpleSupervisor.props(Props(new ElectrumWatcher(electrumClient)), "watcher", SupervisorStrategy.Resume)) system.actorOf(SimpleSupervisor.props(Props(new ElectrumWatcher(electrumClient)), "watcher", SupervisorStrategy.Resume))
} }

View File

@ -16,6 +16,7 @@
package fr.acinq.eclair.blockchain.bitcoind.zmq package fr.acinq.eclair.blockchain.bitcoind.zmq
import akka.Done
import akka.actor.{Actor, ActorLogging} import akka.actor.{Actor, ActorLogging}
import fr.acinq.bitcoin.{Block, Transaction} import fr.acinq.bitcoin.{Block, Transaction}
import fr.acinq.eclair.blockchain.{NewBlock, NewTransaction} import fr.acinq.eclair.blockchain.{NewBlock, NewTransaction}
@ -30,7 +31,7 @@ import scala.util.Try
/** /**
* Created by PM on 04/04/2017. * Created by PM on 04/04/2017.
*/ */
class ZMQActor(address: String, connected: Option[Promise[Boolean]] = None) extends Actor with ActorLogging { class ZMQActor(address: String, connected: Option[Promise[Done]] = None) extends Actor with ActorLogging {
import ZMQActor._ import ZMQActor._
@ -79,7 +80,7 @@ class ZMQActor(address: String, connected: Option[Promise[Boolean]] = None) exte
case event: Event => event.getEvent match { case event: Event => event.getEvent match {
case ZMQ.EVENT_CONNECTED => case ZMQ.EVENT_CONNECTED =>
log.info(s"connected to ${event.getAddress}") log.info(s"connected to ${event.getAddress}")
Try(connected.map(_.success(true))) Try(connected.map(_.success(Done)))
context.system.eventStream.publish(ZMQConnected) context.system.eventStream.publish(ZMQConnected)
case ZMQ.EVENT_DISCONNECTED => case ZMQ.EVENT_DISCONNECTED =>
log.warning(s"disconnected from ${event.getAddress}") log.warning(s"disconnected from ${event.getAddress}")

View File

@ -18,6 +18,7 @@ package fr.acinq.eclair.io
import java.net.InetSocketAddress import java.net.InetSocketAddress
import akka.Done
import akka.actor.{Actor, ActorLogging, ActorRef, OneForOneStrategy, Props, SupervisorStrategy} import akka.actor.{Actor, ActorLogging, ActorRef, OneForOneStrategy, Props, SupervisorStrategy}
import akka.io.Tcp.SO.KeepAlive import akka.io.Tcp.SO.KeepAlive
import akka.io.{IO, Tcp} import akka.io.{IO, Tcp}
@ -32,7 +33,7 @@ import scala.concurrent.Promise
/** /**
* Created by PM on 27/10/2015. * Created by PM on 27/10/2015.
*/ */
class Server(nodeParams: NodeParams, authenticator: ActorRef, address: InetSocketAddress, bound: Option[Promise[Unit]] = None) extends Actor with ActorLogging { class Server(nodeParams: NodeParams, authenticator: ActorRef, address: InetSocketAddress, bound: Option[Promise[Done]] = None) extends Actor with ActorLogging {
import Tcp._ import Tcp._
import context.system import context.system
@ -41,7 +42,7 @@ class Server(nodeParams: NodeParams, authenticator: ActorRef, address: InetSocke
def receive() = { def receive() = {
case Bound(localAddress) => case Bound(localAddress) =>
bound.map(_.success(Unit)) bound.map(_.success(Done))
log.info(s"bound on $localAddress") log.info(s"bound on $localAddress")
// Accept connections one by one // Accept connections one by one
sender() ! ResumeAccepting(batchSize = 1) sender() ! ResumeAccepting(batchSize = 1)
@ -65,7 +66,7 @@ class Server(nodeParams: NodeParams, authenticator: ActorRef, address: InetSocke
object Server { object Server {
def props(nodeParams: NodeParams, switchboard: ActorRef, address: InetSocketAddress, bound: Option[Promise[Unit]] = None): Props = Props(new Server(nodeParams, switchboard, address, bound)) def props(nodeParams: NodeParams, switchboard: ActorRef, address: InetSocketAddress, bound: Option[Promise[Done]] = None): Props = Props(new Server(nodeParams, switchboard, address, bound))
} }

View File

@ -16,6 +16,7 @@
package fr.acinq.eclair.router package fr.acinq.eclair.router
import akka.Done
import akka.actor.{ActorRef, Props, Status} import akka.actor.{ActorRef, Props, Status}
import akka.event.Logging.MDC import akka.event.Logging.MDC
import akka.pattern.pipe import akka.pattern.pipe
@ -33,6 +34,7 @@ import fr.acinq.eclair.router.Graph.GraphStructure.{DirectedGraph, GraphEdge}
import fr.acinq.eclair.router.Graph.WeightedPath import fr.acinq.eclair.router.Graph.WeightedPath
import fr.acinq.eclair.transactions.Scripts import fr.acinq.eclair.transactions.Scripts
import fr.acinq.eclair.wire._ import fr.acinq.eclair.wire._
import scala.collection.{SortedSet, mutable} import scala.collection.{SortedSet, mutable}
import scala.collection.immutable.{SortedMap, TreeMap} import scala.collection.immutable.{SortedMap, TreeMap}
import scala.compat.Platform import scala.compat.Platform
@ -84,7 +86,7 @@ case object TickPruneStaleChannels
* Created by PM on 24/05/2016. * Created by PM on 24/05/2016.
*/ */
class Router(nodeParams: NodeParams, watcher: ActorRef, initialized: Option[Promise[Unit]] = None) extends FSMDiagnosticActorLogging[State, Data] { class Router(nodeParams: NodeParams, watcher: ActorRef, initialized: Option[Promise[Done]] = None) extends FSMDiagnosticActorLogging[State, Data] {
import Router._ import Router._
@ -134,7 +136,7 @@ class Router(nodeParams: NodeParams, watcher: ActorRef, initialized: Option[Prom
self ! nodeAnn self ! nodeAnn
log.info(s"initialization completed, ready to process messages") log.info(s"initialization completed, ready to process messages")
Try(initialized.map(_.success(()))) Try(initialized.map(_.success(Done)))
startWith(NORMAL, Data(initNodes, initChannels, initChannelUpdates, Stash(Map.empty, Map.empty), rebroadcast = Rebroadcast(channels = Map.empty, updates = Map.empty, nodes = Map.empty), awaiting = Map.empty, privateChannels = Map.empty, privateUpdates = Map.empty, excludedChannels = Set.empty, graph, sync = Map.empty)) startWith(NORMAL, Data(initNodes, initChannels, initChannelUpdates, Stash(Map.empty, Map.empty), rebroadcast = Rebroadcast(channels = Map.empty, updates = Map.empty, nodes = Map.empty), awaiting = Map.empty, privateChannels = Map.empty, privateUpdates = Map.empty, excludedChannels = Set.empty, graph, sync = Map.empty))
} }
@ -694,7 +696,7 @@ class Router(nodeParams: NodeParams, watcher: ActorRef, initialized: Option[Prom
object Router { object Router {
def props(nodeParams: NodeParams, watcher: ActorRef, initialized: Option[Promise[Unit]] = None) = Props(new Router(nodeParams, watcher, initialized)) def props(nodeParams: NodeParams, watcher: ActorRef, initialized: Option[Promise[Done]] = None) = Props(new Router(nodeParams, watcher, initialized))
def toFakeUpdate(extraHop: ExtraHop): ChannelUpdate = def toFakeUpdate(extraHop: ExtraHop): ChannelUpdate =
// the `direction` bit in flags will not be accurate but it doesn't matter because it is not used // the `direction` bit in flags will not be accurate but it doesn't matter because it is not used