diff --git a/app/server/src/main/scala/org/bitcoins/server/Main.scala b/app/server/src/main/scala/org/bitcoins/server/Main.scala index ae8f78bf46..5551b5d5fd 100644 --- a/app/server/src/main/scala/org/bitcoins/server/Main.scala +++ b/app/server/src/main/scala/org/bitcoins/server/Main.scala @@ -1,30 +1,22 @@ package org.bitcoins.server -import org.bitcoins.rpc.config.BitcoindInstance -import org.bitcoins.node.models.Peer -import org.bitcoins.rpc.client.common.BitcoindRpcClient -import akka.actor.ActorSystem -import scala.concurrent.Await -import scala.concurrent.duration._ -import org.bitcoins.wallet.config.WalletAppConfig -import org.bitcoins.node.config.NodeAppConfig import java.nio.file.Files -import scala.concurrent.Future -import org.bitcoins.wallet.LockedWallet -import org.bitcoins.wallet.Wallet -import org.bitcoins.wallet.api.InitializeWalletSuccess -import org.bitcoins.wallet.api.InitializeWalletError -import org.bitcoins.node.SpvNode -import org.bitcoins.chain.blockchain.ChainHandler + +import akka.actor.ActorSystem import org.bitcoins.chain.config.ChainAppConfig -import org.bitcoins.wallet.api.UnlockedWalletApi -import org.bitcoins.wallet.api.UnlockWalletSuccess -import org.bitcoins.wallet.api.UnlockWalletError -import org.bitcoins.node.networking.peer.DataMessageHandler -import org.bitcoins.node.SpvNodeCallbacks -import org.bitcoins.wallet.WalletStorage import org.bitcoins.db.AppLoggers -import org.bitcoins.chain.models.BlockHeaderDAO +import org.bitcoins.node.{SpvNode, SpvNodeCallbacks} +import org.bitcoins.node.config.NodeAppConfig +import org.bitcoins.node.models.Peer +import org.bitcoins.node.networking.peer.DataMessageHandler +import org.bitcoins.rpc.client.common.BitcoindRpcClient +import org.bitcoins.rpc.config.BitcoindInstance +import org.bitcoins.wallet.{LockedWallet, Wallet, WalletStorage} +import org.bitcoins.wallet.api._ +import org.bitcoins.wallet.config.WalletAppConfig + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ object Main extends App { implicit val conf = { @@ -107,20 +99,17 @@ object Main extends App { SpvNodeCallbacks(onTxReceived = Seq(onTX)) } - val blockheaderDAO = BlockHeaderDAO() - val chain = ChainHandler(blockheaderDAO) - SpvNode(peer, chain, bloom, callbacks).start() + SpvNode(peer, bloom, callbacks).start() } _ = logger.info(s"Starting SPV node sync") _ <- node.sync() - + chainApi <- node.chainApiFromDb() start <- { val walletRoutes = WalletRoutes(wallet, node) val nodeRoutes = NodeRoutes(node) - val chainRoutes = ChainRoutes(node.chainApi) - val server = - Server(nodeConf, // could use either of configurations - Seq(walletRoutes, nodeRoutes, chainRoutes)) + val chainRoutes = ChainRoutes(chainApi) + val server = Server(nodeConf, Seq(walletRoutes, nodeRoutes, chainRoutes)) + server.start() } } yield { diff --git a/chain-test/src/test/scala/org/bitcoins/chain/blockchain/BlockchainTest.scala b/chain-test/src/test/scala/org/bitcoins/chain/blockchain/BlockchainTest.scala index ae8f01554b..a0a8726103 100644 --- a/chain-test/src/test/scala/org/bitcoins/chain/blockchain/BlockchainTest.scala +++ b/chain-test/src/test/scala/org/bitcoins/chain/blockchain/BlockchainTest.scala @@ -12,7 +12,7 @@ class BlockchainTest extends ChainUnitTest { override def withFixture(test: OneArgAsyncTest): FutureOutcome = withBlockHeaderDAO(test) - override implicit val system: ActorSystem = ActorSystem("BlockchainTest") + implicit override val system: ActorSystem = ActorSystem("BlockchainTest") behavior of "Blockchain" @@ -26,7 +26,8 @@ class BlockchainTest extends ChainUnitTest { BlockHeaderHelper.buildNextHeader(ChainUnitTest.genesisHeaderDb) val connectTipF = Blockchain.connectTip(header = newHeader.blockHeader, - blockHeaderDAO = bhDAO) + blockHeaderDAO = bhDAO, + Vector(blockchain)) connectTipF.map { case BlockchainUpdate.Successful(_, connectedHeader) => diff --git a/chain-test/src/test/scala/org/bitcoins/chain/blockchain/ChainHandlerTest.scala b/chain-test/src/test/scala/org/bitcoins/chain/blockchain/ChainHandlerTest.scala index 9212dbdc30..4a58ba853f 100644 --- a/chain-test/src/test/scala/org/bitcoins/chain/blockchain/ChainHandlerTest.scala +++ b/chain-test/src/test/scala/org/bitcoins/chain/blockchain/ChainHandlerTest.scala @@ -120,7 +120,9 @@ class ChainHandlerTest extends ChainUnitTest { val createdF = chainHandler.blockHeaderDAO.createAll(firstThreeBlocks) createdF.flatMap { _ => - val processorF = Future.successful(chainHandler) + val blockchain = Blockchain.fromHeaders(firstThreeBlocks.reverse) + val handler = ChainHandler(chainHandler.blockHeaderDAO, blockchain) + val processorF = Future.successful(handler) // Takes way too long to do all blocks val blockHeadersToTest = blockHeaders.tail .take( diff --git a/chain/src/main/scala/org/bitcoins/chain/blockchain/Blockchain.scala b/chain/src/main/scala/org/bitcoins/chain/blockchain/Blockchain.scala index 1595f7da1e..8d0b88eb23 100644 --- a/chain/src/main/scala/org/bitcoins/chain/blockchain/Blockchain.scala +++ b/chain/src/main/scala/org/bitcoins/chain/blockchain/Blockchain.scala @@ -44,6 +44,7 @@ case class Blockchain(headers: Vector[BlockHeaderDb]) object Blockchain extends ChainVerificationLogger { def fromHeaders(headers: Vector[BlockHeaderDb]): Blockchain = { + Blockchain(headers) } @@ -60,59 +61,63 @@ object Blockchain extends ChainVerificationLogger { * we [[org.bitcoins.chain.blockchain.BlockchainUpdate.Successful successful]] connected the tip, * or [[org.bitcoins.chain.blockchain.BlockchainUpdate.Failed Failed]] to connect to a tip */ - def connectTip(header: BlockHeader, blockHeaderDAO: BlockHeaderDAO)( + def connectTip( + header: BlockHeader, + blockHeaderDAO: BlockHeaderDAO, + blockchains: Vector[Blockchain])( implicit ec: ExecutionContext, conf: ChainAppConfig): Future[BlockchainUpdate] = { logger.debug( s"Attempting to add new tip=${header.hashBE.hex} with prevhash=${header.previousBlockHashBE.hex} to chain") - //get all competing chains we have - val blockchainsF: Future[Vector[Blockchain]] = - blockHeaderDAO.getBlockchains() - - val tipResultF: Future[BlockchainUpdate] = blockchainsF.flatMap { - blockchains => - val nested: Vector[Future[BlockchainUpdate]] = blockchains.map { - blockchain => - val prevBlockHeaderOpt = - blockchain.find(_.hashBE == header.previousBlockHashBE) - prevBlockHeaderOpt match { - case None => - logger.warn( - s"No common ancestor found in the chain to connect header=${header.hashBE.hex}") - val err = TipUpdateResult.BadPreviousBlockHash(header) - val failed = BlockchainUpdate.Failed(blockchain = blockchain, - failedHeader = header, - tipUpdateFailure = err) - Future.successful(failed) - - case Some(prevBlockHeader) => - //found a header to connect to! - logger.trace( - s"Found ancestor=${prevBlockHeader.hashBE.hex} for header=${header.hashBE.hex}") - val tipResultF = - TipValidation.checkNewTip(newPotentialTip = header, - currentTip = prevBlockHeader, - blockHeaderDAO = blockHeaderDAO) - - tipResultF.map { tipResult => - tipResult match { - case TipUpdateResult.Success(headerDb) => - logger.debug( - s"Successfully verified=${headerDb.hashBE.hex}, connecting to chain") - val newChain = - Blockchain.fromHeaders(headerDb +: blockchain.headers) - BlockchainUpdate.Successful(newChain, headerDb) - case fail: TipUpdateResult.Failure => - logger.warn( - s"Could not verify header=${header.hashBE.hex}, reason=$fail") - BlockchainUpdate.Failed(blockchain, header, fail) - } - } + val tipResultF: Future[BlockchainUpdate] = { + val nested: Vector[Future[BlockchainUpdate]] = blockchains.map { + blockchain => + val prevBlockHeaderIdxOpt = + blockchain.headers.zipWithIndex.find { + case (headerDb, _) => + headerDb.hashBE == header.previousBlockHashBE } - } - parseSuccessOrFailure(nested = nested) + prevBlockHeaderIdxOpt match { + case None => + logger.warn( + s"No common ancestor found in the chain to connect to ${header.hashBE}") + val err = TipUpdateResult.BadPreviousBlockHash(header) + val failed = BlockchainUpdate.Failed(blockchain = blockchain, + failedHeader = header, + tipUpdateFailure = err) + Future.successful(failed) + + case Some((prevBlockHeader, prevHeaderIdx)) => + //found a header to connect to! + logger.debug( + s"Attempting to add new tip=${header.hashBE.hex} with prevhash=${header.previousBlockHashBE.hex} to chain") + val tipResultF = + TipValidation.checkNewTip(newPotentialTip = header, + currentTip = prevBlockHeader, + blockHeaderDAO = blockHeaderDAO) + + tipResultF.map { tipResult => + tipResult match { + case TipUpdateResult.Success(headerDb) => + logger.debug( + s"Successfully verified=${headerDb.hashBE.hex}, connecting to chain") + val oldChain = + blockchain.takeRight(blockchain.length - prevHeaderIdx) + val newChain = + Blockchain.fromHeaders(headerDb +: oldChain) + BlockchainUpdate.Successful(newChain, headerDb) + case fail: TipUpdateResult.Failure => + logger.warn( + s"Could not verify header=${header.hashBE.hex}, reason=$fail") + BlockchainUpdate.Failed(blockchain, header, fail) + } + } + } + } + parseSuccessOrFailure(nested = nested) } + tipResultF } diff --git a/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala b/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala index e6cf02d48e..72eb0b3e20 100644 --- a/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala +++ b/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala @@ -19,9 +19,11 @@ import org.bitcoins.chain.validation.TipUpdateResult * of [[org.bitcoins.chain.api.ChainApi ChainApi]], this is the entry point in to the * chain project. */ -case class ChainHandler(blockHeaderDAO: BlockHeaderDAO)( - implicit private[chain] val chainConfig: ChainAppConfig -) extends ChainApi +case class ChainHandler( + blockHeaderDAO: BlockHeaderDAO, + blockchains: Vector[Blockchain])( + implicit private[chain] val chainConfig: ChainAppConfig) + extends ChainApi with ChainVerificationLogger { override def getBlockCount(implicit ec: ExecutionContext): Future[Long] = { @@ -49,18 +51,40 @@ case class ChainHandler(blockHeaderDAO: BlockHeaderDAO)( logger.debug( s"Processing header=${header.hashBE.hex}, previousHash=${header.previousBlockHashBE.hex}") - val blockchainUpdateF = - Blockchain.connectTip(header, blockHeaderDAO) + val blockchainUpdateF = Blockchain.connectTip(header = header, + blockHeaderDAO = + blockHeaderDAO, + blockchains = blockchains) val newHandlerF = blockchainUpdateF.flatMap { - case BlockchainUpdate.Successful(_, updatedHeader) => + case BlockchainUpdate.Successful(newChain, updatedHeader) => //now we have successfully connected the header, we need to insert //it into the database val createdF = blockHeaderDAO.create(updatedHeader) createdF.map { header => logger.debug( - s"Connected new header to blockchain, height=${header.height} hash=${header.hashBE.hex}") - ChainHandler(blockHeaderDAO) + s"Connected new header to blockchain, height=${header.height} hash=${header.hashBE}") + val chainIdxOpt = blockchains.zipWithIndex.find { + case (chain, _) => + val oldTip = newChain(1) //should be safe, even with genesis header as we just connected a tip + oldTip == chain.tip + } + + val updatedChains = { + chainIdxOpt match { + case Some((_, idx)) => + logger.trace( + s"Updating chain at idx=${idx} out of competing chains=${blockchains.length} with new tip=${header.hashBE.hex}") + blockchains.updated(idx, newChain) + + case None => + logger.info( + s"New competing blockchain with tip=${newChain.tip}") + blockchains.:+(newChain) + } + } + + ChainHandler(blockHeaderDAO, updatedChains) } case BlockchainUpdate.Failed(_, _, reason) => val errMsg = @@ -117,11 +141,42 @@ case class ChainHandler(blockHeaderDAO: BlockHeaderDAO)( //this does _not_ mean that it is on the chain that has the most work //TODO: Enhance this in the future to return the "heaviest" header //https://bitcoin.org/en/glossary/block-chain - blockHeaderDAO.chainTips.map { tips => - val sorted = tips.sortBy(header => header.blockHeader.difficulty) - val hash = sorted.head.hashBE - logger.debug(s"getBestBlockHash result: hash=$hash") - hash + val groupedChains = blockchains.groupBy(_.tip.height) + val maxHeight = groupedChains.keys.max + val chains = groupedChains(maxHeight) + + val hashBE: DoubleSha256DigestBE = chains match { + case Vector() => + val errMsg = s"Did not find blockchain with height $maxHeight" + logger.error(errMsg) + throw new RuntimeException(errMsg) + case chain +: Vector() => + chain.tip.hashBE + case chain +: rest => + logger.warn( + s"We have multiple competing blockchains: ${(chain +: rest).map(_.tip.hashBE.hex).mkString(", ")}") + chain.tip.hashBE } + Future.successful(hashBE) + } +} + +object ChainHandler { + + /** Constructs a [[ChainHandler chain handler]] from the state in the database + * This gives us the guaranteed latest state we have in the database + * */ + def fromDatabase(blockHeaderDAO: BlockHeaderDAO)( + implicit ec: ExecutionContext, + chainConfig: ChainAppConfig): Future[ChainHandler] = { + val bestChainsF = blockHeaderDAO.getBlockchains() + + bestChainsF.map(chains => + new ChainHandler(blockHeaderDAO = blockHeaderDAO, blockchains = chains)) + } + + def apply(blockHeaderDAO: BlockHeaderDAO, blockchains: Blockchain)( + implicit chainConfig: ChainAppConfig): ChainHandler = { + new ChainHandler(blockHeaderDAO, Vector(blockchains)) } } diff --git a/core/src/main/scala/org/bitcoins/core/util/FutureUtil.scala b/core/src/main/scala/org/bitcoins/core/util/FutureUtil.scala index 1417cc8090..a4dcf11047 100644 --- a/core/src/main/scala/org/bitcoins/core/util/FutureUtil.scala +++ b/core/src/main/scala/org/bitcoins/core/util/FutureUtil.scala @@ -23,4 +23,21 @@ object FutureUtil { } val unit: Future[Unit] = Future.successful(()) + + /** + * Folds over the given elements sequentially in a non-blocking async way + * @param init the initialized value for the accumulator + * @param items the items we are folding over + * @param fun the function we are applying to every element that returns a future + * @return + */ + def foldLeftAsync[T, U](init: T, items: Seq[U])(fun: (T, U) => Future[T])( + implicit ec: ExecutionContext): Future[T] = { + items.foldLeft(Future.successful(init)) { + case (accumF, elem) => + accumF.flatMap { accum => + fun(accum, elem) + } + } + } } diff --git a/node-test/src/test/scala/org/bitcoins/node/BroadcastTransactionTest.scala b/node-test/src/test/scala/org/bitcoins/node/BroadcastTransactionTest.scala index dddcec94eb..5a43e426a8 100644 --- a/node-test/src/test/scala/org/bitcoins/node/BroadcastTransactionTest.scala +++ b/node-test/src/test/scala/org/bitcoins/node/BroadcastTransactionTest.scala @@ -58,16 +58,10 @@ class BroadcastTransactionTest extends BitcoinSWalletTest { address <- rpc.getNewAddress bloom <- wallet.getBloomFilter() - spv <- { val peer = Peer.fromBitcoind(rpc.instance) - val chainHandler = { - val bhDao = BlockHeaderDAO() - ChainHandler(bhDao) - } - val spv = - SpvNode(peer, chainHandler, bloomFilter = bloom) + val spv = SpvNode(peer, bloomFilter = bloom) spv.start() } _ <- spv.sync() diff --git a/node-test/src/test/scala/org/bitcoins/node/NodeWithWalletTest.scala b/node-test/src/test/scala/org/bitcoins/node/NodeWithWalletTest.scala index bc82386541..42a94170fc 100644 --- a/node-test/src/test/scala/org/bitcoins/node/NodeWithWalletTest.scala +++ b/node-test/src/test/scala/org/bitcoins/node/NodeWithWalletTest.scala @@ -90,7 +90,7 @@ class NodeWithWalletTest extends NodeUnitTest { bloom <- wallet.getBloomFilter() address <- wallet.getNewAddress() spv <- initSpv.start() - updatedBloom = spv.updateBloomFilter(address).bloomFilter + updatedBloom <- spv.updateBloomFilter(address).map(_.bloomFilter) _ <- spv.sync() _ <- NodeTestUtil.awaitSync(spv, rpc) diff --git a/node-test/src/test/scala/org/bitcoins/node/SpvNodeTest.scala b/node-test/src/test/scala/org/bitcoins/node/SpvNodeTest.scala index 6a143b2504..a702ac9969 100644 --- a/node-test/src/test/scala/org/bitcoins/node/SpvNodeTest.scala +++ b/node-test/src/test/scala/org/bitcoins/node/SpvNodeTest.scala @@ -27,9 +27,10 @@ class SpvNodeTest extends NodeUnitTest { val spvNode = spvNodeConnectedWithBitcoind.spvNode val bitcoind = spvNodeConnectedWithBitcoind.bitcoind - assert(spvNode.isConnected) - - assert(spvNode.isInitialized) + val assert1F = for { + _ <- spvNode.isConnected.map(assert(_)) + a2 <- spvNode.isInitialized.map(assert(_)) + } yield a2 val hashF: Future[DoubleSha256DigestBE] = { bitcoind.generate(1).map(_.head) @@ -37,6 +38,7 @@ class SpvNodeTest extends NodeUnitTest { //sync our spv node expecting to get that generated hash val spvSyncF = for { + _ <- assert1F _ <- hashF sync <- spvNode.sync() } yield sync @@ -62,7 +64,9 @@ class SpvNodeTest extends NodeUnitTest { //as they happen with the 'sendheaders' message //both our spv node and our bitcoind node _should_ both be at the genesis block (regtest) //at this point so no actual syncing is happening - val initSyncF = gen1F.flatMap(_ => spvNode.sync()) + val initSyncF = gen1F.flatMap { _ => + spvNode.sync() + } //start generating a block every 10 seconds with bitcoind //this should result in 5 blocks @@ -76,7 +80,8 @@ class SpvNodeTest extends NodeUnitTest { //we should expect 5 headers have been announced to us via //the send headers message. val has6BlocksF = RpcUtil.retryUntilSatisfiedF( - conditionF = () => spvNode.chainApi.getBlockCount.map(_ == 6), + conditionF = + () => spvNode.chainApiFromDb().flatMap(_.getBlockCount.map(_ == 6)), duration = 1.seconds) has6BlocksF.map(_ => succeed) diff --git a/node-test/src/test/scala/org/bitcoins/node/UpdateBloomFilterTest.scala b/node-test/src/test/scala/org/bitcoins/node/UpdateBloomFilterTest.scala index 1fc6066b8e..be76cf2f02 100644 --- a/node-test/src/test/scala/org/bitcoins/node/UpdateBloomFilterTest.scala +++ b/node-test/src/test/scala/org/bitcoins/node/UpdateBloomFilterTest.scala @@ -88,7 +88,7 @@ class UpdateBloomFilterTest extends NodeUnitTest with BeforeAndAfter { addressFromWallet <- wallet.getNewAddress() _ = addressFromWalletP.success(addressFromWallet) spv <- initSpv.start() - _ = spv.updateBloomFilter(addressFromWallet) + _ <- spv.updateBloomFilter(addressFromWallet) _ <- spv.sync() _ <- rpc.sendToAddress(addressFromWallet, 1.bitcoin) _ <- NodeTestUtil.awaitSync(spv, rpc) @@ -130,11 +130,11 @@ class UpdateBloomFilterTest extends NodeUnitTest with BeforeAndAfter { 5.bitcoin, SatoshisPerByte(100.sats)) _ = txFromWalletP.success(tx) - spvNewBloom = spv.updateBloomFilter(tx) + updatedBloom <- spv.updateBloomFilter(tx).map(_.bloomFilter) _ = spv.broadcastTransaction(tx) _ <- spv.sync() _ <- NodeTestUtil.awaitSync(spv, rpc) - _ = assert(spvNewBloom.bloomFilter.contains(tx.txId)) + _ = assert(updatedBloom.contains(tx.txId)) _ = { cancelable = Some { system.scheduler.scheduleOnce( diff --git a/node-test/src/test/scala/org/bitcoins/node/networking/P2PClientTest.scala b/node-test/src/test/scala/org/bitcoins/node/networking/P2PClientTest.scala index 5f0c458cf2..018cba9182 100644 --- a/node-test/src/test/scala/org/bitcoins/node/networking/P2PClientTest.scala +++ b/node-test/src/test/scala/org/bitcoins/node/networking/P2PClientTest.scala @@ -2,6 +2,8 @@ package org.bitcoins.node.networking import akka.io.Tcp import akka.testkit.{TestActorRef, TestProbe} +import org.bitcoins.chain.db.ChainDbManagement +import org.bitcoins.node.SpvNodeCallbacks import org.bitcoins.node.models.Peer import org.bitcoins.node.networking.peer.PeerMessageReceiver import org.bitcoins.node.networking.peer.PeerMessageReceiverState.Preconnection @@ -39,6 +41,7 @@ class P2PClientTest BitcoinSTestAppConfig.getTestConfig() implicit private val chainConf = config.chainConf implicit private val nodeConf = config.nodeConf + implicit private val timeout = akka.util.Timeout(10.seconds) implicit val np = config.chainConf.network @@ -126,8 +129,20 @@ class P2PClientTest } behavior of "P2PClient" + override def beforeAll(): Unit = { + ChainDbManagement.createHeaderTable() + } + + override def afterAll(): Unit = { + ChainDbManagement.dropHeaderTable() + super.afterAll() + } + it must "establish a tcp connection with a bitcoin node" in { - bitcoindPeerF.flatMap(remote => connectAndDisconnect(remote)) + bitcoindPeerF.flatMap { remote => + println(s"Starting test") + connectAndDisconnect(remote) + } } it must "connect to two nodes" in { @@ -152,26 +167,32 @@ class P2PClientTest def connectAndDisconnect(peer: Peer): Future[Assertion] = { val probe = TestProbe() val remote = peer.socket - val chainHandler = { - val dao = BlockHeaderDAO() - ChainHandler(dao) + val peerMessageReceiverF = + PeerMessageReceiver.preConnection(peer, SpvNodeCallbacks.empty) + + val clientActorF: Future[TestActorRef[P2PClientActor]] = + peerMessageReceiverF.map { peerMsgRecv => + TestActorRef(P2PClient.props(peer, peerMsgRecv), probe.ref) + } + val p2pClientF: Future[P2PClient] = clientActorF.map { + client: TestActorRef[P2PClientActor] => + P2PClient(client, peer) } - val peerMessageReceiver = - PeerMessageReceiver(state = Preconnection, chainHandler) - val client = - TestActorRef(P2PClient.props(peer, peerMessageReceiver), probe.ref) - client ! Tcp.Connect(remote) - - val isConnectedF = - TestAsyncUtil.retryUntilSatisfied(peerMessageReceiver.isInitialized) + val isConnectedF = for { + p2pClient <- p2pClientF + _ = p2pClient.actor ! Tcp.Connect(remote) + isConnected <- TestAsyncUtil.retryUntilSatisfiedF(p2pClient.isConnected) + } yield isConnected isConnectedF.flatMap { _ => - //disconnect here - client ! Tcp.Abort - val isDisconnectedF = - TestAsyncUtil.retryUntilSatisfied(peerMessageReceiver.isDisconnected, - duration = 1.seconds) + val isDisconnectedF = for { + p2pClient <- p2pClientF + _ = p2pClient.actor ! Tcp.Abort + isDisconnected <- TestAsyncUtil.retryUntilSatisfiedF( + p2pClient.isDisconnected, + duration = 1.seconds) + } yield isDisconnected isDisconnectedF.map { _ => succeed diff --git a/node-test/src/test/scala/org/bitcoins/node/networking/peer/PeerMessageHandlerTest.scala b/node-test/src/test/scala/org/bitcoins/node/networking/peer/PeerMessageHandlerTest.scala index fdc8e6185c..86c3b256f8 100644 --- a/node-test/src/test/scala/org/bitcoins/node/networking/peer/PeerMessageHandlerTest.scala +++ b/node-test/src/test/scala/org/bitcoins/node/networking/peer/PeerMessageHandlerTest.scala @@ -22,32 +22,22 @@ class PeerMessageHandlerTest extends NodeUnitTest { behavior of "PeerHandler" it must "be able to fully initialize a PeerMessageReceiver" in { _ => - val peerHandlerF = bitcoindPeerF.map(p => NodeUnitTest.buildPeerHandler(p)) + val peerHandlerF = + bitcoindPeerF.flatMap(p => NodeUnitTest.buildPeerHandler(p)) val peerMsgSenderF = peerHandlerF.map(_.peerMsgSender) - val peerMsgRecvF = peerHandlerF.map(_.peerMsgRecv) + val p2pClientF = peerHandlerF.map(_.p2pClient) val _ = bitcoindPeerF.flatMap(p => peerHandlerF.map(_.peerMsgSender.connect())) val isConnectedF = TestAsyncUtil.retryUntilSatisfiedF( - () => peerMsgRecvF.map(_.isConnected), + () => p2pClientF.flatMap(_.isConnected), duration = 500.millis ) - val hasVersionMsgF = isConnectedF.flatMap { _ => - TestAsyncUtil.retryUntilSatisfiedF( - conditionF = () => peerMsgRecvF.map(_.hasReceivedVersionMsg) - ) - } - - val hasVerackMsg = hasVersionMsgF.flatMap { _ => - TestAsyncUtil.retryUntilSatisfiedF( - conditionF = () => peerMsgRecvF.map(_.hasReceivedVerackMsg) - ) - } - - val isInitF = hasVerackMsg.flatMap { _ => - peerMsgRecvF.map(p => assert(p.isInitialized)) + val isInitF = isConnectedF.flatMap { _ => + TestAsyncUtil.retryUntilSatisfiedF(() => + p2pClientF.flatMap(_.isInitialized())) } val disconnectF = isInitF.flatMap { _ => @@ -56,7 +46,7 @@ class PeerMessageHandlerTest extends NodeUnitTest { val isDisconnectedF = disconnectF.flatMap { _ => TestAsyncUtil.retryUntilSatisfiedF(() => - peerMsgRecvF.map(_.isDisconnected)) + p2pClientF.flatMap(_.isDisconnected())) } diff --git a/node/src/main/scala/org/bitcoins/node/SpvNode.scala b/node/src/main/scala/org/bitcoins/node/SpvNode.scala index 9f288ec93f..defa514ed9 100644 --- a/node/src/main/scala/org/bitcoins/node/SpvNode.scala +++ b/node/src/main/scala/org/bitcoins/node/SpvNode.scala @@ -2,76 +2,115 @@ package org.bitcoins.node import akka.actor.ActorSystem import org.bitcoins.chain.api.ChainApi -import org.bitcoins.node.models.Peer +import org.bitcoins.chain.blockchain.ChainHandler +import org.bitcoins.chain.config.ChainAppConfig +import org.bitcoins.chain.models.BlockHeaderDAO +import org.bitcoins.core.bloom.BloomFilter +import org.bitcoins.core.p2p.NetworkPayload +import org.bitcoins.core.protocol.BitcoinAddress +import org.bitcoins.core.protocol.transaction.Transaction +import org.bitcoins.db.P2PLogger +import org.bitcoins.node.config.NodeAppConfig +import org.bitcoins.node.models.{ + BroadcastAbleTransaction, + BroadcastAbleTransactionDAO, + Peer +} import org.bitcoins.node.networking.P2PClient import org.bitcoins.node.networking.peer.{ PeerMessageReceiver, PeerMessageSender } import org.bitcoins.rpc.util.AsyncUtil +import slick.jdbc.SQLiteProfile import scala.concurrent.Future -import org.bitcoins.core.bloom.BloomFilter -import org.bitcoins.core.p2p.NetworkPayload -import org.bitcoins.core.protocol.transaction.Transaction -import org.bitcoins.node.models.BroadcastAbleTransaction -import org.bitcoins.node.models.BroadcastAbleTransactionDAO -import slick.jdbc.SQLiteProfile -import scala.util.Failure -import scala.util.Success -import org.bitcoins.db.P2PLogger -import org.bitcoins.node.config.NodeAppConfig -import org.bitcoins.core.protocol.BitcoinAddress +import scala.concurrent.duration.DurationInt +import scala.util.{Failure, Success} case class SpvNode( peer: Peer, - chainApi: ChainApi, bloomFilter: BloomFilter, callbacks: SpvNodeCallbacks = SpvNodeCallbacks.empty -)(implicit system: ActorSystem, nodeAppConfig: NodeAppConfig) +)( + implicit system: ActorSystem, + nodeAppConfig: NodeAppConfig, + chainAppConfig: ChainAppConfig) extends P2PLogger { import system.dispatcher + /** This implicit is required for using the [[akka.pattern.ask akka ask]] + * to query what the state of our node is, like [[isConnected isConnected]] + * */ + implicit private val timeout = akka.util.Timeout(10.seconds) private val txDAO = BroadcastAbleTransactionDAO(SQLiteProfile) - private val peerMsgRecv = - PeerMessageReceiver.newReceiver(chainApi, callbacks) + /** This is constructing a chain api from disk every time we call this method + * This involves database calls which can be slow and expensive to construct + * our [[org.bitcoins.chain.blockchain.Blockchain Blockchain]] + * */ + def chainApiFromDb(): Future[ChainApi] = { + ChainHandler.fromDatabase(BlockHeaderDAO()) + } - private val client: P2PClient = - P2PClient(context = system, peer = peer, peerMessageReceiver = peerMsgRecv) + /** Unlike our chain api, this is cached inside our spv node + * object. Internally in [[org.bitcoins.node.networking.P2PClient p2p client]] you will see that + * the [[org.bitcoins.chain.api.ChainApi chain api]] is updated inside of the p2p client + * */ + private val clientF: Future[P2PClient] = { + for { + chainApi <- chainApiFromDb() + } yield { + val peerMsgRecv: PeerMessageReceiver = + PeerMessageReceiver.newReceiver(chainApi = chainApi, + peer = peer, + callbacks = callbacks) + val p2p = P2PClient(context = system, + peer = peer, + peerMessageReceiver = peerMsgRecv) + p2p + } + } - private val peerMsgSender: PeerMessageSender = { - PeerMessageSender(client) + private val peerMsgSenderF: Future[PeerMessageSender] = { + clientF.map { client => + PeerMessageSender(client) + } } /** Updates our bloom filter to match the given TX * * @return SPV node with the updated bloom filter */ - def updateBloomFilter(transaction: Transaction): SpvNode = { - logger.info(s"Updating bloom filter with transaction=${transaction.txIdBE}") + def updateBloomFilter(transaction: Transaction): Future[SpvNode] = { + logger(nodeAppConfig).info( + s"Updating bloom filter with transaction=${transaction.txIdBE}") val newBloom = bloomFilter.update(transaction) // we could send filteradd messages, but we would // then need to calculate all the new elements in // the filter. this is easier:-) - peerMsgSender.sendFilterClearMessage() - peerMsgSender.sendFilterLoadMessage(newBloom) + val newBloomLoadF = peerMsgSenderF.map { p => + p.sendFilterClearMessage() + p.sendFilterLoadMessage(newBloom) + } - copy(bloomFilter = newBloom) + newBloomLoadF.map(_ => copy(bloomFilter = newBloom)) } /** Updates our bloom filter to match the given address * * @return SPV node with the updated bloom filter */ - def updateBloomFilter(address: BitcoinAddress): SpvNode = { - logger.info(s"Updating bloom filter with address=$address") + def updateBloomFilter(address: BitcoinAddress): Future[SpvNode] = { + logger(nodeAppConfig).info(s"Updating bloom filter with address=$address") val hash = address.hash val newBloom = bloomFilter.insert(hash) - peerMsgSender.sendFilterAddMessage(hash) + val sentFilterAddF = peerMsgSenderF.map(_.sendFilterAddMessage(hash)) - copy(bloomFilter = newBloom) + sentFilterAddF.map { _ => + copy(bloomFilter = newBloom) + } } /** @@ -80,8 +119,8 @@ case class SpvNode( * with P2P messages, therefore marked as * `private[node]`. */ - private[node] def send(msg: NetworkPayload): Unit = { - peerMsgSender.sendMsg(msg) + private[node] def send(msg: NetworkPayload): Future[Unit] = { + peerMsgSenderF.map(_.sendMsg(msg)) } /** Starts our spv node */ @@ -89,59 +128,72 @@ case class SpvNode( for { _ <- nodeAppConfig.initialize() node <- { - peerMsgSender.connect() + val isInitializedF = for { + _ <- peerMsgSenderF.map(_.connect()) + _ <- AsyncUtil.retryUntilSatisfiedF(() => isInitialized) + } yield () - val isInitializedF = - AsyncUtil.retryUntilSatisfied(peerMsgRecv.isInitialized) - - isInitializedF.failed.foreach(err => - logger.error(s"Failed to connect with peer=$peer with err=${err}")) + isInitializedF.failed.foreach( + err => + logger(nodeAppConfig).error( + s"Failed to connect with peer=$peer with err=${err}")) isInitializedF.map { _ => - logger.info(s"Our peer=${peer} has been initialized") + logger(nodeAppConfig).info(s"Our peer=${peer} has been initialized") this } } + _ <- peerMsgSenderF.map(_.sendFilterLoadMessage(bloomFilter)) } yield { - logger.info(s"Sending bloomfilter=${bloomFilter.hex} to $peer") - val _ = peerMsgSender.sendFilterLoadMessage(bloomFilter) + logger(nodeAppConfig).info( + s"Sending bloomfilter=${bloomFilter.hex} to $peer") node } } /** Stops our spv node */ def stop(): Future[SpvNode] = { - peerMsgSender.disconnect() + logger(nodeAppConfig).info(s"Stopping spv node") + val disconnectF = peerMsgSenderF.map(_.disconnect()) - val isStoppedF = AsyncUtil.retryUntilSatisfied(peerMsgRecv.isDisconnected) + val isStoppedF = disconnectF.flatMap { _ => + logger(nodeAppConfig).info(s"Awaiting disconnect") + AsyncUtil.retryUntilSatisfiedF(() => isDisconnected) + } - isStoppedF.map(_ => this) + isStoppedF.map { _ => + logger(nodeAppConfig).info(s"Spv node stopped!") + this + } } /** Broadcasts the given transaction over the P2P network */ - def broadcastTransaction(transaction: Transaction): Unit = { + def broadcastTransaction(transaction: Transaction): Future[Unit] = { val broadcastTx = BroadcastAbleTransaction(transaction) txDAO.create(broadcastTx).onComplete { case Failure(exception) => - logger.error(s"Error when writing broadcastable TX to DB", exception) + logger(nodeAppConfig) + .error(s"Error when writing broadcastable TX to DB", exception) case Success(written) => - logger.debug( + logger(nodeAppConfig).debug( s"Wrote tx=${written.transaction.txIdBE} to broadcastable table") } - logger.info(s"Sending out inv for tx=${transaction.txIdBE}") - peerMsgSender.sendInventoryMessage(transaction) + logger(nodeAppConfig).info(s"Sending out inv for tx=${transaction.txIdBE}") + peerMsgSenderF.map(_.sendInventoryMessage(transaction)) } /** Checks if we have a tcp connection with our peer */ - def isConnected: Boolean = peerMsgRecv.isConnected + def isConnected: Future[Boolean] = clientF.flatMap(_.isConnected) /** Checks if we are fully initialized with our peer and have executed the handshake * This means we can now send arbitrary messages to our peer * @return */ - def isInitialized: Boolean = peerMsgRecv.isInitialized + def isInitialized: Future[Boolean] = clientF.flatMap(_.isInitialized) + + def isDisconnected: Future[Boolean] = clientF.flatMap(_.isDisconnected) /** Starts to sync our spv node with our peer * If our local best block hash is the same as our peers @@ -151,13 +203,15 @@ case class SpvNode( */ def sync(): Future[Unit] = { for { + chainApi <- chainApiFromDb() hash <- chainApi.getBestBlockHash header <- chainApi .getHeader(hash) .map(_.get) // .get is safe since this is an internal call } yield { - peerMsgSender.sendGetHeadersMessage(hash.flip) - logger.info(s"Starting sync node, height=${header.height} hash=$hash") + peerMsgSenderF.map(_.sendGetHeadersMessage(hash.flip)) + logger(nodeAppConfig).info( + s"Starting sync node, height=${header.height} hash=$hash") } } } diff --git a/node/src/main/scala/org/bitcoins/node/networking/P2PClient.scala b/node/src/main/scala/org/bitcoins/node/networking/P2PClient.scala index 3b917e0e55..2b9970e7ad 100644 --- a/node/src/main/scala/org/bitcoins/node/networking/P2PClient.scala +++ b/node/src/main/scala/org/bitcoins/node/networking/P2PClient.scala @@ -2,21 +2,26 @@ package org.bitcoins.node.networking import akka.actor.{Actor, ActorRef, ActorRefFactory, Props} import akka.io.{IO, Tcp} -import akka.util.ByteString +import akka.pattern.AskTimeoutException +import akka.util.{ByteString, CompactByteString, Timeout} import org.bitcoins.core.config.NetworkParameters import org.bitcoins.core.p2p.NetworkMessage import org.bitcoins.core.p2p.NetworkPayload +import org.bitcoins.core.util.FutureUtil import org.bitcoins.node.models.Peer import org.bitcoins.node.networking.peer.PeerMessageReceiver import org.bitcoins.node.networking.peer.PeerMessageReceiver.NetworkMessageReceived import org.bitcoins.node.util.BitcoinSpvNodeUtil import scodec.bits.ByteVector import org.bitcoins.node.config.NodeAppConfig -import akka.util.CompactByteString + import scala.annotation.tailrec import scala.util._ import org.bitcoins.db.P2PLogger +import scala.concurrent.{Await, ExecutionContext, Future} +import scala.concurrent.duration.DurationInt + /** * This actor is responsible for creating a connection, * relaying messages and closing a connection to our peer on @@ -43,17 +48,19 @@ import org.bitcoins.db.P2PLogger * CANNOT fit in a single TCP packet. This means we must cache * the bytes and wait for the rest of them to be sent. * - * @param peerMsgHandlerReceiver The place we send messages that we successfully parsed + * @param initPeerMsgHandlerReceiver The place we send messages that we successfully parsed * from our peer on the P2P network. This is mostly likely * a [[org.bitcoins.node.networking.peer.PeerMessageSender]] */ case class P2PClientActor( peer: Peer, - peerMsgHandlerReceiver: PeerMessageReceiver + initPeerMsgHandlerReceiver: PeerMessageReceiver )(implicit config: NodeAppConfig) extends Actor with P2PLogger { + private var currentPeerMsgHandlerRecv = initPeerMsgHandlerReceiver + /** * The manager is an actor that handles the underlying low level I/O resources (selectors, channels) * and instantiates workers for specific tasks, such as listening to incoming connections. @@ -65,6 +72,8 @@ case class P2PClientActor( */ val network: NetworkParameters = config.network + private val timeout = 10.seconds + /** * TODO: this comment seems wrong? * @@ -80,8 +89,12 @@ case class P2PClientActor( self.forward(networkMsg) case message: Tcp.Message => val newUnalignedBytes = - handleTcpMessage(message, Some(peer), unalignedBytes) + Await.result(handleTcpMessage(message, Some(peer), unalignedBytes), + timeout) context.become(awaitNetworkRequest(peer, newUnalignedBytes)) + + case metaMsg: P2PClient.MetaMsg => + sender ! handleMetaMsg(metaMsg) } /** This context is responsible for initializing a tcp connection with a peer on the bitcoin p2p network */ @@ -92,16 +105,19 @@ case class P2PClientActor( //after receiving Tcp.Connected we switch to the //'awaitNetworkRequest' context. This is the main //execution loop for the Client actor - val _ = handleCommand(cmd, peer = None) + handleCommand(cmd, peer = None) case connected: Tcp.Connected => - val _ = handleEvent(connected, unalignedBytes = ByteVector.empty) - + Await.result(handleEvent(connected, unalignedBytes = ByteVector.empty), + timeout) case msg: NetworkMessage => self.forward(msg.payload) case payload: NetworkPayload => logger.error( s"Cannot send a message to our peer when we are not connected! payload=${payload} peer=${peer}") + + case metaMsg: P2PClient.MetaMsg => + sender ! handleMetaMsg(metaMsg) } /** @@ -112,14 +128,14 @@ case class P2PClientActor( private def handleTcpMessage( message: Tcp.Message, peer: Option[ActorRef], - unalignedBytes: ByteVector): ByteVector = { + unalignedBytes: ByteVector): Future[ByteVector] = { message match { case event: Tcp.Event => - handleEvent(event, unalignedBytes) + handleEvent(event, unalignedBytes = unalignedBytes) case command: Tcp.Command => handleCommand(command, peer) - unalignedBytes + Future.successful(unalignedBytes) } } @@ -128,18 +144,19 @@ case class P2PClientActor( */ private def handleEvent( event: Tcp.Event, - unalignedBytes: ByteVector): ByteVector = { + unalignedBytes: ByteVector): Future[ByteVector] = { + import context.dispatcher event match { case Tcp.Bound(localAddress) => logger.debug( s"Actor is now bound to the local address: ${localAddress}") context.parent ! Tcp.Bound(localAddress) - unalignedBytes + Future.successful(unalignedBytes) case Tcp.CommandFailed(command) => logger.debug(s"Client Command failed: ${command}") - unalignedBytes + Future.successful(unalignedBytes) case Tcp.Connected(remote, local) => logger.debug(s"Tcp connection to: ${remote}") logger.debug(s"Local: ${local}") @@ -149,22 +166,30 @@ case class P2PClientActor( //our bitcoin peer will send all messages to this actor. sender ! Tcp.Register(self) - val _ = peerMsgHandlerReceiver.connect(P2PClient(self, peer)) + val newPeerMsgRecvF: Future[PeerMessageReceiver] = + currentPeerMsgHandlerRecv.connect(P2PClient(self, peer)) + newPeerMsgRecvF.map { newPeerMsgRecv => + currentPeerMsgHandlerRecv = newPeerMsgRecv + context.become(awaitNetworkRequest(sender, unalignedBytes)) + unalignedBytes + } - context.become(awaitNetworkRequest(sender, ByteVector.empty)) - - unalignedBytes case closeCmd @ (Tcp.ConfirmedClosed | Tcp.Closed | Tcp.Aborted | Tcp.PeerClosed) => logger.debug(s"Closed command received: ${closeCmd}") //tell our peer message handler we are disconnecting - val disconnectT = peerMsgHandlerReceiver.disconnect() + val newPeerMsgRecvF = currentPeerMsgHandlerRecv.disconnect() - disconnectT.failed.foreach(err => + newPeerMsgRecvF.failed.foreach(err => logger.error(s"Failed to disconnect=${err}")) - context.stop(self) - unalignedBytes + + newPeerMsgRecvF.map { newPeerMsgRecv => + currentPeerMsgHandlerRecv = newPeerMsgRecv + context.stop(self) + unalignedBytes + } + case Tcp.Received(byteString: ByteString) => val byteVec = ByteVector(byteString.toArray) logger.debug(s"Received ${byteVec.length} TCP bytes") @@ -194,17 +219,25 @@ case class P2PClientActor( logger.trace(s"Unaligned bytes: ${newUnalignedBytes.toHex}") } - //for the messages we successfully parsed above - //send them to 'context.parent' -- this is the - //PeerMessageHandler that is responsible for - //creating this Client Actor - messages.foreach { m => - val msg = NetworkMessageReceived(m, P2PClient(self, peer)) - peerMsgHandlerReceiver.handleNetworkMessageReceived(msg) - + val f: ( + PeerMessageReceiver, + NetworkMessage) => Future[PeerMessageReceiver] = { + case (peerMsgRecv: PeerMessageReceiver, m: NetworkMessage) => + logger.trace(s"Processing message=${m}") + val msg = NetworkMessageReceived(m, P2PClient(self, peer)) + val doneF = peerMsgRecv.handleNetworkMessageReceived(msg) + doneF } - newUnalignedBytes + val newMsgReceiverF: Future[PeerMessageReceiver] = { + logger.trace(s"About to process ${messages.length} messages") + FutureUtil.foldLeftAsync(currentPeerMsgHandlerRecv, messages)(f) + } + + newMsgReceiverF.map { newMsgReceiver => + currentPeerMsgHandlerRecv = newMsgReceiver + newUnalignedBytes + } } } @@ -224,6 +257,17 @@ case class P2PClientActor( manager ! bind } + /** + * Returns the current state of our peer given the [[P2PClient.MetaMsg meta message]] + */ + private def handleMetaMsg(metaMsg: P2PClient.MetaMsg): Boolean = { + metaMsg match { + case P2PClient.IsConnected => currentPeerMsgHandlerRecv.isConnected + case P2PClient.IsInitialized => currentPeerMsgHandlerRecv.isInitialized + case P2PClient.IsDisconnected => currentPeerMsgHandlerRecv.isDisconnected + } + } + /** * Sends a network request to our peer on the network */ @@ -237,10 +281,60 @@ case class P2PClientActor( } -case class P2PClient(actor: ActorRef, peer: Peer) +case class P2PClient(actor: ActorRef, peer: Peer) extends P2PLogger { + import akka.pattern.ask + + def isConnected()( + implicit timeout: Timeout, + ec: ExecutionContext): Future[Boolean] = { + val isConnectedF = actor.ask(P2PClient.IsConnected).mapTo[Boolean] + isConnectedF.recoverWith { + case _: Throwable => Future.successful(false) + } + } + + def isInitialized()( + implicit timeout: Timeout, + ec: ExecutionContext): Future[Boolean] = { + val isInitF = actor.ask(P2PClient.IsInitialized).mapTo[Boolean] + isInitF.recoverWith { + case _: Throwable => Future.successful(false) + } + } + + def isDisconnected()( + implicit timeout: Timeout, + ec: ExecutionContext): Future[Boolean] = { + val isDisconnect: Future[Boolean] = + actor.ask(P2PClient.IsDisconnected).mapTo[Boolean] + + //this future can be failed, as we stop the P2PClientActor if we send a disconnect + //if that actor has been killed, the peer _has_ to have been disconnected + isDisconnect.recoverWith { + case _: Throwable => Future.successful(true) + } + } +} object P2PClient extends P2PLogger { + /** A message hierarchy that canbe sent to [[P2PClientActor P2P Client Actor]] + * to query about meta information of a peer + * */ + sealed trait MetaMsg + + /** A message that can be sent to [[P2PClient p2p client]] that returns true + * if the peer is connected, false if not */ + final case object IsConnected extends MetaMsg + + /** A message that can be sent to [[P2PClient p2p client]] that returns true + * if the peer is initialized (p2p handshake complete), false if not */ + final case object IsInitialized extends MetaMsg + + /** A message that can be sent to [[P2PClient p2p client]] that returns true + * if the peer is disconnected, false otherwise */ + final case object IsDisconnected extends MetaMsg + def props(peer: Peer, peerMsgHandlerReceiver: PeerMessageReceiver)( implicit config: NodeAppConfig ): Props = diff --git a/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala b/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala index 30a5163cff..ca1fd09c47 100644 --- a/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala +++ b/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala @@ -1,31 +1,23 @@ package org.bitcoins.node.networking.peer import org.bitcoins.chain.api.ChainApi -import org.bitcoins.core.util.FutureUtil -import org.bitcoins.core.p2p.{DataPayload, HeadersMessage, InventoryMessage} - -import scala.concurrent.{ExecutionContext, Future} -import org.bitcoins.core.protocol.blockchain.Block -import org.bitcoins.core.protocol.blockchain.MerkleBlock +import org.bitcoins.core.p2p.{Inventory, MsgUnassigned, TypeIdentifier, _} +import org.bitcoins.core.protocol.blockchain.{Block, MerkleBlock} import org.bitcoins.core.protocol.transaction.Transaction -import org.bitcoins.core.p2p.BlockMessage -import org.bitcoins.core.p2p.TransactionMessage -import org.bitcoins.core.p2p.MerkleBlockMessage +import org.bitcoins.db.P2PLogger import org.bitcoins.node.SpvNodeCallbacks -import org.bitcoins.core.p2p.GetDataMessage +import org.bitcoins.node.config.NodeAppConfig import org.bitcoins.node.models.BroadcastAbleTransactionDAO import slick.jdbc.SQLiteProfile -import org.bitcoins.node.config.NodeAppConfig -import org.bitcoins.core.p2p.TypeIdentifier -import org.bitcoins.core.p2p.MsgUnassigned -import org.bitcoins.db.P2PLogger -import org.bitcoins.core.p2p.Inventory + +import scala.concurrent.{ExecutionContext, Future} /** This actor is meant to handle a [[org.bitcoins.core.p2p.DataPayload DataPayload]] * that a peer to sent to us on the p2p network, for instance, if we a receive a * [[org.bitcoins.core.p2p.HeadersMessage HeadersMessage]] we should store those headers in our database */ -class DataMessageHandler(callbacks: SpvNodeCallbacks, chainHandler: ChainApi)( + +class DataMessageHandler(chainApi: ChainApi, callbacks: SpvNodeCallbacks)( implicit ec: ExecutionContext, appConfig: NodeAppConfig) extends P2PLogger { @@ -34,7 +26,7 @@ class DataMessageHandler(callbacks: SpvNodeCallbacks, chainHandler: ChainApi)( def handleDataPayload( payload: DataPayload, - peerMsgSender: PeerMessageSender): Future[Unit] = { + peerMsgSender: PeerMessageSender): Future[DataMessageHandler] = { payload match { case getData: GetDataMessage => @@ -65,17 +57,17 @@ class DataMessageHandler(callbacks: SpvNodeCallbacks, chainHandler: ChainApi)( } } - FutureUtil.unit + Future.successful(this) case HeadersMessage(count, headers) => logger.debug(s"Received headers message with ${count.toInt} headers") logger.trace( s"Received headers=${headers.map(_.hashBE.hex).mkString("[", ",", "]")}") - val chainApiF = chainHandler.processHeaders(headers) + val chainApiF = chainApi.processHeaders(headers) logger.trace(s"Requesting data for headers=${headers.length}") peerMsgSender.sendGetDataMessage(headers: _*) - chainApiF + val getHeadersF = chainApiF .map { newApi => if (headers.nonEmpty) { @@ -100,27 +92,40 @@ class DataMessageHandler(callbacks: SpvNodeCallbacks, chainHandler: ChainApi)( } } - .failed - .map { err => - logger.error(s"Error when processing headers message", err) - } + + getHeadersF.failed.map { err => + logger.error(s"Error when processing headers message", err) + } + + for { + newApi <- chainApiF + _ <- getHeadersF + } yield { + new DataMessageHandler(newApi, callbacks) + } case msg: BlockMessage => - Future { callbacks.onBlockReceived.foreach(_.apply(msg.block)) } + Future { + callbacks.onBlockReceived.foreach(_.apply(msg.block)) + this + } case TransactionMessage(tx) => val belongsToMerkle = MerkleBuffers.putTx(tx, callbacks.onMerkleBlockReceived) if (belongsToMerkle) { logger.trace( s"Transaction=${tx.txIdBE} belongs to merkleblock, not calling callbacks") - FutureUtil.unit + Future.successful(this) } else { logger.trace( s"Transaction=${tx.txIdBE} does not belong to merkleblock, processing given callbacks") - Future { callbacks.onTxReceived.foreach(_.apply(tx)) } + Future { + callbacks.onTxReceived.foreach(_.apply(tx)) + this + } } case MerkleBlockMessage(merkleBlock) => MerkleBuffers.putMerkle(merkleBlock) - FutureUtil.unit + Future.successful(this) case invMsg: InventoryMessage => handleInventoryMsg(invMsg = invMsg, peerMsgSender = peerMsgSender) } @@ -128,7 +133,7 @@ class DataMessageHandler(callbacks: SpvNodeCallbacks, chainHandler: ChainApi)( private def handleInventoryMsg( invMsg: InventoryMessage, - peerMsgSender: PeerMessageSender): Future[Unit] = { + peerMsgSender: PeerMessageSender): Future[DataMessageHandler] = { logger.info(s"Received inv=${invMsg}") val getData = GetDataMessage(invMsg.inventories.map { case Inventory(TypeIdentifier.MsgBlock, hash) => @@ -136,7 +141,7 @@ class DataMessageHandler(callbacks: SpvNodeCallbacks, chainHandler: ChainApi)( case other: Inventory => other }) peerMsgSender.sendMsg(getData) - FutureUtil.unit + Future.successful(this) } } diff --git a/node/src/main/scala/org/bitcoins/node/networking/peer/PeerHandler.scala b/node/src/main/scala/org/bitcoins/node/networking/peer/PeerHandler.scala index 9a78d2b86f..cdbced421f 100644 --- a/node/src/main/scala/org/bitcoins/node/networking/peer/PeerHandler.scala +++ b/node/src/main/scala/org/bitcoins/node/networking/peer/PeerHandler.scala @@ -1,5 +1,7 @@ package org.bitcoins.node.networking.peer +import org.bitcoins.node.networking.P2PClient + /* abstract class PeerHandler extends BitcoinSLogger { implicit val system: ActorSystem @@ -58,6 +60,4 @@ object PeerHandler { } */ -case class PeerHandler( - peerMsgRecv: PeerMessageReceiver, - peerMsgSender: PeerMessageSender) +case class PeerHandler(p2pClient: P2PClient, peerMsgSender: PeerMessageSender) diff --git a/node/src/main/scala/org/bitcoins/node/networking/peer/PeerMessageReceiver.scala b/node/src/main/scala/org/bitcoins/node/networking/peer/PeerMessageReceiver.scala index af0de27475..3c5b7f144d 100644 --- a/node/src/main/scala/org/bitcoins/node/networking/peer/PeerMessageReceiver.scala +++ b/node/src/main/scala/org/bitcoins/node/networking/peer/PeerMessageReceiver.scala @@ -1,9 +1,14 @@ package org.bitcoins.node.networking.peer import akka.actor.ActorRefFactory -import org.bitcoins.core.p2p.NetworkMessage +import org.bitcoins.chain.api.ChainApi +import org.bitcoins.chain.blockchain.ChainHandler +import org.bitcoins.chain.config.ChainAppConfig +import org.bitcoins.chain.models.BlockHeaderDAO +import org.bitcoins.core.p2p.{NetworkMessage, _} +import org.bitcoins.db.P2PLogger +import org.bitcoins.node.SpvNodeCallbacks import org.bitcoins.node.config.NodeAppConfig -import org.bitcoins.core.p2p._ import org.bitcoins.node.models.Peer import org.bitcoins.node.networking.P2PClient import org.bitcoins.node.networking.peer.PeerMessageReceiverState.{ @@ -13,10 +18,7 @@ import org.bitcoins.node.networking.peer.PeerMessageReceiverState.{ Preconnection } -import scala.util.{Failure, Success, Try} -import org.bitcoins.node.SpvNodeCallbacks -import org.bitcoins.db.P2PLogger -import org.bitcoins.chain.api.ChainApi +import scala.concurrent.Future /** * Responsible for receiving messages from a peer on the @@ -26,62 +28,57 @@ import org.bitcoins.chain.api.ChainApi * [[org.bitcoins.core.p2p.NetworkMessage NetworkMessage]] */ class PeerMessageReceiver( - state: PeerMessageReceiverState, - callbacks: SpvNodeCallbacks, - chainHandler: ChainApi -)(implicit ref: ActorRefFactory, nodeAppConfig: NodeAppConfig) + dataMessageHandler: DataMessageHandler, + val state: PeerMessageReceiverState, + peer: Peer, + callbacks: SpvNodeCallbacks +)( + implicit ref: ActorRefFactory, + nodeAppConfig: NodeAppConfig, + chainAppConfig: ChainAppConfig) extends P2PLogger { - import ref.dispatcher - //TODO: Really bad to just modify this internal state - //not async safe at all - private var internalState: PeerMessageReceiverState = state - - /** The peer we are connected to. */ - private var peerOpt: Option[Peer] = None - /** This method is called when we have received * a [[akka.io.Tcp.Connected]] message from our peer * This means we have opened a Tcp connection, * but have NOT started the handshake * This method will initiate the handshake */ - protected[networking] def connect(client: P2PClient): Try[Unit] = { + protected[networking] def connect( + client: P2PClient): Future[PeerMessageReceiver] = { - internalState match { + state match { case bad @ (_: Initializing | _: Normal | _: Disconnected) => - Failure( + Future.failed( new RuntimeException(s"Cannot call connect when in state=${bad}") ) case Preconnection => - peerOpt = Some(client.peer) - - logger.info(s"Connection established with peer=${peerOpt.get}") + logger(nodeAppConfig).info(s"Connection established with peer=${peer}") val newState = Preconnection.toInitializing(client) - val _ = toState(newState) - val peerMsgSender = PeerMessageSender(client) peerMsgSender.sendVersionMessage() - Success(()) + val newRecv = toState(newState) + + Future.successful(newRecv) } } - protected[networking] def disconnect(): Try[Unit] = { - - internalState match { + protected[networking] def disconnect(): Future[PeerMessageReceiver] = { + logger(nodeAppConfig).trace(s"Disconnecting with internalstate=${state}") + state match { case bad @ (_: Initializing | _: Disconnected | Preconnection) => - Failure( + Future.failed( new RuntimeException( - s"Cannot disconnect from peer=${peerOpt.get} when in state=${bad}") + s"Cannot disconnect from peer=${peer} when in state=${bad}") ) case good: Normal => - logger.debug(s"Disconnected bitcoin peer=${peerOpt.get}") + logger(nodeAppConfig).debug(s"Disconnected bitcoin peer=${peer}") val newState = Disconnected( clientConnectP = good.clientConnectP, clientDisconnectP = good.clientDisconnectP.success(()), @@ -89,37 +86,40 @@ class PeerMessageReceiver( verackMsgP = good.verackMsgP ) - val _ = toState(newState) - Success(()) + val newRecv = toState(newState) + + Future.successful(newRecv) } } - def isConnected: Boolean = internalState.isConnected + private[networking] def isConnected: Boolean = state.isConnected - def isDisconnected: Boolean = internalState.isDisconnected + private[networking] def isDisconnected: Boolean = state.isDisconnected - def hasReceivedVersionMsg: Boolean = - internalState.hasReceivedVersionMsg.isCompleted + private[networking] def hasReceivedVersionMsg: Boolean = + state.hasReceivedVersionMsg.isCompleted - def hasReceivedVerackMsg: Boolean = - internalState.hasReceivedVerackMsg.isCompleted + private[networking] def hasReceivedVerackMsg: Boolean = + state.hasReceivedVerackMsg.isCompleted - def isInitialized: Boolean = internalState.isInitialized + private[networking] def isInitialized: Boolean = state.isInitialized def handleNetworkMessageReceived( - networkMsgRecv: PeerMessageReceiver.NetworkMessageReceived): Unit = { + networkMsgRecv: PeerMessageReceiver.NetworkMessageReceived): Future[ + PeerMessageReceiver] = { val client = networkMsgRecv.client //create a way to send a response if we need too val peerMsgSender = PeerMessageSender(client) - logger.debug( - s"Received message=${networkMsgRecv.msg.header.commandName} from peer=${client.peer} ") + logger(nodeAppConfig).debug( + s"Received message=${networkMsgRecv.msg.header.commandName} from peer=${client.peer} state=${state} ") networkMsgRecv.msg.payload match { case controlPayload: ControlPayload => - handleControlPayload(payload = controlPayload, sender = peerMsgSender) - () + val peerMsgRecvF = + handleControlPayload(payload = controlPayload, sender = peerMsgSender) + peerMsgRecvF case dataPayload: DataPayload => handleDataPayload(payload = dataPayload, sender = peerMsgSender) } @@ -135,12 +135,15 @@ class PeerMessageReceiver( */ private def handleDataPayload( payload: DataPayload, - sender: PeerMessageSender): Unit = { - val dataMsgHandler = new DataMessageHandler(callbacks, chainHandler) + sender: PeerMessageSender): Future[PeerMessageReceiver] = { //else it means we are receiving this data payload from a peer, //we need to handle it - dataMsgHandler.handleDataPayload(payload, sender) - () + val newDataMessageHandlerF = + dataMessageHandler.handleDataPayload(payload, sender) + + newDataMessageHandlerF.map { handler => + new PeerMessageReceiver(handler, state, peer, callbacks) + } } /** @@ -152,21 +155,21 @@ class PeerMessageReceiver( */ private def handleControlPayload( payload: ControlPayload, - sender: PeerMessageSender): Try[Unit] = { + sender: PeerMessageSender): Future[PeerMessageReceiver] = { payload match { case versionMsg: VersionMessage => - logger.trace( - s"Received versionMsg=${versionMsg}from peer=${peerOpt.get}") + logger(nodeAppConfig).trace( + s"Received versionMsg=${versionMsg}from peer=${peer}") - internalState match { + state match { case bad @ (_: Disconnected | _: Normal | Preconnection) => - Failure( + Future.failed( new RuntimeException( s"Cannot handle version message while in state=${bad}")) case good: Initializing => - internalState = good.withVersionMsg(versionMsg) + val newState = good.withVersionMsg(versionMsg) sender.sendVerackMessage() @@ -174,45 +177,52 @@ class PeerMessageReceiver( //we don't want to have to request them manually sender.sendHeadersMessage() - Success(()) + val newRecv = toState(newState) + + Future.successful(newRecv) } case VerAckMessage => - internalState match { + state match { case bad @ (_: Disconnected | _: Normal | Preconnection) => - Failure( + Future.failed( new RuntimeException( s"Cannot handle version message while in state=${bad}")) case good: Initializing => - internalState = good.toNormal(VerAckMessage) - Success(()) + val newState = good.toNormal(VerAckMessage) + val newRecv = toState(newState) + Future.successful(newRecv) } case ping: PingMessage => sender.sendPong(ping) - Success(()) + Future.successful(this) case SendHeadersMessage => //not implemented as of now - Success(()) + Future.successful(this) case _: AddrMessage => - Success(()) + Future.successful(this) case _ @(_: FilterAddMessage | _: FilterLoadMessage | FilterClearMessage) => - Success(()) + Future.successful(this) case _ @(GetAddrMessage | _: PongMessage) => - Success(()) + Future.successful(this) case _: RejectMessage => - Success(()) + Future.successful(this) case _: FeeFilterMessage => - Success(()) + Future.successful(this) } } - private def toState(state: PeerMessageReceiverState): Unit = { - logger.debug( - s"PeerMessageReceiver changing state, oldState=$internalState, newState=$state") - internalState = state + /** Transitions our PeerMessageReceiver to a new state */ + def toState(newState: PeerMessageReceiverState): PeerMessageReceiver = { + new PeerMessageReceiver( + dataMessageHandler = dataMessageHandler, + state = newState, + peer = peer, + callbacks = callbacks + ) } } @@ -231,20 +241,52 @@ object PeerMessageReceiver { def apply( state: PeerMessageReceiverState, - chainHandler: ChainApi, - callbacks: SpvNodeCallbacks = SpvNodeCallbacks.empty)( + chainApi: ChainApi, + peer: Peer, + callbacks: SpvNodeCallbacks)( implicit ref: ActorRefFactory, - nodeAppConfig: NodeAppConfig): PeerMessageReceiver = { - new PeerMessageReceiver(state, callbacks, chainHandler) + nodeAppConfig: NodeAppConfig, + chainAppConfig: ChainAppConfig + ): PeerMessageReceiver = { + import ref.dispatcher + val dataHandler = new DataMessageHandler(chainApi, callbacks) + new PeerMessageReceiver(dataMessageHandler = dataHandler, + state = state, + peer = peer, + callbacks = callbacks) } - def newReceiver( - chainHandler: ChainApi, - callbacks: SpvNodeCallbacks = SpvNodeCallbacks.empty)( + /** + * Creates a peer message receiver that is ready + * to be connected to a peer. This can be given to [[org.bitcoins.node.networking.P2PClient.props() P2PClient]] + * to connect to a peer on the network + */ + def preConnection(peer: Peer, callbacks: SpvNodeCallbacks)( + implicit ref: ActorRefFactory, + nodeAppConfig: NodeAppConfig, + chainAppConfig: ChainAppConfig + ): Future[PeerMessageReceiver] = { + import ref.dispatcher + val blockHeaderDAO = BlockHeaderDAO() + val chainHandlerF = + ChainHandler.fromDatabase(blockHeaderDAO) + for { + chainHandler <- chainHandlerF + } yield { + PeerMessageReceiver(state = PeerMessageReceiverState.fresh(), + chainApi = chainHandler, + peer = peer, + callbacks = callbacks) + } + } + + def newReceiver(chainApi: ChainApi, peer: Peer, callbacks: SpvNodeCallbacks)( implicit nodeAppConfig: NodeAppConfig, + chainAppConfig: ChainAppConfig, ref: ActorRefFactory): PeerMessageReceiver = { - new PeerMessageReceiver(state = PeerMessageReceiverState.fresh(), - callbacks, - chainHandler) + PeerMessageReceiver(state = PeerMessageReceiverState.fresh(), + chainApi = chainApi, + peer = peer, + callbacks = callbacks) } } diff --git a/testkit/src/main/scala/org/bitcoins/testkit/chain/ChainUnitTest.scala b/testkit/src/main/scala/org/bitcoins/testkit/chain/ChainUnitTest.scala index 542f0439a3..64ccc88d7e 100644 --- a/testkit/src/main/scala/org/bitcoins/testkit/chain/ChainUnitTest.scala +++ b/testkit/src/main/scala/org/bitcoins/testkit/chain/ChainUnitTest.scala @@ -184,7 +184,8 @@ trait ChainUnitTest def createPopulatedChainHandler(): Future[ChainHandler] = { for { blockHeaderDAO <- ChainUnitTest.createPopulatedBlockHeaderDAO() - } yield ChainHandler(blockHeaderDAO = blockHeaderDAO) + chainHandler <- ChainHandler.fromDatabase(blockHeaderDAO = blockHeaderDAO) + } yield chainHandler } def withPopulatedChainHandler(test: OneArgAsyncTest): FutureOutcome = { @@ -194,15 +195,17 @@ trait ChainUnitTest def createChainHandlerWithBitcoindZmq( bitcoind: BitcoindRpcClient): Future[(ChainHandler, ZMQSubscriber)] = { - val (chainHandler, genesisHeaderF) = + val handlerWithGenesisHeaderF = ChainUnitTest.setupHeaderTableWithGenesisHeader() + val chainHandlerF = handlerWithGenesisHeaderF.map(_._1) + val zmqRawBlockUriOpt: Option[InetSocketAddress] = bitcoind.instance.zmqConfig.rawBlock val handleRawBlock: ByteVector => Unit = { bytes: ByteVector => val block = Block.fromBytes(bytes) - chainHandler.processHeader(block.blockHeader) + chainHandlerF.flatMap(_.processHeader(block.blockHeader)) () } @@ -217,15 +220,19 @@ trait ChainUnitTest zmqSubscriber.start() Thread.sleep(1000) - genesisHeaderF.map(_ => (chainHandler, zmqSubscriber)) + for { + chainHandler <- chainHandlerF + } yield (chainHandler, zmqSubscriber) } def createChainApiWithBitcoindRpc( bitcoind: BitcoindRpcClient): Future[BitcoindChainHandlerViaRpc] = { - val (handler, genesisHeaderF) = + val handlerWithGenesisHeaderF = ChainUnitTest.setupHeaderTableWithGenesisHeader() - genesisHeaderF.map { _ => + val chainHandlerF = handlerWithGenesisHeaderF.map(_._1) + + chainHandlerF.map { handler => chain.fixture.BitcoindChainHandlerViaRpc(bitcoind, handler) } @@ -304,16 +311,21 @@ object ChainUnitTest extends BitcoinSLogger { def createChainHandler()( implicit ec: ExecutionContext, appConfig: ChainAppConfig): Future[ChainHandler] = { - val (chainHandler, genesisHeaderF) = setupHeaderTableWithGenesisHeader() - genesisHeaderF.map(_ => chainHandler) + val handlerWithGenesisHeaderF = + ChainUnitTest.setupHeaderTableWithGenesisHeader() + + val chainHandlerF = handlerWithGenesisHeaderF.map(_._1) + chainHandlerF } def createBlockHeaderDAO()( implicit ec: ExecutionContext, appConfig: ChainAppConfig): Future[BlockHeaderDAO] = { - val (chainHandler, genesisHeaderF) = setupHeaderTableWithGenesisHeader() + val handlerWithGenesisHeaderF = + ChainUnitTest.setupHeaderTableWithGenesisHeader() - genesisHeaderF.map(_ => chainHandler.blockHeaderDAO) + val chainHandlerF = handlerWithGenesisHeaderF.map(_._1) + chainHandlerF.map(_.blockHeaderDAO) } /** Creates and populates BlockHeaderTable with block headers 562375 to 571375 */ @@ -365,17 +377,21 @@ object ChainUnitTest extends BitcoinSLogger { dbHeaders = dbHeaders, batchesSoFar = Vector.empty) - val chainHandler = ChainUnitTest.makeChainHandler() + val chainHandlerF = ChainUnitTest.makeChainHandler() val insertedF = tableSetupF.flatMap { _ => batchedDbHeaders.foldLeft( Future.successful[Vector[BlockHeaderDb]](Vector.empty)) { case (fut, batch) => - fut.flatMap(_ => chainHandler.blockHeaderDAO.createAll(batch)) + for { + _ <- fut + chainHandler <- chainHandlerF + headers <- chainHandler.blockHeaderDAO.createAll(batch) + } yield headers } } - insertedF.map(_ => chainHandler.blockHeaderDAO) + insertedF.flatMap(_ => chainHandlerF.map(_.blockHeaderDAO)) } } @@ -398,24 +414,31 @@ object ChainUnitTest extends BitcoinSLogger { /** Creates the [[org.bitcoins.chain.models.BlockHeaderTable]] and inserts the genesis header */ def setupHeaderTableWithGenesisHeader()( implicit ec: ExecutionContext, - appConfig: ChainAppConfig): (ChainHandler, Future[BlockHeaderDb]) = { + appConfig: ChainAppConfig): Future[(ChainHandler, BlockHeaderDb)] = { val tableSetupF = setupHeaderTable() - val chainHandler = makeChainHandler() + val chainHandlerF = makeChainHandler() val genesisHeaderF = tableSetupF.flatMap { _ => - chainHandler.blockHeaderDAO.create(genesisHeaderDb) + for { + chainHandler <- chainHandlerF + genHeader <- chainHandler.blockHeaderDAO.create(genesisHeaderDb) + } yield genHeader } - (chainHandler, genesisHeaderF) + for { + genHeader <- genesisHeaderF + chainHandler <- makeChainHandler() + } yield (chainHandler, genHeader) } def makeChainHandler()( implicit appConfig: ChainAppConfig, - ec: ExecutionContext): ChainHandler = { + ec: ExecutionContext): Future[ChainHandler] = { lazy val blockHeaderDAO = BlockHeaderDAO() - ChainHandler(blockHeaderDAO) + ChainHandler.fromDatabase(blockHeaderDAO = blockHeaderDAO) + } } diff --git a/testkit/src/main/scala/org/bitcoins/testkit/node/NodeTestUtil.scala b/testkit/src/main/scala/org/bitcoins/testkit/node/NodeTestUtil.scala index 88fdcaaa88..466ad8a451 100644 --- a/testkit/src/main/scala/org/bitcoins/testkit/node/NodeTestUtil.scala +++ b/testkit/src/main/scala/org/bitcoins/testkit/node/NodeTestUtil.scala @@ -107,9 +107,9 @@ abstract class NodeTestUtil extends BitcoinSLogger { def isSameBestHash(node: SpvNode, rpc: BitcoindRpcClient)( implicit ec: ExecutionContext): Future[Boolean] = { val hashF = rpc.getBestBlockHash - val spvHashF = node.chainApi.getBestBlockHash for { - spvBestHash <- spvHashF + chainApi <- node.chainApiFromDb() + spvBestHash <- chainApi.getBestBlockHash hash <- hashF } yield { spvBestHash == hash @@ -122,9 +122,8 @@ abstract class NodeTestUtil extends BitcoinSLogger { def isSameBlockCount(spv: SpvNode, rpc: BitcoindRpcClient)( implicit ec: ExecutionContext): Future[Boolean] = { val rpcCountF = rpc.getBlockCount - val spvCountF = spv.chainApi.getBlockCount for { - spvCount <- spvCountF + spvCount <- spv.chainApiFromDb().flatMap(_.getBlockCount) rpcCount <- rpcCountF } yield rpcCount == spvCount } diff --git a/testkit/src/main/scala/org/bitcoins/testkit/node/NodeUnitTest.scala b/testkit/src/main/scala/org/bitcoins/testkit/node/NodeUnitTest.scala index 669c62ae10..2f35e7d100 100644 --- a/testkit/src/main/scala/org/bitcoins/testkit/node/NodeUnitTest.scala +++ b/testkit/src/main/scala/org/bitcoins/testkit/node/NodeUnitTest.scala @@ -6,6 +6,7 @@ import akka.actor.ActorSystem import org.bitcoins.chain.blockchain.ChainHandler import org.bitcoins.chain.config.ChainAppConfig import org.bitcoins.chain.models.BlockHeaderDAO +import org.bitcoins.chain.api.ChainApi import org.bitcoins.core.config.NetworkParameters import org.bitcoins.core.util.BitcoinSLogger import org.bitcoins.db.AppConfig @@ -15,6 +16,7 @@ import org.bitcoins.node.models.Peer import org.bitcoins.node.networking.peer.{ PeerHandler, PeerMessageReceiver, + PeerMessageReceiverState, PeerMessageSender } import org.bitcoins.rpc.client.common.BitcoindRpcClient @@ -154,6 +156,38 @@ object NodeUnitTest extends BitcoinSLogger { wallet: UnlockedWalletApi, bitcoindRpc: BitcoindRpcClient) + def buildPeerMessageReceiver(chainApi: ChainApi, peer: Peer)( + implicit appConfig: BitcoinSAppConfig, + system: ActorSystem): Future[PeerMessageReceiver] = { + val receiver = + PeerMessageReceiver(state = PeerMessageReceiverState.fresh(), + chainApi = chainApi, + peer = peer, + callbacks = SpvNodeCallbacks.empty) + Future.successful(receiver) + } + + def buildPeerHandler(peer: Peer)( + implicit nodeAppConfig: NodeAppConfig, + chainAppConfig: ChainAppConfig, + system: ActorSystem): Future[PeerHandler] = { + import system.dispatcher + val chainApiF = ChainUnitTest.createChainHandler() + val peerMsgReceiverF = chainApiF.flatMap { _ => + PeerMessageReceiver.preConnection(peer, SpvNodeCallbacks.empty) + } + //the problem here is the 'self', this needs to be an ordinary peer message handler + //that can handle the handshake + val peerHandlerF = for { + peerMsgReceiver <- peerMsgReceiverF + client = NodeTestUtil.client(peer, peerMsgReceiver) + peerMsgSender = PeerMessageSender(client) + } yield PeerHandler(client, peerMsgSender) + + peerHandlerF + + } + def destroySpvNode(spvNode: SpvNode)( implicit config: BitcoinSAppConfig, ec: ExecutionContext): Future[Unit] = { @@ -165,6 +199,7 @@ object NodeUnitTest extends BitcoinSLogger { spvNodeConnectedWithBitcoind: SpvNodeConnectedWithBitcoind)( implicit system: ActorSystem, appConfig: BitcoinSAppConfig): Future[Unit] = { + logger.debug(s"Beggining tear down of spv node connected with bitcoind") import system.dispatcher val spvNode = spvNodeConnectedWithBitcoind.spvNode val bitcoind = spvNodeConnectedWithBitcoind.bitcoind @@ -174,7 +209,10 @@ object NodeUnitTest extends BitcoinSLogger { for { _ <- spvNodeDestroyF _ <- bitcoindDestroyF - } yield () + } yield { + logger.debug(s"Done with teardown of spv node connected with bitcoind!") + () + } } /** Creates a spv node, a funded bitcoin-s wallet, all of which are connected to bitcoind */ @@ -211,31 +249,16 @@ object NodeUnitTest extends BitcoinSLogger { } - def buildPeerMessageReceiver()( - implicit system: ActorSystem, + def buildPeerMessageReceiver(chainApi: ChainApi, peer: Peer)( + implicit nodeAppConfig: NodeAppConfig, chainAppConfig: ChainAppConfig, - nodeAppConfig: NodeAppConfig): PeerMessageReceiver = { - import system.dispatcher - val dao = BlockHeaderDAO() - val chainHandler = ChainHandler(dao) + system: ActorSystem): Future[PeerMessageReceiver] = { val receiver = - PeerMessageReceiver.newReceiver(chainHandler, SpvNodeCallbacks.empty) - receiver - } - - def buildPeerHandler(peer: Peer)( - implicit system: ActorSystem, - chainAppConfig: ChainAppConfig, - nodeAppConfig: NodeAppConfig): PeerHandler = { - val peerMsgReceiver = buildPeerMessageReceiver() - //the problem here is the 'self', this needs to be an ordinary peer message handler - //that can handle the handshake - val peerMsgSender: PeerMessageSender = { - val client = NodeTestUtil.client(peer, peerMsgReceiver) - PeerMessageSender(client) - } - PeerHandler(peerMsgReceiver, peerMsgSender) - + PeerMessageReceiver(state = PeerMessageReceiverState.fresh(), + chainApi = chainApi, + peer = peer, + callbacks = SpvNodeCallbacks.empty) + Future.successful(receiver) } def peerSocketAddress( @@ -256,10 +279,9 @@ object NodeUnitTest extends BitcoinSLogger { val chainApiF = ChainUnitTest.createChainHandler() val peer = createPeer(bitcoind) for { - chainApi <- chainApiF + _ <- chainApiF } yield { SpvNode(peer = peer, - chainApi = chainApi, bloomFilter = NodeTestUtil.emptyBloomFilter, callbacks = callbacks) }