2021 04 18 Use akka streams in BitcoindRpcBackendUtil.syncWalletToBitcoind (#2916)

* Initial implementation of wallet sync with bitcoind with akka streams

* Make BitcoindRpcBackendUtil.startBitcoindBlockPolling use akka streams

* rework BitcoindRpcBackendUtil.syncWalletToBitcoind() to use akka streams
This commit is contained in:
Chris Stewart 2021-04-19 15:54:34 -05:00 committed by GitHub
parent 19319494cd
commit 4e1ace2706
2 changed files with 66 additions and 78 deletions

View File

@ -1,18 +1,19 @@
package org.bitcoins.server
import akka.Done
import akka.actor.{ActorSystem, Cancellable}
import akka.stream.scaladsl.{Keep, Sink, Source}
import grizzled.slf4j.Logging
import org.bitcoins.core.api.node.NodeApi
import org.bitcoins.core.protocol.blockchain.Block
import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.core.util.FutureUtil
import org.bitcoins.crypto.DoubleSha256Digest
import org.bitcoins.rpc.client.common.BitcoindRpcClient
import org.bitcoins.rpc.config.ZmqConfig
import org.bitcoins.wallet.Wallet
import org.bitcoins.zmq.ZMQSubscriber
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}
@ -22,36 +23,8 @@ object BitcoindRpcBackendUtil extends Logging {
/** Has the wallet process all the blocks it has not seen up until bitcoind's chain tip */
def syncWalletToBitcoind(bitcoind: BitcoindRpcClient, wallet: Wallet)(implicit
ec: ExecutionContext): Future[Unit] = {
def doSync(walletHeight: Int, bitcoindHeight: Int): Future[Unit] = {
if (walletHeight > bitcoindHeight) {
Future.failed(new RuntimeException(
s"Bitcoind and wallet are in incompatible states, " +
s"wallet height: $walletHeight, bitcoind height: $bitcoindHeight"))
} else {
val blockRange = walletHeight.to(bitcoindHeight).tail
logger.info(s"Syncing ${blockRange.size} blocks")
val func: Vector[Int] => Future[Unit] = { range =>
val hashFs =
range.map(bitcoind.getBlockHash(_).map(_.flip))
for {
hashes <- Future.sequence(hashFs)
_ <- wallet.nodeApi.downloadBlocks(hashes)
} yield ()
}
FutureUtil
.batchExecute(elements = blockRange.toVector,
f = func,
init = Vector.empty,
batchSize = 25)
.map(_ => ())
}
}
system: ActorSystem): Future[Unit] = {
import system.dispatcher
for {
bitcoindHeight <- bitcoind.getBlockCount
walletStateOpt <- wallet.getSyncDescriptorOpt()
@ -69,21 +42,52 @@ object BitcoindRpcBackendUtil extends Logging {
case Some(height) =>
logger.info(
s"Last tx occurred at block $height, syncing from there")
doSync(height, bitcoindHeight)
doSync(height, bitcoindHeight, bitcoind, wallet)
case None => Future.unit
}
} yield ()
}
} yield ()
case Some(syncHeight) =>
doSync(syncHeight.height, bitcoindHeight)
doSync(syncHeight.height, bitcoindHeight, bitcoind, wallet)
}
} yield ()
}
/** Helper method to sync the wallet until the bitcoind height */
private def doSync(
walletHeight: Int,
bitcoindHeight: Int,
bitcoind: BitcoindRpcClient,
wallet: Wallet)(implicit system: ActorSystem): Future[Wallet] = {
if (walletHeight > bitcoindHeight) {
Future.failed(
new RuntimeException(
s"Bitcoind and wallet are in incompatible states, " +
s"wallet height: $walletHeight, bitcoind height: $bitcoindHeight"))
} else {
import system.dispatcher
val blockRange = walletHeight.to(bitcoindHeight).tail
val numParallelism = Runtime.getRuntime.availableProcessors()
logger.info(s"Syncing ${blockRange.size} blocks")
val hashFs = Source(blockRange)
.mapAsync(numParallelism) {
bitcoind
.getBlockHash(_)
.map(_.flip)
}
.toMat(Sink.seq)(Keep.right)
.run()
for {
hashes <- hashFs
_ <- wallet.nodeApi.downloadBlocks(hashes.toVector)
} yield wallet
}
}
def createWalletWithBitcoindCallbacks(
bitcoind: BitcoindRpcClient,
wallet: Wallet)(implicit ec: ExecutionContext): Wallet = {
wallet: Wallet)(implicit system: ActorSystem): Wallet = {
// We need to create a promise so we can inject the wallet with the callback
// after we have created it into SyncUtil.getNodeApiWalletCallback
// so we don't lose the internal state of the wallet
@ -145,46 +149,24 @@ object BitcoindRpcBackendUtil extends Logging {
private def getNodeApiWalletCallback(
bitcoindRpcClient: BitcoindRpcClient,
walletF: Future[Wallet])(implicit ec: ExecutionContext): NodeApi = {
walletF: Future[Wallet])(implicit system: ActorSystem): NodeApi = {
import system.dispatcher
new NodeApi {
override def downloadBlocks(
blockHashes: Vector[DoubleSha256Digest]): Future[Unit] = {
logger.info(s"Fetching ${blockHashes.length} hashes from bitcoind")
val f: Vector[DoubleSha256Digest] => Future[Wallet] = { hashes =>
val blocksF =
FutureUtil.sequentially(hashes)(bitcoindRpcClient.getBlockRaw)
val updatedWalletF = for {
blocks <- blocksF
wallet <- walletF
processedWallet <- {
FutureUtil.foldLeftAsync(wallet, blocks) { case (wallet, block) =>
wallet.processBlock(block)
}
val numParallelism = Runtime.getRuntime.availableProcessors()
walletF.flatMap { wallet =>
val runStream: Future[Done] = Source(blockHashes)
.mapAsync(parallelism = numParallelism) { hash =>
bitcoindRpcClient.getBlockRaw(hash)
}
} yield processedWallet
updatedWalletF
}
val batchSize = 25
val batchedExecutedF = {
for {
wallet <- walletF
wallet <- FutureUtil.batchExecute[DoubleSha256Digest, Wallet](
elements = blockHashes,
f = f,
init = wallet,
batchSize = batchSize)
} yield wallet
}
batchedExecutedF.map { _ =>
logger.info(
s"Done fetching ${blockHashes.length} hashes from bitcoind")
()
.foldAsync(wallet) { case (wallet, block) =>
wallet.processBlock(block)
}
.run()
runStream.map(_ => ())
}
}
@ -211,7 +193,8 @@ object BitcoindRpcBackendUtil extends Logging {
interval: FiniteDuration = 10.seconds)(implicit
system: ActorSystem,
ec: ExecutionContext): Cancellable = {
val atomicPrevCount: AtomicReference[Int] = new AtomicReference(startCount)
val numParallelism = Runtime.getRuntime.availableProcessors()
val atomicPrevCount: AtomicInteger = new AtomicInteger(startCount)
system.scheduler.scheduleWithFixedDelay(0.seconds, interval) { () =>
{
logger.debug("Polling bitcoind for block count")
@ -222,22 +205,26 @@ object BitcoindRpcBackendUtil extends Logging {
// use .tail so we don't process the previous block that we already did
val range = prevCount.to(count).tail
val hashFs =
range.map(bitcoind.getBlockHash(_).map(_.flip))
val oldPrevCount = prevCount
atomicPrevCount.set(count)
val hashFs: Future[Seq[DoubleSha256Digest]] = Source(range)
.mapAsync(parallelism = numParallelism) { height =>
bitcoind.getBlockHash(height).map(_.flip)
}
.map { hash =>
val _ = atomicPrevCount.incrementAndGet()
hash
}
.toMat(Sink.seq)(Keep.right)
.run()
val requestsBlocksF = for {
hashes <- Future.sequence(hashFs)
hashes <- hashFs
_ <- wallet.nodeApi.downloadBlocks(hashes.toVector)
} yield logger.debug("Successfully polled bitcoind for new blocks")
requestsBlocksF.onComplete {
case Success(_) => ()
case Failure(err) =>
atomicPrevCount.set(oldPrevCount)
atomicPrevCount.set(prevCount)
logger.error("Requesting blocks from bitcoind polling failed",
err)
}

View File

@ -396,6 +396,7 @@ object Deps {
Compile.logback,
Compile.akkaActor,
Compile.akkaHttp,
Compile.akkaStream,
Compile.akkaSlf4j
)
}