2022 09 12 Filter header and filter websocket events (#4777)

* implement chain callbacks for compact filter headers / compact filters

* Wire it up through the websocket

* Fix type for compactfilterprocessed

* Fix bug to write json rather than write a string
This commit is contained in:
Chris Stewart 2022-09-15 20:05:54 -05:00 committed by GitHub
parent 6b479e8765
commit bcddb015ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 430 additions and 54 deletions

View File

@ -2,6 +2,7 @@ package org.bitcoins.commons.jsonmodels.ws
import org.bitcoins.commons.jsonmodels.bitcoind.GetBlockHeaderResult
import org.bitcoins.commons.serializers.WsPicklers
import org.bitcoins.core.api.chain.db.{CompactFilterDb, CompactFilterHeaderDb}
import org.bitcoins.core.api.dlc.wallet.db.IncomingDLCOfferDb
import org.bitcoins.core.api.wallet.db.SpendingInfoDb
import org.bitcoins.core.protocol.BitcoinAddress
@ -67,9 +68,19 @@ object WalletWsType extends StringFactory[WalletWsType] {
object ChainWsType extends StringFactory[ChainWsType] {
case object BlockProcessed extends ChainWsType
case object CompactFilterHeaderProcessed extends ChainWsType
case object CompactFilterProcessed extends ChainWsType
case object SyncFlagChanged extends ChainWsType
private val all: Vector[ChainWsType] = Vector(BlockProcessed, SyncFlagChanged)
private val all: Vector[ChainWsType] =
Vector(
BlockProcessed,
CompactFilterHeaderProcessed,
CompactFilterProcessed,
SyncFlagChanged
)
override def fromStringOpt(string: String): Option[ChainWsType] = {
all.find(_.toString.toLowerCase() == string.toLowerCase)
@ -237,6 +248,26 @@ object ChainNotification {
}
}
case class CompactFilterHeaderProcessedNotification(
payload: CompactFilterHeaderDb)
extends ChainNotification[CompactFilterHeaderDb] {
override val `type`: ChainWsType = ChainWsType.CompactFilterHeaderProcessed
override val json: ujson.Value = {
upickle.default.writeJs(this)(
WsPicklers.compactFilterHeaderProcessedPickler)
}
}
case class CompactFilterProcessedNotification(payload: CompactFilterDb)
extends ChainNotification[CompactFilterDb] {
override val `type`: ChainWsType = ChainWsType.CompactFilterProcessed
override val json: ujson.Value = {
upickle.default.writeJs(this)(WsPicklers.compactFilterProcessedPickler)
}
}
case class SyncFlagChangedNotification(payload: Boolean)
extends ChainNotification[Boolean] {
override val `type`: ChainWsType = ChainWsType.SyncFlagChanged

View File

@ -4,6 +4,7 @@ import org.bitcoins.commons.jsonmodels.bitcoind.GetBlockHeaderResult
import org.bitcoins.commons.jsonmodels.bitcoind.RpcOpts.LockUnspentOutputParameter
import org.bitcoins.commons.jsonmodels.ws.WalletNotification.RescanComplete
import org.bitcoins.commons.serializers.JsonReaders.jsToSatoshis
import org.bitcoins.core.api.chain.db.{CompactFilterDb, CompactFilterHeaderDb}
import org.bitcoins.core.api.dlc.wallet.db.{DLCContactDb, IncomingDLCOfferDb}
import org.bitcoins.core.api.wallet.CoinSelectionAlgo
import org.bitcoins.core.api.wallet.db.SpendingInfoDb
@ -11,6 +12,7 @@ import org.bitcoins.core.config.DLC
import org.bitcoins.core.crypto._
import org.bitcoins.core.currency.{Bitcoins, Satoshis}
import org.bitcoins.core.dlc.accounting.DLCWalletAccounting
import org.bitcoins.core.gcs.FilterType
import org.bitcoins.core.hd.{AddressType, HDPath}
import org.bitcoins.core.number.{Int32, UInt16, UInt32, UInt64}
import org.bitcoins.core.protocol.blockchain.Block
@ -1543,6 +1545,76 @@ object Picklers {
.bimap(writeBlockHeaderResult(_), readBlockHeaderResult(_))
}
implicit val compactFilterHeaderPickler: ReadWriter[CompactFilterHeaderDb] = {
readwriter[ujson.Obj]
.bimap(writeCompactFilterHeaderDb(_), readCompactFilterHeaderDb(_))
}
implicit val compactFilterDbPickler: ReadWriter[CompactFilterDb] = {
readwriter[ujson.Obj]
.bimap(writeCompactFilterDb(_), readCompactFilterDb(_))
}
private def writeCompactFilterDb(
compactFilterDb: CompactFilterDb): ujson.Obj = {
ujson.Obj(
PicklerKeys.hashKey -> ujson.Str(compactFilterDb.hashBE.hex),
PicklerKeys.filterTypeKey -> ujson.Str(
compactFilterDb.filterType.toString),
PicklerKeys.compactFilterBytesKey -> ujson.Str(
compactFilterDb.bytes.toHex),
PicklerKeys.heightKey -> ujson.Num(compactFilterDb.height),
PicklerKeys.blockHashKey -> ujson.Str(compactFilterDb.blockHashBE.hex)
)
}
private def readCompactFilterDb(obj: ujson.Obj): CompactFilterDb = {
val hash = DoubleSha256DigestBE.fromHex(obj(PicklerKeys.hashKey).str)
val filterType = FilterType.fromString(obj(PicklerKeys.filterTypeKey).str)
val bytes =
ByteVector.fromValidHex(obj(PicklerKeys.compactFilterBytesKey).str)
val height = obj(PicklerKeys.heightKey).num.toInt
val blockHash =
DoubleSha256DigestBE.fromHex(obj(PicklerKeys.blockHashKey).str)
CompactFilterDb(
hashBE = hash,
filterType = filterType,
bytes = bytes,
height = height,
blockHashBE = blockHash
)
}
private def writeCompactFilterHeaderDb(
filterHeaderDb: CompactFilterHeaderDb): ujson.Obj = {
ujson.Obj(
PicklerKeys.hashKey -> ujson.Str(filterHeaderDb.hashBE.hex),
PicklerKeys.filterHashKey -> ujson.Str(filterHeaderDb.filterHashBE.hex),
PicklerKeys.previousFilterHeaderKey ->
ujson.Str(filterHeaderDb.previousFilterHeaderBE.hex),
PicklerKeys.blockHashKey -> ujson.Str(filterHeaderDb.blockHashBE.hex),
PicklerKeys.heightKey -> ujson.Num(filterHeaderDb.height)
)
}
private def readCompactFilterHeaderDb(
obj: ujson.Obj): CompactFilterHeaderDb = {
val hash = DoubleSha256DigestBE.fromHex(obj(PicklerKeys.hashKey).str)
val filterHash =
DoubleSha256DigestBE.fromHex(obj(PicklerKeys.filterHashKey).str)
val previousFilterHeader =
DoubleSha256DigestBE.fromHex(obj(PicklerKeys.previousFilterHeaderKey).str)
val blockHash =
DoubleSha256DigestBE.fromHex(obj(PicklerKeys.blockHashKey).str)
val height = obj(PicklerKeys.heightKey).num
CompactFilterHeaderDb(hashBE = hash,
filterHashBE = filterHash,
previousFilterHeaderBE = previousFilterHeader,
blockHashBE = blockHash,
height = height.toInt)
}
private def writeContactDb(contact: DLCContactDb): ujson.Obj = {
Obj(
PicklerKeys.aliasKey -> contact.alias,

View File

@ -2,6 +2,8 @@ package org.bitcoins.commons.serializers
import org.bitcoins.commons.jsonmodels.ws.ChainNotification.{
BlockProcessedNotification,
CompactFilterHeaderProcessedNotification,
CompactFilterProcessedNotification,
SyncFlagChangedNotification
}
import org.bitcoins.commons.jsonmodels.ws.DLCNodeNotification.{
@ -64,11 +66,17 @@ object WsPicklers {
val payloadJson: ujson.Value = notification match {
case BlockProcessedNotification(block) =>
upickle.default.writeJs(block)(Picklers.getBlockHeaderResultPickler)
case CompactFilterHeaderProcessedNotification(filterHeader) =>
upickle.default.writeJs(filterHeader)(
Picklers.compactFilterHeaderPickler)
case CompactFilterProcessedNotification(filter) =>
upickle.default.writeJs(filter)(Picklers.compactFilterDbPickler)
case SyncFlagChangedNotification(syncing) =>
upickle.default.writeJs(syncing)
}
val typeJson = upickle.default.writeJs(notification.`type`)
val notificationObj = ujson.Obj(
PicklerKeys.typeKey -> writeJs(notification.`type`),
PicklerKeys.typeKey -> typeJson,
PicklerKeys.payloadKey -> payloadJson
)
notificationObj
@ -83,6 +91,16 @@ object WsPicklers {
val block =
upickle.default.read(payloadObj)(Picklers.getBlockHeaderResultPickler)
BlockProcessedNotification(block)
case ChainWsType.CompactFilterHeaderProcessed =>
val filterheader =
upickle.default.read(payloadObj)(Picklers.compactFilterHeaderPickler)
CompactFilterHeaderProcessedNotification(filterheader)
case ChainWsType.CompactFilterProcessed =>
val filter =
upickle.default.read(payloadObj)(Picklers.compactFilterDbPickler)
CompactFilterProcessedNotification(filter)
case ChainWsType.SyncFlagChanged =>
val syncing = payloadObj.bool
SyncFlagChangedNotification(syncing)
@ -273,6 +291,23 @@ object WsPicklers {
)
}
implicit val compactFilterHeaderProcessedPickler: ReadWriter[
CompactFilterHeaderProcessedNotification] = {
readwriter[ujson.Obj].bimap(
writeChainNotification(_),
readChainNotification(_)
.asInstanceOf[CompactFilterHeaderProcessedNotification]
)
}
implicit val compactFilterProcessedPickler: ReadWriter[
CompactFilterProcessedNotification] = {
readwriter[ujson.Obj].bimap(
writeChainNotification(_),
readChainNotification(_).asInstanceOf[CompactFilterProcessedNotification]
)
}
implicit val syncFlagChangedPickler: ReadWriter[
SyncFlagChangedNotification] = {
readwriter[ujson.Obj].bimap(

View File

@ -23,14 +23,7 @@ import org.bitcoins.commons.jsonmodels.ws.{
WalletWsType,
WsNotification
}
import org.bitcoins.commons.rpc.{
GetBlockHeader,
GetNewAddress,
GetTransaction,
LockUnspent,
Rescan,
SendToAddress
}
import org.bitcoins.commons.rpc._
import org.bitcoins.commons.serializers.{Picklers, WsPicklers}
import org.bitcoins.core.currency.Bitcoins
import org.bitcoins.core.protocol.BitcoinAddress

View File

@ -6,9 +6,15 @@ import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.chain.{
ChainCallbacks,
OnBlockHeaderConnected,
OnCompactFilterConnected,
OnCompactFilterHeaderConnected,
OnSyncFlagChanged
}
import org.bitcoins.commons.jsonmodels.bitcoind.GetBlockHeaderResult
import org.bitcoins.commons.jsonmodels.ws.ChainNotification.{
BlockProcessedNotification,
CompactFilterHeaderProcessedNotification,
CompactFilterProcessedNotification
}
import org.bitcoins.commons.jsonmodels.ws.TorNotification.TorStartedNotification
import org.bitcoins.commons.jsonmodels.ws.{
ChainNotification,
@ -18,6 +24,7 @@ import org.bitcoins.commons.jsonmodels.ws.{
WsNotification
}
import org.bitcoins.core.api.chain.ChainApi
import org.bitcoins.core.api.chain.db.{CompactFilterDb, CompactFilterHeaderDb}
import org.bitcoins.core.api.dlc.wallet.db.IncomingDLCOfferDb
import org.bitcoins.core.protocol.blockchain.BlockHeader
import org.bitcoins.core.protocol.dlc.models.DLCStatus
@ -44,12 +51,9 @@ import scala.concurrent.{ExecutionContext, Future}
object WebsocketUtil extends Logging {
private def sendHeadersToWs(
results: Vector[GetBlockHeaderResult],
notifications: Vector[ChainNotification[_]],
queue: SourceQueueWithComplete[WsNotification[_]])(implicit
ec: ExecutionContext): Future[Unit] = {
val notifications =
results.map(result =>
ChainNotification.BlockProcessedNotification(result))
for {
_ <- FutureUtil.sequentially(notifications) { case msg =>
val x: Future[Unit] = queue
@ -80,12 +84,14 @@ object WebsocketUtil extends Logging {
//only emit the last header so that we don't overwhelm the UI
for {
results <- resultsF
_ <- sendHeadersToWs(Vector(results.last), queue)
notification = BlockProcessedNotification(results.last)
_ <- sendHeadersToWs(Vector(notification), queue)
} yield ()
} else {
val f = for {
results <- resultsF
_ <- sendHeadersToWs(results, queue)
notifications = results.map(BlockProcessedNotification(_))
_ <- sendHeadersToWs(notifications, queue)
} yield {
()
}
@ -94,6 +100,41 @@ object WebsocketUtil extends Logging {
}
}
val onCompactFilterHeaderProcessed: OnCompactFilterHeaderConnected = {
case filterHeaders: Vector[CompactFilterHeaderDb] =>
val isIBDF = chainApi.isIBD()
val emitBlockProccessedWhileIBDOnGoing =
chainAppConfig.ibdBlockProcessedEvents
isIBDF.flatMap { isIBD =>
if (isIBD && !emitBlockProccessedWhileIBDOnGoing) {
val notifications =
CompactFilterHeaderProcessedNotification(filterHeaders.last)
sendHeadersToWs(Vector(notifications), queue)
} else {
val notifications =
filterHeaders.map(CompactFilterHeaderProcessedNotification(_))
sendHeadersToWs(notifications, queue)
}
}
}
val onCompactFilterProcessed: OnCompactFilterConnected = {
case filters: Vector[CompactFilterDb] =>
val isIBDF = chainApi.isIBD()
val emitBlockProccessedWhileIBDOnGoing =
chainAppConfig.ibdBlockProcessedEvents
isIBDF.flatMap { isIBD =>
if (isIBD && !emitBlockProccessedWhileIBDOnGoing) {
val notifications = CompactFilterProcessedNotification(filters.last)
sendHeadersToWs(Vector(notifications), queue)
} else {
val notifications =
filters.map(CompactFilterProcessedNotification(_))
sendHeadersToWs(notifications, queue)
}
}
}
val onSyncFlagChanged: OnSyncFlagChanged = { syncing =>
val notification = ChainNotification.SyncFlagChangedNotification(syncing)
for {
@ -102,7 +143,10 @@ object WebsocketUtil extends Logging {
}
ChainCallbacks.onBlockHeaderConnected(onBlockProcessed) +
ChainCallbacks.onOnSyncFlagChanged(onSyncFlagChanged)
ChainCallbacks.onOnSyncFlagChanged(onSyncFlagChanged) +
ChainCallbacks.onCompactFilterHeaderConnected(
onCompactFilterHeaderProcessed) +
ChainCallbacks.onCompactFilterConnected(onCompactFilterProcessed)
}
/** Builds websocket callbacks for the wallet */

View File

@ -0,0 +1,139 @@
package org.bitcoins.chain
import org.bitcoins.chain.blockchain.ChainHandler
import org.bitcoins.core.api.chain.db.{CompactFilterDb, CompactFilterHeaderDb}
import org.bitcoins.core.gcs.FilterType
import org.bitcoins.core.p2p.CompactFilterMessage
import org.bitcoins.core.protocol.blockchain.BlockHeader
import org.bitcoins.crypto.{CryptoUtil, DoubleSha256DigestBE}
import org.bitcoins.testkit.chain.{
BlockHeaderHelper,
ChainDbUnitTest,
ChainUnitTest
}
import org.scalatest.FutureOutcome
import scodec.bits.ByteVector
import scala.concurrent.{Future, Promise}
class ChainCallbacksTest extends ChainDbUnitTest {
override type FixtureParam = ChainHandler
override def withFixture(test: OneArgAsyncTest): FutureOutcome =
withChainHandlerGenesisFilter(test)
it must "process a new valid block header with a callback" in {
chainHandler: ChainHandler =>
val resultP: Promise[Boolean] = Promise()
val callback: OnBlockHeaderConnected = {
case _: Vector[(Int, BlockHeader)] => {
Future {
resultP.success(true)
()
}
}
}
val callbacks = ChainCallbacks.onBlockHeaderConnected(callback)
chainHandler.chainConfig.addCallbacks(callbacks)
val newValidHeader =
BlockHeaderHelper.buildNextHeader(ChainUnitTest.genesisHeaderDb)
for {
_ <- chainHandler.processHeader(newValidHeader.blockHeader)
result <- resultP.future
} yield assert(result)
}
it must "process a new valid compact filter header with a callback" in {
chainHandler: ChainHandler =>
val resultP: Promise[Boolean] = Promise()
val callback: OnCompactFilterHeaderConnected = {
case _: Vector[CompactFilterHeaderDb] => {
Future {
resultP.success(true)
()
}
}
}
val callbacks = ChainCallbacks.onCompactFilterHeaderConnected(callback)
chainHandler.chainConfig.addCallbacks(callbacks)
val newValidHeader =
BlockHeaderHelper.buildNextHeader(ChainUnitTest.genesisHeaderDb)
val nextCompactFilterHeaderDb =
CompactFilterHeaderDb(
hashBE = DoubleSha256DigestBE.fromHex(
"000102030405060708090a0b0c0d0e0f000102030405060708090a0b0c0d0e0f"),
previousFilterHeaderBE = ChainUnitTest.genesisFilterHeaderDb.hashBE,
height = 1,
filterHashBE = DoubleSha256DigestBE.fromHex(
"555152535455565758595a5b5c5d5e5f555152535455565758595a5b5c5d5e5f"),
blockHashBE = newValidHeader.hashBE
)
for {
_ <- chainHandler.processHeader(newValidHeader.blockHeader)
_ <- chainHandler.processFilterHeader(
nextCompactFilterHeaderDb.filterHeader,
nextCompactFilterHeaderDb.blockHashBE)
result <- resultP.future
} yield assert(result)
}
it must "process a new valid compact filter with a callback" in {
chainHandler: ChainHandler =>
val resultP: Promise[Boolean] = Promise()
val callback: OnCompactFilterConnected = {
case _: Vector[CompactFilterDb] => {
Future {
resultP.success(true)
()
}
}
}
val callbacks = ChainCallbacks.onCompactFilterConnected(callback)
chainHandler.chainConfig.addCallbacks(callbacks)
val newValidHeader =
BlockHeaderHelper.buildNextHeader(ChainUnitTest.genesisHeaderDb)
val bytes = ByteVector(scala.util.Random.nextBytes(32))
val hashBE = CryptoUtil.doubleSHA256(bytes)
val nextCompactFilterHeaderDb =
CompactFilterHeaderDb(
hashBE = DoubleSha256DigestBE.fromHex(
"000102030405060708090a0b0c0d0e0f000102030405060708090a0b0c0d0e0f"),
previousFilterHeaderBE = ChainUnitTest.genesisFilterHeaderDb.hashBE,
height = 1,
filterHashBE = hashBE.flip,
blockHashBE = newValidHeader.hashBE
)
val nextCompactFilter = CompactFilterDb(
hashBE = hashBE.flip,
filterType = FilterType.Basic,
bytes = bytes,
height = 1,
blockHashBE = nextCompactFilterHeaderDb.blockHashBE
)
val filterMessage =
CompactFilterMessage(nextCompactFilter.blockHashBE.flip,
nextCompactFilter.golombFilter)
for {
_ <- chainHandler.processHeader(newValidHeader.blockHeader)
_ <- chainHandler.processFilterHeader(
nextCompactFilterHeaderDb.filterHeader,
nextCompactFilterHeaderDb.blockHashBE)
_ <- chainHandler.processFilter(filterMessage)
result <- resultP.future
} yield assert(result)
}
}

View File

@ -2,11 +2,7 @@ package org.bitcoins.chain.blockchain
import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.chain.pow.Pow
import org.bitcoins.chain.{
ChainCallbacks,
OnBlockHeaderConnected,
OnSyncFlagChanged
}
import org.bitcoins.chain.{ChainCallbacks, OnSyncFlagChanged}
import org.bitcoins.core.api.chain.ChainApi
import org.bitcoins.core.api.chain.ChainQueryApi.FilterResponse
import org.bitcoins.core.api.chain.db.{
@ -36,7 +32,7 @@ import org.bitcoins.testkit.util.FileUtil
import org.scalatest.{Assertion, FutureOutcome}
import play.api.libs.json.Json
import scala.concurrent.{Future, Promise}
import scala.concurrent.Future
class ChainHandlerTest extends ChainDbUnitTest {
@ -673,31 +669,6 @@ class ChainHandlerTest extends ChainDbUnitTest {
} yield assert(isMissingLast100)
}
it must "process a new valid block header with a callback" in {
chainHandler: ChainHandler =>
val resultP: Promise[Boolean] = Promise()
val callback: OnBlockHeaderConnected = {
case _: Vector[(Int, BlockHeader)] => {
Future {
resultP.success(true)
()
}
}
}
val callbacks = ChainCallbacks.onBlockHeaderConnected(callback)
chainHandler.chainConfig.addCallbacks(callbacks)
val newValidHeader =
BlockHeaderHelper.buildNextHeader(ChainUnitTest.genesisHeaderDb)
for {
_ <- chainHandler.processHeader(newValidHeader.blockHeader)
result <- resultP.future
} yield assert(result)
}
it must "get best filter" in { chainHandler: ChainHandler =>
val bestFilterOptF = chainHandler.getBestFilter()
val bestFilterHeaderOptF = chainHandler.getBestFilterHeader()

View File

@ -1,6 +1,7 @@
package org.bitcoins.chain
import org.bitcoins.core.api.callback.{CallbackFactory, ModuleCallbacks}
import org.bitcoins.core.api.chain.db.{CompactFilterDb, CompactFilterHeaderDb}
import org.bitcoins.core.api.{Callback, CallbackHandler}
import org.bitcoins.core.protocol.blockchain.BlockHeader
import slick.util.Logging
@ -13,6 +14,14 @@ trait ChainCallbacks extends ModuleCallbacks[ChainCallbacks] with Logging {
Vector[(Int, BlockHeader)],
OnBlockHeaderConnected]
def onCompactFilterHeaderConnected: CallbackHandler[
Vector[CompactFilterHeaderDb],
OnCompactFilterHeaderConnected]
def onCompactFilterConnected: CallbackHandler[
Vector[CompactFilterDb],
OnCompactFilterConnected]
def onSyncFlagChanged: CallbackHandler[Boolean, OnSyncFlagChanged]
override def +(other: ChainCallbacks): ChainCallbacks
@ -29,6 +38,28 @@ trait ChainCallbacks extends ModuleCallbacks[ChainCallbacks] with Logging {
err))
}
def executeOnCompactFilterHeaderConnectedCallbacks(
filterHeaders: Vector[CompactFilterHeaderDb])(implicit
ec: ExecutionContext): Future[Unit] = {
onCompactFilterHeaderConnected.execute(
filterHeaders,
(err: Throwable) =>
logger.error(
s"${onCompactFilterHeaderConnected.name} Callback failed with err",
err))
}
def executeOnCompactFilterConnectedCallbacks(
filters: Vector[CompactFilterDb])(implicit
ec: ExecutionContext): Future[Unit] = {
onCompactFilterConnected.execute(
filters,
(err: Throwable) =>
logger.error(
s"${onCompactFilterConnected.name} Callback failed with err",
err))
}
def executeOnSyncFlagChanged(syncing: Boolean)(implicit
ec: ExecutionContext): Future[Unit] = {
onSyncFlagChanged.execute(
@ -43,6 +74,11 @@ trait ChainCallbacks extends ModuleCallbacks[ChainCallbacks] with Logging {
/** Callback for handling a received block header */
trait OnBlockHeaderConnected extends Callback[Vector[(Int, BlockHeader)]]
trait OnCompactFilterHeaderConnected
extends Callback[Vector[CompactFilterHeaderDb]]
trait OnCompactFilterConnected extends Callback[Vector[CompactFilterDb]]
trait OnSyncFlagChanged extends Callback[Boolean]
object ChainCallbacks extends CallbackFactory[ChainCallbacks] {
@ -51,19 +87,40 @@ object ChainCallbacks extends CallbackFactory[ChainCallbacks] {
onBlockHeaderConnected: CallbackHandler[
Vector[(Int, BlockHeader)],
OnBlockHeaderConnected],
onCompactFilterHeaderConnected: CallbackHandler[
Vector[CompactFilterHeaderDb],
OnCompactFilterHeaderConnected],
onCompactFilterConnected: CallbackHandler[
Vector[CompactFilterDb],
OnCompactFilterConnected],
onSyncFlagChanged: CallbackHandler[Boolean, OnSyncFlagChanged])
extends ChainCallbacks {
override def +(other: ChainCallbacks): ChainCallbacks =
copy(onBlockHeaderConnected =
onBlockHeaderConnected ++ other.onBlockHeaderConnected,
onSyncFlagChanged = onSyncFlagChanged ++ other.onSyncFlagChanged)
copy(
onBlockHeaderConnected =
onBlockHeaderConnected ++ other.onBlockHeaderConnected,
onCompactFilterHeaderConnected =
onCompactFilterHeaderConnected ++ other.onCompactFilterHeaderConnected,
onCompactFilterConnected =
onCompactFilterConnected ++ other.onCompactFilterConnected,
onSyncFlagChanged = onSyncFlagChanged ++ other.onSyncFlagChanged
)
}
/** Constructs a set of callbacks that only acts on block headers connected */
def onBlockHeaderConnected(f: OnBlockHeaderConnected): ChainCallbacks =
ChainCallbacks(onBlockHeaderConnected = Vector(f))
def onCompactFilterHeaderConnected(
f: OnCompactFilterHeaderConnected): ChainCallbacks = {
ChainCallbacks(onCompactFilterHeaderConnected = Vector(f))
}
def onCompactFilterConnected(f: OnCompactFilterConnected): ChainCallbacks = {
ChainCallbacks(onCompactFilterConnected = Vector(f))
}
def onOnSyncFlagChanged(f: OnSyncFlagChanged): ChainCallbacks =
ChainCallbacks(onSyncFlagChanged = Vector(f))
@ -72,6 +129,9 @@ object ChainCallbacks extends CallbackFactory[ChainCallbacks] {
def apply(
onBlockHeaderConnected: Vector[OnBlockHeaderConnected] = Vector.empty,
onCompactFilterHeaderConnected: Vector[OnCompactFilterHeaderConnected] =
Vector.empty,
onCompactFilterConnected: Vector[OnCompactFilterConnected] = Vector.empty,
onSyncFlagChanged: Vector[OnSyncFlagChanged] =
Vector.empty): ChainCallbacks =
ChainCallbacksImpl(
@ -79,6 +139,15 @@ object ChainCallbacks extends CallbackFactory[ChainCallbacks] {
CallbackHandler[Vector[(Int, BlockHeader)], OnBlockHeaderConnected](
"onBlockHeaderConnected",
onBlockHeaderConnected),
onCompactFilterHeaderConnected =
CallbackHandler[Vector[CompactFilterHeaderDb],
OnCompactFilterHeaderConnected](
"onCompactFilterHeaderConnected",
onCompactFilterHeaderConnected),
onCompactFilterConnected =
CallbackHandler[Vector[CompactFilterDb], OnCompactFilterConnected](
"onCompactFilterConnected",
onCompactFilterConnected),
onSyncFlagChanged =
CallbackHandler[Boolean, OnSyncFlagChanged]("onSyncFlagChanged",
onSyncFlagChanged)

View File

@ -352,6 +352,8 @@ class ChainHandler(
filterHeadersToCreate <- filterHeadersToCreateF
_ <- verifyFilterHeaders(filterHeadersToCreate)
_ <- filterHeaderDAO.createAll(filterHeadersToCreate)
_ <- chainConfig.callBacks.executeOnCompactFilterHeaderConnectedCallbacks(
filterHeadersToCreate)
} yield {
val minHeightOpt = filterHeadersToCreate.minByOption(_.height)
val maxHeightOpt = filterHeadersToCreate.maxByOption(_.height)
@ -398,6 +400,8 @@ class ChainHandler(
}
}
_ <- filterDAO.createAll(compactFilterDbs)
_ <- chainConfig.callBacks.executeOnCompactFilterConnectedCallbacks(
compactFilterDbs)
} yield {
val minHeightOpt = compactFilterDbs.minByOption(_.height)
val maxHeightOpt = compactFilterDbs.maxByOption(_.height)

View File

@ -1,7 +1,7 @@
package org.bitcoins.core.gcs
import org.bitcoins.core.number.{UInt64, UInt8}
import org.bitcoins.crypto.{Factory, NetworkElement}
import org.bitcoins.crypto.{Factory, NetworkElement, StringFactory}
import scodec.bits._
/** Filter types for BIP158 block content filters
@ -13,7 +13,7 @@ sealed abstract class FilterType extends NetworkElement {
val P: UInt8
}
object FilterType extends Factory[FilterType] {
object FilterType extends Factory[FilterType] with StringFactory[FilterType] {
val knownFilterTypes: Map[FilterType, Short] = Map(Basic -> 0.toShort)
@ -50,4 +50,18 @@ object FilterType extends Factory[FilterType] {
throw new IllegalArgumentException(s"Unknown filter type code: ${code}")
}
override def fromString(string: String): FilterType = {
fromStringOpt(string) match {
case Some(filterType) => filterType
case None =>
sys.error(s"Could not parse string=$string to known filter type")
}
}
override def fromStringOpt(string: String): Option[FilterType] = {
knownFilterTypes
.map(_._1)
.find(_.toString.toLowerCase == string.toLowerCase)
}
}

View File

@ -41,6 +41,10 @@ object PicklerKeys {
final val theirPayout: String = "theirPayout"
final val pnl: String = "pnl"
final val rateOfReturn: String = "rateOfReturn"
final val previousFilterHeaderKey: String = "previousFilterHeader"
final val filterHashKey: String = "filterHash"
final val filterTypeKey: String = "filterType"
final val compactFilterBytesKey: String = "compactFilterBytes"
final val networkKey: String = "network"