Remove callbacks param from DataMessageHandler & PeerMessageReceiver (#2476)

This commit is contained in:
benthecarman 2021-01-06 13:23:47 -06:00 committed by GitHub
parent 78e28baf3c
commit ecc4532bf7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 50 additions and 59 deletions

View file

@ -8,7 +8,6 @@ import org.bitcoins.core.p2p._
import org.bitcoins.core.protocol.CompactSizeUInt
import org.bitcoins.core.protocol.blockchain.BlockHeader
import org.bitcoins.crypto.{CryptoUtil, DoubleSha256Digest}
import org.bitcoins.node.NodeCallbacks
import org.bitcoins.node.models.Peer
import org.bitcoins.node.networking.peer.PeerMessageReceiver
import org.bitcoins.testkit.async.TestAsyncUtil
@ -172,7 +171,7 @@ class P2PClientTest extends BitcoindRpcTest with CachedBitcoinSAppConfig {
val probe = TestProbe()
val remote = peer.socket
val peerMessageReceiverF =
PeerMessageReceiver.preConnection(peer, NodeCallbacks.empty, None)
PeerMessageReceiver.preConnection(peer, None)
val clientActorF: Future[TestActorRef[P2PClientActor]] =
peerMessageReceiverF.map { peerMsgRecv =>

View file

@ -35,11 +35,9 @@ class DataMessageHandlerTest extends NodeUnitTest {
val callback: OnMerkleBlockReceived = {
(merkle: MerkleBlock, txs: Vector[Transaction]) =>
{
Future {
resultP.success((merkle, txs))
()
}
Future {
resultP.success((merkle, txs))
()
}
}
@ -55,8 +53,9 @@ class DataMessageHandlerTest extends NodeUnitTest {
payload2 = TransactionMessage(tx)
callbacks = NodeCallbacks.onMerkleBlockReceived(callback)
_ = nodeConfig.addCallbacks(callbacks)
dataMessageHandler = DataMessageHandler(genesisChainApi, callbacks)
dataMessageHandler = DataMessageHandler(genesisChainApi)
_ <- dataMessageHandler.handleDataPayload(payload1, sender)
_ <- dataMessageHandler.handleDataPayload(payload2, sender)
result <- resultP.future
@ -84,8 +83,9 @@ class DataMessageHandlerTest extends NodeUnitTest {
payload = BlockMessage(block)
callbacks = NodeCallbacks.onBlockReceived(callback)
_ = nodeConfig.addCallbacks(callbacks)
dataMessageHandler = DataMessageHandler(genesisChainApi, callbacks)
dataMessageHandler = DataMessageHandler(genesisChainApi)
_ <- dataMessageHandler.handleDataPayload(payload, sender)
result <- resultP.future
} yield assert(result == block)
@ -113,8 +113,9 @@ class DataMessageHandlerTest extends NodeUnitTest {
payload = HeadersMessage(CompactSizeUInt.one, Vector(header))
callbacks = NodeCallbacks.onBlockHeadersReceived(callback)
_ = nodeConfig.addCallbacks(callbacks)
dataMessageHandler = DataMessageHandler(genesisChainApi, callbacks)
dataMessageHandler = DataMessageHandler(genesisChainApi)
_ <- dataMessageHandler.handleDataPayload(payload, sender)
result <- resultP.future
} yield assert(result == Vector(header))
@ -128,11 +129,9 @@ class DataMessageHandlerTest extends NodeUnitTest {
Promise()
val callback: OnCompactFiltersReceived = {
(filters: Vector[(DoubleSha256Digest, GolombFilter)]) =>
{
Future {
resultP.success(filters)
()
}
Future {
resultP.success(filters)
()
}
}
for {
@ -145,8 +144,9 @@ class DataMessageHandlerTest extends NodeUnitTest {
CompactFilterMessage(FilterType.Basic, hash.flip, filter.filter.bytes)
callbacks = NodeCallbacks.onCompactFilterReceived(callback)
_ = nodeConfig.addCallbacks(callbacks)
dataMessageHandler = DataMessageHandler(genesisChainApi, callbacks)
dataMessageHandler = DataMessageHandler(genesisChainApi)
_ <- dataMessageHandler.handleDataPayload(payload, sender)
result <- resultP.future
} yield assert(result == Vector((hash.flip, filter.filter)))

View file

@ -76,7 +76,6 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
val peerMsgRecv: PeerMessageReceiver =
PeerMessageReceiver.newReceiver(chainApi = chainApi,
peer = peer,
callbacks = nodeCallbacks,
initialSyncDone = initialSyncDone)
val p2p = P2PClient(context = system,
peer = peer,

View file

@ -9,7 +9,7 @@ import org.bitcoins.core.util.FutureUtil
import org.bitcoins.crypto.DoubleSha256DigestBE
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.BroadcastAbleTransactionDAO
import org.bitcoins.node.{NodeCallbacks, NodeType, P2PLogger}
import org.bitcoins.node.{NodeType, P2PLogger}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Try
@ -23,7 +23,6 @@ import scala.util.Try
*/
case class DataMessageHandler(
chainApi: ChainApi,
callbacks: NodeCallbacks,
initialSyncDone: Option[Promise[Done]] = None,
currentFilterBatch: Vector[CompactFilterMessage] = Vector.empty,
filterHeaderHeightOpt: Option[Int] = None,
@ -126,9 +125,10 @@ case class DataMessageHandler(
logger.debug(s"Processing ${filterBatch.size} filters")
for {
newChainApi <- chainApi.processFilters(filterBatch)
_ <- callbacks.executeOnCompactFiltersReceivedCallbacks(
logger,
blockFilters)
_ <-
appConfig.nodeCallbacks
.executeOnCompactFiltersReceivedCallbacks(logger,
blockFilters)
} yield (Vector.empty, newChainApi)
} else Future.successful((filterBatch, chainApi))
_ <-
@ -245,7 +245,9 @@ case class DataMessageHandler(
for {
newApi <- chainApiF
newSyncing <- getHeadersF
_ <- callbacks.executeOnBlockHeadersReceivedCallbacks(logger, headers)
_ <- appConfig.nodeCallbacks.executeOnBlockHeadersReceivedCallbacks(
logger,
headers)
} yield {
this.copy(chainApi = newApi, syncing = newSyncing)
}
@ -262,9 +264,11 @@ case class DataMessageHandler(
logger.debug("Processing block's header...")
for {
processedApi <- chainApi.processHeader(block.blockHeader)
_ <- callbacks.executeOnBlockHeadersReceivedCallbacks(
logger,
Vector(block.blockHeader))
_ <-
appConfig.nodeCallbacks
.executeOnBlockHeadersReceivedCallbacks(
logger,
Vector(block.blockHeader))
} yield processedApi
} else Future.successful(chainApi)
}
@ -273,24 +277,25 @@ case class DataMessageHandler(
for {
newApi <- newApiF
_ <-
callbacks
appConfig.nodeCallbacks
.executeOnBlockReceivedCallbacks(logger, block)
} yield {
this.copy(chainApi = newApi)
}
case TransactionMessage(tx) =>
MerkleBuffers.putTx(tx, callbacks).flatMap { belongsToMerkle =>
if (belongsToMerkle) {
logger.trace(
s"Transaction=${tx.txIdBE} belongs to merkleblock, not calling callbacks")
Future.successful(this)
} else {
logger.trace(
s"Transaction=${tx.txIdBE} does not belong to merkleblock, processing given callbacks")
callbacks
.executeOnTxReceivedCallbacks(logger, tx)
.map(_ => this)
}
MerkleBuffers.putTx(tx, appConfig.nodeCallbacks).flatMap {
belongsToMerkle =>
if (belongsToMerkle) {
logger.trace(
s"Transaction=${tx.txIdBE} belongs to merkleblock, not calling callbacks")
Future.successful(this)
} else {
logger.trace(
s"Transaction=${tx.txIdBE} does not belong to merkleblock, processing given callbacks")
appConfig.nodeCallbacks
.executeOnTxReceivedCallbacks(logger, tx)
.map(_ => this)
}
}
case MerkleBlockMessage(merkleBlock) =>
MerkleBuffers.putMerkle(merkleBlock)

View file

@ -20,7 +20,7 @@ import org.bitcoins.node.networking.peer.PeerMessageReceiverState.{
Normal,
Preconnection
}
import org.bitcoins.node.{NodeCallbacks, NodeType, P2PLogger}
import org.bitcoins.node.{NodeType, P2PLogger}
import scala.concurrent.{Future, Promise}
@ -34,8 +34,7 @@ import scala.concurrent.{Future, Promise}
class PeerMessageReceiver(
dataMessageHandler: DataMessageHandler,
val state: PeerMessageReceiverState,
peer: Peer,
callbacks: NodeCallbacks
peer: Peer
)(implicit
ref: ActorRefFactory,
nodeAppConfig: NodeAppConfig,
@ -139,7 +138,7 @@ class PeerMessageReceiver(
//else it means we are receiving this data payload from a peer,
//we need to handle it
dataMessageHandler.handleDataPayload(payload, sender).map { handler =>
new PeerMessageReceiver(handler, state, peer, callbacks)
new PeerMessageReceiver(handler, state, peer)
}
}
@ -241,8 +240,7 @@ class PeerMessageReceiver(
new PeerMessageReceiver(
dataMessageHandler = dataMessageHandler,
state = newState,
peer = peer,
callbacks = callbacks
peer = peer
)
}
}
@ -264,18 +262,16 @@ object PeerMessageReceiver {
state: PeerMessageReceiverState,
chainApi: ChainApi,
peer: Peer,
callbacks: NodeCallbacks,
initialSyncDone: Option[Promise[Done]])(implicit
ref: ActorRefFactory,
nodeAppConfig: NodeAppConfig,
chainAppConfig: ChainAppConfig
): PeerMessageReceiver = {
import ref.dispatcher
val dataHandler = DataMessageHandler(chainApi, callbacks, initialSyncDone)
val dataHandler = DataMessageHandler(chainApi, initialSyncDone)
new PeerMessageReceiver(dataMessageHandler = dataHandler,
state = state,
peer = peer,
callbacks = callbacks)
peer = peer)
}
/**
@ -283,10 +279,7 @@ object PeerMessageReceiver {
* to be connected to a peer. This can be given to [[org.bitcoins.node.networking.P2PClient.props() P2PClient]]
* to connect to a peer on the network
*/
def preConnection(
peer: Peer,
callbacks: NodeCallbacks,
initialSyncDone: Option[Promise[Done]])(implicit
def preConnection(peer: Peer, initialSyncDone: Option[Promise[Done]])(implicit
ref: ActorRefFactory,
nodeAppConfig: NodeAppConfig,
chainAppConfig: ChainAppConfig
@ -305,7 +298,6 @@ object PeerMessageReceiver {
PeerMessageReceiver(state = PeerMessageReceiverState.fresh(),
chainApi = chainHandler,
peer = peer,
callbacks = callbacks,
initialSyncDone = initialSyncDone)
}
}
@ -313,7 +305,6 @@ object PeerMessageReceiver {
def newReceiver(
chainApi: ChainApi,
peer: Peer,
callbacks: NodeCallbacks,
initialSyncDone: Option[Promise[Done]])(implicit
nodeAppConfig: NodeAppConfig,
chainAppConfig: ChainAppConfig,
@ -321,7 +312,6 @@ object PeerMessageReceiver {
PeerMessageReceiver(state = PeerMessageReceiverState.fresh(),
chainApi = chainApi,
peer = peer,
callbacks = callbacks,
initialSyncDone = initialSyncDone)
}
}

View file

@ -351,7 +351,6 @@ object NodeUnitTest extends P2PLogger {
PeerMessageReceiver(state = PeerMessageReceiverState.fresh(),
chainApi = chainApi,
peer = peer,
callbacks = NodeCallbacks.empty,
initialSyncDone = None)
Future.successful(receiver)
}
@ -363,7 +362,7 @@ object NodeUnitTest extends P2PLogger {
import system.dispatcher
val chainApiF = ChainUnitTest.createChainHandler()
val peerMsgReceiverF = chainApiF.flatMap { _ =>
PeerMessageReceiver.preConnection(peer, NodeCallbacks.empty, None)
PeerMessageReceiver.preConnection(peer, None)
}
//the problem here is the 'self', this needs to be an ordinary peer message handler
//that can handle the handshake
@ -490,7 +489,6 @@ object NodeUnitTest extends P2PLogger {
PeerMessageReceiver(state = PeerMessageReceiverState.fresh(),
chainApi = chainApi,
peer = peer,
callbacks = NodeCallbacks.empty,
initialSyncDone = None)
Future.successful(receiver)
}