Peer Message Receiver Refactor (#2938)

This commit is contained in:
benthecarman 2021-04-27 15:54:44 -05:00 committed by GitHub
parent 73939a15fc
commit e3017fd17d
12 changed files with 191 additions and 220 deletions

View file

@ -61,7 +61,7 @@ class BitcoinSServerMain(override val args: Array[String])
//get a node that isn't started
val nodeF = configInitializedF.flatMap { _ =>
nodeConf.createNode(peer, None)(chainConf, system)
nodeConf.createNode(peer)(chainConf, system)
}
//get our wallet

View file

@ -44,6 +44,7 @@ For your node to be able to service these filters you will need set
import akka.actor.ActorSystem
import org.bitcoins.core.protocol.blockchain.Block
import org.bitcoins.node._
import org.bitcoins.node.networking.peer._
import org.bitcoins.rpc.client.common.BitcoindVersion
import org.bitcoins.testkit.node._
import org.bitcoins.testkit.node.fixture._
@ -108,14 +109,15 @@ val chainApiF = for {
//yay! All setup done, let's create a node and then start it!
val nodeF = for {
_ <- chainApiF
chainApi <- chainApiF
peer <- peerF
} yield {
val dataMessageHandler = DataMessageHandler(chainApi)
NeutrinoNode(nodePeer = peer,
dataMessageHandler = dataMessageHandler,
nodeConfig = nodeConfig,
chainConfig = chainConfig,
actorSystem = system,
initialSyncDone = None)
actorSystem = system)
}
//let's start it
@ -155,4 +157,4 @@ val cleanupF = for {
} yield ()
Await.result(cleanupF, 60.seconds)
```
```

View file

@ -11,7 +11,11 @@ import org.bitcoins.crypto.{CryptoUtil, DoubleSha256Digest}
import org.bitcoins.node.models.Peer
import org.bitcoins.node.networking.peer.PeerMessageReceiver
import org.bitcoins.testkit.async.TestAsyncUtil
import org.bitcoins.testkit.node.{CachedBitcoinSAppConfig, NodeTestUtil}
import org.bitcoins.testkit.node.{
CachedBitcoinSAppConfig,
NodeTestUtil,
NodeUnitTest
}
import org.bitcoins.testkit.rpc.BitcoindRpcTestUtil
import org.bitcoins.testkit.util.BitcoindRpcTest
import org.scalatest._
@ -168,7 +172,9 @@ class P2PClientTest extends BitcoindRpcTest with CachedBitcoinSAppConfig {
val probe = TestProbe()
val remote = peer.socket
val peerMessageReceiverF =
PeerMessageReceiver.preConnection(peer, None)
for {
node <- NodeUnitTest.buildNode(peer)
} yield PeerMessageReceiver.preConnection(peer, node)
val clientActorF: Future[TestActorRef[P2PClientActor]] =
peerMessageReceiverF.map { peerMsgRecv =>

View file

@ -32,8 +32,8 @@ class DataMessageHandlerTest extends NodeUnitTest {
param: SpvNodeConnectedWithBitcoindV19 =>
val SpvNodeConnectedWithBitcoindV19(spv, _) = param
val sender = spv.peerMsgSender
for {
sender <- spv.peerMsgSenderF
chainApi <- spv.chainApiFromDb()
dataMessageHandler = DataMessageHandler(chainApi)(spv.executionContext,
spv.nodeAppConfig,
@ -66,9 +66,8 @@ class DataMessageHandlerTest extends NodeUnitTest {
}
}
val sender = spv.peerMsgSender
for {
sender <- spv.peerMsgSenderF
txId <- bitcoind.sendToAddress(junkAddress, 1.bitcoin)
tx <- bitcoind.getRawTransactionRaw(txId)
_ <- bitcoind.generateToAddress(blocks = 1, junkAddress)
@ -102,9 +101,9 @@ class DataMessageHandlerTest extends NodeUnitTest {
()
}
}
for {
sender <- spv.peerMsgSenderF
val sender = spv.peerMsgSender
for {
hash <- bitcoind.generateToAddress(blocks = 1, junkAddress).map(_.head)
block <- bitcoind.getBlockRaw(hash)
@ -137,9 +136,8 @@ class DataMessageHandlerTest extends NodeUnitTest {
}
}
val sender = spv.peerMsgSender
for {
sender <- spv.peerMsgSenderF
hash <- bitcoind.generateToAddress(blocks = 1, junkAddress).map(_.head)
header <- bitcoind.getBlockHeaderRaw(hash)
@ -171,9 +169,8 @@ class DataMessageHandlerTest extends NodeUnitTest {
()
}
}
val sender = spv.peerMsgSender
for {
sender <- spv.peerMsgSenderF
hash <- bitcoind.generateToAddress(blocks = 1, junkAddress).map(_.head)
filter <- bitcoind.getBlockFilter(hash, FilterType.Basic)

View file

@ -5,6 +5,7 @@ import org.bitcoins.node.models.Peer
import org.bitcoins.server.BitcoinSAppConfig
import org.bitcoins.testkit.BitcoinSTestAppConfig
import org.bitcoins.testkit.async.TestAsyncUtil
import org.bitcoins.testkit.chain.ChainUnitTest
import org.bitcoins.testkit.node.{
CachedBitcoinSAppConfig,
NodeTestWithCachedBitcoindNewest,
@ -12,7 +13,7 @@ import org.bitcoins.testkit.node.{
}
import org.scalatest.{FutureOutcome, Outcome}
import scala.concurrent.Future
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.DurationInt
/** Created by chris on 7/1/16.
@ -39,35 +40,35 @@ class PeerMessageHandlerTest
implicit protected lazy val chainConfig: ChainAppConfig =
cachedConfig.chainConf
override def beforeAll(): Unit = {
val setupF = ChainUnitTest.setupHeaderTableWithGenesisHeader()
Await.result(setupF, duration)
()
}
override def afterAll(): Unit = {
super[CachedBitcoinSAppConfig].afterAll()
super[NodeTestWithCachedBitcoindNewest].afterAll()
}
behavior of "PeerHandler"
it must "be able to fully initialize a PeerMessageReceiver" in { peer =>
val peerHandlerF = NodeUnitTest.buildPeerHandler(peer)
val peerMsgSenderF = peerHandlerF.map(_.peerMsgSender)
val p2pClientF = peerHandlerF.map(_.p2pClient)
for {
peerHandler <- NodeUnitTest.buildPeerHandler(peer)
peerMsgSender = peerHandler.peerMsgSender
p2pClient = peerHandler.p2pClient
val _ = peerHandlerF.map(_.peerMsgSender.connect())
val isConnectedF = TestAsyncUtil.retryUntilSatisfiedF(
() => p2pClientF.flatMap(_.isConnected()),
interval = 500.millis
)
_ = peerMsgSender.connect()
val isInitF = isConnectedF.flatMap { _ =>
TestAsyncUtil.retryUntilSatisfiedF(() =>
p2pClientF.flatMap(_.isInitialized()))
}
_ <- TestAsyncUtil.retryUntilSatisfiedF(() => p2pClient.isConnected(),
interval = 500.millis)
val disconnectF = isInitF.flatMap { _ =>
peerMsgSenderF.map(_.disconnect())
}
_ <- TestAsyncUtil.retryUntilSatisfiedF(() => p2pClient.isInitialized())
_ <- peerMsgSender.disconnect()
val isDisconnectedF = disconnectF.flatMap { _ =>
TestAsyncUtil.retryUntilSatisfiedF(() =>
p2pClientF.flatMap(_.isDisconnected()))
}
isDisconnectedF.map(_ => succeed)
_ <- TestAsyncUtil.retryUntilSatisfiedF(() => p2pClient.isDisconnected())
} yield succeed
}
/*

View file

@ -1,6 +1,5 @@
package org.bitcoins.node
import akka.Done
import akka.actor.ActorSystem
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.chain.models.BlockHeaderDAO
@ -8,14 +7,15 @@ import org.bitcoins.core.api.chain.ChainQueryApi.FilterResponse
import org.bitcoins.core.protocol.BlockStamp
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.Peer
import org.bitcoins.node.networking.peer.DataMessageHandler
import scala.concurrent.{Future, Promise}
import scala.concurrent.Future
case class NeutrinoNode(
nodePeer: Peer,
dataMessageHandler: DataMessageHandler,
nodeConfig: NodeAppConfig,
chainConfig: ChainAppConfig,
initialSyncDone: Option[Promise[Done]],
actorSystem: ActorSystem)
extends Node {
require(
@ -30,12 +30,16 @@ case class NeutrinoNode(
override val peer: Peer = nodePeer
override def updateDataMessageHandler(
dataMessageHandler: DataMessageHandler): NeutrinoNode = {
copy(dataMessageHandler = dataMessageHandler)
}
override def start(): Future[NeutrinoNode] = {
val res = for {
node <- super.start()
chainApi <- chainApiFromDb()
bestHash <- chainApi.getBestBlockHash()
peerMsgSender <- peerMsgSenderF
_ <- peerMsgSender.sendGetCompactFilterCheckPointMessage(
stopHash = bestHash.flip)
} yield {
@ -62,7 +66,6 @@ case class NeutrinoNode(
header <- chainApi.getBestBlockHeader()
filterHeaderCount <- chainApi.getFilterHeaderCount()
filterCount <- chainApi.getFilterCount()
peerMsgSender <- peerMsgSenderF
blockchains <- blockchainsF
} yield {
// Get all of our cached headers in case of a reorg
@ -101,5 +104,4 @@ case class NeutrinoNode(
startHeight: Int,
endHeight: Int): Future[Vector[FilterResponse]] =
chainApiFromDb().flatMap(_.getFiltersBetweenHeights(startHeight, endHeight))
}

View file

@ -1,6 +1,5 @@
package org.bitcoins.node
import akka.Done
import akka.actor.ActorSystem
import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.chain.blockchain.ChainHandlerCached
@ -23,12 +22,13 @@ import org.bitcoins.node.models.{
}
import org.bitcoins.node.networking.P2PClient
import org.bitcoins.node.networking.peer.{
DataMessageHandler,
PeerMessageReceiver,
PeerMessageSender
}
import scala.concurrent.duration.DurationInt
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
/** This a base trait for various kinds of nodes. It contains house keeping methods required for all nodes.
@ -45,12 +45,20 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
val peer: Peer
protected val initialSyncDone: Option[Promise[Done]]
/** The current data message handler.
* It should be noted that the dataMessageHandler contains
* chainstate. When we update with a new chainstate, we need to
* maek sure we update the [[DataMessageHandler]] via [[updateDataMessageHandler()]]
* to make sure we don't corrupt our chainstate cache
*/
def dataMessageHandler: DataMessageHandler
def nodeCallbacks: NodeCallbacks = nodeAppConfig.nodeCallbacks
lazy val txDAO: BroadcastAbleTransactionDAO = BroadcastAbleTransactionDAO()
def updateDataMessageHandler(dataMessageHandler: DataMessageHandler): Node
/** This is constructing a chain api from disk every time we call this method
* This involves database calls which can be slow and expensive to construct
* our [[org.bitcoins.chain.blockchain.Blockchain Blockchain]]
@ -66,26 +74,17 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
* object. Internally in [[org.bitcoins.node.networking.P2PClient p2p client]] you will see that
* the [[ChainApi chain api]] is updated inside of the p2p client
*/
lazy val clientF: Future[P2PClient] = {
val chainApiF = chainApiFromDb()
for {
chainApi <- chainApiF
} yield {
val peerMsgRecv: PeerMessageReceiver =
PeerMessageReceiver.newReceiver(chainApi = chainApi,
peer = peer,
initialSyncDone = initialSyncDone)
val p2p = P2PClient(context = system,
peer = peer,
peerMessageReceiver = peerMsgRecv)
p2p
}
lazy val client: P2PClient = {
val peerMsgRecv: PeerMessageReceiver =
PeerMessageReceiver.newReceiver(node = this, peer = peer)
val p2p = P2PClient(context = system,
peer = peer,
peerMessageReceiver = peerMsgRecv)
p2p
}
lazy val peerMsgSenderF: Future[PeerMessageSender] = {
clientF.map { client =>
PeerMessageSender(client)
}
lazy val peerMsgSender: PeerMessageSender = {
PeerMessageSender(client)
}
/** Sends the given P2P to our peer.
@ -94,21 +93,21 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
* `private[node]`.
*/
def send(msg: NetworkPayload): Future[Unit] = {
peerMsgSenderF.flatMap(_.sendMsg(msg))
peerMsgSender.sendMsg(msg)
}
/** Checks if we have a tcp connection with our peer */
def isConnected: Future[Boolean] = peerMsgSenderF.flatMap(_.isConnected())
def isConnected: Future[Boolean] = peerMsgSender.isConnected()
/** Checks if we are fully initialized with our peer and have executed the handshake
* This means we can now send arbitrary messages to our peer
*
* @return
*/
def isInitialized: Future[Boolean] = peerMsgSenderF.flatMap(_.isInitialized())
def isInitialized: Future[Boolean] = peerMsgSender.isInitialized()
def isDisconnected: Future[Boolean] =
peerMsgSenderF.flatMap(_.isDisconnected())
peerMsgSender.isDisconnected()
/** Starts our node */
def start(): Future[Node] = {
@ -123,8 +122,8 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
val chainApiF = startConfsF.flatMap(_ => chainApiFromDb())
val startNodeF = {
peerMsgSender.connect()
val isInitializedF = for {
_ <- peerMsgSenderF.map(_.connect())
_ <- AsyncUtil.retryUntilSatisfiedF(() => isInitialized,
interval = 250.millis)
} yield ()
@ -165,8 +164,7 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
def stop(): Future[Node] = {
logger.info(s"Stopping node")
val disconnectF = for {
p <- peerMsgSenderF
disconnect <- p.disconnect()
disconnect <- peerMsgSender.disconnect()
_ <- nodeAppConfig.stop()
} yield disconnect
@ -202,11 +200,11 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
chainApi <- chainApiFromDb()
header <- chainApi.getBestBlockHeader()
blockchains <- blockchainsF
} yield {
// Get all of our cached headers in case of a reorg
val cachedHeaders =
blockchains.flatMap(_.headers).map(_.hashBE.flip)
peerMsgSenderF.map(_.sendGetHeadersMessage(cachedHeaders))
cachedHeaders = blockchains.flatMap(_.headers).map(_.hashBE.flip)
_ <- peerMsgSender.sendGetHeadersMessage(cachedHeaders)
} yield {
logger.info(
s"Starting sync node, height=${header.height} hash=${header.hashBE}")
}
@ -231,7 +229,6 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
for {
_ <- addToDbF
peerMsgSender <- peerMsgSenderF
connected <- isConnected
@ -254,11 +251,8 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
if (blockHashes.isEmpty) {
Future.unit
} else {
for {
peerMsgSender <- peerMsgSenderF
_ <- peerMsgSender.sendGetDataMessage(TypeIdentifier.MsgWitnessBlock,
blockHashes: _*)
} yield ()
peerMsgSender.sendGetDataMessage(TypeIdentifier.MsgWitnessBlock,
blockHashes: _*)
}
}

View file

@ -1,6 +1,5 @@
package org.bitcoins.node
import akka.Done
import akka.actor.ActorSystem
import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.chain.config.ChainAppConfig
@ -11,14 +10,15 @@ import org.bitcoins.core.protocol.{BitcoinAddress, BlockStamp}
import org.bitcoins.core.util.Mutable
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.Peer
import org.bitcoins.node.networking.peer.DataMessageHandler
import scala.concurrent.{Future, Promise}
import scala.concurrent.Future
case class SpvNode(
nodePeer: Peer,
dataMessageHandler: DataMessageHandler,
nodeConfig: NodeAppConfig,
chainConfig: ChainAppConfig,
initialSyncDone: Option[Promise[Done]],
actorSystem: ActorSystem)
extends Node {
require(nodeConfig.nodeType == NodeType.SpvNode,
@ -41,6 +41,11 @@ case class SpvNode(
this
}
override def updateDataMessageHandler(
dataMessageHandler: DataMessageHandler): SpvNode = {
copy(dataMessageHandler = dataMessageHandler)
}
/** Updates our bloom filter to match the given TX
*
* @return SPV node with the updated bloom filter
@ -53,9 +58,8 @@ case class SpvNode(
// then need to calculate all the new elements in
// the filter. this is easier:-)
for {
p <- peerMsgSenderF
_ <- p.sendFilterClearMessage()
_ <- p.sendFilterLoadMessage(newBloom)
_ <- peerMsgSender.sendFilterClearMessage()
_ <- peerMsgSender.sendFilterLoadMessage(newBloom)
} yield this
}
@ -69,7 +73,7 @@ case class SpvNode(
val hash = address.hash
_bloomFilter.atomicUpdate(hash)(_.insert(_))
val sentFilterAddF = peerMsgSenderF.flatMap(_.sendFilterAddMessage(hash))
val sentFilterAddF = peerMsgSender.sendFilterAddMessage(hash)
sentFilterAddF.map(_ => this)
}
@ -77,7 +81,6 @@ case class SpvNode(
override def start(): Future[SpvNode] = {
for {
node <- super.start()
peerMsgSender <- peerMsgSenderF
_ <- AsyncUtil.retryUntilSatisfiedF(() => isConnected)
_ <- peerMsgSender.sendFilterLoadMessage(bloomFilter)
} yield {

View file

@ -1,17 +1,23 @@
package org.bitcoins.node.config
import akka.Done
import akka.actor.ActorSystem
import com.typesafe.config.Config
import org.bitcoins.chain.blockchain.ChainHandlerCached
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.chain.models.{
BlockHeaderDAO,
CompactFilterDAO,
CompactFilterHeaderDAO
}
import org.bitcoins.core.util.Mutable
import org.bitcoins.db.{AppConfigFactory, DbAppConfig, JdbcProfileComponent}
import org.bitcoins.node._
import org.bitcoins.node.db.NodeDbManagement
import org.bitcoins.node.models.Peer
import org.bitcoins.node.networking.peer.DataMessageHandler
import java.nio.file.Path
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.{ExecutionContext, Future}
/** Configuration for the Bitcoin-S node
* @param directory The data directory of the node
@ -82,10 +88,10 @@ case class NodeAppConfig(
}
/** Creates either a neutrino node or a spv node based on the [[NodeAppConfig]] given */
def createNode(peer: Peer, initialSyncDone: Option[Promise[Done]])(
def createNode(peer: Peer)(
chainConf: ChainAppConfig,
system: ActorSystem): Future[Node] = {
NodeAppConfig.createNode(peer, initialSyncDone)(this, chainConf, system)
NodeAppConfig.createNode(peer)(this, chainConf, system)
}
}
@ -101,17 +107,25 @@ object NodeAppConfig extends AppConfigFactory[NodeAppConfig] {
NodeAppConfig(datadir, confs: _*)
/** Creates either a neutrino node or a spv node based on the [[NodeAppConfig]] given */
def createNode(peer: Peer, initialSyncDone: Option[Promise[Done]])(implicit
def createNode(peer: Peer)(implicit
nodeConf: NodeAppConfig,
chainConf: ChainAppConfig,
system: ActorSystem): Future[Node] = {
import system.dispatcher
val blockHeaderDAO = BlockHeaderDAO()
val filterHeaderDAO = CompactFilterHeaderDAO()
val filterDAO = CompactFilterDAO()
val dmhF = ChainHandlerCached
.fromDatabase(blockHeaderDAO, filterHeaderDAO, filterDAO)
.map(handler => DataMessageHandler(handler))
nodeConf.nodeType match {
case NodeType.SpvNode =>
Future.successful(
SpvNode(peer, nodeConf, chainConf, initialSyncDone, system))
dmhF.map(dmh => SpvNode(peer, dmh, nodeConf, chainConf, system))
case NodeType.NeutrinoNode =>
Future.successful(
NeutrinoNode(peer, nodeConf, chainConf, initialSyncDone, system))
dmhF.map(dmh => NeutrinoNode(peer, dmh, nodeConf, chainConf, system))
case NodeType.FullNode =>
Future.failed(new RuntimeException("Not implemented"))
case NodeType.BitcoindBackend =>

View file

@ -1,28 +1,14 @@
package org.bitcoins.node.networking.peer
import akka.Done
import akka.actor.ActorRefFactory
import org.bitcoins.chain.blockchain.ChainHandlerCached
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.chain.models.{
BlockHeaderDAO,
CompactFilterDAO,
CompactFilterHeaderDAO
}
import org.bitcoins.core.api.chain.ChainApi
import org.bitcoins.core.p2p.{NetworkMessage, _}
import org.bitcoins.core.p2p._
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.Peer
import org.bitcoins.node.networking.P2PClient
import org.bitcoins.node.networking.peer.PeerMessageReceiverState.{
Disconnected,
Initializing,
Normal,
Preconnection
}
import org.bitcoins.node.{NodeType, P2PLogger}
import org.bitcoins.node.networking.peer.PeerMessageReceiverState._
import org.bitcoins.node.{Node, NodeType, P2PLogger}
import scala.concurrent.{Future, Promise}
import scala.concurrent.Future
/** Responsible for receiving messages from a peer on the
* p2p network. This is called by [[org.bitcoins.rpc.client.common.Client Client]] when doing the p2p
@ -31,13 +17,10 @@ import scala.concurrent.{Future, Promise}
* [[org.bitcoins.core.p2p.NetworkMessage NetworkMessage]]
*/
class PeerMessageReceiver(
dataMessageHandler: DataMessageHandler,
node: Node,
val state: PeerMessageReceiverState,
peer: Peer
)(implicit
ref: ActorRefFactory,
nodeAppConfig: NodeAppConfig,
chainAppConfig: ChainAppConfig)
)(implicit ref: ActorRefFactory, nodeAppConfig: NodeAppConfig)
extends P2PLogger {
import ref.dispatcher
@ -62,7 +45,7 @@ class PeerMessageReceiver(
val peerMsgSender = PeerMessageSender(client)
peerMsgSender.sendVersionMessage(dataMessageHandler.chainApi)
peerMsgSender.sendVersionMessage(node.dataMessageHandler.chainApi)
val newRecv = toState(newState)
@ -135,8 +118,9 @@ class PeerMessageReceiver(
sender: PeerMessageSender): Future[PeerMessageReceiver] = {
//else it means we are receiving this data payload from a peer,
//we need to handle it
dataMessageHandler.handleDataPayload(payload, sender).map { handler =>
new PeerMessageReceiver(handler, state, peer)
node.dataMessageHandler.handleDataPayload(payload, sender).map { handler =>
val newNode = node.updateDataMessageHandler(handler)
new PeerMessageReceiver(newNode, state, peer)
}
}
@ -152,7 +136,7 @@ class PeerMessageReceiver(
payload match {
case versionMsg: VersionMessage =>
logger.trace(s"Received versionMsg=${versionMsg}from peer=${peer}")
logger.trace(s"Received versionMsg=$versionMsg from peer=$peer")
state match {
case bad @ (_: Disconnected | _: Normal | Preconnection) =>
@ -163,28 +147,6 @@ class PeerMessageReceiver(
case good: Initializing =>
val newState = good.withVersionMsg(versionMsg)
// TODO: do not throw error once we have peer discovery
nodeAppConfig.nodeType match {
case NodeType.NeutrinoNode =>
if (!versionMsg.services.nodeCompactFilters) {
val errMsg =
s"Connected Peer ($peer) does not support compact filters"
logger.warn(errMsg)
sys.error(errMsg)
}
case NodeType.SpvNode =>
if (!versionMsg.services.nodeBloom) {
val errMsg =
s"Connected Peer ($peer) does not support bloom filters"
logger.warn(errMsg)
sys.error(errMsg)
}
case NodeType.FullNode =>
sys.error("Not yet implemented.")
case NodeType.BitcoindBackend =>
throw new RuntimeException("This is impossible")
}
sender.sendVerackMessage()
val newRecv = toState(newState)
@ -235,7 +197,7 @@ class PeerMessageReceiver(
/** Transitions our PeerMessageReceiver to a new state */
def toState(newState: PeerMessageReceiverState): PeerMessageReceiver = {
new PeerMessageReceiver(
dataMessageHandler = dataMessageHandler,
node = node,
state = newState,
peer = peer
)
@ -255,59 +217,31 @@ object PeerMessageReceiver {
case class NetworkMessageReceived(msg: NetworkMessage, client: P2PClient)
extends PeerMessageReceiverMsg
def apply(
state: PeerMessageReceiverState,
chainApi: ChainApi,
peer: Peer,
initialSyncDone: Option[Promise[Done]])(implicit
def apply(state: PeerMessageReceiverState, node: Node, peer: Peer)(implicit
ref: ActorRefFactory,
nodeAppConfig: NodeAppConfig,
chainAppConfig: ChainAppConfig
nodeAppConfig: NodeAppConfig
): PeerMessageReceiver = {
import ref.dispatcher
val dataHandler = DataMessageHandler(chainApi, initialSyncDone)
new PeerMessageReceiver(dataMessageHandler = dataHandler,
state = state,
peer = peer)
new PeerMessageReceiver(node = node, state = state, peer = peer)
}
/** Creates a peer message receiver that is ready
* to be connected to a peer. This can be given to [[org.bitcoins.node.networking.P2PClient.props() P2PClient]]
* to connect to a peer on the network
*/
def preConnection(peer: Peer, initialSyncDone: Option[Promise[Done]])(implicit
def preConnection(peer: Peer, node: Node)(implicit
ref: ActorRefFactory,
nodeAppConfig: NodeAppConfig,
chainAppConfig: ChainAppConfig
): Future[PeerMessageReceiver] = {
import ref.dispatcher
val blockHeaderDAO = BlockHeaderDAO()
val filterHeaderDAO = CompactFilterHeaderDAO()
val filterDAO = CompactFilterDAO()
val chainHandlerF =
ChainHandlerCached.fromDatabase(blockHeaderDAO,
filterHeaderDAO,
filterDAO)
for {
chainHandler <- chainHandlerF
} yield {
PeerMessageReceiver(state = PeerMessageReceiverState.fresh(),
chainApi = chainHandler,
peer = peer,
initialSyncDone = initialSyncDone)
}
nodeAppConfig: NodeAppConfig
): PeerMessageReceiver = {
PeerMessageReceiver(node = node,
state = PeerMessageReceiverState.fresh(),
peer = peer)
}
def newReceiver(
chainApi: ChainApi,
peer: Peer,
initialSyncDone: Option[Promise[Done]])(implicit
def newReceiver(node: Node, peer: Peer)(implicit
nodeAppConfig: NodeAppConfig,
chainAppConfig: ChainAppConfig,
ref: ActorRefFactory): PeerMessageReceiver = {
PeerMessageReceiver(state = PeerMessageReceiverState.fresh(),
chainApi = chainApi,
peer = peer,
initialSyncDone = initialSyncDone)
node = node,
peer = peer)
}
}

View file

@ -23,7 +23,7 @@ sealed trait CachedAppConfig { _: BitcoinSAkkaAsyncTest =>
trait CachedBitcoinSAppConfig { _: BitcoinSAkkaAsyncTest =>
implicit protected lazy val cachedConfig: BitcoinSAppConfig =
BitcoinSTestAppConfig.getSpvTestConfig()
BitcoinSTestAppConfig.getNeutrinoTestConfig()
implicit protected lazy val cachedNodeConf: NodeAppConfig = {
cachedConfig.nodeConf

View file

@ -1,17 +1,14 @@
package org.bitcoins.testkit.node
import akka.actor.ActorSystem
import org.bitcoins.chain.blockchain.ChainHandlerCached
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.chain.models._
import org.bitcoins.core.api.chain.ChainApi
import org.bitcoins.node._
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.Peer
import org.bitcoins.node.networking.peer.{
PeerHandler,
PeerMessageReceiver,
PeerMessageReceiverState,
PeerMessageSender
}
import org.bitcoins.node.networking.peer._
import org.bitcoins.rpc.client.common.BitcoindVersion.{V18, V19}
import org.bitcoins.rpc.client.common.{BitcoindRpcClient, BitcoindVersion}
import org.bitcoins.rpc.client.v19.BitcoindV19RpcClient
@ -189,14 +186,39 @@ trait NodeUnitTest extends BaseNodeTest {
object NodeUnitTest extends P2PLogger {
def buildNode(peer: Peer)(implicit
chainConf: ChainAppConfig,
nodeConf: NodeAppConfig,
system: ActorSystem): Future[NeutrinoNode] = {
import system.dispatcher
val blockHeaderDAO = BlockHeaderDAO()
val filterHeaderDAO = CompactFilterHeaderDAO()
val filterDAO = CompactFilterDAO()
val chainApiF = ChainHandlerCached
.fromDatabase(blockHeaderDAO, filterHeaderDAO, filterDAO)
chainApiF.map(buildNode(peer, _))
}
def buildNode(peer: Peer, chainApi: ChainApi)(implicit
chainConf: ChainAppConfig,
nodeConf: NodeAppConfig,
system: ActorSystem): NeutrinoNode = {
import system.dispatcher
val dmh = DataMessageHandler(chainApi)
NeutrinoNode(peer, dmh, nodeConf, chainConf, system)
}
def buildPeerMessageReceiver(chainApi: ChainApi, peer: Peer)(implicit
appConfig: BitcoinSAppConfig,
system: ActorSystem): Future[PeerMessageReceiver] = {
val receiver =
PeerMessageReceiver(state = PeerMessageReceiverState.fresh(),
chainApi = chainApi,
peer = peer,
initialSyncDone = None)
val receiver = PeerMessageReceiver(state = PeerMessageReceiverState.fresh(),
node = buildNode(peer, chainApi),
peer = peer)
Future.successful(receiver)
}
@ -205,9 +227,9 @@ object NodeUnitTest extends P2PLogger {
chainAppConfig: ChainAppConfig,
system: ActorSystem): Future[PeerHandler] = {
import system.dispatcher
val chainApiF = ChainUnitTest.createChainHandler()
val peerMsgReceiverF = chainApiF.flatMap { _ =>
PeerMessageReceiver.preConnection(peer, None)
val nodeF = buildNode(peer)
val peerMsgReceiverF = nodeF.map { node =>
PeerMessageReceiver.preConnection(peer, node)
}
//the problem here is the 'self', this needs to be an ordinary peer message handler
//that can handle the handshake
@ -394,9 +416,8 @@ object NodeUnitTest extends P2PLogger {
system: ActorSystem): Future[PeerMessageReceiver] = {
val receiver =
PeerMessageReceiver(state = PeerMessageReceiverState.fresh(),
chainApi = chainApi,
peer = peer,
initialSyncDone = None)
node = buildNode(peer, chainApi),
peer = peer)
Future.successful(receiver)
}
@ -427,24 +448,20 @@ object NodeUnitTest extends P2PLogger {
val checkConfigF = Future {
assert(nodeAppConfig.nodeType == NodeType.SpvNode)
}
val chainApiF = for {
for {
_ <- checkConfigF
chainHandler <- ChainUnitTest.createChainHandler()
} yield chainHandler
val nodeF = for {
_ <- chainApiF
} yield {
val dmh = DataMessageHandler(chainHandler)
SpvNode(
nodePeer = peer,
dataMessageHandler = dmh,
nodeConfig = nodeAppConfig,
chainConfig = chainAppConfig,
actorSystem = system,
initialSyncDone = None
actorSystem = system
).setBloomFilter(P2PMessageTestUtil.emptyBloomFilter)
}
nodeF
}
/** Creates a Neutrino node peered with the given bitcoind client, this method
@ -465,13 +482,14 @@ object NodeUnitTest extends P2PLogger {
} yield chainHandler
val peer = createPeer(bitcoind)
val nodeF = for {
_ <- chainApiF
chainApi <- chainApiF
} yield {
val dmh = DataMessageHandler(chainApi)
NeutrinoNode(nodePeer = peer,
dataMessageHandler = dmh,
nodeConfig = nodeAppConfig,
chainConfig = chainAppConfig,
actorSystem = system,
initialSyncDone = None)
actorSystem = system)
}
nodeF