2022 04 26 Startup time of appServer (#4294)

* Make Server's route be async

* WIP

* Add StartedBitcoinSAppConfig to indicate when tor starts up

* Add torStarted flag to getinfo response
This commit is contained in:
Chris Stewart 2022-04-28 13:50:28 -05:00 committed by GitHub
parent 67f8ac8294
commit f4d864fab8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 214 additions and 104 deletions

View File

@ -1,6 +1,7 @@
package org.bitcoins.commons.jsonmodels
import org.bitcoins.core.config.{BitcoinNetwork, BitcoinNetworks}
import org.bitcoins.core.serializers.PicklerKeys
import org.bitcoins.crypto.DoubleSha256DigestBE
import ujson._
@ -8,13 +9,15 @@ import ujson._
case class BitcoinSServerInfo(
network: BitcoinNetwork,
blockHeight: Int,
blockHash: DoubleSha256DigestBE) {
blockHash: DoubleSha256DigestBE,
torStarted: Boolean) {
lazy val toJson: Value = {
Obj(
"network" -> Str(network.name),
"blockHeight" -> Num(blockHeight),
"blockHash" -> Str(blockHash.hex)
PicklerKeys.networkKey -> Str(network.name),
PicklerKeys.blockHeightKey -> Num(blockHeight),
PicklerKeys.blockHashKey -> Str(blockHash.hex),
PicklerKeys.torStartedKey -> Bool(torStarted)
)
}
}
@ -24,10 +27,14 @@ object BitcoinSServerInfo {
def fromJson(json: Value): BitcoinSServerInfo = {
val obj = json.obj
val network = BitcoinNetworks.fromString(obj("network").str)
val height = obj("blockHeight").num.toInt
val blockHash = DoubleSha256DigestBE(obj("blockHash").str)
val network = BitcoinNetworks.fromString(obj(PicklerKeys.networkKey).str)
val height = obj(PicklerKeys.blockHeightKey).num.toInt
val blockHash = DoubleSha256DigestBE(obj(PicklerKeys.blockHashKey).str)
val torStarted = obj(PicklerKeys.torStartedKey).bool
BitcoinSServerInfo(network, height, blockHash)
BitcoinSServerInfo(network = network,
blockHeight = height,
blockHash = blockHash,
torStarted = torStarted)
}
}

View File

@ -27,11 +27,11 @@ class OracleServerMain(override val serverArgParser: ServerArgParser)(implicit
for {
_ <- conf.start()
oracle = new DLCOracle()
routes = Seq(OracleRoutes(oracle), commonRoutes)
routes = Seq(OracleRoutes(oracle), commonRoutes).map(Future.successful)
server = serverArgParser.rpcPortOpt match {
case Some(rpcport) =>
Server(conf = conf,
handlers = routes,
handlersF = routes,
rpcbindOpt = bindConfOpt,
rpcport = rpcport,
rpcPassword = conf.rpcPassword,
@ -39,7 +39,7 @@ class OracleServerMain(override val serverArgParser: ServerArgParser)(implicit
Source.empty)
case None =>
Server(conf = conf,
handlers = routes,
handlersF = routes,
rpcbindOpt = bindConfOpt,
rpcport = conf.rpcPort,
rpcPassword = conf.rpcPassword,

View File

@ -8,7 +8,14 @@ import akka.http.scaladsl.model.ws.Message
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server._
import akka.http.scaladsl.server.directives.Credentials.Missing
import akka.http.scaladsl.server.directives.{Credentials, DebuggingDirectives}
import akka.http.scaladsl.server.directives.{
Credentials,
DebuggingDirectives,
MarshallingDirectives,
MethodDirectives,
PathDirectives
}
import akka.http.scaladsl.unmarshalling.FromRequestUnmarshaller
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.{Done, NotUsed}
import de.heikoseeberger.akkahttpupickle.UpickleSupport._
@ -20,7 +27,7 @@ import scala.concurrent.Future
case class Server(
conf: AppConfig,
handlers: Seq[ServerRoute],
handlersF: Seq[Future[ServerRoute]],
rpcbindOpt: Option[String],
rpcport: Int,
rpcPassword: String,
@ -86,29 +93,64 @@ case class Server(
}
}
private val serverCmdDirective: FromRequestUnmarshaller[ServerCommand] =
MarshallingDirectives.as[ServerCommand]
private val initF =
Future.successful(PartialFunction.empty[ServerCommand, Route])
private def handlerF: Future[PartialFunction[ServerCommand, Route]] = {
handlersF.foldLeft(initF) { case (accumF, currF) =>
for {
accum <- accumF
newAccum <-
if (currF.isCompleted) {
currF.map(curr => accum.orElse(curr.handleCommand))
} else {
Future.successful(accum)
}
} yield {
newAccum
}
}
}
val route: Route = {
val commonRoute = withErrorHandling {
pathSingleSlash {
post {
entity(as[ServerCommand]) { cmd =>
val init = PartialFunction.empty[ServerCommand, Route]
val handler = handlers.foldLeft(init) { case (accum, curr) =>
accum.orElse(curr.handleCommand)
val commonRoute: Route = {
withErrorHandling {
PathDirectives.pathSingleSlash {
MethodDirectives.post { ctx =>
val route: Future[Route] = {
for {
handler <- handlerF
} yield {
MarshallingDirectives.entity(serverCmdDirective) { cmd =>
val i = handler.orElse(catchAllHandler).apply(cmd)
i
}
}
}
handler.orElse(catchAllHandler).apply(cmd)
route.flatMap(_.apply(ctx))
}
}
}
}
val authenticatedRoute = if (rpcPassword.isEmpty) {
commonRoute
} else {
authenticateBasic("auth", authenticator) { _ =>
commonRoute
val authDirectiveOpt: Option[Directive1[Done]] = {
if (rpcPassword.isEmpty) {
None
} else {
Some(authenticateBasic("auth", authenticator))
}
}
val authenticatedRoute: Route = authDirectiveOpt match {
case Some(authDirective) =>
authDirective { case _ =>
commonRoute
}
case None => commonRoute
}
DebuggingDirectives.logRequestResult(
("http-rpc-server", Logging.DebugLevel)) {
@ -117,10 +159,12 @@ case class Server(
}
def start(): Future[ServerBindings] = {
val httpFut =
Http()
val httpFut = for {
http <- Http()
.newServerAt(rpchost, rpcport)
.bindFlow(route)
} yield http
httpFut.foreach { http =>
logger.info(s"Started Bitcoin-S HTTP server at ${http.localAddress}")
}

View File

@ -0,0 +1,16 @@
package org.bitcoins.server.util
import scala.concurrent.Future
/** A trait used to indicated when different parts of [[BitcoinSAppConfig]] are started */
sealed trait AppConfigMarker
/** This class represents when BitcoinSAppConfig modules are started
* @param torStartedF this future is completed when all tor dependent modules are fully started
* the reason this is needed is because tor startup time is so variable
* @see https://github.com/bitcoin-s/bitcoin-s/issues/4210
*/
case class StartedBitcoinSAppConfig(torStartedF: Future[Unit])
extends AppConfigMarker
case object StoppedBitcoinSAppConfig extends AppConfigMarker

View File

@ -67,7 +67,7 @@ class RoutesSpec extends AnyWordSpec with ScalatestRouteTest with MockFactory {
val mockNode = mock[Node]
val chainRoutes = ChainRoutes(mockChainApi, RegTest)
val chainRoutes = ChainRoutes(mockChainApi, RegTest, Future.unit)
val nodeRoutes = NodeRoutes(mockNode)

View File

@ -13,6 +13,11 @@ import org.bitcoins.dlc.wallet.DLCAppConfig
import org.bitcoins.keymanager.config.KeyManagerAppConfig
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.rpc.config.BitcoindRpcAppConfig
import org.bitcoins.server.util.{
AppConfigMarker,
StartedBitcoinSAppConfig,
StoppedBitcoinSAppConfig
}
import org.bitcoins.tor.config.TorAppConfig
import org.bitcoins.wallet.config.WalletAppConfig
@ -33,7 +38,7 @@ import scala.concurrent.duration.{DurationInt, FiniteDuration}
case class BitcoinSAppConfig(
baseDatadir: Path,
configOverrides: Vector[Config])(implicit system: ActorSystem)
extends StartStopAsync[Unit]
extends StartStopAsync[AppConfigMarker]
with Logging {
import system.dispatcher
@ -65,7 +70,7 @@ case class BitcoinSAppConfig(
lazy val network: NetworkParameters = chainConf.network
/** Initializes the wallet, node and chain projects */
override def start(): Future[Unit] = {
override def start(): Future[StartedBitcoinSAppConfig] = {
val start = TimeUtil.currentEpochMs
//configurations that don't depend on tor startup
//start these in parallel as an optimization
@ -83,22 +88,23 @@ case class BitcoinSAppConfig(
for {
_ <- startedNonTorConfigs
_ <- startedTorDependentConfigsF
} yield {
logger.info(
s"Done starting BitcoinSAppConfig, it took=${TimeUtil.currentEpochMs - start}ms")
()
StartedBitcoinSAppConfig(startedTorDependentConfigsF.map(_ => ()))
}
}
override def stop(): Future[Unit] = {
override def stop(): Future[StoppedBitcoinSAppConfig.type] = {
for {
_ <- nodeConf.stop()
_ <- walletConf.stop()
_ <- chainConf.stop()
_ <- bitcoindRpcConf.stop()
_ <- torConf.stop()
} yield ()
} yield {
StoppedBitcoinSAppConfig
}
}
/** The underlying config the result of our fields derive from */

View File

@ -77,13 +77,13 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
}
for {
_ <- startedConfigF
startedConfig <- startedConfigF
start <- {
nodeConf.nodeType match {
case _: InternalImplementationNodeType =>
startBitcoinSBackend()
startBitcoinSBackend(startedConfig.torStartedF)
case NodeType.BitcoindBackend =>
startBitcoindBackend()
startBitcoindBackend(startedConfig.torStartedF)
}
}
} yield {
@ -107,7 +107,11 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
}
}
def startBitcoinSBackend(): Future[Unit] = {
/** Start the bitcoin-s wallet server with a neutrino backend
* @param startedTorConfigF a future that is completed when tor is fully started
* @return
*/
def startBitcoinSBackend(startedTorConfigF: Future[Unit]): Future[Unit] = {
logger.info(s"startBitcoinSBackend()")
val start = System.currentTimeMillis()
@ -174,27 +178,29 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
val startedDLCNodeF = dlcNodeF
.flatMap(_.start())
.flatMap(_ => dlcNodeF)
val chainApi = ChainHandler.fromDatabase()
//start our http server now that we are synced
for {
wallet <- configuredWalletF
node <- startedNodeF
_ <- startedWalletF
cachedChainApi <- node.chainApiFromDb()
chainApi = ChainHandler.fromChainHandlerCached(cachedChainApi)
dlcNode <- startedDLCNodeF
_ <- startHttpServer(nodeApi = node,
chainApi = chainApi,
wallet = wallet,
dlcNode = dlcNode,
serverCmdLineArgs = serverArgParser,
wsSource = wsSource)
_ <- startHttpServer(
nodeApiF = startedNodeF,
chainApi = chainApi,
walletF = configuredWalletF,
dlcNodeF = startedDLCNodeF,
torConfStarted = startedTorConfigF,
serverCmdLineArgs = serverArgParser,
wsSource = wsSource
)
_ = {
logger.info(
s"Starting ${nodeConf.nodeType.shortName} node sync, it took=${System
.currentTimeMillis() - start}ms")
}
_ <- startedWalletF
//make sure callbacks are registered before we start sync
_ <- callbacksF
node <- startedNodeF
_ <- startedTorConfigF
_ <- node.sync()
} yield {
logger.info(
@ -235,14 +241,22 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
} yield info
}
def startBitcoindBackend(): Future[Unit] = {
/** Start the bitcoin-s wallet server with a bitcoind backend
* @param startedTorConfigF a future that is completed when tor is fully started
* @return
*/
def startBitcoindBackend(startedTorConfigF: Future[Unit]): Future[Unit] = {
logger.info(s"startBitcoindBackend()")
val bitcoindF = for {
client <- bitcoindRpcConf.clientF
_ <- client.start()
} yield client
val tmpWalletF = bitcoindF.flatMap { bitcoind =>
val tuple = buildWsSource
val wsQueue: SourceQueueWithComplete[Message] = tuple._1
val wsSource: Source[Message, NotUsed] = tuple._2
val walletF = bitcoindF.flatMap { bitcoind =>
val feeProvider = FeeProviderFactory.getFeeProviderOrElse(
bitcoind,
feeProviderNameStrOpt = walletConf.feeProviderNameOpt,
@ -250,14 +264,38 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
proxyParamsOpt = walletConf.torConf.socks5ProxyParams,
network = walletConf.network
)
dlcConf.createDLCWallet(nodeApi = bitcoind,
chainQueryApi = bitcoind,
feeRateApi = feeProvider)
logger.info("Creating wallet")
val tmpWalletF = dlcConf.createDLCWallet(nodeApi = bitcoind,
chainQueryApi = bitcoind,
feeRateApi = feeProvider)
val chainCallbacks = WebsocketUtil.buildChainCallbacks(wsQueue, bitcoind)
for {
tmpWallet <- tmpWalletF
wallet = BitcoindRpcBackendUtil.createDLCWalletWithBitcoindCallbacks(
bitcoind,
tmpWallet,
Some(chainCallbacks))
nodeCallbacks <- CallbackUtil.createBitcoindNodeCallbacksForWallet(
wallet)
_ = nodeConf.addCallbacks(nodeCallbacks)
_ = logger.info("Starting wallet")
_ <- wallet.start().recoverWith {
//https://github.com/bitcoin-s/bitcoin-s/issues/2917
//https://github.com/bitcoin-s/bitcoin-s/pull/2918
case err: IllegalArgumentException
if err.getMessage.contains("If we have spent a spendinginfodb") =>
handleMissingSpendingInfoDb(err, wallet)
}
} yield wallet
}
val tuple = buildWsSource
val wsQueue: SourceQueueWithComplete[Message] = tuple._1
val wsSource: Source[Message, NotUsed] = tuple._2
val dlcNodeF = {
for {
wallet <- walletF
dlcNode = dlcNodeConf.createDLCNode(wallet)
_ <- dlcNode.start()
} yield dlcNode
}
for {
_ <- bitcoindRpcConf.start()
@ -268,42 +306,26 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
_ = require(
bitcoindNetwork == walletConf.network,
s"bitcoind ($bitcoindNetwork) on different network than wallet (${walletConf.network})")
_ = logger.info("Creating wallet")
chainCallbacks = WebsocketUtil.buildChainCallbacks(wsQueue, bitcoind)
tmpWallet <- tmpWalletF
wallet = BitcoindRpcBackendUtil.createDLCWalletWithBitcoindCallbacks(
bitcoind,
tmpWallet,
Some(chainCallbacks))
nodeCallbacks <- CallbackUtil.createBitcoindNodeCallbacksForWallet(wallet)
_ = nodeConf.addCallbacks(nodeCallbacks)
_ = logger.info("Starting wallet")
_ <- wallet.start().recoverWith {
//https://github.com/bitcoin-s/bitcoin-s/issues/2917
//https://github.com/bitcoin-s/bitcoin-s/pull/2918
case err: IllegalArgumentException
if err.getMessage.contains("If we have spent a spendinginfodb") =>
handleMissingSpendingInfoDb(err, wallet)
}
dlcNode = dlcNodeConf.createDLCNode(wallet)
_ <- dlcNode.start()
_ <- startHttpServer(nodeApi = bitcoind,
chainApi = bitcoind,
wallet = wallet,
dlcNode = dlcNode,
serverCmdLineArgs = serverArgParser,
wsSource = wsSource)
_ <- startHttpServer(
nodeApiF = Future.successful(bitcoind),
chainApi = bitcoind,
walletF = walletF,
dlcNodeF = dlcNodeF,
torConfStarted = startedTorConfigF,
serverCmdLineArgs = serverArgParser,
wsSource = wsSource
)
walletCallbacks = WebsocketUtil.buildWalletCallbacks(wsQueue)
_ = walletConf.addCallbacks(walletCallbacks)
wallet <- walletF
//intentionally doesn't map on this otherwise we
//wait until we are done syncing the entire wallet
//which could take 1 hour
_ = syncWalletWithBitcoindAndStartPolling(bitcoind, wallet)
dlcWalletCallbacks = WebsocketUtil.buildDLCWalletCallbacks(wsQueue)
_ = dlcConf.addCallbacks(dlcWalletCallbacks)
_ <- startedTorConfigF
} yield {
logger.info(s"Done starting Main!")
()
@ -351,10 +373,11 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
private var serverBindingsOpt: Option[ServerBindings] = None
private def startHttpServer(
nodeApi: NodeApi,
nodeApiF: Future[NodeApi],
chainApi: ChainApi,
wallet: DLCWallet,
dlcNode: DLCNode,
walletF: Future[DLCWallet],
dlcNodeF: Future[DLCNode],
torConfStarted: Future[Unit],
serverCmdLineArgs: ServerArgParser,
wsSource: Source[Message, NotUsed])(implicit
system: ActorSystem,
@ -362,20 +385,21 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
implicit val nodeConf: NodeAppConfig = conf.nodeConf
implicit val walletConf: WalletAppConfig = conf.walletConf
val walletRoutes = WalletRoutes(wallet)
val nodeRoutes = NodeRoutes(nodeApi)
val chainRoutes = ChainRoutes(chainApi, nodeConf.network)
val walletRoutesF = walletF.map(WalletRoutes(_))
val nodeRoutesF = nodeApiF.map(NodeRoutes(_))
val chainRoutes =
ChainRoutes(chainApi, nodeConf.network, torConfStarted)
val coreRoutes = CoreRoutes()
val dlcRoutes = DLCRoutes(dlcNode)
val dlcRoutesF = dlcNodeF.map(DLCRoutes(_))
val commonRoutes = CommonRoutes(conf.baseDatadir)
val handlers =
Seq(walletRoutes,
nodeRoutes,
chainRoutes,
coreRoutes,
dlcRoutes,
commonRoutes)
Seq(walletRoutesF,
nodeRoutesF,
Future.successful(chainRoutes),
Future.successful(coreRoutes),
dlcRoutesF,
Future.successful(commonRoutes))
val rpcBindConfOpt = serverCmdLineArgs.rpcBindOpt match {
case Some(rpcbind) => Some(rpcbind)
@ -399,7 +423,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
serverCmdLineArgs.rpcPortOpt match {
case Some(rpcport) =>
Server(conf = nodeConf,
handlers = handlers,
handlersF = handlers,
rpcbindOpt = rpcBindConfOpt,
rpcport = rpcport,
rpcPassword = conf.rpcPassword,
@ -408,7 +432,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
case None =>
Server(
conf = nodeConf,
handlers = handlers,
handlersF = handlers,
rpcbindOpt = rpcBindConfOpt,
rpcport = conf.rpcPort,
rpcPassword = conf.rpcPassword,

View File

@ -13,8 +13,10 @@ import org.bitcoins.server.util.ChainUtil
import scala.concurrent.Future
case class ChainRoutes(chain: ChainApi, network: BitcoinNetwork)(implicit
system: ActorSystem)
case class ChainRoutes(
chain: ChainApi,
network: BitcoinNetwork,
startedTorConfigF: Future[Unit])(implicit system: ActorSystem)
extends ServerRoute {
import system.dispatcher
@ -67,7 +69,11 @@ case class ChainRoutes(chain: ChainApi, network: BitcoinNetwork)(implicit
case ServerCommand("getinfo", _) =>
complete {
chain.getBestBlockHeader().map { header =>
val info = BitcoinSServerInfo(network, header.height, header.hashBE)
val info = BitcoinSServerInfo(network = network,
blockHeight = header.height,
blockHash = header.hashBE,
torStarted =
startedTorConfigF.isCompleted)
Server.httpSuccess(info.toJson)
}

View File

@ -32,6 +32,8 @@ object PicklerKeys {
final val chainworkKey: String = "chainwork"
final val previousblockhashKey: String = "previousblockhash"
final val nextblockhashKey: String = "nextblockhash"
final val blockHashKey: String = "blockHash"
final val blockHeightKey: String = "blockHeight"
final val myCollateral: String = "myCollateral"
final val theirCollateral: String = "theirCollateral"
final val myPayout: String = "myPayout"
@ -39,10 +41,14 @@ object PicklerKeys {
final val pnl: String = "pnl"
final val rateOfReturn: String = "rateOfReturn"
final val networkKey: String = "network"
final val outcomeKey: String = "outcome"
final val localPayoutKey: String = "localPayout"
final val outcomesKey: String = "outcomes"
final val torStartedKey: String = "torStarted"
//tlv points
final val pointsKey = "points"
final val payoutKey: String = "payout"

View File

@ -39,6 +39,7 @@ trait CachedBitcoinSAppConfig { _: BitcoinSAkkaAsyncTest =>
override def afterAll(): Unit = {
Await.result(cachedConfig.stop(), duration)
()
}
}