Rescan and fetch blocks (#835)

* Rescan and fetch blocks

* unit test

* fix log level

* addressed the PR comments

* fix test timeout

* improve unit tests
Authored by rorp on 2019-10-30 07:51:03 -07:00, committed by Chris Stewart
parent f169a6b1e9
commit abaa0581c0
10 changed files with 202 additions and 65 deletions
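
The headline change adds a rescan entry point to NeutrinoNode that matches a set of scripts against the stored compact filters and fetches the matching blocks from the peer. A rough usage sketch based on the signature introduced below (the node value and watched scripts are placeholders, not part of this commit):

import org.bitcoins.core.protocol.script.ScriptPubKey
import org.bitcoins.node.NeutrinoNode
import scala.concurrent.Future

// Hypothetical usage of the rescan API added in this commit: `node` is an
// already started NeutrinoNode and `watchedScripts` are the scripts the
// wallet cares about. With no BlockStamps given, the whole chain is scanned.
def rescanAll(
    node: NeutrinoNode,
    watchedScripts: Vector[ScriptPubKey]): Future[Unit] = {
  node.rescan(scriptPubKeysToWatch = watchedScripts)
}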

View file

@@ -395,26 +395,33 @@ case class ChainHandler(
vectorSize / parallelismLevel + 1
else vectorSize / parallelismLevel
/** Iterates over the grouped vector of filters to find matches with the given [[bytes]].
*/
def findMatches(filterGroups: Iterator[Vector[CompactFilterDb]]): Future[
def findMatches(filters: Vector[CompactFilterDb]): Future[
Iterator[DoubleSha256DigestBE]] = {
// Sequence on the filter groups making sure the number of threads doesn't exceed [[parallelismLevel]].
Future
.sequence(filterGroups.map { filterGroup =>
// We need to wrap in a future here to make sure we can
// potentially run these matches in parallel
Future {
// Find any matches in the group and add the corresponding block hashes into the result
filterGroup.foldLeft(Vector.empty[DoubleSha256DigestBE]) {
(blocks, filter) =>
if (filter.golombFilter.matchesAny(bytes))
blocks :+ filter.blockHashBE
else blocks
if (filters.isEmpty)
Future.successful(Iterator.empty)
else {
/* Iterates over the grouped vector of filters to find matches with the given [[bytes]]. */
val groupSize = calcGroupSize(filters.size)
val filterGroups = filters.grouped(groupSize)
// Sequence on the filter groups making sure the number of threads doesn't exceed [[parallelismLevel]].
Future
.sequence(filterGroups.map { filterGroup =>
// We need to wrap in a future here to make sure we can
// potentially run these matches in parallel
Future {
// Find any matches in the group and add the corresponding block hashes into the result
filterGroup.foldLeft(Vector.empty[DoubleSha256DigestBE]) {
(blocks, filter) =>
if (filter.golombFilter.matchesAny(bytes)) {
blocks :+ filter.blockHashBE
} else {
blocks
}
}
}
}
})
.map(_.flatten)
})
.map(_.flatten)
}
}
/** Iterates over all filters in the range to find matches */
@@ -429,12 +436,10 @@ case class ChainHandler(
} else {
val startHeight = end - (batchSize - 1)
val endHeight = end
val newAcc: Future[Vector[DoubleSha256DigestBE]] = for {
val newAcc = for {
compactFilterDbs <- filterDAO.getBetweenHeights(startHeight,
endHeight)
groupSize = calcGroupSize(compactFilterDbs.size)
grouped = compactFilterDbs.grouped(groupSize)
filtered <- findMatches(grouped)
filtered <- findMatches(compactFilterDbs)
res <- acc
} yield {
res ++ filtered
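
For reference, the grouping-plus-parallel-fold pattern that findMatches now encapsulates can be sketched in isolation like this (generic types and hypothetical names, not the ChainHandler code itself):

import scala.concurrent.{ExecutionContext, Future}

// Generic sketch of the pattern in ChainHandler.findMatches: split the work
// into roughly `parallelismLevel` groups, fold each group in its own Future,
// then flatten the per-group results into a single vector of matches.
def findMatchesSketch[A, B](items: Vector[A], parallelismLevel: Int)(
    isMatch: A => Boolean,
    extract: A => B)(implicit ec: ExecutionContext): Future[Vector[B]] = {
  require(parallelismLevel > 0, "parallelismLevel must be positive")
  if (items.isEmpty) {
    Future.successful(Vector.empty)
  } else {
    val groupSize =
      if (items.size % parallelismLevel == 0) items.size / parallelismLevel
      else items.size / parallelismLevel + 1
    val groups = items.grouped(groupSize).toVector
    Future
      .sequence(groups.map { group =>
        // Each group is scanned on its own thread of the execution context.
        Future {
          group.collect { case item if isMatch(item) => extract(item) }
        }
      })
      .map(_.flatten)
  }
}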

View file

@@ -1,17 +1,20 @@
package org.bitcoins.node
import org.bitcoins.core.crypto.DoubleSha256DigestBE
import org.bitcoins.core.protocol.blockchain.Block
import org.bitcoins.core.protocol.script.ScriptPubKey
import org.bitcoins.rpc.client.common.BitcoindVersion
import org.bitcoins.rpc.util.RpcUtil
import org.bitcoins.server.BitcoinSAppConfig
import org.bitcoins.testkit.BitcoinSTestAppConfig
import org.bitcoins.testkit.fixtures.UsesExperimentalBitcoind
import org.bitcoins.testkit.node.fixture.NeutrinoNodeConnectedWithBitcoind
import org.bitcoins.testkit.node.NodeUnitTest.NeutrinoNodeFundedWalletBitcoind
import org.bitcoins.testkit.node.{NodeTestUtil, NodeUnitTest}
import org.scalatest.exceptions.TestFailedException
import org.scalatest.{DoNotDiscover, FutureOutcome}
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Future, Promise}
@DoNotDiscover
class NeutrinoNodeTest extends NodeUnitTest {
@@ -20,18 +23,42 @@ class NeutrinoNodeTest extends NodeUnitTest {
implicit override protected def config: BitcoinSAppConfig =
BitcoinSTestAppConfig.getNeutrinoTestConfig()
override type FixtureParam = NeutrinoNodeConnectedWithBitcoind
override type FixtureParam = NeutrinoNodeFundedWalletBitcoind
override def withFixture(test: OneArgAsyncTest): FutureOutcome =
withNeutrinoNodeConnectedToBitcoind(test,
Some(BitcoindVersion.Experimental))
withNeutrinoNodeFundedWalletBitcoind(test,
callbacks,
Some(BitcoindVersion.Experimental))
private val testTimeout = 30.seconds
private var assertionP: Promise[Boolean] = Promise()
after {
//reset assertion after a test runs, because we
//are doing mutation to work around our callback
//limitations, we can't currently modify callbacks
//after a NeutrinoNode is constructed :-(
assertionP = Promise()
}
private var utxos: Set[ScriptPubKey] = _
private def blockCallback(block: Block): Unit = {
if (!assertionP.isCompleted) {
val scriptPubKeys =
block.transactions.flatMap(tx => tx.outputs.map(_.scriptPubKey)).toSet
assertionP.success(utxos.intersect(scriptPubKeys) == utxos)
}
}
def callbacks: SpvNodeCallbacks = {
SpvNodeCallbacks(onBlockReceived = Vector(blockCallback))
}
behavior of "NeutrinoNode"
it must "receive notification that a block occurred on the p2p network" taggedAs (UsesExperimentalBitcoind) in {
nodeConnectedWithBitcoind: NeutrinoNodeConnectedWithBitcoind =>
nodeConnectedWithBitcoind: NeutrinoNodeFundedWalletBitcoind =>
val node = nodeConnectedWithBitcoind.node
val bitcoind = nodeConnectedWithBitcoind.bitcoind
val bitcoind = nodeConnectedWithBitcoind.bitcoindRpc
val assert1F = for {
_ <- node.isConnected.map(assert(_))
@@ -57,9 +84,9 @@ class NeutrinoNodeTest extends NodeUnitTest {
}
it must "stay in sync with a bitcoind instance" taggedAs (UsesExperimentalBitcoind) in {
nodeConnectedWithBitcoind: NeutrinoNodeConnectedWithBitcoind =>
nodeConnectedWithBitcoind: NeutrinoNodeFundedWalletBitcoind =>
val node = nodeConnectedWithBitcoind.node
val bitcoind = nodeConnectedWithBitcoind.bitcoind
val bitcoind = nodeConnectedWithBitcoind.bitcoindRpc
//we need to generate 1 block for bitcoind to consider
//itself out of IBD. bitcoind will not sendheaders
@@ -90,33 +117,63 @@ class NeutrinoNodeTest extends NodeUnitTest {
startGenF.flatMap { _ =>
//we should expect 5 headers have been announced to us via
//the send headers message.
def has6BlocksF =
val ExpectedCount = 113
def hasBlocksF =
RpcUtil.retryUntilSatisfiedF(conditionF = () => {
node
.chainApiFromDb()
.flatMap(_.getBlockCount.map(_ == 6))
.flatMap(_.getBlockCount.map(_ == ExpectedCount))
}, duration = 1000.millis)
def has6FilterHeadersF =
def hasFilterHeadersF =
RpcUtil.retryUntilSatisfiedF(conditionF = () => {
node
.chainApiFromDb()
.flatMap(_.getFilterHeaderCount.map(_ == 6))
.flatMap(_.getFilterHeaderCount.map(_ == ExpectedCount))
}, duration = 1000.millis)
def has6FiltersF =
def hasFiltersF =
RpcUtil.retryUntilSatisfiedF(conditionF = () => {
node
.chainApiFromDb()
.flatMap(_.getFilterCount.map(_ == 6))
.flatMap(_.getFilterCount.map(_ == ExpectedCount))
}, duration = 1000.millis)
for {
_ <- has6BlocksF
_ <- has6FilterHeadersF
_ <- has6FiltersF
_ <- hasBlocksF
_ <- hasFilterHeadersF
_ <- hasFiltersF
} yield succeed
}
}
it must "download a block that matches a compact block filter" taggedAs (UsesExperimentalBitcoind) in {
nodeConnectedWithBitcoind: NeutrinoNodeFundedWalletBitcoind =>
val node = nodeConnectedWithBitcoind.node
val wallet = nodeConnectedWithBitcoind.wallet
val bitcoind = nodeConnectedWithBitcoind.bitcoindRpc
for {
walletUtxos <- wallet.listUtxos()
_ = {
assert(walletUtxos.nonEmpty)
utxos = walletUtxos.map(_.output.scriptPubKey).toSet
}
walletAddress <- wallet.getNewAddress()
_ <- node.sync()
_ <- NodeTestUtil.awaitCompactFiltersSync(node, bitcoind)
_ = system.scheduler.scheduleOnce(testTimeout) {
if (!assertionP.isCompleted)
assertionP.failure(
new TestFailedException(
s"Did not receive a block message after $testTimeout!",
failedCodeStackDepth = 0))
}
_ <- node.rescan(
walletUtxos.map(_.output.scriptPubKey) :+ walletAddress.scriptPubKey)
result <- assertionP.future
} yield assert(result)
}
}
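
The new test works around the limitation noted above (callbacks cannot be changed after the NeutrinoNode is built): a block callback completes a Promise, and a scheduled timeout fails it so the test cannot hang. A stripped-down sketch of that pattern, with a hypothetical register function standing in for the node callback wiring:

import akka.actor.ActorSystem
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Future, Promise}

// Sketch: complete a Promise from an asynchronous callback and fail it after
// a timeout so that whoever awaits the future cannot hang forever.
def awaitFirstCallback(register: (String => Unit) => Unit)(
    implicit system: ActorSystem): Future[String] = {
  import system.dispatcher
  val assertionP = Promise[String]()
  // The callback may fire more than once; only the first completion counts.
  register { payload =>
    assertionP.trySuccess(payload)
    ()
  }
  system.scheduler.scheduleOnce(30.seconds) {
    assertionP.tryFailure(
      new RuntimeException("Did not receive a callback within 30 seconds"))
    ()
  }
  assertionP.future
}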

View file

@@ -1,11 +1,16 @@
package org.bitcoins.node
import java.util.concurrent.Executors
import akka.actor.ActorSystem
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.core.p2p.TypeIdentifier
import org.bitcoins.core.protocol.BlockStamp
import org.bitcoins.core.protocol.script.ScriptPubKey
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.Peer
import scala.concurrent.Future
import scala.concurrent.{ExecutionContext, Future}
case class NeutrinoNode(
nodePeer: Peer,
@@ -28,15 +33,16 @@ case class NeutrinoNode(
override val callbacks: SpvNodeCallbacks = nodeCallbacks
override def onStart(): Future[Unit] = {
override def start(): Future[Node] = {
val res = for {
node <- super.start()
chainApi <- chainApiFromDb()
bestHash <- chainApi.getBestBlockHash
peerMsgSender <- peerMsgSenderF
_ <- peerMsgSender.sendGetCompactFilterCheckPointMessage(
stopHash = bestHash.flip)
} yield {
()
node
}
res.failed.foreach(logger.error("Cannot start Neutrino node", _))
@@ -44,4 +50,31 @@ case class NeutrinoNode(
res
}
def rescan(
scriptPubKeysToWatch: Vector[ScriptPubKey],
startOpt: Option[BlockStamp] = None,
endOpt: Option[BlockStamp] = None): Future[Unit] = {
val threadPool =
Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors() * 2)
val res = for {
chainApi <- chainApiFromDb()
blockHashes <- chainApi.getMatchingBlocks(
scriptPubKeysToWatch,
startOpt,
endOpt)(ExecutionContext.fromExecutor(threadPool))
peerMsgSender <- peerMsgSenderF
_ <- peerMsgSender.sendGetDataMessage(TypeIdentifier.MsgBlock,
blockHashes.map(_.flip): _*)
} yield {
()
}
res.onComplete(_ => threadPool.shutdown())
res.failed.foreach(logger.error("Cannot rescan", _))
res
}
}
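
rescan runs the filter matching on its own fixed thread pool and shuts the pool down once the Future completes. The same resource-handling pattern in isolation (a sketch with hypothetical names, not a bitcoin-s API):

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

// Sketch of the thread-pool handling used by rescan above: the CPU-heavy
// work gets its own fixed pool, which is shut down as soon as the returned
// Future completes, successfully or not.
def withDedicatedPool[T](work: ExecutionContext => Future[T])(
    implicit ec: ExecutionContext): Future[T] = {
  val threadPool =
    Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors() * 2)
  val res = work(ExecutionContext.fromExecutor(threadPool))
  res.onComplete(_ => threadPool.shutdown())
  res
}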

View file

@@ -52,7 +52,8 @@ trait Node extends P2PLogger {
* This involves database calls which can be slow and expensive to construct
* our [[org.bitcoins.chain.blockchain.Blockchain Blockchain]]
* */
def chainApiFromDb(): Future[ChainApi] = {
def chainApiFromDb()(
implicit executionContext: ExecutionContext): Future[ChainApi] = {
ChainHandler.fromDatabase(BlockHeaderDAO(),
CompactFilterHeaderDAO(),
CompactFilterDAO())
@@ -126,14 +127,11 @@ trait Node extends P2PLogger {
this
}
}
_ <- onStart()
} yield {
node
}
}
def onStart(): Future[Unit]
/** Stops our node */
def stop(): Future[Node] = {
logger.info(s"Stopping node")

View file

@@ -65,14 +65,16 @@ case class SpvNode(
}
}
override def onStart(): Future[Unit] = {
override def start(): Future[Node] = {
for {
node <- super.start()
peerMsgSender <- peerMsgSenderF
_ <- peerMsgSender.sendFilterLoadMessage(bloomFilter)
} yield {
logger(nodeAppConfig).info(
s"Sending bloomfilter=${bloomFilter.hex} to $peer")
logger.info(s"Sending bloomfilter=${bloomFilter.hex} to $peer")
node
}
}

View file

@@ -142,7 +142,8 @@ case class DataMessageHandler(
if (appConfig.isSPVEnabled) {
logger.trace(s"Requesting data for headers=${headers.length}")
peerMsgSender.sendGetDataMessage(headers: _*)
peerMsgSender.sendGetDataMessage(TypeIdentifier.MsgFilteredBlock,
headers.map(_.hash): _*)
}
val getHeadersF = chainApiF
@@ -188,6 +189,8 @@ case class DataMessageHandler(
this.copy(chainApi = newApi, syncing = newSyncing)
}
case msg: BlockMessage =>
logger.info(
s"Received block message with hash ${msg.block.blockHeader.hash.flip}")
Future {
callbacks.onBlockReceived.foreach(_.apply(msg.block))
this

View file

@@ -121,10 +121,11 @@ case class PeerMessageSender(client: P2PClient)(implicit conf: NodeAppConfig)
}
/** Sends a request for filtered blocks matching the given headers */
def sendGetDataMessage(headers: BlockHeader*): Future[Unit] = {
def sendGetDataMessage(
typeIdentifier: TypeIdentifier,
hashes: DoubleSha256Digest*): Future[Unit] = {
val inventories =
headers.map(header =>
Inventory(TypeIdentifier.MsgFilteredBlock, header.hash))
hashes.map(hash => Inventory(typeIdentifier, hash))
val message = GetDataMessage(inventories)
logger.info(s"Sending getdata=$message to peer=${client.peer}")
sendMsg(message)
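
sendGetDataMessage now takes the inventory TypeIdentifier explicitly, so rescan can ask for full blocks (MsgBlock) while the SPV path keeps requesting merkle-filtered blocks (MsgFilteredBlock). A sketch of the rescan-side call, with sendGetData standing in for peerMsgSender.sendGetDataMessage and imports assuming the 2019 package layout:

import org.bitcoins.core.crypto.{DoubleSha256Digest, DoubleSha256DigestBE}
import org.bitcoins.core.p2p.TypeIdentifier
import scala.concurrent.Future

// Sketch of the rescan-side getdata request. `sendGetData` stands in for
// peerMsgSender.sendGetDataMessage(typeIdentifier, hashes: _*).
def requestMatchedBlocks(
    sendGetData: (TypeIdentifier, Seq[DoubleSha256Digest]) => Future[Unit],
    matchedHashes: Vector[DoubleSha256DigestBE]): Future[Unit] = {
  // Rescan wants full blocks; the SPV path passes MsgFilteredBlock and the
  // block header hashes instead.
  sendGetData(TypeIdentifier.MsgBlock, matchedHashes.map(_.flip))
}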

View file

@@ -117,6 +117,24 @@ abstract class NodeTestUtil extends P2PLogger {
}
}
def isSameBestFilterHeight(node: NeutrinoNode, rpc: BitcoindRpcClient)(
implicit ec: ExecutionContext): Future[Boolean] = {
val rpcCountF = rpc.getBlockCount
for {
count <- node.chainApiFromDb().flatMap(_.getFilterCount)
rpcCount <- rpcCountF
} yield rpcCount == count
}
def isSameBestFilterHeaderHeight(node: NeutrinoNode, rpc: BitcoindRpcClient)(
implicit ec: ExecutionContext): Future[Boolean] = {
val rpcCountF = rpc.getBlockCount
for {
count <- node.chainApiFromDb().flatMap(_.getFilterHeaderCount)
rpcCount <- rpcCountF
} yield rpcCount == count
}
/** Checks if the given light client and bitcoind
* has the same number of blocks in their blockchains
*/
@@ -144,7 +162,8 @@ abstract class NodeTestUtil extends P2PLogger {
implicit sys: ActorSystem): Future[Unit] = {
import sys.dispatcher
TestAsyncUtil
.retryUntilSatisfiedF(() => isSameBestHash(node, rpc), 1000.milliseconds)
.retryUntilSatisfiedF(() => isSameBestFilterHeaderHeight(node, rpc),
1000.milliseconds)
}
/** Awaits sync between the given node and bitcoind client */
@@ -152,7 +171,8 @@ abstract class NodeTestUtil extends P2PLogger {
implicit sys: ActorSystem): Future[Unit] = {
import sys.dispatcher
TestAsyncUtil
.retryUntilSatisfiedF(() => isSameBestHash(node, rpc), 1000.milliseconds)
.retryUntilSatisfiedF(() => isSameBestFilterHeight(node, rpc),
1000.milliseconds)
}
/** The future doesn't complete until the nodes best hash is the given hash */
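
The new helpers poll the node's chain state until it matches bitcoind, using the retry-until-satisfied utilities. A simplified, self-contained version of such a polling loop might look like this (hypothetical helper, not the RpcUtil/TestAsyncUtil implementation):

import akka.actor.ActorSystem
import akka.pattern.after
import scala.concurrent.Future
import scala.concurrent.duration.{DurationInt, FiniteDuration}

// Simplified sketch of a retry-until-satisfied loop: re-evaluate the
// condition every `interval` until it holds or `maxTries` runs out.
def retryUntilTrue(
    conditionF: () => Future[Boolean],
    interval: FiniteDuration = 1.second,
    maxTries: Int = 50)(implicit system: ActorSystem): Future[Unit] = {
  import system.dispatcher
  conditionF().flatMap { satisfied =>
    if (satisfied) Future.unit
    else if (maxTries <= 1)
      Future.failed(new RuntimeException("Condition was never satisfied"))
    else
      after(interval, system.scheduler)(
        retryUntilTrue(conditionF, interval, maxTries - 1))
  }
}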

View file

@@ -119,14 +119,17 @@ trait NodeUnitTest extends BitcoinSFixture {
def withNeutrinoNodeFundedWalletBitcoind(
test: OneArgAsyncTest,
callbacks: SpvNodeCallbacks)(
callbacks: SpvNodeCallbacks,
versionOpt: Option[BitcoindVersion] = None)(
implicit system: ActorSystem,
appConfig: BitcoinSAppConfig): FutureOutcome = {
makeDependentFixture(
build = () =>
NodeUnitTest
.createNeutrinoNodeFundedWalletBitcoind(callbacks)(system, appConfig),
.createNeutrinoNodeFundedWalletBitcoind(callbacks, versionOpt)(
system,
appConfig),
destroy = NodeUnitTest.destroyNodeFundedWalletBitcoind(
_: NodeFundedWalletBitcoind)(system, appConfig)
)(test)
@@ -238,12 +241,14 @@ object NodeUnitTest extends P2PLogger {
}
/** Creates a spv node, a funded bitcoin-s wallet, all of which are connected to bitcoind */
def createSpvNodeFundedWalletBitcoind(callbacks: SpvNodeCallbacks)(
def createSpvNodeFundedWalletBitcoind(
callbacks: SpvNodeCallbacks,
versionOpt: Option[BitcoindVersion] = None)(
implicit system: ActorSystem,
appConfig: BitcoinSAppConfig): Future[SpvNodeFundedWalletBitcoind] = {
import system.dispatcher
require(appConfig.isSPVEnabled && !appConfig.isNeutrinoEnabled)
val fundedWalletF = BitcoinSWalletTest.fundedWalletAndBitcoind()
val fundedWalletF = BitcoinSWalletTest.fundedWalletAndBitcoind(versionOpt)
for {
fundedWallet <- fundedWalletF
node <- createSpvNode(fundedWallet.bitcoind, callbacks)
@@ -255,12 +260,14 @@ object NodeUnitTest extends P2PLogger {
}
/** Creates a neutrino node, a funded bitcoin-s wallet, all of which are connected to bitcoind */
def createNeutrinoNodeFundedWalletBitcoind(callbacks: SpvNodeCallbacks)(
def createNeutrinoNodeFundedWalletBitcoind(
callbacks: SpvNodeCallbacks,
versionOpt: Option[BitcoindVersion])(
implicit system: ActorSystem,
appConfig: BitcoinSAppConfig): Future[NeutrinoNodeFundedWalletBitcoind] = {
import system.dispatcher
require(appConfig.isNeutrinoEnabled && !appConfig.isSPVEnabled)
val fundedWalletF = BitcoinSWalletTest.fundedWalletAndBitcoind()
val fundedWalletF = BitcoinSWalletTest.fundedWalletAndBitcoind(versionOpt)
for {
fundedWallet <- fundedWalletF
node <- createNeutrinoNode(fundedWallet.bitcoind, callbacks)

View file

@@ -4,7 +4,7 @@ import akka.actor.ActorSystem
import com.typesafe.config.{Config, ConfigFactory}
import org.bitcoins.core.currency._
import org.bitcoins.db.AppConfig
import org.bitcoins.rpc.client.common.BitcoindRpcClient
import org.bitcoins.rpc.client.common.{BitcoindRpcClient, BitcoindVersion}
import org.bitcoins.server.BitcoinSAppConfig
import org.bitcoins.server.BitcoinSAppConfig._
import org.bitcoins.testkit.BitcoinSTestAppConfig
@@ -127,13 +127,24 @@ object BitcoinSWalletTest extends WalletLogger {
createNewWallet(None)(config, ec)() // get the standard config
/** Pairs the given wallet with a bitcoind instance that has money in the bitcoind wallet */
def createWalletWithBitcoind(wallet: UnlockedWalletApi)(
implicit system: ActorSystem): Future[WalletWithBitcoind] = {
def createWalletWithBitcoind(
wallet: UnlockedWalletApi
)(implicit system: ActorSystem): Future[WalletWithBitcoind] = {
import system.dispatcher
val bitcoindF = BitcoinSFixture.createBitcoindWithFunds()
bitcoindF.map(WalletWithBitcoind(wallet, _))
}
/** Pairs the given wallet with a bitcoind instance that has money in the bitcoind wallet */
def createWalletWithBitcoind(
wallet: UnlockedWalletApi,
versionOpt: Option[BitcoindVersion]
)(implicit system: ActorSystem): Future[WalletWithBitcoind] = {
import system.dispatcher
val bitcoindF = BitcoinSFixture.createBitcoindWithFunds(versionOpt)
bitcoindF.map(WalletWithBitcoind(wallet, _))
}
/** Creates a default wallet, and then pairs it with a bitcoind instance that has money in the bitcoind wallet */
def createWalletWithBitcoind()(
implicit system: ActorSystem,
@@ -144,13 +155,13 @@ object BitcoinSWalletTest extends WalletLogger {
}
/** Gives us a funded bitcoin-s wallet and the bitcoind instance that funded that wallet */
def fundedWalletAndBitcoind()(
def fundedWalletAndBitcoind(versionOpt: Option[BitcoindVersion])(
implicit config: BitcoinSAppConfig,
system: ActorSystem): Future[WalletWithBitcoind] = {
import system.dispatcher
for {
wallet <- createDefaultWallet()
withBitcoind <- createWalletWithBitcoind(wallet)
withBitcoind <- createWalletWithBitcoind(wallet, versionOpt)
funded <- fundWalletWithBitcoind(withBitcoind)
} yield funded
}