[WIP] Adding multi-peer support (#3401)

* establish conn with multiple peers

squash 1

squash 2

squash 3

* add tests

* change log msg level

* minor changes

* fix

* formatting

* fix docs

* commit to rerun ci

* changes from comments

* fix bug

* rerun ci
This commit is contained in:
Shreyansh 2021-07-22 01:48:45 +05:30 committed by GitHub
parent 36c4da7c95
commit 4be2b2109b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 201 additions and 90 deletions

3
.idea/.gitignore generated vendored Normal file
View File

@ -0,0 +1,3 @@
# Default ignored files
/shelf/
/workspace.xml

View File

@ -84,18 +84,20 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
"No peers specified, unable to start node")
}
val peerSocket =
NetworkUtil.parseInetSocketAddress(nodeConf.peers.head,
nodeConf.network.port)
val peerSockets = {
nodeConf.peers.map(
NetworkUtil.parseInetSocketAddress(_, nodeConf.network.port)
)
}
val peer = Peer.fromSocket(peerSocket, nodeConf.socks5ProxyParams)
val peers = peerSockets.map(Peer.fromSocket(_, nodeConf.socks5ProxyParams))
//run chain work migration
val chainApiF = runChainWorkCalc(
serverArgParser.forceChainWorkRecalc || chainConf.forceRecalcChainWork)
//get a node that isn't started
val nodeF = nodeConf.createNode(peer)(chainConf, system)
val nodeF = nodeConf.createNode(peers)(chainConf, system)
val feeProvider = getFeeProviderOrElse(
MempoolSpaceProvider(HourFeeTarget, walletConf.network))

View File

@ -113,7 +113,7 @@ val nodeF = for {
peer <- peerF
} yield {
val dataMessageHandler = DataMessageHandler(chainApi)
NeutrinoNode(nodePeer = peer,
NeutrinoNode(nodePeer = Vector(peer),
dataMessageHandler = dataMessageHandler,
nodeConfig = nodeConfig,
chainConfig = chainConfig,

View File

@ -4,27 +4,24 @@ import akka.actor.Cancellable
import org.bitcoins.crypto.DoubleSha256DigestBE
import org.bitcoins.server.BitcoinSAppConfig
import org.bitcoins.testkit.BitcoinSTestAppConfig
import org.bitcoins.testkit.node.fixture.NeutrinoNodeConnectedWithBitcoind
import org.bitcoins.testkit.node.{
NodeTestUtil,
NodeTestWithCachedBitcoindNewest
}
import org.bitcoins.testkit.node.fixture.NeutrinoNodeConnectedWithBitcoinds
import org.bitcoins.testkit.node.{NodeTestUtil, NodeTestWithCachedBitcoindPair}
import org.scalatest.{FutureOutcome, Outcome}
import scala.concurrent.Future
class NeutrinoNodeTest extends NodeTestWithCachedBitcoindNewest {
class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair {
/** Wallet config with data directory set to user temp directory */
override protected def getFreshConfig: BitcoinSAppConfig =
BitcoinSTestAppConfig.getNeutrinoWithEmbeddedDbTestConfig(pgUrl)
override type FixtureParam = NeutrinoNodeConnectedWithBitcoind
override type FixtureParam = NeutrinoNodeConnectedWithBitcoinds
override def withFixture(test: OneArgAsyncTest): FutureOutcome = {
val outcomeF: Future[Outcome] = for {
bitcoind <- cachedBitcoindWithFundsF
outcome = withNeutrinoNodeConnectedToBitcoind(test, bitcoind)(
bitcoinds <- clientsF
outcome = withNeutrinoNodeConnectedToBitcoinds(test, bitcoinds.toVector)(
system,
getFreshConfig)
f <- outcome.toFuture
@ -34,15 +31,37 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindNewest {
behavior of "NeutrinoNode"
it must "be able to connect and then disconnect from all peers" in {
nodeConnectedWithBitcoind: NeutrinoNodeConnectedWithBitcoinds =>
//checking all peers are connected
val node = nodeConnectedWithBitcoind.node
val connFs = node.peers.indices.map(node.isConnected)
val connF = Future.sequence(connFs).map(_.forall(_ == true))
val assertion1F = connF.map(assert(_))
//checking all peers can be disconnected
def isAllDisconnectedF: Future[Boolean] = {
val disconnFs = node.peers.indices.map(node.isDisconnected)
val res = Future.sequence(disconnFs).map(_.forall(_ == true))
res
}
val disconnF = for {
_ <- assertion1F
_ <- node.stop()
f <- isAllDisconnectedF
} yield f
disconnF.map(assert(_))
}
it must "receive notification that a block occurred on the p2p network for neutrino" in {
nodeConnectedWithBitcoind: NeutrinoNodeConnectedWithBitcoind =>
nodeConnectedWithBitcoind: NeutrinoNodeConnectedWithBitcoinds =>
val node = nodeConnectedWithBitcoind.node
val bitcoind = nodeConnectedWithBitcoind.bitcoind
val bitcoind = nodeConnectedWithBitcoind.bitcoinds(0)
val assert1F = for {
_ <- node.isConnected.map(assert(_))
a2 <- node.isInitialized.map(assert(_))
_ <- node.isConnected(0).map(assert(_))
a2 <- node.isInitialized(0).map(assert(_))
} yield a2
val hashF: Future[DoubleSha256DigestBE] = bitcoind.getNewAddress
@ -62,9 +81,9 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindNewest {
}
it must "stay in sync with a bitcoind instance for neutrino" in {
nodeConnectedWithBitcoind: NeutrinoNodeConnectedWithBitcoind =>
nodeConnectedWithBitcoind: NeutrinoNodeConnectedWithBitcoinds =>
val node = nodeConnectedWithBitcoind.node
val bitcoind = nodeConnectedWithBitcoind.bitcoind
val bitcoind = nodeConnectedWithBitcoind.bitcoinds(0)
//we need to generate 1 block for bitcoind to consider
//itself out of IBD. bitcoind will not sendheaders

View File

@ -42,8 +42,8 @@ class SpvNodeTest extends NodeTestWithCachedBitcoindNewest {
val bitcoind = spvNodeConnectedWithBitcoind.bitcoind
val assert1F = for {
_ <- spvNode.isConnected.map(assert(_))
a2 <- spvNode.isInitialized.map(assert(_))
_ <- spvNode.isConnected(0).map(assert(_))
a2 <- spvNode.isInitialized(0).map(assert(_))
} yield a2
val hashF: Future[DoubleSha256DigestBE] = bitcoind.getNewAddress

View File

@ -32,7 +32,7 @@ class DataMessageHandlerTest extends NodeUnitTest {
param: SpvNodeConnectedWithBitcoindV21 =>
val SpvNodeConnectedWithBitcoindV21(spv, _) = param
val sender = spv.peerMsgSender
val sender = spv.peerMsgSenders(0)
for {
chainApi <- spv.chainApiFromDb()
dataMessageHandler = DataMessageHandler(chainApi)(spv.executionContext,
@ -66,7 +66,7 @@ class DataMessageHandlerTest extends NodeUnitTest {
}
}
val sender = spv.peerMsgSender
val sender = spv.peerMsgSenders(0)
for {
txId <- bitcoind.sendToAddress(junkAddress, 1.bitcoin)
tx <- bitcoind.getRawTransactionRaw(txId)
@ -101,7 +101,7 @@ class DataMessageHandlerTest extends NodeUnitTest {
()
}
}
val sender = spv.peerMsgSender
val sender = spv.peerMsgSenders(0)
for {
hash <- bitcoind.generateToAddress(blocks = 1, junkAddress).map(_.head)
@ -136,7 +136,7 @@ class DataMessageHandlerTest extends NodeUnitTest {
}
}
val sender = spv.peerMsgSender
val sender = spv.peerMsgSenders(0)
for {
hash <- bitcoind.generateToAddress(blocks = 1, junkAddress).map(_.head)
header <- bitcoind.getBlockHeaderRaw(hash)
@ -169,7 +169,7 @@ class DataMessageHandlerTest extends NodeUnitTest {
()
}
}
val sender = spv.peerMsgSender
val sender = spv.peerMsgSenders(0)
for {
hash <- bitcoind.generateToAddress(blocks = 1, junkAddress).map(_.head)
filter <- bitcoind.getBlockFilter(hash, FilterType.Basic)

View File

@ -18,7 +18,7 @@ import org.bitcoins.node.networking.peer.DataMessageHandler
import scala.concurrent.Future
case class NeutrinoNode(
nodePeer: Peer,
nodePeer: Vector[Peer],
dataMessageHandler: DataMessageHandler,
nodeConfig: NodeAppConfig,
chainConfig: ChainAppConfig,
@ -34,7 +34,7 @@ case class NeutrinoNode(
override def chainAppConfig: ChainAppConfig = chainConfig
override val peer: Peer = nodePeer
override val peers: Vector[Peer] = nodePeer
override def updateDataMessageHandler(
dataMessageHandler: DataMessageHandler): NeutrinoNode = {
@ -46,7 +46,7 @@ case class NeutrinoNode(
node <- super.start()
chainApi <- chainApiFromDb()
bestHash <- chainApi.getBestBlockHash()
_ <- peerMsgSender.sendGetCompactFilterCheckPointMessage(
_ <- peerMsgSenders(0).sendGetCompactFilterCheckPointMessage(
stopHash = bestHash.flip)
} yield {
node.asInstanceOf[NeutrinoNode]
@ -75,7 +75,7 @@ case class NeutrinoNode(
blockchains <- blockchainsF
// Get all of our cached headers in case of a reorg
cachedHeaders = blockchains.flatMap(_.headers).map(_.hashBE.flip)
_ <- peerMsgSender.sendGetHeadersMessage(cachedHeaders)
_ <- peerMsgSenders(0).sendGetHeadersMessage(cachedHeaders)
_ <- syncFilters(bestFilterHeaderOpt = bestFilterHeaderOpt,
bestFilterOpt = bestFilterOpt,
bestBlockHeader = header,
@ -128,7 +128,7 @@ case class NeutrinoNode(
chainApi: ChainApi,
bestFilterOpt: Option[CompactFilterDb]): Future[Unit] = {
val sendCompactFilterHeaderMsgF = {
peerMsgSender.sendNextGetCompactFilterHeadersCommand(
peerMsgSenders(0).sendNextGetCompactFilterHeadersCommand(
chainApi = chainApi,
filterHeaderBatchSize = chainConfig.filterHeaderBatchSize,
prevStopHash = bestFilterHeader.blockHashBE)
@ -143,7 +143,7 @@ case class NeutrinoNode(
//means we are not syncing filter headers, and our filters are NOT
//in sync with our compact filter headers
logger.info(s"Starting sync filters in NeutrinoNode.sync()")
peerMsgSender
peerMsgSenders(0)
.sendNextGetCompactFilterCommand(
chainApi = chainApi,
filterBatchSize = chainConfig.filterBatchSize,

View File

@ -43,7 +43,7 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
implicit def executionContext: ExecutionContext = system.dispatcher
val peer: Peer
val peers: Vector[Peer]
/** The current data message handler.
* It should be noted that the dataMessageHandler contains
@ -74,17 +74,20 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
* object. Internally in [[org.bitcoins.node.networking.P2PClient p2p client]] you will see that
* the [[ChainApi chain api]] is updated inside of the p2p client
*/
lazy val client: P2PClient = {
val peerMsgRecv: PeerMessageReceiver =
PeerMessageReceiver.newReceiver(node = this, peer = peer)
val p2p = P2PClient(context = system,
peer = peer,
peerMessageReceiver = peerMsgRecv)
lazy val clients: Vector[P2PClient] = {
val peerMsgRecvs: Vector[PeerMessageReceiver] =
peers.map(x => PeerMessageReceiver.newReceiver(node = this, peer = x))
val zipped = peers.zip(peerMsgRecvs)
val p2p = zipped.map { case (peer, peerMsgRecv) =>
P2PClient(context = system,
peer = peer,
peerMessageReceiver = peerMsgRecv)
}
p2p
}
lazy val peerMsgSender: PeerMessageSender = {
PeerMessageSender(client)
lazy val peerMsgSenders: Vector[PeerMessageSender] = {
clients.map(PeerMessageSender(_))
}
/** Sends the given P2P to our peer.
@ -92,22 +95,23 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
* with P2P messages, therefore marked as
* `private[node]`.
*/
def send(msg: NetworkPayload): Future[Unit] = {
peerMsgSender.sendMsg(msg)
def send(msg: NetworkPayload, idx: Int): Future[Unit] = {
peerMsgSenders(idx).sendMsg(msg)
}
/** Checks if we have a tcp connection with our peer */
def isConnected: Future[Boolean] = peerMsgSender.isConnected()
def isConnected(idx: Int): Future[Boolean] = peerMsgSenders(idx).isConnected()
/** Checks if we are fully initialized with our peer and have executed the handshake
* This means we can now send arbitrary messages to our peer
*
* @return
*/
def isInitialized: Future[Boolean] = peerMsgSender.isInitialized()
def isInitialized(idx: Int): Future[Boolean] =
peerMsgSenders(idx).isInitialized()
def isDisconnected: Future[Boolean] =
peerMsgSender.isDisconnected()
def isDisconnected(idx: Int): Future[Boolean] =
peerMsgSenders(idx).isDisconnected()
/** Starts our node */
def start(): Future[Node] = {
@ -122,18 +126,18 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
val chainApiF = startConfsF.flatMap(_ => chainApiFromDb())
val startNodeF = {
peerMsgSender.connect()
peerMsgSenders.foreach(_.connect())
val isInitializedF = for {
_ <- AsyncUtil.retryUntilSatisfiedF(() => isInitialized,
_ <- AsyncUtil.retryUntilSatisfiedF(() => isInitialized(0),
maxTries = 200,
interval = 250.millis)
} yield ()
isInitializedF.failed.foreach(err =>
logger.error(s"Failed to connect with peer=$peer with err=$err"))
logger.error(s"Failed to connect with peer=${peers(0)} with err=$err"))
isInitializedF.map { _ =>
logger.info(s"Our peer=$peer has been initialized")
logger.info(s"Our peer=${peers(0)} has been initialized")
logger.info(s"Our node has been full started. It took=${System
.currentTimeMillis() - start}ms")
this
@ -164,16 +168,25 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
/** Stops our node */
def stop(): Future[Node] = {
logger.info(s"Stopping node")
val disconnectFs = peerMsgSenders.map(_.disconnect())
val disconnectF = for {
disconnect <- peerMsgSender.disconnect()
disconnect <- Future.sequence(disconnectFs)
_ <- nodeAppConfig.stop()
} yield disconnect
def isAllDisconnectedF: Future[Boolean] = {
val connF = peerMsgSenders.indices.map(peerMsgSenders(_).isDisconnected())
val res = Future.sequence(connF).map(_.forall(_ == true))
res
}
val start = System.currentTimeMillis()
val isStoppedF = disconnectF.flatMap { _ =>
logger.info(s"Awaiting disconnect")
//25 seconds to disconnect
AsyncUtil.retryUntilSatisfiedF(() => isDisconnected, 500.millis)
AsyncUtil.retryUntilSatisfiedF(() => isAllDisconnectedF, 500.millis)
}
isStoppedF.failed.foreach { e =>
@ -204,7 +217,7 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
// Get all of our cached headers in case of a reorg
cachedHeaders = blockchains.flatMap(_.headers).map(_.hashBE.flip)
_ <- peerMsgSender.sendGetHeadersMessage(cachedHeaders)
_ <- peerMsgSenders(0).sendGetHeadersMessage(cachedHeaders)
} yield {
logger.info(
s"Starting sync node, height=${header.height} hash=${header.hashBE}")
@ -231,15 +244,15 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
for {
_ <- addToDbF
connected <- isConnected
connected <- isConnected(0)
res <- {
if (connected) {
logger.info(s"Sending out tx message for tx=$txIds")
peerMsgSender.sendInventoryMessage(transactions: _*)
peerMsgSenders(0).sendInventoryMessage(transactions: _*)
} else {
Future.failed(new RuntimeException(
s"Error broadcasting transaction $txIds, peer is disconnected $peer"))
s"Error broadcasting transaction $txIds, peer is disconnected ${peers(0)}"))
}
}
} yield res
@ -252,8 +265,8 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
if (blockHashes.isEmpty) {
Future.unit
} else {
peerMsgSender.sendGetDataMessage(TypeIdentifier.MsgWitnessBlock,
blockHashes: _*)
peerMsgSenders(0).sendGetDataMessage(TypeIdentifier.MsgWitnessBlock,
blockHashes: _*)
}
}

View File

@ -15,7 +15,7 @@ import org.bitcoins.node.networking.peer.DataMessageHandler
import scala.concurrent.Future
case class SpvNode(
nodePeer: Peer,
nodePeer: Vector[Peer],
dataMessageHandler: DataMessageHandler,
nodeConfig: NodeAppConfig,
chainConfig: ChainAppConfig,
@ -30,7 +30,7 @@ case class SpvNode(
override def chainAppConfig: ChainAppConfig = chainConfig
override val peer: Peer = nodePeer
override val peers: Vector[Peer] = nodePeer
private val _bloomFilter = new Mutable(BloomFilter.empty)
@ -58,8 +58,8 @@ case class SpvNode(
// then need to calculate all the new elements in
// the filter. this is easier:-)
for {
_ <- peerMsgSender.sendFilterClearMessage()
_ <- peerMsgSender.sendFilterLoadMessage(newBloom)
_ <- peerMsgSenders(0).sendFilterClearMessage()
_ <- peerMsgSenders(0).sendFilterLoadMessage(newBloom)
} yield this
}
@ -73,7 +73,7 @@ case class SpvNode(
val hash = address.hash
_bloomFilter.atomicUpdate(hash)(_.insert(_))
val sentFilterAddF = peerMsgSender.sendFilterAddMessage(hash)
val sentFilterAddF = peerMsgSenders(0).sendFilterAddMessage(hash)
sentFilterAddF.map(_ => this)
}
@ -81,10 +81,10 @@ case class SpvNode(
override def start(): Future[SpvNode] = {
for {
node <- super.start()
_ <- AsyncUtil.retryUntilSatisfiedF(() => isConnected)
_ <- peerMsgSender.sendFilterLoadMessage(bloomFilter)
_ <- AsyncUtil.retryUntilSatisfiedF(() => isConnected(0))
_ <- peerMsgSenders(0).sendFilterLoadMessage(bloomFilter)
} yield {
logger.info(s"Sending bloomfilter=${bloomFilter.hex} to $peer")
logger.info(s"Sending bloomfilter=${bloomFilter.hex} to ${peers(0)}")
node.asInstanceOf[SpvNode]
}
}

View File

@ -115,10 +115,10 @@ case class NodeAppConfig(
}
/** Creates either a neutrino node or a spv node based on the [[NodeAppConfig]] given */
def createNode(peer: Peer)(
def createNode(peers: Vector[Peer])(
chainConf: ChainAppConfig,
system: ActorSystem): Future[Node] = {
NodeAppConfig.createNode(peer)(this, chainConf, system)
NodeAppConfig.createNode(peers)(this, chainConf, system)
}
}
@ -134,7 +134,7 @@ object NodeAppConfig extends AppConfigFactory[NodeAppConfig] {
NodeAppConfig(datadir, confs: _*)
/** Creates either a neutrino node or a spv node based on the [[NodeAppConfig]] given */
def createNode(peer: Peer)(implicit
def createNode(peers: Vector[Peer])(implicit
nodeConf: NodeAppConfig,
chainConf: ChainAppConfig,
system: ActorSystem): Future[Node] = {
@ -150,9 +150,9 @@ object NodeAppConfig extends AppConfigFactory[NodeAppConfig] {
nodeConf.nodeType match {
case NodeType.SpvNode =>
dmhF.map(dmh => SpvNode(peer, dmh, nodeConf, chainConf, system))
dmhF.map(dmh => SpvNode(peers, dmh, nodeConf, chainConf, system))
case NodeType.NeutrinoNode =>
dmhF.map(dmh => NeutrinoNode(peer, dmh, nodeConf, chainConf, system))
dmhF.map(dmh => NeutrinoNode(peers, dmh, nodeConf, chainConf, system))
case NodeType.FullNode =>
Future.failed(new RuntimeException("Not implemented"))
case NodeType.BitcoindBackend =>

View File

@ -1,5 +1,7 @@
package org.bitcoins.node.util
import scala.util.Random
object BitcoinSNodeUtil {
/** Creates a unique actor name for a actor
@ -7,7 +9,7 @@ object BitcoinSNodeUtil {
* @return
*/
def createActorName(className: String): String = {
s"${className}-${System.currentTimeMillis()}"
s"$className-${System.currentTimeMillis()}-${Random.nextLong()}"
}
/** Creates a unique actor name for a given class

View File

@ -8,13 +8,14 @@ import org.bitcoins.rpc.client.v21.BitcoindV21RpcClient
import org.bitcoins.server.BitcoinSAppConfig
import org.bitcoins.testkit.node.NodeUnitTest.{createPeer, syncNeutrinoNode}
import org.bitcoins.testkit.node.fixture.{
NeutrinoNodeConnectedWithBitcoind,
NeutrinoNodeConnectedWithBitcoinds,
SpvNodeConnectedWithBitcoind,
SpvNodeConnectedWithBitcoindV21
}
import org.bitcoins.testkit.rpc.{
CachedBitcoind,
CachedBitcoindNewest,
CachedBitcoindPairV21,
CachedBitcoindV19
}
import org.bitcoins.testkit.wallet.BitcoinSWalletTest
@ -75,25 +76,26 @@ trait NodeTestWithCachedBitcoind extends BaseNodeTest { _: CachedBitcoind[_] =>
})(test)
}
def withNeutrinoNodeConnectedToBitcoind(
def withNeutrinoNodeConnectedToBitcoinds(
test: OneArgAsyncTest,
bitcoind: BitcoindRpcClient)(implicit
bitcoinds: Vector[BitcoindRpcClient])(implicit
system: ActorSystem,
appConfig: BitcoinSAppConfig): FutureOutcome = {
val nodeWithBitcoindBuilder: () => Future[
NeutrinoNodeConnectedWithBitcoind] = { () =>
NeutrinoNodeConnectedWithBitcoinds] = { () =>
require(appConfig.nodeType == NodeType.NeutrinoNode)
for {
node <- NodeUnitTest.createNeutrinoNode(bitcoind)(system,
appConfig.chainConf,
appConfig.nodeConf)
node <- NodeUnitTest.createNeutrinoNode(bitcoinds)(system,
appConfig.chainConf,
appConfig.nodeConf)
startedNode <- node.start()
syncedNode <- syncNeutrinoNode(startedNode, bitcoind)
} yield NeutrinoNodeConnectedWithBitcoind(syncedNode, bitcoind)
//is it enough to just sync with one bitcoind client for a test?
syncedNode <- syncNeutrinoNode(startedNode, bitcoinds(0))
} yield NeutrinoNodeConnectedWithBitcoinds(syncedNode, bitcoinds)
}
makeDependentFixture[NeutrinoNodeConnectedWithBitcoind](
makeDependentFixture[NeutrinoNodeConnectedWithBitcoinds](
build = nodeWithBitcoindBuilder,
{ case x: NeutrinoNodeConnectedWithBitcoind =>
{ case x: NeutrinoNodeConnectedWithBitcoinds =>
tearDownNode(x.node)
})(test)
}
@ -159,6 +161,16 @@ trait NodeTestWithCachedBitcoindNewest
}
}
trait NodeTestWithCachedBitcoindPair
extends NodeTestWithCachedBitcoind
with CachedBitcoindPairV21 {
override def afterAll(): Unit = {
super[CachedBitcoindPairV21].afterAll()
super[NodeTestWithCachedBitcoind].afterAll()
}
}
trait NodeTestWithCachedBitcoindV19
extends NodeTestWithCachedBitcoind
with CachedBitcoindV19 {

View File

@ -210,7 +210,7 @@ object NodeUnitTest extends P2PLogger {
val dmh = DataMessageHandler(chainApi)
NeutrinoNode(peer, dmh, nodeConf, chainConf, system)
NeutrinoNode(Vector(peer), dmh, nodeConf, chainConf, system)
}
def buildPeerMessageReceiver(chainApi: ChainApi, peer: Peer)(implicit
@ -453,7 +453,7 @@ object NodeUnitTest extends P2PLogger {
} yield {
val dmh = DataMessageHandler(chainHandler)
SpvNode(
nodePeer = peer,
nodePeer = Vector(peer),
dataMessageHandler = dmh,
nodeConfig = nodeAppConfig,
chainConfig = chainAppConfig,
@ -483,7 +483,39 @@ object NodeUnitTest extends P2PLogger {
chainApi <- chainApiF
} yield {
val dmh = DataMessageHandler(chainApi)
NeutrinoNode(nodePeer = peer,
NeutrinoNode(nodePeer = Vector(peer),
dataMessageHandler = dmh,
nodeConfig = nodeAppConfig,
chainConfig = chainAppConfig,
actorSystem = system)
}
nodeF
}
/** Creates a Neutrino node peered with the given bitcoind client, this method
* also calls [[org.bitcoins.node.Node.start() start]] to start the node
*/
def createNeutrinoNode(bitcoinds: Vector[BitcoindRpcClient])(implicit
system: ActorSystem,
chainAppConfig: ChainAppConfig,
nodeAppConfig: NodeAppConfig): Future[NeutrinoNode] = {
import system.dispatcher
val checkConfigF = Future {
assert(nodeAppConfig.nodeType == NodeType.NeutrinoNode)
}
val chainApiF = for {
_ <- checkConfigF
chainHandler <- ChainUnitTest.createChainHandler()
} yield chainHandler
val peersF = bitcoinds.map(createPeer(_))
val nodeF = for {
chainApi <- chainApiF
peers <- Future.sequence(peersF)
} yield {
val dmh = DataMessageHandler(chainApi)
NeutrinoNode(nodePeer = peers,
dataMessageHandler = dmh,
nodeConfig = nodeAppConfig,
chainConfig = chainAppConfig,

View File

@ -24,3 +24,13 @@ case class NeutrinoNodeConnectedWithBitcoind(
node: NeutrinoNode,
bitcoind: BitcoindRpcClient)
extends NodeConnectedWithBitcoind
trait NodeConnectedWithBitcoinds {
def node: Node
def bitcoinds: Vector[BitcoindRpcClient]
}
case class NeutrinoNodeConnectedWithBitcoinds(
node: NeutrinoNode,
bitcoinds: Vector[BitcoindRpcClient]
) extends NodeConnectedWithBitcoinds

View File

@ -189,6 +189,25 @@ trait CachedBitcoindPair[T <: BitcoindRpcClient]
}
}
trait CachedBitcoindPairV21
extends CachedBitcoindCollection[BitcoindV21RpcClient] {
_: BitcoinSAkkaAsyncTest =>
override val version: BitcoindVersion = BitcoindVersion.V21
lazy val clientsF: Future[NodePair[BitcoindV21RpcClient]] = {
BitcoindRpcTestUtil
.createNodePair[BitcoindV21RpcClient](version)
.map(NodePair.fromTuple)
.map { tuple =>
isClientsUsed.set(true)
val clients = cachedClients.get()
cachedClients.set(clients ++ tuple.toVector)
tuple
}
}
}
trait CachedBitcoindTriple[T <: BitcoindRpcClient]
extends CachedBitcoindCollection[T] {
_: BitcoinSAkkaAsyncTest =>
@ -204,5 +223,4 @@ trait CachedBitcoindTriple[T <: BitcoindRpcClient]
triple
}
}
}