diff --git a/app-commons/src/main/scala/org/bitcoins/commons/jsonmodels/ws/WsModels.scala b/app-commons/src/main/scala/org/bitcoins/commons/jsonmodels/ws/WsModels.scala index c254b5a5e8..d8b45eaccc 100644 --- a/app-commons/src/main/scala/org/bitcoins/commons/jsonmodels/ws/WsModels.scala +++ b/app-commons/src/main/scala/org/bitcoins/commons/jsonmodels/ws/WsModels.scala @@ -34,6 +34,7 @@ object WalletWsType extends StringFactory[WalletWsType] { case object DLCStateChange extends WalletWsType case object DLCOfferAdd extends WalletWsType case object DLCOfferRemove extends WalletWsType + case object RescanComplete extends WalletWsType private val all = Vector(TxProcessed, @@ -42,7 +43,8 @@ object WalletWsType extends StringFactory[WalletWsType] { NewAddress, DLCStateChange, DLCOfferAdd, - DLCOfferRemove) + DLCOfferRemove, + RescanComplete) override def fromStringOpt(string: String): Option[WalletWsType] = { all.find(_.toString.toLowerCase() == string.toLowerCase) @@ -123,6 +125,11 @@ object WalletNotification { extends WalletNotification[Sha256Digest] { override val `type`: WalletWsType = WalletWsType.DLCOfferRemove } + + case class RescanComplete(payload: String) + extends WalletNotification[String] { + override val `type`: WalletWsType = WalletWsType.RescanComplete + } } object ChainNotification { diff --git a/app-commons/src/main/scala/org/bitcoins/commons/serializers/Picklers.scala b/app-commons/src/main/scala/org/bitcoins/commons/serializers/Picklers.scala index 2c9dd93d7e..8a4bdfed82 100644 --- a/app-commons/src/main/scala/org/bitcoins/commons/serializers/Picklers.scala +++ b/app-commons/src/main/scala/org/bitcoins/commons/serializers/Picklers.scala @@ -2,6 +2,7 @@ package org.bitcoins.commons.serializers import org.bitcoins.commons.jsonmodels.bitcoind.GetBlockHeaderResult import org.bitcoins.commons.jsonmodels.bitcoind.RpcOpts.LockUnspentOutputParameter +import org.bitcoins.commons.jsonmodels.ws.WalletNotification.RescanComplete import org.bitcoins.commons.serializers.JsonReaders.jsToSatoshis import org.bitcoins.core.api.dlc.wallet.db.{DLCContactDb, IncomingDLCOfferDb} import org.bitcoins.core.api.wallet.CoinSelectionAlgo @@ -1565,4 +1566,16 @@ object Picklers { readwriter[ujson.Obj] .bimap(writeContactDb(_), readContactDb(_)) } + + implicit val rescanComplete: ReadWriter[RescanComplete] = { + readwriter[ujson.Value].bimap(writeRescanComplete(_), readRescanComplete(_)) + } + + private def writeRescanComplete(rescanComplete: RescanComplete): ujson.Str = { + ujson.Str(rescanComplete.payload) + } + + private def readRescanComplete(value: ujson.Value): RescanComplete = { + RescanComplete(value.str) + } } diff --git a/app-commons/src/main/scala/org/bitcoins/commons/serializers/WsPicklers.scala b/app-commons/src/main/scala/org/bitcoins/commons/serializers/WsPicklers.scala index 2fdaa5a776..bdd642b79c 100644 --- a/app-commons/src/main/scala/org/bitcoins/commons/serializers/WsPicklers.scala +++ b/app-commons/src/main/scala/org/bitcoins/commons/serializers/WsPicklers.scala @@ -6,6 +6,7 @@ import org.bitcoins.commons.jsonmodels.ws.WalletNotification.{ DLCOfferRemoveNotification, DLCStateChangeNotification, NewAddressNotification, + RescanComplete, ReservedUtxosNotification, TxBroadcastNotification, TxProcessedNotification @@ -75,6 +76,8 @@ object WsPicklers { upickle.default.writeJs(offerDb)(Picklers.dlcOfferAddW) case DLCOfferRemoveNotification(offerHash) => upickle.default.writeJs(offerHash)(Picklers.dlcOfferRemoveW) + case r: RescanComplete => + upickle.default.writeJs(r)(Picklers.rescanComplete) } val notificationObj = ujson.Obj( @@ -113,6 +116,9 @@ object WsPicklers { val offerHash = upickle.default.read(payloadObj)(Picklers.dlcOfferRemoveR) DLCOfferRemoveNotification(offerHash) + case WalletWsType.RescanComplete => + val complete = upickle.default.read(payloadObj)(Picklers.rescanComplete) + complete } } @@ -140,6 +146,13 @@ object WsPicklers { readWalletNotification(_).asInstanceOf[ReservedUtxosNotification]) } + implicit val rescanPickler: ReadWriter[RescanComplete] = { + readwriter[ujson.Obj].bimap( + writeWalletNotification(_), + readWalletNotification(_).asInstanceOf[RescanComplete] + ) + } + implicit val walletNotificationPickler: ReadWriter[WalletNotification[_]] = { readwriter[ujson.Obj].bimap(writeWalletNotification, readWalletNotification) } diff --git a/app/server-test/src/test/scala/org/bitcoins/server/WebsocketTests.scala b/app/server-test/src/test/scala/org/bitcoins/server/WebsocketTests.scala index a8a7c5eb5b..d796dab9f3 100644 --- a/app/server-test/src/test/scala/org/bitcoins/server/WebsocketTests.scala +++ b/app/server-test/src/test/scala/org/bitcoins/server/WebsocketTests.scala @@ -16,6 +16,7 @@ import org.bitcoins.commons.jsonmodels.ws.WalletNotification.{ DLCOfferAddNotification, DLCOfferRemoveNotification, NewAddressNotification, + RescanComplete, TxBroadcastNotification, TxProcessedNotification } @@ -376,6 +377,36 @@ class WebsocketTests extends BitcoinSServerMainBitcoindFixture { } } + it must "receive updates when a rescan is complete" in { serverWithBitcoind => + val ServerWithBitcoind(_, server) = serverWithBitcoind + val cliConfig = Config(rpcPortOpt = Some(server.conf.rpcPort), + rpcPassword = server.conf.rpcPassword) + + val req = buildReq(server.conf) + val tuple: ( + Future[WebSocketUpgradeResponse], + (Future[Seq[WsNotification[_]]], Promise[Option[Message]])) = { + Http() + .singleWebSocketRequest(req, websocketFlow) + } + val notificationsF = tuple._2._1 + val promise = tuple._2._2 + val cmd = CliCommand.Rescan(addressBatchSize = None, + startBlock = None, + endBlock = None, + force = true, + ignoreCreationTime = false) + val _ = ConsoleCli.exec(cmd, cliConfig) + for { + _ <- AkkaUtil.nonBlockingSleep(5000.millis) + _ = promise.success(None) + notifications <- notificationsF + } yield { + val count = notifications.count(_.isInstanceOf[RescanComplete]) + assert(count == 1, s"count=$count") + } + } + it must "not queue things on the websocket while there is no one connected" in { serverWithBitcoind => val ServerWithBitcoind(_, server) = serverWithBitcoind diff --git a/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala b/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala index 6bdc0354af..5d490d2c3e 100644 --- a/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala +++ b/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala @@ -223,7 +223,8 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit chainApi: ChainApi): Unit = { val chainCallbacks = WebsocketUtil.buildChainCallbacks(wsQueue, chainApi) chainConf.addCallbacks(chainCallbacks) - val walletCallbacks = WebsocketUtil.buildWalletCallbacks(wsQueue) + val walletCallbacks = + WebsocketUtil.buildWalletCallbacks(wsQueue, walletConf.walletNameOpt) walletConf.addCallbacks(walletCallbacks) val dlcWalletCallbacks = WebsocketUtil.buildDLCWalletCallbacks(wsQueue) dlcConf.addCallbacks(dlcWalletCallbacks) @@ -326,7 +327,9 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit serverCmdLineArgs = serverArgParser, wsSource = wsSource ) - walletCallbacks = WebsocketUtil.buildWalletCallbacks(wsQueue) + walletCallbacks = WebsocketUtil.buildWalletCallbacks( + wsQueue, + walletConf.walletNameOpt) _ = walletConf.addCallbacks(walletCallbacks) wallet <- walletF diff --git a/app/server/src/main/scala/org/bitcoins/server/util/WebsocketUtil.scala b/app/server/src/main/scala/org/bitcoins/server/util/WebsocketUtil.scala index 303a1926ff..9418e3965a 100644 --- a/app/server/src/main/scala/org/bitcoins/server/util/WebsocketUtil.scala +++ b/app/server/src/main/scala/org/bitcoins/server/util/WebsocketUtil.scala @@ -65,8 +65,10 @@ object WebsocketUtil extends Logging { } /** Builds websocket callbacks for the wallet */ - def buildWalletCallbacks(walletQueue: SourceQueueWithComplete[Message])( - implicit ec: ExecutionContext): WalletCallbacks = { + def buildWalletCallbacks( + walletQueue: SourceQueueWithComplete[Message], + walletNameOpt: Option[String])(implicit + ec: ExecutionContext): WalletCallbacks = { val onAddressCreated: OnNewAddressGenerated = { addr => val notification = WalletNotification.NewAddressNotification(addr) val json = @@ -98,11 +100,24 @@ object WebsocketUtil extends Logging { offerF.map(_ => ()) } + val onRescanComplete: OnRescanComplete = { _ => + val name = + walletNameOpt.getOrElse("") // default name empty string on the wallet + val notification = WalletNotification.RescanComplete(name) + val notificationJson = + upickle.default.writeJs(notification)(WsPicklers.rescanPickler) + val msg = TextMessage.Strict(notificationJson.toString()) + val offerF = walletQueue.offer(msg) + offerF.map(_ => ()) + } + WalletCallbacks( onTransactionProcessed = Vector(onTxProcessed), - onNewAddressGenerated = Vector(onAddressCreated), + onTransactionBroadcast = Vector(onTxBroadcast), onReservedUtxos = Vector(onReservedUtxo), - onTransactionBroadcast = Vector(onTxBroadcast) + onNewAddressGenerated = Vector(onAddressCreated), + onBlockProcessed = Vector.empty, + onRescanComplete = Vector(onRescanComplete) ) } @@ -120,7 +135,7 @@ object WebsocketUtil extends Logging { upickle.default.writeJs(notification)(WsPicklers.txBroadcastPickler) case x @ (WalletWsType.NewAddress | WalletWsType.ReservedUtxos | WalletWsType.DLCStateChange | WalletWsType.DLCOfferAdd | - WalletWsType.DLCOfferRemove) => + WalletWsType.DLCOfferRemove | WalletWsType.RescanComplete) => sys.error(s"Cannot build tx notification for $x") } diff --git a/docs/wallet/wallet-callbacks.md b/docs/wallet/wallet-callbacks.md index ea83fa8b41..c58c1f0fbd 100644 --- a/docs/wallet/wallet-callbacks.md +++ b/docs/wallet/wallet-callbacks.md @@ -12,6 +12,7 @@ Bitcoin-S support call backs for the following events that happen in the wallet: 3. onReservedUtxos 4. onNewAddressGenerated 5. onBlockProcessed +6. onRescanComplete That means every time one of these events happens, we will call your callback so that you can be notified of the event. These callbacks will be run after the message has been diff --git a/testkit/src/main/scala/org/bitcoins/testkit/server/BitcoinSServerMainUtil.scala b/testkit/src/main/scala/org/bitcoins/testkit/server/BitcoinSServerMainUtil.scala index e23499aff0..66968b1433 100644 --- a/testkit/src/main/scala/org/bitcoins/testkit/server/BitcoinSServerMainUtil.scala +++ b/testkit/src/main/scala/org/bitcoins/testkit/server/BitcoinSServerMainUtil.scala @@ -41,6 +41,7 @@ object BitcoinSServerMainUtil { |bitcoin-s.server.rpcport = ${RpcUtil.randomPort} |bitcoin-s.server.wsport= ${RpcUtil.randomPort} |bitcoin-s.server.password=topsecret + | |""".stripMargin ConfigFactory.parseString(configStr) diff --git a/wallet/src/main/scala/org/bitcoins/wallet/WalletCallbacks.scala b/wallet/src/main/scala/org/bitcoins/wallet/WalletCallbacks.scala index f905dedd1e..fcb7a24dd3 100644 --- a/wallet/src/main/scala/org/bitcoins/wallet/WalletCallbacks.scala +++ b/wallet/src/main/scala/org/bitcoins/wallet/WalletCallbacks.scala @@ -30,6 +30,8 @@ trait WalletCallbacks { def onBlockProcessed: CallbackHandler[Block, OnBlockProcessed] + def onRescanComplete: CallbackHandler[Unit, OnRescanComplete] + def +(other: WalletCallbacks): WalletCallbacks def executeOnTransactionProcessed(logger: Logger, tx: Transaction)(implicit @@ -80,6 +82,15 @@ trait WalletCallbacks { err)) } + def executeOnRescanComplete(logger: Logger)(implicit + ec: ExecutionContext): Future[Unit] = { + onRescanComplete.execute( + (), + (err: Throwable) => + logger.error(s"${onRescanComplete.name} Callback failed with error: ", + err)) + } + } /** Callback for handling a processed transaction */ @@ -93,6 +104,9 @@ trait OnNewAddressGenerated extends Callback[BitcoinAddress] trait OnBlockProcessed extends Callback[Block] +/** Triggered when a rescan is */ +trait OnRescanComplete extends Callback[Unit] + object WalletCallbacks { private case class WalletCallbacksImpl( @@ -106,7 +120,8 @@ object WalletCallbacks { onNewAddressGenerated: CallbackHandler[ BitcoinAddress, OnNewAddressGenerated], - onBlockProcessed: CallbackHandler[Block, OnBlockProcessed] + onBlockProcessed: CallbackHandler[Block, OnBlockProcessed], + onRescanComplete: CallbackHandler[Unit, OnRescanComplete] ) extends WalletCallbacks { override def +(other: WalletCallbacks): WalletCallbacks = @@ -118,7 +133,8 @@ object WalletCallbacks { onReservedUtxos = onReservedUtxos ++ other.onReservedUtxos, onNewAddressGenerated = onNewAddressGenerated ++ other.onNewAddressGenerated, - onBlockProcessed = onBlockProcessed ++ other.onBlockProcessed + onBlockProcessed = onBlockProcessed ++ other.onBlockProcessed, + onRescanComplete = onRescanComplete ++ other.onRescanComplete ) } @@ -144,14 +160,20 @@ object WalletCallbacks { /** Empty callbacks that does nothing with the received data */ val empty: WalletCallbacks = - apply(Vector.empty, Vector.empty, Vector.empty, Vector.empty, Vector.empty) + apply(Vector.empty, + Vector.empty, + Vector.empty, + Vector.empty, + Vector.empty, + Vector.empty) def apply( onTransactionProcessed: Vector[OnTransactionProcessed] = Vector.empty, onTransactionBroadcast: Vector[OnTransactionBroadcast] = Vector.empty, onReservedUtxos: Vector[OnReservedUtxos] = Vector.empty, onNewAddressGenerated: Vector[OnNewAddressGenerated] = Vector.empty, - onBlockProcessed: Vector[OnBlockProcessed] = Vector.empty + onBlockProcessed: Vector[OnBlockProcessed] = Vector.empty, + onRescanComplete: Vector[OnRescanComplete] = Vector.empty ): WalletCallbacks = { WalletCallbacksImpl( onTransactionProcessed = @@ -173,7 +195,10 @@ object WalletCallbacks { onBlockProcessed = CallbackHandler[Block, OnBlockProcessed]( "onBlockProcessed", onBlockProcessed - ) + ), + onRescanComplete = + CallbackHandler[Unit, OnRescanComplete]("onRescanComplete", + onRescanComplete) ) } } diff --git a/wallet/src/main/scala/org/bitcoins/wallet/internal/RescanHandling.scala b/wallet/src/main/scala/org/bitcoins/wallet/internal/RescanHandling.scala index 297166b120..079c2a8ea4 100644 --- a/wallet/src/main/scala/org/bitcoins/wallet/internal/RescanHandling.scala +++ b/wallet/src/main/scala/org/bitcoins/wallet/internal/RescanHandling.scala @@ -77,9 +77,11 @@ private[wallet] trait RescanHandling extends WalletLogger { _ <- clearUtxos(account) _ <- doNeutrinoRescan(account, start, endOpt, addressBatchSize) _ <- stateDescriptorDAO.updateRescanning(false) + _ <- walletCallbacks.executeOnRescanComplete(logger) } yield { logger.info(s"Finished rescanning the wallet. It took ${System .currentTimeMillis() - startTime}ms") + RescanState.RescanDone }