2021 04 18 Use akka streams in BitcoindRpcBackendUtil.syncWalletToBitcoind (#2916)

* Initial implementation of wallet sync with bitcoind with akka streams

* Make BitcoindRpcBackendUtil.startBitcoindBlockPolling use akka streams

* rework BitcoindRpcBackendUtil.syncWalletToBitcoind() to use akka streams
This commit is contained in:
Chris Stewart 2021-04-19 15:54:34 -05:00 committed by GitHub
parent 19319494cd
commit 4e1ace2706
2 changed files with 66 additions and 78 deletions

View file

@ -1,18 +1,19 @@
package org.bitcoins.server package org.bitcoins.server
import akka.Done
import akka.actor.{ActorSystem, Cancellable} import akka.actor.{ActorSystem, Cancellable}
import akka.stream.scaladsl.{Keep, Sink, Source}
import grizzled.slf4j.Logging import grizzled.slf4j.Logging
import org.bitcoins.core.api.node.NodeApi import org.bitcoins.core.api.node.NodeApi
import org.bitcoins.core.protocol.blockchain.Block import org.bitcoins.core.protocol.blockchain.Block
import org.bitcoins.core.protocol.transaction.Transaction import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.core.util.FutureUtil
import org.bitcoins.crypto.DoubleSha256Digest import org.bitcoins.crypto.DoubleSha256Digest
import org.bitcoins.rpc.client.common.BitcoindRpcClient import org.bitcoins.rpc.client.common.BitcoindRpcClient
import org.bitcoins.rpc.config.ZmqConfig import org.bitcoins.rpc.config.ZmqConfig
import org.bitcoins.wallet.Wallet import org.bitcoins.wallet.Wallet
import org.bitcoins.zmq.ZMQSubscriber import org.bitcoins.zmq.ZMQSubscriber
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration.{DurationInt, FiniteDuration} import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future, Promise} import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success} import scala.util.{Failure, Success}
@ -22,36 +23,8 @@ object BitcoindRpcBackendUtil extends Logging {
/** Has the wallet process all the blocks it has not seen up until bitcoind's chain tip */ /** Has the wallet process all the blocks it has not seen up until bitcoind's chain tip */
def syncWalletToBitcoind(bitcoind: BitcoindRpcClient, wallet: Wallet)(implicit def syncWalletToBitcoind(bitcoind: BitcoindRpcClient, wallet: Wallet)(implicit
ec: ExecutionContext): Future[Unit] = { system: ActorSystem): Future[Unit] = {
import system.dispatcher
def doSync(walletHeight: Int, bitcoindHeight: Int): Future[Unit] = {
if (walletHeight > bitcoindHeight) {
Future.failed(new RuntimeException(
s"Bitcoind and wallet are in incompatible states, " +
s"wallet height: $walletHeight, bitcoind height: $bitcoindHeight"))
} else {
val blockRange = walletHeight.to(bitcoindHeight).tail
logger.info(s"Syncing ${blockRange.size} blocks")
val func: Vector[Int] => Future[Unit] = { range =>
val hashFs =
range.map(bitcoind.getBlockHash(_).map(_.flip))
for {
hashes <- Future.sequence(hashFs)
_ <- wallet.nodeApi.downloadBlocks(hashes)
} yield ()
}
FutureUtil
.batchExecute(elements = blockRange.toVector,
f = func,
init = Vector.empty,
batchSize = 25)
.map(_ => ())
}
}
for { for {
bitcoindHeight <- bitcoind.getBlockCount bitcoindHeight <- bitcoind.getBlockCount
walletStateOpt <- wallet.getSyncDescriptorOpt() walletStateOpt <- wallet.getSyncDescriptorOpt()
@ -69,21 +42,52 @@ object BitcoindRpcBackendUtil extends Logging {
case Some(height) => case Some(height) =>
logger.info( logger.info(
s"Last tx occurred at block $height, syncing from there") s"Last tx occurred at block $height, syncing from there")
doSync(height, bitcoindHeight) doSync(height, bitcoindHeight, bitcoind, wallet)
case None => Future.unit case None => Future.unit
} }
} yield () } yield ()
} }
} yield () } yield ()
case Some(syncHeight) => case Some(syncHeight) =>
doSync(syncHeight.height, bitcoindHeight) doSync(syncHeight.height, bitcoindHeight, bitcoind, wallet)
} }
} yield () } yield ()
} }
/** Helper method to sync the wallet until the bitcoind height */
private def doSync(
walletHeight: Int,
bitcoindHeight: Int,
bitcoind: BitcoindRpcClient,
wallet: Wallet)(implicit system: ActorSystem): Future[Wallet] = {
if (walletHeight > bitcoindHeight) {
Future.failed(
new RuntimeException(
s"Bitcoind and wallet are in incompatible states, " +
s"wallet height: $walletHeight, bitcoind height: $bitcoindHeight"))
} else {
import system.dispatcher
val blockRange = walletHeight.to(bitcoindHeight).tail
val numParallelism = Runtime.getRuntime.availableProcessors()
logger.info(s"Syncing ${blockRange.size} blocks")
val hashFs = Source(blockRange)
.mapAsync(numParallelism) {
bitcoind
.getBlockHash(_)
.map(_.flip)
}
.toMat(Sink.seq)(Keep.right)
.run()
for {
hashes <- hashFs
_ <- wallet.nodeApi.downloadBlocks(hashes.toVector)
} yield wallet
}
}
def createWalletWithBitcoindCallbacks( def createWalletWithBitcoindCallbacks(
bitcoind: BitcoindRpcClient, bitcoind: BitcoindRpcClient,
wallet: Wallet)(implicit ec: ExecutionContext): Wallet = { wallet: Wallet)(implicit system: ActorSystem): Wallet = {
// We need to create a promise so we can inject the wallet with the callback // We need to create a promise so we can inject the wallet with the callback
// after we have created it into SyncUtil.getNodeApiWalletCallback // after we have created it into SyncUtil.getNodeApiWalletCallback
// so we don't lose the internal state of the wallet // so we don't lose the internal state of the wallet
@ -145,46 +149,24 @@ object BitcoindRpcBackendUtil extends Logging {
private def getNodeApiWalletCallback( private def getNodeApiWalletCallback(
bitcoindRpcClient: BitcoindRpcClient, bitcoindRpcClient: BitcoindRpcClient,
walletF: Future[Wallet])(implicit ec: ExecutionContext): NodeApi = { walletF: Future[Wallet])(implicit system: ActorSystem): NodeApi = {
import system.dispatcher
new NodeApi { new NodeApi {
override def downloadBlocks( override def downloadBlocks(
blockHashes: Vector[DoubleSha256Digest]): Future[Unit] = { blockHashes: Vector[DoubleSha256Digest]): Future[Unit] = {
logger.info(s"Fetching ${blockHashes.length} hashes from bitcoind") logger.info(s"Fetching ${blockHashes.length} hashes from bitcoind")
val f: Vector[DoubleSha256Digest] => Future[Wallet] = { hashes => val numParallelism = Runtime.getRuntime.availableProcessors()
val blocksF = walletF.flatMap { wallet =>
FutureUtil.sequentially(hashes)(bitcoindRpcClient.getBlockRaw) val runStream: Future[Done] = Source(blockHashes)
.mapAsync(parallelism = numParallelism) { hash =>
val updatedWalletF = for { bitcoindRpcClient.getBlockRaw(hash)
blocks <- blocksF
wallet <- walletF
processedWallet <- {
FutureUtil.foldLeftAsync(wallet, blocks) { case (wallet, block) =>
wallet.processBlock(block)
}
} }
} yield processedWallet .foldAsync(wallet) { case (wallet, block) =>
wallet.processBlock(block)
updatedWalletF }
} .run()
runStream.map(_ => ())
val batchSize = 25
val batchedExecutedF = {
for {
wallet <- walletF
wallet <- FutureUtil.batchExecute[DoubleSha256Digest, Wallet](
elements = blockHashes,
f = f,
init = wallet,
batchSize = batchSize)
} yield wallet
}
batchedExecutedF.map { _ =>
logger.info(
s"Done fetching ${blockHashes.length} hashes from bitcoind")
()
} }
} }
@ -211,7 +193,8 @@ object BitcoindRpcBackendUtil extends Logging {
interval: FiniteDuration = 10.seconds)(implicit interval: FiniteDuration = 10.seconds)(implicit
system: ActorSystem, system: ActorSystem,
ec: ExecutionContext): Cancellable = { ec: ExecutionContext): Cancellable = {
val atomicPrevCount: AtomicReference[Int] = new AtomicReference(startCount) val numParallelism = Runtime.getRuntime.availableProcessors()
val atomicPrevCount: AtomicInteger = new AtomicInteger(startCount)
system.scheduler.scheduleWithFixedDelay(0.seconds, interval) { () => system.scheduler.scheduleWithFixedDelay(0.seconds, interval) { () =>
{ {
logger.debug("Polling bitcoind for block count") logger.debug("Polling bitcoind for block count")
@ -222,22 +205,26 @@ object BitcoindRpcBackendUtil extends Logging {
// use .tail so we don't process the previous block that we already did // use .tail so we don't process the previous block that we already did
val range = prevCount.to(count).tail val range = prevCount.to(count).tail
val hashFs: Future[Seq[DoubleSha256Digest]] = Source(range)
val hashFs = .mapAsync(parallelism = numParallelism) { height =>
range.map(bitcoind.getBlockHash(_).map(_.flip)) bitcoind.getBlockHash(height).map(_.flip)
}
val oldPrevCount = prevCount .map { hash =>
atomicPrevCount.set(count) val _ = atomicPrevCount.incrementAndGet()
hash
}
.toMat(Sink.seq)(Keep.right)
.run()
val requestsBlocksF = for { val requestsBlocksF = for {
hashes <- Future.sequence(hashFs) hashes <- hashFs
_ <- wallet.nodeApi.downloadBlocks(hashes.toVector) _ <- wallet.nodeApi.downloadBlocks(hashes.toVector)
} yield logger.debug("Successfully polled bitcoind for new blocks") } yield logger.debug("Successfully polled bitcoind for new blocks")
requestsBlocksF.onComplete { requestsBlocksF.onComplete {
case Success(_) => () case Success(_) => ()
case Failure(err) => case Failure(err) =>
atomicPrevCount.set(oldPrevCount) atomicPrevCount.set(prevCount)
logger.error("Requesting blocks from bitcoind polling failed", logger.error("Requesting blocks from bitcoind polling failed",
err) err)
} }

View file

@ -396,6 +396,7 @@ object Deps {
Compile.logback, Compile.logback,
Compile.akkaActor, Compile.akkaActor,
Compile.akkaHttp, Compile.akkaHttp,
Compile.akkaStream,
Compile.akkaSlf4j Compile.akkaSlf4j
) )
} }