Batch processing compact filters (#1363)

* Batch processing compact filters

* Use one vector, rename param

* Fix compile issue on older versions

* Process sequentially
Ben Carman 2020-04-26 09:34:41 -05:00 committed by GitHub
parent 3592e5575b
commit 37c62369eb
6 changed files with 62 additions and 49 deletions
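
The heart of the change is the callback shape: the per-filter `OnCompactFilterReceived`, invoked once per `(DoubleSha256Digest, GolombFilter)` pair, becomes `OnCompactFiltersReceived`, invoked once per `Vector` of such pairs. A minimal, self-contained sketch of the two shapes (here `Hash`, `Filter`, `CallbackShapes`, and `fromSingle` are illustrative stand-ins, not names from the codebase):

```scala
import scala.concurrent.{ExecutionContext, Future}

// Sketch only: Hash and Filter stand in for DoubleSha256Digest and GolombFilter.
object CallbackShapes {
  type Hash = String
  type Filter = String

  // Old shape: invoked once per received filter.
  type OnCompactFilterReceived = (Hash, Filter) => Future[Unit]

  // New shape: invoked once per batch of (block hash, filter) pairs.
  type OnCompactFiltersReceived = Vector[(Hash, Filter)] => Future[Unit]

  // A batch handler can reuse per-filter logic by walking the batch in order.
  def fromSingle(single: OnCompactFilterReceived)(
      implicit ec: ExecutionContext): OnCompactFiltersReceived =
    filters =>
      filters.foldLeft(Future.unit) {
        case (acc, (hash, filter)) => acc.flatMap(_ => single(hash, filter))
      }
}
```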

View File

@@ -156,9 +156,10 @@ object Main extends App {
lazy val onTx: OnTxReceived = { tx =>
wallet.processTransaction(tx, blockHash = None).map(_ => ())
}
- lazy val onCompactFilter: OnCompactFilterReceived = {
- (blockHash, blockFilter) =>
- wallet.processCompactFilter(blockHash, blockFilter).map(_ => ())
+ lazy val onCompactFilters: OnCompactFiltersReceived = { blockFilters =>
+ wallet
+ .processCompactFilters(blockFilters = blockFilters)
+ .map(_ => ())
}
lazy val onBlock: OnBlockReceived = { block =>
wallet.processBlock(block).map(_ => ())
@@ -177,7 +178,7 @@ object Main extends App {
} else if (nodeConf.isNeutrinoEnabled) {
Future.successful(
NodeCallbacks(onBlockReceived = Seq(onBlock),
- onCompactFilterReceived = Seq(onCompactFilter),
+ onCompactFiltersReceived = Seq(onCompactFilters),
onBlockHeadersReceived = Seq(onHeaders)))
} else {
Future.failed(new RuntimeException("Unexpected node type"))

View File

@@ -91,21 +91,21 @@ val keyManager = keyManagerE match {
// a block filter, the returned NodeCallbacks will contain the necessary items to initialize the callbacks
def createCallbacks(
processTransaction: Transaction => Future[Unit],
- processCompactFilter: (DoubleSha256Digest, GolombFilter) => Future[Unit],
+ processCompactFilters: (Vector[(DoubleSha256Digest, GolombFilter)]) => Future[Unit],
processBlock: Block => Future[Unit]): NodeCallbacks = {
lazy val onTx: OnTxReceived = { tx =>
processTransaction(tx)
}
- lazy val onCompactFilter: OnCompactFilterReceived = {
- (blockHash, blockFilter) =>
- processCompactFilter(blockHash, blockFilter)
+ lazy val onCompactFilters: OnCompactFiltersReceived = {
+ blockFilters =>
+ processCompactFilters(blockFilters)
}
lazy val onBlock: OnBlockReceived = { block =>
processBlock(block)
}
NodeCallbacks(onTxReceived = Seq(onTx),
onBlockReceived = Seq(onBlock),
- onCompactFilterReceived = Seq(onCompactFilter))
+ onCompactFiltersReceived = Seq(onCompactFilters))
}
// Here is a super simple example of a callback, this could be replaced with anything, from
@@ -117,12 +117,12 @@ val exampleProcessTx = (tx: Transaction) =>
val exampleProcessBlock = (block: Block) =>
Future.successful(println(s"Received block: ${block.blockHeader.hashBE}"))
- val exampleProcessFilter =
- (blockHash: DoubleSha256Digest, filter: GolombFilter) =>
- Future.successful(println(s"Received filter: ${blockHash.flip.hex} ${filter.hash.flip.hex}"))
+ val exampleProcessFilters =
+ (filters: Vector[(DoubleSha256Digest, GolombFilter)]) =>
+ Future.successful(println(s"Received filter: ${filters.head._1.flip.hex} ${filters.head._2.hash.flip.hex}"))
val exampleCallbacks =
- createCallbacks(exampleProcessTx, exampleProcessFilter, exampleProcessBlock)
+ createCallbacks(exampleProcessTx, exampleProcessFilters, exampleProcessBlock)
// Here is where we are defining our actual chain api, Ideally this could be it's own class
// but for the examples sake we will keep it small.
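
Note that `exampleProcessFilters` above only prints the head of the batch, which is enough for the docs. A real handler will usually want to touch every element; a purely illustrative variant in the same snippet style (not part of this PR) could look like:

```scala
import scala.concurrent.Future

// Illustrative only: report every (block hash, filter) pair in the batch,
// written generically so it does not depend on the docs page's imports.
def exampleProcessAllFilters[Hash, Filter](
    filters: Vector[(Hash, Filter)]): Future[Unit] =
  Future.successful {
    filters.foreach {
      case (blockHash, filter) =>
        println(s"Received filter for block $blockHash: $filter")
    }
  }
```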

View File

@@ -3,7 +3,7 @@ package org.bitcoins.node
import org.bitcoins.core.currency._
import org.bitcoins.core.wallet.fee.SatoshisPerByte
import org.bitcoins.node.networking.peer.DataMessageHandler
- import org.bitcoins.node.networking.peer.DataMessageHandler.OnCompactFilterReceived
+ import org.bitcoins.node.networking.peer.DataMessageHandler.OnCompactFiltersReceived
import org.bitcoins.rpc.client.common.BitcoindVersion
import org.bitcoins.rpc.util.AsyncUtil
import org.bitcoins.server.BitcoinSAppConfig
@@ -55,16 +55,16 @@ class NeutrinoNodeWithWalletTest extends NodeUnitTest {
_ <- wallet.processBlock(block)
} yield ()
}
- val onCompactFilter: OnCompactFilterReceived = { (blockHash, blockFilter) =>
+ val onCompactFilter: OnCompactFiltersReceived = { blockFilters =>
for {
wallet <- walletF
- _ <- wallet.processCompactFilter(blockHash, blockFilter)
+ _ <- wallet.processCompactFilters(blockFilters)
} yield ()
}
NodeCallbacks(
onBlockReceived = Seq(onBlock),
- onCompactFilterReceived = Seq(onCompactFilter)
+ onCompactFiltersReceived = Seq(onCompactFilter)
)
}

View File

@@ -17,7 +17,7 @@ import scala.concurrent.{ExecutionContext, Future}
*
*/
case class NodeCallbacks(
- onCompactFilterReceived: Seq[OnCompactFilterReceived] = Seq.empty,
+ onCompactFiltersReceived: Seq[OnCompactFiltersReceived] = Seq.empty,
onTxReceived: Seq[OnTxReceived] = Seq.empty,
onBlockReceived: Seq[OnBlockReceived] = Seq.empty,
onMerkleBlockReceived: Seq[OnMerkleBlockReceived] = Seq.empty,
@@ -25,7 +25,7 @@ case class NodeCallbacks(
) {
def +(other: NodeCallbacks): NodeCallbacks = copy(
- onCompactFilterReceived = onCompactFilterReceived ++ other.onCompactFilterReceived,
+ onCompactFiltersReceived = onCompactFiltersReceived ++ other.onCompactFiltersReceived,
onTxReceived = onTxReceived ++ other.onTxReceived,
onBlockReceived = onBlockReceived ++ other.onBlockReceived,
onMerkleBlockReceived = onMerkleBlockReceived ++ other.onMerkleBlockReceived,
@@ -68,18 +68,17 @@ case class NodeCallbacks(
}))
}
- def executeOnCompactFilterReceivedCallbacks(
+ def executeOnCompactFiltersReceivedCallbacks(
logger: MarkedLogger,
- blockHash: DoubleSha256Digest,
- blockFilter: GolombFilter)(
+ blockFilters: Vector[(DoubleSha256Digest, GolombFilter)])(
implicit ec: ExecutionContext): Future[Unit] = {
- onCompactFilterReceived
+ onCompactFiltersReceived
.foldLeft(FutureUtil.unit)((acc, callback) =>
acc.flatMap(_ =>
- callback(blockHash, blockFilter).recover {
+ callback(blockFilters).recover {
case err: Throwable =>
logger.error(
"onCompactFilterReceived Callback failed with error: ",
"onCompactFiltersReceived Callback failed with error: ",
err)
}))
}
@@ -115,8 +114,8 @@ object NodeCallbacks {
NodeCallbacks(onMerkleBlockReceived = Seq(f))
/** Constructs a set of callbacks that only acts on compact filter received */
- def onCompactFilterReceived(f: OnCompactFilterReceived): NodeCallbacks =
- NodeCallbacks(onCompactFilterReceived = Seq(f))
+ def onCompactFilterReceived(f: OnCompactFiltersReceived): NodeCallbacks =
+ NodeCallbacks(onCompactFiltersReceived = Seq(f))
/** Constructs a set of callbacks that only acts on block headers received */
def onBlockHeadersReceived(f: OnBlockHeadersReceived): NodeCallbacks =
@@ -128,7 +127,7 @@ object NodeCallbacks {
onTxReceived = Seq.empty,
onBlockReceived = Seq.empty,
onMerkleBlockReceived = Seq.empty,
- onCompactFilterReceived = Seq.empty,
+ onCompactFiltersReceived = Seq.empty,
onBlockHeadersReceived = Seq.empty
)
}
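
For reference, the pattern used by `executeOnCompactFiltersReceivedCallbacks` above is: fold over the registered callbacks, run each one in sequence on the same batch, and recover from individual failures so one bad callback cannot stop the rest. A generic, self-contained sketch of that pattern (the object and method names here are illustrative, not bitcoin-s API):

```scala
import scala.concurrent.{ExecutionContext, Future}

// Generic sketch: run every registered callback in sequence on the same batch
// and log-and-continue on failure, mirroring the fold-with-recover above.
object CallbackRunner {
  def runSequentially[A](
      callbacks: Seq[A => Future[Unit]],
      batch: A,
      logError: Throwable => Unit)(
      implicit ec: ExecutionContext): Future[Unit] =
    callbacks.foldLeft(Future.unit) { (acc, callback) =>
      acc.flatMap { _ =>
        callback(batch).recover { case err: Throwable => logError(err) }
      }
    }
}
```

This is also why `NodeCallbacks` keeps a `Seq` of handlers per event and why `+` simply concatenates them: merged callbacks run one after another in registration order.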

View File

@@ -17,11 +17,14 @@ import scala.concurrent.{ExecutionContext, Future}
/** This actor is meant to handle a [[org.bitcoins.core.p2p.DataPayload DataPayload]]
* that a peer to sent to us on the p2p network, for instance, if we a receive a
* [[org.bitcoins.core.p2p.HeadersMessage HeadersMessage]] we should store those headers in our database
+ *
+ * @param currentFilterBatch holds the current batch of filters to be processed, after its size reaches
+ * chainConfig.filterBatchSize they will be processed and then emptied
*/
case class DataMessageHandler(
chainApi: ChainApi,
callbacks: NodeCallbacks,
- receivedFilterCount: Int = 0,
+ currentFilterBatch: Vector[CompactFilterMessage] = Vector.empty,
syncing: Boolean = false)(
implicit ec: ExecutionContext,
appConfig: NodeAppConfig,
@@ -74,37 +77,41 @@ case class DataMessageHandler(
}
case filter: CompactFilterMessage =>
logger.debug(s"Received ${filter.commandName}, $filter")
+ val batchSizeFull: Boolean = currentFilterBatch.size == chainConfig.filterBatchSize - 1
for {
- (newCount, newSyncing) <- if (receivedFilterCount == chainConfig.filterBatchSize - 1) {
+ (newBatch, newSyncing) <- if (batchSizeFull) {
logger.info(
s"Received maximum amount of filters in one batch. This means we are not synced, requesting more")
for {
_ <- sendNextGetCompactFilterCommand(peerMsgSender,
filter.blockHash.flip)
- } yield (0, syncing)
+ } yield (Vector.empty, syncing)
} else {
for {
- filterHeaderCount <- chainApi.getFilterHeaderCount
- filterCount <- chainApi.getFilterCount
+ filterHeaderCount <- chainApi.getFilterHeaderCount()
+ filterCount <- chainApi.getFilterCount()
} yield {
val syncing = filterCount < filterHeaderCount - 1
if (!syncing) {
logger.info(s"We are synced")
}
- (receivedFilterCount + 1, syncing)
+ (currentFilterBatch :+ filter, syncing)
}
}
newChainApi <- chainApi.processFilter(filter)
- blockFilter <- Future(
- BlockFilter.fromBytes(filter.filterBytes, filter.blockHash))
- _ <- callbacks.executeOnCompactFilterReceivedCallbacks(
- logger,
- filter.blockHash,
- blockFilter)
+ // If we are not syncing or our filter batch is full, process the filters
+ _ <- if (!syncing || batchSizeFull) {
+ val blockFilters = currentFilterBatch.map { filter =>
+ (filter.blockHash,
+ BlockFilter.fromBytes(filter.filterBytes, filter.blockHash))
+ }
+ callbacks.executeOnCompactFiltersReceivedCallbacks(logger,
+ blockFilters)
+ } else FutureUtil.unit
} yield {
this.copy(chainApi = newChainApi,
- receivedFilterCount = newCount,
+ currentFilterBatch = newBatch,
syncing = newSyncing)
}
case notHandling @ (MemPoolMessage | _: GetHeadersMessage |
@@ -321,8 +328,8 @@ object DataMessageHandler {
type OnTxReceived = Transaction => Future[Unit]
/** Callback for handling a received compact block filter */
- type OnCompactFilterReceived =
- (DoubleSha256Digest, GolombFilter) => Future[Unit]
+ type OnCompactFiltersReceived =
+ (Vector[(DoubleSha256Digest, GolombFilter)]) => Future[Unit]
/** Callback for handling a received block header */
type OnBlockHeadersReceived = Vector[BlockHeader] => Future[Unit]
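
The batching itself is driven by the new `currentFilterBatch` field: incoming `CompactFilterMessage`s accumulate in the vector, and once the batch is full (or syncing has finished) the accumulated filters are turned into `GolombFilter`s and handed to `executeOnCompactFiltersReceivedCallbacks`, after which the batch starts over empty. A simplified, self-contained model of that accumulate-and-flush step (illustrative names; it flushes the batch including the triggering filter and omits the request-more-filters step, so it is not line-for-line equivalent to `DataMessageHandler`):

```scala
import scala.concurrent.{ExecutionContext, Future}

// Simplified model of the accumulate-and-flush idea: keep appending filters
// until the batch is full (or syncing is done), then process and reset.
final case class FilterBatcher[F](
    batchSize: Int,
    currentBatch: Vector[F] = Vector.empty) {

  def addFilter(
      filter: F,
      syncing: Boolean,
      processBatch: Vector[F] => Future[Unit])(
      implicit ec: ExecutionContext): Future[FilterBatcher[F]] = {
    val newBatch = currentBatch :+ filter
    if (newBatch.size >= batchSize || !syncing) {
      // Hand the whole batch to the callbacks, then start over empty.
      processBatch(newBatch).map(_ => copy(currentBatch = Vector.empty))
    } else {
      // Keep accumulating until the batch is full.
      Future.successful(copy(currentBatch = newBatch))
    }
  }
}
```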

View File

@@ -90,7 +90,12 @@ trait LockedWalletApi extends WalletApi with WalletLogger {
def processCompactFilter(
blockHash: DoubleSha256Digest,
- blockFilter: GolombFilter): Future[LockedWalletApi] = {
+ blockFilter: GolombFilter): Future[LockedWalletApi] =
+ processCompactFilters(Vector((blockHash, blockFilter)))
+ def processCompactFilters(
+ blockFilters: Vector[(DoubleSha256Digest, GolombFilter)]): Future[
+ LockedWalletApi] = {
val utxosF = listUtxos()
val addressesF = listAddresses()
for {
@@ -99,11 +104,12 @@ trait LockedWalletApi extends WalletApi with WalletLogger {
scriptPubKeys = utxos.flatMap(_.redeemScriptOpt).toSet ++ addresses
.map(_.scriptPubKey)
.toSet
- _ <- Future {
- val matcher = SimpleFilterMatcher(blockFilter)
- if (matcher.matchesAny(scriptPubKeys.toVector.map(_.asmBytes))) {
- nodeApi.downloadBlocks(Vector(blockHash))
- }
+ _ <- FutureUtil.sequentially(blockFilters) {
+ case (blockHash, blockFilter) =>
+ val matcher = SimpleFilterMatcher(blockFilter)
+ if (matcher.matchesAny(scriptPubKeys.toVector.map(_.asmBytes))) {
+ nodeApi.downloadBlocks(Vector(blockHash))
+ } else FutureUtil.unit
}
} yield {
this
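
Finally, the "Process sequentially" change applies the batch one element at a time via `FutureUtil.sequentially`, so the wallet matches a filter (and requests its block if it matches) before moving on to the next one. A generic sketch of that sequential-traversal pattern follows; the exact signature and return type of the real `FutureUtil.sequentially` may differ:

```scala
import scala.concurrent.{ExecutionContext, Future}

// Generic sketch of sequential traversal: apply a Future-returning function to
// each element in order, starting the next only after the previous completes.
object Sequential {
  def sequentially[A, B](items: Vector[A])(f: A => Future[B])(
      implicit ec: ExecutionContext): Future[Vector[B]] =
    items.foldLeft(Future.successful(Vector.empty[B])) { (accF, item) =>
      for {
        acc <- accF
        result <- f(item) // f(item) is not started until acc has completed
      } yield acc :+ result
    }
}
```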