mirror of
https://github.com/bitcoin-s/bitcoin-s.git
synced 2025-02-22 22:36:34 +01:00
Callback rework (#1542)
This commit is contained in:
parent
027bd39474
commit
1731bd3163
16 changed files with 278 additions and 168 deletions
|
@ -22,8 +22,7 @@ import org.bitcoins.keymanager.KeyManagerInitializeError
|
|||
import org.bitcoins.keymanager.bip39.{BIP39KeyManager, BIP39LockedKeyManager}
|
||||
import org.bitcoins.node.config.NodeAppConfig
|
||||
import org.bitcoins.node.models.Peer
|
||||
import org.bitcoins.node.networking.peer.DataMessageHandler
|
||||
import org.bitcoins.node.{NeutrinoNode, Node, NodeCallbacks, SpvNode}
|
||||
import org.bitcoins.node._
|
||||
import org.bitcoins.wallet.Wallet
|
||||
import org.bitcoins.wallet.api._
|
||||
import org.bitcoins.wallet.config.WalletAppConfig
|
||||
|
@ -237,7 +236,6 @@ object Main extends App with BitcoinSLogger {
|
|||
private def createCallbacks(wallet: WalletApi)(
|
||||
implicit nodeConf: NodeAppConfig,
|
||||
ec: ExecutionContext): Future[NodeCallbacks] = {
|
||||
import DataMessageHandler._
|
||||
lazy val onTx: OnTxReceived = { tx =>
|
||||
wallet.processTransaction(tx, blockHash = None).map(_ => ())
|
||||
}
|
||||
|
@ -258,13 +256,13 @@ object Main extends App with BitcoinSLogger {
|
|||
}
|
||||
if (nodeConf.isSPVEnabled) {
|
||||
Future.successful(
|
||||
NodeCallbacks(onTxReceived = Seq(onTx),
|
||||
onBlockHeadersReceived = Seq(onHeaders)))
|
||||
NodeCallbacks(onTxReceived = Vector(onTx),
|
||||
onBlockHeadersReceived = Vector(onHeaders)))
|
||||
} else if (nodeConf.isNeutrinoEnabled) {
|
||||
Future.successful(
|
||||
NodeCallbacks(onBlockReceived = Seq(onBlock),
|
||||
onCompactFiltersReceived = Seq(onCompactFilters),
|
||||
onBlockHeadersReceived = Seq(onHeaders)))
|
||||
NodeCallbacks(onBlockReceived = Vector(onBlock),
|
||||
onCompactFiltersReceived = Vector(onCompactFilters),
|
||||
onBlockHeadersReceived = Vector(onHeaders)))
|
||||
} else {
|
||||
Future.failed(new RuntimeException("Unexpected node type"))
|
||||
}
|
||||
|
|
|
@ -0,0 +1,36 @@
|
|||
package org.bitcoins.core.api
|
||||
|
||||
import org.bitcoins.core.util.FutureUtil
|
||||
import org.bitcoins.testkit.util.BitcoinSAsyncTest
|
||||
import org.scalatest.Assertion
|
||||
|
||||
import scala.concurrent.Promise
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.Success
|
||||
|
||||
class CallbackTest extends BitcoinSAsyncTest {
|
||||
|
||||
val testTimeout: FiniteDuration = 10.seconds
|
||||
|
||||
it must "show callbacks being blocked" in {
|
||||
val promise = Promise[Assertion]()
|
||||
|
||||
val f1: Callback[Unit] = _ => {
|
||||
Thread.sleep(testTimeout.toMillis)
|
||||
promise.complete(fail("2nd callback did not start before timeout"))
|
||||
FutureUtil.unit
|
||||
}
|
||||
val f2: Callback[Unit] = _ => {
|
||||
promise.complete(Success(succeed))
|
||||
FutureUtil.unit
|
||||
}
|
||||
val handler =
|
||||
CallbackHandler[Unit, Callback[Unit]](name = "name", Vector(f1, f2))
|
||||
|
||||
// Start execution of callbacks
|
||||
handler.execute(())
|
||||
|
||||
// Return result of the callbacks, f2 should complete first
|
||||
promise.future
|
||||
}
|
||||
}
|
65
core/src/main/scala/org/bitcoins/core/api/Callback.scala
Normal file
65
core/src/main/scala/org/bitcoins/core/api/Callback.scala
Normal file
|
@ -0,0 +1,65 @@
|
|||
package org.bitcoins.core.api
|
||||
|
||||
import org.bitcoins.core.util.SeqWrapper
|
||||
import org.slf4j.Logger
|
||||
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
/** A function to be called in response to an event */
|
||||
trait Callback[T] {
|
||||
def apply(param: T): Future[Unit]
|
||||
}
|
||||
|
||||
/** A function with two parameters to be called in response to an event */
|
||||
trait Callback2[T1, T2] extends Callback[(T1, T2)] {
|
||||
def apply(param1: T1, param2: T2): Future[Unit]
|
||||
|
||||
override def apply(param: (T1, T2)): Future[Unit] = apply(param._1, param._2)
|
||||
}
|
||||
|
||||
/** A function with three parameters to be called in response to an event */
|
||||
trait Callback3[T1, T2, T3] extends Callback[(T1, T2, T3)] {
|
||||
def apply(param1: T1, param2: T2, param3: T3): Future[Unit]
|
||||
|
||||
override def apply(param: (T1, T2, T3)): Future[Unit] =
|
||||
apply(param._1, param._2, param._3)
|
||||
}
|
||||
|
||||
/** Manages a set of callbacks, should be used to manage execution and logging if needed */
|
||||
case class CallbackHandler[C, T <: Callback[C]](
|
||||
name: String,
|
||||
override val wrapped: IndexedSeq[T])
|
||||
extends SeqWrapper[T] {
|
||||
|
||||
def ++(other: CallbackHandler[C, T]): CallbackHandler[C, T] = {
|
||||
require(name == other.name,
|
||||
"Cannot combine callback handlers with different names")
|
||||
CallbackHandler(name, wrapped ++ other.wrapped)
|
||||
}
|
||||
|
||||
/** Executes the callbacks synchronously, if any fail, they are recovered by recoverFunc */
|
||||
def execute(param: C, recoverFunc: Throwable => Unit = _ => ())(
|
||||
implicit ec: ExecutionContext): Future[Unit] = {
|
||||
val executeFs = wrapped.map { callback =>
|
||||
// Need to wrap in another future so they are all started at once
|
||||
// and do not block each other
|
||||
Future {
|
||||
callback(param).recover {
|
||||
case NonFatal(err) =>
|
||||
recoverFunc(err)
|
||||
}
|
||||
}.flatten
|
||||
}
|
||||
|
||||
Future.sequence(executeFs).map(_ => ())
|
||||
}
|
||||
|
||||
/** Executes the callbacks synchronously, Failures are logged */
|
||||
def execute(logger: Logger, param: C)(
|
||||
implicit ec: ExecutionContext): Future[Unit] = {
|
||||
val recoverFunc = (err: Throwable) =>
|
||||
logger.error(s"$name Callback failed with error: ", err)
|
||||
execute(param, recoverFunc)
|
||||
}
|
||||
}
|
|
@ -15,3 +15,29 @@ trait MapWrapper[K, +T] extends Map[K, T] {
|
|||
override def updated[V1 >: T](key: K, value: V1): Map[K, V1] =
|
||||
wrapped.updated(key, value)
|
||||
}
|
||||
|
||||
class Mutable[A](initialValue: A) {
|
||||
private val lock = new java.util.concurrent.locks.ReentrantReadWriteLock()
|
||||
|
||||
private var value: A = initialValue
|
||||
|
||||
def atomicGet: A = {
|
||||
lock.readLock().lock()
|
||||
try value
|
||||
finally lock.readLock().unlock()
|
||||
}
|
||||
|
||||
def atomicSet(f: => A): Unit = {
|
||||
lock.writeLock().lock()
|
||||
try value = f
|
||||
finally lock.writeLock().unlock()
|
||||
}
|
||||
|
||||
def atomicUpdate[B](b: B)(update: (A, B) => A): A = {
|
||||
lock.writeLock().lock()
|
||||
try {
|
||||
value = update(value, b)
|
||||
value
|
||||
} finally lock.writeLock().unlock()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -116,7 +116,7 @@ val startedNodeF = nodeF.flatMap(_.start())
|
|||
|
||||
//let's make a simple callback that print's the
|
||||
//blockhash everytime we receive a block on the network
|
||||
val blockReceivedFunc = { block: Block =>
|
||||
val blockReceivedFunc: OnBlockReceived = { block: Block =>
|
||||
Future.successful(
|
||||
println(s"Received blockhash=${block.blockHeader.hashBE}"))
|
||||
}
|
||||
|
|
|
@ -15,7 +15,7 @@ import org.bitcoins.core.protocol.transaction.Transaction
|
|||
import org.bitcoins.core.wallet.fee._
|
||||
import org.bitcoins.feeprovider._
|
||||
import org.bitcoins.keymanager.bip39.BIP39KeyManager
|
||||
import org.bitcoins.node.NodeCallbacks
|
||||
import org.bitcoins.node._
|
||||
import org.bitcoins.node.networking.peer.DataMessageHandler._
|
||||
import org.bitcoins.rpc.client.v19.BitcoindV19RpcClient
|
||||
import org.bitcoins.rpc.config.BitcoindInstance
|
||||
|
@ -107,9 +107,9 @@ def createCallbacks(
|
|||
lazy val onBlock: OnBlockReceived = { block =>
|
||||
processBlock(block)
|
||||
}
|
||||
NodeCallbacks(onTxReceived = Seq(onTx),
|
||||
onBlockReceived = Seq(onBlock),
|
||||
onCompactFiltersReceived = Seq(onCompactFilters))
|
||||
NodeCallbacks(onTxReceived = Vector(onTx),
|
||||
onBlockReceived = Vector(onBlock),
|
||||
onCompactFiltersReceived = Vector(onCompactFilters))
|
||||
}
|
||||
|
||||
// Here is a super simple example of a callback, this could be replaced with anything, from
|
||||
|
|
|
@ -12,7 +12,7 @@ import org.bitcoins.core.protocol.transaction.Transaction
|
|||
import org.bitcoins.core.wallet.fee._
|
||||
import org.bitcoins.feeprovider._
|
||||
import org.bitcoins.keymanager.bip39.BIP39KeyManager
|
||||
import org.bitcoins.node.NodeCallbacks
|
||||
import org.bitcoins.node._
|
||||
import org.bitcoins.node.networking.peer.DataMessageHandler._
|
||||
import org.bitcoins.rpc.client.v19.BitcoindV19RpcClient
|
||||
import org.bitcoins.rpc.config.BitcoindInstance
|
||||
|
@ -77,7 +77,7 @@ def createCallback(processBlock: Block => Future[Unit]): NodeCallbacks = {
|
|||
lazy val onBlock: OnBlockReceived = { block =>
|
||||
processBlock(block)
|
||||
}
|
||||
NodeCallbacks(onBlockReceived = Seq(onBlock))
|
||||
NodeCallbacks(onBlockReceived = Vector(onBlock))
|
||||
}
|
||||
|
||||
// Here is a super simple example of a callback, this could be replaced with anything, from
|
||||
|
|
|
@ -3,8 +3,6 @@ package org.bitcoins.node
|
|||
import org.bitcoins.core.currency._
|
||||
import org.bitcoins.core.util.EnvUtil
|
||||
import org.bitcoins.core.wallet.fee.SatoshisPerByte
|
||||
import org.bitcoins.node.networking.peer.DataMessageHandler
|
||||
import org.bitcoins.node.networking.peer.DataMessageHandler.OnCompactFiltersReceived
|
||||
import org.bitcoins.rpc.client.common.BitcoindVersion
|
||||
import org.bitcoins.rpc.util.AsyncUtil
|
||||
import org.bitcoins.server.BitcoinSAppConfig
|
||||
|
@ -55,7 +53,7 @@ class NeutrinoNodeWithWalletTest extends NodeUnitTest {
|
|||
val TestFees = 2240.sats
|
||||
|
||||
def callbacks: NodeCallbacks = {
|
||||
val onBlock: DataMessageHandler.OnBlockReceived = { block =>
|
||||
val onBlock: OnBlockReceived = { block =>
|
||||
for {
|
||||
wallet <- walletF
|
||||
_ <- wallet.processBlock(block)
|
||||
|
@ -69,8 +67,8 @@ class NeutrinoNodeWithWalletTest extends NodeUnitTest {
|
|||
}
|
||||
|
||||
NodeCallbacks(
|
||||
onBlockReceived = Seq(onBlock),
|
||||
onCompactFiltersReceived = Seq(onCompactFilters)
|
||||
onBlockReceived = Vector(onBlock),
|
||||
onCompactFiltersReceived = Vector(onCompactFilters)
|
||||
)
|
||||
}
|
||||
|
||||
|
|
|
@ -3,7 +3,6 @@ package org.bitcoins.node
|
|||
import akka.actor.Cancellable
|
||||
import org.bitcoins.core.currency._
|
||||
import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE}
|
||||
import org.bitcoins.node.networking.peer.DataMessageHandler
|
||||
import org.bitcoins.server.BitcoinSAppConfig
|
||||
import org.bitcoins.testkit.BitcoinSTestAppConfig
|
||||
import org.bitcoins.testkit.node.NodeUnitTest.SpvNodeFundedWalletBitcoind
|
||||
|
@ -38,7 +37,7 @@ class SpvNodeWithWalletTest extends NodeUnitTest {
|
|||
val amountFromBitcoind = 1.bitcoin
|
||||
|
||||
def callbacks: NodeCallbacks = {
|
||||
val onTx: DataMessageHandler.OnTxReceived = { tx =>
|
||||
val onTx: OnTxReceived = { tx =>
|
||||
for {
|
||||
expectedTxId <- expectedTxIdF
|
||||
wallet <- walletF
|
||||
|
@ -56,7 +55,7 @@ class SpvNodeWithWalletTest extends NodeUnitTest {
|
|||
}
|
||||
}
|
||||
NodeCallbacks(
|
||||
onTxReceived = Seq(onTx)
|
||||
onTxReceived = Vector(onTx)
|
||||
)
|
||||
}
|
||||
|
||||
|
|
|
@ -48,7 +48,7 @@ class UpdateBloomFilterTest extends NodeUnitTest with BeforeAndAfter {
|
|||
// confirmed
|
||||
private val txFromWalletP: Promise[Transaction] = Promise()
|
||||
|
||||
def addressCallback: DataMessageHandler.OnTxReceived = { tx: Transaction =>
|
||||
def addressCallback: OnTxReceived = { tx: Transaction =>
|
||||
// we check if any of the addresses in the TX
|
||||
// pays to our wallet address
|
||||
for {
|
||||
|
@ -59,10 +59,11 @@ class UpdateBloomFilterTest extends NodeUnitTest with BeforeAndAfter {
|
|||
if (result) {
|
||||
assertionP.success(true)
|
||||
}
|
||||
()
|
||||
}
|
||||
}
|
||||
|
||||
def txCallback: DataMessageHandler.OnMerkleBlockReceived = {
|
||||
def txCallback: OnMerkleBlockReceived = {
|
||||
(_: MerkleBlock, txs: Vector[Transaction]) =>
|
||||
{
|
||||
txFromWalletP.future
|
||||
|
@ -70,6 +71,7 @@ class UpdateBloomFilterTest extends NodeUnitTest with BeforeAndAfter {
|
|||
if (txs.contains(tx)) {
|
||||
assertionP.success(true)
|
||||
}
|
||||
()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,7 +4,7 @@ import _root_.org.scalatest.compatible.Assertion
|
|||
import org.bitcoins.core.protocol.blockchain.{Block, MerkleBlock}
|
||||
import org.bitcoins.core.protocol.transaction.Transaction
|
||||
import org.bitcoins.core.util.FutureUtil
|
||||
import org.bitcoins.node.NodeCallbacks
|
||||
import org.bitcoins.node.{NodeCallbacks, OnMerkleBlockReceived}
|
||||
import org.bitcoins.node.config.NodeAppConfig
|
||||
import org.bitcoins.testkit.BitcoinSTestAppConfig
|
||||
import org.bitcoins.testkit.core.gen.{
|
||||
|
@ -37,16 +37,14 @@ class MerkleBuffersTest extends BitcoinSAsyncTest {
|
|||
case (txs, otherTxs, block) =>
|
||||
var receivedExpectedTXs: Option[Try[Assertion]] = None
|
||||
var callbackCount: Int = 0
|
||||
val callback: DataMessageHandler.OnMerkleBlockReceived = {
|
||||
(_, merkleTxs) =>
|
||||
receivedExpectedTXs = Some(
|
||||
Try(
|
||||
assert(txs == merkleTxs,
|
||||
val callback: OnMerkleBlockReceived = { (_, merkleTxs) =>
|
||||
receivedExpectedTXs = Some(
|
||||
Try(assert(txs == merkleTxs,
|
||||
"Received TXs in callback was not the ones we put in")))
|
||||
callbackCount = callbackCount + 1
|
||||
FutureUtil.unit
|
||||
callbackCount = callbackCount + 1
|
||||
FutureUtil.unit
|
||||
}
|
||||
val callbacks = NodeCallbacks(onMerkleBlockReceived = Seq(callback))
|
||||
val callbacks = NodeCallbacks(onMerkleBlockReceived = Vector(callback))
|
||||
|
||||
val merkle = MerkleBlock(block, txs.map(_.txId))
|
||||
val _ = MerkleBuffers.putMerkle(merkle)
|
||||
|
|
|
@ -3,16 +3,27 @@ package org.bitcoins.node
|
|||
import akka.actor.ActorSystem
|
||||
import org.bitcoins.chain.blockchain.ChainHandler
|
||||
import org.bitcoins.chain.config.ChainAppConfig
|
||||
import org.bitcoins.chain.models.{BlockHeaderDAO, CompactFilterDAO, CompactFilterHeaderDAO}
|
||||
import org.bitcoins.chain.models.{
|
||||
BlockHeaderDAO,
|
||||
CompactFilterDAO,
|
||||
CompactFilterHeaderDAO
|
||||
}
|
||||
import org.bitcoins.core.api.{ChainQueryApi, NodeApi}
|
||||
import org.bitcoins.core.p2p.{NetworkPayload, TypeIdentifier}
|
||||
import org.bitcoins.core.protocol.transaction.Transaction
|
||||
import org.bitcoins.core.util.Mutable
|
||||
import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE}
|
||||
import org.bitcoins.node.config.NodeAppConfig
|
||||
import org.bitcoins.node.models.{BroadcastAbleTransaction, BroadcastAbleTransactionDAO, Peer}
|
||||
import org.bitcoins.node.models.{
|
||||
BroadcastAbleTransaction,
|
||||
BroadcastAbleTransactionDAO,
|
||||
Peer
|
||||
}
|
||||
import org.bitcoins.node.networking.P2PClient
|
||||
import org.bitcoins.node.networking.peer.{PeerMessageReceiver, PeerMessageSender}
|
||||
import org.bitcoins.node.util.BitcoinSNodeUtil.Mutable
|
||||
import org.bitcoins.node.networking.peer.{
|
||||
PeerMessageReceiver,
|
||||
PeerMessageSender
|
||||
}
|
||||
import org.bitcoins.rpc.util.AsyncUtil
|
||||
import org.slf4j.Logger
|
||||
|
||||
|
|
|
@ -1,133 +1,166 @@
|
|||
package org.bitcoins.node
|
||||
|
||||
import org.bitcoins.core.api.{Callback, Callback2, CallbackHandler}
|
||||
import org.bitcoins.core.gcs.GolombFilter
|
||||
import org.bitcoins.core.protocol.blockchain.{Block, BlockHeader, MerkleBlock}
|
||||
import org.bitcoins.core.protocol.transaction.Transaction
|
||||
import org.bitcoins.core.util.FutureUtil
|
||||
import org.bitcoins.crypto.DoubleSha256Digest
|
||||
import org.bitcoins.node.networking.peer.DataMessageHandler._
|
||||
import org.slf4j.Logger
|
||||
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
|
||||
/**
|
||||
* Callbacks for responding to events in the SPV node.
|
||||
* The approriate callback is executed whenver the node receives
|
||||
* Callbacks for responding to events in the node.
|
||||
* The appropriate callback is executed whenever the node receives
|
||||
* a `getdata` message matching it.
|
||||
*
|
||||
*/
|
||||
case class NodeCallbacks(
|
||||
onCompactFiltersReceived: Seq[OnCompactFiltersReceived] = Seq.empty,
|
||||
onTxReceived: Seq[OnTxReceived] = Seq.empty,
|
||||
onBlockReceived: Seq[OnBlockReceived] = Seq.empty,
|
||||
onMerkleBlockReceived: Seq[OnMerkleBlockReceived] = Seq.empty,
|
||||
onBlockHeadersReceived: Seq[OnBlockHeadersReceived] = Seq.empty
|
||||
) {
|
||||
trait NodeCallbacks {
|
||||
|
||||
def +(other: NodeCallbacks): NodeCallbacks = copy(
|
||||
onCompactFiltersReceived = onCompactFiltersReceived ++ other.onCompactFiltersReceived,
|
||||
onTxReceived = onTxReceived ++ other.onTxReceived,
|
||||
onBlockReceived = onBlockReceived ++ other.onBlockReceived,
|
||||
onMerkleBlockReceived = onMerkleBlockReceived ++ other.onMerkleBlockReceived,
|
||||
onBlockHeadersReceived = onBlockHeadersReceived ++ other.onBlockHeadersReceived
|
||||
)
|
||||
def onCompactFiltersReceived: CallbackHandler[
|
||||
Vector[(DoubleSha256Digest, GolombFilter)],
|
||||
OnCompactFiltersReceived]
|
||||
|
||||
def onTxReceived: CallbackHandler[Transaction, OnTxReceived]
|
||||
|
||||
def onBlockReceived: CallbackHandler[Block, OnBlockReceived]
|
||||
|
||||
def onMerkleBlockReceived: CallbackHandler[
|
||||
(MerkleBlock, Vector[Transaction]),
|
||||
OnMerkleBlockReceived]
|
||||
|
||||
def onBlockHeadersReceived: CallbackHandler[
|
||||
Vector[BlockHeader],
|
||||
OnBlockHeadersReceived]
|
||||
|
||||
def +(other: NodeCallbacks): NodeCallbacks
|
||||
|
||||
def executeOnTxReceivedCallbacks(logger: Logger, tx: Transaction)(
|
||||
implicit ec: ExecutionContext): Future[Unit] = {
|
||||
onTxReceived
|
||||
.foldLeft(FutureUtil.unit)((acc, callback) =>
|
||||
acc.flatMap(_ =>
|
||||
callback(tx).recover {
|
||||
case err: Throwable =>
|
||||
logger.error("onTxReceived Callback failed with error: ", err)
|
||||
}))
|
||||
onTxReceived.execute(logger, tx)
|
||||
}
|
||||
|
||||
def executeOnBlockReceivedCallbacks(logger: Logger, block: Block)(
|
||||
implicit ec: ExecutionContext): Future[Unit] = {
|
||||
onBlockReceived
|
||||
.foldLeft(FutureUtil.unit)((acc, callback) =>
|
||||
acc.flatMap(_ =>
|
||||
callback(block).recover {
|
||||
case err: Throwable =>
|
||||
logger.error("onBlockReceived Callback failed with error: ", err)
|
||||
}))
|
||||
onBlockReceived.execute(logger, block)
|
||||
}
|
||||
|
||||
def executeOnMerkleBlockReceivedCallbacks(
|
||||
logger: Logger,
|
||||
merkleBlock: MerkleBlock,
|
||||
txs: Vector[Transaction])(implicit ec: ExecutionContext): Future[Unit] = {
|
||||
onMerkleBlockReceived
|
||||
.foldLeft(FutureUtil.unit)((acc, callback) =>
|
||||
acc.flatMap(_ =>
|
||||
callback(merkleBlock, txs).recover {
|
||||
case err: Throwable =>
|
||||
logger.error("OnMerkleBlockReceived Callback failed with error: ",
|
||||
err)
|
||||
}))
|
||||
onMerkleBlockReceived.execute(logger, (merkleBlock, txs))
|
||||
}
|
||||
|
||||
def executeOnCompactFiltersReceivedCallbacks(
|
||||
logger: Logger,
|
||||
blockFilters: Vector[(DoubleSha256Digest, GolombFilter)])(
|
||||
implicit ec: ExecutionContext): Future[Unit] = {
|
||||
onCompactFiltersReceived
|
||||
.foldLeft(FutureUtil.unit)((acc, callback) =>
|
||||
acc.flatMap(_ =>
|
||||
callback(blockFilters).recover {
|
||||
case err: Throwable =>
|
||||
logger.error(
|
||||
"onCompactFiltersReceived Callback failed with error: ",
|
||||
err)
|
||||
}))
|
||||
onCompactFiltersReceived.execute(logger, blockFilters)
|
||||
}
|
||||
|
||||
def executeOnBlockHeadersReceivedCallbacks(
|
||||
logger: Logger,
|
||||
headers: Vector[BlockHeader])(
|
||||
implicit ec: ExecutionContext): Future[Unit] = {
|
||||
onBlockHeadersReceived
|
||||
.foldLeft(FutureUtil.unit)((acc, callback) =>
|
||||
acc.flatMap(_ =>
|
||||
callback(headers).recover {
|
||||
case err: Throwable =>
|
||||
logger.error(
|
||||
"onBlockHeadersReceived Callback failed with error: ",
|
||||
err)
|
||||
}))
|
||||
onBlockHeadersReceived.execute(logger, headers)
|
||||
}
|
||||
}
|
||||
|
||||
/** Callback for handling a received block */
|
||||
trait OnBlockReceived extends Callback[Block]
|
||||
|
||||
/** Callback for handling a received Merkle block with its corresponding TXs */
|
||||
trait OnMerkleBlockReceived extends Callback2[MerkleBlock, Vector[Transaction]]
|
||||
|
||||
/** Callback for handling a received transaction */
|
||||
trait OnTxReceived extends Callback[Transaction]
|
||||
|
||||
/** Callback for handling a received compact block filter */
|
||||
trait OnCompactFiltersReceived
|
||||
extends Callback[Vector[(DoubleSha256Digest, GolombFilter)]]
|
||||
|
||||
/** Callback for handling a received block header */
|
||||
trait OnBlockHeadersReceived extends Callback[Vector[BlockHeader]]
|
||||
|
||||
object NodeCallbacks {
|
||||
|
||||
// Use Impl pattern here to enforce the correct names on the CallbackHandlers
|
||||
private case class NodeCallbacksImpl(
|
||||
onCompactFiltersReceived: CallbackHandler[
|
||||
Vector[(DoubleSha256Digest, GolombFilter)],
|
||||
OnCompactFiltersReceived],
|
||||
onTxReceived: CallbackHandler[Transaction, OnTxReceived],
|
||||
onBlockReceived: CallbackHandler[Block, OnBlockReceived],
|
||||
onMerkleBlockReceived: CallbackHandler[
|
||||
(MerkleBlock, Vector[Transaction]),
|
||||
OnMerkleBlockReceived],
|
||||
onBlockHeadersReceived: CallbackHandler[
|
||||
Vector[BlockHeader],
|
||||
OnBlockHeadersReceived]
|
||||
) extends NodeCallbacks {
|
||||
|
||||
override def +(other: NodeCallbacks): NodeCallbacks =
|
||||
copy(
|
||||
onCompactFiltersReceived = onCompactFiltersReceived ++ other.onCompactFiltersReceived,
|
||||
onTxReceived = onTxReceived ++ other.onTxReceived,
|
||||
onBlockReceived = onBlockReceived ++ other.onBlockReceived,
|
||||
onMerkleBlockReceived = onMerkleBlockReceived ++ other.onMerkleBlockReceived,
|
||||
onBlockHeadersReceived = onBlockHeadersReceived ++ other.onBlockHeadersReceived
|
||||
)
|
||||
}
|
||||
|
||||
/** Constructs a set of callbacks that only acts on TX received */
|
||||
def onTxReceived(f: OnTxReceived): NodeCallbacks =
|
||||
NodeCallbacks(onTxReceived = Seq(f))
|
||||
NodeCallbacks(onTxReceived = Vector(f))
|
||||
|
||||
/** Constructs a set of callbacks that only acts on block received */
|
||||
def onBlockReceived(f: OnBlockReceived): NodeCallbacks =
|
||||
NodeCallbacks(onBlockReceived = Seq(f))
|
||||
NodeCallbacks(onBlockReceived = Vector(f))
|
||||
|
||||
/** Constructs a set of callbacks that only acts on merkle block received */
|
||||
def onMerkleBlockReceived(f: OnMerkleBlockReceived): NodeCallbacks =
|
||||
NodeCallbacks(onMerkleBlockReceived = Seq(f))
|
||||
NodeCallbacks(onMerkleBlockReceived = Vector(f))
|
||||
|
||||
/** Constructs a set of callbacks that only acts on compact filter received */
|
||||
def onCompactFilterReceived(f: OnCompactFiltersReceived): NodeCallbacks =
|
||||
NodeCallbacks(onCompactFiltersReceived = Seq(f))
|
||||
NodeCallbacks(onCompactFiltersReceived = Vector(f))
|
||||
|
||||
/** Constructs a set of callbacks that only acts on block headers received */
|
||||
def onBlockHeadersReceived(f: OnBlockHeadersReceived): NodeCallbacks =
|
||||
NodeCallbacks(onBlockHeadersReceived = Seq(f))
|
||||
NodeCallbacks(onBlockHeadersReceived = Vector(f))
|
||||
|
||||
/** Empty callbacks that does nothing with the received data */
|
||||
val empty: NodeCallbacks =
|
||||
NodeCallbacks(
|
||||
onTxReceived = Seq.empty,
|
||||
onBlockReceived = Seq.empty,
|
||||
onMerkleBlockReceived = Seq.empty,
|
||||
onCompactFiltersReceived = Seq.empty,
|
||||
onBlockHeadersReceived = Seq.empty
|
||||
NodeCallbacks(Vector.empty,
|
||||
Vector.empty,
|
||||
Vector.empty,
|
||||
Vector.empty,
|
||||
Vector.empty)
|
||||
|
||||
def apply(
|
||||
onCompactFiltersReceived: Vector[OnCompactFiltersReceived] = Vector.empty,
|
||||
onTxReceived: Vector[OnTxReceived] = Vector.empty,
|
||||
onBlockReceived: Vector[OnBlockReceived] = Vector.empty,
|
||||
onMerkleBlockReceived: Vector[OnMerkleBlockReceived] = Vector.empty,
|
||||
onBlockHeadersReceived: Vector[OnBlockHeadersReceived] = Vector.empty): NodeCallbacks = {
|
||||
NodeCallbacksImpl(
|
||||
onCompactFiltersReceived =
|
||||
CallbackHandler[Vector[(DoubleSha256Digest, GolombFilter)],
|
||||
OnCompactFiltersReceived]("onCompactFilterReceived",
|
||||
onCompactFiltersReceived),
|
||||
onTxReceived = CallbackHandler[Transaction, OnTxReceived]("onTxReceived",
|
||||
onTxReceived),
|
||||
onBlockReceived =
|
||||
CallbackHandler[Block, OnBlockReceived]("onBlockReceived",
|
||||
onBlockReceived),
|
||||
onMerkleBlockReceived =
|
||||
CallbackHandler[(MerkleBlock, Vector[Transaction]),
|
||||
OnMerkleBlockReceived]("onCompactFilterReceived",
|
||||
onMerkleBlockReceived),
|
||||
onBlockHeadersReceived =
|
||||
CallbackHandler[Vector[BlockHeader], OnBlockHeadersReceived](
|
||||
"onCompactFilterReceived",
|
||||
onBlockHeadersReceived)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,9 +6,9 @@ import org.bitcoins.core.api.ChainQueryApi.FilterResponse
|
|||
import org.bitcoins.core.bloom.BloomFilter
|
||||
import org.bitcoins.core.protocol.transaction.Transaction
|
||||
import org.bitcoins.core.protocol.{BitcoinAddress, BlockStamp}
|
||||
import org.bitcoins.core.util.Mutable
|
||||
import org.bitcoins.node.config.NodeAppConfig
|
||||
import org.bitcoins.node.models.Peer
|
||||
import org.bitcoins.node.util.BitcoinSNodeUtil.Mutable
|
||||
|
||||
import scala.concurrent.Future
|
||||
|
||||
|
|
|
@ -2,12 +2,10 @@ package org.bitcoins.node.networking.peer
|
|||
|
||||
import org.bitcoins.chain.api.ChainApi
|
||||
import org.bitcoins.chain.config.ChainAppConfig
|
||||
import org.bitcoins.core.gcs.{BlockFilter, GolombFilter}
|
||||
import org.bitcoins.core.gcs.BlockFilter
|
||||
import org.bitcoins.core.p2p._
|
||||
import org.bitcoins.core.protocol.blockchain.{Block, BlockHeader, MerkleBlock}
|
||||
import org.bitcoins.core.protocol.transaction.Transaction
|
||||
import org.bitcoins.core.util.FutureUtil
|
||||
import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE}
|
||||
import org.bitcoins.crypto.DoubleSha256DigestBE
|
||||
import org.bitcoins.node.config.NodeAppConfig
|
||||
import org.bitcoins.node.models.BroadcastAbleTransactionDAO
|
||||
import org.bitcoins.node.{NodeCallbacks, P2PLogger}
|
||||
|
@ -320,27 +318,3 @@ case class DataMessageHandler(
|
|||
|
||||
}
|
||||
}
|
||||
|
||||
object DataMessageHandler {
|
||||
|
||||
/** Callback for handling a received block */
|
||||
type OnBlockReceived = Block => Future[Unit]
|
||||
|
||||
/** Callback for handling a received Merkle block with its corresponding TXs */
|
||||
type OnMerkleBlockReceived =
|
||||
(MerkleBlock, Vector[Transaction]) => Future[Unit]
|
||||
|
||||
/** Callback for handling a received transaction */
|
||||
type OnTxReceived = Transaction => Future[Unit]
|
||||
|
||||
/** Callback for handling a received compact block filter */
|
||||
type OnCompactFiltersReceived =
|
||||
(Vector[(DoubleSha256Digest, GolombFilter)]) => Future[Unit]
|
||||
|
||||
/** Callback for handling a received block header */
|
||||
type OnBlockHeadersReceived = Vector[BlockHeader] => Future[Unit]
|
||||
|
||||
/** Does nothing */
|
||||
def noop[T]: T => Future[Unit] = _ => FutureUtil.unit
|
||||
|
||||
}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
package org.bitcoins.node.util
|
||||
|
||||
trait BitcoinSNodeUtil {
|
||||
object BitcoinSNodeUtil {
|
||||
|
||||
/**
|
||||
* Creates a unique actor name for a actor
|
||||
|
@ -19,33 +19,3 @@ trait BitcoinSNodeUtil {
|
|||
def createActorName(className: Class[_]): String =
|
||||
createActorName(className.getSimpleName)
|
||||
}
|
||||
|
||||
object BitcoinSNodeUtil extends BitcoinSNodeUtil {
|
||||
|
||||
class Mutable[A](initialValue: A) {
|
||||
private val lock = new java.util.concurrent.locks.ReentrantReadWriteLock()
|
||||
|
||||
private var value: A = initialValue
|
||||
|
||||
def atomicGet: A = {
|
||||
lock.readLock().lock()
|
||||
try value
|
||||
finally lock.readLock().unlock()
|
||||
}
|
||||
|
||||
def atomicSet(f: => A): Unit = {
|
||||
lock.writeLock().lock()
|
||||
try value = f
|
||||
finally lock.writeLock().unlock()
|
||||
}
|
||||
|
||||
def atomicUpdate[B](b: B)(update: (A, B) => A): A = {
|
||||
lock.writeLock().lock()
|
||||
try {
|
||||
value = update(value, b)
|
||||
value
|
||||
} finally lock.writeLock().unlock()
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue