Automated wallet recovery (#985)

* Automated wallet recovery

* responded to the PR comments

* some more changes

* fix docs

* cleanup
This commit is contained in:
rorp 2020-01-02 11:18:41 -08:00 committed by Chris Stewart
parent ef99f92bd4
commit 93c02f02d0
28 changed files with 682 additions and 410 deletions

View file

@ -1,26 +1,17 @@
package org.bitcoins.cli
import org.bitcoins.picklers._
import scopt.OParser
import org.bitcoins.core.config.NetworkParameters
import upickle.{default => up}
import CliReaders._
import org.bitcoins.core.protocol._
import org.bitcoins.core.currency._
import org.bitcoins.cli.CliCommand.{
GetBalance,
GetBestBlockHash,
GetBlockCount,
GetNewAddress,
GetPeers,
NoCommand,
Rescan,
SendToAddress
}
import java.net.ConnectException
import java.{util => ju}
import ujson.Num
import ujson.Str
import org.bitcoins.cli.CliCommand._
import org.bitcoins.cli.CliReaders._
import org.bitcoins.core.config.NetworkParameters
import org.bitcoins.core.currency._
import org.bitcoins.core.protocol._
import org.bitcoins.picklers._
import scopt.OParser
import ujson.{Num, Str}
import upickle.{default => up}
case class Config(
command: CliCommand = CliCommand.NoCommand,
@ -46,9 +37,10 @@ object CliCommand {
case object GetBestBlockHash extends CliCommand
case object GetBlockCount extends CliCommand
case class Rescan(
addresses: Vector[BitcoinAddress],
addressBatchSize: Option[Int],
startBlock: Option[BlockStamp],
endBlock: Option[BlockStamp])
endBlock: Option[BlockStamp],
force: Boolean)
extends CliCommand
}
@ -80,17 +72,26 @@ object Cli extends App {
.action(
(_, conf) =>
conf.copy(
command = Rescan(addresses = Vector.empty,
command = Rescan(addressBatchSize = Option.empty,
startBlock = Option.empty,
endBlock = Option.empty)))
endBlock = Option.empty,
force = false)))
.text(s"Rescan UTXOs")
.children(
opt[Seq[BitcoinAddress]]("addresses")
.required()
.action((addrs, conf) =>
opt[Unit]("force")
.optional()
.action((_, conf) =>
conf.copy(command = conf.command match {
case rescan: Rescan =>
rescan.copy(addresses = addrs.toVector)
rescan.copy(force = true)
case other => other
})),
opt[Int]("batch-size")
.optional()
.action((batchSize, conf) =>
conf.copy(command = conf.command match {
case rescan: Rescan =>
rescan.copy(addressBatchSize = Option(batchSize))
case other => other
})),
opt[BlockStamp]("start")
@ -198,11 +199,12 @@ object Cli extends App {
RequestParam("getbalance")
case GetNewAddress =>
RequestParam("getnewaddress")
case Rescan(addresses, startBlock, endBlock) =>
case Rescan(addressBatchSize, startBlock, endBlock, force) =>
RequestParam("rescan",
Seq(up.writeJs(addresses),
Seq(up.writeJs(addressBatchSize),
up.writeJs(startBlock),
up.writeJs(endBlock)))
up.writeJs(endBlock),
up.writeJs(force)))
case SendToAddress(address, bitcoins) =>
RequestParam("sendtoaddress",

View file

@ -23,7 +23,7 @@ import org.bitcoins.wallet.MockUnlockedWalletApi
import org.scalamock.scalatest.MockFactory
import org.scalatest.{Matchers, WordSpec}
import ujson.Value.InvalidData
import ujson.{Arr, Null, Num, Str}
import ujson._
import scala.concurrent.Future
@ -181,80 +181,86 @@ class RoutesSpec
"run wallet rescan" in {
// positive cases
(mockWalletApi.rescan _)
.expects(Vector(testAddress.scriptPubKey), None, None)
(mockWalletApi.discoveryBatchSize: () => Int)
.expects()
.returning(100)
.atLeastOnce()
(mockWalletApi.isEmpty: () => Future[Boolean])
.expects()
.returning(Future.successful(false))
(mockWalletApi.rescanNeutrinoWallet _)
.expects(None, None, 100)
.returning(FutureUtil.unit)
val route1 =
walletRoutes.handleCommand(
ServerCommand("rescan", Arr(Arr(Str(testAddressStr)), Null, Null)))
ServerCommand("rescan", Arr(Arr(), Null, Null, true)))
Post() ~> route1 ~> check {
contentType shouldEqual `application/json`
responseAs[String] shouldEqual """{"result":"ok","error":null}"""
responseAs[String] shouldEqual """{"result":"scheduled","error":null}"""
}
(mockWalletApi.rescan _)
(mockWalletApi.isEmpty: () => Future[Boolean])
.expects()
.returning(Future.successful(false))
(mockWalletApi.rescanNeutrinoWallet _)
.expects(
Vector(testAddress.scriptPubKey),
Some(BlockTime(
ZonedDateTime.of(2018, 10, 27, 12, 34, 56, 0, ZoneId.of("UTC")))),
None)
None,
100)
.returning(FutureUtil.unit)
val route2 =
walletRoutes.handleCommand(
ServerCommand(
"rescan",
Arr(Arr(Str(testAddressStr)), Str("2018-10-27T12:34:56Z"), Null)))
ServerCommand("rescan",
Arr(Arr(), Str("2018-10-27T12:34:56Z"), Null, true)))
Post() ~> route2 ~> check {
contentType shouldEqual `application/json`
responseAs[String] shouldEqual """{"result":"ok","error":null}"""
responseAs[String] shouldEqual """{"result":"scheduled","error":null}"""
}
(mockWalletApi.rescan _)
.expects(Vector(testAddress.scriptPubKey),
None,
Some(BlockHash(DoubleSha256DigestBE.empty)))
(mockWalletApi.isEmpty: () => Future[Boolean])
.expects()
.returning(Future.successful(false))
(mockWalletApi.rescanNeutrinoWallet _)
.expects(None, Some(BlockHash(DoubleSha256DigestBE.empty)), 100)
.returning(FutureUtil.unit)
val route3 =
walletRoutes.handleCommand(
ServerCommand("rescan",
Arr(Arr(Str(testAddressStr)),
Null,
Str(DoubleSha256DigestBE.empty.hex))))
ServerCommand(
"rescan",
Arr(Null, Null, Str(DoubleSha256DigestBE.empty.hex), true)))
Post() ~> route3 ~> check {
contentType shouldEqual `application/json`
responseAs[String] shouldEqual """{"result":"ok","error":null}"""
responseAs[String] shouldEqual """{"result":"scheduled","error":null}"""
}
(mockWalletApi.rescan _)
.expects(Vector(testAddress.scriptPubKey),
Some(BlockHeight(12345)),
Some(BlockHeight(67890)))
(mockWalletApi.isEmpty: () => Future[Boolean])
.expects()
.returning(Future.successful(false))
(mockWalletApi.rescanNeutrinoWallet _)
.expects(Some(BlockHeight(12345)), Some(BlockHeight(67890)), 100)
.returning(FutureUtil.unit)
val route4 =
walletRoutes.handleCommand(
ServerCommand(
"rescan",
Arr(Arr(Str(testAddressStr)), Str("12345"), Num(67890))))
ServerCommand("rescan", Arr(Arr(), Str("12345"), Num(67890), true)))
Post() ~> route4 ~> check {
contentType shouldEqual `application/json`
responseAs[String] shouldEqual """{"result":"ok","error":null}"""
responseAs[String] shouldEqual """{"result":"scheduled","error":null}"""
}
// negative cases
val route5 =
walletRoutes.handleCommand(
ServerCommand(
"rescan",
Arr(Arr(Str(testAddressStr)), Str("abcd"), Str("efgh"))))
ServerCommand("rescan", Arr(Null, Str("abcd"), Str("efgh"), true)))
Post() ~> route5 ~> check {
rejection shouldEqual ValidationRejection(
@ -264,9 +270,8 @@ class RoutesSpec
val route6 =
walletRoutes.handleCommand(
ServerCommand(
"rescan",
Arr(Arr(Str(testAddressStr)), Null, Str("2018-10-27T12:34:56"))))
ServerCommand("rescan",
Arr(Arr(55), Null, Str("2018-10-27T12:34:56"), true)))
Post() ~> route6 ~> check {
rejection shouldEqual ValidationRejection(
@ -276,7 +281,7 @@ class RoutesSpec
val route7 =
walletRoutes.handleCommand(
ServerCommand("rescan", Arr(Arr(Str(testAddressStr)), Num(-1), Null)))
ServerCommand("rescan", Arr(Null, Num(-1), Null, true)))
Post() ~> route7 ~> check {
rejection shouldEqual ValidationRejection(
@ -284,26 +289,21 @@ class RoutesSpec
Some(InvalidData(Num(-1), "Expected a positive integer")))
}
(mockWalletApi.isEmpty: () => Future[Boolean])
.expects()
.returning(Future.successful(false))
(mockWalletApi.rescanNeutrinoWallet _)
.expects(None, None, 55)
.returning(FutureUtil.unit)
val route8 =
walletRoutes.handleCommand(
ServerCommand("rescan", Arr(Arr(), Null, Null)))
ServerCommand("rescan", Arr(Arr(55), Arr(), Arr(), Bool(true))))
Post() ~> route8 ~> check {
rejection shouldEqual ValidationRejection(
"failure",
Some(InvalidData(Arr(), "Expected a non-empty address array")))
contentType shouldEqual `application/json`
responseAs[String] shouldEqual """{"result":"scheduled","error":null}"""
}
val route9 =
walletRoutes.handleCommand(
ServerCommand("rescan", Arr(Arr("abcdefgh"), Null, Null)))
Post() ~> route9 ~> check {
rejection shouldEqual ValidationRejection(
"failure",
Some(InvalidData("abcdefgh", "Expected a valid address")))
}
}
}

View file

@ -57,6 +57,10 @@ bitcoin-s {
defaultAccountType = legacy # legacy, segwit, nested-segwit
bloomFalsePositiveRate = 0.0001 # percentage
addressGapLimit = 20
discoveryBatchSize = 100
}
}

View file

@ -6,8 +6,8 @@ import java.nio.file.Files
import akka.actor.ActorSystem
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.core.api.ChainQueryApi
import org.bitcoins.keymanager.KeyManagerInitializeError
import org.bitcoins.keymanager.bip39.BIP39KeyManager
import org.bitcoins.keymanager.{KeyManager, KeyManagerInitializeError}
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.Peer
import org.bitcoins.node.networking.peer.DataMessageHandler
@ -101,8 +101,10 @@ object Main extends App {
// TODO change me when we implement proper password handling
locked.unlock(BIP39KeyManager.badPassphrase) match {
case Right(wallet) => Future.successful(wallet)
case Left(kmError) => error(kmError)
case Right(wallet) =>
Future.successful(wallet)
case Left(kmError) =>
error(kmError)
}
} else {
logger.info(s"Initializing key manager")
@ -132,6 +134,7 @@ object Main extends App {
lazy val onCompactFilter: OnCompactFilterReceived = {
(blockHash, blockFilter) =>
wallet.processCompactFilter(blockHash, blockFilter)
()
}
lazy val onBlock: OnBlockReceived = { block =>
wallet.processBlock(block)

View file

@ -3,7 +3,7 @@ package org.bitcoins.server
import org.bitcoins.core.currency.Bitcoins
import org.bitcoins.core.protocol.BlockStamp.BlockHeight
import org.bitcoins.core.protocol.{BitcoinAddress, BlockStamp}
import ujson.{Arr, Null, Num, Str, Value}
import ujson._
import upickle.default._
import scala.util.{Failure, Try}
@ -16,27 +16,20 @@ object ServerCommand {
}
case class Rescan(
addresses: Vector[BitcoinAddress],
batchSize: Option[Int],
startBlock: Option[BlockStamp],
endBlock: Option[BlockStamp])
endBlock: Option[BlockStamp],
force: Boolean)
object Rescan extends ServerJsonModels {
def fromJsArr(jsArr: ujson.Arr): Try[Rescan] = {
def parseAddresses(value: Value): Vector[BitcoinAddress] = value match {
case Arr(arr) =>
if (arr.isEmpty)
throw Value.InvalidData(value, "Expected a non-empty address array")
else
arr.toVector.map(jsToBitcoinAddress)
case _: Value =>
throw Value.InvalidData(value, "Expected an Arr")
}
def nullToOpt(value: Value): Option[Value] = value match {
case Null => None
case _: Value => Some(value)
case Null => None
case Arr(arr) if arr.isEmpty => None
case Arr(arr) if arr.size == 1 => Some(arr.head)
case _: Value => Some(value)
}
def parseBlockStamp(value: Value): Option[BlockStamp] =
@ -51,13 +44,30 @@ object Rescan extends ServerJsonModels {
throw Value.InvalidData(value, "Expected a Num or a Str")
}
def parseInt(value: Value): Option[Int] =
nullToOpt(value).map {
case Str(value) => value.toInt
case Num(value) => value.toInt
case _: Value =>
throw Value.InvalidData(value, "Expected a Num or a Str")
}
def parseBoolean(value: Value): Boolean = value match {
case Bool(bool) => bool
case _: Value => throw Value.InvalidData(value, "Expected a Bool")
}
jsArr.arr.toList match {
case addrsJs :: startJs :: endJs :: Nil =>
case batchSizeJs :: startJs :: endJs :: forceJs :: Nil =>
Try {
val addresses = parseAddresses(addrsJs)
val batchSize = parseInt(batchSizeJs)
val start = parseBlockStamp(startJs)
val end = parseBlockStamp(endJs)
Rescan(addresses = addresses, startBlock = start, endBlock = end)
val force = parseBoolean(forceJs)
Rescan(batchSize = batchSize,
startBlock = start,
endBlock = end,
force = force)
}
case Nil =>
Failure(new IllegalArgumentException("Missing addresses"))
@ -65,7 +75,7 @@ object Rescan extends ServerJsonModels {
case other =>
Failure(
new IllegalArgumentException(
s"Bad number of arguments: ${other.length}. Expected: 3"))
s"Bad number of arguments: ${other.length}. Expected: 4"))
}
}

View file

@ -10,6 +10,7 @@ import org.bitcoins.node.Node
import org.bitcoins.picklers._
import org.bitcoins.wallet.api.UnlockedWalletApi
import scala.concurrent.Future
import scala.util.{Failure, Success}
case class WalletRoutes(wallet: UnlockedWalletApi, node: Node)(
@ -54,11 +55,26 @@ case class WalletRoutes(wallet: UnlockedWalletApi, node: Node)(
Rescan.fromJsArr(arr) match {
case Failure(exception) =>
reject(ValidationRejection("failure", Some(exception)))
case Success(Rescan(addresses, startBlock, endBlock)) =>
case Success(Rescan(batchSize, startBlock, endBlock, force)) =>
complete {
wallet
.rescan(addresses.map(_.scriptPubKey), startBlock, endBlock)
.map(_ => Server.httpSuccess("ok"))
val res = for {
empty <- wallet.isEmpty()
msg <- if (force || empty) {
wallet
.rescanNeutrinoWallet(
startBlock,
endBlock,
batchSize.getOrElse(wallet.discoveryBatchSize))
.map(_ => "scheduled")
} else {
Future.successful(
"DANGER! The wallet is not empty, however the rescan " +
"process destroys all existing records and creates new ones. " +
"Use force option if you really want to proceed. " +
"Don't forget to backup the wallet database.")
}
} yield msg
res.map(msg => Server.httpSuccess(msg))
}
}

View file

@ -4,8 +4,9 @@ import org.bitcoins.chain.ChainVerificationLogger
import org.bitcoins.chain.api.ChainApi
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.chain.models._
import org.bitcoins.core.api.ChainQueryApi.FilterResponse
import org.bitcoins.core.crypto.{DoubleSha256Digest, DoubleSha256DigestBE}
import org.bitcoins.core.gcs.{FilterHeader, GolombFilter}
import org.bitcoins.core.gcs.FilterHeader
import org.bitcoins.core.p2p.CompactFilterMessage
import org.bitcoins.core.protocol.BlockStamp
import org.bitcoins.core.protocol.blockchain.BlockHeader
@ -403,10 +404,12 @@ case class ChainHandler(
override def getFiltersBetweenHeights(
startHeight: Int,
endHeight: Int): Future[Vector[(GolombFilter, DoubleSha256DigestBE)]] =
endHeight: Int): Future[Vector[FilterResponse]] =
filterDAO
.getBetweenHeights(startHeight, endHeight)
.map(dbos => dbos.map(dbo => (dbo.golombFilter, dbo.blockHashBE)))
.map(dbos =>
dbos.map(dbo =>
FilterResponse(dbo.golombFilter, dbo.blockHashBE, dbo.height)))
}
object ChainHandler {

View file

@ -3,7 +3,6 @@ package org.bitcoins.core.api
import org.bitcoins.core.crypto.DoubleSha256DigestBE
import org.bitcoins.core.gcs.GolombFilter
import org.bitcoins.core.protocol.BlockStamp
import org.bitcoins.core.util.FutureUtil
import scala.concurrent.Future
@ -12,6 +11,8 @@ import scala.concurrent.Future
*/
trait ChainQueryApi {
import org.bitcoins.core.api.ChainQueryApi._
/** Gets the height of the given block */
def getBlockHeight(blockHash: DoubleSha256DigestBE): Future[Option[Int]]
@ -30,39 +31,15 @@ trait ChainQueryApi {
def getFiltersBetweenHeights(
startHeight: Int,
endHeight: Int): Future[Vector[(GolombFilter, DoubleSha256DigestBE)]]
endHeight: Int): Future[Vector[FilterResponse]]
}
object ChainQueryApi {
object NoOp extends ChainQueryApi {
/** Gets the height of the given block */
override def getBlockHeight(
blockHash: DoubleSha256DigestBE): Future[Option[Int]] =
FutureUtil.none
/** Gets the hash of the block that is what we consider "best" */
override def getBestBlockHash(): Future[DoubleSha256DigestBE] =
Future.successful(DoubleSha256DigestBE.empty)
/** Gets number of confirmations for the given block hash. It returns None of no block found */
override def getNumberOfConfirmations(
blockHashOpt: DoubleSha256DigestBE): Future[Option[Int]] =
FutureUtil.none
/** Returns the block height of the given block stamp */
override def getHeightByBlockStamp(blockStamp: BlockStamp): Future[Int] =
Future.successful(0)
/** Gets the number of compact filters in the database */
override def getFilterCount: Future[Int] = Future.successful(0)
override def getFiltersBetweenHeights(
startHeight: Int,
endHeight: Int): Future[Vector[(GolombFilter, DoubleSha256DigestBE)]] =
Future.successful(Vector.empty)
}
case class FilterResponse(
compactFilter: GolombFilter,
blockHash: DoubleSha256DigestBE,
blockHeight: Int)
sealed abstract class ChainException(message: String)
extends RuntimeException(message)

View file

@ -1,7 +1,6 @@
package org.bitcoins.core.api
import org.bitcoins.core.crypto.DoubleSha256Digest
import org.bitcoins.core.util.FutureUtil
import scala.concurrent.Future
@ -16,14 +15,3 @@ trait NodeApi {
def downloadBlocks(blockHashes: Vector[DoubleSha256Digest]): Future[Unit]
}
object NodeApi {
object NoOp extends NodeApi {
override def downloadBlocks(
blockHashes: Vector[DoubleSha256Digest]): Future[Unit] = FutureUtil.unit
}
}

View file

@ -1,10 +1,11 @@
package org.bitcoins.db
import java.sql.SQLException
import org.bitcoins.core.config.MainNet
import slick.jdbc.SQLiteProfile.api._
import scala.concurrent.{ExecutionContext, Future}
import java.sql.SQLException
import org.bitcoins.core.config.MainNet
/**
* Created by chris on 9/8/16.
@ -84,6 +85,12 @@ abstract class CRUD[T, PrimaryKeyType](
database.run(query.delete)
}
/**
* delete all records from the table
*/
def deleteAll(): Future[Int] =
database.run(table.delete)
/**
* insert the record if it does not exist, update it if it does
*
@ -127,6 +134,8 @@ abstract class CRUD[T, PrimaryKeyType](
def findAll(): Future[Vector[T]] =
database.run(table.result).map(_.toVector)
/** Returns number of rows in the table */
def count(): Future[Int] = database.run(table.length.result)
}
case class SafeDatabase(config: AppConfig) extends DatabaseLogger {

View file

@ -110,7 +110,6 @@ val syncF: Future[ChainApi] = configF.flatMap { _ =>
}
//initialize our key manager, where we store our keys
import org.bitcoins.keymanager._
import org.bitcoins.keymanager.bip39._
val keyManager = BIP39KeyManager.initialize(walletConfig.kmParams).getOrElse {
throw new RuntimeException(s"Failed to initalize key manager")
@ -122,9 +121,21 @@ val keyManager = BIP39KeyManager.initialize(walletConfig.kmParams).getOrElse {
import org.bitcoins.wallet.api.LockedWalletApi
import org.bitcoins.wallet.Wallet
import org.bitcoins.core.api._
val wallet = Wallet(keyManager,NodeApi.NoOp, ChainQueryApi.NoOp)
import org.bitcoins.core.crypto._
import org.bitcoins.core.protocol._
val wallet = Wallet(keyManager, new NodeApi {
override def downloadBlocks(blockHashes: Vector[DoubleSha256Digest]): Future[Unit] = Future.successful(())
}, new ChainQueryApi {
import org.bitcoins.core.api.ChainQueryApi._
override def getBlockHeight(blockHash: DoubleSha256DigestBE): Future[Option[Int]] = Future.successful(None)
override def getBestBlockHash(): Future[DoubleSha256DigestBE] = Future.successful(DoubleSha256DigestBE.empty)
override def getNumberOfConfirmations(blockHashOpt: DoubleSha256DigestBE): Future[Option[Int]] = Future.successful(None)
override def getFilterCount: Future[Int] = Future.successful(0)
override def getHeightByBlockStamp(blockStamp: BlockStamp): Future[Int] = Future.successful(0)
override def getFiltersBetweenHeights(startHeight: Int, endHeight: Int): Future[Vector[FilterResponse]] = Future.successful(Vector.empty)
})
val walletF: Future[LockedWalletApi] = configF.flatMap { _ =>
Wallet.initialize(wallet)
Wallet.initialize(wallet)
}
// when this future completes, ww have sent a transaction

View file

@ -56,7 +56,7 @@ case class BIP39KeyManager(
object BIP39KeyManager
extends KeyManagerCreateApi[BIP39KeyManager]
with BitcoinSLogger {
val badPassphrase = AesPassword.fromString("bad-password").get
val badPassphrase = AesPassword.fromString("changeMe").get
/** Initializes the mnemonic seed and saves it to file */
override def initializeWithEntropy(

View file

@ -10,7 +10,6 @@ import org.bitcoins.testkit.BitcoinSTestAppConfig
import org.bitcoins.testkit.fixtures.UsesExperimentalBitcoind
import org.bitcoins.testkit.node.NodeUnitTest.NeutrinoNodeFundedWalletBitcoind
import org.bitcoins.testkit.node.{NodeTestUtil, NodeUnitTest}
import org.scalatest.exceptions.TestFailedException
import org.scalatest.{DoNotDiscover, FutureOutcome}
import scala.concurrent.duration.DurationInt
@ -148,32 +147,4 @@ class NeutrinoNodeTest extends NodeUnitTest {
}
}
it must "download a block that matches a compact block filter" taggedAs (UsesExperimentalBitcoind) in {
nodeConnectedWithBitcoind: NeutrinoNodeFundedWalletBitcoind =>
val node = nodeConnectedWithBitcoind.node
val wallet = nodeConnectedWithBitcoind.wallet
val bitcoind = nodeConnectedWithBitcoind.bitcoindRpc
for {
walletUtxos <- wallet.listUtxos()
_ = {
assert(walletUtxos.nonEmpty)
utxos = walletUtxos.map(_.output.scriptPubKey).toSet
}
walletAddress <- wallet.getNewAddress()
_ <- node.sync()
_ <- NodeTestUtil.awaitCompactFiltersSync(node, bitcoind)
_ = system.scheduler.scheduleOnce(testTimeout) {
if (!assertionP.isCompleted)
assertionP.failure(
new TestFailedException(
s"Did not receive a block message after $testTimeout!",
failedCodeStackDepth = 0))
}
_ <- wallet.rescan(
walletUtxos.map(_.output.scriptPubKey) :+ walletAddress.scriptPubKey)
result <- assertionP.future
} yield assert(result)
}
}

View file

@ -199,8 +199,9 @@ class NeutrinoNodeWithWalletTest extends NodeUnitTest {
_ <- NodeTestUtil.awaitSync(node, bitcoind)
_ <- NodeTestUtil.awaitCompactFiltersSync(node, bitcoind)
addresses <- wallet.listAddresses()
_ <- wallet.rescan(addresses.map(_.scriptPubKey))
_ <- wallet.rescanNeutrinoWallet(startOpt = None,
endOpt = None,
addressBatchSize = 2)
_ <- AsyncUtil.awaitConditionF(condition)
} yield succeed

View file

@ -2,8 +2,7 @@ package org.bitcoins.node
import akka.actor.ActorSystem
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.core.crypto.DoubleSha256DigestBE
import org.bitcoins.core.gcs.GolombFilter
import org.bitcoins.core.api.ChainQueryApi.FilterResponse
import org.bitcoins.core.protocol.BlockStamp
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.Peer
@ -55,7 +54,7 @@ case class NeutrinoNode(
override def getFiltersBetweenHeights(
startHeight: Int,
endHeight: Int): Future[Vector[(GolombFilter, DoubleSha256DigestBE)]] =
endHeight: Int): Future[Vector[FilterResponse]] =
chainApiFromDb().flatMap(_.getFiltersBetweenHeights(startHeight, endHeight))
}

View file

@ -2,9 +2,8 @@ package org.bitcoins.node
import akka.actor.ActorSystem
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.core.api.ChainQueryApi.FilterResponse
import org.bitcoins.core.bloom.BloomFilter
import org.bitcoins.core.crypto.DoubleSha256DigestBE
import org.bitcoins.core.gcs.GolombFilter
import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.core.protocol.{BitcoinAddress, BlockStamp}
import org.bitcoins.node.config.NodeAppConfig
@ -97,6 +96,6 @@ case class SpvNode(
override def getFiltersBetweenHeights(
startHeight: Int,
endHeight: Int): Future[Vector[(GolombFilter, DoubleSha256DigestBE)]] =
endHeight: Int): Future[Vector[FilterResponse]] =
Future.failed(new RuntimeException(cfErrMsg))
}

View file

@ -53,6 +53,10 @@ bitcoin-s {
defaultAccountType = legacy # legacy, segwit, nested-segwit
bloomFalsePositiveRate = 0.0001 # percentage
addressGapLimit = 20
discoveryBatchSize = 100
}
}

View file

@ -2,14 +2,14 @@ package org.bitcoins.testkit.wallet
import akka.actor.ActorSystem
import com.typesafe.config.{Config, ConfigFactory}
import org.bitcoins.core.api.ChainQueryApi.FilterResponse
import org.bitcoins.core.api.{ChainQueryApi, NodeApi}
import org.bitcoins.core.crypto.DoubleSha256DigestBE
import org.bitcoins.core.crypto.{DoubleSha256Digest, DoubleSha256DigestBE}
import org.bitcoins.core.currency._
import org.bitcoins.core.gcs.{BlockFilter, GolombFilter}
import org.bitcoins.core.gcs.BlockFilter
import org.bitcoins.core.protocol.BlockStamp
import org.bitcoins.core.util.FutureUtil
import org.bitcoins.db.AppConfig
import org.bitcoins.keymanager.KeyManager
import org.bitcoins.keymanager.bip39.BIP39KeyManager
import org.bitcoins.rpc.client.common.{BitcoindRpcClient, BitcoindVersion}
import org.bitcoins.server.BitcoinSAppConfig
@ -36,7 +36,7 @@ trait BitcoinSWalletTest extends BitcoinSFixture with WalletLogger {
AppConfig.throwIfDefaultDatadir(config.walletConf)
}
def nodeApi: NodeApi = NodeApi.NoOp
def nodeApi: NodeApi = MockNodeApi
// This is a random block on testnet
val testBlockHash = DoubleSha256DigestBE.fromHex(
@ -71,7 +71,7 @@ trait BitcoinSWalletTest extends BitcoinSFixture with WalletLogger {
override def getFiltersBetweenHeights(
startHeight: Int,
endHeight: Int): Future[Vector[(GolombFilter, DoubleSha256DigestBE)]] =
endHeight: Int): Future[Vector[FilterResponse]] =
Future.successful({
import scodec.bits._
@ -104,8 +104,10 @@ trait BitcoinSWalletTest extends BitcoinSFixture with WalletLogger {
hex"c14d41b2d5aefaf539e989f7fa097eac657c70b975c56e26b73fb9401ce3" ++
hex"81502f0883d52c6a3bcc956e0ea1787f0717d0205fecfe55b01edb1ac0"
Vector(
(BlockFilter.fromBytes(filterBytes, testBlockHash.flip),
testBlockHash))
FilterResponse(compactFilter = BlockFilter
.fromBytes(filterBytes, testBlockHash.flip),
blockHash = testBlockHash,
blockHeight = 1))
})
}
@ -188,6 +190,37 @@ object BitcoinSWalletTest extends WalletLogger {
lazy val initialFunds = 25.bitcoins
object MockNodeApi extends NodeApi {
override def downloadBlocks(
blockHashes: Vector[DoubleSha256Digest]): Future[Unit] = FutureUtil.unit
}
object MockChainQueryApi extends ChainQueryApi {
override def getBlockHeight(
blockHash: DoubleSha256DigestBE): Future[Option[Int]] =
FutureUtil.none
override def getBestBlockHash(): Future[DoubleSha256DigestBE] =
Future.successful(DoubleSha256DigestBE.empty)
override def getNumberOfConfirmations(
blockHashOpt: DoubleSha256DigestBE): Future[Option[Int]] =
FutureUtil.none
override def getHeightByBlockStamp(blockStamp: BlockStamp): Future[Int] =
Future.successful(0)
override def getFilterCount: Future[Int] = Future.successful(0)
override def getFiltersBetweenHeights(
startHeight: Int,
endHeight: Int): Future[Vector[FilterResponse]] =
Future.successful(Vector.empty)
}
case class WalletWithBitcoind(
wallet: UnlockedWalletApi,
bitcoind: BitcoindRpcClient)

View file

@ -1,18 +1,33 @@
package org.bitcoins.wallet
import com.typesafe.config.{Config, ConfigFactory}
import org.bitcoins.core.api.{ChainQueryApi, NodeApi}
import org.bitcoins.core.crypto.{ExtPublicKey, MnemonicCode}
import org.bitcoins.core.hd.HDChainType.{Change, External}
import org.bitcoins.core.hd._
import play.api.libs.json.JsValue
import play.api.libs.json.Json
import org.bitcoins.core.hd.HDCoinType
import org.bitcoins.core.hd.HDPurpose
import org.bitcoins.core.hd.HDPath
import org.bitcoins.core.protocol.BitcoinAddress
import org.bitcoins.core.util.FutureUtil
import org.bitcoins.keymanager.bip39.BIP39KeyManager
import org.bitcoins.keymanager.{KeyManager, KeyManagerParams}
import org.bitcoins.rpc.serializers.JsonSerializers._
import play.api.libs.json.Reads
import play.api.libs.json.JsResult
import play.api.libs.json.JsError
import play.api.libs.json.JsSuccess
import org.bitcoins.core.hd.HDChainType
import org.bitcoins.core.hd.HDPurposes
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.bitcoins.core.util.FutureUtil
import org.bitcoins.keymanager.KeyManagerParams
import org.bitcoins.keymanager.bip39.BIP39KeyManager
import org.bitcoins.testkit.BitcoinSTestAppConfig
import org.bitcoins.testkit.fixtures.EmptyFixture
import org.bitcoins.testkit.wallet.BitcoinSWalletTest
import org.bitcoins.testkit.wallet.BitcoinSWalletTest.{
MockChainQueryApi,
MockNodeApi
}
import org.bitcoins.wallet.config.WalletAppConfig
import org.bitcoins.wallet.models.{AccountDb, AddressDb}
import org.scalatest.compatible.Assertion
@ -135,18 +150,21 @@ class TrezorAddressTest extends BitcoinSWalletTest with EmptyFixture {
ConfigFactory.parseString(confStr)
}
private def getWallet(config: WalletAppConfig)(implicit ec: ExecutionContext): Future[Wallet] = {
val kmE = BIP39KeyManager.initializeWithEntropy(mnemonic.toEntropy, config.kmParams)
private def getWallet(config: WalletAppConfig)(
implicit ec: ExecutionContext): Future[Wallet] = {
val kmE =
BIP39KeyManager.initializeWithEntropy(mnemonic.toEntropy, config.kmParams)
kmE match {
case Left(err) => Future.failed(new RuntimeException(s"Failed to initialize km with err=${err}"))
case Left(err) =>
Future.failed(
new RuntimeException(s"Failed to initialize km with err=${err}"))
case Right(km) =>
val wallet = Wallet(km, NodeApi.NoOp, ChainQueryApi.NoOp)(config, ec)
val walletF = Wallet.initialize(wallet)(config,ec)
val wallet = Wallet(km, MockNodeApi, MockChainQueryApi)(config, ec)
val walletF = Wallet.initialize(wallet)(config, ec)
walletF
}
}
case class AccountAndAddrsAndVector(
account: AccountDb,
addrs: Seq[AddressDb],
@ -167,12 +185,13 @@ class TrezorAddressTest extends BitcoinSWalletTest with EmptyFixture {
assert(foundAddress.address == expectedAddress.address)
}
}
/** Creates the wallet accounts needed for this test */
private def createNeededAccounts(
wallet: Wallet,
existing: Vector[AccountDb],
keyManagerParams: KeyManagerParams,
testVectors: Vector[TestVector]): Future[Unit] = {
wallet: Wallet,
existing: Vector[AccountDb],
keyManagerParams: KeyManagerParams,
testVectors: Vector[TestVector]): Future[Unit] = {
val accountsToCreate = existing.length until testVectors.length
FutureUtil
.sequentially(accountsToCreate) { _ =>
@ -187,16 +206,16 @@ class TrezorAddressTest extends BitcoinSWalletTest with EmptyFixture {
* addresses needed to verify the test vector
*/
def getAccountsWithAddressesAndVectors(
wallet: Wallet,
accountsWithVectors: Seq[(AccountDb, TestVector)]): Future[
wallet: Wallet,
accountsWithVectors: Seq[(AccountDb, TestVector)]): Future[
Seq[AccountAndAddrsAndVector]] = {
FutureUtil.sequentially(accountsWithVectors) {
case (acc, vec) =>
val addrFutures: Future[Seq[AddressDb]] =
FutureUtil.sequentially(vec.addresses) { vector =>
val addrFut = vector.chain match {
case Change => wallet.getNewChangeAddress(acc)
case External =>
case HDChainType.Change => wallet.getNewChangeAddress(acc)
case HDChainType.External =>
wallet.getNewAddress(acc)
}
addrFut.flatMap(wallet.addressDAO.findAddress).map {
@ -224,7 +243,10 @@ class TrezorAddressTest extends BitcoinSWalletTest with EmptyFixture {
for {
wallet <- getWallet(conf)
existingAccounts <- wallet.listAccounts(purpose)
_ <- createNeededAccounts(wallet, existingAccounts, conf.kmParams, testVectors)
_ <- createNeededAccounts(wallet,
existingAccounts,
conf.kmParams,
testVectors)
accounts <- wallet.listAccounts(purpose)
// we want to find all accounts for the given account type,
// and match it with its corresponding test vector
@ -256,7 +278,7 @@ class TrezorAddressTest extends BitcoinSWalletTest with EmptyFixture {
val assertions: Seq[Assertion] = {
val nestedAssertions: Seq[Seq[Assertion]] =
accountsWithAddrsWithVecs.map {
case AccountAndAddrsAndVector(account, addresses, vec) =>
case AccountAndAddrsAndVector(account, addresses, _) =>
val acctIdx = account.hdAccount.index
val vec = vectors.find(_.xpub == account.xpub) match {
case None =>

View file

@ -7,9 +7,10 @@ import org.bitcoins.core.hd.HDChainType.{Change, External}
import org.bitcoins.core.hd.{HDChainType, HDPurpose}
import org.bitcoins.core.protocol.BitcoinAddress
import org.bitcoins.core.util.FutureUtil
import org.bitcoins.keymanager.KeyManagerUnlockError.MnemonicNotFound
import org.bitcoins.keymanager.{KeyManagerUnlockError, WalletStorage}
import org.bitcoins.keymanager.KeyManagerUnlockError.{BadPassword, JsonParsingError, MnemonicNotFound}
import org.bitcoins.testkit.wallet.BitcoinSWalletTest
import org.bitcoins.wallet.api.LockedWalletApi.BlockMatchingResponse
import org.bitcoins.wallet.api.UnlockedWalletApi
import org.bitcoins.wallet.models.AddressDb
import org.scalatest.FutureOutcome
@ -26,17 +27,18 @@ class WalletUnitTest extends BitcoinSWalletTest {
behavior of "Wallet - unit test"
it must "write the mnemonic seed to the root datadir -- NOT A NETWORK sub directory" in { wallet: UnlockedWalletApi =>
//since datadir has the path that relates it to a network ('mainnet'/'testnet'/'regtest')
//we need to get the parent of that to find where the encrypted seed should be
//this is where the bitcoin-s.conf should live too.
val datadir = wallet.walletConfig.baseDatadir
it must "write the mnemonic seed to the root datadir -- NOT A NETWORK sub directory" in {
wallet: UnlockedWalletApi =>
//since datadir has the path that relates it to a network ('mainnet'/'testnet'/'regtest')
//we need to get the parent of that to find where the encrypted seed should be
//this is where the bitcoin-s.conf should live too.
val datadir = wallet.walletConfig.baseDatadir
assert(Files.exists(datadir.resolve(WalletStorage.ENCRYPTED_SEED_FILE_NAME)))
assert(
Files.exists(datadir.resolve(WalletStorage.ENCRYPTED_SEED_FILE_NAME)))
}
it should "create a new wallet" in { wallet: UnlockedWalletApi =>
for {
accounts <- wallet.listAccounts()
@ -145,7 +147,7 @@ class WalletUnitTest extends BitcoinSWalletTest {
wallet: UnlockedWalletApi =>
val badpassphrase = AesPassword.fromNonEmptyString("bad")
val errorType = wallet.unlock(badpassphrase) match {
case Right(_) => fail("Unlocked wallet with bad password!")
case Right(_) => fail("Unlocked wallet with bad password!")
case Left(err) => err
}
errorType match {
@ -165,7 +167,9 @@ class WalletUnitTest extends BitcoinSWalletTest {
endOpt = None
)(system.dispatcher)
} yield {
assert(Vector(testBlockHash) == matched)
assert(
Vector(BlockMatchingResponse(blockHash = testBlockHash,
blockHeight = 1)) == matched)
}
}

View file

@ -10,9 +10,9 @@ import org.bitcoins.core.protocol.transaction._
import org.bitcoins.core.wallet.builder.BitcoinTxBuilder
import org.bitcoins.core.wallet.fee.FeeUnit
import org.bitcoins.core.wallet.utxo.BitcoinUTXOSpendingInfo
import org.bitcoins.keymanager.KeyManagerParams
import org.bitcoins.keymanager.bip39.BIP39KeyManager
import org.bitcoins.keymanager.util.HDUtil
import org.bitcoins.keymanager.{KeyManager, KeyManagerParams}
import org.bitcoins.wallet.api._
import org.bitcoins.wallet.config.WalletAppConfig
import org.bitcoins.wallet.models._

View file

@ -1,12 +1,10 @@
package org.bitcoins.wallet.api
import java.util.concurrent.Executors
import org.bitcoins.core.api.ChainQueryApi.InvalidBlockRange
import org.bitcoins.core.api.ChainQueryApi.{FilterResponse, InvalidBlockRange}
import org.bitcoins.core.api.{ChainQueryApi, NodeApi}
import org.bitcoins.core.bloom.BloomFilter
import org.bitcoins.core.config.NetworkParameters
import org.bitcoins.core.crypto._
import org.bitcoins.core.crypto.{DoubleSha256DigestBE, _}
import org.bitcoins.core.currency.CurrencyUnit
import org.bitcoins.core.gcs.{GolombFilter, SimpleFilterMatcher}
import org.bitcoins.core.hd.{AddressType, HDPurpose}
@ -18,6 +16,7 @@ import org.bitcoins.core.wallet.fee.FeeUnit
import org.bitcoins.keymanager._
import org.bitcoins.keymanager.bip39.{BIP39KeyManager, BIP39LockedKeyManager}
import org.bitcoins.wallet.Wallet
import org.bitcoins.wallet.api.LockedWalletApi.BlockMatchingResponse
import org.bitcoins.wallet.config.WalletAppConfig
import org.bitcoins.wallet.models.{AccountDb, AddressDb, SpendingInfoDb}
@ -121,6 +120,9 @@ trait LockedWalletApi extends WalletApi {
def listAddresses(): Future[Vector[AddressDb]]
/** Checks if the wallet contains any data */
def isEmpty(): Future[Boolean]
/**
* Gets a new external address with the specified
* type. Calling this method multiple
@ -208,10 +210,159 @@ trait LockedWalletApi extends WalletApi {
def listAccounts(purpose: HDPurpose): Future[Vector[AccountDb]] =
listAccounts().map(_.filter(_.hdAccount.purpose == purpose))
/**
* Iterates over the block filters in order to find filters that match to the given addresses
*
* I queries the filter database for [[batchSize]] filters a time
* and tries to run [[GolombFilter.matchesAny]] for each filter.
*
* It tries to match the filters in parallel using [[parallelismLevel]] threads.
* For best results use it with a separate execution context.
*
* @param scripts list of [[ScriptPubKey]]'s to watch
* @param startOpt start point (if empty it starts with the genesis block)
* @param endOpt end point (if empty it ends with the best tip)
* @param batchSize number of filters that can be matched in one batch
* @param parallelismLevel max number of threads required to perform matching
* (default [[Runtime.getRuntime.availableProcessors()]])
* @return a list of matching block hashes
*/
def getMatchingBlocks(
scripts: Vector[ScriptPubKey],
startOpt: Option[BlockStamp] = None,
endOpt: Option[BlockStamp] = None,
batchSize: Int = 100,
parallelismLevel: Int = Runtime.getRuntime.availableProcessors())(
implicit ec: ExecutionContext): Future[Vector[BlockMatchingResponse]] = {
require(batchSize > 0, "batch size must be greater than zero")
require(parallelismLevel > 0, "parallelism level must be greater than zero")
if (scripts.isEmpty) {
Future.successful(Vector.empty)
} else {
val bytes = scripts.map(_.asmBytes)
/** Calculates group size to split a filter vector into [[parallelismLevel]] groups.
* It's needed to limit number of threads required to run the matching */
def calcGroupSize(vectorSize: Int): Int =
if (vectorSize / parallelismLevel * parallelismLevel < vectorSize)
vectorSize / parallelismLevel + 1
else vectorSize / parallelismLevel
def findMatches(filters: Vector[FilterResponse]): Future[
Iterator[BlockMatchingResponse]] = {
if (filters.isEmpty)
Future.successful(Iterator.empty)
else {
/* Iterates over the grouped vector of filters to find matches with the given [[bytes]]. */
val groupSize = calcGroupSize(filters.size)
val filterGroups = filters.grouped(groupSize)
// Sequence on the filter groups making sure the number of threads doesn't exceed [[parallelismLevel]].
Future
.sequence(filterGroups.map { filterGroup =>
// We need to wrap in a future here to make sure we can
// potentially run these matches in parallel
Future {
// Find any matches in the group and add the corresponding block hashes into the result
filterGroup
.foldLeft(Vector.empty[BlockMatchingResponse]) {
(blocks, filter) =>
val matcher = SimpleFilterMatcher(filter.compactFilter)
if (matcher.matchesAny(bytes)) {
blocks :+ BlockMatchingResponse(filter.blockHash,
filter.blockHeight)
} else {
blocks
}
}
}
})
.map(_.flatten)
}
}
/** Iterates over all filters in the range to find matches */
@tailrec
def loop(
start: Int,
end: Int,
acc: Future[Vector[BlockMatchingResponse]]): Future[
Vector[BlockMatchingResponse]] = {
if (end <= start) {
acc
} else {
val startHeight = end - (batchSize - 1)
val endHeight = end
val newAcc = for {
compactFilterDbs <- chainQueryApi.getFiltersBetweenHeights(
startHeight,
endHeight)
filtered <- findMatches(compactFilterDbs)
res <- acc
} yield {
res ++ filtered
}
val newEnd = Math.max(start, endHeight - batchSize)
loop(start, newEnd, newAcc)
}
}
for {
startHeight <- startOpt.fold(Future.successful(0))(
chainQueryApi.getHeightByBlockStamp)
_ = if (startHeight < 0)
throw InvalidBlockRange(s"Start position cannot negative")
endHeight <- endOpt.fold(chainQueryApi.getFilterCount)(
chainQueryApi.getHeightByBlockStamp)
_ = if (startHeight > endHeight)
throw InvalidBlockRange(
s"End position cannot precede start: $startHeight:$endHeight")
matched <- loop(startHeight, endHeight, Future.successful(Vector.empty))
} yield {
matched
}
}
}
/**
* Recreates the account using BIP-157 approach
*
* DANGER! This method removes all records from the wallet database
* and creates new ones while the account discovery process.
*
* The Wallet UI should check if the database is empty before calling
* this method and let the end users to decide whether they want to proceed or not.
*
* This method generates [[addressBatchSize]] of addresses, then matches them against the BIP-158 compact filters,
* and downloads and processes the matched blocks. This method keeps doing the steps until there are [[WalletConfig.addressGapLimit]]
* or more unused addresses in a row. In this case it considers the discovery process completed.
*
* [[addressBatchSize]] - the number of addresses we should generate from a keychain to attempt to match in in a rescan
* [[WalletConfig.addressGapLimit]] - the number of addresses required to go without a match before we determine that our wallet is "discovered".
* For instance, if addressBatchSize=100, and AddressGapLimit=20 we do a rescan and the last address we find containing
* funds is at index 75, we would not generate more addresses to try and rescan. However if the last index containing
* funds was 81, we would generate another 100 addresses from the keychain and attempt to rescan those.
*
* @param startOpt start block (if None it starts from the genesis block)
* @param endOpt end block (if None it ends at the current tip)
* @param addressBatchSize how many addresses to match in a single pass
*/
def rescanNeutrinoWallet(
startOpt: Option[BlockStamp],
endOpt: Option[BlockStamp],
addressBatchSize: Int): Future[Unit]
/**
* Recreates the account using BIP-44 approach
*/
def rescanSPVWallet(): Future[Unit]
}
trait UnlockedWalletApi extends LockedWalletApi {
def discoveryBatchSize(): Int = walletConfig.discoveryBatchSize
def keyManager: BIP39KeyManager
/**
@ -258,140 +409,11 @@ trait UnlockedWalletApi extends LockedWalletApi {
*/
def createNewAccount(keyManagerParams: KeyManagerParams): Future[WalletApi]
/**
* Iterates over the block filters in order to find filters that match to the given addresses
*
* I queries the filter database for [[batchSize]] filters a time
* and tries to run [[GolombFilter.matchesAny]] for each filter.
*
* It tries to match the filters in parallel using [[parallelismLevel]] threads.
* For best results use it with a separate execution context.
*
* @param scripts list of [[ScriptPubKey]]'s to watch
* @param startOpt start point (if empty it starts with the genesis block)
* @param endOpt end point (if empty it ends with the best tip)
* @param batchSize number of filters that can be matched in one batch
* @param parallelismLevel max number of threads required to perform matching
* (default [[Runtime.getRuntime.availableProcessors()]])
* @return a list of matching block hashes
*/
def getMatchingBlocks(
scripts: Vector[ScriptPubKey],
startOpt: Option[BlockStamp] = None,
endOpt: Option[BlockStamp] = None,
batchSize: Int = 100,
parallelismLevel: Int = Runtime.getRuntime.availableProcessors())(
implicit ec: ExecutionContext): Future[Vector[DoubleSha256DigestBE]] = {
require(batchSize > 0, "batch size must be greater than zero")
require(parallelismLevel > 0, "parallelism level must be greater than zero")
}
if (scripts.isEmpty) {
Future.successful(Vector.empty)
} else {
val bytes = scripts.map(_.asmBytes)
/** Calculates group size to split a filter vector into [[parallelismLevel]] groups.
* It's needed to limit number of threads required to run the matching */
def calcGroupSize(vectorSize: Int): Int =
if (vectorSize / parallelismLevel * parallelismLevel < vectorSize)
vectorSize / parallelismLevel + 1
else vectorSize / parallelismLevel
def findMatches(
filters: Vector[(GolombFilter, DoubleSha256DigestBE)]): Future[
Iterator[DoubleSha256DigestBE]] = {
if (filters.isEmpty)
Future.successful(Iterator.empty)
else {
/* Iterates over the grouped vector of filters to find matches with the given [[bytes]]. */
val groupSize = calcGroupSize(filters.size)
val filterGroups = filters.grouped(groupSize)
// Sequence on the filter groups making sure the number of threads doesn't exceed [[parallelismLevel]].
Future
.sequence(filterGroups.map { filterGroup =>
// We need to wrap in a future here to make sure we can
// potentially run these matches in parallel
Future {
// Find any matches in the group and add the corresponding block hashes into the result
filterGroup.foldLeft(Vector.empty[DoubleSha256DigestBE]) {
(blocks, filter) =>
val matcher = new SimpleFilterMatcher(filter._1)
if (matcher.matchesAny(bytes)) {
blocks :+ filter._2
} else {
blocks
}
}
}
})
.map(_.flatten)
}
}
/** Iterates over all filters in the range to find matches */
@tailrec
def loop(
start: Int,
end: Int,
acc: Future[Vector[DoubleSha256DigestBE]]): Future[
Vector[DoubleSha256DigestBE]] = {
if (end <= start) {
acc
} else {
val startHeight = end - (batchSize - 1)
val endHeight = end
val newAcc = for {
compactFilterDbs <- chainQueryApi.getFiltersBetweenHeights(
startHeight,
endHeight)
filtered <- findMatches(compactFilterDbs)
res <- acc
} yield {
res ++ filtered
}
val newEnd = Math.max(start, endHeight - batchSize)
loop(start, newEnd, newAcc)
}
}
for {
startHeight <- startOpt.fold(Future.successful(0))(
chainQueryApi.getHeightByBlockStamp)
_ = if (startHeight < 0)
throw InvalidBlockRange(s"Start position cannot negative")
endHeight <- endOpt.fold(chainQueryApi.getFilterCount)(
chainQueryApi.getHeightByBlockStamp)
_ = if (startHeight > endHeight)
throw InvalidBlockRange(
s"End position cannot precede start: $startHeight:$endHeight")
matched <- loop(startHeight, endHeight, Future.successful(Vector.empty))
} yield {
matched
}
}
}
def rescan(
scriptPubKeys: Vector[ScriptPubKey],
startOpt: Option[BlockStamp] = None,
endOpt: Option[BlockStamp] = None): Future[Unit] = {
val threadPool =
Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors() * 2)
val res = for {
blockHashes <- getMatchingBlocks(scriptPubKeys, startOpt, endOpt)(
ExecutionContext.fromExecutor(threadPool))
res <- nodeApi.downloadBlocks(blockHashes.map(_.flip))
} yield {
res
}
res.onComplete(_ => threadPool.shutdown())
// res.failed.foreach(logger.error("Cannot rescan", _))
res
}
object LockedWalletApi {
case class BlockMatchingResponse(
blockHash: DoubleSha256DigestBE,
blockHeight: Int)
}

View file

@ -52,6 +52,10 @@ case class WalletAppConfig(
lazy val bloomFalsePositiveRate: Double =
config.getDouble("wallet.bloomFalsePositiveRate")
lazy val addressGapLimit: Int = config.getInt("wallet.addressGapLimit")
lazy val discoveryBatchSize: Int = config.getInt("wallet.discoveryBatchSize")
override def initialize()(implicit ec: ExecutionContext): Future[Unit] = {
logger.debug(s"Initializing wallet setup")

View file

@ -1,27 +1,21 @@
package org.bitcoins.wallet.internal
import org.bitcoins.wallet._
import scala.concurrent.Future
import org.bitcoins.wallet.models.AddressDb
import org.bitcoins.core.crypto.ECPublicKey
import org.bitcoins.wallet.models.AccountDb
import org.bitcoins.core.hd.HDChainType
import org.bitcoins.core.protocol.BitcoinAddress
import org.bitcoins.core.hd.HDPath
import org.bitcoins.core.hd.HDAddress
import scala.util.Failure
import scala.util.Success
import org.bitcoins.wallet.models.AddressDbHelper
import org.bitcoins.core.hd.SegWitHDPath
import org.bitcoins.core.hd.LegacyHDPath
import org.bitcoins.core.hd.NestedSegWitHDPath
import org.bitcoins.wallet.api.AddressInfo
import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.core.protocol.transaction.TransactionOutput
import org.bitcoins.core.protocol.script.ScriptPubKey
import org.bitcoins.core.protocol.transaction.TransactionOutPoint
import org.bitcoins.core.hd._
import org.bitcoins.core.number.UInt32
import org.bitcoins.core.hd.AddressType
import org.bitcoins.core.protocol.BitcoinAddress
import org.bitcoins.core.protocol.script.ScriptPubKey
import org.bitcoins.core.protocol.transaction.{
Transaction,
TransactionOutPoint,
TransactionOutput
}
import org.bitcoins.wallet._
import org.bitcoins.wallet.api.AddressInfo
import org.bitcoins.wallet.models.{AccountDb, AddressDb, AddressDbHelper}
import scala.concurrent.Future
import scala.util.{Failure, Success}
/**
* Provides functionality related to addresses. This includes

View file

@ -0,0 +1,156 @@
package org.bitcoins.wallet.internal
import java.util.concurrent.Executors
import org.bitcoins.core.crypto.DoubleSha256Digest
import org.bitcoins.core.hd.HDChainType
import org.bitcoins.core.protocol.script.ScriptPubKey
import org.bitcoins.core.protocol.{BitcoinAddress, BlockStamp}
import org.bitcoins.core.util.FutureUtil
import org.bitcoins.wallet.{LockedWallet, WalletLogger}
import scala.concurrent.{ExecutionContext, Future}
private[wallet] trait RescanHandling extends WalletLogger {
self: LockedWallet =>
/////////////////////
// Public facing API
/** @inheritdoc */
override def rescanNeutrinoWallet(
startOpt: Option[BlockStamp],
endOpt: Option[BlockStamp],
addressBatchSize: Int): Future[Unit] = {
logger.info(s"Starting rescanning the wallet.")
val res = for {
_ <- spendingInfoDAO.deleteAll()
_ <- addressDAO.deleteAll()
_ <- doNeutrinoRescan(startOpt, endOpt, addressBatchSize)
} yield ()
res.onComplete(_ => logger.info("Finished rescanning the wallet"))
res
}
/** @inheritdoc */
override def rescanSPVWallet(): Future[Unit] =
Future.failed(new RuntimeException("Rescan not implemented for SPV wallet"))
/////////////////////
// Private methods
private def doNeutrinoRescan(
startOpt: Option[BlockStamp],
endOpt: Option[BlockStamp],
addressBatchSize: Int): Future[Unit] = {
for {
scriptPubKeys <- generateScriptPubKeys(addressBatchSize)
blocks <- matchBlocks(scriptPubKeys, endOpt, startOpt)
_ <- downloadAndProcessBlocks(blocks)
externalGap <- calcAddressGap(HDChainType.External)
changeGap <- calcAddressGap(HDChainType.Change)
res <- if (externalGap >= walletConfig.addressGapLimit && changeGap >= walletConfig.addressGapLimit)
pruneUnusedAddresses()
else doNeutrinoRescan(startOpt, endOpt, addressBatchSize)
} yield res
}
private def pruneUnusedAddresses(): Future[Unit] = {
for {
addressDbs <- addressDAO.findAll()
_ <- addressDbs.foldLeft(FutureUtil.unit) { (prevF, addressDb) =>
for {
_ <- prevF
spendingInfoDbs <- spendingInfoDAO.findByScriptPubKey(
addressDb.scriptPubKey)
_ <- if (spendingInfoDbs.isEmpty) addressDAO.delete(addressDb)
else FutureUtil.unit
} yield ()
}
} yield ()
}
private def calcAddressGap(chainType: HDChainType): Future[Int] = {
for {
addressDbs <- addressDAO.findAll()
addressGap <- addressDbs
//make sure all addressDb are of the correct chainType
//and they are sorted according to their index so we can
//calculate the gap accurately
.filter(_.path.chain.chainType == chainType)
.sortBy(_.path.address.index)
.foldLeft(Future.successful(0)) { (prevNF, addressDb) =>
for {
prevN <- prevNF
spendingInfoDbs <- spendingInfoDAO.findByScriptPubKey(
addressDb.scriptPubKey)
} yield {
if (spendingInfoDbs.isEmpty) prevN + 1 else 0
}
}
} yield {
logger.debug(s"Address gap: $addressGap")
addressGap
}
}
private def downloadAndProcessBlocks(
blocks: Vector[DoubleSha256Digest]): Future[Unit] = {
logger.debug(s"Requesting ${blocks.size} block(s)")
blocks.foldLeft(FutureUtil.unit) { (prevF, blockHash) =>
val completedF = subscribeForBlockProcessingCompletionSignal(blockHash)
for {
_ <- prevF
_ <- nodeApi.downloadBlocks(Vector(blockHash))
_ <- completedF
} yield ()
}
}
private def matchBlocks(
scriptPubKeys: Vector[ScriptPubKey],
endOpt: Option[BlockStamp],
startOpt: Option[BlockStamp]): Future[Vector[DoubleSha256Digest]] = {
val threadPool =
Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors() * 2)
val blocksF = for {
blocks <- getMatchingBlocks(scriptPubKeys, startOpt, endOpt)(
ExecutionContext.fromExecutor(threadPool))
} yield {
blocks.sortBy(_.blockHeight).map(_.blockHash.flip)
}
blocksF.onComplete(_ => threadPool.shutdown())
blocksF
}
private def generateScriptPubKeys(
count: Int): Future[Vector[ScriptPubKey]] = {
for {
addresses <- 1
.to(count)
.foldLeft(Future.successful(Vector.empty[BitcoinAddress])) {
(prevFuture, _) =>
for {
prev <- prevFuture
address <- getNewAddress()
} yield prev :+ address
}
changeAddresses <- 1
.to(count)
.foldLeft(Future.successful(Vector.empty[BitcoinAddress])) {
(prevFuture, _) =>
for {
prev <- prevFuture
address <- getNewChangeAddress()
} yield prev :+ address
}
} yield addresses.map(_.scriptPubKey) ++ changeAddresses.map(_.scriptPubKey)
}
}

View file

@ -1,6 +1,6 @@
package org.bitcoins.wallet.internal
import org.bitcoins.core.crypto.DoubleSha256DigestBE
import org.bitcoins.core.crypto.{DoubleSha256Digest, DoubleSha256DigestBE}
import org.bitcoins.core.number.UInt32
import org.bitcoins.core.protocol.blockchain.Block
import org.bitcoins.core.protocol.transaction.{Transaction, TransactionOutput}
@ -9,7 +9,8 @@ import org.bitcoins.wallet._
import org.bitcoins.wallet.api.{AddUtxoError, AddUtxoSuccess}
import org.bitcoins.wallet.models._
import scala.concurrent.Future
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success, Try}
/** Provides functionality for processing transactions. This
* includes importing UTXOs spent to our wallet, updating
@ -29,13 +30,12 @@ private[wallet] trait TransactionProcessing extends WalletLogger {
for {
result <- processTransactionImpl(transaction, blockHashOpt)
} yield {
logger.info(
logger.debug(
s"Finished processing of transaction=${transaction.txIdBE}. Relevant incomingTXOs=${result.updatedIncoming.length}, outgoingTXOs=${result.updatedOutgoing.length}")
this
}
}
/** @inheritdoc */
override def processBlock(block: Block): Future[LockedWallet] = {
logger.info(s"Processing block=${block.blockHeader.hash.flip}")
val res = block.transactions.foldLeft(Future.successful(this)) {
@ -48,6 +48,8 @@ private[wallet] trait TransactionProcessing extends WalletLogger {
newWallet
}
}
res.onComplete(failure =>
signalBlockProcessingCompletion(block.blockHeader.hash, failure))
res.foreach(
_ =>
logger.info(
@ -89,6 +91,36 @@ private[wallet] trait TransactionProcessing extends WalletLogger {
/////////////////////
// Private methods
private var blockProcessingSignals =
Map.empty[DoubleSha256Digest, Promise[DoubleSha256Digest]]
private[wallet] def subscribeForBlockProcessingCompletionSignal(
blockHash: DoubleSha256Digest): Future[DoubleSha256Digest] =
synchronized {
blockProcessingSignals.get(blockHash) match {
case Some(existingSignal) => existingSignal.future
case None =>
val newSignal = Promise[DoubleSha256Digest]()
blockProcessingSignals =
blockProcessingSignals.updated(blockHash, newSignal)
newSignal.future
}
}
private def signalBlockProcessingCompletion(
blockHash: DoubleSha256Digest,
failure: Try[_]): Unit =
synchronized {
blockProcessingSignals.get(blockHash).foreach { signal =>
blockProcessingSignals =
blockProcessingSignals.filterNot(_._1 == blockHash)
failure match {
case Success(_) => signal.success(blockHash)
case Failure(exception) => signal.failure(exception)
}
}
}
/** Does the grunt work of processing a TX.
* This is called by either the internal or public TX
* processing method, which logs and transforms the
@ -98,7 +130,7 @@ private[wallet] trait TransactionProcessing extends WalletLogger {
transaction: Transaction,
blockHashOpt: Option[DoubleSha256DigestBE]): Future[ProcessTxResult] = {
logger.info(
logger.debug(
s"Processing transaction=${transaction.txIdBE} with blockHash=$blockHashOpt")
for {
aggregate <- {

View file

@ -1,18 +1,17 @@
package org.bitcoins.wallet.models
import org.bitcoins.core.crypto.ECPublicKey
import org.bitcoins.core.hd.{HDChainType, HDPurpose}
import org.bitcoins.core.protocol.BitcoinAddress
import org.bitcoins.core.protocol.script.ScriptPubKey
import org.bitcoins.db.{CRUD, SlickUtil}
import org.bitcoins.wallet.config.WalletAppConfig
import slick.dbio.Effect
import slick.jdbc.SQLiteProfile.api._
import slick.lifted.TableQuery
import slick.sql.SqlAction
import scala.concurrent.{ExecutionContext, Future}
import org.bitcoins.core.hd.HDChainType
import org.bitcoins.wallet.config.WalletAppConfig
import org.bitcoins.core.crypto.ECPublicKey
import org.bitcoins.core.protocol.script.ScriptPubKey
import org.bitcoins.core.hd.HDPurpose
case class AddressDAO()(
implicit ec: ExecutionContext,

View file

@ -1,15 +1,17 @@
package org.bitcoins.wallet.models
import org.bitcoins.core.crypto.DoubleSha256DigestBE
import org.bitcoins.core.protocol.script.ScriptPubKey
import org.bitcoins.core.protocol.transaction.{
Transaction,
TransactionOutPoint,
TransactionOutput
}
import org.bitcoins.db.CRUDAutoInc
import org.bitcoins.wallet.config._
import slick.jdbc.SQLiteProfile.api._
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.core.crypto.DoubleSha256DigestBE
import org.bitcoins.core.protocol.transaction.TransactionOutput
import org.bitcoins.core.protocol.transaction.TransactionOutPoint
import scala.concurrent.{ExecutionContext, Future}
case class SpendingInfoDAO()(
implicit val ec: ExecutionContext,
@ -91,6 +93,12 @@ case class SpendingInfoDAO()(
database.runVec(filtered.result)
}
def findByScriptPubKey(
scriptPubKey: ScriptPubKey): Future[Vector[SpendingInfoDb]] = {
val filtered = table.filter(_.scriptPubKey === scriptPubKey)
database.runVec(filtered.result)
}
/** Enumerates all unspent TX outputs in the wallet */
def findAllUnspent(): Future[Vector[SpendingInfoDb]] = {
val query = table.filter(!_.spent)
@ -103,4 +111,5 @@ case class SpendingInfoDAO()(
val query = table.map(_.outPoint)
database.runVec(query.result).map(_.toVector)
}
}