From 98ace6f14e46689b2dceda66c0fd41943a23360b Mon Sep 17 00:00:00 2001 From: Chris Stewart Date: Sat, 9 Jan 2021 09:33:37 -0600 Subject: [PATCH] 2021 01 02 issue 2457 (#2461) * WIP * Get neutrino node with wallet 'receive information about received payments' working again * Fix compile * Remove initial sync logic from test case * Remove sync logic in NeutrinoNodeWithWallet test cases * Improve logging and rename a few things * WIP2 * WIP3 * Get NeutrinoNodeWithWallet tests working * Implement WalletSync, which allows you to sync a wallet from a arbitrary data source * Get all tests passing again * Use spv.appConfig in DataMessageHandlerTest rather than caching the config * Modify cleanup to hopefully get CI passing * Fix postgres tests by cleaning the table during the destroy phase of the test fixture. This is needed because the same postgres database is shared between tests in the same test suite * Revert logback-test.xml * Get sqlite/postgres tests passing pt 2 * syncHeight -> syncDescriptorOpt() * Add case for genesis block hash in WalletSync * Fix SpvNodeWithWallet test case to actually test spv functionality * Remove nodeCallbacks parameters, callbacks should be registered on nodeAppConfig --- .../chain/blockchain/ChainHandler.scala | 4 +- .../bitcoins/core/api/chain/ChainApi.scala | 2 +- .../scala/org/bitcoins/db/DbManagement.scala | 11 ++ .../org/bitcoins/node/NeutrinoNodeTest.scala | 3 +- .../node/NeutrinoNodeWithWalletTest.scala | 84 +++------- .../bitcoins/node/SpvNodeWithWalletTest.scala | 96 +++--------- .../bitcoins/node/UpdateBloomFilterTest.scala | 10 +- .../peer/DataMessageHandlerTest.scala | 35 +++-- .../peer/PeerMessageHandlerTest.scala | 9 ++ .../org/bitcoins/node/NeutrinoNode.scala | 4 +- .../main/scala/org/bitcoins/node/Node.scala | 3 +- .../scala/org/bitcoins/node/SpvNode.scala | 7 +- .../org/bitcoins/testkit/chain/SyncUtil.scala | 18 ++- .../testkit/fixtures/NodeDAOFixture.scala | 2 + .../bitcoins/testkit/node/NodeUnitTest.scala | 148 ++++++++++-------- .../testkit/wallet/BitcoinSWalletTest.scala | 112 ++++++++++--- .../testkit/wallet/FundWalletUtil.scala | 9 +- .../bitcoins/wallet/sync/WalletSyncTest.scala | 45 ++++++ .../internal/TransactionProcessing.scala | 7 +- .../org/bitcoins/wallet/sync/WalletSync.scala | 88 +++++++++++ 20 files changed, 449 insertions(+), 248 deletions(-) create mode 100644 wallet-test/src/test/scala/org/bitcoins/wallet/sync/WalletSyncTest.scala create mode 100644 wallet/src/main/scala/org/bitcoins/wallet/sync/WalletSync.scala diff --git a/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala b/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala index 9d710f39bf..848f0e3e4c 100644 --- a/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala +++ b/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala @@ -392,7 +392,7 @@ class ChainHandler( (minHeightOpt, maxHeightOpt) match { case (Some(minHeight), Some(maxHeight)) => logger.info( - s"Processed filters headers from height=${minHeight.height} to ${maxHeight.height}. Best hash=${maxHeight.blockHashBE.hex}") + s"Processed filters headers from height=${minHeight.height} to ${maxHeight.height}. Best filterheader.blockHash=${maxHeight.blockHashBE.hex}") this // Should never have the case where we have (Some, None) or (None, Some) because that means the vec would be both empty and non empty case (_, _) => @@ -439,7 +439,7 @@ class ChainHandler( (minHeightOpt, maxHeightOpt) match { case (Some(minHeight), Some(maxHeight)) => logger.info( - s"Processed filters from height=${minHeight.height} to ${maxHeight.height}. Best hash=${maxHeight.blockHashBE.hex}") + s"Processed filters from height=${minHeight.height} to ${maxHeight.height}. Best filter.blockHash=${maxHeight.blockHashBE.hex}") this // Should never have the case where we have (Some, None) or (None, Some) because that means the vec would be both empty and non empty case (_, _) => diff --git a/core/src/main/scala/org/bitcoins/core/api/chain/ChainApi.scala b/core/src/main/scala/org/bitcoins/core/api/chain/ChainApi.scala index cd36b6d9cf..53d63e6344 100644 --- a/core/src/main/scala/org/bitcoins/core/api/chain/ChainApi.scala +++ b/core/src/main/scala/org/bitcoins/core/api/chain/ChainApi.scala @@ -79,7 +79,7 @@ trait ChainApi extends ChainQueryApi { * Generates a filter header range in form of (startHeight, stopHash) by the given stop hash. */ def nextFilterHeaderBatchRange( - stopHash: DoubleSha256DigestBE, + prevStopHash: DoubleSha256DigestBE, batchSize: Int): Future[Option[FilterSyncMarker]] /** diff --git a/db-commons/src/main/scala/org/bitcoins/db/DbManagement.scala b/db-commons/src/main/scala/org/bitcoins/db/DbManagement.scala index 76744b3f4a..8265f813ca 100644 --- a/db-commons/src/main/scala/org/bitcoins/db/DbManagement.scala +++ b/db-commons/src/main/scala/org/bitcoins/db/DbManagement.scala @@ -170,6 +170,17 @@ trait DbManagement extends BitcoinSLogger { } } + /** Runs flyway clean + * + * WARNING: + * THIS DELETES ALL DATA IN THE DATABASE, YOU PROBABLY DON'T WANT THIS UNLESS YOU ARE USING TESTS + * + * @see https://flywaydb.org/documentation/command/clean + */ + private[bitcoins] def clean(): Unit = { + flyway.clean() + } + private def createDbFileIfDNE(): Unit = { //should add a check in here that we are using sqlite if (!Files.exists(appConfig.dbPath)) { diff --git a/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeTest.scala b/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeTest.scala index 41df319c96..69ab3bcd7e 100644 --- a/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeTest.scala +++ b/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeTest.scala @@ -30,7 +30,6 @@ class NeutrinoNodeTest extends NodeUnitTest { override def withFixture(test: OneArgAsyncTest): FutureOutcome = withNeutrinoNodeFundedWalletBitcoind(test, - callbacks, getBIP39PasswordOpt(), Some(BitcoindVersion.Experimental)) @@ -64,6 +63,8 @@ class NeutrinoNodeTest extends NodeUnitTest { it must "receive notification that a block occurred on the p2p network" taggedAs UsesExperimentalBitcoind in { nodeConnectedWithBitcoind: NeutrinoNodeFundedWalletBitcoind => val node = nodeConnectedWithBitcoind.node + + val _ = node.nodeAppConfig.addCallbacks(callbacks) val bitcoind = nodeConnectedWithBitcoind.bitcoindRpc val assert1F = for { diff --git a/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeWithWalletTest.scala b/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeWithWalletTest.scala index 9329e8e9fe..15fb6ebf5b 100644 --- a/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeWithWalletTest.scala +++ b/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeWithWalletTest.scala @@ -17,10 +17,9 @@ import org.bitcoins.testkit.node.{ NodeUnitTest } import org.bitcoins.testkit.wallet.BitcoinSWalletTest -import org.bitcoins.wallet.Wallet import org.scalatest.FutureOutcome -import scala.concurrent.{Future, Promise} +import scala.concurrent.Future class NeutrinoNodeWithWalletTest extends NodeUnitTest { @@ -39,55 +38,20 @@ class NeutrinoNodeWithWalletTest extends NodeUnitTest { } else { withNeutrinoNodeFundedWalletBitcoind( test = test, - nodeCallbacks = nodeCallbacks, bip39PasswordOpt = getBIP39PasswordOpt(), versionOpt = Some(BitcoindVersion.Experimental) - ) + )(system, config) } } - private var walletP: Promise[Wallet] = Promise() - private var walletF: Future[Wallet] = walletP.future - - after { - //reset assertion after a test runs, because we - //are doing mutation to work around our callback - //limitations, we can't currently modify callbacks - //after a NeutrinoNode is constructed :-( - walletP = Promise() - walletF = walletP.future - } - val TestAmount = 1.bitcoin val FeeRate = SatoshisPerByte(10.sats) val TestFees: Satoshis = 2230.sats - def nodeCallbacks: NodeCallbacks = { - val onBlock: OnBlockReceived = { block => - for { - wallet <- walletF - _ <- wallet.processBlock(block) - } yield () - } - val onCompactFilters: OnCompactFiltersReceived = { blockFilters => - for { - wallet <- walletF - _ <- wallet.processCompactFilters(blockFilters) - } yield () - } - - NodeCallbacks( - onBlockReceived = Vector(onBlock), - onCompactFiltersReceived = Vector(onCompactFilters) - ) - } - it must "receive information about received payments" taggedAs UsesExperimentalBitcoind in { param => val NeutrinoNodeFundedWalletBitcoind(node, wallet, bitcoind, _) = param - walletP.success(wallet) - def condition( expectedConfirmedAmount: CurrencyUnit, expectedUnconfirmedAmount: CurrencyUnit, @@ -107,43 +71,50 @@ class NeutrinoNodeWithWalletTest extends NodeUnitTest { } } + //default wallet utxos are 3BTC, 2BTC, 1BTC + //our coin selection algorithm seems to be selecting + //the 3BTC utxo to spend, so we should have + //confirmed = 2BTC + 1BTC + //unconfirmed = 3 BTC - TestAmount - TestFees val condition1 = () => { condition( - expectedConfirmedAmount = 0.sats, + expectedConfirmedAmount = 3.bitcoin, expectedUnconfirmedAmount = - BitcoinSWalletTest.expectedDefaultAmt - TestAmount - TestFees, + 3.bitcoin - TestAmount - TestFees, expectedUtxos = 3, expectedAddresses = 7 ) } + + //this is just sending TestAmount back to us + //so everything should stay the same as above + //expected we should have received TestAmount back + //and have 1 more address/utxo val condition2 = { () => condition( - expectedConfirmedAmount = 0.sats, + expectedConfirmedAmount = 3.bitcoin, expectedUnconfirmedAmount = - BitcoinSWalletTest.expectedDefaultAmt - TestFees, + (3.bitcoin - TestAmount - TestFees) + TestAmount, expectedUtxos = 4, expectedAddresses = 8 ) } for { - _ <- node.sync() - _ <- NodeTestUtil.awaitSync(node, bitcoind) - _ <- NodeTestUtil.awaitCompactFilterHeadersSync(node, bitcoind) - _ <- NodeTestUtil.awaitCompactFiltersSync(node, bitcoind) - // send addr <- bitcoind.getNewAddress _ <- wallet.sendToAddress(addr, TestAmount, Some(FeeRate)) + _ <- wallet.getConfirmedBalance() + _ <- wallet.getUnconfirmedBalance() + _ <- wallet.getBalance() _ <- bitcoind.getNewAddress .flatMap(bitcoind.generateToAddress(1, _)) + _ <- wallet.getConfirmedBalance() _ <- NodeTestUtil.awaitSync(node, bitcoind) _ <- NodeTestUtil.awaitCompactFiltersSync(node, bitcoind) - _ <- AsyncUtil.awaitConditionF(condition1) - // receive address <- wallet.getNewAddress() txId <- bitcoind.sendToAddress(address, TestAmount) @@ -153,10 +124,9 @@ class NeutrinoNodeWithWalletTest extends NodeUnitTest { bitcoind.getNewAddress .flatMap(bitcoind.generateToAddress(1, _)) _ <- NodeTestUtil.awaitSync(node, bitcoind) + _ <- NodeTestUtil.awaitCompactFilterHeadersSync(node, bitcoind) _ <- NodeTestUtil.awaitCompactFiltersSync(node, bitcoind) - _ <- AsyncUtil.awaitConditionF(condition2) - // assert we got the full tx with witness data txs <- wallet.listTransactions() } yield assert(txs.exists(_.transaction == expectedTx)) @@ -166,8 +136,6 @@ class NeutrinoNodeWithWalletTest extends NodeUnitTest { param => val NeutrinoNodeFundedWalletBitcoind(node, wallet, bitcoind, _) = param - walletP.success(wallet) - def generateBlock() = for { _ <- @@ -184,10 +152,6 @@ class NeutrinoNodeWithWalletTest extends NodeUnitTest { val output = TransactionOutput(sats, spk) for { - _ <- node.sync() - _ <- NodeTestUtil.awaitSync(node, bitcoind) - _ <- NodeTestUtil.awaitCompactFiltersSync(node, bitcoind) - // start watching _ <- wallet.watchScriptPubKey(spk) @@ -209,8 +173,6 @@ class NeutrinoNodeWithWalletTest extends NodeUnitTest { param => val NeutrinoNodeFundedWalletBitcoind(node, wallet, bitcoind, _) = param - walletP.success(wallet) - def condition(): Future[Boolean] = { for { balance <- wallet.getBalance() @@ -231,10 +193,6 @@ class NeutrinoNodeWithWalletTest extends NodeUnitTest { _ = assert(addresses.size == 6) _ = assert(utxos.size == 3) - _ <- node.sync() - _ <- NodeTestUtil.awaitSync(node, bitcoind) - _ <- NodeTestUtil.awaitCompactFiltersSync(node, bitcoind) - address <- wallet.getNewAddress() _ <- bitcoind diff --git a/node-test/src/test/scala/org/bitcoins/node/SpvNodeWithWalletTest.scala b/node-test/src/test/scala/org/bitcoins/node/SpvNodeWithWalletTest.scala index 697eb42175..46464acae2 100644 --- a/node-test/src/test/scala/org/bitcoins/node/SpvNodeWithWalletTest.scala +++ b/node-test/src/test/scala/org/bitcoins/node/SpvNodeWithWalletTest.scala @@ -1,9 +1,8 @@ package org.bitcoins.node -import akka.actor.Cancellable -import org.bitcoins.core.api.wallet.WalletApi import org.bitcoins.core.currency._ -import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE} +import org.bitcoins.crypto.DoubleSha256DigestBE +import org.bitcoins.rpc.util.AsyncUtil import org.bitcoins.server.BitcoinSAppConfig import org.bitcoins.testkit.BitcoinSTestAppConfig import org.bitcoins.testkit.node.{ @@ -11,11 +10,10 @@ import org.bitcoins.testkit.node.{ NodeUnitTest, SpvNodeFundedWalletBitcoind } +import org.bitcoins.wallet.Wallet import org.scalatest.FutureOutcome -import org.scalatest.exceptions.TestFailedException -import scala.concurrent.duration._ -import scala.concurrent.{Future, Promise} +import scala.concurrent.Future class SpvNodeWithWalletTest extends NodeUnitTest { @@ -26,89 +24,47 @@ class SpvNodeWithWalletTest extends NodeUnitTest { override type FixtureParam = SpvNodeFundedWalletBitcoind def withFixture(test: OneArgAsyncTest): FutureOutcome = { - withSpvNodeFundedWalletBitcoind(test, callbacks, getBIP39PasswordOpt()) + withSpvNodeFundedWalletBitcoind(test, getBIP39PasswordOpt()) } - private val assertionP: Promise[Boolean] = Promise() - - private val expectedTxIdP: Promise[DoubleSha256Digest] = Promise() - private val expectedTxIdF: Future[DoubleSha256Digest] = expectedTxIdP.future - - private val walletP: Promise[WalletApi] = Promise() - private val walletF: Future[WalletApi] = walletP.future - val amountFromBitcoind = 1.bitcoin - def callbacks: NodeCallbacks = { - val onTx: OnTxReceived = { tx => - for { - expectedTxId <- expectedTxIdF - wallet <- walletF - } yield { - if (expectedTxId == tx.txId) { - for { - prevBalance <- wallet.getUnconfirmedBalance() - _ <- wallet.processTransaction(tx, None) - balance <- wallet.getUnconfirmedBalance() - } yield { - val result = balance == prevBalance + amountFromBitcoind - assertionP.success(result) - } - } - () - } - } - NodeCallbacks( - onTxReceived = Vector(onTx) - ) - } - it must "load a bloom filter and receive information about received payments" in { param => val SpvNodeFundedWalletBitcoind(spv, wallet, rpc, _) = param - walletP.success(wallet) - - var cancellable: Option[Cancellable] = None - - def processWalletTx(tx: DoubleSha256DigestBE): DoubleSha256DigestBE = { - expectedTxIdP.success(tx.flip) - // how long we're waiting for a tx notify before failing the test - val delay = 25.seconds - - val failTest: Runnable = new Runnable { - override def run = { - if (!assertionP.isCompleted) { - val msg = - s"Did not receive sent transaction within $delay" - logger.error(msg) - assertionP.failure(new TestFailedException(msg, 0)) - } - } - } - - logger.debug(s"Setting timeout for receiving TX through node") - cancellable = Some(system.scheduler.scheduleOnce(delay, failTest)) - tx - } - for { _ <- wallet.getBloomFilter() address <- wallet.getNewAddress() updatedBloom <- spv.updateBloomFilter(address).map(_.bloomFilter) - _ <- spv.sync() - _ <- NodeTestUtil.awaitSync(spv, rpc) ourTxid <- rpc .sendToAddress(address, amountFromBitcoind) - .map(processWalletTx) + _ <- rpc.generateToAddress(1, junkAddress) + _ <- spv.sync() + _ <- NodeTestUtil.awaitSync(spv, rpc) ourTx <- rpc.getTransaction(ourTxid) + _ = assert(updatedBloom.isRelevant(ourTx.hex)) + //wait for bitcoind to propagate us a merkle block + //and transactions associated with it + //eventually we should have the tx + //added to our wallet when this occurs + _ <- AsyncUtil.retryUntilSatisfiedF(() => + walletContainsTx(wallet, ourTx.txid)) + } yield { + succeed + } + } - result <- assertionP.future - } yield assert(result) - + private def walletContainsTx( + wallet: Wallet, + txid: DoubleSha256DigestBE): Future[Boolean] = { + val txOptF = wallet.findTransaction(txid) + for { + txOpt <- txOptF + } yield txOpt.isDefined } } diff --git a/node-test/src/test/scala/org/bitcoins/node/UpdateBloomFilterTest.scala b/node-test/src/test/scala/org/bitcoins/node/UpdateBloomFilterTest.scala index d137b85125..708173853d 100644 --- a/node-test/src/test/scala/org/bitcoins/node/UpdateBloomFilterTest.scala +++ b/node-test/src/test/scala/org/bitcoins/node/UpdateBloomFilterTest.scala @@ -3,7 +3,11 @@ package org.bitcoins.node import org.bitcoins.core.currency._ import org.bitcoins.server.BitcoinSAppConfig import org.bitcoins.testkit.BitcoinSTestAppConfig -import org.bitcoins.testkit.node.{NodeUnitTest, SpvNodeFundedWalletBitcoind} +import org.bitcoins.testkit.node.{ + NodeTestUtil, + NodeUnitTest, + SpvNodeFundedWalletBitcoind +} import org.scalatest.{BeforeAndAfter, FutureOutcome} class UpdateBloomFilterTest extends NodeUnitTest with BeforeAndAfter { @@ -15,7 +19,7 @@ class UpdateBloomFilterTest extends NodeUnitTest with BeforeAndAfter { override type FixtureParam = SpvNodeFundedWalletBitcoind def withFixture(test: OneArgAsyncTest): FutureOutcome = { - withSpvNodeFundedWalletBitcoind(test, NodeCallbacks.empty, None) + withSpvNodeFundedWalletBitcoind(test, None) } it must "update the bloom filter with a TX" in { param => @@ -31,7 +35,7 @@ class UpdateBloomFilterTest extends NodeUnitTest with BeforeAndAfter { // this should confirm our TX // since we updated the bloom filter hash <- rpc.generateToAddress(1, junkAddress).map(_.head) - + _ <- NodeTestUtil.awaitSync(spv, rpc) merkleBlock <- rpc.getTxOutProof(Vector(tx.txIdBE), hash) txs <- rpc.verifyTxOutProof(merkleBlock) diff --git a/node-test/src/test/scala/org/bitcoins/node/networking/peer/DataMessageHandlerTest.scala b/node-test/src/test/scala/org/bitcoins/node/networking/peer/DataMessageHandlerTest.scala index e9bcf1948a..b26ccfe85a 100644 --- a/node-test/src/test/scala/org/bitcoins/node/networking/peer/DataMessageHandlerTest.scala +++ b/node-test/src/test/scala/org/bitcoins/node/networking/peer/DataMessageHandlerTest.scala @@ -52,10 +52,13 @@ class DataMessageHandlerTest extends NodeUnitTest { payload1 = MerkleBlockMessage(merkleBlock) payload2 = TransactionMessage(tx) - callbacks = NodeCallbacks.onMerkleBlockReceived(callback) - _ = nodeConfig.addCallbacks(callbacks) + nodeCallbacks = NodeCallbacks(onMerkleBlockReceived = Vector(callback)) + _ = spv.nodeAppConfig.addCallbacks(nodeCallbacks) - dataMessageHandler = DataMessageHandler(genesisChainApi) + dataMessageHandler = + DataMessageHandler(genesisChainApi)(spv.executionContext, + spv.nodeAppConfig, + spv.chainConfig) _ <- dataMessageHandler.handleDataPayload(payload1, sender) _ <- dataMessageHandler.handleDataPayload(payload2, sender) result <- resultP.future @@ -82,10 +85,13 @@ class DataMessageHandlerTest extends NodeUnitTest { payload = BlockMessage(block) - callbacks = NodeCallbacks.onBlockReceived(callback) - _ = nodeConfig.addCallbacks(callbacks) + nodeCallbacks = NodeCallbacks.onBlockReceived(callback) + _ = spv.nodeAppConfig.addCallbacks(nodeCallbacks) - dataMessageHandler = DataMessageHandler(genesisChainApi) + dataMessageHandler = + DataMessageHandler(genesisChainApi)(spv.executionContext, + spv.nodeAppConfig, + spv.chainConfig) _ <- dataMessageHandler.handleDataPayload(payload, sender) result <- resultP.future } yield assert(result == block) @@ -113,9 +119,13 @@ class DataMessageHandlerTest extends NodeUnitTest { payload = HeadersMessage(CompactSizeUInt.one, Vector(header)) callbacks = NodeCallbacks.onBlockHeadersReceived(callback) - _ = nodeConfig.addCallbacks(callbacks) - dataMessageHandler = DataMessageHandler(genesisChainApi) + _ = spv.nodeAppConfig.addCallbacks(callbacks) + dataMessageHandler = + DataMessageHandler(genesisChainApi)(spv.executionContext, + spv.nodeAppConfig, + spv.chainConfig) + _ <- dataMessageHandler.handleDataPayload(payload, sender) result <- resultP.future } yield assert(result == Vector(header)) @@ -143,10 +153,13 @@ class DataMessageHandlerTest extends NodeUnitTest { payload = CompactFilterMessage(FilterType.Basic, hash.flip, filter.filter.bytes) - callbacks = NodeCallbacks.onCompactFilterReceived(callback) - _ = nodeConfig.addCallbacks(callbacks) + nodeCallbacks = NodeCallbacks.onCompactFilterReceived(callback) + _ = spv.nodeAppConfig.addCallbacks(nodeCallbacks) + dataMessageHandler = + DataMessageHandler(genesisChainApi)(spv.executionContext, + spv.nodeAppConfig, + spv.chainConfig) - dataMessageHandler = DataMessageHandler(genesisChainApi) _ <- dataMessageHandler.handleDataPayload(payload, sender) result <- resultP.future } yield assert(result == Vector((hash.flip, filter.filter))) diff --git a/node-test/src/test/scala/org/bitcoins/node/networking/peer/PeerMessageHandlerTest.scala b/node-test/src/test/scala/org/bitcoins/node/networking/peer/PeerMessageHandlerTest.scala index 76f7a49641..903f3bd5a7 100644 --- a/node-test/src/test/scala/org/bitcoins/node/networking/peer/PeerMessageHandlerTest.scala +++ b/node-test/src/test/scala/org/bitcoins/node/networking/peer/PeerMessageHandlerTest.scala @@ -1,5 +1,7 @@ package org.bitcoins.node.networking.peer +import org.bitcoins.chain.config.ChainAppConfig +import org.bitcoins.node.config.NodeAppConfig import org.bitcoins.server.BitcoinSAppConfig import org.bitcoins.testkit.BitcoinSTestAppConfig import org.bitcoins.testkit.async.TestAsyncUtil @@ -23,6 +25,13 @@ class PeerMessageHandlerTest extends NodeUnitTest { test(()) } + private val cachedConfig = config + + implicit private lazy val nodeAppConfig: NodeAppConfig = cachedConfig.nodeConf + + implicit protected lazy val chainConfig: ChainAppConfig = + cachedConfig.chainConf + behavior of "PeerHandler" it must "be able to fully initialize a PeerMessageReceiver" in { _ => diff --git a/node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala b/node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala index a40ee17b81..a2e027c87d 100644 --- a/node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala +++ b/node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala @@ -30,7 +30,7 @@ case class NeutrinoNode( override val peer: Peer = nodePeer - override def start(): Future[Node] = { + override def start(): Future[NeutrinoNode] = { val res = for { node <- super.start() chainApi <- chainApiFromDb() @@ -39,7 +39,7 @@ case class NeutrinoNode( _ <- peerMsgSender.sendGetCompactFilterCheckPointMessage( stopHash = bestHash.flip) } yield { - node + node.asInstanceOf[NeutrinoNode] } res.failed.foreach(logger.error("Cannot start Neutrino node", _)) diff --git a/node/src/main/scala/org/bitcoins/node/Node.scala b/node/src/main/scala/org/bitcoins/node/Node.scala index 076ec5e3e6..e4e775513a 100644 --- a/node/src/main/scala/org/bitcoins/node/Node.scala +++ b/node/src/main/scala/org/bitcoins/node/Node.scala @@ -156,10 +156,9 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger { def stop(): Future[Node] = { logger.info(s"Stopping node") val disconnectF = for { - _ <- nodeAppConfig.stop() - _ <- chainAppConfig.stop() p <- peerMsgSenderF disconnect <- p.disconnect() + _ <- nodeAppConfig.stop() } yield disconnect val start = System.currentTimeMillis() diff --git a/node/src/main/scala/org/bitcoins/node/SpvNode.scala b/node/src/main/scala/org/bitcoins/node/SpvNode.scala index e85bed968c..75bf2d51a8 100644 --- a/node/src/main/scala/org/bitcoins/node/SpvNode.scala +++ b/node/src/main/scala/org/bitcoins/node/SpvNode.scala @@ -10,6 +10,7 @@ import org.bitcoins.core.protocol.{BitcoinAddress, BlockStamp} import org.bitcoins.core.util.Mutable import org.bitcoins.node.config.NodeAppConfig import org.bitcoins.node.models.Peer +import org.bitcoins.rpc.util.AsyncUtil import scala.concurrent.{Future, Promise} @@ -73,15 +74,15 @@ case class SpvNode( sentFilterAddF.map(_ => this) } - override def start(): Future[Node] = { + override def start(): Future[SpvNode] = { for { node <- super.start() peerMsgSender <- peerMsgSenderF + _ <- AsyncUtil.retryUntilSatisfiedF(() => isConnected) _ <- peerMsgSender.sendFilterLoadMessage(bloomFilter) } yield { logger.info(s"Sending bloomfilter=${bloomFilter.hex} to $peer") - logger.info(s"Sending bloomfilter=${bloomFilter.hex} to $peer") - node + node.asInstanceOf[SpvNode] } } diff --git a/testkit/src/main/scala/org/bitcoins/testkit/chain/SyncUtil.scala b/testkit/src/main/scala/org/bitcoins/testkit/chain/SyncUtil.scala index a33f74578d..97857c8769 100644 --- a/testkit/src/main/scala/org/bitcoins/testkit/chain/SyncUtil.scala +++ b/testkit/src/main/scala/org/bitcoins/testkit/chain/SyncUtil.scala @@ -5,13 +5,14 @@ import org.bitcoins.commons.jsonmodels.bitcoind.GetBlockFilterResult import org.bitcoins.core.api.node import org.bitcoins.core.api.node.{NodeApi, NodeChainQueryApi} import org.bitcoins.core.gcs.FilterType -import org.bitcoins.core.protocol.blockchain.BlockHeader +import org.bitcoins.core.protocol.blockchain.{Block, BlockHeader} import org.bitcoins.core.protocol.transaction.Transaction import org.bitcoins.core.util.{BitcoinSLogger, FutureUtil} import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE} import org.bitcoins.rpc.client.common.BitcoindRpcClient import org.bitcoins.rpc.client.v19.BitcoindV19RpcClient import org.bitcoins.wallet.Wallet +import org.bitcoins.wallet.sync.WalletSync import scala.concurrent.{ExecutionContext, Future} @@ -45,6 +46,11 @@ abstract class SyncUtil extends BitcoinSLogger { } } + def getBlockFunc( + bitcoind: BitcoindRpcClient): DoubleSha256DigestBE => Future[Block] = { + bitcoind.getBlockRaw(_: DoubleSha256DigestBE) + } + def getNodeApi(bitcoindRpcClient: BitcoindRpcClient)(implicit ec: ExecutionContext): NodeApi = { new NodeApi { @@ -161,6 +167,16 @@ abstract class SyncUtil extends BitcoinSLogger { SyncUtil.getNodeApiWalletCallback(bitcoind, walletF) node.NodeChainQueryApi(nodeApi, chainQuery) } + + def syncWallet(wallet: Wallet, bitcoind: BitcoindRpcClient)(implicit + ec: ExecutionContext): Future[Wallet] = { + WalletSync.sync( + wallet = wallet, + getBlockHeaderFunc = SyncUtil.getBlockHeaderFunc(bitcoind), + getBestBlockHashFunc = SyncUtil.getBestBlockHashFunc(bitcoind), + getBlock = SyncUtil.getBlockFunc(bitcoind) + ) + } } object SyncUtil extends SyncUtil diff --git a/testkit/src/main/scala/org/bitcoins/testkit/fixtures/NodeDAOFixture.scala b/testkit/src/main/scala/org/bitcoins/testkit/fixtures/NodeDAOFixture.scala index e495a27376..0602c00296 100644 --- a/testkit/src/main/scala/org/bitcoins/testkit/fixtures/NodeDAOFixture.scala +++ b/testkit/src/main/scala/org/bitcoins/testkit/fixtures/NodeDAOFixture.scala @@ -12,6 +12,8 @@ case class NodeDAOs(txDAO: BroadcastAbleTransactionDAO) /** Provides a fixture where all DAOs used by the node projects are provided */ trait NodeDAOFixture extends NodeUnitTest { + implicit protected lazy val nodeConfig: NodeAppConfig = config.nodeConf + private lazy val daos = { val tx = BroadcastAbleTransactionDAO() NodeDAOs(tx) diff --git a/testkit/src/main/scala/org/bitcoins/testkit/node/NodeUnitTest.scala b/testkit/src/main/scala/org/bitcoins/testkit/node/NodeUnitTest.scala index 0f5b077525..28bff58403 100644 --- a/testkit/src/main/scala/org/bitcoins/testkit/node/NodeUnitTest.scala +++ b/testkit/src/main/scala/org/bitcoins/testkit/node/NodeUnitTest.scala @@ -48,7 +48,7 @@ import org.bitcoins.wallet.WalletCallbacks import org.scalatest.FutureOutcome import scala.concurrent.duration._ -import scala.concurrent.{Await, ExecutionContext, Future} +import scala.concurrent.{ExecutionContext, Future} trait NodeUnitTest extends BitcoinSFixture with EmbeddedPg { @@ -58,19 +58,12 @@ trait NodeUnitTest extends BitcoinSFixture with EmbeddedPg { } override def afterAll(): Unit = { - Await.result(config.chainConf.stop(), 1.minute) - Await.result(config.nodeConf.stop(), 1.minute) - Await.result(config.walletConf.stop(), 1.minute) super[EmbeddedPg].afterAll() } /** Wallet config with data directory set to user temp directory */ implicit protected def config: BitcoinSAppConfig - implicit protected lazy val chainConfig: ChainAppConfig = config.chainConf - - implicit protected lazy val nodeConfig: NodeAppConfig = config.nodeConf - implicit override lazy val np: NetworkParameters = config.nodeConf.network lazy val startedBitcoindF = BitcoindRpcTestUtil.startedBitcoindRpcClient() @@ -180,11 +173,10 @@ trait NodeUnitTest extends BitcoinSFixture with EmbeddedPg { val nodeBuilder: () => Future[SpvNode] = { () => require(appConfig.nodeType == NodeType.SpvNode) for { - node <- NodeUnitTest.createSpvNode( - emptyPeer, - NodeCallbacks.empty, - start = false)(system, appConfig.chainConf, appConfig.nodeConf) - _ <- appConfig.start() + node <- NodeUnitTest.createSpvNode(emptyPeer)(system, + appConfig.chainConf, + appConfig.nodeConf) + } yield node } @@ -192,7 +184,8 @@ trait NodeUnitTest extends BitcoinSFixture with EmbeddedPg { build = nodeBuilder, destroy = (_: Node) => { for { - _ <- ChainUnitTest.destroyAllTables() + _ <- ChainUnitTest.destroyAllTables()(appConfig.chainConf, + system.dispatcher) _ <- appConfig.stop() } yield () } @@ -209,11 +202,12 @@ trait NodeUnitTest extends BitcoinSFixture with EmbeddedPg { require(appConfig.nodeType == NodeType.SpvNode) for { bitcoind <- BitcoinSFixture.createBitcoind(versionOpt) - node <- NodeUnitTest.createSpvNode(createPeer(bitcoind), - NodeCallbacks.empty)( + node <- NodeUnitTest.createSpvNode(createPeer(bitcoind))( system, appConfig.chainConf, appConfig.nodeConf) + started <- node.start() + _ <- NodeUnitTest.syncSpvNode(started, bitcoind) } yield SpvNodeConnectedWithBitcoind(node, bitcoind) } @@ -235,9 +229,12 @@ trait NodeUnitTest extends BitcoinSFixture with EmbeddedPg { BitcoinSFixture .createBitcoindWithFunds(Some(V19)) .map(_.asInstanceOf[BitcoindV19RpcClient]) - node <- NodeUnitTest.createSpvNode( - createPeer(bitcoind), - NodeCallbacks.empty)(system, appConfig.chainConf, appConfig.nodeConf) + node <- NodeUnitTest.createSpvNode(createPeer(bitcoind))( + system, + appConfig.chainConf, + appConfig.nodeConf) + started <- node.start() + _ <- NodeUnitTest.syncSpvNode(started, bitcoind) } yield SpvNodeConnectedWithBitcoindV19(node, bitcoind) } @@ -258,10 +255,9 @@ trait NodeUnitTest extends BitcoinSFixture with EmbeddedPg { require(appConfig.nodeType == NodeType.NeutrinoNode) for { bitcoind <- BitcoinSFixture.createBitcoind(versionOpt) - node <- NodeUnitTest.createNeutrinoNode(bitcoind, NodeCallbacks.empty)( - system, - appConfig.chainConf, - appConfig.nodeConf) + node <- NodeUnitTest.createNeutrinoNode(bitcoind)(system, + appConfig.chainConf, + appConfig.nodeConf) } yield NeutrinoNodeConnectedWithBitcoind(node, bitcoind) } makeDependentFixture( @@ -273,16 +269,13 @@ trait NodeUnitTest extends BitcoinSFixture with EmbeddedPg { def withSpvNodeFundedWalletBitcoind( test: OneArgAsyncTest, - callbacks: NodeCallbacks, bip39PasswordOpt: Option[String])(implicit system: ActorSystem, appConfig: BitcoinSAppConfig): FutureOutcome = { makeDependentFixture( build = () => - NodeUnitTest.createSpvNodeFundedWalletBitcoind(nodeCallbacks = - callbacks, - bip39PasswordOpt = + NodeUnitTest.createSpvNodeFundedWalletBitcoind(bip39PasswordOpt = bip39PasswordOpt, versionOpt = Option(V18), walletCallbacks = @@ -296,21 +289,18 @@ trait NodeUnitTest extends BitcoinSFixture with EmbeddedPg { def withNeutrinoNodeFundedWalletBitcoind( test: OneArgAsyncTest, - nodeCallbacks: NodeCallbacks, bip39PasswordOpt: Option[String], versionOpt: Option[BitcoindVersion] = None, walletCallbacks: WalletCallbacks = WalletCallbacks.empty)(implicit system: ActorSystem, appConfig: BitcoinSAppConfig): FutureOutcome = { - makeDependentFixture( build = () => NodeUnitTest .createNeutrinoNodeFundedWalletBitcoind( - nodeCallbacks, - bip39PasswordOpt, - versionOpt, - walletCallbacks)(system, appConfig), + bip39PasswordOpt = bip39PasswordOpt, + versionOpt = versionOpt, + walletCallbacks = walletCallbacks)(system, appConfig), destroy = NodeUnitTest.destroyNodeFundedWalletBitcoind( _: NodeFundedWalletBitcoind)(system, appConfig) )(test) @@ -376,11 +366,8 @@ object NodeUnitTest extends P2PLogger { } - def destroyNode(node: Node)(implicit - config: BitcoinSAppConfig, - ec: ExecutionContext): Future[Unit] = { + def destroyNode(node: Node)(implicit ec: ExecutionContext): Future[Unit] = { for { - _ <- ChainUnitTest.destroyAllTables() _ <- node.stop() } yield () } @@ -396,6 +383,8 @@ object NodeUnitTest extends P2PLogger { val resultF = for { _ <- destroyNode(node) _ <- ChainUnitTest.destroyBitcoind(bitcoind) + _ = cleanTables(appConfig) + _ <- appConfig.stop() } yield { logger.debug(s"Done with teardown of node connected with bitcoind!") () @@ -406,7 +395,6 @@ object NodeUnitTest extends P2PLogger { /** Creates a spv node, a funded bitcoin-s wallet, all of which are connected to bitcoind */ def createSpvNodeFundedWalletBitcoind( - nodeCallbacks: NodeCallbacks, walletCallbacks: WalletCallbacks, bip39PasswordOpt: Option[String], versionOpt: Option[BitcoindVersion] = None)(implicit @@ -416,15 +404,25 @@ object NodeUnitTest extends P2PLogger { require(appConfig.nodeType == NodeType.SpvNode) for { bitcoind <- BitcoinSFixture.createBitcoindWithFunds(versionOpt) - node <- createSpvNode(createPeer(bitcoind), nodeCallbacks) + node <- createSpvNode(createPeer(bitcoind)) fundedWallet <- BitcoinSWalletTest.fundedWalletAndBitcoind( bitcoind, node, node, bip39PasswordOpt, walletCallbacks) + spvCallbacks = + BitcoinSWalletTest.createSpvNodeCallbacksForWallet(fundedWallet.wallet) + _ = appConfig.nodeConf.addCallbacks(spvCallbacks) + walletBloomFilter <- fundedWallet.wallet.getBloomFilter() + withBloomFilter = node.setBloomFilter(walletBloomFilter) + startedNodeWithBloomFilter <- withBloomFilter.start() + _ <- syncSpvNode(startedNodeWithBloomFilter, bitcoind) + //callbacks are executed asynchronously, which is how we fund the wallet + //so we need to wait until the wallet balances are correct + _ <- BitcoinSWalletTest.awaitWalletBalances(fundedWallet) } yield { - SpvNodeFundedWalletBitcoind(node = node, + SpvNodeFundedWalletBitcoind(node = startedNodeWithBloomFilter, wallet = fundedWallet.wallet, bitcoindRpc = fundedWallet.bitcoind, bip39PasswordOpt) @@ -433,7 +431,6 @@ object NodeUnitTest extends P2PLogger { /** Creates a neutrino node, a funded bitcoin-s wallet, all of which are connected to bitcoind */ def createNeutrinoNodeFundedWalletBitcoind( - nodeCallbacks: NodeCallbacks, bip39PasswordOpt: Option[String], versionOpt: Option[BitcoindVersion], walletCallbacks: WalletCallbacks)(implicit @@ -444,15 +441,20 @@ object NodeUnitTest extends P2PLogger { require(appConfig.nodeType == NodeType.NeutrinoNode) for { bitcoind <- BitcoinSFixture.createBitcoindWithFunds(versionOpt) - node <- createNeutrinoNode(bitcoind, nodeCallbacks) + node <- createNeutrinoNode(bitcoind) fundedWallet <- BitcoinSWalletTest.fundedWalletAndBitcoind( bitcoindRpcClient = bitcoind, nodeApi = node, chainQueryApi = node, bip39PasswordOpt = bip39PasswordOpt, walletCallbacks = walletCallbacks) + startedNode <- node.start() + syncedNode <- syncNeutrinoNode(startedNode, bitcoind) + //callbacks are executed asynchronously, which is how we fund the wallet + //so we need to wait until the wallet balances are correct + _ <- BitcoinSWalletTest.awaitWalletBalances(fundedWallet) } yield { - NeutrinoNodeFundedWalletBitcoind(node = node, + NeutrinoNodeFundedWalletBitcoind(node = syncedNode, wallet = fundedWallet.wallet, bitcoindRpc = fundedWallet.bitcoind, bip39PasswordOpt = bip39PasswordOpt) @@ -474,11 +476,11 @@ object NodeUnitTest extends P2PLogger { val destroyedF = for { _ <- destroyNode(fundedWalletBitcoind.node) _ <- BitcoinSWalletTest.destroyWalletWithBitcoind(walletWithBitcoind) - _ <- appConfig.walletConf.stop() + _ = cleanTables(appConfig) + _ <- appConfig.stop() } yield () destroyedF - } def buildPeerMessageReceiver(chainApi: ChainApi, peer: Peer)(implicit @@ -508,25 +510,22 @@ object NodeUnitTest extends P2PLogger { Peer(id = None, socket = socket) } - /** Creates a spv node peered with the given bitcoind client, this method - * also calls [[org.bitcoins.node.Node.start() start]] to start the node + /** Creates a spv node peered with the given bitcoind client + * This does NOT start the spv node */ - def createSpvNode( - peer: Peer, - callbacks: NodeCallbacks, - start: Boolean = true)(implicit + def createSpvNode(peer: Peer)(implicit system: ActorSystem, chainAppConfig: ChainAppConfig, nodeAppConfig: NodeAppConfig): Future[SpvNode] = { import system.dispatcher - nodeAppConfig.addCallbacks(callbacks) - val checkConfigF = Future { assert(nodeAppConfig.nodeType == NodeType.SpvNode) } val chainApiF = for { _ <- checkConfigF + _ = chainAppConfig.migrate() + _ = nodeAppConfig.start() chainHandler <- ChainUnitTest.createChainHandler() } yield chainHandler val nodeF = for { @@ -541,24 +540,19 @@ object NodeUnitTest extends P2PLogger { ).setBloomFilter(NodeTestUtil.emptyBloomFilter) } - if (start) - nodeF.flatMap(_.start()).flatMap(_ => nodeF) - else nodeF + nodeF } /** Creates a Neutrino node peered with the given bitcoind client, this method * also calls [[org.bitcoins.node.Node.start() start]] to start the node */ - def createNeutrinoNode(bitcoind: BitcoindRpcClient, callbacks: NodeCallbacks)( - implicit + def createNeutrinoNode(bitcoind: BitcoindRpcClient)(implicit system: ActorSystem, chainAppConfig: ChainAppConfig, nodeAppConfig: NodeAppConfig): Future[NeutrinoNode] = { import system.dispatcher - nodeAppConfig.addCallbacks(callbacks) - val checkConfigF = Future { assert(nodeAppConfig.nodeType == NodeType.NeutrinoNode) } @@ -577,7 +571,39 @@ object NodeUnitTest extends P2PLogger { initialSyncDone = None) } - nodeF.flatMap(_.start()).flatMap(_ => nodeF) + nodeF } + def syncNeutrinoNode(node: NeutrinoNode, bitcoind: BitcoindRpcClient)(implicit + system: ActorSystem): Future[NeutrinoNode] = { + import system.dispatcher + for { + _ <- node.sync() + _ <- NodeTestUtil.awaitSync(node, bitcoind) + _ <- NodeTestUtil.awaitCompactFilterHeadersSync(node, bitcoind) + _ <- NodeTestUtil.awaitCompactFiltersSync(node, bitcoind) + } yield node + } + + def syncSpvNode(node: SpvNode, bitcoind: BitcoindRpcClient)(implicit + system: ActorSystem): Future[SpvNode] = { + import system.dispatcher + for { + _ <- node.sync() + _ <- NodeTestUtil.awaitSync(node, bitcoind) + } yield node + } + + /** + * This is needed for postgres, we do not drop tables in between individual tests with postgres + * rather an entire test suite shares the same postgres database. + * therefore, we need to clean the database after each test, so that migrations can be applied during + * the setup phase for the next test. + * @param appConfig + */ + private def cleanTables(appConfig: BitcoinSAppConfig): Unit = { + appConfig.nodeConf.clean() + //appConfig.walletConf.clean() + appConfig.chainConf.clean() + } } diff --git a/testkit/src/main/scala/org/bitcoins/testkit/wallet/BitcoinSWalletTest.scala b/testkit/src/main/scala/org/bitcoins/testkit/wallet/BitcoinSWalletTest.scala index e76c231aec..b93395eea0 100644 --- a/testkit/src/main/scala/org/bitcoins/testkit/wallet/BitcoinSWalletTest.scala +++ b/testkit/src/main/scala/org/bitcoins/testkit/wallet/BitcoinSWalletTest.scala @@ -15,8 +15,15 @@ import org.bitcoins.core.wallet.fee._ import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE} import org.bitcoins.db.AppConfig import org.bitcoins.keymanager.bip39.BIP39KeyManager +import org.bitcoins.node.{ + NodeCallbacks, + OnBlockReceived, + OnCompactFiltersReceived, + OnMerkleBlockReceived +} import org.bitcoins.rpc.client.common.{BitcoindRpcClient, BitcoindVersion} import org.bitcoins.rpc.client.v19.BitcoindV19RpcClient +import org.bitcoins.rpc.util.AsyncUtil import org.bitcoins.server.BitcoinSAppConfig import org.bitcoins.server.BitcoinSAppConfig._ import org.bitcoins.testkit.Implicits.GeneratorOps @@ -263,10 +270,13 @@ trait BitcoinSWalletTest extends BitcoinSFixture with EmbeddedPg { bitcoind <- BitcoinSFixture .createBitcoindWithFunds(None) - wallet <- createWalletWithBitcoindCallbacks(bitcoind = bitcoind, - bip39PasswordOpt = - bip39PasswordOpt) - fundedWallet <- fundWalletWithBitcoind(wallet) + walletWithBitcoind <- createWalletWithBitcoindCallbacks( + bitcoind = bitcoind, + bip39PasswordOpt = bip39PasswordOpt) + fundedWallet <- fundWalletWithBitcoind(walletWithBitcoind) + _ <- + SyncUtil.syncWallet(wallet = fundedWallet.wallet, bitcoind = bitcoind) + _ <- BitcoinSWalletTest.awaitWalletBalances(fundedWallet) } yield fundedWallet } @@ -284,6 +294,9 @@ trait BitcoinSWalletTest extends BitcoinSFixture with EmbeddedPg { .map(_.asInstanceOf[BitcoindV19RpcClient]) wallet <- createWalletWithBitcoindCallbacks(bitcoind, bip39PasswordOpt) fundedWallet <- fundWalletWithBitcoind(wallet) + _ <- + SyncUtil.syncWallet(wallet = fundedWallet.wallet, bitcoind = bitcoind) + _ <- BitcoinSWalletTest.awaitWalletBalances(fundedWallet) } yield { WalletWithBitcoindV19(fundedWallet.wallet, bitcoind) } @@ -601,6 +614,13 @@ object BitcoinSWalletTest extends WalletLogger { } yield funded } + /** Funds a wallet with bitcoind, this method adds [[BitcoinSWalletTest.createNodeCallbacksForWallet()]] + * which processes filters/blocks that can be used to fund the wallet. + * + * It's important to note that this does NOT synchronize the wallet with a chain state. + * This should be done by the caller of this method. A useful method to help you with that + * in neutrino node cases is [[BitcoinSWalletTest.awaitWalletBalances]] + */ def fundedWalletAndBitcoind( bitcoindRpcClient: BitcoindRpcClient, nodeApi: NodeApi, @@ -616,6 +636,10 @@ object BitcoinSWalletTest extends WalletLogger { nodeApi = nodeApi, chainQueryApi = chainQueryApi, bip39PasswordOpt = bip39PasswordOpt) + //add callbacks for wallet + nodeCallbacks = + BitcoinSWalletTest.createNeutrinoNodeCallbacksForWallet(wallet) + _ = config.nodeConf.addCallbacks(nodeCallbacks) withBitcoind <- createWalletWithBitcoind(wallet, bitcoindRpcClient) funded <- fundWalletWithBitcoind(withBitcoind) } yield funded @@ -646,22 +670,7 @@ object BitcoinSWalletTest extends WalletLogger { ) } yield fundedAcct1 - //sanity check to make sure we have money - for { - fundedWallet <- fundedAccount1WalletF - balance <- fundedWallet.getBalance(defaultAccount) - _ = require( - balance == expectedDefaultAmt, - s"Funding wallet fixture failed to fund the wallet, got balance=$balance expected=$expectedDefaultAmt") - - account1Balance <- fundedWallet.getBalance(hdAccount1) - _ = require( - account1Balance == expectedAccount1Amt, - s"Funding wallet fixture failed to fund account 1, " + - s"got balance=$hdAccount1 expected=$expectedAccount1Amt" - ) - - } yield pair + fundedAccount1WalletF.map(_ => pair) } def destroyWalletWithBitcoind(walletWithBitcoind: WalletWithBitcoind)(implicit @@ -686,4 +695,67 @@ object BitcoinSWalletTest extends WalletLogger { } yield () } + /** Constructs callbacks for the wallet from the node to process blocks and compact filters */ + def createNeutrinoNodeCallbacksForWallet(wallet: Wallet)(implicit + ec: ExecutionContext): NodeCallbacks = { + val onBlock: OnBlockReceived = { block => + for { + _ <- wallet.processBlock(block) + } yield () + } + val onCompactFilters: OnCompactFiltersReceived = { blockFilters => + for { + _ <- wallet.processCompactFilters(blockFilters) + } yield () + } + + NodeCallbacks( + onBlockReceived = Vector(onBlock), + onCompactFiltersReceived = Vector(onCompactFilters) + ) + } + + /** Registers a callback to handle merkle blocks given to us by a spv node */ + def createSpvNodeCallbacksForWallet(wallet: Wallet)(implicit + ec: ExecutionContext): NodeCallbacks = { + val onMerkleBlockReceived: OnMerkleBlockReceived = { + case (merkleBlock, txs) => + for { + _ <- wallet.processTransactions(txs, + Some(merkleBlock.blockHeader.hashBE)) + } yield () + } + NodeCallbacks(onMerkleBlockReceived = Vector(onMerkleBlockReceived)) + } + + /** Makes sure our wallet is fully funded with the default amounts specified in + * [[BitcoinSWalletTest]]. This will future won't be completed until balances satisfy [[isSameWalletBalances()]] + */ + def awaitWalletBalances(fundedWallet: WalletWithBitcoind)(implicit + config: BitcoinSAppConfig, + system: ActorSystem): Future[Unit] = { + AsyncUtil.retryUntilSatisfiedF(conditionF = + () => isSameWalletBalances(fundedWallet), + interval = 1.seconds) + } + + private def isSameWalletBalances(fundedWallet: WalletWithBitcoind)(implicit + config: BitcoinSAppConfig, + system: ActorSystem): Future[Boolean] = { + import system.dispatcher + val defaultAccount = config.walletConf.defaultAccount + val hdAccount1 = WalletTestUtil.getHdAccount1(config.walletConf) + val expectedDefaultAmt = BitcoinSWalletTest.expectedDefaultAmt + val expectedAccount1Amt = BitcoinSWalletTest.expectedAccount1Amt + val defaultBalanceF = fundedWallet.wallet.getBalance(defaultAccount) + val account1BalanceF = fundedWallet.wallet.getBalance(hdAccount1) + for { + balance <- defaultBalanceF + account1Balance <- account1BalanceF + } yield { + balance == expectedDefaultAmt && + account1Balance == expectedAccount1Amt + } + } + } diff --git a/testkit/src/main/scala/org/bitcoins/testkit/wallet/FundWalletUtil.scala b/testkit/src/main/scala/org/bitcoins/testkit/wallet/FundWalletUtil.scala index f1854c495f..0c219bf214 100644 --- a/testkit/src/main/scala/org/bitcoins/testkit/wallet/FundWalletUtil.scala +++ b/testkit/src/main/scala/org/bitcoins/testkit/wallet/FundWalletUtil.scala @@ -8,6 +8,7 @@ import org.bitcoins.core.currency.CurrencyUnit import org.bitcoins.core.hd.HDAccount import org.bitcoins.core.protocol.BitcoinAddress import org.bitcoins.core.protocol.transaction.TransactionOutput +import org.bitcoins.core.util.BitcoinSLogger import org.bitcoins.crypto.DoubleSha256DigestBE import org.bitcoins.rpc.client.common.BitcoindRpcClient import org.bitcoins.server.BitcoinSAppConfig @@ -17,7 +18,7 @@ import org.bitcoins.wallet.Wallet import scala.concurrent.{ExecutionContext, Future} -trait FundWalletUtil { +trait FundWalletUtil extends BitcoinSLogger { def fundAccountForWallet( amts: Vector[CurrencyUnit], @@ -68,11 +69,7 @@ trait FundWalletUtil { hashes <- bitcoind.getNewAddress.flatMap(bitcoind.generateToAddress(6, _)) } yield (tx, hashes.head) - val fundedWalletF = - txAndHashF.map(txAndHash => - wallet.processTransaction(txAndHash._1, Some(txAndHash._2))) - - fundedWalletF.flatMap(_.map(_.asInstanceOf[Wallet])) + txAndHashF.map(_ => wallet) } /** Funds a bitcoin-s wallet with 3 utxos with 1, 2 and 3 bitcoin in the utxos */ diff --git a/wallet-test/src/test/scala/org/bitcoins/wallet/sync/WalletSyncTest.scala b/wallet-test/src/test/scala/org/bitcoins/wallet/sync/WalletSyncTest.scala new file mode 100644 index 0000000000..0b1468fdda --- /dev/null +++ b/wallet-test/src/test/scala/org/bitcoins/wallet/sync/WalletSyncTest.scala @@ -0,0 +1,45 @@ +package org.bitcoins.wallet.sync + +import org.bitcoins.testkit.chain.SyncUtil +import org.bitcoins.testkit.wallet.{BitcoinSWalletTest, WalletWithBitcoindV19} +import org.scalatest.FutureOutcome + +class WalletSyncTest extends BitcoinSWalletTest { + + behavior of "WalletSync" + + override type FixtureParam = WalletWithBitcoindV19 + + override def withFixture(test: OneArgAsyncTest): FutureOutcome = + withNewWalletAndBitcoindV19(test, getBIP39PasswordOpt()) + + it must "sync a wallet with bitcoind" in { param => + val wallet = param.wallet + val bitcoind = param.bitcoind + //first we need to implement the 'getBestBlockHashFunc' and 'getBlockHeaderFunc' functions + val getBestBlockHashFunc = SyncUtil.getBestBlockHashFunc(bitcoind) + + val getBlockHeaderFunc = SyncUtil.getBlockHeaderFunc(bitcoind) + + val getBlockFunc = SyncUtil.getBlockFunc(bitcoind) + val syncedWalletF = WalletSync.sync(wallet, + getBlockHeaderFunc, + getBestBlockHashFunc, + getBlockFunc) + + val bitcoindBestHeaderF = bitcoind.getBestBlockHeader() + for { + syncedWallet <- syncedWalletF + descriptorOpt <- syncedWallet.getSyncDescriptorOpt() + bitcoindBestHeader <- bitcoindBestHeaderF + } yield { + descriptorOpt match { + case Some(descriptor) => + assert(descriptor.bestHash == bitcoindBestHeader.hashBE) + assert(descriptor.height == bitcoindBestHeader.height) + case None => + fail(s"Could not sync wallet with bitcoind, got no descriptor!") + } + } + } +} diff --git a/wallet/src/main/scala/org/bitcoins/wallet/internal/TransactionProcessing.scala b/wallet/src/main/scala/org/bitcoins/wallet/internal/TransactionProcessing.scala index 6086eb1162..9e3898407a 100644 --- a/wallet/src/main/scala/org/bitcoins/wallet/internal/TransactionProcessing.scala +++ b/wallet/src/main/scala/org/bitcoins/wallet/internal/TransactionProcessing.scala @@ -43,8 +43,9 @@ private[wallet] trait TransactionProcessing extends WalletLogger { override def processBlock(block: Block): Future[Wallet] = { logger.info(s"Processing block=${block.blockHeader.hash.flip}") - val resF = - block.transactions.foldLeft(Future.successful(this)) { + + val resF = for { + newWallet <- block.transactions.foldLeft(Future.successful(this)) { (acc, transaction) => for { _ <- acc @@ -54,6 +55,8 @@ private[wallet] trait TransactionProcessing extends WalletLogger { newWallet } } + } yield newWallet + val f = for { res <- resF diff --git a/wallet/src/main/scala/org/bitcoins/wallet/sync/WalletSync.scala b/wallet/src/main/scala/org/bitcoins/wallet/sync/WalletSync.scala new file mode 100644 index 0000000000..38e6e1610d --- /dev/null +++ b/wallet/src/main/scala/org/bitcoins/wallet/sync/WalletSync.scala @@ -0,0 +1,88 @@ +package org.bitcoins.wallet.sync + +import org.bitcoins.core.protocol.blockchain.{Block, BlockHeader} +import org.bitcoins.core.util.{BitcoinSLogger, FutureUtil} +import org.bitcoins.crypto.DoubleSha256DigestBE +import org.bitcoins.wallet.Wallet + +import scala.concurrent.{ExecutionContext, Future} + +trait WalletSync extends BitcoinSLogger { + + def sync( + wallet: Wallet, + getBlockHeaderFunc: DoubleSha256DigestBE => Future[BlockHeader], + getBestBlockHashFunc: () => Future[DoubleSha256DigestBE], + getBlock: DoubleSha256DigestBE => Future[Block])(implicit + ec: ExecutionContext): Future[Wallet] = { + val bestBlockHashF = getBestBlockHashFunc() + val bestBlockHeaderF = for { + bestBlockHash <- bestBlockHashF + bestheader <- getBlockHeaderFunc(bestBlockHash) + } yield bestheader + + val blocksToSyncF = for { + bestHeader <- bestBlockHeaderF + blocksToSync <- getBlocksToSync(wallet = wallet, + currentTipBlockHashBE = bestHeader.hashBE, + accum = Vector.empty, + getBlock = getBlock) + } yield blocksToSync + + val syncedWalletF = for { + blocksToSync <- blocksToSyncF + syncedWallet <- FutureUtil.foldLeftAsync(wallet, blocksToSync) { + case (wallet, nextBlock) => + wallet.processBlock(nextBlock) + } + } yield syncedWallet + + syncedWalletF + } + + /** Syncs the wallet by walking backwards from the currentTip until we reach our wallet's best blockHash */ + private def getBlocksToSync( + wallet: Wallet, + currentTipBlockHashBE: DoubleSha256DigestBE, + accum: Vector[Block], + getBlock: DoubleSha256DigestBE => Future[Block])(implicit + ec: ExecutionContext): Future[Vector[Block]] = { + val initSyncDescriptorOptF = wallet.getSyncDescriptorOpt() + val genesisBlockHashBE = wallet.walletConfig.chain.genesisHashBE + for { + syncDescriptorOpt <- initSyncDescriptorOptF + walletBestHash = syncDescriptorOpt match { + case Some(descriptor) => descriptor.bestHash + case None => wallet.chainParams.genesisHashBE + } + currentBlockOpt <- { + if ( + walletBestHash == currentTipBlockHashBE || currentTipBlockHashBE == genesisBlockHashBE + ) { + Future.successful(None) // done syncing! + } else { + getBlock(currentTipBlockHashBE) + .map(Some(_)) + } + } + blocks <- { + currentBlockOpt match { + case Some(currentBlock) => + //loop again as we need to keep syncing + getBlocksToSync(wallet = wallet, + currentTipBlockHashBE = + currentBlock.blockHeader.previousBlockHashBE, + accum = currentBlock +: accum, + getBlock = getBlock) + case None => + //yay! Done syncing, return all blocks our wallet needs to be synced with + Future.successful(accum) + } + } + } yield { + blocks + } + } +} + +object WalletSync extends WalletSync