2021 01 02 issue 2457 (#2461)

* WIP

* Get neutrino node with wallet 'receive information about received payments' working again

* Fix compile

* Remove initial sync logic from test case

* Remove sync logic in NeutrinoNodeWithWallet test cases

* Improve logging and rename a few things

* WIP2

* WIP3

* Get NeutrinoNodeWithWallet tests working

* Implement WalletSync, which allows you to sync a wallet from a arbitrary data source

* Get all tests passing again

* Use spv.appConfig in DataMessageHandlerTest rather than caching the config

* Modify cleanup to hopefully get CI passing

* Fix postgres tests by cleaning the table during the destroy phase of the test fixture. This is needed because the same postgres database is shared between tests in the same test suite

* Revert logback-test.xml

* Get sqlite/postgres tests passing pt 2

* syncHeight -> syncDescriptorOpt()

* Add case for genesis block hash in WalletSync

* Fix SpvNodeWithWallet test case to actually test spv functionality

* Remove nodeCallbacks parameters, callbacks should be registered on nodeAppConfig
This commit is contained in:
Chris Stewart 2021-01-09 09:33:37 -06:00 committed by GitHub
parent 72ee201c1f
commit 98ace6f14e
20 changed files with 449 additions and 248 deletions

View file

@ -392,7 +392,7 @@ class ChainHandler(
(minHeightOpt, maxHeightOpt) match { (minHeightOpt, maxHeightOpt) match {
case (Some(minHeight), Some(maxHeight)) => case (Some(minHeight), Some(maxHeight)) =>
logger.info( logger.info(
s"Processed filters headers from height=${minHeight.height} to ${maxHeight.height}. Best hash=${maxHeight.blockHashBE.hex}") s"Processed filters headers from height=${minHeight.height} to ${maxHeight.height}. Best filterheader.blockHash=${maxHeight.blockHashBE.hex}")
this this
// Should never have the case where we have (Some, None) or (None, Some) because that means the vec would be both empty and non empty // Should never have the case where we have (Some, None) or (None, Some) because that means the vec would be both empty and non empty
case (_, _) => case (_, _) =>
@ -439,7 +439,7 @@ class ChainHandler(
(minHeightOpt, maxHeightOpt) match { (minHeightOpt, maxHeightOpt) match {
case (Some(minHeight), Some(maxHeight)) => case (Some(minHeight), Some(maxHeight)) =>
logger.info( logger.info(
s"Processed filters from height=${minHeight.height} to ${maxHeight.height}. Best hash=${maxHeight.blockHashBE.hex}") s"Processed filters from height=${minHeight.height} to ${maxHeight.height}. Best filter.blockHash=${maxHeight.blockHashBE.hex}")
this this
// Should never have the case where we have (Some, None) or (None, Some) because that means the vec would be both empty and non empty // Should never have the case where we have (Some, None) or (None, Some) because that means the vec would be both empty and non empty
case (_, _) => case (_, _) =>

View file

@ -79,7 +79,7 @@ trait ChainApi extends ChainQueryApi {
* Generates a filter header range in form of (startHeight, stopHash) by the given stop hash. * Generates a filter header range in form of (startHeight, stopHash) by the given stop hash.
*/ */
def nextFilterHeaderBatchRange( def nextFilterHeaderBatchRange(
stopHash: DoubleSha256DigestBE, prevStopHash: DoubleSha256DigestBE,
batchSize: Int): Future[Option[FilterSyncMarker]] batchSize: Int): Future[Option[FilterSyncMarker]]
/** /**

View file

@ -170,6 +170,17 @@ trait DbManagement extends BitcoinSLogger {
} }
} }
/** Runs flyway clean
*
* WARNING:
* THIS DELETES ALL DATA IN THE DATABASE, YOU PROBABLY DON'T WANT THIS UNLESS YOU ARE USING TESTS
*
* @see https://flywaydb.org/documentation/command/clean
*/
private[bitcoins] def clean(): Unit = {
flyway.clean()
}
private def createDbFileIfDNE(): Unit = { private def createDbFileIfDNE(): Unit = {
//should add a check in here that we are using sqlite //should add a check in here that we are using sqlite
if (!Files.exists(appConfig.dbPath)) { if (!Files.exists(appConfig.dbPath)) {

View file

@ -30,7 +30,6 @@ class NeutrinoNodeTest extends NodeUnitTest {
override def withFixture(test: OneArgAsyncTest): FutureOutcome = override def withFixture(test: OneArgAsyncTest): FutureOutcome =
withNeutrinoNodeFundedWalletBitcoind(test, withNeutrinoNodeFundedWalletBitcoind(test,
callbacks,
getBIP39PasswordOpt(), getBIP39PasswordOpt(),
Some(BitcoindVersion.Experimental)) Some(BitcoindVersion.Experimental))
@ -64,6 +63,8 @@ class NeutrinoNodeTest extends NodeUnitTest {
it must "receive notification that a block occurred on the p2p network" taggedAs UsesExperimentalBitcoind in { it must "receive notification that a block occurred on the p2p network" taggedAs UsesExperimentalBitcoind in {
nodeConnectedWithBitcoind: NeutrinoNodeFundedWalletBitcoind => nodeConnectedWithBitcoind: NeutrinoNodeFundedWalletBitcoind =>
val node = nodeConnectedWithBitcoind.node val node = nodeConnectedWithBitcoind.node
val _ = node.nodeAppConfig.addCallbacks(callbacks)
val bitcoind = nodeConnectedWithBitcoind.bitcoindRpc val bitcoind = nodeConnectedWithBitcoind.bitcoindRpc
val assert1F = for { val assert1F = for {

View file

@ -17,10 +17,9 @@ import org.bitcoins.testkit.node.{
NodeUnitTest NodeUnitTest
} }
import org.bitcoins.testkit.wallet.BitcoinSWalletTest import org.bitcoins.testkit.wallet.BitcoinSWalletTest
import org.bitcoins.wallet.Wallet
import org.scalatest.FutureOutcome import org.scalatest.FutureOutcome
import scala.concurrent.{Future, Promise} import scala.concurrent.Future
class NeutrinoNodeWithWalletTest extends NodeUnitTest { class NeutrinoNodeWithWalletTest extends NodeUnitTest {
@ -39,55 +38,20 @@ class NeutrinoNodeWithWalletTest extends NodeUnitTest {
} else { } else {
withNeutrinoNodeFundedWalletBitcoind( withNeutrinoNodeFundedWalletBitcoind(
test = test, test = test,
nodeCallbacks = nodeCallbacks,
bip39PasswordOpt = getBIP39PasswordOpt(), bip39PasswordOpt = getBIP39PasswordOpt(),
versionOpt = Some(BitcoindVersion.Experimental) versionOpt = Some(BitcoindVersion.Experimental)
) )(system, config)
} }
} }
private var walletP: Promise[Wallet] = Promise()
private var walletF: Future[Wallet] = walletP.future
after {
//reset assertion after a test runs, because we
//are doing mutation to work around our callback
//limitations, we can't currently modify callbacks
//after a NeutrinoNode is constructed :-(
walletP = Promise()
walletF = walletP.future
}
val TestAmount = 1.bitcoin val TestAmount = 1.bitcoin
val FeeRate = SatoshisPerByte(10.sats) val FeeRate = SatoshisPerByte(10.sats)
val TestFees: Satoshis = 2230.sats val TestFees: Satoshis = 2230.sats
def nodeCallbacks: NodeCallbacks = {
val onBlock: OnBlockReceived = { block =>
for {
wallet <- walletF
_ <- wallet.processBlock(block)
} yield ()
}
val onCompactFilters: OnCompactFiltersReceived = { blockFilters =>
for {
wallet <- walletF
_ <- wallet.processCompactFilters(blockFilters)
} yield ()
}
NodeCallbacks(
onBlockReceived = Vector(onBlock),
onCompactFiltersReceived = Vector(onCompactFilters)
)
}
it must "receive information about received payments" taggedAs UsesExperimentalBitcoind in { it must "receive information about received payments" taggedAs UsesExperimentalBitcoind in {
param => param =>
val NeutrinoNodeFundedWalletBitcoind(node, wallet, bitcoind, _) = param val NeutrinoNodeFundedWalletBitcoind(node, wallet, bitcoind, _) = param
walletP.success(wallet)
def condition( def condition(
expectedConfirmedAmount: CurrencyUnit, expectedConfirmedAmount: CurrencyUnit,
expectedUnconfirmedAmount: CurrencyUnit, expectedUnconfirmedAmount: CurrencyUnit,
@ -107,43 +71,50 @@ class NeutrinoNodeWithWalletTest extends NodeUnitTest {
} }
} }
//default wallet utxos are 3BTC, 2BTC, 1BTC
//our coin selection algorithm seems to be selecting
//the 3BTC utxo to spend, so we should have
//confirmed = 2BTC + 1BTC
//unconfirmed = 3 BTC - TestAmount - TestFees
val condition1 = () => { val condition1 = () => {
condition( condition(
expectedConfirmedAmount = 0.sats, expectedConfirmedAmount = 3.bitcoin,
expectedUnconfirmedAmount = expectedUnconfirmedAmount =
BitcoinSWalletTest.expectedDefaultAmt - TestAmount - TestFees, 3.bitcoin - TestAmount - TestFees,
expectedUtxos = 3, expectedUtxos = 3,
expectedAddresses = 7 expectedAddresses = 7
) )
} }
//this is just sending TestAmount back to us
//so everything should stay the same as above
//expected we should have received TestAmount back
//and have 1 more address/utxo
val condition2 = { () => val condition2 = { () =>
condition( condition(
expectedConfirmedAmount = 0.sats, expectedConfirmedAmount = 3.bitcoin,
expectedUnconfirmedAmount = expectedUnconfirmedAmount =
BitcoinSWalletTest.expectedDefaultAmt - TestFees, (3.bitcoin - TestAmount - TestFees) + TestAmount,
expectedUtxos = 4, expectedUtxos = 4,
expectedAddresses = 8 expectedAddresses = 8
) )
} }
for { for {
_ <- node.sync()
_ <- NodeTestUtil.awaitSync(node, bitcoind)
_ <- NodeTestUtil.awaitCompactFilterHeadersSync(node, bitcoind)
_ <- NodeTestUtil.awaitCompactFiltersSync(node, bitcoind)
// send // send
addr <- bitcoind.getNewAddress addr <- bitcoind.getNewAddress
_ <- wallet.sendToAddress(addr, TestAmount, Some(FeeRate)) _ <- wallet.sendToAddress(addr, TestAmount, Some(FeeRate))
_ <- wallet.getConfirmedBalance()
_ <- wallet.getUnconfirmedBalance()
_ <- wallet.getBalance()
_ <- _ <-
bitcoind.getNewAddress bitcoind.getNewAddress
.flatMap(bitcoind.generateToAddress(1, _)) .flatMap(bitcoind.generateToAddress(1, _))
_ <- wallet.getConfirmedBalance()
_ <- NodeTestUtil.awaitSync(node, bitcoind) _ <- NodeTestUtil.awaitSync(node, bitcoind)
_ <- NodeTestUtil.awaitCompactFiltersSync(node, bitcoind) _ <- NodeTestUtil.awaitCompactFiltersSync(node, bitcoind)
_ <- AsyncUtil.awaitConditionF(condition1) _ <- AsyncUtil.awaitConditionF(condition1)
// receive // receive
address <- wallet.getNewAddress() address <- wallet.getNewAddress()
txId <- bitcoind.sendToAddress(address, TestAmount) txId <- bitcoind.sendToAddress(address, TestAmount)
@ -153,10 +124,9 @@ class NeutrinoNodeWithWalletTest extends NodeUnitTest {
bitcoind.getNewAddress bitcoind.getNewAddress
.flatMap(bitcoind.generateToAddress(1, _)) .flatMap(bitcoind.generateToAddress(1, _))
_ <- NodeTestUtil.awaitSync(node, bitcoind) _ <- NodeTestUtil.awaitSync(node, bitcoind)
_ <- NodeTestUtil.awaitCompactFilterHeadersSync(node, bitcoind)
_ <- NodeTestUtil.awaitCompactFiltersSync(node, bitcoind) _ <- NodeTestUtil.awaitCompactFiltersSync(node, bitcoind)
_ <- AsyncUtil.awaitConditionF(condition2) _ <- AsyncUtil.awaitConditionF(condition2)
// assert we got the full tx with witness data // assert we got the full tx with witness data
txs <- wallet.listTransactions() txs <- wallet.listTransactions()
} yield assert(txs.exists(_.transaction == expectedTx)) } yield assert(txs.exists(_.transaction == expectedTx))
@ -166,8 +136,6 @@ class NeutrinoNodeWithWalletTest extends NodeUnitTest {
param => param =>
val NeutrinoNodeFundedWalletBitcoind(node, wallet, bitcoind, _) = param val NeutrinoNodeFundedWalletBitcoind(node, wallet, bitcoind, _) = param
walletP.success(wallet)
def generateBlock() = def generateBlock() =
for { for {
_ <- _ <-
@ -184,10 +152,6 @@ class NeutrinoNodeWithWalletTest extends NodeUnitTest {
val output = TransactionOutput(sats, spk) val output = TransactionOutput(sats, spk)
for { for {
_ <- node.sync()
_ <- NodeTestUtil.awaitSync(node, bitcoind)
_ <- NodeTestUtil.awaitCompactFiltersSync(node, bitcoind)
// start watching // start watching
_ <- wallet.watchScriptPubKey(spk) _ <- wallet.watchScriptPubKey(spk)
@ -209,8 +173,6 @@ class NeutrinoNodeWithWalletTest extends NodeUnitTest {
param => param =>
val NeutrinoNodeFundedWalletBitcoind(node, wallet, bitcoind, _) = param val NeutrinoNodeFundedWalletBitcoind(node, wallet, bitcoind, _) = param
walletP.success(wallet)
def condition(): Future[Boolean] = { def condition(): Future[Boolean] = {
for { for {
balance <- wallet.getBalance() balance <- wallet.getBalance()
@ -231,10 +193,6 @@ class NeutrinoNodeWithWalletTest extends NodeUnitTest {
_ = assert(addresses.size == 6) _ = assert(addresses.size == 6)
_ = assert(utxos.size == 3) _ = assert(utxos.size == 3)
_ <- node.sync()
_ <- NodeTestUtil.awaitSync(node, bitcoind)
_ <- NodeTestUtil.awaitCompactFiltersSync(node, bitcoind)
address <- wallet.getNewAddress() address <- wallet.getNewAddress()
_ <- _ <-
bitcoind bitcoind

View file

@ -1,9 +1,8 @@
package org.bitcoins.node package org.bitcoins.node
import akka.actor.Cancellable
import org.bitcoins.core.api.wallet.WalletApi
import org.bitcoins.core.currency._ import org.bitcoins.core.currency._
import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE} import org.bitcoins.crypto.DoubleSha256DigestBE
import org.bitcoins.rpc.util.AsyncUtil
import org.bitcoins.server.BitcoinSAppConfig import org.bitcoins.server.BitcoinSAppConfig
import org.bitcoins.testkit.BitcoinSTestAppConfig import org.bitcoins.testkit.BitcoinSTestAppConfig
import org.bitcoins.testkit.node.{ import org.bitcoins.testkit.node.{
@ -11,11 +10,10 @@ import org.bitcoins.testkit.node.{
NodeUnitTest, NodeUnitTest,
SpvNodeFundedWalletBitcoind SpvNodeFundedWalletBitcoind
} }
import org.bitcoins.wallet.Wallet
import org.scalatest.FutureOutcome import org.scalatest.FutureOutcome
import org.scalatest.exceptions.TestFailedException
import scala.concurrent.duration._ import scala.concurrent.Future
import scala.concurrent.{Future, Promise}
class SpvNodeWithWalletTest extends NodeUnitTest { class SpvNodeWithWalletTest extends NodeUnitTest {
@ -26,89 +24,47 @@ class SpvNodeWithWalletTest extends NodeUnitTest {
override type FixtureParam = SpvNodeFundedWalletBitcoind override type FixtureParam = SpvNodeFundedWalletBitcoind
def withFixture(test: OneArgAsyncTest): FutureOutcome = { def withFixture(test: OneArgAsyncTest): FutureOutcome = {
withSpvNodeFundedWalletBitcoind(test, callbacks, getBIP39PasswordOpt()) withSpvNodeFundedWalletBitcoind(test, getBIP39PasswordOpt())
} }
private val assertionP: Promise[Boolean] = Promise()
private val expectedTxIdP: Promise[DoubleSha256Digest] = Promise()
private val expectedTxIdF: Future[DoubleSha256Digest] = expectedTxIdP.future
private val walletP: Promise[WalletApi] = Promise()
private val walletF: Future[WalletApi] = walletP.future
val amountFromBitcoind = 1.bitcoin val amountFromBitcoind = 1.bitcoin
def callbacks: NodeCallbacks = {
val onTx: OnTxReceived = { tx =>
for {
expectedTxId <- expectedTxIdF
wallet <- walletF
} yield {
if (expectedTxId == tx.txId) {
for {
prevBalance <- wallet.getUnconfirmedBalance()
_ <- wallet.processTransaction(tx, None)
balance <- wallet.getUnconfirmedBalance()
} yield {
val result = balance == prevBalance + amountFromBitcoind
assertionP.success(result)
}
}
()
}
}
NodeCallbacks(
onTxReceived = Vector(onTx)
)
}
it must "load a bloom filter and receive information about received payments" in { it must "load a bloom filter and receive information about received payments" in {
param => param =>
val SpvNodeFundedWalletBitcoind(spv, wallet, rpc, _) = param val SpvNodeFundedWalletBitcoind(spv, wallet, rpc, _) = param
walletP.success(wallet)
var cancellable: Option[Cancellable] = None
def processWalletTx(tx: DoubleSha256DigestBE): DoubleSha256DigestBE = {
expectedTxIdP.success(tx.flip)
// how long we're waiting for a tx notify before failing the test
val delay = 25.seconds
val failTest: Runnable = new Runnable {
override def run = {
if (!assertionP.isCompleted) {
val msg =
s"Did not receive sent transaction within $delay"
logger.error(msg)
assertionP.failure(new TestFailedException(msg, 0))
}
}
}
logger.debug(s"Setting timeout for receiving TX through node")
cancellable = Some(system.scheduler.scheduleOnce(delay, failTest))
tx
}
for { for {
_ <- wallet.getBloomFilter() _ <- wallet.getBloomFilter()
address <- wallet.getNewAddress() address <- wallet.getNewAddress()
updatedBloom <- spv.updateBloomFilter(address).map(_.bloomFilter) updatedBloom <- spv.updateBloomFilter(address).map(_.bloomFilter)
_ <- spv.sync()
_ <- NodeTestUtil.awaitSync(spv, rpc)
ourTxid <- ourTxid <-
rpc rpc
.sendToAddress(address, amountFromBitcoind) .sendToAddress(address, amountFromBitcoind)
.map(processWalletTx) _ <- rpc.generateToAddress(1, junkAddress)
_ <- spv.sync()
_ <- NodeTestUtil.awaitSync(spv, rpc)
ourTx <- rpc.getTransaction(ourTxid) ourTx <- rpc.getTransaction(ourTxid)
_ = assert(updatedBloom.isRelevant(ourTx.hex)) _ = assert(updatedBloom.isRelevant(ourTx.hex))
//wait for bitcoind to propagate us a merkle block
result <- assertionP.future //and transactions associated with it
} yield assert(result) //eventually we should have the tx
//added to our wallet when this occurs
_ <- AsyncUtil.retryUntilSatisfiedF(() =>
walletContainsTx(wallet, ourTx.txid))
} yield {
succeed
}
}
private def walletContainsTx(
wallet: Wallet,
txid: DoubleSha256DigestBE): Future[Boolean] = {
val txOptF = wallet.findTransaction(txid)
for {
txOpt <- txOptF
} yield txOpt.isDefined
} }
} }

View file

@ -3,7 +3,11 @@ package org.bitcoins.node
import org.bitcoins.core.currency._ import org.bitcoins.core.currency._
import org.bitcoins.server.BitcoinSAppConfig import org.bitcoins.server.BitcoinSAppConfig
import org.bitcoins.testkit.BitcoinSTestAppConfig import org.bitcoins.testkit.BitcoinSTestAppConfig
import org.bitcoins.testkit.node.{NodeUnitTest, SpvNodeFundedWalletBitcoind} import org.bitcoins.testkit.node.{
NodeTestUtil,
NodeUnitTest,
SpvNodeFundedWalletBitcoind
}
import org.scalatest.{BeforeAndAfter, FutureOutcome} import org.scalatest.{BeforeAndAfter, FutureOutcome}
class UpdateBloomFilterTest extends NodeUnitTest with BeforeAndAfter { class UpdateBloomFilterTest extends NodeUnitTest with BeforeAndAfter {
@ -15,7 +19,7 @@ class UpdateBloomFilterTest extends NodeUnitTest with BeforeAndAfter {
override type FixtureParam = SpvNodeFundedWalletBitcoind override type FixtureParam = SpvNodeFundedWalletBitcoind
def withFixture(test: OneArgAsyncTest): FutureOutcome = { def withFixture(test: OneArgAsyncTest): FutureOutcome = {
withSpvNodeFundedWalletBitcoind(test, NodeCallbacks.empty, None) withSpvNodeFundedWalletBitcoind(test, None)
} }
it must "update the bloom filter with a TX" in { param => it must "update the bloom filter with a TX" in { param =>
@ -31,7 +35,7 @@ class UpdateBloomFilterTest extends NodeUnitTest with BeforeAndAfter {
// this should confirm our TX // this should confirm our TX
// since we updated the bloom filter // since we updated the bloom filter
hash <- rpc.generateToAddress(1, junkAddress).map(_.head) hash <- rpc.generateToAddress(1, junkAddress).map(_.head)
_ <- NodeTestUtil.awaitSync(spv, rpc)
merkleBlock <- rpc.getTxOutProof(Vector(tx.txIdBE), hash) merkleBlock <- rpc.getTxOutProof(Vector(tx.txIdBE), hash)
txs <- rpc.verifyTxOutProof(merkleBlock) txs <- rpc.verifyTxOutProof(merkleBlock)

View file

@ -52,10 +52,13 @@ class DataMessageHandlerTest extends NodeUnitTest {
payload1 = MerkleBlockMessage(merkleBlock) payload1 = MerkleBlockMessage(merkleBlock)
payload2 = TransactionMessage(tx) payload2 = TransactionMessage(tx)
callbacks = NodeCallbacks.onMerkleBlockReceived(callback) nodeCallbacks = NodeCallbacks(onMerkleBlockReceived = Vector(callback))
_ = nodeConfig.addCallbacks(callbacks) _ = spv.nodeAppConfig.addCallbacks(nodeCallbacks)
dataMessageHandler = DataMessageHandler(genesisChainApi) dataMessageHandler =
DataMessageHandler(genesisChainApi)(spv.executionContext,
spv.nodeAppConfig,
spv.chainConfig)
_ <- dataMessageHandler.handleDataPayload(payload1, sender) _ <- dataMessageHandler.handleDataPayload(payload1, sender)
_ <- dataMessageHandler.handleDataPayload(payload2, sender) _ <- dataMessageHandler.handleDataPayload(payload2, sender)
result <- resultP.future result <- resultP.future
@ -82,10 +85,13 @@ class DataMessageHandlerTest extends NodeUnitTest {
payload = BlockMessage(block) payload = BlockMessage(block)
callbacks = NodeCallbacks.onBlockReceived(callback) nodeCallbacks = NodeCallbacks.onBlockReceived(callback)
_ = nodeConfig.addCallbacks(callbacks) _ = spv.nodeAppConfig.addCallbacks(nodeCallbacks)
dataMessageHandler = DataMessageHandler(genesisChainApi) dataMessageHandler =
DataMessageHandler(genesisChainApi)(spv.executionContext,
spv.nodeAppConfig,
spv.chainConfig)
_ <- dataMessageHandler.handleDataPayload(payload, sender) _ <- dataMessageHandler.handleDataPayload(payload, sender)
result <- resultP.future result <- resultP.future
} yield assert(result == block) } yield assert(result == block)
@ -113,9 +119,13 @@ class DataMessageHandlerTest extends NodeUnitTest {
payload = HeadersMessage(CompactSizeUInt.one, Vector(header)) payload = HeadersMessage(CompactSizeUInt.one, Vector(header))
callbacks = NodeCallbacks.onBlockHeadersReceived(callback) callbacks = NodeCallbacks.onBlockHeadersReceived(callback)
_ = nodeConfig.addCallbacks(callbacks)
dataMessageHandler = DataMessageHandler(genesisChainApi) _ = spv.nodeAppConfig.addCallbacks(callbacks)
dataMessageHandler =
DataMessageHandler(genesisChainApi)(spv.executionContext,
spv.nodeAppConfig,
spv.chainConfig)
_ <- dataMessageHandler.handleDataPayload(payload, sender) _ <- dataMessageHandler.handleDataPayload(payload, sender)
result <- resultP.future result <- resultP.future
} yield assert(result == Vector(header)) } yield assert(result == Vector(header))
@ -143,10 +153,13 @@ class DataMessageHandlerTest extends NodeUnitTest {
payload = payload =
CompactFilterMessage(FilterType.Basic, hash.flip, filter.filter.bytes) CompactFilterMessage(FilterType.Basic, hash.flip, filter.filter.bytes)
callbacks = NodeCallbacks.onCompactFilterReceived(callback) nodeCallbacks = NodeCallbacks.onCompactFilterReceived(callback)
_ = nodeConfig.addCallbacks(callbacks) _ = spv.nodeAppConfig.addCallbacks(nodeCallbacks)
dataMessageHandler =
DataMessageHandler(genesisChainApi)(spv.executionContext,
spv.nodeAppConfig,
spv.chainConfig)
dataMessageHandler = DataMessageHandler(genesisChainApi)
_ <- dataMessageHandler.handleDataPayload(payload, sender) _ <- dataMessageHandler.handleDataPayload(payload, sender)
result <- resultP.future result <- resultP.future
} yield assert(result == Vector((hash.flip, filter.filter))) } yield assert(result == Vector((hash.flip, filter.filter)))

View file

@ -1,5 +1,7 @@
package org.bitcoins.node.networking.peer package org.bitcoins.node.networking.peer
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.server.BitcoinSAppConfig import org.bitcoins.server.BitcoinSAppConfig
import org.bitcoins.testkit.BitcoinSTestAppConfig import org.bitcoins.testkit.BitcoinSTestAppConfig
import org.bitcoins.testkit.async.TestAsyncUtil import org.bitcoins.testkit.async.TestAsyncUtil
@ -23,6 +25,13 @@ class PeerMessageHandlerTest extends NodeUnitTest {
test(()) test(())
} }
private val cachedConfig = config
implicit private lazy val nodeAppConfig: NodeAppConfig = cachedConfig.nodeConf
implicit protected lazy val chainConfig: ChainAppConfig =
cachedConfig.chainConf
behavior of "PeerHandler" behavior of "PeerHandler"
it must "be able to fully initialize a PeerMessageReceiver" in { _ => it must "be able to fully initialize a PeerMessageReceiver" in { _ =>

View file

@ -30,7 +30,7 @@ case class NeutrinoNode(
override val peer: Peer = nodePeer override val peer: Peer = nodePeer
override def start(): Future[Node] = { override def start(): Future[NeutrinoNode] = {
val res = for { val res = for {
node <- super.start() node <- super.start()
chainApi <- chainApiFromDb() chainApi <- chainApiFromDb()
@ -39,7 +39,7 @@ case class NeutrinoNode(
_ <- peerMsgSender.sendGetCompactFilterCheckPointMessage( _ <- peerMsgSender.sendGetCompactFilterCheckPointMessage(
stopHash = bestHash.flip) stopHash = bestHash.flip)
} yield { } yield {
node node.asInstanceOf[NeutrinoNode]
} }
res.failed.foreach(logger.error("Cannot start Neutrino node", _)) res.failed.foreach(logger.error("Cannot start Neutrino node", _))

View file

@ -156,10 +156,9 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
def stop(): Future[Node] = { def stop(): Future[Node] = {
logger.info(s"Stopping node") logger.info(s"Stopping node")
val disconnectF = for { val disconnectF = for {
_ <- nodeAppConfig.stop()
_ <- chainAppConfig.stop()
p <- peerMsgSenderF p <- peerMsgSenderF
disconnect <- p.disconnect() disconnect <- p.disconnect()
_ <- nodeAppConfig.stop()
} yield disconnect } yield disconnect
val start = System.currentTimeMillis() val start = System.currentTimeMillis()

View file

@ -10,6 +10,7 @@ import org.bitcoins.core.protocol.{BitcoinAddress, BlockStamp}
import org.bitcoins.core.util.Mutable import org.bitcoins.core.util.Mutable
import org.bitcoins.node.config.NodeAppConfig import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.Peer import org.bitcoins.node.models.Peer
import org.bitcoins.rpc.util.AsyncUtil
import scala.concurrent.{Future, Promise} import scala.concurrent.{Future, Promise}
@ -73,15 +74,15 @@ case class SpvNode(
sentFilterAddF.map(_ => this) sentFilterAddF.map(_ => this)
} }
override def start(): Future[Node] = { override def start(): Future[SpvNode] = {
for { for {
node <- super.start() node <- super.start()
peerMsgSender <- peerMsgSenderF peerMsgSender <- peerMsgSenderF
_ <- AsyncUtil.retryUntilSatisfiedF(() => isConnected)
_ <- peerMsgSender.sendFilterLoadMessage(bloomFilter) _ <- peerMsgSender.sendFilterLoadMessage(bloomFilter)
} yield { } yield {
logger.info(s"Sending bloomfilter=${bloomFilter.hex} to $peer") logger.info(s"Sending bloomfilter=${bloomFilter.hex} to $peer")
logger.info(s"Sending bloomfilter=${bloomFilter.hex} to $peer") node.asInstanceOf[SpvNode]
node
} }
} }

View file

@ -5,13 +5,14 @@ import org.bitcoins.commons.jsonmodels.bitcoind.GetBlockFilterResult
import org.bitcoins.core.api.node import org.bitcoins.core.api.node
import org.bitcoins.core.api.node.{NodeApi, NodeChainQueryApi} import org.bitcoins.core.api.node.{NodeApi, NodeChainQueryApi}
import org.bitcoins.core.gcs.FilterType import org.bitcoins.core.gcs.FilterType
import org.bitcoins.core.protocol.blockchain.BlockHeader import org.bitcoins.core.protocol.blockchain.{Block, BlockHeader}
import org.bitcoins.core.protocol.transaction.Transaction import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.core.util.{BitcoinSLogger, FutureUtil} import org.bitcoins.core.util.{BitcoinSLogger, FutureUtil}
import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE} import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE}
import org.bitcoins.rpc.client.common.BitcoindRpcClient import org.bitcoins.rpc.client.common.BitcoindRpcClient
import org.bitcoins.rpc.client.v19.BitcoindV19RpcClient import org.bitcoins.rpc.client.v19.BitcoindV19RpcClient
import org.bitcoins.wallet.Wallet import org.bitcoins.wallet.Wallet
import org.bitcoins.wallet.sync.WalletSync
import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.{ExecutionContext, Future}
@ -45,6 +46,11 @@ abstract class SyncUtil extends BitcoinSLogger {
} }
} }
def getBlockFunc(
bitcoind: BitcoindRpcClient): DoubleSha256DigestBE => Future[Block] = {
bitcoind.getBlockRaw(_: DoubleSha256DigestBE)
}
def getNodeApi(bitcoindRpcClient: BitcoindRpcClient)(implicit def getNodeApi(bitcoindRpcClient: BitcoindRpcClient)(implicit
ec: ExecutionContext): NodeApi = { ec: ExecutionContext): NodeApi = {
new NodeApi { new NodeApi {
@ -161,6 +167,16 @@ abstract class SyncUtil extends BitcoinSLogger {
SyncUtil.getNodeApiWalletCallback(bitcoind, walletF) SyncUtil.getNodeApiWalletCallback(bitcoind, walletF)
node.NodeChainQueryApi(nodeApi, chainQuery) node.NodeChainQueryApi(nodeApi, chainQuery)
} }
def syncWallet(wallet: Wallet, bitcoind: BitcoindRpcClient)(implicit
ec: ExecutionContext): Future[Wallet] = {
WalletSync.sync(
wallet = wallet,
getBlockHeaderFunc = SyncUtil.getBlockHeaderFunc(bitcoind),
getBestBlockHashFunc = SyncUtil.getBestBlockHashFunc(bitcoind),
getBlock = SyncUtil.getBlockFunc(bitcoind)
)
}
} }
object SyncUtil extends SyncUtil object SyncUtil extends SyncUtil

View file

@ -12,6 +12,8 @@ case class NodeDAOs(txDAO: BroadcastAbleTransactionDAO)
/** Provides a fixture where all DAOs used by the node projects are provided */ /** Provides a fixture where all DAOs used by the node projects are provided */
trait NodeDAOFixture extends NodeUnitTest { trait NodeDAOFixture extends NodeUnitTest {
implicit protected lazy val nodeConfig: NodeAppConfig = config.nodeConf
private lazy val daos = { private lazy val daos = {
val tx = BroadcastAbleTransactionDAO() val tx = BroadcastAbleTransactionDAO()
NodeDAOs(tx) NodeDAOs(tx)

View file

@ -48,7 +48,7 @@ import org.bitcoins.wallet.WalletCallbacks
import org.scalatest.FutureOutcome import org.scalatest.FutureOutcome
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future} import scala.concurrent.{ExecutionContext, Future}
trait NodeUnitTest extends BitcoinSFixture with EmbeddedPg { trait NodeUnitTest extends BitcoinSFixture with EmbeddedPg {
@ -58,19 +58,12 @@ trait NodeUnitTest extends BitcoinSFixture with EmbeddedPg {
} }
override def afterAll(): Unit = { override def afterAll(): Unit = {
Await.result(config.chainConf.stop(), 1.minute)
Await.result(config.nodeConf.stop(), 1.minute)
Await.result(config.walletConf.stop(), 1.minute)
super[EmbeddedPg].afterAll() super[EmbeddedPg].afterAll()
} }
/** Wallet config with data directory set to user temp directory */ /** Wallet config with data directory set to user temp directory */
implicit protected def config: BitcoinSAppConfig implicit protected def config: BitcoinSAppConfig
implicit protected lazy val chainConfig: ChainAppConfig = config.chainConf
implicit protected lazy val nodeConfig: NodeAppConfig = config.nodeConf
implicit override lazy val np: NetworkParameters = config.nodeConf.network implicit override lazy val np: NetworkParameters = config.nodeConf.network
lazy val startedBitcoindF = BitcoindRpcTestUtil.startedBitcoindRpcClient() lazy val startedBitcoindF = BitcoindRpcTestUtil.startedBitcoindRpcClient()
@ -180,11 +173,10 @@ trait NodeUnitTest extends BitcoinSFixture with EmbeddedPg {
val nodeBuilder: () => Future[SpvNode] = { () => val nodeBuilder: () => Future[SpvNode] = { () =>
require(appConfig.nodeType == NodeType.SpvNode) require(appConfig.nodeType == NodeType.SpvNode)
for { for {
node <- NodeUnitTest.createSpvNode( node <- NodeUnitTest.createSpvNode(emptyPeer)(system,
emptyPeer, appConfig.chainConf,
NodeCallbacks.empty, appConfig.nodeConf)
start = false)(system, appConfig.chainConf, appConfig.nodeConf)
_ <- appConfig.start()
} yield node } yield node
} }
@ -192,7 +184,8 @@ trait NodeUnitTest extends BitcoinSFixture with EmbeddedPg {
build = nodeBuilder, build = nodeBuilder,
destroy = (_: Node) => { destroy = (_: Node) => {
for { for {
_ <- ChainUnitTest.destroyAllTables() _ <- ChainUnitTest.destroyAllTables()(appConfig.chainConf,
system.dispatcher)
_ <- appConfig.stop() _ <- appConfig.stop()
} yield () } yield ()
} }
@ -209,11 +202,12 @@ trait NodeUnitTest extends BitcoinSFixture with EmbeddedPg {
require(appConfig.nodeType == NodeType.SpvNode) require(appConfig.nodeType == NodeType.SpvNode)
for { for {
bitcoind <- BitcoinSFixture.createBitcoind(versionOpt) bitcoind <- BitcoinSFixture.createBitcoind(versionOpt)
node <- NodeUnitTest.createSpvNode(createPeer(bitcoind), node <- NodeUnitTest.createSpvNode(createPeer(bitcoind))(
NodeCallbacks.empty)(
system, system,
appConfig.chainConf, appConfig.chainConf,
appConfig.nodeConf) appConfig.nodeConf)
started <- node.start()
_ <- NodeUnitTest.syncSpvNode(started, bitcoind)
} yield SpvNodeConnectedWithBitcoind(node, bitcoind) } yield SpvNodeConnectedWithBitcoind(node, bitcoind)
} }
@ -235,9 +229,12 @@ trait NodeUnitTest extends BitcoinSFixture with EmbeddedPg {
BitcoinSFixture BitcoinSFixture
.createBitcoindWithFunds(Some(V19)) .createBitcoindWithFunds(Some(V19))
.map(_.asInstanceOf[BitcoindV19RpcClient]) .map(_.asInstanceOf[BitcoindV19RpcClient])
node <- NodeUnitTest.createSpvNode( node <- NodeUnitTest.createSpvNode(createPeer(bitcoind))(
createPeer(bitcoind), system,
NodeCallbacks.empty)(system, appConfig.chainConf, appConfig.nodeConf) appConfig.chainConf,
appConfig.nodeConf)
started <- node.start()
_ <- NodeUnitTest.syncSpvNode(started, bitcoind)
} yield SpvNodeConnectedWithBitcoindV19(node, bitcoind) } yield SpvNodeConnectedWithBitcoindV19(node, bitcoind)
} }
@ -258,8 +255,7 @@ trait NodeUnitTest extends BitcoinSFixture with EmbeddedPg {
require(appConfig.nodeType == NodeType.NeutrinoNode) require(appConfig.nodeType == NodeType.NeutrinoNode)
for { for {
bitcoind <- BitcoinSFixture.createBitcoind(versionOpt) bitcoind <- BitcoinSFixture.createBitcoind(versionOpt)
node <- NodeUnitTest.createNeutrinoNode(bitcoind, NodeCallbacks.empty)( node <- NodeUnitTest.createNeutrinoNode(bitcoind)(system,
system,
appConfig.chainConf, appConfig.chainConf,
appConfig.nodeConf) appConfig.nodeConf)
} yield NeutrinoNodeConnectedWithBitcoind(node, bitcoind) } yield NeutrinoNodeConnectedWithBitcoind(node, bitcoind)
@ -273,16 +269,13 @@ trait NodeUnitTest extends BitcoinSFixture with EmbeddedPg {
def withSpvNodeFundedWalletBitcoind( def withSpvNodeFundedWalletBitcoind(
test: OneArgAsyncTest, test: OneArgAsyncTest,
callbacks: NodeCallbacks,
bip39PasswordOpt: Option[String])(implicit bip39PasswordOpt: Option[String])(implicit
system: ActorSystem, system: ActorSystem,
appConfig: BitcoinSAppConfig): FutureOutcome = { appConfig: BitcoinSAppConfig): FutureOutcome = {
makeDependentFixture( makeDependentFixture(
build = () => build = () =>
NodeUnitTest.createSpvNodeFundedWalletBitcoind(nodeCallbacks = NodeUnitTest.createSpvNodeFundedWalletBitcoind(bip39PasswordOpt =
callbacks,
bip39PasswordOpt =
bip39PasswordOpt, bip39PasswordOpt,
versionOpt = Option(V18), versionOpt = Option(V18),
walletCallbacks = walletCallbacks =
@ -296,21 +289,18 @@ trait NodeUnitTest extends BitcoinSFixture with EmbeddedPg {
def withNeutrinoNodeFundedWalletBitcoind( def withNeutrinoNodeFundedWalletBitcoind(
test: OneArgAsyncTest, test: OneArgAsyncTest,
nodeCallbacks: NodeCallbacks,
bip39PasswordOpt: Option[String], bip39PasswordOpt: Option[String],
versionOpt: Option[BitcoindVersion] = None, versionOpt: Option[BitcoindVersion] = None,
walletCallbacks: WalletCallbacks = WalletCallbacks.empty)(implicit walletCallbacks: WalletCallbacks = WalletCallbacks.empty)(implicit
system: ActorSystem, system: ActorSystem,
appConfig: BitcoinSAppConfig): FutureOutcome = { appConfig: BitcoinSAppConfig): FutureOutcome = {
makeDependentFixture( makeDependentFixture(
build = () => build = () =>
NodeUnitTest NodeUnitTest
.createNeutrinoNodeFundedWalletBitcoind( .createNeutrinoNodeFundedWalletBitcoind(
nodeCallbacks, bip39PasswordOpt = bip39PasswordOpt,
bip39PasswordOpt, versionOpt = versionOpt,
versionOpt, walletCallbacks = walletCallbacks)(system, appConfig),
walletCallbacks)(system, appConfig),
destroy = NodeUnitTest.destroyNodeFundedWalletBitcoind( destroy = NodeUnitTest.destroyNodeFundedWalletBitcoind(
_: NodeFundedWalletBitcoind)(system, appConfig) _: NodeFundedWalletBitcoind)(system, appConfig)
)(test) )(test)
@ -376,11 +366,8 @@ object NodeUnitTest extends P2PLogger {
} }
def destroyNode(node: Node)(implicit def destroyNode(node: Node)(implicit ec: ExecutionContext): Future[Unit] = {
config: BitcoinSAppConfig,
ec: ExecutionContext): Future[Unit] = {
for { for {
_ <- ChainUnitTest.destroyAllTables()
_ <- node.stop() _ <- node.stop()
} yield () } yield ()
} }
@ -396,6 +383,8 @@ object NodeUnitTest extends P2PLogger {
val resultF = for { val resultF = for {
_ <- destroyNode(node) _ <- destroyNode(node)
_ <- ChainUnitTest.destroyBitcoind(bitcoind) _ <- ChainUnitTest.destroyBitcoind(bitcoind)
_ = cleanTables(appConfig)
_ <- appConfig.stop()
} yield { } yield {
logger.debug(s"Done with teardown of node connected with bitcoind!") logger.debug(s"Done with teardown of node connected with bitcoind!")
() ()
@ -406,7 +395,6 @@ object NodeUnitTest extends P2PLogger {
/** Creates a spv node, a funded bitcoin-s wallet, all of which are connected to bitcoind */ /** Creates a spv node, a funded bitcoin-s wallet, all of which are connected to bitcoind */
def createSpvNodeFundedWalletBitcoind( def createSpvNodeFundedWalletBitcoind(
nodeCallbacks: NodeCallbacks,
walletCallbacks: WalletCallbacks, walletCallbacks: WalletCallbacks,
bip39PasswordOpt: Option[String], bip39PasswordOpt: Option[String],
versionOpt: Option[BitcoindVersion] = None)(implicit versionOpt: Option[BitcoindVersion] = None)(implicit
@ -416,15 +404,25 @@ object NodeUnitTest extends P2PLogger {
require(appConfig.nodeType == NodeType.SpvNode) require(appConfig.nodeType == NodeType.SpvNode)
for { for {
bitcoind <- BitcoinSFixture.createBitcoindWithFunds(versionOpt) bitcoind <- BitcoinSFixture.createBitcoindWithFunds(versionOpt)
node <- createSpvNode(createPeer(bitcoind), nodeCallbacks) node <- createSpvNode(createPeer(bitcoind))
fundedWallet <- BitcoinSWalletTest.fundedWalletAndBitcoind( fundedWallet <- BitcoinSWalletTest.fundedWalletAndBitcoind(
bitcoind, bitcoind,
node, node,
node, node,
bip39PasswordOpt, bip39PasswordOpt,
walletCallbacks) walletCallbacks)
spvCallbacks =
BitcoinSWalletTest.createSpvNodeCallbacksForWallet(fundedWallet.wallet)
_ = appConfig.nodeConf.addCallbacks(spvCallbacks)
walletBloomFilter <- fundedWallet.wallet.getBloomFilter()
withBloomFilter = node.setBloomFilter(walletBloomFilter)
startedNodeWithBloomFilter <- withBloomFilter.start()
_ <- syncSpvNode(startedNodeWithBloomFilter, bitcoind)
//callbacks are executed asynchronously, which is how we fund the wallet
//so we need to wait until the wallet balances are correct
_ <- BitcoinSWalletTest.awaitWalletBalances(fundedWallet)
} yield { } yield {
SpvNodeFundedWalletBitcoind(node = node, SpvNodeFundedWalletBitcoind(node = startedNodeWithBloomFilter,
wallet = fundedWallet.wallet, wallet = fundedWallet.wallet,
bitcoindRpc = fundedWallet.bitcoind, bitcoindRpc = fundedWallet.bitcoind,
bip39PasswordOpt) bip39PasswordOpt)
@ -433,7 +431,6 @@ object NodeUnitTest extends P2PLogger {
/** Creates a neutrino node, a funded bitcoin-s wallet, all of which are connected to bitcoind */ /** Creates a neutrino node, a funded bitcoin-s wallet, all of which are connected to bitcoind */
def createNeutrinoNodeFundedWalletBitcoind( def createNeutrinoNodeFundedWalletBitcoind(
nodeCallbacks: NodeCallbacks,
bip39PasswordOpt: Option[String], bip39PasswordOpt: Option[String],
versionOpt: Option[BitcoindVersion], versionOpt: Option[BitcoindVersion],
walletCallbacks: WalletCallbacks)(implicit walletCallbacks: WalletCallbacks)(implicit
@ -444,15 +441,20 @@ object NodeUnitTest extends P2PLogger {
require(appConfig.nodeType == NodeType.NeutrinoNode) require(appConfig.nodeType == NodeType.NeutrinoNode)
for { for {
bitcoind <- BitcoinSFixture.createBitcoindWithFunds(versionOpt) bitcoind <- BitcoinSFixture.createBitcoindWithFunds(versionOpt)
node <- createNeutrinoNode(bitcoind, nodeCallbacks) node <- createNeutrinoNode(bitcoind)
fundedWallet <- BitcoinSWalletTest.fundedWalletAndBitcoind( fundedWallet <- BitcoinSWalletTest.fundedWalletAndBitcoind(
bitcoindRpcClient = bitcoind, bitcoindRpcClient = bitcoind,
nodeApi = node, nodeApi = node,
chainQueryApi = node, chainQueryApi = node,
bip39PasswordOpt = bip39PasswordOpt, bip39PasswordOpt = bip39PasswordOpt,
walletCallbacks = walletCallbacks) walletCallbacks = walletCallbacks)
startedNode <- node.start()
syncedNode <- syncNeutrinoNode(startedNode, bitcoind)
//callbacks are executed asynchronously, which is how we fund the wallet
//so we need to wait until the wallet balances are correct
_ <- BitcoinSWalletTest.awaitWalletBalances(fundedWallet)
} yield { } yield {
NeutrinoNodeFundedWalletBitcoind(node = node, NeutrinoNodeFundedWalletBitcoind(node = syncedNode,
wallet = fundedWallet.wallet, wallet = fundedWallet.wallet,
bitcoindRpc = fundedWallet.bitcoind, bitcoindRpc = fundedWallet.bitcoind,
bip39PasswordOpt = bip39PasswordOpt) bip39PasswordOpt = bip39PasswordOpt)
@ -474,11 +476,11 @@ object NodeUnitTest extends P2PLogger {
val destroyedF = for { val destroyedF = for {
_ <- destroyNode(fundedWalletBitcoind.node) _ <- destroyNode(fundedWalletBitcoind.node)
_ <- BitcoinSWalletTest.destroyWalletWithBitcoind(walletWithBitcoind) _ <- BitcoinSWalletTest.destroyWalletWithBitcoind(walletWithBitcoind)
_ <- appConfig.walletConf.stop() _ = cleanTables(appConfig)
_ <- appConfig.stop()
} yield () } yield ()
destroyedF destroyedF
} }
def buildPeerMessageReceiver(chainApi: ChainApi, peer: Peer)(implicit def buildPeerMessageReceiver(chainApi: ChainApi, peer: Peer)(implicit
@ -508,25 +510,22 @@ object NodeUnitTest extends P2PLogger {
Peer(id = None, socket = socket) Peer(id = None, socket = socket)
} }
/** Creates a spv node peered with the given bitcoind client, this method /** Creates a spv node peered with the given bitcoind client
* also calls [[org.bitcoins.node.Node.start() start]] to start the node * This does NOT start the spv node
*/ */
def createSpvNode( def createSpvNode(peer: Peer)(implicit
peer: Peer,
callbacks: NodeCallbacks,
start: Boolean = true)(implicit
system: ActorSystem, system: ActorSystem,
chainAppConfig: ChainAppConfig, chainAppConfig: ChainAppConfig,
nodeAppConfig: NodeAppConfig): Future[SpvNode] = { nodeAppConfig: NodeAppConfig): Future[SpvNode] = {
import system.dispatcher import system.dispatcher
nodeAppConfig.addCallbacks(callbacks)
val checkConfigF = Future { val checkConfigF = Future {
assert(nodeAppConfig.nodeType == NodeType.SpvNode) assert(nodeAppConfig.nodeType == NodeType.SpvNode)
} }
val chainApiF = for { val chainApiF = for {
_ <- checkConfigF _ <- checkConfigF
_ = chainAppConfig.migrate()
_ = nodeAppConfig.start()
chainHandler <- ChainUnitTest.createChainHandler() chainHandler <- ChainUnitTest.createChainHandler()
} yield chainHandler } yield chainHandler
val nodeF = for { val nodeF = for {
@ -541,24 +540,19 @@ object NodeUnitTest extends P2PLogger {
).setBloomFilter(NodeTestUtil.emptyBloomFilter) ).setBloomFilter(NodeTestUtil.emptyBloomFilter)
} }
if (start) nodeF
nodeF.flatMap(_.start()).flatMap(_ => nodeF)
else nodeF
} }
/** Creates a Neutrino node peered with the given bitcoind client, this method /** Creates a Neutrino node peered with the given bitcoind client, this method
* also calls [[org.bitcoins.node.Node.start() start]] to start the node * also calls [[org.bitcoins.node.Node.start() start]] to start the node
*/ */
def createNeutrinoNode(bitcoind: BitcoindRpcClient, callbacks: NodeCallbacks)( def createNeutrinoNode(bitcoind: BitcoindRpcClient)(implicit
implicit
system: ActorSystem, system: ActorSystem,
chainAppConfig: ChainAppConfig, chainAppConfig: ChainAppConfig,
nodeAppConfig: NodeAppConfig): Future[NeutrinoNode] = { nodeAppConfig: NodeAppConfig): Future[NeutrinoNode] = {
import system.dispatcher import system.dispatcher
nodeAppConfig.addCallbacks(callbacks)
val checkConfigF = Future { val checkConfigF = Future {
assert(nodeAppConfig.nodeType == NodeType.NeutrinoNode) assert(nodeAppConfig.nodeType == NodeType.NeutrinoNode)
} }
@ -577,7 +571,39 @@ object NodeUnitTest extends P2PLogger {
initialSyncDone = None) initialSyncDone = None)
} }
nodeF.flatMap(_.start()).flatMap(_ => nodeF) nodeF
} }
def syncNeutrinoNode(node: NeutrinoNode, bitcoind: BitcoindRpcClient)(implicit
system: ActorSystem): Future[NeutrinoNode] = {
import system.dispatcher
for {
_ <- node.sync()
_ <- NodeTestUtil.awaitSync(node, bitcoind)
_ <- NodeTestUtil.awaitCompactFilterHeadersSync(node, bitcoind)
_ <- NodeTestUtil.awaitCompactFiltersSync(node, bitcoind)
} yield node
}
def syncSpvNode(node: SpvNode, bitcoind: BitcoindRpcClient)(implicit
system: ActorSystem): Future[SpvNode] = {
import system.dispatcher
for {
_ <- node.sync()
_ <- NodeTestUtil.awaitSync(node, bitcoind)
} yield node
}
/**
* This is needed for postgres, we do not drop tables in between individual tests with postgres
* rather an entire test suite shares the same postgres database.
* therefore, we need to clean the database after each test, so that migrations can be applied during
* the setup phase for the next test.
* @param appConfig
*/
private def cleanTables(appConfig: BitcoinSAppConfig): Unit = {
appConfig.nodeConf.clean()
//appConfig.walletConf.clean()
appConfig.chainConf.clean()
}
} }

View file

@ -15,8 +15,15 @@ import org.bitcoins.core.wallet.fee._
import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE} import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE}
import org.bitcoins.db.AppConfig import org.bitcoins.db.AppConfig
import org.bitcoins.keymanager.bip39.BIP39KeyManager import org.bitcoins.keymanager.bip39.BIP39KeyManager
import org.bitcoins.node.{
NodeCallbacks,
OnBlockReceived,
OnCompactFiltersReceived,
OnMerkleBlockReceived
}
import org.bitcoins.rpc.client.common.{BitcoindRpcClient, BitcoindVersion} import org.bitcoins.rpc.client.common.{BitcoindRpcClient, BitcoindVersion}
import org.bitcoins.rpc.client.v19.BitcoindV19RpcClient import org.bitcoins.rpc.client.v19.BitcoindV19RpcClient
import org.bitcoins.rpc.util.AsyncUtil
import org.bitcoins.server.BitcoinSAppConfig import org.bitcoins.server.BitcoinSAppConfig
import org.bitcoins.server.BitcoinSAppConfig._ import org.bitcoins.server.BitcoinSAppConfig._
import org.bitcoins.testkit.Implicits.GeneratorOps import org.bitcoins.testkit.Implicits.GeneratorOps
@ -263,10 +270,13 @@ trait BitcoinSWalletTest extends BitcoinSFixture with EmbeddedPg {
bitcoind <- bitcoind <-
BitcoinSFixture BitcoinSFixture
.createBitcoindWithFunds(None) .createBitcoindWithFunds(None)
wallet <- createWalletWithBitcoindCallbacks(bitcoind = bitcoind, walletWithBitcoind <- createWalletWithBitcoindCallbacks(
bip39PasswordOpt = bitcoind = bitcoind,
bip39PasswordOpt) bip39PasswordOpt = bip39PasswordOpt)
fundedWallet <- fundWalletWithBitcoind(wallet) fundedWallet <- fundWalletWithBitcoind(walletWithBitcoind)
_ <-
SyncUtil.syncWallet(wallet = fundedWallet.wallet, bitcoind = bitcoind)
_ <- BitcoinSWalletTest.awaitWalletBalances(fundedWallet)
} yield fundedWallet } yield fundedWallet
} }
@ -284,6 +294,9 @@ trait BitcoinSWalletTest extends BitcoinSFixture with EmbeddedPg {
.map(_.asInstanceOf[BitcoindV19RpcClient]) .map(_.asInstanceOf[BitcoindV19RpcClient])
wallet <- createWalletWithBitcoindCallbacks(bitcoind, bip39PasswordOpt) wallet <- createWalletWithBitcoindCallbacks(bitcoind, bip39PasswordOpt)
fundedWallet <- fundWalletWithBitcoind(wallet) fundedWallet <- fundWalletWithBitcoind(wallet)
_ <-
SyncUtil.syncWallet(wallet = fundedWallet.wallet, bitcoind = bitcoind)
_ <- BitcoinSWalletTest.awaitWalletBalances(fundedWallet)
} yield { } yield {
WalletWithBitcoindV19(fundedWallet.wallet, bitcoind) WalletWithBitcoindV19(fundedWallet.wallet, bitcoind)
} }
@ -601,6 +614,13 @@ object BitcoinSWalletTest extends WalletLogger {
} yield funded } yield funded
} }
/** Funds a wallet with bitcoind, this method adds [[BitcoinSWalletTest.createNodeCallbacksForWallet()]]
* which processes filters/blocks that can be used to fund the wallet.
*
* It's important to note that this does NOT synchronize the wallet with a chain state.
* This should be done by the caller of this method. A useful method to help you with that
* in neutrino node cases is [[BitcoinSWalletTest.awaitWalletBalances]]
*/
def fundedWalletAndBitcoind( def fundedWalletAndBitcoind(
bitcoindRpcClient: BitcoindRpcClient, bitcoindRpcClient: BitcoindRpcClient,
nodeApi: NodeApi, nodeApi: NodeApi,
@ -616,6 +636,10 @@ object BitcoinSWalletTest extends WalletLogger {
nodeApi = nodeApi, nodeApi = nodeApi,
chainQueryApi = chainQueryApi, chainQueryApi = chainQueryApi,
bip39PasswordOpt = bip39PasswordOpt) bip39PasswordOpt = bip39PasswordOpt)
//add callbacks for wallet
nodeCallbacks =
BitcoinSWalletTest.createNeutrinoNodeCallbacksForWallet(wallet)
_ = config.nodeConf.addCallbacks(nodeCallbacks)
withBitcoind <- createWalletWithBitcoind(wallet, bitcoindRpcClient) withBitcoind <- createWalletWithBitcoind(wallet, bitcoindRpcClient)
funded <- fundWalletWithBitcoind(withBitcoind) funded <- fundWalletWithBitcoind(withBitcoind)
} yield funded } yield funded
@ -646,22 +670,7 @@ object BitcoinSWalletTest extends WalletLogger {
) )
} yield fundedAcct1 } yield fundedAcct1
//sanity check to make sure we have money fundedAccount1WalletF.map(_ => pair)
for {
fundedWallet <- fundedAccount1WalletF
balance <- fundedWallet.getBalance(defaultAccount)
_ = require(
balance == expectedDefaultAmt,
s"Funding wallet fixture failed to fund the wallet, got balance=$balance expected=$expectedDefaultAmt")
account1Balance <- fundedWallet.getBalance(hdAccount1)
_ = require(
account1Balance == expectedAccount1Amt,
s"Funding wallet fixture failed to fund account 1, " +
s"got balance=$hdAccount1 expected=$expectedAccount1Amt"
)
} yield pair
} }
def destroyWalletWithBitcoind(walletWithBitcoind: WalletWithBitcoind)(implicit def destroyWalletWithBitcoind(walletWithBitcoind: WalletWithBitcoind)(implicit
@ -686,4 +695,67 @@ object BitcoinSWalletTest extends WalletLogger {
} yield () } yield ()
} }
/** Constructs callbacks for the wallet from the node to process blocks and compact filters */
def createNeutrinoNodeCallbacksForWallet(wallet: Wallet)(implicit
ec: ExecutionContext): NodeCallbacks = {
val onBlock: OnBlockReceived = { block =>
for {
_ <- wallet.processBlock(block)
} yield ()
}
val onCompactFilters: OnCompactFiltersReceived = { blockFilters =>
for {
_ <- wallet.processCompactFilters(blockFilters)
} yield ()
}
NodeCallbacks(
onBlockReceived = Vector(onBlock),
onCompactFiltersReceived = Vector(onCompactFilters)
)
}
/** Registers a callback to handle merkle blocks given to us by a spv node */
def createSpvNodeCallbacksForWallet(wallet: Wallet)(implicit
ec: ExecutionContext): NodeCallbacks = {
val onMerkleBlockReceived: OnMerkleBlockReceived = {
case (merkleBlock, txs) =>
for {
_ <- wallet.processTransactions(txs,
Some(merkleBlock.blockHeader.hashBE))
} yield ()
}
NodeCallbacks(onMerkleBlockReceived = Vector(onMerkleBlockReceived))
}
/** Makes sure our wallet is fully funded with the default amounts specified in
* [[BitcoinSWalletTest]]. This will future won't be completed until balances satisfy [[isSameWalletBalances()]]
*/
def awaitWalletBalances(fundedWallet: WalletWithBitcoind)(implicit
config: BitcoinSAppConfig,
system: ActorSystem): Future[Unit] = {
AsyncUtil.retryUntilSatisfiedF(conditionF =
() => isSameWalletBalances(fundedWallet),
interval = 1.seconds)
}
private def isSameWalletBalances(fundedWallet: WalletWithBitcoind)(implicit
config: BitcoinSAppConfig,
system: ActorSystem): Future[Boolean] = {
import system.dispatcher
val defaultAccount = config.walletConf.defaultAccount
val hdAccount1 = WalletTestUtil.getHdAccount1(config.walletConf)
val expectedDefaultAmt = BitcoinSWalletTest.expectedDefaultAmt
val expectedAccount1Amt = BitcoinSWalletTest.expectedAccount1Amt
val defaultBalanceF = fundedWallet.wallet.getBalance(defaultAccount)
val account1BalanceF = fundedWallet.wallet.getBalance(hdAccount1)
for {
balance <- defaultBalanceF
account1Balance <- account1BalanceF
} yield {
balance == expectedDefaultAmt &&
account1Balance == expectedAccount1Amt
}
}
} }

View file

@ -8,6 +8,7 @@ import org.bitcoins.core.currency.CurrencyUnit
import org.bitcoins.core.hd.HDAccount import org.bitcoins.core.hd.HDAccount
import org.bitcoins.core.protocol.BitcoinAddress import org.bitcoins.core.protocol.BitcoinAddress
import org.bitcoins.core.protocol.transaction.TransactionOutput import org.bitcoins.core.protocol.transaction.TransactionOutput
import org.bitcoins.core.util.BitcoinSLogger
import org.bitcoins.crypto.DoubleSha256DigestBE import org.bitcoins.crypto.DoubleSha256DigestBE
import org.bitcoins.rpc.client.common.BitcoindRpcClient import org.bitcoins.rpc.client.common.BitcoindRpcClient
import org.bitcoins.server.BitcoinSAppConfig import org.bitcoins.server.BitcoinSAppConfig
@ -17,7 +18,7 @@ import org.bitcoins.wallet.Wallet
import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.{ExecutionContext, Future}
trait FundWalletUtil { trait FundWalletUtil extends BitcoinSLogger {
def fundAccountForWallet( def fundAccountForWallet(
amts: Vector[CurrencyUnit], amts: Vector[CurrencyUnit],
@ -68,11 +69,7 @@ trait FundWalletUtil {
hashes <- bitcoind.getNewAddress.flatMap(bitcoind.generateToAddress(6, _)) hashes <- bitcoind.getNewAddress.flatMap(bitcoind.generateToAddress(6, _))
} yield (tx, hashes.head) } yield (tx, hashes.head)
val fundedWalletF = txAndHashF.map(_ => wallet)
txAndHashF.map(txAndHash =>
wallet.processTransaction(txAndHash._1, Some(txAndHash._2)))
fundedWalletF.flatMap(_.map(_.asInstanceOf[Wallet]))
} }
/** Funds a bitcoin-s wallet with 3 utxos with 1, 2 and 3 bitcoin in the utxos */ /** Funds a bitcoin-s wallet with 3 utxos with 1, 2 and 3 bitcoin in the utxos */

View file

@ -0,0 +1,45 @@
package org.bitcoins.wallet.sync
import org.bitcoins.testkit.chain.SyncUtil
import org.bitcoins.testkit.wallet.{BitcoinSWalletTest, WalletWithBitcoindV19}
import org.scalatest.FutureOutcome
class WalletSyncTest extends BitcoinSWalletTest {
behavior of "WalletSync"
override type FixtureParam = WalletWithBitcoindV19
override def withFixture(test: OneArgAsyncTest): FutureOutcome =
withNewWalletAndBitcoindV19(test, getBIP39PasswordOpt())
it must "sync a wallet with bitcoind" in { param =>
val wallet = param.wallet
val bitcoind = param.bitcoind
//first we need to implement the 'getBestBlockHashFunc' and 'getBlockHeaderFunc' functions
val getBestBlockHashFunc = SyncUtil.getBestBlockHashFunc(bitcoind)
val getBlockHeaderFunc = SyncUtil.getBlockHeaderFunc(bitcoind)
val getBlockFunc = SyncUtil.getBlockFunc(bitcoind)
val syncedWalletF = WalletSync.sync(wallet,
getBlockHeaderFunc,
getBestBlockHashFunc,
getBlockFunc)
val bitcoindBestHeaderF = bitcoind.getBestBlockHeader()
for {
syncedWallet <- syncedWalletF
descriptorOpt <- syncedWallet.getSyncDescriptorOpt()
bitcoindBestHeader <- bitcoindBestHeaderF
} yield {
descriptorOpt match {
case Some(descriptor) =>
assert(descriptor.bestHash == bitcoindBestHeader.hashBE)
assert(descriptor.height == bitcoindBestHeader.height)
case None =>
fail(s"Could not sync wallet with bitcoind, got no descriptor!")
}
}
}
}

View file

@ -43,8 +43,9 @@ private[wallet] trait TransactionProcessing extends WalletLogger {
override def processBlock(block: Block): Future[Wallet] = { override def processBlock(block: Block): Future[Wallet] = {
logger.info(s"Processing block=${block.blockHeader.hash.flip}") logger.info(s"Processing block=${block.blockHeader.hash.flip}")
val resF =
block.transactions.foldLeft(Future.successful(this)) { val resF = for {
newWallet <- block.transactions.foldLeft(Future.successful(this)) {
(acc, transaction) => (acc, transaction) =>
for { for {
_ <- acc _ <- acc
@ -54,6 +55,8 @@ private[wallet] trait TransactionProcessing extends WalletLogger {
newWallet newWallet
} }
} }
} yield newWallet
val f = for { val f = for {
res <- resF res <- resF

View file

@ -0,0 +1,88 @@
package org.bitcoins.wallet.sync
import org.bitcoins.core.protocol.blockchain.{Block, BlockHeader}
import org.bitcoins.core.util.{BitcoinSLogger, FutureUtil}
import org.bitcoins.crypto.DoubleSha256DigestBE
import org.bitcoins.wallet.Wallet
import scala.concurrent.{ExecutionContext, Future}
trait WalletSync extends BitcoinSLogger {
def sync(
wallet: Wallet,
getBlockHeaderFunc: DoubleSha256DigestBE => Future[BlockHeader],
getBestBlockHashFunc: () => Future[DoubleSha256DigestBE],
getBlock: DoubleSha256DigestBE => Future[Block])(implicit
ec: ExecutionContext): Future[Wallet] = {
val bestBlockHashF = getBestBlockHashFunc()
val bestBlockHeaderF = for {
bestBlockHash <- bestBlockHashF
bestheader <- getBlockHeaderFunc(bestBlockHash)
} yield bestheader
val blocksToSyncF = for {
bestHeader <- bestBlockHeaderF
blocksToSync <- getBlocksToSync(wallet = wallet,
currentTipBlockHashBE = bestHeader.hashBE,
accum = Vector.empty,
getBlock = getBlock)
} yield blocksToSync
val syncedWalletF = for {
blocksToSync <- blocksToSyncF
syncedWallet <- FutureUtil.foldLeftAsync(wallet, blocksToSync) {
case (wallet, nextBlock) =>
wallet.processBlock(nextBlock)
}
} yield syncedWallet
syncedWalletF
}
/** Syncs the wallet by walking backwards from the currentTip until we reach our wallet's best blockHash */
private def getBlocksToSync(
wallet: Wallet,
currentTipBlockHashBE: DoubleSha256DigestBE,
accum: Vector[Block],
getBlock: DoubleSha256DigestBE => Future[Block])(implicit
ec: ExecutionContext): Future[Vector[Block]] = {
val initSyncDescriptorOptF = wallet.getSyncDescriptorOpt()
val genesisBlockHashBE = wallet.walletConfig.chain.genesisHashBE
for {
syncDescriptorOpt <- initSyncDescriptorOptF
walletBestHash = syncDescriptorOpt match {
case Some(descriptor) => descriptor.bestHash
case None => wallet.chainParams.genesisHashBE
}
currentBlockOpt <- {
if (
walletBestHash == currentTipBlockHashBE || currentTipBlockHashBE == genesisBlockHashBE
) {
Future.successful(None) // done syncing!
} else {
getBlock(currentTipBlockHashBE)
.map(Some(_))
}
}
blocks <- {
currentBlockOpt match {
case Some(currentBlock) =>
//loop again as we need to keep syncing
getBlocksToSync(wallet = wallet,
currentTipBlockHashBE =
currentBlock.blockHeader.previousBlockHashBE,
accum = currentBlock +: accum,
getBlock = getBlock)
case None =>
//yay! Done syncing, return all blocks our wallet needs to be synced with
Future.successful(accum)
}
}
} yield {
blocks
}
}
}
object WalletSync extends WalletSync