diff --git a/app/server/src/main/scala/org/bitcoins/server/Main.scala b/app/server/src/main/scala/org/bitcoins/server/Main.scala index a9f95109e5..f478e38337 100644 --- a/app/server/src/main/scala/org/bitcoins/server/Main.scala +++ b/app/server/src/main/scala/org/bitcoins/server/Main.scala @@ -156,9 +156,10 @@ object Main extends App { lazy val onTx: OnTxReceived = { tx => wallet.processTransaction(tx, blockHash = None).map(_ => ()) } - lazy val onCompactFilter: OnCompactFilterReceived = { - (blockHash, blockFilter) => - wallet.processCompactFilter(blockHash, blockFilter).map(_ => ()) + lazy val onCompactFilters: OnCompactFiltersReceived = { blockFilters => + wallet + .processCompactFilters(blockFilters = blockFilters) + .map(_ => ()) } lazy val onBlock: OnBlockReceived = { block => wallet.processBlock(block).map(_ => ()) @@ -177,7 +178,7 @@ object Main extends App { } else if (nodeConf.isNeutrinoEnabled) { Future.successful( NodeCallbacks(onBlockReceived = Seq(onBlock), - onCompactFilterReceived = Seq(onCompactFilter), + onCompactFiltersReceived = Seq(onCompactFilters), onBlockHeadersReceived = Seq(onHeaders))) } else { Future.failed(new RuntimeException("Unexpected node type")) diff --git a/docs/wallet/chain-query-api.md b/docs/wallet/chain-query-api.md index fdd64a50cd..83fd89e364 100644 --- a/docs/wallet/chain-query-api.md +++ b/docs/wallet/chain-query-api.md @@ -91,21 +91,21 @@ val keyManager = keyManagerE match { // a block filter, the returned NodeCallbacks will contain the necessary items to initialize the callbacks def createCallbacks( processTransaction: Transaction => Future[Unit], - processCompactFilter: (DoubleSha256Digest, GolombFilter) => Future[Unit], + processCompactFilters: (Vector[(DoubleSha256Digest, GolombFilter)]) => Future[Unit], processBlock: Block => Future[Unit]): NodeCallbacks = { lazy val onTx: OnTxReceived = { tx => processTransaction(tx) } - lazy val onCompactFilter: OnCompactFilterReceived = { - (blockHash, blockFilter) => - processCompactFilter(blockHash, blockFilter) + lazy val onCompactFilters: OnCompactFiltersReceived = { + blockFilters => + processCompactFilters(blockFilters) } lazy val onBlock: OnBlockReceived = { block => processBlock(block) } NodeCallbacks(onTxReceived = Seq(onTx), onBlockReceived = Seq(onBlock), - onCompactFilterReceived = Seq(onCompactFilter)) + onCompactFiltersReceived = Seq(onCompactFilters)) } // Here is a super simple example of a callback, this could be replaced with anything, from @@ -117,12 +117,12 @@ val exampleProcessTx = (tx: Transaction) => val exampleProcessBlock = (block: Block) => Future.successful(println(s"Received block: ${block.blockHeader.hashBE}")) -val exampleProcessFilter = - (blockHash: DoubleSha256Digest, filter: GolombFilter) => - Future.successful(println(s"Received filter: ${blockHash.flip.hex} ${filter.hash.flip.hex}")) +val exampleProcessFilters = + (filters: Vector[(DoubleSha256Digest, GolombFilter)]) => + Future.successful(println(s"Received filter: ${filters.head._1.flip.hex} ${filters.head._2.hash.flip.hex}")) val exampleCallbacks = - createCallbacks(exampleProcessTx, exampleProcessFilter, exampleProcessBlock) + createCallbacks(exampleProcessTx, exampleProcessFilters, exampleProcessBlock) // Here is where we are defining our actual chain api, Ideally this could be it's own class // but for the examples sake we will keep it small. diff --git a/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeWithWalletTest.scala b/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeWithWalletTest.scala index d70b0365aa..da0736565d 100644 --- a/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeWithWalletTest.scala +++ b/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeWithWalletTest.scala @@ -3,7 +3,7 @@ package org.bitcoins.node import org.bitcoins.core.currency._ import org.bitcoins.core.wallet.fee.SatoshisPerByte import org.bitcoins.node.networking.peer.DataMessageHandler -import org.bitcoins.node.networking.peer.DataMessageHandler.OnCompactFilterReceived +import org.bitcoins.node.networking.peer.DataMessageHandler.OnCompactFiltersReceived import org.bitcoins.rpc.client.common.BitcoindVersion import org.bitcoins.rpc.util.AsyncUtil import org.bitcoins.server.BitcoinSAppConfig @@ -55,16 +55,16 @@ class NeutrinoNodeWithWalletTest extends NodeUnitTest { _ <- wallet.processBlock(block) } yield () } - val onCompactFilter: OnCompactFilterReceived = { (blockHash, blockFilter) => + val onCompactFilter: OnCompactFiltersReceived = { blockFilters => for { wallet <- walletF - _ <- wallet.processCompactFilter(blockHash, blockFilter) + _ <- wallet.processCompactFilters(blockFilters) } yield () } NodeCallbacks( onBlockReceived = Seq(onBlock), - onCompactFilterReceived = Seq(onCompactFilter) + onCompactFiltersReceived = Seq(onCompactFilter) ) } diff --git a/node/src/main/scala/org/bitcoins/node/NodeCallbacks.scala b/node/src/main/scala/org/bitcoins/node/NodeCallbacks.scala index eada6bf106..78dc421ce7 100644 --- a/node/src/main/scala/org/bitcoins/node/NodeCallbacks.scala +++ b/node/src/main/scala/org/bitcoins/node/NodeCallbacks.scala @@ -17,7 +17,7 @@ import scala.concurrent.{ExecutionContext, Future} * */ case class NodeCallbacks( - onCompactFilterReceived: Seq[OnCompactFilterReceived] = Seq.empty, + onCompactFiltersReceived: Seq[OnCompactFiltersReceived] = Seq.empty, onTxReceived: Seq[OnTxReceived] = Seq.empty, onBlockReceived: Seq[OnBlockReceived] = Seq.empty, onMerkleBlockReceived: Seq[OnMerkleBlockReceived] = Seq.empty, @@ -25,7 +25,7 @@ case class NodeCallbacks( ) { def +(other: NodeCallbacks): NodeCallbacks = copy( - onCompactFilterReceived = onCompactFilterReceived ++ other.onCompactFilterReceived, + onCompactFiltersReceived = onCompactFiltersReceived ++ other.onCompactFiltersReceived, onTxReceived = onTxReceived ++ other.onTxReceived, onBlockReceived = onBlockReceived ++ other.onBlockReceived, onMerkleBlockReceived = onMerkleBlockReceived ++ other.onMerkleBlockReceived, @@ -68,18 +68,17 @@ case class NodeCallbacks( })) } - def executeOnCompactFilterReceivedCallbacks( + def executeOnCompactFiltersReceivedCallbacks( logger: MarkedLogger, - blockHash: DoubleSha256Digest, - blockFilter: GolombFilter)( + blockFilters: Vector[(DoubleSha256Digest, GolombFilter)])( implicit ec: ExecutionContext): Future[Unit] = { - onCompactFilterReceived + onCompactFiltersReceived .foldLeft(FutureUtil.unit)((acc, callback) => acc.flatMap(_ => - callback(blockHash, blockFilter).recover { + callback(blockFilters).recover { case err: Throwable => logger.error( - "onCompactFilterReceived Callback failed with error: ", + "onCompactFiltersReceived Callback failed with error: ", err) })) } @@ -115,8 +114,8 @@ object NodeCallbacks { NodeCallbacks(onMerkleBlockReceived = Seq(f)) /** Constructs a set of callbacks that only acts on compact filter received */ - def onCompactFilterReceived(f: OnCompactFilterReceived): NodeCallbacks = - NodeCallbacks(onCompactFilterReceived = Seq(f)) + def onCompactFilterReceived(f: OnCompactFiltersReceived): NodeCallbacks = + NodeCallbacks(onCompactFiltersReceived = Seq(f)) /** Constructs a set of callbacks that only acts on block headers received */ def onBlockHeadersReceived(f: OnBlockHeadersReceived): NodeCallbacks = @@ -128,7 +127,7 @@ object NodeCallbacks { onTxReceived = Seq.empty, onBlockReceived = Seq.empty, onMerkleBlockReceived = Seq.empty, - onCompactFilterReceived = Seq.empty, + onCompactFiltersReceived = Seq.empty, onBlockHeadersReceived = Seq.empty ) } diff --git a/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala b/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala index 39cb2078bb..0f1bcac826 100644 --- a/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala +++ b/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala @@ -17,11 +17,14 @@ import scala.concurrent.{ExecutionContext, Future} /** This actor is meant to handle a [[org.bitcoins.core.p2p.DataPayload DataPayload]] * that a peer to sent to us on the p2p network, for instance, if we a receive a * [[org.bitcoins.core.p2p.HeadersMessage HeadersMessage]] we should store those headers in our database + * + * @param currentFilterBatch holds the current batch of filters to be processed, after its size reaches + * chainConfig.filterBatchSize they will be processed and then emptied */ case class DataMessageHandler( chainApi: ChainApi, callbacks: NodeCallbacks, - receivedFilterCount: Int = 0, + currentFilterBatch: Vector[CompactFilterMessage] = Vector.empty, syncing: Boolean = false)( implicit ec: ExecutionContext, appConfig: NodeAppConfig, @@ -74,37 +77,41 @@ case class DataMessageHandler( } case filter: CompactFilterMessage => logger.debug(s"Received ${filter.commandName}, $filter") + val batchSizeFull: Boolean = currentFilterBatch.size == chainConfig.filterBatchSize - 1 for { - (newCount, newSyncing) <- if (receivedFilterCount == chainConfig.filterBatchSize - 1) { + (newBatch, newSyncing) <- if (batchSizeFull) { logger.info( s"Received maximum amount of filters in one batch. This means we are not synced, requesting more") for { _ <- sendNextGetCompactFilterCommand(peerMsgSender, filter.blockHash.flip) - } yield (0, syncing) + } yield (Vector.empty, syncing) } else { for { - filterHeaderCount <- chainApi.getFilterHeaderCount - filterCount <- chainApi.getFilterCount + filterHeaderCount <- chainApi.getFilterHeaderCount() + filterCount <- chainApi.getFilterCount() } yield { val syncing = filterCount < filterHeaderCount - 1 if (!syncing) { logger.info(s"We are synced") } - (receivedFilterCount + 1, syncing) + (currentFilterBatch :+ filter, syncing) } } newChainApi <- chainApi.processFilter(filter) - blockFilter <- Future( - BlockFilter.fromBytes(filter.filterBytes, filter.blockHash)) - _ <- callbacks.executeOnCompactFilterReceivedCallbacks( - logger, - filter.blockHash, - blockFilter) + // If we are not syncing or our filter batch is full, process the filters + _ <- if (!syncing || batchSizeFull) { + val blockFilters = currentFilterBatch.map { filter => + (filter.blockHash, + BlockFilter.fromBytes(filter.filterBytes, filter.blockHash)) + } + callbacks.executeOnCompactFiltersReceivedCallbacks(logger, + blockFilters) + } else FutureUtil.unit } yield { this.copy(chainApi = newChainApi, - receivedFilterCount = newCount, + currentFilterBatch = newBatch, syncing = newSyncing) } case notHandling @ (MemPoolMessage | _: GetHeadersMessage | @@ -321,8 +328,8 @@ object DataMessageHandler { type OnTxReceived = Transaction => Future[Unit] /** Callback for handling a received compact block filter */ - type OnCompactFilterReceived = - (DoubleSha256Digest, GolombFilter) => Future[Unit] + type OnCompactFiltersReceived = + (Vector[(DoubleSha256Digest, GolombFilter)]) => Future[Unit] /** Callback for handling a received block header */ type OnBlockHeadersReceived = Vector[BlockHeader] => Future[Unit] diff --git a/wallet/src/main/scala/org/bitcoins/wallet/api/WalletApi.scala b/wallet/src/main/scala/org/bitcoins/wallet/api/WalletApi.scala index edc4914035..fa45d75ff8 100644 --- a/wallet/src/main/scala/org/bitcoins/wallet/api/WalletApi.scala +++ b/wallet/src/main/scala/org/bitcoins/wallet/api/WalletApi.scala @@ -90,7 +90,12 @@ trait LockedWalletApi extends WalletApi with WalletLogger { def processCompactFilter( blockHash: DoubleSha256Digest, - blockFilter: GolombFilter): Future[LockedWalletApi] = { + blockFilter: GolombFilter): Future[LockedWalletApi] = + processCompactFilters(Vector((blockHash, blockFilter))) + + def processCompactFilters( + blockFilters: Vector[(DoubleSha256Digest, GolombFilter)]): Future[ + LockedWalletApi] = { val utxosF = listUtxos() val addressesF = listAddresses() for { @@ -99,11 +104,12 @@ trait LockedWalletApi extends WalletApi with WalletLogger { scriptPubKeys = utxos.flatMap(_.redeemScriptOpt).toSet ++ addresses .map(_.scriptPubKey) .toSet - _ <- Future { - val matcher = SimpleFilterMatcher(blockFilter) - if (matcher.matchesAny(scriptPubKeys.toVector.map(_.asmBytes))) { - nodeApi.downloadBlocks(Vector(blockHash)) - } + _ <- FutureUtil.sequentially(blockFilters) { + case (blockHash, blockFilter) => + val matcher = SimpleFilterMatcher(blockFilter) + if (matcher.matchesAny(scriptPubKeys.toVector.map(_.asmBytes))) { + nodeApi.downloadBlocks(Vector(blockHash)) + } else FutureUtil.unit } } yield { this