Make ChainApi to scan the block filters in order to find matches (#786)

* Make ChainApi to scan the block filters in order to find matches

* performance improvements, unit test

* some parallelization

* addressed comments

* parallelism level

* fix parallelism computation

* Scala 2.11 compatibility

* increased test coverage

* cleanup

* more checks and cleanup

* change ChainApi.getMatchingBlocks signature

* some more changes
This commit is contained in:
rorp 2019-10-11 12:19:42 -07:00 committed by Chris Stewart
parent 8e40c0e9d0
commit 82e6c36493
12 changed files with 308 additions and 24 deletions

View file

@ -3,7 +3,11 @@ package org.bitcoins.chain.blockchain
import akka.actor.ActorSystem
import org.bitcoins.chain.api.ChainApi
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.chain.models.{BlockHeaderDb, BlockHeaderDbHelper}
import org.bitcoins.chain.models.{
BlockHeaderDb,
BlockHeaderDbHelper,
CompactFilterDb
}
import org.bitcoins.core.crypto.{
DoubleSha256Digest,
DoubleSha256DigestBE,
@ -11,7 +15,9 @@ import org.bitcoins.core.crypto.{
}
import org.bitcoins.core.gcs.{BlockFilter, FilterHeader, FilterType}
import org.bitcoins.core.p2p.CompactFilterMessage
import org.bitcoins.core.protocol.BitcoinAddress
import org.bitcoins.core.protocol.blockchain.BlockHeader
import org.bitcoins.core.util.CryptoUtil
import org.bitcoins.testkit.BitcoinSTestAppConfig
import org.bitcoins.testkit.chain.fixture.ChainFixtureTag
import org.bitcoins.testkit.chain.{
@ -359,8 +365,6 @@ class ChainHandlerTest extends ChainUnitTest {
val blockHashBE =
DoubleSha256DigestBE.fromBytes(ECPrivateKey.freshPrivateKey.bytes)
val golombFilter = BlockFilter.fromHex("017fa880", blockHashBE.flip)
val firstFilter = CompactFilterMessage(blockHash = blockHashBE.flip,
filter = golombFilter)
val firstFilterHeader = FilterHeader(filterHash = golombFilter.hash,
prevHeaderHash =
DoubleSha256Digest.empty)
@ -408,6 +412,62 @@ class ChainHandlerTest extends ChainUnitTest {
}
}
it must "match block filters" in { chainHandler: ChainHandler =>
  import scodec.bits._

  // Serialized filter of a random block on testnet
  val filterBytes: ByteVector =
    hex"fd2701f0ed169ad16107a8a74609b9e4de3c6133c564f79923ca228805d3" ++
      hex"8e3efc796c4b35034cb573b10b759cdda5efd19e1cdb4d343afcb06455fa" ++
      hex"820b06eca828ad61d3377fa464f3bd06ff4432310a363f667e13d09ba993" ++
      hex"264c703a0aa668b33eaa555bd3e93ac85dfde380ab723aafd407dfa13ffe" ++
      hex"2e7ddf6f452bd0d977617c4ab2dc3b38c26810023984ad57890e3cf34cfc" ++
      hex"2d4a6973b9430ede26bfd9f5bb24e043d48483d84b9025d0a940b15f13fc" ++
      hex"0a1e77abd7626869f417c7710e9a6315477691d7c4e2c50f0e776755a62a" ++
      hex"b6f0e8eb7a3be8d1a8c3d9dd4602efc5146f0d431d1669378d7afa03c7b9" ++
      hex"84d9b0b78007abb6e7c036156e5186d1d79a2f37daecfcbe8821cf42851c" ++
      hex"b10ef0c359307d54e53078eb631f02c067a474dceb484da20bc0e7c5451a" ++
      hex"b957f46b306caa82938b19bb34fd76c5cc07e048932524704dec8f72c91c" ++
      hex"d5ee1f4648de839047a0bea0d4d4d66c19cfccc2b5f285a84af18114f608" ++
      hex"f144391648aedfb5ffcccbb51272512d6ba9a2e19a47cebe5b50a8a7073a" ++
      hex"1c24059440444047a41bdbab16f61bc4b0ee8987de82fd25cc62abc86e2b" ++
      hex"577fc55175be138680df7253a8bcae9d9954391d3bed806ce5a6869b4553" ++
      hex"0f214486b1b7f0347efcfde58ca0882f059f7b1541c74506930897c78e23" ++
      hex"a6c94b49856369606ed652b8c7402a49f289cb5d1098bb999112225327e0" ++
      hex"a32efd2bcd192a2ffbd1997c6a3b7d1a9445bc31fb57485ebe0c431e482b" ++
      hex"04e509e557cff107cee08a45c22aa3cbdcb9d305bd95c919e90239e0ec29" ++
      hex"2a5418a6151f431e8ab82278b3d816ecd483f43d3d657dae9996cc523fdd" ++
      hex"242c4e01935db91a2936e9398ff7278b8a3430eed99ad25fc2a41afc0b4a" ++
      hex"e417f6c1785414607cfa13f04173740333a5b58655c74a51deddb38cf8c3" ++
      hex"d50b7d2ccf380cad34a5c341e7155494cc4560dff3b19bf88b4d73e9ce76" ++
      hex"cbeff573fe93674e4a752d06d5321ff00a4582d62683fb4986d36eaec825" ++
      hex"c14d41b2d5aefaf539e989f7fa097eac657c70b975c56e26b73fb9401ce3" ++
      hex"81502f0883d52c6a3bcc956e0ea1787f0717d0205fecfe55b01edb1ac0"

  // The filter is stored at height 1 and tied to the hash of that testnet block
  val filterToStore = CompactFilterDb(
    hashBE = CryptoUtil.doubleSHA256(filterBytes).flip,
    filterType = FilterType.Basic,
    bytes = filterBytes,
    height = 1,
    blockHashBE = DoubleSha256DigestBE
      .fromHex(
        "00000000496dcc754fabd97f3e2df0a7337eab417d75537fecf97a7ebb0e7c75")
  )

  chainHandler.filterDAO.create(filterToStore).flatMap { stored =>
    // Script of a random address which is included into the block
    val watchedScript =
      BitcoinAddress("n1RH2x3b3ah4TGQtgrmNAHfmad9wr8U2QY").get.scriptPubKey

    chainHandler
      .getMatchingBlocks(
        scripts = Vector(watchedScript),
        startOpt = None,
        endOpt = None
      )
      .map { matchingBlocks =>
        // Scanning the whole range must report exactly the stored block
        assert(Vector(stored.blockHashBE) == matchingBlocks)
      }
  }
}
final def processHeaders(
processorF: Future[ChainApi],
headers: Vector[BlockHeader],

View file

@ -1,17 +1,19 @@
package org.bitcoins.chain.api
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.chain.models.{
BlockHeaderDb,
CompactFilterDb,
CompactFilterHeaderDb
}
import org.bitcoins.core.crypto.{DoubleSha256Digest, DoubleSha256DigestBE}
import org.bitcoins.core.protocol.blockchain.BlockHeader
import scala.concurrent.{ExecutionContext, Future}
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.core.gcs.FilterHeader
import org.bitcoins.core.p2p.CompactFilterMessage
import org.bitcoins.core.protocol.BlockStamp
import org.bitcoins.core.protocol.blockchain.BlockHeader
import org.bitcoins.core.protocol.script.ScriptPubKey
import scala.concurrent.{ExecutionContext, Future}
/**
* Entry api to the chain project for adding new things to our blockchain
@ -164,4 +166,27 @@ trait ChainApi {
def getFiltersAtHeight(height: Int)(
implicit ec: ExecutionContext): Future[Vector[CompactFilterDb]]
/**
* Scans the stored block filters and returns the hashes of the blocks
* whose filters match at least one of the given scripts.
*
* @param scripts list of [[ScriptPubKey]]s to watch
* @param startOpt start point (if empty it starts with the genesis block)
* @param endOpt end point (if empty it ends with the best tip)
* @param batchSize number of filters that can be matched in one batch
* (default is `chainConfig.filterBatchSize`)
* @param parallelismLevel max number of threads required to perform matching
* (default is `Runtime.getRuntime.availableProcessors()`)
* @param ec execution context used to run the matching
* @return a list of matching block hashes
*/
def getMatchingBlocks(
scripts: Vector[ScriptPubKey],
startOpt: Option[BlockStamp] = None,
endOpt: Option[BlockStamp] = None,
batchSize: Int = chainConfig.filterBatchSize,
parallelismLevel: Int = Runtime.getRuntime.availableProcessors())(
implicit ec: ExecutionContext): Future[Vector[DoubleSha256DigestBE]]
/** Returns the block height of the given block stamp.
*
* @param blockStamp the point on the blockchain to resolve
* @param ec execution context used to resolve the height
* @return the height the block stamp refers to
*/
def getHeightByBlockStamp(blockStamp: BlockStamp)(
implicit ec: ExecutionContext): Future[Int]
}

View file

@ -26,3 +26,8 @@ case class UnknownBlockHeight(message: String) extends ChainException(message)
* [[org.bitcoins.chain.blockchain.ChainHandler]] tried to process multiple filters for the same block hash
*/
case class DuplicateFilters(message: String) extends ChainException(message)
/**
* The given block range is invalid, for example its start position is negative
* or its end position precedes its start position
*/
case class InvalidBlockRange(message: String) extends ChainException(message)

View file

@ -7,10 +7,13 @@ import org.bitcoins.chain.models._
import org.bitcoins.core.crypto.{DoubleSha256Digest, DoubleSha256DigestBE}
import org.bitcoins.core.gcs.FilterHeader
import org.bitcoins.core.p2p.CompactFilterMessage
import org.bitcoins.core.protocol.BlockStamp
import org.bitcoins.core.protocol.blockchain.BlockHeader
import org.bitcoins.core.protocol.script.ScriptPubKey
import org.bitcoins.core.util.{CryptoUtil, FutureUtil}
import scala.concurrent.{ExecutionContext, Future}
import scala.annotation.tailrec
import scala.concurrent._
/**
* Chain Handler is meant to be the reference implementation
@ -357,6 +360,132 @@ case class ChainHandler(
override def getFiltersAtHeight(height: Int)(
implicit ec: ExecutionContext): Future[Vector[CompactFilterDb]] =
filterDAO.getAtHeight(height)
/** Implements [[ChainApi.getMatchingBlocks()]].
  *
  * It queries the filter database for [[batchSize]] filters at a time
  * and runs [[GolombFilter.matchesAny]] against each filter.
  *
  * Every batch is split into at most [[parallelismLevel]] groups and each group
  * is matched in its own future. For best results use it with a separate execution context.
  *
  * The scanned height range is inclusive on both ends: every filter whose height
  * lies in `[start, end]` is checked, and no filter outside of that range is read.
  * Batches are scanned from the end of the range towards its start, so the
  * returned hashes are grouped by batch, newest batch first.
  *
  * @param scripts scripts to look for; an empty vector yields an empty result
  * @param startOpt start of the range, height 0 if empty
  * @param endOpt end of the range, the result of `getFilterCount` if empty
  * @param batchSize number of heights fetched from the database per query, must be positive
  * @param parallelismLevel max number of groups a batch is split into, must be positive
  * @return the hashes of the blocks whose filters match any of the scripts;
  *         the future fails with [[InvalidBlockRange]] if the range is invalid
  */
def getMatchingBlocks(
    scripts: Vector[ScriptPubKey],
    startOpt: Option[BlockStamp] = None,
    endOpt: Option[BlockStamp] = None,
    batchSize: Int = chainConfig.filterBatchSize,
    parallelismLevel: Int = Runtime.getRuntime.availableProcessors())(
    implicit ec: ExecutionContext): Future[Vector[DoubleSha256DigestBE]] = {
  require(batchSize > 0, "batch size must be greater than zero")
  require(parallelismLevel > 0, "parallelism level must be greater than zero")

  logger.info(
    s"Starting looking for matching blocks for scripts ${scripts.mkString(",")}")

  if (scripts.isEmpty) {
    logger.info(s"No scripts to match")
    Future.successful(Vector.empty)
  } else {
    val bytes = scripts.map(_.asmBytes)

    /** Calculates group size to split a filter vector into [[parallelismLevel]] groups.
      * It's needed to limit number of threads required to run the matching.
      * The result is never less than 1, because `grouped` rejects a non-positive
      * size and a batch can legitimately be empty (no filters stored for its heights).
      */
    def calcGroupSize(vectorSize: Int): Int = {
      val quotient = vectorSize / parallelismLevel
      // round up when the vector does not split evenly
      val roundedUp =
        if (quotient * parallelismLevel < vectorSize) quotient + 1
        else quotient
      Math.max(1, roundedUp)
    }

    /** Iterates over the grouped vector of filters to find matches with the given [[bytes]].
      */
    def findMatches(filterGroups: Iterator[Vector[CompactFilterDb]]): Future[
      Iterator[DoubleSha256DigestBE]] = {
      // Sequence on the filter groups making sure the number of threads doesn't exceed [[parallelismLevel]].
      Future
        .sequence(filterGroups.map { filterGroup =>
          // We need to wrap in a future here to make sure we can
          // potentially run these matches in parallel
          Future {
            // Find any matches in the group and add the corresponding block hashes into the result
            filterGroup.foldLeft(Vector.empty[DoubleSha256DigestBE]) {
              (blocks, filter) =>
                if (filter.golombFilter.matchesAny(bytes))
                  blocks :+ filter.blockHashBE
                else blocks
            }
          }
        })
        .map(_.flatten)
    }

    /** Iterates over all filters in the inclusive range `[start, end]` to find matches.
      * The range is walked backwards, one batch of at most [[batchSize]] heights at a time.
      * The recursion does not wait for a batch to complete before scheduling the next one.
      */
    @tailrec
    def loop(
        start: Int,
        end: Int,
        acc: Future[Vector[DoubleSha256DigestBE]]): Future[
      Vector[DoubleSha256DigestBE]] = {
      if (end < start) {
        // the whole range has been consumed
        acc
      } else {
        // clamp the lower bound so the last batch never reaches below `start`
        val startHeight = Math.max(start, end - (batchSize - 1))
        val endHeight = end
        val newAcc: Future[Vector[DoubleSha256DigestBE]] = for {
          compactFilterDbs <- filterDAO.getBetweenHeights(startHeight,
                                                          endHeight)
          groupSize = calcGroupSize(compactFilterDbs.size)
          grouped = compactFilterDbs.grouped(groupSize)
          filtered <- findMatches(grouped)
          res <- acc
        } yield {
          res ++ filtered
        }
        // the next batch ends right below the one that was just scheduled
        loop(start, startHeight - 1, newAcc)
      }
    }

    val res = for {
      startHeight <- startOpt.fold(Future.successful(0))(
        getHeightByBlockStamp)
      _ = if (startHeight < 0)
        throw InvalidBlockRange(s"Start position cannot negative")
      endHeight <- endOpt.fold(getFilterCount)(getHeightByBlockStamp)
      _ = if (startHeight > endHeight)
        throw InvalidBlockRange(
          s"End position cannot precede start: $startHeight:$endHeight")
      matched <- loop(startHeight, endHeight, Future.successful(Vector.empty))
    } yield {
      matched
    }

    res.foreach { blocks =>
      logger.info(
        s"Done looking for matching blocks for addresses ${scripts.mkString(",")}: blocks matched ${blocks.size} latest block ${blocks.headOption.getOrElse("")}")
    }

    res.failed.foreach { e =>
      logger.error(s"Cannot find matching blocks", e)
    }

    res
  }
}
/** Resolves the given block stamp to a block height.
  *
  * A height stamp is returned as is, a hash stamp is looked up with `getHeader`
  * (the future fails with [[UnknownBlockHash]] if no header is found),
  * and a time stamp is not supported yet and always yields a failed future.
  */
override def getHeightByBlockStamp(blockStamp: BlockStamp)(
    implicit ec: ExecutionContext): Future[Int] = {
  blockStamp match {
    case BlockStamp.BlockHeight(height) =>
      Future.successful(height)
    case BlockStamp.BlockHash(hash) =>
      val flippedHash = hash.flip
      getHeader(flippedHash).map {
        case Some(header) => header.height
        case None =>
          throw UnknownBlockHash(s"Unknown block hash ${flippedHash}")
      }
    case blockTime: BlockStamp.BlockTime =>
      val notImplemented =
        new RuntimeException(s"Not implemented: $blockTime")
      Future.failed(notImplemented)
  }
}
}
object ChainHandler {

View file

@ -71,4 +71,20 @@ case class CompactFilterDAO()(
val query = table.map(_.height).max.getOrElse(0).result
query
}
/** Fetches every stored filter whose height lies between `from` and `to`,
  * both bounds included. The result is not guaranteed to be ordered by height.
  */
def getBetweenHeights(from: Int, to: Int): Future[Vector[CompactFilterDb]] =
  database.runVec(getBetweenHeightsQuery(from, to))
/** Builds the read query selecting the filters with `from <= height <= to` */
private def getBetweenHeightsQuery(
    from: Int,
    to: Int): SQLiteProfile.StreamingProfileAction[
  Seq[CompactFilterDb],
  CompactFilterDb,
  Effect.Read] = {
  val withinRange = table.filter { row =>
    val notBelowFrom = row.height >= from
    val notAboveTo = row.height <= to
    notBelowFrom && notAboveTo
  }
  withinRange.result
}
}

View file

@ -0,0 +1,19 @@
package org.bitcoins.core.gcs
import org.bitcoins.testkit.util.BitcoinSUnitTest
import scodec.bits.ByteVector
/** Checks the conversions between [[FilterType]] values, their byte encoding and their codes */
class FilterTypeTest extends BitcoinSUnitTest {
  behavior of "FilterType"

  it must "parse bytes" in {
    // 0 is the encoding of the basic filter type
    val parsed = FilterType.fromBytes(ByteVector(0))
    assert(parsed == FilterType.Basic)

    // 1 is expected to be rejected
    assertThrows[IllegalArgumentException] {
      FilterType.fromBytes(ByteVector(1))
    }
  }

  it must "know its code" in {
    val basicCode = FilterType.getCode(FilterType.Basic)
    assert(basicCode == 0)

    val basicType = FilterType.byCode(0)
    assert(basicType == FilterType.Basic)

    assertThrows[IllegalArgumentException] {
      FilterType.byCode(1)
    }
  }
}

View file

@ -7,8 +7,7 @@ import org.bitcoins.testkit.core.gen.CryptoGenerators._
import org.bitcoins.testkit.core.gen.NumberGenerator
import org.bitcoins.testkit.util.BitcoinSUnitTest
import org.scalacheck.Gen
import scodec.bits.ByteVector
import scodec.bits._
import scodec.bits.{ByteVector, _}
class GolombFilterTest extends BitcoinSUnitTest {
behavior of "GolombFilter"
@ -26,10 +25,14 @@ class GolombFilterTest extends BitcoinSUnitTest {
assert(!filter.matchesHash(rand))
assert(filter.matchesHash(data1))
assert(filter.matchesHash(data2))
assert(!filter.matchesAnyHash(Vector(rand)))
assert(filter.matchesAnyHash(Vector(rand, data1, data2)))
}
}
it must "match arbitrary encoded data for bip 158 GCS parameters" in {
assertThrows[IllegalArgumentException](SipHashKey(ByteVector.empty))
val genKey: Gen[SipHashKey] =
Gen
.listOfN(16, NumberGenerator.byte)
@ -50,6 +53,7 @@ class GolombFilterTest extends BitcoinSUnitTest {
val hashes = filter.decodedHashes
data.foreach(element => assert(filter.matches(element)))
assert(filter.matchesAny(data))
val hashesNotInData: Vector[UInt64] =
randHashes.filterNot(hashes.contains)
@ -104,5 +108,6 @@ class GolombFilterTest extends BitcoinSUnitTest {
assert(header.prevHeaderHash == prevHeader.hash)
assert(header == nextHeader)
assert(nextHeader == prevHeader.nextHeader(filter))
}
}

View file

@ -34,8 +34,7 @@ class RawGetCompactFilterHeadersMessageSerializerTest extends BitcoinSUnitTest {
assert(bytes == message.bytes)
val anotherMessage = GetCompactFilterHeadersMessage(
FilterType.Basic,
UInt32.fromHex("0180"),
0x0180,
DoubleSha256Digest.fromHex(
"8000000000000000000000000000000000000000000000000000000000000001"))

View file

@ -41,13 +41,14 @@ object FilterType extends Factory[FilterType] {
knownFilterTypes.get(filterType) match {
case Some(code) => code
case None =>
throw new RuntimeException(s"Unknown filter type: ${filterType}")
throw new IllegalArgumentException(
s"Unknown filter type: ${filterType}")
}
def byCode(code: Short): FilterType = knownFilterTypeCodes.get(code) match {
case Some(filterType) => filterType
case None =>
throw new RuntimeException(s"Unknown filter type code: ${code}")
throw new IllegalArgumentException(s"Unknown filter type code: ${code}")
}
}

View file

@ -2,15 +2,10 @@ package org.bitcoins.core.gcs
import org.bitcoins.core.crypto.DoubleSha256Digest
import org.bitcoins.core.number.{UInt64, UInt8}
import org.bitcoins.core.protocol.{CompactSizeUInt, NetworkElement}
import org.bitcoins.core.protocol.blockchain.Block
import org.bitcoins.core.protocol.script.{EmptyScriptPubKey, ScriptPubKey}
import org.bitcoins.core.protocol.transaction.{
Transaction,
TransactionInput,
TransactionOutPoint,
TransactionOutput
}
import org.bitcoins.core.protocol.transaction.{Transaction, TransactionOutput}
import org.bitcoins.core.protocol.{CompactSizeUInt, NetworkElement}
import org.bitcoins.core.script.control.OP_RETURN
import org.bitcoins.core.util.{BitcoinSUtil, CryptoUtil}
import scodec.bits.{BitVector, ByteVector}
@ -84,6 +79,20 @@ case class GolombFilter(
matchesHash(hash)
}
/** Returns true as soon as one of the given hashes matches this filter, false if none does
  * TODO refactor it to implement https://github.com/bitcoin/bips/blob/master/bip-0158.mediawiki#golomb-coded-set-multi-match
  */
def matchesAnyHash(hashes: Vector[UInt64]): Boolean = {
  hashes.exists(hash => matchesHash(hash))
}
/** Maps every element of `data` into the hash range of this filter
  * and delegates to [[matchesAnyHash()]] to look for a match
  */
def matchesAny(data: Vector[ByteVector]): Boolean = {
  val range = n.num * m
  matchesAnyHash(data.map(element => GCS.hashToRange(element, range, key)))
}
}
object BlockFilter {
@ -109,6 +118,8 @@ object BlockFilter {
* to BIP 158 Basic Block Filters
* @see [[https://github.com/bitcoin/bips/blob/master/bip-0158.mediawiki#contents]]
*/
/*
TODO uncomment and add unit tests for this method
def getInputScriptPubKeysFromBlock(
block: Block,
utxoProvider: TempUtxoProvider): Vector[ScriptPubKey] = {
@ -125,17 +136,21 @@ object BlockFilter {
.filterNot(_.scriptPubKey == EmptyScriptPubKey)
.map(_.scriptPubKey)
}
*/
/**
* Given a Block and access to the UTXO set, constructs a Block Filter for that block
* @see [[https://github.com/bitcoin/bips/blob/master/bip-0158.mediawiki#block-filters]]
*/
/*
TODO uncomment and add unit tests for this method
def apply(block: Block, utxoProvider: TempUtxoProvider): GolombFilter = {
val prevOutputScripts: Vector[ScriptPubKey] =
getInputScriptPubKeysFromBlock(block, utxoProvider)
BlockFilter(block, prevOutputScripts)
}
*/
/**
* Given a Block and access to the previous output scripts, constructs a Block Filter for that block

View file

@ -990,9 +990,6 @@ case class CompactFilterHeadersMessage(
val commandName: String = NetworkPayload.compactFilterHeadersCommandName
def bytes: ByteVector = RawCompactFilterHeadersMessageSerializer.write(this)
override def toString(): String =
s"CompactFilterHeadersMessage($filterType, stopHash=$stopHash, previousFilterHeader=$previousFilterHeader, filterHashes=$filterHashes)"
def filterHeaders: Vector[FilterHeader] = {
val z = FilterHeader(filterHashes.head, previousFilterHeader)
filterHashes.tail.foldLeft(Vector(z)) { (acc, nextFilterHash) =>

View file

@ -0,0 +1,13 @@
package org.bitcoins.core.protocol
import org.bitcoins.core.crypto.DoubleSha256Digest
import org.bitcoins.core.number.UInt32
/** This trait represents a point on blockchain, and is used to specify block ranges.
  * A point can be given as a block hash, a block height or a block time.
  */
sealed trait BlockStamp
object BlockStamp {
/** A point identified by the hash of a block */
case class BlockHash(hash: DoubleSha256Digest) extends BlockStamp
/** A point identified by the height of a block */
case class BlockHeight(height: Int) extends BlockStamp
/** A point identified by a time value
  * NOTE(review): presumably a Unix timestamp in seconds - confirm
  */
case class BlockTime(time: UInt32) extends BlockStamp
}