mirror of
https://github.com/bitcoin-s/bitcoin-s.git
synced 2025-03-26 21:42:48 +01:00
Add sync
JSON field to the getinfo
endpoint output (#4452)
* Add `sync` JSON field to the `getinfo` endpoint output * improve test coverage * update docs * rename sync flag * WebSocket notifications * fix unit tests * fix unit tests * increase test timout
This commit is contained in:
parent
9930c964f7
commit
e90e372e54
36 changed files with 612 additions and 94 deletions
app-commons/src/main/scala/org/bitcoins/commons
app
server-test/src/test/scala/org/bitcoins/server
server/src/main/scala/org/bitcoins/server
bitcoind-rpc/src/main/scala/org/bitcoins/rpc/client/common
chain-test/src/test/scala/org/bitcoins/chain
blockchain
models
chain/src/main
resources
postgresql/chain/migration
sqlite/chain/migration
scala/org/bitcoins/chain
core/src/main/scala/org/bitcoins/core
db-commons-test/src/test/scala/org/bitcoins/db
docs
node/src/main/scala/org/bitcoins/node
testkit/src/main/scala/org/bitcoins/testkit
wallet-test/src/test/scala/org/bitcoins/wallet
|
@ -10,14 +10,16 @@ case class BitcoinSServerInfo(
|
|||
network: BitcoinNetwork,
|
||||
blockHeight: Int,
|
||||
blockHash: DoubleSha256DigestBE,
|
||||
torStarted: Boolean) {
|
||||
torStarted: Boolean,
|
||||
syncing: Boolean) {
|
||||
|
||||
lazy val toJson: Value = {
|
||||
Obj(
|
||||
PicklerKeys.networkKey -> Str(network.name),
|
||||
PicklerKeys.blockHeightKey -> Num(blockHeight),
|
||||
PicklerKeys.blockHashKey -> Str(blockHash.hex),
|
||||
PicklerKeys.torStartedKey -> Bool(torStarted)
|
||||
PicklerKeys.torStartedKey -> Bool(torStarted),
|
||||
PicklerKeys.syncKey -> Bool(syncing)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
@ -31,10 +33,12 @@ object BitcoinSServerInfo {
|
|||
val height = obj(PicklerKeys.blockHeightKey).num.toInt
|
||||
val blockHash = DoubleSha256DigestBE(obj(PicklerKeys.blockHashKey).str)
|
||||
val torStarted = obj(PicklerKeys.torStartedKey).bool
|
||||
val sync = obj(PicklerKeys.syncKey).bool
|
||||
|
||||
BitcoinSServerInfo(network = network,
|
||||
blockHeight = height,
|
||||
blockHash = blockHash,
|
||||
torStarted = torStarted)
|
||||
torStarted = torStarted,
|
||||
syncing = sync)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -59,8 +59,9 @@ object WalletWsType extends StringFactory[WalletWsType] {
|
|||
|
||||
object ChainWsType extends StringFactory[ChainWsType] {
|
||||
case object BlockProcessed extends ChainWsType
|
||||
case object SyncFlagChanged extends ChainWsType
|
||||
|
||||
private val all: Vector[ChainWsType] = Vector(BlockProcessed)
|
||||
private val all: Vector[ChainWsType] = Vector(BlockProcessed, SyncFlagChanged)
|
||||
|
||||
override def fromStringOpt(string: String): Option[ChainWsType] = {
|
||||
all.find(_.toString.toLowerCase() == string.toLowerCase)
|
||||
|
@ -158,6 +159,11 @@ object ChainNotification {
|
|||
extends ChainNotification[GetBlockHeaderResult] {
|
||||
override val `type`: ChainWsType = ChainWsType.BlockProcessed
|
||||
}
|
||||
|
||||
case class SyncFlagChangedNotification(payload: Boolean)
|
||||
extends ChainNotification[Boolean] {
|
||||
override val `type`: ChainWsType = ChainWsType.SyncFlagChanged
|
||||
}
|
||||
}
|
||||
|
||||
object TorNotification {
|
||||
|
|
|
@ -1,6 +1,9 @@
|
|||
package org.bitcoins.commons.serializers
|
||||
|
||||
import org.bitcoins.commons.jsonmodels.ws.ChainNotification.BlockProcessedNotification
|
||||
import org.bitcoins.commons.jsonmodels.ws.ChainNotification.{
|
||||
BlockProcessedNotification,
|
||||
SyncFlagChangedNotification
|
||||
}
|
||||
import org.bitcoins.commons.jsonmodels.ws.WalletNotification.{
|
||||
DLCOfferAddNotification,
|
||||
DLCOfferRemoveNotification,
|
||||
|
@ -44,6 +47,8 @@ object WsPicklers {
|
|||
val payloadJson: ujson.Value = notification match {
|
||||
case BlockProcessedNotification(block) =>
|
||||
upickle.default.writeJs(block)(Picklers.getBlockHeaderResultPickler)
|
||||
case SyncFlagChangedNotification(syncing) =>
|
||||
upickle.default.writeJs(syncing)
|
||||
}
|
||||
val notificationObj = ujson.Obj(
|
||||
PicklerKeys.typeKey -> writeJs(notification.`type`),
|
||||
|
@ -61,6 +66,9 @@ object WsPicklers {
|
|||
val block =
|
||||
upickle.default.read(payloadObj)(Picklers.getBlockHeaderResultPickler)
|
||||
BlockProcessedNotification(block)
|
||||
case ChainWsType.SyncFlagChanged =>
|
||||
val syncing = payloadObj.bool
|
||||
SyncFlagChangedNotification(syncing)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -197,6 +205,14 @@ object WsPicklers {
|
|||
)
|
||||
}
|
||||
|
||||
implicit val syncFlagChangedPickler: ReadWriter[
|
||||
SyncFlagChangedNotification] = {
|
||||
readwriter[ujson.Obj].bimap(
|
||||
writeChainNotification(_),
|
||||
readChainNotification(_).asInstanceOf[SyncFlagChangedNotification]
|
||||
)
|
||||
}
|
||||
|
||||
implicit val dlcStateChangePickler: ReadWriter[DLCStateChangeNotification] = {
|
||||
readwriter[ujson.Obj].bimap(
|
||||
writeWalletNotification(_),
|
||||
|
|
|
@ -11,7 +11,10 @@ import akka.http.scaladsl.model.ws.{
|
|||
import akka.http.scaladsl.model.{HttpHeader, StatusCodes}
|
||||
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
|
||||
import org.bitcoins.cli.{CliCommand, Config, ConsoleCli}
|
||||
import org.bitcoins.commons.jsonmodels.ws.ChainNotification.BlockProcessedNotification
|
||||
import org.bitcoins.commons.jsonmodels.ws.ChainNotification.{
|
||||
BlockProcessedNotification,
|
||||
SyncFlagChangedNotification
|
||||
}
|
||||
import org.bitcoins.commons.jsonmodels.ws.WalletNotification.{
|
||||
DLCOfferAddNotification,
|
||||
DLCOfferRemoveNotification,
|
||||
|
@ -407,6 +410,29 @@ class WebsocketTests extends BitcoinSServerMainBitcoindFixture {
|
|||
}
|
||||
}
|
||||
|
||||
it must "receive updates when sync flag changes" in { serverWithBitcoind =>
|
||||
val ServerWithBitcoind(_, server) = serverWithBitcoind
|
||||
|
||||
val req = buildReq(server.conf)
|
||||
val tuple: (
|
||||
Future[WebSocketUpgradeResponse],
|
||||
(Future[Seq[WsNotification[_]]], Promise[Option[Message]])) = {
|
||||
Http()
|
||||
.singleWebSocketRequest(req, websocketFlow)
|
||||
}
|
||||
val notificationsF = tuple._2._1
|
||||
val promise = tuple._2._2
|
||||
for {
|
||||
_ <- AkkaUtil.nonBlockingSleep(15.seconds)
|
||||
_ = promise.success(None)
|
||||
notifications <- notificationsF
|
||||
} yield {
|
||||
val syncingNotifications =
|
||||
notifications.filter(_.isInstanceOf[SyncFlagChangedNotification])
|
||||
assert(syncingNotifications.nonEmpty)
|
||||
}
|
||||
}
|
||||
|
||||
it must "not queue things on the websocket while there is no one connected" in {
|
||||
serverWithBitcoind =>
|
||||
val ServerWithBitcoind(_, server) = serverWithBitcoind
|
||||
|
|
|
@ -14,6 +14,7 @@ import akka.stream.scaladsl.{
|
|||
import akka.{Done, NotUsed}
|
||||
import org.bitcoins.asyncutil.AsyncUtil
|
||||
import org.bitcoins.asyncutil.AsyncUtil.Exponential
|
||||
import org.bitcoins.chain.ChainCallbacks
|
||||
import org.bitcoins.chain.blockchain.ChainHandler
|
||||
import org.bitcoins.chain.config.ChainAppConfig
|
||||
import org.bitcoins.chain.models._
|
||||
|
@ -315,12 +316,12 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
|
|||
if err.getMessage.contains("If we have spent a spendinginfodb") =>
|
||||
handleMissingSpendingInfoDb(err, wallet)
|
||||
}
|
||||
} yield wallet
|
||||
} yield (wallet, chainCallbacks)
|
||||
}
|
||||
|
||||
val dlcNodeF = {
|
||||
for {
|
||||
wallet <- walletF
|
||||
(wallet, _) <- walletF
|
||||
dlcNode = dlcNodeConf.createDLCNode(wallet)
|
||||
_ <- dlcNode.start()
|
||||
} yield dlcNode
|
||||
|
@ -338,7 +339,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
|
|||
_ <- startHttpServer(
|
||||
nodeApiF = Future.successful(bitcoind),
|
||||
chainApi = bitcoind,
|
||||
walletF = walletF,
|
||||
walletF = walletF.map(_._1),
|
||||
dlcNodeF = dlcNodeF,
|
||||
torConfStarted = startedTorConfigF,
|
||||
serverCmdLineArgs = serverArgParser,
|
||||
|
@ -350,11 +351,13 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
|
|||
walletConf.walletNameOpt)
|
||||
_ = walletConf.addCallbacks(walletCallbacks)
|
||||
|
||||
wallet <- walletF
|
||||
(wallet, chainCallbacks) <- walletF
|
||||
//intentionally doesn't map on this otherwise we
|
||||
//wait until we are done syncing the entire wallet
|
||||
//which could take 1 hour
|
||||
_ = syncWalletWithBitcoindAndStartPolling(bitcoind, wallet)
|
||||
_ = syncWalletWithBitcoindAndStartPolling(bitcoind,
|
||||
wallet,
|
||||
Some(chainCallbacks))
|
||||
dlcWalletCallbacks = WebsocketUtil.buildDLCWalletCallbacks(wsQueue)
|
||||
_ = dlcConf.addCallbacks(dlcWalletCallbacks)
|
||||
_ <- startedTorConfigF
|
||||
|
@ -374,7 +377,9 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
|
|||
val chainApi = ChainHandler.fromDatabase(
|
||||
blockHeaderDAO = BlockHeaderDAO()(blockEC, chainConf),
|
||||
CompactFilterHeaderDAO()(blockEC, chainConf),
|
||||
CompactFilterDAO()(blockEC, chainConf))
|
||||
CompactFilterDAO()(blockEC, chainConf),
|
||||
ChainStateDescriptorDAO()(blockEC, chainConf)
|
||||
)
|
||||
for {
|
||||
isMissingChainWork <- chainApi.isMissingChainWork
|
||||
chainApiWithWork <-
|
||||
|
@ -503,7 +508,8 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
|
|||
*/
|
||||
private def syncWalletWithBitcoindAndStartPolling(
|
||||
bitcoind: BitcoindRpcClient,
|
||||
wallet: Wallet): Future[Unit] = {
|
||||
wallet: Wallet,
|
||||
chainCallbacksOpt: Option[ChainCallbacks]): Future[Unit] = {
|
||||
val f = for {
|
||||
_ <- AsyncUtil.retryUntilSatisfiedF(
|
||||
conditionF = { () =>
|
||||
|
@ -517,12 +523,14 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
|
|||
interval = 1.second,
|
||||
maxTries = 12
|
||||
)
|
||||
_ <- BitcoindRpcBackendUtil.syncWalletToBitcoind(bitcoind, wallet)
|
||||
_ <- BitcoindRpcBackendUtil.syncWalletToBitcoind(bitcoind,
|
||||
wallet,
|
||||
chainCallbacksOpt)
|
||||
_ <- wallet.updateUtxoPendingStates()
|
||||
_ <-
|
||||
if (bitcoindRpcConf.zmqConfig == ZmqConfig.empty) {
|
||||
BitcoindRpcBackendUtil
|
||||
.startBitcoindBlockPolling(wallet, bitcoind)
|
||||
.startBitcoindBlockPolling(wallet, bitcoind, chainCallbacksOpt)
|
||||
.map { _ =>
|
||||
BitcoindRpcBackendUtil
|
||||
.startBitcoindMempoolPolling(wallet, bitcoind) { tx =>
|
||||
|
|
|
@ -26,11 +26,15 @@ import scala.concurrent.{ExecutionContext, Future, Promise}
|
|||
object BitcoindRpcBackendUtil extends Logging {
|
||||
|
||||
/** Has the wallet process all the blocks it has not seen up until bitcoind's chain tip */
|
||||
def syncWalletToBitcoind(bitcoind: BitcoindRpcClient, wallet: Wallet)(implicit
|
||||
def syncWalletToBitcoind(
|
||||
bitcoind: BitcoindRpcClient,
|
||||
wallet: Wallet,
|
||||
chainCallbacksOpt: Option[ChainCallbacks])(implicit
|
||||
system: ActorSystem): Future[Unit] = {
|
||||
logger.info("Syncing wallet to bitcoind")
|
||||
import system.dispatcher
|
||||
for {
|
||||
val res = for {
|
||||
_ <- setSyncingFlag(true, bitcoind, chainCallbacksOpt)
|
||||
bitcoindHeight <- bitcoind.getBlockCount
|
||||
walletStateOpt <- wallet.getSyncDescriptorOpt()
|
||||
_ = logger.info(
|
||||
|
@ -65,6 +69,21 @@ object BitcoindRpcBackendUtil extends Logging {
|
|||
doSync(syncHeight.height, bitcoindHeight, bitcoind, wallet)
|
||||
}
|
||||
} yield ()
|
||||
res.onComplete { case _ =>
|
||||
setSyncingFlag(false, bitcoind, chainCallbacksOpt)
|
||||
}
|
||||
res
|
||||
}
|
||||
|
||||
private def setSyncingFlag(
|
||||
syncing: Boolean,
|
||||
bitcoind: BitcoindRpcClient,
|
||||
chainCallbacksOpt: Option[ChainCallbacks])(implicit
|
||||
ec: ExecutionContext) = for {
|
||||
_ <- bitcoind.setSyncing(false)
|
||||
} yield {
|
||||
chainCallbacksOpt.map(_.executeOnSyncFlagChanged(logger, syncing))
|
||||
()
|
||||
}
|
||||
|
||||
/** Helper method to sync the wallet until the bitcoind height */
|
||||
|
@ -297,6 +316,7 @@ object BitcoindRpcBackendUtil extends Logging {
|
|||
def startBitcoindBlockPolling(
|
||||
wallet: WalletApi,
|
||||
bitcoind: BitcoindRpcClient,
|
||||
chainCallbacksOpt: Option[ChainCallbacks],
|
||||
interval: FiniteDuration = 10.seconds)(implicit
|
||||
system: ActorSystem,
|
||||
ec: ExecutionContext): Future[Cancellable] = {
|
||||
|
@ -312,49 +332,61 @@ object BitcoindRpcBackendUtil extends Logging {
|
|||
def pollBitcoind(): Future[Unit] = {
|
||||
if (processingBitcoindBlocks.compareAndSet(false, true)) {
|
||||
logger.trace("Polling bitcoind for block count")
|
||||
val res: Future[Unit] = bitcoind.getBlockCount.flatMap { count =>
|
||||
val prevCount = atomicPrevCount.get()
|
||||
if (prevCount < count) {
|
||||
logger.info(
|
||||
s"Bitcoind has new block(s), requesting... ${count - prevCount} blocks")
|
||||
|
||||
// use .tail so we don't process the previous block that we already did
|
||||
val range = prevCount.to(count).tail
|
||||
val hashFs: Future[Seq[DoubleSha256Digest]] = Source(range)
|
||||
.mapAsync(parallelism = numParallelism) { height =>
|
||||
bitcoind.getBlockHash(height).map(_.flip)
|
||||
bitcoind.setSyncing(true)
|
||||
val res: Future[Unit] = for {
|
||||
_ <- setSyncingFlag(true, bitcoind, chainCallbacksOpt)
|
||||
count <- bitcoind.getBlockCount
|
||||
retval <- {
|
||||
val prevCount = atomicPrevCount.get()
|
||||
if (prevCount < count) {
|
||||
logger.info(
|
||||
s"Bitcoind has new block(s), requesting... ${count - prevCount} blocks")
|
||||
|
||||
// use .tail so we don't process the previous block that we already did
|
||||
val range = prevCount.to(count).tail
|
||||
val hashFs: Future[Seq[DoubleSha256Digest]] = Source(range)
|
||||
.mapAsync(parallelism = numParallelism) { height =>
|
||||
bitcoind.getBlockHash(height).map(_.flip)
|
||||
}
|
||||
.map { hash =>
|
||||
val _ = atomicPrevCount.incrementAndGet()
|
||||
hash
|
||||
}
|
||||
.toMat(Sink.seq)(Keep.right)
|
||||
.run()
|
||||
|
||||
val requestsBlocksF = for {
|
||||
hashes <- hashFs
|
||||
_ <- wallet.nodeApi.downloadBlocks(hashes.toVector)
|
||||
} yield logger.debug(
|
||||
"Successfully polled bitcoind for new blocks")
|
||||
|
||||
requestsBlocksF.failed.foreach { case err =>
|
||||
val failedCount = atomicPrevCount.get
|
||||
atomicPrevCount.set(prevCount)
|
||||
logger.error(
|
||||
s"Requesting blocks from bitcoind polling failed, range=[$prevCount, $failedCount]",
|
||||
err)
|
||||
}
|
||||
.map { hash =>
|
||||
val _ = atomicPrevCount.incrementAndGet()
|
||||
hash
|
||||
}
|
||||
.toMat(Sink.seq)(Keep.right)
|
||||
.run()
|
||||
|
||||
val requestsBlocksF = for {
|
||||
hashes <- hashFs
|
||||
_ <- wallet.nodeApi.downloadBlocks(hashes.toVector)
|
||||
} yield logger.debug(
|
||||
"Successfully polled bitcoind for new blocks")
|
||||
|
||||
requestsBlocksF.failed.foreach { case err =>
|
||||
val failedCount = atomicPrevCount.get
|
||||
atomicPrevCount.set(prevCount)
|
||||
logger.error(
|
||||
s"Requesting blocks from bitcoind polling failed, range=[$prevCount, $failedCount]",
|
||||
err)
|
||||
requestsBlocksF
|
||||
} else if (prevCount > count) {
|
||||
Future.failed(new RuntimeException(
|
||||
s"Bitcoind is at a block height ($count) before the wallet's ($prevCount)"))
|
||||
} else {
|
||||
logger.debug(s"In sync $prevCount count=$count")
|
||||
Future.unit
|
||||
}
|
||||
|
||||
requestsBlocksF
|
||||
} else if (prevCount > count) {
|
||||
Future.failed(new RuntimeException(
|
||||
s"Bitcoind is at a block height ($count) before the wallet's ($prevCount)"))
|
||||
} else {
|
||||
logger.debug(s"In sync $prevCount count=$count")
|
||||
Future.unit
|
||||
}
|
||||
} yield {
|
||||
retval
|
||||
}
|
||||
|
||||
res.onComplete { _ =>
|
||||
processingBitcoindBlocks.set(false)
|
||||
setSyncingFlag(false, bitcoind, chainCallbacksOpt)
|
||||
}
|
||||
res.onComplete(_ => processingBitcoindBlocks.set(false))
|
||||
res
|
||||
} else {
|
||||
logger.info(
|
||||
|
|
|
@ -68,13 +68,16 @@ case class ChainRoutes(
|
|||
|
||||
case ServerCommand("getinfo", _) =>
|
||||
complete {
|
||||
chain.getBestBlockHeader().map { header =>
|
||||
for {
|
||||
header <- chain.getBestBlockHeader()
|
||||
syncing <- chain.isSyncing()
|
||||
} yield {
|
||||
val info = BitcoinSServerInfo(network = network,
|
||||
blockHeight = header.height,
|
||||
blockHash = header.hashBE,
|
||||
torStarted =
|
||||
startedTorConfigF.isCompleted)
|
||||
|
||||
startedTorConfigF.isCompleted,
|
||||
syncing = syncing)
|
||||
Server.httpSuccess(info.toJson)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -613,7 +613,7 @@ object Rescan extends ServerJsonModels {
|
|||
case other =>
|
||||
Failure(
|
||||
new IllegalArgumentException(
|
||||
s"Bad number of arguments: ${other.length}. Expected: 4"))
|
||||
s"Bad number of arguments: ${other.length}. Expected: 5"))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -3,7 +3,11 @@ package org.bitcoins.server.util
|
|||
import akka.http.scaladsl.model.ws.{Message, TextMessage}
|
||||
import akka.stream.scaladsl.SourceQueueWithComplete
|
||||
import grizzled.slf4j.Logging
|
||||
import org.bitcoins.chain.{ChainCallbacks, OnBlockHeaderConnected}
|
||||
import org.bitcoins.chain.{
|
||||
ChainCallbacks,
|
||||
OnBlockHeaderConnected,
|
||||
OnSyncFlagChanged
|
||||
}
|
||||
import org.bitcoins.commons.jsonmodels.ws.TorNotification.TorStartedNotification
|
||||
import org.bitcoins.commons.jsonmodels.ws.{
|
||||
ChainNotification,
|
||||
|
@ -63,7 +67,18 @@ object WebsocketUtil extends Logging {
|
|||
f
|
||||
}
|
||||
|
||||
ChainCallbacks.onBlockHeaderConnected(onBlockProcessed)
|
||||
val onSyncFlagChanged: OnSyncFlagChanged = { syncing =>
|
||||
val notification = ChainNotification.SyncFlagChangedNotification(syncing)
|
||||
val notificationJson =
|
||||
upickle.default.writeJs(notification)(WsPicklers.syncFlagChangedPickler)
|
||||
val msg = TextMessage.Strict(notificationJson.toString())
|
||||
for {
|
||||
_ <- queue.offer(msg)
|
||||
} yield ()
|
||||
}
|
||||
|
||||
ChainCallbacks.onBlockHeaderConnected(onBlockProcessed) +
|
||||
ChainCallbacks.onOnSyncFlagChanged(onSyncFlagChanged)
|
||||
}
|
||||
|
||||
/** Builds websocket callbacks for the wallet */
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.bitcoins.rpc.client.v23.BitcoindV23RpcClient
|
|||
import org.bitcoins.rpc.config._
|
||||
|
||||
import java.io.File
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import scala.concurrent.Future
|
||||
|
||||
/** This class is not guaranteed to be compatible with any particular
|
||||
|
@ -67,6 +68,8 @@ class BitcoindRpcClient(override val instance: BitcoindInstance)(implicit
|
|||
with PsbtRpc
|
||||
with UtilRpc {
|
||||
|
||||
private val syncing = new AtomicBoolean(false)
|
||||
|
||||
override lazy val version: Future[BitcoindVersion] = {
|
||||
instance match {
|
||||
case _: BitcoindInstanceRemote =>
|
||||
|
@ -275,6 +278,13 @@ class BitcoindRpcClient(override val instance: BitcoindInstance)(implicit
|
|||
s"Bitcoin Core $v does not support block filters headers through the rpc")
|
||||
}
|
||||
}
|
||||
|
||||
override def isSyncing(): Future[Boolean] = Future.successful(syncing.get())
|
||||
|
||||
override def setSyncing(value: Boolean): Future[ChainApi] = {
|
||||
syncing.set(value)
|
||||
Future.successful(this)
|
||||
}
|
||||
}
|
||||
|
||||
object BitcoindRpcClient {
|
||||
|
|
|
@ -1,7 +1,12 @@
|
|||
package org.bitcoins.chain.blockchain
|
||||
|
||||
import org.bitcoins.asyncutil.AsyncUtil
|
||||
import org.bitcoins.chain.pow.Pow
|
||||
import org.bitcoins.chain.{ChainCallbacks, OnBlockHeaderConnected}
|
||||
import org.bitcoins.chain.{
|
||||
ChainCallbacks,
|
||||
OnBlockHeaderConnected,
|
||||
OnSyncFlagChanged
|
||||
}
|
||||
import org.bitcoins.core.api.chain.ChainApi
|
||||
import org.bitcoins.core.api.chain.ChainQueryApi.FilterResponse
|
||||
import org.bitcoins.core.api.chain.db.{
|
||||
|
@ -706,6 +711,33 @@ class ChainHandlerTest extends ChainDbUnitTest {
|
|||
}
|
||||
}
|
||||
|
||||
it must "execute sync callback" in { chainHandler: ChainHandler =>
|
||||
@volatile var values = Vector.empty[Boolean]
|
||||
val callback: OnSyncFlagChanged = { (value: Boolean) =>
|
||||
Future {
|
||||
synchronized {
|
||||
values = values :+ value
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
val callbacks = ChainCallbacks.onOnSyncFlagChanged(callback)
|
||||
chainHandler.chainConfig.addCallbacks(callbacks)
|
||||
|
||||
for {
|
||||
_ <- chainHandler.setSyncing(false)
|
||||
_ <- chainHandler.setSyncing(false)
|
||||
_ <- chainHandler.setSyncing(true)
|
||||
_ <- chainHandler.setSyncing(true)
|
||||
_ <- chainHandler.setSyncing(false)
|
||||
_ <- chainHandler.setSyncing(false)
|
||||
_ <- AsyncUtil.awaitCondition { () => synchronized { values.size == 2 } }
|
||||
} yield {
|
||||
assert(values == Vector(true, false))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/** Checks that
|
||||
* 1. The header1 & header2 have the same chainwork
|
||||
* 2. Checks that header1 and header2 have the same time
|
||||
|
|
|
@ -164,6 +164,7 @@ class MainnetChainHandlerTest extends ChainDbUnitTest {
|
|||
val handler = ChainHandlerCached(chainHandler.blockHeaderDAO,
|
||||
chainHandler.filterHeaderDAO,
|
||||
chainHandler.filterDAO,
|
||||
chainHandler.stateDAO,
|
||||
Vector(blockchain),
|
||||
Map.empty)
|
||||
val processorF = Future.successful(handler)
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
package org.bitcoins.chain.models
|
||||
|
||||
import org.bitcoins.testkit.chain.ChainDbUnitTest
|
||||
import org.scalatest.FutureOutcome
|
||||
|
||||
/** Created by chris on 9/8/16.
|
||||
*/
|
||||
class ChainStateDescriptorDAOTest extends ChainDbUnitTest {
|
||||
|
||||
override type FixtureParam = ChainStateDescriptorDAO
|
||||
|
||||
override def withFixture(test: OneArgAsyncTest): FutureOutcome =
|
||||
withChainStateDescriptorDAO(test)
|
||||
|
||||
behavior of "ChainStateDescriptorDAO"
|
||||
|
||||
it should "set and get sync flag" in { dao: ChainStateDescriptorDAO =>
|
||||
for {
|
||||
read <- dao.read(SyncDescriptor.tpe)
|
||||
_ = assert(read.isEmpty)
|
||||
sync <- dao.isSyncing
|
||||
_ = assert(!sync)
|
||||
|
||||
_ <- dao.updateSyncing(false)
|
||||
|
||||
read <- dao.read(SyncDescriptor.tpe)
|
||||
_ = assert(
|
||||
read == Some(
|
||||
ChainStateDescriptorDb(SyncDescriptor.tpe, SyncDescriptor(false))))
|
||||
sync <- dao.isSyncing
|
||||
_ = assert(!sync)
|
||||
|
||||
_ <- dao.updateSyncing(true)
|
||||
|
||||
read <- dao.read(SyncDescriptor.tpe)
|
||||
_ = assert(
|
||||
read == Some(
|
||||
ChainStateDescriptorDb(SyncDescriptor.tpe, SyncDescriptor(true))))
|
||||
sync <- dao.isSyncing
|
||||
_ = assert(sync)
|
||||
} yield succeed
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1 @@
|
|||
CREATE TABLE IF NOT EXISTS "state_descriptors" ("type" VARCHAR(254) PRIMARY KEY NOT NULL, "descriptor" TEXT NOT NULL);
|
|
@ -0,0 +1 @@
|
|||
CREATE TABLE IF NOT EXISTS "state_descriptors" ("type" VARCHAR(254) PRIMARY KEY NOT NULL, "descriptor" TEXT NOT NULL);
|
|
@ -12,6 +12,8 @@ trait ChainCallbacks {
|
|||
Vector[(Int, BlockHeader)],
|
||||
OnBlockHeaderConnected]
|
||||
|
||||
def onSyncFlagChanged: CallbackHandler[Boolean, OnSyncFlagChanged]
|
||||
|
||||
def +(other: ChainCallbacks): ChainCallbacks
|
||||
|
||||
def executeOnBlockHeaderConnectedCallbacks(
|
||||
|
@ -27,36 +29,58 @@ trait ChainCallbacks {
|
|||
err))
|
||||
}
|
||||
|
||||
def executeOnSyncFlagChanged(logger: Logger, syncing: Boolean)(implicit
|
||||
ec: ExecutionContext): Future[Unit] = {
|
||||
onSyncFlagChanged.execute(
|
||||
syncing,
|
||||
(err: Throwable) =>
|
||||
logger.error(s"${onSyncFlagChanged.name} Callback failed with error: ",
|
||||
err))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/** Callback for handling a received block header */
|
||||
trait OnBlockHeaderConnected extends Callback[Vector[(Int, BlockHeader)]]
|
||||
|
||||
trait OnSyncFlagChanged extends Callback[Boolean]
|
||||
|
||||
object ChainCallbacks {
|
||||
|
||||
private case class ChainCallbacksImpl(
|
||||
onBlockHeaderConnected: CallbackHandler[
|
||||
Vector[(Int, BlockHeader)],
|
||||
OnBlockHeaderConnected])
|
||||
OnBlockHeaderConnected],
|
||||
onSyncFlagChanged: CallbackHandler[Boolean, OnSyncFlagChanged])
|
||||
extends ChainCallbacks {
|
||||
|
||||
override def +(other: ChainCallbacks): ChainCallbacks =
|
||||
copy(onBlockHeaderConnected =
|
||||
onBlockHeaderConnected ++ other.onBlockHeaderConnected)
|
||||
onBlockHeaderConnected ++ other.onBlockHeaderConnected,
|
||||
onSyncFlagChanged = onSyncFlagChanged ++ other.onSyncFlagChanged)
|
||||
}
|
||||
|
||||
/** Constructs a set of callbacks that only acts on block headers connected */
|
||||
def onBlockHeaderConnected(f: OnBlockHeaderConnected): ChainCallbacks =
|
||||
ChainCallbacks(onBlockHeaderConnected = Vector(f))
|
||||
|
||||
def onOnSyncFlagChanged(f: OnSyncFlagChanged): ChainCallbacks =
|
||||
ChainCallbacks(onSyncFlagChanged = Vector(f))
|
||||
|
||||
lazy val empty: ChainCallbacks =
|
||||
ChainCallbacks(onBlockHeaderConnected = Vector.empty)
|
||||
|
||||
def apply(
|
||||
onBlockHeaderConnected: Vector[OnBlockHeaderConnected] =
|
||||
onBlockHeaderConnected: Vector[OnBlockHeaderConnected] = Vector.empty,
|
||||
onSyncFlagChanged: Vector[OnSyncFlagChanged] =
|
||||
Vector.empty): ChainCallbacks =
|
||||
ChainCallbacksImpl(onBlockHeaderConnected =
|
||||
CallbackHandler[Vector[(Int, BlockHeader)], OnBlockHeaderConnected](
|
||||
"onBlockHeaderConnected",
|
||||
onBlockHeaderConnected))
|
||||
ChainCallbacksImpl(
|
||||
onBlockHeaderConnected =
|
||||
CallbackHandler[Vector[(Int, BlockHeader)], OnBlockHeaderConnected](
|
||||
"onBlockHeaderConnected",
|
||||
onBlockHeaderConnected),
|
||||
onSyncFlagChanged =
|
||||
CallbackHandler[Boolean, OnSyncFlagChanged]("onSyncFlagChanged",
|
||||
onSyncFlagChanged)
|
||||
)
|
||||
}
|
||||
|
|
|
@ -35,6 +35,7 @@ class ChainHandler(
|
|||
val blockHeaderDAO: BlockHeaderDAO,
|
||||
val filterHeaderDAO: CompactFilterHeaderDAO,
|
||||
val filterDAO: CompactFilterDAO,
|
||||
val stateDAO: ChainStateDescriptorDAO,
|
||||
val blockFilterCheckpoints: Map[
|
||||
DoubleSha256DigestBE,
|
||||
DoubleSha256DigestBE])(implicit
|
||||
|
@ -145,6 +146,7 @@ class ChainHandler(
|
|||
val newChainHandler = ChainHandler(blockHeaderDAO,
|
||||
filterHeaderDAO,
|
||||
filterDAO,
|
||||
stateDAO,
|
||||
blockFilterCheckpoints =
|
||||
blockFilterCheckpoints)
|
||||
|
||||
|
@ -519,6 +521,7 @@ class ChainHandler(
|
|||
ChainHandler(blockHeaderDAO = blockHeaderDAO,
|
||||
filterHeaderDAO = filterHeaderDAO,
|
||||
filterDAO = filterDAO,
|
||||
stateDAO = stateDAO,
|
||||
blockFilterCheckpoints = updatedCheckpoints)
|
||||
}
|
||||
}
|
||||
|
@ -924,6 +927,7 @@ class ChainHandler(
|
|||
blockHeaderDAO = blockHeaderDAO,
|
||||
filterHeaderDAO = filterHeaderDAO,
|
||||
filterDAO = filterDAO,
|
||||
stateDAO = stateDAO,
|
||||
blockFilterCheckpoints = blockFilterCheckpoints
|
||||
)
|
||||
}
|
||||
|
@ -947,11 +951,13 @@ class ChainHandler(
|
|||
blockHeaderDAO: BlockHeaderDAO = blockHeaderDAO,
|
||||
filterHeaderDAO: CompactFilterHeaderDAO = filterHeaderDAO,
|
||||
filterDAO: CompactFilterDAO = filterDAO,
|
||||
stateDAO: ChainStateDescriptorDAO = stateDAO,
|
||||
blockFilterCheckpoints: Map[DoubleSha256DigestBE, DoubleSha256DigestBE] =
|
||||
blockFilterCheckpoints): ChainHandler = {
|
||||
new ChainHandler(blockHeaderDAO = blockHeaderDAO,
|
||||
filterHeaderDAO = filterHeaderDAO,
|
||||
filterDAO = filterDAO,
|
||||
stateDAO = stateDAO,
|
||||
blockFilterCheckpoints = blockFilterCheckpoints)
|
||||
}
|
||||
|
||||
|
@ -1008,6 +1014,21 @@ class ChainHandler(
|
|||
times.sorted.apply(times.size / 2)
|
||||
}
|
||||
}
|
||||
|
||||
override def isSyncing(): Future[Boolean] = {
|
||||
stateDAO.isSyncing
|
||||
}
|
||||
|
||||
override def setSyncing(value: Boolean): Future[ChainApi] = {
|
||||
for {
|
||||
changed <- stateDAO.updateSyncing(value)
|
||||
} yield {
|
||||
if (changed && chainConfig.callBacks.onSyncFlagChanged.nonEmpty) {
|
||||
chainConfig.callBacks.executeOnSyncFlagChanged(logger, value)
|
||||
}
|
||||
this
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
object ChainHandler {
|
||||
|
@ -1016,6 +1037,7 @@ object ChainHandler {
|
|||
blockHeaderDAO: BlockHeaderDAO,
|
||||
filterHeaderDAO: CompactFilterHeaderDAO,
|
||||
filterDAO: CompactFilterDAO,
|
||||
stateDAO: ChainStateDescriptorDAO,
|
||||
blockFilterCheckpoints: Map[DoubleSha256DigestBE, DoubleSha256DigestBE])(
|
||||
implicit
|
||||
ec: ExecutionContext,
|
||||
|
@ -1023,6 +1045,7 @@ object ChainHandler {
|
|||
new ChainHandler(blockHeaderDAO,
|
||||
filterHeaderDAO,
|
||||
filterDAO,
|
||||
stateDAO,
|
||||
blockFilterCheckpoints)
|
||||
}
|
||||
|
||||
|
@ -1031,6 +1054,7 @@ object ChainHandler {
|
|||
new ChainHandler(blockHeaderDAO = cached.blockHeaderDAO,
|
||||
filterHeaderDAO = cached.filterHeaderDAO,
|
||||
filterDAO = cached.filterDAO,
|
||||
stateDAO = cached.stateDAO,
|
||||
blockFilterCheckpoints = Map.empty)(cached.chainConfig, ec)
|
||||
}
|
||||
|
||||
|
@ -1040,24 +1064,29 @@ object ChainHandler {
|
|||
def fromDatabase(
|
||||
blockHeaderDAO: BlockHeaderDAO,
|
||||
filterHeaderDAO: CompactFilterHeaderDAO,
|
||||
filterDAO: CompactFilterDAO)(implicit
|
||||
filterDAO: CompactFilterDAO,
|
||||
stateDAO: ChainStateDescriptorDAO)(implicit
|
||||
ec: ExecutionContext,
|
||||
chainConfig: ChainAppConfig): ChainHandler = {
|
||||
new ChainHandler(blockHeaderDAO = blockHeaderDAO,
|
||||
filterHeaderDAO = filterHeaderDAO,
|
||||
filterDAO = filterDAO,
|
||||
stateDAO = stateDAO,
|
||||
blockFilterCheckpoints = Map.empty)
|
||||
}
|
||||
|
||||
def apply(
|
||||
blockHeaderDAO: BlockHeaderDAO,
|
||||
filterHeaderDAO: CompactFilterHeaderDAO,
|
||||
filterDAO: CompactFilterDAO)(implicit
|
||||
filterDAO: CompactFilterDAO,
|
||||
stateDAO: ChainStateDescriptorDAO
|
||||
)(implicit
|
||||
ec: ExecutionContext,
|
||||
chainConfig: ChainAppConfig): ChainHandler = {
|
||||
new ChainHandler(blockHeaderDAO = blockHeaderDAO,
|
||||
filterHeaderDAO = filterHeaderDAO,
|
||||
filterDAO = filterDAO,
|
||||
stateDAO = stateDAO,
|
||||
blockFilterCheckpoints = Map.empty)
|
||||
}
|
||||
|
||||
|
@ -1067,10 +1096,12 @@ object ChainHandler {
|
|||
lazy val blockHeaderDAO = BlockHeaderDAO()
|
||||
lazy val filterHeaderDAO = CompactFilterHeaderDAO()
|
||||
lazy val filterDAO = CompactFilterDAO()
|
||||
lazy val stateDAO = ChainStateDescriptorDAO()
|
||||
|
||||
ChainHandler.fromDatabase(blockHeaderDAO = blockHeaderDAO,
|
||||
filterHeaderDAO = filterHeaderDAO,
|
||||
filterDAO = filterDAO)
|
||||
filterDAO = filterDAO,
|
||||
stateDAO = stateDAO)
|
||||
}
|
||||
|
||||
/** Converts a [[ChainHandler]] to [[ChainHandlerCached]] by calling [[BlockHeaderDAO.getBlockchains()]] */
|
||||
|
@ -1083,6 +1114,7 @@ object ChainHandler {
|
|||
blockHeaderDAO = chainHandler.blockHeaderDAO,
|
||||
filterHeaderDAO = chainHandler.filterHeaderDAO,
|
||||
filterDAO = chainHandler.filterDAO,
|
||||
stateDAO = chainHandler.stateDAO,
|
||||
blockchains = blockchains,
|
||||
blockFilterCheckpoints = chainHandler.blockFilterCheckpoints
|
||||
)(chainHandler.chainConfig, ec)
|
||||
|
|
|
@ -3,6 +3,7 @@ package org.bitcoins.chain.blockchain
|
|||
import org.bitcoins.chain.config.ChainAppConfig
|
||||
import org.bitcoins.chain.models.{
|
||||
BlockHeaderDAO,
|
||||
ChainStateDescriptorDAO,
|
||||
CompactFilterDAO,
|
||||
CompactFilterHeaderDAO
|
||||
}
|
||||
|
@ -22,6 +23,7 @@ case class ChainHandlerCached(
|
|||
override val blockHeaderDAO: BlockHeaderDAO,
|
||||
override val filterHeaderDAO: CompactFilterHeaderDAO,
|
||||
override val filterDAO: CompactFilterDAO,
|
||||
override val stateDAO: ChainStateDescriptorDAO,
|
||||
blockchains: Vector[Blockchain],
|
||||
override val blockFilterCheckpoints: Map[
|
||||
DoubleSha256DigestBE,
|
||||
|
@ -31,6 +33,7 @@ case class ChainHandlerCached(
|
|||
extends ChainHandler(blockHeaderDAO,
|
||||
filterHeaderDAO,
|
||||
filterDAO,
|
||||
stateDAO,
|
||||
blockFilterCheckpoints) {
|
||||
|
||||
/** Gets the best block header from the given [[blockchains]] parameter */
|
||||
|
@ -63,7 +66,8 @@ object ChainHandlerCached {
|
|||
def fromDatabase(
|
||||
blockHeaderDAO: BlockHeaderDAO,
|
||||
filterHeaderDAO: CompactFilterHeaderDAO,
|
||||
filterDAO: CompactFilterDAO)(implicit
|
||||
filterDAO: CompactFilterDAO,
|
||||
stateDAO: ChainStateDescriptorDAO)(implicit
|
||||
ec: ExecutionContext,
|
||||
chainConfig: ChainAppConfig): Future[ChainHandlerCached] = {
|
||||
val bestChainsF = blockHeaderDAO.getBlockchains()
|
||||
|
@ -72,6 +76,7 @@ object ChainHandlerCached {
|
|||
new ChainHandlerCached(blockHeaderDAO = blockHeaderDAO,
|
||||
filterHeaderDAO = filterHeaderDAO,
|
||||
filterDAO = filterDAO,
|
||||
stateDAO = stateDAO,
|
||||
blockchains = chains,
|
||||
blockFilterCheckpoints = Map.empty))
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package org.bitcoins.chain.db
|
|||
import org.bitcoins.chain.config.ChainAppConfig
|
||||
import org.bitcoins.chain.models.{
|
||||
BlockHeaderDAO,
|
||||
ChainStateDescriptorDAO,
|
||||
CompactFilterDAO,
|
||||
CompactFilterHeaderDAO
|
||||
}
|
||||
|
@ -30,6 +31,10 @@ trait ChainDbManagement extends DbManagement {
|
|||
CompactFilterDAO()(ec, appConfig).table
|
||||
}
|
||||
|
||||
private lazy val stateTable: TableQuery[Table[_]] = {
|
||||
ChainStateDescriptorDAO()(ec, appConfig).table
|
||||
}
|
||||
|
||||
override lazy val allTables: List[TableQuery[Table[_]]] =
|
||||
List(chainTable, filterHeaderTable, filterTable)
|
||||
List(chainTable, filterHeaderTable, filterTable, stateTable)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,67 @@
|
|||
package org.bitcoins.chain.models
|
||||
|
||||
import org.bitcoins.crypto.StringFactory
|
||||
|
||||
sealed abstract class ChainStateDescriptorType
|
||||
|
||||
object ChainStateDescriptorType
|
||||
extends StringFactory[ChainStateDescriptorType] {
|
||||
|
||||
final case object Syncing extends ChainStateDescriptorType
|
||||
|
||||
val all: Vector[ChainStateDescriptorType] = Vector(Syncing)
|
||||
|
||||
override def fromStringOpt(str: String): Option[ChainStateDescriptorType] = {
|
||||
all.find(state => str.toLowerCase() == state.toString.toLowerCase)
|
||||
}
|
||||
|
||||
override def fromString(string: String): ChainStateDescriptorType = {
|
||||
fromStringOpt(string) match {
|
||||
case Some(state) => state
|
||||
case None =>
|
||||
sys.error(s"Could not find ChainStateDescriptorType for string=$string")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sealed abstract class ChainStateDescriptor {
|
||||
def descriptorType: ChainStateDescriptorType
|
||||
}
|
||||
|
||||
sealed trait ChainStateDescriptorFactory[T <: ChainStateDescriptor]
|
||||
extends StringFactory[T] {
|
||||
def tpe: ChainStateDescriptorType
|
||||
}
|
||||
|
||||
object ChainStateDescriptor extends StringFactory[ChainStateDescriptor] {
|
||||
|
||||
val all: Vector[StringFactory[ChainStateDescriptor]] =
|
||||
Vector(SyncDescriptor)
|
||||
|
||||
override def fromString(string: String): ChainStateDescriptor = {
|
||||
all.find(f => f.fromStringT(string).isSuccess) match {
|
||||
case Some(factory) => factory.fromString(string)
|
||||
case None =>
|
||||
sys.error(s"Could not find ChainStateDescriptor for string=$string")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
case class SyncDescriptor(syncing: Boolean) extends ChainStateDescriptor {
|
||||
|
||||
override val descriptorType: ChainStateDescriptorType =
|
||||
ChainStateDescriptorType.Syncing
|
||||
|
||||
override val toString: String = syncing.toString
|
||||
}
|
||||
|
||||
object SyncDescriptor extends ChainStateDescriptorFactory[SyncDescriptor] {
|
||||
|
||||
override val tpe: ChainStateDescriptorType =
|
||||
ChainStateDescriptorType.Syncing
|
||||
|
||||
override def fromString(string: String): SyncDescriptor = {
|
||||
val rescanning = java.lang.Boolean.parseBoolean(string)
|
||||
SyncDescriptor(rescanning)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,112 @@
|
|||
package org.bitcoins.chain.models
|
||||
|
||||
import org.bitcoins.chain.config.ChainAppConfig
|
||||
import org.bitcoins.chain.models.ChainStateDescriptorType.Syncing
|
||||
import org.bitcoins.db.{CRUD, SlickUtil}
|
||||
import slick.lifted.ProvenShape
|
||||
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
|
||||
case class ChainStateDescriptorDb(
|
||||
tpe: ChainStateDescriptorType,
|
||||
descriptor: ChainStateDescriptor) {
|
||||
require(descriptor.descriptorType == tpe)
|
||||
}
|
||||
|
||||
case class ChainStateDescriptorDAO()(implicit
|
||||
override val ec: ExecutionContext,
|
||||
override val appConfig: ChainAppConfig)
|
||||
extends CRUD[ChainStateDescriptorDb, ChainStateDescriptorType]
|
||||
with SlickUtil[ChainStateDescriptorDb, ChainStateDescriptorType] {
|
||||
import profile.api._
|
||||
|
||||
implicit val chainStateDescriptorTypeMapper: BaseColumnType[
|
||||
ChainStateDescriptorType] =
|
||||
MappedColumnType.base[ChainStateDescriptorType, String](
|
||||
_.toString,
|
||||
ChainStateDescriptorType.fromString)
|
||||
|
||||
implicit val chainStateDescriptorMapper: BaseColumnType[
|
||||
ChainStateDescriptor] =
|
||||
MappedColumnType.base[ChainStateDescriptor, String](
|
||||
_.toString,
|
||||
ChainStateDescriptor.fromString)
|
||||
|
||||
override val table: profile.api.TableQuery[ChainStateDescriptorTable] =
|
||||
TableQuery[ChainStateDescriptorTable]
|
||||
|
||||
override def createAll(ts: Vector[ChainStateDescriptorDb]): Future[
|
||||
Vector[ChainStateDescriptorDb]] =
|
||||
createAllNoAutoInc(ts, safeDatabase)
|
||||
|
||||
override def findByPrimaryKeys(ids: Vector[ChainStateDescriptorType]): Query[
|
||||
ChainStateDescriptorTable,
|
||||
ChainStateDescriptorDb,
|
||||
Seq] = {
|
||||
table.filter(_.tpe.inSet(ids))
|
||||
}
|
||||
|
||||
override def findByPrimaryKey(id: ChainStateDescriptorType): Query[
|
||||
Table[_],
|
||||
ChainStateDescriptorDb,
|
||||
Seq] = {
|
||||
table.filter(_.tpe === id)
|
||||
}
|
||||
|
||||
override def findAll(ts: Vector[ChainStateDescriptorDb]): Query[
|
||||
Table[_],
|
||||
ChainStateDescriptorDb,
|
||||
Seq] =
|
||||
findByPrimaryKeys(ts.map(_.tpe))
|
||||
|
||||
def getSync(): Future[Option[SyncDescriptor]] = {
|
||||
read(Syncing).map {
|
||||
case Some(db) =>
|
||||
val desc = SyncDescriptor.fromString(db.descriptor.toString)
|
||||
Some(desc)
|
||||
case None => None
|
||||
}
|
||||
}
|
||||
|
||||
def isSyncing: Future[Boolean] = getSync().map(_.exists(_.syncing))
|
||||
|
||||
def updateSyncing(syncing: Boolean): Future[Boolean] = {
|
||||
val tpe: ChainStateDescriptorType = Syncing
|
||||
val query = table.filter(_.tpe === tpe)
|
||||
val actions = for {
|
||||
dbs <- query.result
|
||||
res <- dbs.headOption match {
|
||||
case None =>
|
||||
val desc = SyncDescriptor(syncing)
|
||||
val db = ChainStateDescriptorDb(tpe, desc)
|
||||
(table += db).map(_ => syncing)
|
||||
case Some(db) =>
|
||||
val oldDesc = SyncDescriptor.fromString(db.descriptor.toString)
|
||||
if (oldDesc.syncing != syncing) {
|
||||
val newDesc = SyncDescriptor(syncing)
|
||||
val newDb = ChainStateDescriptorDb(tpe, newDesc)
|
||||
query.update(newDb).map(_ => true)
|
||||
} else {
|
||||
DBIO.successful(false)
|
||||
}
|
||||
}
|
||||
} yield res
|
||||
|
||||
safeDatabase.run(actions)
|
||||
}
|
||||
|
||||
class ChainStateDescriptorTable(t: Tag)
|
||||
extends Table[ChainStateDescriptorDb](t,
|
||||
schemaName,
|
||||
"state_descriptors") {
|
||||
|
||||
def tpe: Rep[ChainStateDescriptorType] = column("type", O.PrimaryKey)
|
||||
|
||||
def descriptor: Rep[ChainStateDescriptor] = column("descriptor")
|
||||
|
||||
override def * : ProvenShape[ChainStateDescriptorDb] =
|
||||
(tpe, descriptor).<>(ChainStateDescriptorDb.tupled,
|
||||
ChainStateDescriptorDb.unapply)
|
||||
|
||||
}
|
||||
}
|
|
@ -20,6 +20,7 @@ trait ChainApi extends ChainQueryApi {
|
|||
/** Adds a block header to our chain project.
|
||||
* This will return a failed future when the
|
||||
* given header is invalid.
|
||||
*
|
||||
* @param header
|
||||
* @return
|
||||
*/
|
||||
|
@ -32,6 +33,7 @@ trait ChainApi extends ChainQueryApi {
|
|||
* that they are given. If the headers are out of order, this method will fail.
|
||||
*
|
||||
* This method will also fail when there are zero headers given that are valid.
|
||||
*
|
||||
* @param headers
|
||||
* @return
|
||||
*/
|
||||
|
@ -49,8 +51,8 @@ trait ChainApi extends ChainQueryApi {
|
|||
/** Gets the number of blocks in the database */
|
||||
def getBlockCount(): Future[Int]
|
||||
|
||||
// /** Gets the hash of the block that is what we consider "best" */
|
||||
// override def getBestBlockHash: Future[DoubleSha256DigestBE]
|
||||
// /** Gets the hash of the block that is what we consider "best" */
|
||||
// override def getBestBlockHash: Future[DoubleSha256DigestBE]
|
||||
|
||||
/** Gets the best block header we have */
|
||||
def getBestBlockHeader(): Future[BlockHeaderDb]
|
||||
|
@ -147,4 +149,8 @@ trait ChainApi extends ChainQueryApi {
|
|||
def getHeadersBetween(
|
||||
from: BlockHeaderDb,
|
||||
to: BlockHeaderDb): Future[Vector[BlockHeaderDb]]
|
||||
|
||||
def isSyncing(): Future[Boolean]
|
||||
|
||||
def setSyncing(value: Boolean): Future[ChainApi]
|
||||
}
|
||||
|
|
|
@ -48,6 +48,7 @@ object PicklerKeys {
|
|||
final val outcomesKey: String = "outcomes"
|
||||
|
||||
final val torStartedKey: String = "torStarted"
|
||||
final val syncKey: String = "syncing"
|
||||
|
||||
//tlv points
|
||||
final val pointsKey = "points"
|
||||
|
|
|
@ -59,13 +59,13 @@ class DbManagementTest extends BitcoinSAsyncTest with EmbeddedPg {
|
|||
val result = chainDbManagement.migrate()
|
||||
chainAppConfig.driver match {
|
||||
case SQLite =>
|
||||
val expected = 6
|
||||
val expected = 7
|
||||
assert(result.migrationsExecuted == expected)
|
||||
val flywayInfo = chainDbManagement.info()
|
||||
assert(flywayInfo.applied().length == expected)
|
||||
assert(flywayInfo.pending().length == 0)
|
||||
case PostgreSQL =>
|
||||
val expected = 5
|
||||
val expected = 6
|
||||
assert(result.migrationsExecuted == expected)
|
||||
val flywayInfo = chainDbManagement.info()
|
||||
//+1 for << Flyway Schema Creation >>
|
||||
|
|
|
@ -66,10 +66,11 @@ val chainProjectInitF = chainConfig.start()
|
|||
val blockHeaderDAO = BlockHeaderDAO()
|
||||
val compactFilterHeaderDAO = CompactFilterHeaderDAO()
|
||||
val compactFilterDAO = CompactFilterDAO()
|
||||
val stateDAO = ChainStateDescriptorDAO()
|
||||
|
||||
|
||||
//initialize the chain handler from the database
|
||||
val chainHandler = ChainHandler.fromDatabase(blockHeaderDAO, compactFilterHeaderDAO, compactFilterDAO)
|
||||
val chainHandler = ChainHandler.fromDatabase(blockHeaderDAO, compactFilterHeaderDAO, compactFilterDAO, stateDAO)
|
||||
|
||||
// Now, do the actual syncing:
|
||||
val syncedChainApiF = for {
|
||||
|
|
|
@ -135,10 +135,12 @@ val syncF: Future[ChainApi] = configF.flatMap { _ =>
|
|||
val blockHeaderDAO = BlockHeaderDAO()
|
||||
val compactFilterHeaderDAO = CompactFilterHeaderDAO()
|
||||
val compactFilterDAO = CompactFilterDAO()
|
||||
val stateDAO = ChainStateDescriptorDAO()
|
||||
val chainHandler = ChainHandler(
|
||||
blockHeaderDAO,
|
||||
compactFilterHeaderDAO,
|
||||
compactFilterDAO,
|
||||
stateDAO,
|
||||
blockFilterCheckpoints = Map.empty)
|
||||
|
||||
ChainSync.sync(chainHandler, getBlockHeaderFunc, getBestBlockHashFunc)
|
||||
|
|
|
@ -76,6 +76,7 @@ case class NeutrinoNode(
|
|||
for {
|
||||
chainApi <- chainApiFromDb()
|
||||
_ <- chainApi.getBestBlockHash()
|
||||
_ <- chainApi.setSyncing(true)
|
||||
|
||||
syncPeer <- peerManager.randomPeerWithService(
|
||||
ServiceIdentifier.NODE_COMPACT_FILTERS)
|
||||
|
|
|
@ -5,6 +5,7 @@ import org.bitcoins.chain.blockchain.ChainHandlerCached
|
|||
import org.bitcoins.chain.config.ChainAppConfig
|
||||
import org.bitcoins.chain.models.{
|
||||
BlockHeaderDAO,
|
||||
ChainStateDescriptorDAO,
|
||||
CompactFilterDAO,
|
||||
CompactFilterHeaderDAO
|
||||
}
|
||||
|
@ -63,7 +64,8 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
|
|||
executionContext: ExecutionContext): Future[ChainHandlerCached] = {
|
||||
ChainHandlerCached.fromDatabase(BlockHeaderDAO(),
|
||||
CompactFilterHeaderDAO(),
|
||||
CompactFilterDAO())
|
||||
CompactFilterDAO(),
|
||||
ChainStateDescriptorDAO())
|
||||
}
|
||||
|
||||
/** Unlike our chain api, this is cached inside our node
|
||||
|
|
|
@ -6,6 +6,7 @@ import org.bitcoins.chain.blockchain.ChainHandlerCached
|
|||
import org.bitcoins.chain.config.ChainAppConfig
|
||||
import org.bitcoins.chain.models.{
|
||||
BlockHeaderDAO,
|
||||
ChainStateDescriptorDAO,
|
||||
CompactFilterDAO,
|
||||
CompactFilterHeaderDAO
|
||||
}
|
||||
|
@ -231,9 +232,10 @@ object NodeAppConfig extends AppConfigFactoryActorSystem[NodeAppConfig] {
|
|||
val blockHeaderDAO = BlockHeaderDAO()
|
||||
val filterHeaderDAO = CompactFilterHeaderDAO()
|
||||
val filterDAO = CompactFilterDAO()
|
||||
val stateDAO = ChainStateDescriptorDAO()
|
||||
|
||||
val dmhF = ChainHandlerCached
|
||||
.fromDatabase(blockHeaderDAO, filterHeaderDAO, filterDAO)
|
||||
.fromDatabase(blockHeaderDAO, filterHeaderDAO, filterDAO, stateDAO)
|
||||
.map(handler => DataMessageHandler(handler, walletCreationTimeOpt))
|
||||
|
||||
nodeConf.nodeType match {
|
||||
|
|
|
@ -100,6 +100,7 @@ case class DataMessageHandler(
|
|||
case Some(filterHeaderHeight) =>
|
||||
Future.successful(filterHeaderHeight + filterHeaders.size)
|
||||
}
|
||||
newChainApi <- newChainApi.setSyncing(newSyncing)
|
||||
} yield {
|
||||
this.copy(chainApi = newChainApi,
|
||||
syncing = newSyncing,
|
||||
|
@ -153,6 +154,7 @@ case class DataMessageHandler(
|
|||
Future.successful(newSyncing)
|
||||
}
|
||||
}
|
||||
newChainApi <- newChainApi.setSyncing(newSyncing2)
|
||||
} yield {
|
||||
this.copy(
|
||||
chainApi = newChainApi,
|
||||
|
@ -209,7 +211,12 @@ case class DataMessageHandler(
|
|||
s"Received headers message with ${count.toInt} headers from $peer")
|
||||
logger.trace(
|
||||
s"Received headers=${headers.map(_.hashBE.hex).mkString("[", ",", "]")}")
|
||||
val chainApiF = chainApi.processHeaders(headers)
|
||||
val chainApiF = for {
|
||||
newChainApi <- chainApi.setSyncing(count.toInt > 0)
|
||||
processed <- newChainApi.processHeaders(headers)
|
||||
} yield {
|
||||
processed
|
||||
}
|
||||
|
||||
val getHeadersF = chainApiF
|
||||
.flatMap { newApi =>
|
||||
|
@ -349,9 +356,9 @@ case class DataMessageHandler(
|
|||
filterHeaderCount <- chainApi.getFilterHeaderCount()
|
||||
filterCount <- chainApi.getFilterCount()
|
||||
syncing <- {
|
||||
assert(headerHeight >= Math.max(filterHeaderCount, filterCount),
|
||||
"Header chain cannot be behind filter or filter header chain")
|
||||
assert(
|
||||
require(headerHeight >= Math.max(filterHeaderCount, filterCount),
|
||||
"Header chain cannot be behind filter or filter header chain")
|
||||
require(
|
||||
filterHeaderCount >= filterCount,
|
||||
s"Filter header height $filterHeaderCount must be atleast filter height $filterCount")
|
||||
if (headerHeight > filterHeaderCount) {
|
||||
|
@ -359,7 +366,7 @@ case class DataMessageHandler(
|
|||
s"Starting to fetch filter headers in data message handler")
|
||||
sendFirstGetCompactFilterHeadersCommand(peerMessageSender)
|
||||
} else {
|
||||
assert(
|
||||
require(
|
||||
headerHeight == filterHeaderCount && headerHeight == filterCount)
|
||||
logger.info(s"We are synced")
|
||||
Try(initialSyncDone.map(_.success(Done)))
|
||||
|
|
|
@ -170,6 +170,11 @@ trait ChainUnitTest
|
|||
destroy = () => ChainUnitTest.destroyAllTables())(test)
|
||||
}
|
||||
|
||||
def withChainStateDescriptorDAO(test: OneArgAsyncTest): FutureOutcome = {
|
||||
makeFixture(build = () => ChainUnitTest.createChainStateDescriptorDAO(),
|
||||
destroy = () => ChainUnitTest.destroyAllTables())(test)
|
||||
}
|
||||
|
||||
def withChainHandler(test: OneArgAsyncTest): FutureOutcome = {
|
||||
makeFixture(() => ChainUnitTest.createChainHandler(),
|
||||
() => ChainUnitTest.destroyAllTables())(test)
|
||||
|
@ -197,10 +202,12 @@ trait ChainUnitTest
|
|||
blockHeaderDAO <- ChainUnitTest.createPopulatedBlockHeaderDAO()
|
||||
filterHeaderDAO <- ChainUnitTest.createPopulatedFilterHeaderDAO()
|
||||
filterDAO <- ChainUnitTest.createPopulatedFilterDAO()
|
||||
stateDAO = ChainStateDescriptorDAO()
|
||||
chainHandler = ChainHandler.fromDatabase(blockHeaderDAO = blockHeaderDAO,
|
||||
filterHeaderDAO =
|
||||
filterHeaderDAO,
|
||||
filterDAO = filterDAO)
|
||||
filterDAO = filterDAO,
|
||||
stateDAO = stateDAO)
|
||||
} yield chainHandler
|
||||
}
|
||||
|
||||
|
@ -417,7 +424,8 @@ trait ChainUnitTest
|
|||
blockHeaderDAO: BlockHeaderDAO): Future[ReorgFixtureBlockHeaderDAO] = {
|
||||
val handler = ChainHandler.fromDatabase(blockHeaderDAO,
|
||||
CompactFilterHeaderDAO(),
|
||||
CompactFilterDAO())
|
||||
CompactFilterDAO(),
|
||||
ChainStateDescriptorDAO())
|
||||
val chainFixtureF = buildChainHandlerCompetingHeaders(handler)
|
||||
for {
|
||||
chainFixture <- chainFixtureF
|
||||
|
@ -478,6 +486,13 @@ object ChainUnitTest extends ChainVerificationLogger {
|
|||
chainHandlerF.map(_.blockHeaderDAO)
|
||||
}
|
||||
|
||||
def createChainStateDescriptorDAO()(implicit
|
||||
ec: ExecutionContext,
|
||||
appConfig: ChainAppConfig): Future[ChainStateDescriptorDAO] = {
|
||||
appConfig.migrate()
|
||||
Future.successful(ChainStateDescriptorDAO())
|
||||
}
|
||||
|
||||
def createFilterHeaderDAO()(implicit
|
||||
appConfig: ChainAppConfig,
|
||||
ec: ExecutionContext): Future[CompactFilterHeaderDAO] = {
|
||||
|
@ -655,10 +670,12 @@ object ChainUnitTest extends ChainVerificationLogger {
|
|||
lazy val blockHeaderDAO = BlockHeaderDAO()
|
||||
lazy val filterHeaderDAO = CompactFilterHeaderDAO()
|
||||
lazy val filterDAO = CompactFilterDAO()
|
||||
lazy val stateDAO = ChainStateDescriptorDAO()
|
||||
|
||||
ChainHandlerCached.fromDatabase(blockHeaderDAO = blockHeaderDAO,
|
||||
filterHeaderDAO = filterHeaderDAO,
|
||||
filterDAO = filterDAO)
|
||||
filterDAO = filterDAO,
|
||||
stateDAO = stateDAO)
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -174,5 +174,10 @@ trait BaseNodeTest extends BitcoinSFixture with EmbeddedPg {
|
|||
/** calculates the median time passed */
|
||||
override def getMedianTimePast(): Future[Long] =
|
||||
Future.successful(0L)
|
||||
|
||||
override def isSyncing(): Future[Boolean] = Future.successful(false)
|
||||
|
||||
override def setSyncing(value: Boolean): Future[ChainApi] =
|
||||
Future.successful(this)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -169,9 +169,10 @@ object NodeUnitTest extends P2PLogger {
|
|||
val blockHeaderDAO = BlockHeaderDAO()
|
||||
val filterHeaderDAO = CompactFilterHeaderDAO()
|
||||
val filterDAO = CompactFilterDAO()
|
||||
val stateDAO = ChainStateDescriptorDAO()
|
||||
|
||||
val chainApiF = ChainHandlerCached
|
||||
.fromDatabase(blockHeaderDAO, filterHeaderDAO, filterDAO)
|
||||
.fromDatabase(blockHeaderDAO, filterHeaderDAO, filterDAO, stateDAO)
|
||||
|
||||
val nodeF = chainApiF.map(buildNode(peer, _, walletCreationTimeOpt))
|
||||
for {
|
||||
|
@ -510,11 +511,17 @@ object NodeUnitTest extends P2PLogger {
|
|||
system: ActorSystem): Future[NeutrinoNode] = {
|
||||
import system.dispatcher
|
||||
for {
|
||||
|
||||
syncing <- node.chainApiFromDb().flatMap(_.isSyncing())
|
||||
_ = assert(!syncing)
|
||||
_ <- node.sync()
|
||||
syncing <- node.chainApiFromDb().flatMap(_.isSyncing())
|
||||
_ = assert(syncing)
|
||||
_ <- NodeTestUtil.awaitSync(node, bitcoind)
|
||||
_ <- NodeTestUtil.awaitCompactFilterHeadersSync(node, bitcoind)
|
||||
_ <- NodeTestUtil.awaitCompactFiltersSync(node, bitcoind)
|
||||
syncing <- node.chainApiFromDb().flatMap(_.isSyncing())
|
||||
_ = assert(!syncing)
|
||||
|
||||
} yield node
|
||||
}
|
||||
|
||||
|
|
|
@ -142,7 +142,7 @@ class AddressTagIntegrationTest extends BitcoinSWalletTest {
|
|||
bitcoindAddr <- bitcoindAddrF
|
||||
_ <- AkkaUtil.nonBlockingSleep(1.second)
|
||||
_ <- bitcoind.generateToAddress(1, bitcoindAddr)
|
||||
_ <- BitcoindRpcBackendUtil.syncWalletToBitcoind(bitcoind, wallet)
|
||||
_ <- BitcoindRpcBackendUtil.syncWalletToBitcoind(bitcoind, wallet, None)
|
||||
} yield succeed
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
package org.bitcoins.wallet
|
||||
|
||||
import org.bitcoins.asyncutil.AsyncUtil
|
||||
import org.bitcoins.chain.{ChainCallbacks, OnSyncFlagChanged}
|
||||
import org.bitcoins.commons.jsonmodels.wallet.SyncHeightDescriptor
|
||||
import org.bitcoins.core.currency._
|
||||
import org.bitcoins.core.gcs.FilterType
|
||||
|
@ -16,6 +18,17 @@ class BitcoindBackendTest extends WalletAppConfigWithBitcoindNewestFixtures {
|
|||
it must "correctly catch up to bitcoind" in { walletAppConfigWithBitcoind =>
|
||||
val bitcoind = walletAppConfigWithBitcoind.bitcoind
|
||||
val amountToSend = Bitcoins.one
|
||||
@volatile var syncingValues = Vector.empty[Boolean]
|
||||
val callback: OnSyncFlagChanged = { (value: Boolean) =>
|
||||
Future {
|
||||
synchronized {
|
||||
syncingValues = syncingValues :+ value
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
val callbacks = ChainCallbacks.onOnSyncFlagChanged(callback)
|
||||
|
||||
for {
|
||||
header <- bitcoind.getBestBlockHeader()
|
||||
|
||||
|
@ -39,7 +52,12 @@ class BitcoindBackendTest extends WalletAppConfigWithBitcoindNewestFixtures {
|
|||
_ <-
|
||||
wallet.stateDescriptorDAO.updateSyncHeight(header.hashBE, header.height)
|
||||
|
||||
_ <- BitcoindRpcBackendUtil.syncWalletToBitcoind(bitcoind, wallet)
|
||||
syncing <- bitcoind.isSyncing()
|
||||
_ = assert(!syncing)
|
||||
_ <- BitcoindRpcBackendUtil.syncWalletToBitcoind(bitcoind,
|
||||
wallet,
|
||||
Some(callbacks))
|
||||
_ <- AsyncUtil.awaitConditionF { () => bitcoind.isSyncing().map(!_) }
|
||||
|
||||
balance <- wallet.getBalance()
|
||||
|
||||
|
@ -49,6 +67,9 @@ class BitcoindBackendTest extends WalletAppConfigWithBitcoindNewestFixtures {
|
|||
} yield {
|
||||
assert(balance == amountToSend)
|
||||
assert(syncHeightOpt.contains(SyncHeightDescriptor(bestHash, height)))
|
||||
synchronized {
|
||||
assert(syncingValues == Vector(true, false))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -81,7 +102,7 @@ class BitcoindBackendTest extends WalletAppConfigWithBitcoindNewestFixtures {
|
|||
_ <- wallet.stateDescriptorDAO.updateSyncHeight(header.hashBE,
|
||||
header.height)
|
||||
|
||||
_ <- BitcoindRpcBackendUtil.syncWalletToBitcoind(bitcoind, wallet)
|
||||
_ <- BitcoindRpcBackendUtil.syncWalletToBitcoind(bitcoind, wallet, None)
|
||||
|
||||
utxos <- wallet.listUtxos(TxoState.ConfirmedReceived)
|
||||
} yield {
|
||||
|
@ -168,7 +189,7 @@ class BitcoindBackendTest extends WalletAppConfigWithBitcoindNewestFixtures {
|
|||
bitcoindAddr)
|
||||
|
||||
// sync wallet
|
||||
_ <- BitcoindRpcBackendUtil.syncWalletToBitcoind(bitcoind, wallet)
|
||||
_ <- BitcoindRpcBackendUtil.syncWalletToBitcoind(bitcoind, wallet, None)
|
||||
|
||||
unconfirmedBalance <- wallet.getUnconfirmedBalance()
|
||||
confirmedBalance <- wallet.getConfirmedBalance()
|
||||
|
|
|
@ -47,6 +47,7 @@ class BitcoindBlockPollingTest
|
|||
// Setup block polling
|
||||
_ <- BitcoindRpcBackendUtil.startBitcoindBlockPolling(wallet,
|
||||
bitcoind,
|
||||
None,
|
||||
1.second)
|
||||
_ <- bitcoind.generateToAddress(6, bech32Address)
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue