mirror of
synced 2025-02-23 06:45:21 +01:00
Refactor BitcoinSRunner to use StartStop[Async] (#2986)
* Refactor BitcoinSRunner to use StartStop[Async]. This makes it easier to have a uniform interface to write for destruction code in BitcoinSRunnerTest * Fix other ServerRunTest to cleanup after itself
This commit is contained in:
6 changed files with 195 additions and 185 deletions
@ -10,10 +10,10 @@ class OracleServerMain(override val args: Array[String])
override val actorSystemName = "bitcoin-s-oracle"
override def startup: Future[Unit] = {
implicit val conf: DLCOracleAppConfig =
DLCOracleAppConfig(datadir, baseConfig)
implicit val conf: DLCOracleAppConfig =
DLCOracleAppConfig(datadir, baseConfig)
override def start(): Future[Unit] = {
val bindConfOpt = rpcBindOpt match {
case Some(rpcbind) => Some(rpcbind)
@ -41,12 +41,18 @@ class OracleServerMain(override val args: Array[String])
_ <- server.start()
} yield {
logger.info(s"Done starting oracle!")
sys.addShutdownHook {
logger.error(s"Exiting process")
conf.stop().foreach(_ => logger.info(s"Stopped DLC Oracle"))
system.terminate().foreach(_ => logger.info(s"Actor system terminated"))
override def stop(): Future[Unit] = {
logger.error(s"Exiting process")
for {
_ <- conf.stop()
_ = logger.info(s"Stopped DLC Oracle")
_ <- system.terminate()
} yield {
logger.info(s"Actor system terminated")
@ -21,7 +21,7 @@ class ScanBitcoind(override val args: Array[String]) extends BitcoinSRunner {
implicit val rpcAppConfig: BitcoindRpcAppConfig =
BitcoindRpcAppConfig(datadir, baseConfig)
override def startup: Future[Unit] = {
override def start(): Future[Unit] = {
val bitcoind = rpcAppConfig.client
@ -31,11 +31,15 @@ class ScanBitcoind(override val args: Array[String]) extends BitcoinSRunner {
for {
endHeight <- endHeightF
_ <- countSegwitTxs(bitcoind, startHeight, endHeight)
_ <- system.terminate()
} yield {
override def stop(): Future[Unit] = {
.map(_ => ())
/** Searches a given Source[Int] that represents block heights applying f to them and returning a Seq[T] with the results */
@ -14,7 +14,7 @@ class ZipDatadir(override val args: Array[String]) extends BitcoinSRunner {
implicit lazy val conf: BitcoinSAppConfig =
BitcoinSAppConfig(datadir, baseConfig)
override def startup: Future[Unit] = {
override def start(): Future[Unit] = {
//replace the line below with where you want to zip too
val path = Paths.get("/tmp", "bitcoin-s.zip")
@ -24,6 +24,8 @@ class ZipDatadir(override val args: Array[String]) extends BitcoinSRunner {
_ <- system.terminate()
} yield sys.exit(0)
override def stop(): Future[Unit] = Future.unit
object Zip extends App {
@ -4,7 +4,7 @@ import akka.actor.ActorSystem
import com.typesafe.config.{Config, ConfigFactory}
import grizzled.slf4j.Logging
import org.bitcoins.core.config._
import org.bitcoins.core.util.EnvUtil
import org.bitcoins.core.util.{EnvUtil, StartStopAsync}
import org.bitcoins.db.AppConfig
import org.bitcoins.db.AppConfig.safePathToString
@ -12,7 +12,7 @@ import java.nio.file.{Path, Paths}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Properties
trait BitcoinSRunner extends Logging {
trait BitcoinSRunner extends StartStopAsync[Unit] with Logging {
protected def args: Array[String]
@ -88,8 +88,6 @@ trait BitcoinSRunner extends Logging {
lazy val datadir: Path =
def startup: Future[Unit]
// start everything!
final def run(customFinalDirOpt: Option[String] = None): Unit = {
@ -131,10 +129,9 @@ trait BitcoinSRunner extends Logging {
logger.info(s"using directory ${usedDir.toAbsolutePath.toString}")
val runner = startup
val runner: Future[Unit] = start()
runner.failed.foreach { err =>
logger.error(s"Failed to startup server!", err)
@ -1,12 +1,14 @@
package org.bitcoins.server
import java.nio.file._
import org.bitcoins.rpc.client.common.BitcoindVersion
import org.bitcoins.rpc.util.RpcUtil
import org.bitcoins.testkit.BitcoinSTestAppConfig
import org.bitcoins.testkit.fixtures.BitcoinSFixture
import org.bitcoins.testkit.util.{AkkaUtil, BitcoinSAsyncTest}
import org.scalatest.Assertion
import java.nio.file._
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
import scala.reflect.io.Directory
@ -29,10 +31,12 @@ class ServerRunTest extends BitcoinSAsyncTest {
val main = new BitcoinSServerMain(args)
val runMainF = main.start()
// Use Exception because different errors can occur
recoverToSucceededIf[Exception] {
val runMainF = new BitcoinSServerMain(args).startup
val assertionF: Future[Assertion] = recoverToSucceededIf[Exception] {
val deleteDirF = for {
_ <- runMainF
_ <- AkkaUtil.nonBlockingSleep(2.seconds)
_ = directory.deleteRecursively()
_ <- AkkaUtil.nonBlockingSleep(2.seconds)
@ -43,6 +47,11 @@ class ServerRunTest extends BitcoinSAsyncTest {
_ <- deleteDirF
} yield ()
for {
_ <- assertionF
_ <- main.stop()
} yield succeed
it must "start up and log to the correct location" in {
@ -66,13 +75,16 @@ class ServerRunTest extends BitcoinSAsyncTest {
main = new BitcoinSServerMain(args)
// Start the server in a separate thread
runnable = new Runnable {
override def run(): Unit = new BitcoinSServerMain(args).run()
override def run(): Unit = {
thread = new Thread(runnable)
_ = thread.start()
// Wait for the server to have successfully started up
_ <- AkkaUtil.nonBlockingSleep(1.second)
binding <- BitcoinSServer.startedF
@ -81,6 +93,7 @@ class ServerRunTest extends BitcoinSAsyncTest {
_ <- bitcoind.stop()
_ <- binding.terminate(5.seconds)
_ = thread.interrupt()
_ <- main.stop()
} yield {
// Cleanup
@ -33,177 +33,166 @@ class BitcoinSServerMain(override val args: Array[String])
implicit lazy val conf: BitcoinSAppConfig =
BitcoinSAppConfig(datadir, baseConfig)
def startup: Future[Unit] = {
implicit lazy val walletConf: WalletAppConfig = conf.walletConf
implicit lazy val nodeConf: NodeAppConfig = conf.nodeConf
implicit lazy val chainConf: ChainAppConfig = conf.chainConf
implicit lazy val bitcoindRpcConf: BitcoindRpcAppConfig = conf.bitcoindRpcConf
implicit val walletConf: WalletAppConfig = conf.walletConf
implicit val nodeConf: NodeAppConfig = conf.nodeConf
implicit val chainConf: ChainAppConfig = conf.chainConf
implicit val bitcoindRpcConf: BitcoindRpcAppConfig = conf.bitcoindRpcConf
override def start(): Future[Unit] = {
val startedConfigF = conf.start()
def startBitcoinSBackend(): Future[Unit] = {
if (nodeConf.peers.isEmpty) {
throw new IllegalArgumentException(
"No peers specified, unable to start node")
val peerSocket =
val peer = Peer.fromSocket(peerSocket)
//initialize the config, run migrations
val configInitializedF = conf.start()
//run chain work migration
val chainApiF = configInitializedF.flatMap { _ =>
runChainWorkCalc(forceChainWorkRecalc || chainConf.forceRecalcChainWork)
//get a node that isn't started
val nodeF = configInitializedF.flatMap { _ =>
nodeConf.createNode(peer)(chainConf, system)
//get our wallet
val configuredWalletF = for {
_ <- configInitializedF
node <- nodeF
chainApi <- chainApiF
_ = logger.info("Initialized chain api")
feeProvider = getFeeProviderOrElse(MempoolSpaceProvider(HourFeeTarget))
wallet <- walletConf.createHDWallet(node, chainApi, feeProvider)
callbacks <- createCallbacks(wallet)
_ = nodeConf.addCallbacks(callbacks)
} yield {
logger.info(s"Done configuring wallet")
//add callbacks to our uninitialized node
val configuredNodeF = for {
node <- nodeF
wallet <- configuredWalletF
initNode <- setBloomFilter(node, wallet)
} yield {
logger.info(s"Done configuring node")
//start our http server now that we are synced
for {
node <- configuredNodeF
wallet <- configuredWalletF
_ <- node.start()
_ <- wallet.start().recoverWith {
case err: IllegalArgumentException
if err.getMessage.contains("If we have spent a spendinginfodb") =>
handleMissingSpendingInfoDb(err, wallet)
cachedChainApi <- node.chainApiFromDb()
chainApi = ChainHandler.fromChainHandlerCached(cachedChainApi)
binding <- startHttpServer(nodeApi = node,
chainApi = chainApi,
wallet = wallet,
rpcbindOpt = rpcBindOpt,
rpcPortOpt = rpcPortOpt)
_ = {
logger.info(s"Starting ${nodeConf.nodeType.shortName} node sync")
_ = BitcoinSServer.startedFP.success(Future.successful(binding))
_ <- node.sync()
} yield {
logger.info(s"Done starting Main!")
sys.addShutdownHook {
logger.error(s"Exiting process")
.foreach(_ =>
logger.info(s"Stopped ${nodeConf.nodeType.shortName} node"))
.foreach(_ => logger.info(s"Actor system terminated"))
startedConfigF.failed.foreach { err =>
logger.error(s"Failed to initialize configuration for BicoinServerMain",
def startBitcoindBackend(): Future[Unit] = {
val bitcoind = bitcoindRpcConf.client
for {
_ <- conf.start()
_ = logger.info("Starting bitcoind")
_ <- bitcoindRpcConf.start()
_ = logger.info("Creating wallet")
feeProvider = getFeeProviderOrElse(bitcoind)
tmpWallet <- walletConf.createHDWallet(nodeApi = bitcoind,
chainQueryApi = bitcoind,
feeRateApi = feeProvider)
wallet = BitcoindRpcBackendUtil.createWalletWithBitcoindCallbacks(
_ = logger.info("Starting wallet")
_ <- wallet.start().recoverWith {
case err: IllegalArgumentException
if err.getMessage.contains("If we have spent a spendinginfodb") =>
handleMissingSpendingInfoDb(err, wallet)
for {
_ <- startedConfigF
start <- {
nodeConf.nodeType match {
case _: InternalImplementationNodeType =>
case NodeType.BitcoindBackend =>
_ = BitcoindRpcBackendUtil
.syncWalletToBitcoind(bitcoind, wallet)
.flatMap { _ =>
if (bitcoindRpcConf.zmqConfig == ZmqConfig.empty) {
BitcoindRpcBackendUtil.startBitcoindBlockPolling(wallet, bitcoind)
} else Future.unit
// Create callbacks for processing new blocks
_ =
if (bitcoindRpcConf.zmqConfig != ZmqConfig.empty) {
binding <- startHttpServer(nodeApi = bitcoind,
chainApi = bitcoind,
wallet = wallet,
rpcbindOpt = rpcBindOpt,
rpcPortOpt = rpcPortOpt)
_ = BitcoinSServer.startedFP.success(Future.successful(binding))
} yield {
logger.info(s"Done starting Main!")
sys.addShutdownHook {
logger.error(s"Exiting process")
.foreach(_ => logger.info(s"Actor system terminated"))
} yield start
override def stop(): Future[Unit] = {
logger.error(s"Exiting process")
for {
_ <- walletConf.stop()
_ <- nodeConf.stop()
_ <- chainConf.stop()
_ = logger.info(s"Stopped ${nodeConf.nodeType.shortName} node")
_ <- system.terminate()
} yield {
logger.info(s"Actor system terminated")
def startBitcoinSBackend(): Future[Unit] = {
if (nodeConf.peers.isEmpty) {
throw new IllegalArgumentException(
"No peers specified, unable to start node")
val startFut = nodeConf.nodeType match {
case _: InternalImplementationNodeType =>
case NodeType.BitcoindBackend =>
val peerSocket =
val peer = Peer.fromSocket(peerSocket)
//run chain work migration
val chainApiF = runChainWorkCalc(
forceChainWorkRecalc || chainConf.forceRecalcChainWork)
//get a node that isn't started
val nodeF = nodeConf.createNode(peer)(chainConf, system)
//get our wallet
val configuredWalletF = for {
node <- nodeF
chainApi <- chainApiF
_ = logger.info("Initialized chain api")
feeProvider = getFeeProviderOrElse(MempoolSpaceProvider(HourFeeTarget))
wallet <- walletConf.createHDWallet(node, chainApi, feeProvider)
callbacks <- createCallbacks(wallet)
_ = nodeConf.addCallbacks(callbacks)
} yield {
logger.info(s"Done configuring wallet")
startFut.failed.foreach { err =>
logger.error(s"Error on server startup!", err)
throw err
//add callbacks to our uninitialized node
val configuredNodeF = for {
node <- nodeF
wallet <- configuredWalletF
initNode <- setBloomFilter(node, wallet)
} yield {
logger.info(s"Done configuring node")
//start our http server now that we are synced
for {
node <- configuredNodeF
wallet <- configuredWalletF
_ <- node.start()
_ <- wallet.start().recoverWith {
case err: IllegalArgumentException
if err.getMessage.contains("If we have spent a spendinginfodb") =>
handleMissingSpendingInfoDb(err, wallet)
cachedChainApi <- node.chainApiFromDb()
chainApi = ChainHandler.fromChainHandlerCached(cachedChainApi)
binding <- startHttpServer(nodeApi = node,
chainApi = chainApi,
wallet = wallet,
rpcbindOpt = rpcBindOpt,
rpcPortOpt = rpcPortOpt)
_ = {
logger.info(s"Starting ${nodeConf.nodeType.shortName} node sync")
_ = BitcoinSServer.startedFP.success(Future.successful(binding))
_ <- node.sync()
} yield {
logger.info(s"Done starting Main!")
def startBitcoindBackend(): Future[Unit] = {
val bitcoind = bitcoindRpcConf.client
for {
_ <- bitcoindRpcConf.start()
_ = logger.info("Started bitcoind")
_ = logger.info("Creating wallet")
feeProvider = getFeeProviderOrElse(bitcoind)
tmpWallet <- walletConf.createHDWallet(nodeApi = bitcoind,
chainQueryApi = bitcoind,
feeRateApi = feeProvider)
wallet = BitcoindRpcBackendUtil.createWalletWithBitcoindCallbacks(
_ = logger.info("Starting wallet")
_ <- wallet.start().recoverWith {
case err: IllegalArgumentException
if err.getMessage.contains("If we have spent a spendinginfodb") =>
handleMissingSpendingInfoDb(err, wallet)
_ = BitcoindRpcBackendUtil
.syncWalletToBitcoind(bitcoind, wallet)
.flatMap { _ =>
if (bitcoindRpcConf.zmqConfig == ZmqConfig.empty) {
BitcoindRpcBackendUtil.startBitcoindBlockPolling(wallet, bitcoind)
} else Future.unit
// Create callbacks for processing new blocks
_ =
if (bitcoindRpcConf.zmqConfig != ZmqConfig.empty) {
binding <- startHttpServer(nodeApi = bitcoind,
chainApi = bitcoind,
wallet = wallet,
rpcbindOpt = rpcBindOpt,
rpcPortOpt = rpcPortOpt)
_ = BitcoinSServer.startedFP.success(Future.successful(binding))
} yield {
logger.info(s"Done starting Main!")
private def createCallbacks(wallet: Wallet)(implicit
@ -265,14 +254,13 @@ class BitcoinSServerMain(override val args: Array[String])
/** This is needed for migrations V2/V3 on the chain project to re-calculate the total work for the chain */
private def runChainWorkCalc(force: Boolean)(implicit
chainAppConfig: ChainAppConfig,
system: ActorSystem): Future[ChainApi] = {
val blockEC =
val chainApi = ChainHandler.fromDatabase(
blockHeaderDAO = BlockHeaderDAO()(blockEC, chainAppConfig),
CompactFilterHeaderDAO()(blockEC, chainAppConfig),
CompactFilterDAO()(blockEC, chainAppConfig))
blockHeaderDAO = BlockHeaderDAO()(blockEC, chainConf),
CompactFilterHeaderDAO()(blockEC, chainConf),
CompactFilterDAO()(blockEC, chainConf))
for {
isMissingChainWork <- chainApi.isMissingChainWork
chainApiWithWork <-
Add table
Reference in a new issue