mirror of
https://github.com/bitcoin-s/bitcoin-s.git
synced 2025-03-13 19:37:30 +01:00
2021 04 18 Use akka streams in BitcoindRpcBackendUtil.syncWalletToBitcoind (#2916)
* Initial implementation of wallet sync with bitcoind with akka streams * Make BitcoindRpcBackendUtil.startBitcoindBlockPolling use akka streams * rework BitcoindRpcBackendUtil.syncWalletToBitcoind() to use akka streams
This commit is contained in:
parent
19319494cd
commit
4e1ace2706
2 changed files with 66 additions and 78 deletions
|
@ -1,18 +1,19 @@
|
||||||
package org.bitcoins.server
|
package org.bitcoins.server
|
||||||
|
|
||||||
|
import akka.Done
|
||||||
import akka.actor.{ActorSystem, Cancellable}
|
import akka.actor.{ActorSystem, Cancellable}
|
||||||
|
import akka.stream.scaladsl.{Keep, Sink, Source}
|
||||||
import grizzled.slf4j.Logging
|
import grizzled.slf4j.Logging
|
||||||
import org.bitcoins.core.api.node.NodeApi
|
import org.bitcoins.core.api.node.NodeApi
|
||||||
import org.bitcoins.core.protocol.blockchain.Block
|
import org.bitcoins.core.protocol.blockchain.Block
|
||||||
import org.bitcoins.core.protocol.transaction.Transaction
|
import org.bitcoins.core.protocol.transaction.Transaction
|
||||||
import org.bitcoins.core.util.FutureUtil
|
|
||||||
import org.bitcoins.crypto.DoubleSha256Digest
|
import org.bitcoins.crypto.DoubleSha256Digest
|
||||||
import org.bitcoins.rpc.client.common.BitcoindRpcClient
|
import org.bitcoins.rpc.client.common.BitcoindRpcClient
|
||||||
import org.bitcoins.rpc.config.ZmqConfig
|
import org.bitcoins.rpc.config.ZmqConfig
|
||||||
import org.bitcoins.wallet.Wallet
|
import org.bitcoins.wallet.Wallet
|
||||||
import org.bitcoins.zmq.ZMQSubscriber
|
import org.bitcoins.zmq.ZMQSubscriber
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicReference
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
import scala.concurrent.duration.{DurationInt, FiniteDuration}
|
import scala.concurrent.duration.{DurationInt, FiniteDuration}
|
||||||
import scala.concurrent.{ExecutionContext, Future, Promise}
|
import scala.concurrent.{ExecutionContext, Future, Promise}
|
||||||
import scala.util.{Failure, Success}
|
import scala.util.{Failure, Success}
|
||||||
|
@ -22,36 +23,8 @@ object BitcoindRpcBackendUtil extends Logging {
|
||||||
|
|
||||||
/** Has the wallet process all the blocks it has not seen up until bitcoind's chain tip */
|
/** Has the wallet process all the blocks it has not seen up until bitcoind's chain tip */
|
||||||
def syncWalletToBitcoind(bitcoind: BitcoindRpcClient, wallet: Wallet)(implicit
|
def syncWalletToBitcoind(bitcoind: BitcoindRpcClient, wallet: Wallet)(implicit
|
||||||
ec: ExecutionContext): Future[Unit] = {
|
system: ActorSystem): Future[Unit] = {
|
||||||
|
import system.dispatcher
|
||||||
def doSync(walletHeight: Int, bitcoindHeight: Int): Future[Unit] = {
|
|
||||||
if (walletHeight > bitcoindHeight) {
|
|
||||||
Future.failed(new RuntimeException(
|
|
||||||
s"Bitcoind and wallet are in incompatible states, " +
|
|
||||||
s"wallet height: $walletHeight, bitcoind height: $bitcoindHeight"))
|
|
||||||
} else {
|
|
||||||
val blockRange = walletHeight.to(bitcoindHeight).tail
|
|
||||||
|
|
||||||
logger.info(s"Syncing ${blockRange.size} blocks")
|
|
||||||
|
|
||||||
val func: Vector[Int] => Future[Unit] = { range =>
|
|
||||||
val hashFs =
|
|
||||||
range.map(bitcoind.getBlockHash(_).map(_.flip))
|
|
||||||
for {
|
|
||||||
hashes <- Future.sequence(hashFs)
|
|
||||||
_ <- wallet.nodeApi.downloadBlocks(hashes)
|
|
||||||
} yield ()
|
|
||||||
}
|
|
||||||
|
|
||||||
FutureUtil
|
|
||||||
.batchExecute(elements = blockRange.toVector,
|
|
||||||
f = func,
|
|
||||||
init = Vector.empty,
|
|
||||||
batchSize = 25)
|
|
||||||
.map(_ => ())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
bitcoindHeight <- bitcoind.getBlockCount
|
bitcoindHeight <- bitcoind.getBlockCount
|
||||||
walletStateOpt <- wallet.getSyncDescriptorOpt()
|
walletStateOpt <- wallet.getSyncDescriptorOpt()
|
||||||
|
@ -69,21 +42,52 @@ object BitcoindRpcBackendUtil extends Logging {
|
||||||
case Some(height) =>
|
case Some(height) =>
|
||||||
logger.info(
|
logger.info(
|
||||||
s"Last tx occurred at block $height, syncing from there")
|
s"Last tx occurred at block $height, syncing from there")
|
||||||
doSync(height, bitcoindHeight)
|
doSync(height, bitcoindHeight, bitcoind, wallet)
|
||||||
case None => Future.unit
|
case None => Future.unit
|
||||||
}
|
}
|
||||||
} yield ()
|
} yield ()
|
||||||
}
|
}
|
||||||
} yield ()
|
} yield ()
|
||||||
case Some(syncHeight) =>
|
case Some(syncHeight) =>
|
||||||
doSync(syncHeight.height, bitcoindHeight)
|
doSync(syncHeight.height, bitcoindHeight, bitcoind, wallet)
|
||||||
}
|
}
|
||||||
} yield ()
|
} yield ()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Helper method to sync the wallet until the bitcoind height */
|
||||||
|
private def doSync(
|
||||||
|
walletHeight: Int,
|
||||||
|
bitcoindHeight: Int,
|
||||||
|
bitcoind: BitcoindRpcClient,
|
||||||
|
wallet: Wallet)(implicit system: ActorSystem): Future[Wallet] = {
|
||||||
|
if (walletHeight > bitcoindHeight) {
|
||||||
|
Future.failed(
|
||||||
|
new RuntimeException(
|
||||||
|
s"Bitcoind and wallet are in incompatible states, " +
|
||||||
|
s"wallet height: $walletHeight, bitcoind height: $bitcoindHeight"))
|
||||||
|
} else {
|
||||||
|
import system.dispatcher
|
||||||
|
val blockRange = walletHeight.to(bitcoindHeight).tail
|
||||||
|
val numParallelism = Runtime.getRuntime.availableProcessors()
|
||||||
|
logger.info(s"Syncing ${blockRange.size} blocks")
|
||||||
|
val hashFs = Source(blockRange)
|
||||||
|
.mapAsync(numParallelism) {
|
||||||
|
bitcoind
|
||||||
|
.getBlockHash(_)
|
||||||
|
.map(_.flip)
|
||||||
|
}
|
||||||
|
.toMat(Sink.seq)(Keep.right)
|
||||||
|
.run()
|
||||||
|
for {
|
||||||
|
hashes <- hashFs
|
||||||
|
_ <- wallet.nodeApi.downloadBlocks(hashes.toVector)
|
||||||
|
} yield wallet
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
def createWalletWithBitcoindCallbacks(
|
def createWalletWithBitcoindCallbacks(
|
||||||
bitcoind: BitcoindRpcClient,
|
bitcoind: BitcoindRpcClient,
|
||||||
wallet: Wallet)(implicit ec: ExecutionContext): Wallet = {
|
wallet: Wallet)(implicit system: ActorSystem): Wallet = {
|
||||||
// We need to create a promise so we can inject the wallet with the callback
|
// We need to create a promise so we can inject the wallet with the callback
|
||||||
// after we have created it into SyncUtil.getNodeApiWalletCallback
|
// after we have created it into SyncUtil.getNodeApiWalletCallback
|
||||||
// so we don't lose the internal state of the wallet
|
// so we don't lose the internal state of the wallet
|
||||||
|
@ -145,46 +149,24 @@ object BitcoindRpcBackendUtil extends Logging {
|
||||||
|
|
||||||
private def getNodeApiWalletCallback(
|
private def getNodeApiWalletCallback(
|
||||||
bitcoindRpcClient: BitcoindRpcClient,
|
bitcoindRpcClient: BitcoindRpcClient,
|
||||||
walletF: Future[Wallet])(implicit ec: ExecutionContext): NodeApi = {
|
walletF: Future[Wallet])(implicit system: ActorSystem): NodeApi = {
|
||||||
|
import system.dispatcher
|
||||||
new NodeApi {
|
new NodeApi {
|
||||||
|
|
||||||
override def downloadBlocks(
|
override def downloadBlocks(
|
||||||
blockHashes: Vector[DoubleSha256Digest]): Future[Unit] = {
|
blockHashes: Vector[DoubleSha256Digest]): Future[Unit] = {
|
||||||
logger.info(s"Fetching ${blockHashes.length} hashes from bitcoind")
|
logger.info(s"Fetching ${blockHashes.length} hashes from bitcoind")
|
||||||
val f: Vector[DoubleSha256Digest] => Future[Wallet] = { hashes =>
|
val numParallelism = Runtime.getRuntime.availableProcessors()
|
||||||
val blocksF =
|
walletF.flatMap { wallet =>
|
||||||
FutureUtil.sequentially(hashes)(bitcoindRpcClient.getBlockRaw)
|
val runStream: Future[Done] = Source(blockHashes)
|
||||||
|
.mapAsync(parallelism = numParallelism) { hash =>
|
||||||
val updatedWalletF = for {
|
bitcoindRpcClient.getBlockRaw(hash)
|
||||||
blocks <- blocksF
|
|
||||||
wallet <- walletF
|
|
||||||
processedWallet <- {
|
|
||||||
FutureUtil.foldLeftAsync(wallet, blocks) { case (wallet, block) =>
|
|
||||||
wallet.processBlock(block)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} yield processedWallet
|
.foldAsync(wallet) { case (wallet, block) =>
|
||||||
|
wallet.processBlock(block)
|
||||||
updatedWalletF
|
}
|
||||||
}
|
.run()
|
||||||
|
runStream.map(_ => ())
|
||||||
val batchSize = 25
|
|
||||||
val batchedExecutedF = {
|
|
||||||
for {
|
|
||||||
wallet <- walletF
|
|
||||||
wallet <- FutureUtil.batchExecute[DoubleSha256Digest, Wallet](
|
|
||||||
elements = blockHashes,
|
|
||||||
f = f,
|
|
||||||
init = wallet,
|
|
||||||
batchSize = batchSize)
|
|
||||||
} yield wallet
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
batchedExecutedF.map { _ =>
|
|
||||||
logger.info(
|
|
||||||
s"Done fetching ${blockHashes.length} hashes from bitcoind")
|
|
||||||
()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -211,7 +193,8 @@ object BitcoindRpcBackendUtil extends Logging {
|
||||||
interval: FiniteDuration = 10.seconds)(implicit
|
interval: FiniteDuration = 10.seconds)(implicit
|
||||||
system: ActorSystem,
|
system: ActorSystem,
|
||||||
ec: ExecutionContext): Cancellable = {
|
ec: ExecutionContext): Cancellable = {
|
||||||
val atomicPrevCount: AtomicReference[Int] = new AtomicReference(startCount)
|
val numParallelism = Runtime.getRuntime.availableProcessors()
|
||||||
|
val atomicPrevCount: AtomicInteger = new AtomicInteger(startCount)
|
||||||
system.scheduler.scheduleWithFixedDelay(0.seconds, interval) { () =>
|
system.scheduler.scheduleWithFixedDelay(0.seconds, interval) { () =>
|
||||||
{
|
{
|
||||||
logger.debug("Polling bitcoind for block count")
|
logger.debug("Polling bitcoind for block count")
|
||||||
|
@ -222,22 +205,26 @@ object BitcoindRpcBackendUtil extends Logging {
|
||||||
|
|
||||||
// use .tail so we don't process the previous block that we already did
|
// use .tail so we don't process the previous block that we already did
|
||||||
val range = prevCount.to(count).tail
|
val range = prevCount.to(count).tail
|
||||||
|
val hashFs: Future[Seq[DoubleSha256Digest]] = Source(range)
|
||||||
val hashFs =
|
.mapAsync(parallelism = numParallelism) { height =>
|
||||||
range.map(bitcoind.getBlockHash(_).map(_.flip))
|
bitcoind.getBlockHash(height).map(_.flip)
|
||||||
|
}
|
||||||
val oldPrevCount = prevCount
|
.map { hash =>
|
||||||
atomicPrevCount.set(count)
|
val _ = atomicPrevCount.incrementAndGet()
|
||||||
|
hash
|
||||||
|
}
|
||||||
|
.toMat(Sink.seq)(Keep.right)
|
||||||
|
.run()
|
||||||
|
|
||||||
val requestsBlocksF = for {
|
val requestsBlocksF = for {
|
||||||
hashes <- Future.sequence(hashFs)
|
hashes <- hashFs
|
||||||
_ <- wallet.nodeApi.downloadBlocks(hashes.toVector)
|
_ <- wallet.nodeApi.downloadBlocks(hashes.toVector)
|
||||||
} yield logger.debug("Successfully polled bitcoind for new blocks")
|
} yield logger.debug("Successfully polled bitcoind for new blocks")
|
||||||
|
|
||||||
requestsBlocksF.onComplete {
|
requestsBlocksF.onComplete {
|
||||||
case Success(_) => ()
|
case Success(_) => ()
|
||||||
case Failure(err) =>
|
case Failure(err) =>
|
||||||
atomicPrevCount.set(oldPrevCount)
|
atomicPrevCount.set(prevCount)
|
||||||
logger.error("Requesting blocks from bitcoind polling failed",
|
logger.error("Requesting blocks from bitcoind polling failed",
|
||||||
err)
|
err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -396,6 +396,7 @@ object Deps {
|
||||||
Compile.logback,
|
Compile.logback,
|
||||||
Compile.akkaActor,
|
Compile.akkaActor,
|
||||||
Compile.akkaHttp,
|
Compile.akkaHttp,
|
||||||
|
Compile.akkaStream,
|
||||||
Compile.akkaSlf4j
|
Compile.akkaSlf4j
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue