Implement rescancomplete callback (#4475)

* Implement rescancomplete callback

* Add walletName to payload

* Add to documentation
This commit is contained in:
Chris Stewart 2022-07-09 16:23:15 -05:00 committed by GitHub
parent de935032ea
commit 1ae4d40dac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 124 additions and 13 deletions

View File

@ -34,6 +34,7 @@ object WalletWsType extends StringFactory[WalletWsType] {
case object DLCStateChange extends WalletWsType
case object DLCOfferAdd extends WalletWsType
case object DLCOfferRemove extends WalletWsType
case object RescanComplete extends WalletWsType
private val all =
Vector(TxProcessed,
@ -42,7 +43,8 @@ object WalletWsType extends StringFactory[WalletWsType] {
NewAddress,
DLCStateChange,
DLCOfferAdd,
DLCOfferRemove)
DLCOfferRemove,
RescanComplete)
override def fromStringOpt(string: String): Option[WalletWsType] = {
all.find(_.toString.toLowerCase() == string.toLowerCase)
@ -123,6 +125,11 @@ object WalletNotification {
extends WalletNotification[Sha256Digest] {
override val `type`: WalletWsType = WalletWsType.DLCOfferRemove
}
case class RescanComplete(payload: String)
extends WalletNotification[String] {
override val `type`: WalletWsType = WalletWsType.RescanComplete
}
}
object ChainNotification {

View File

@ -2,6 +2,7 @@ package org.bitcoins.commons.serializers
import org.bitcoins.commons.jsonmodels.bitcoind.GetBlockHeaderResult
import org.bitcoins.commons.jsonmodels.bitcoind.RpcOpts.LockUnspentOutputParameter
import org.bitcoins.commons.jsonmodels.ws.WalletNotification.RescanComplete
import org.bitcoins.commons.serializers.JsonReaders.jsToSatoshis
import org.bitcoins.core.api.dlc.wallet.db.{DLCContactDb, IncomingDLCOfferDb}
import org.bitcoins.core.api.wallet.CoinSelectionAlgo
@ -1565,4 +1566,16 @@ object Picklers {
readwriter[ujson.Obj]
.bimap(writeContactDb(_), readContactDb(_))
}
implicit val rescanComplete: ReadWriter[RescanComplete] = {
readwriter[ujson.Value].bimap(writeRescanComplete(_), readRescanComplete(_))
}
private def writeRescanComplete(rescanComplete: RescanComplete): ujson.Str = {
ujson.Str(rescanComplete.payload)
}
private def readRescanComplete(value: ujson.Value): RescanComplete = {
RescanComplete(value.str)
}
}

View File

@ -6,6 +6,7 @@ import org.bitcoins.commons.jsonmodels.ws.WalletNotification.{
DLCOfferRemoveNotification,
DLCStateChangeNotification,
NewAddressNotification,
RescanComplete,
ReservedUtxosNotification,
TxBroadcastNotification,
TxProcessedNotification
@ -75,6 +76,8 @@ object WsPicklers {
upickle.default.writeJs(offerDb)(Picklers.dlcOfferAddW)
case DLCOfferRemoveNotification(offerHash) =>
upickle.default.writeJs(offerHash)(Picklers.dlcOfferRemoveW)
case r: RescanComplete =>
upickle.default.writeJs(r)(Picklers.rescanComplete)
}
val notificationObj = ujson.Obj(
@ -113,6 +116,9 @@ object WsPicklers {
val offerHash =
upickle.default.read(payloadObj)(Picklers.dlcOfferRemoveR)
DLCOfferRemoveNotification(offerHash)
case WalletWsType.RescanComplete =>
val complete = upickle.default.read(payloadObj)(Picklers.rescanComplete)
complete
}
}
@ -140,6 +146,13 @@ object WsPicklers {
readWalletNotification(_).asInstanceOf[ReservedUtxosNotification])
}
implicit val rescanPickler: ReadWriter[RescanComplete] = {
readwriter[ujson.Obj].bimap(
writeWalletNotification(_),
readWalletNotification(_).asInstanceOf[RescanComplete]
)
}
implicit val walletNotificationPickler: ReadWriter[WalletNotification[_]] = {
readwriter[ujson.Obj].bimap(writeWalletNotification, readWalletNotification)
}

View File

@ -16,6 +16,7 @@ import org.bitcoins.commons.jsonmodels.ws.WalletNotification.{
DLCOfferAddNotification,
DLCOfferRemoveNotification,
NewAddressNotification,
RescanComplete,
TxBroadcastNotification,
TxProcessedNotification
}
@ -376,6 +377,36 @@ class WebsocketTests extends BitcoinSServerMainBitcoindFixture {
}
}
it must "receive updates when a rescan is complete" in { serverWithBitcoind =>
val ServerWithBitcoind(_, server) = serverWithBitcoind
val cliConfig = Config(rpcPortOpt = Some(server.conf.rpcPort),
rpcPassword = server.conf.rpcPassword)
val req = buildReq(server.conf)
val tuple: (
Future[WebSocketUpgradeResponse],
(Future[Seq[WsNotification[_]]], Promise[Option[Message]])) = {
Http()
.singleWebSocketRequest(req, websocketFlow)
}
val notificationsF = tuple._2._1
val promise = tuple._2._2
val cmd = CliCommand.Rescan(addressBatchSize = None,
startBlock = None,
endBlock = None,
force = true,
ignoreCreationTime = false)
val _ = ConsoleCli.exec(cmd, cliConfig)
for {
_ <- AkkaUtil.nonBlockingSleep(5000.millis)
_ = promise.success(None)
notifications <- notificationsF
} yield {
val count = notifications.count(_.isInstanceOf[RescanComplete])
assert(count == 1, s"count=$count")
}
}
it must "not queue things on the websocket while there is no one connected" in {
serverWithBitcoind =>
val ServerWithBitcoind(_, server) = serverWithBitcoind

View File

@ -223,7 +223,8 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
chainApi: ChainApi): Unit = {
val chainCallbacks = WebsocketUtil.buildChainCallbacks(wsQueue, chainApi)
chainConf.addCallbacks(chainCallbacks)
val walletCallbacks = WebsocketUtil.buildWalletCallbacks(wsQueue)
val walletCallbacks =
WebsocketUtil.buildWalletCallbacks(wsQueue, walletConf.walletNameOpt)
walletConf.addCallbacks(walletCallbacks)
val dlcWalletCallbacks = WebsocketUtil.buildDLCWalletCallbacks(wsQueue)
dlcConf.addCallbacks(dlcWalletCallbacks)
@ -326,7 +327,9 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
serverCmdLineArgs = serverArgParser,
wsSource = wsSource
)
walletCallbacks = WebsocketUtil.buildWalletCallbacks(wsQueue)
walletCallbacks = WebsocketUtil.buildWalletCallbacks(
wsQueue,
walletConf.walletNameOpt)
_ = walletConf.addCallbacks(walletCallbacks)
wallet <- walletF

View File

@ -65,8 +65,10 @@ object WebsocketUtil extends Logging {
}
/** Builds websocket callbacks for the wallet */
def buildWalletCallbacks(walletQueue: SourceQueueWithComplete[Message])(
implicit ec: ExecutionContext): WalletCallbacks = {
def buildWalletCallbacks(
walletQueue: SourceQueueWithComplete[Message],
walletNameOpt: Option[String])(implicit
ec: ExecutionContext): WalletCallbacks = {
val onAddressCreated: OnNewAddressGenerated = { addr =>
val notification = WalletNotification.NewAddressNotification(addr)
val json =
@ -98,11 +100,24 @@ object WebsocketUtil extends Logging {
offerF.map(_ => ())
}
val onRescanComplete: OnRescanComplete = { _ =>
val name =
walletNameOpt.getOrElse("") // default name empty string on the wallet
val notification = WalletNotification.RescanComplete(name)
val notificationJson =
upickle.default.writeJs(notification)(WsPicklers.rescanPickler)
val msg = TextMessage.Strict(notificationJson.toString())
val offerF = walletQueue.offer(msg)
offerF.map(_ => ())
}
WalletCallbacks(
onTransactionProcessed = Vector(onTxProcessed),
onNewAddressGenerated = Vector(onAddressCreated),
onTransactionBroadcast = Vector(onTxBroadcast),
onReservedUtxos = Vector(onReservedUtxo),
onTransactionBroadcast = Vector(onTxBroadcast)
onNewAddressGenerated = Vector(onAddressCreated),
onBlockProcessed = Vector.empty,
onRescanComplete = Vector(onRescanComplete)
)
}
@ -120,7 +135,7 @@ object WebsocketUtil extends Logging {
upickle.default.writeJs(notification)(WsPicklers.txBroadcastPickler)
case x @ (WalletWsType.NewAddress | WalletWsType.ReservedUtxos |
WalletWsType.DLCStateChange | WalletWsType.DLCOfferAdd |
WalletWsType.DLCOfferRemove) =>
WalletWsType.DLCOfferRemove | WalletWsType.RescanComplete) =>
sys.error(s"Cannot build tx notification for $x")
}

View File

@ -12,6 +12,7 @@ Bitcoin-S support call backs for the following events that happen in the wallet:
3. onReservedUtxos
4. onNewAddressGenerated
5. onBlockProcessed
6. onRescanComplete
That means every time one of these events happens, we will call your callback
so that you can be notified of the event. These callbacks will be run after the message has been

View File

@ -41,6 +41,7 @@ object BitcoinSServerMainUtil {
|bitcoin-s.server.rpcport = ${RpcUtil.randomPort}
|bitcoin-s.server.wsport= ${RpcUtil.randomPort}
|bitcoin-s.server.password=topsecret
|
|""".stripMargin
ConfigFactory.parseString(configStr)

View File

@ -30,6 +30,8 @@ trait WalletCallbacks {
def onBlockProcessed: CallbackHandler[Block, OnBlockProcessed]
def onRescanComplete: CallbackHandler[Unit, OnRescanComplete]
def +(other: WalletCallbacks): WalletCallbacks
def executeOnTransactionProcessed(logger: Logger, tx: Transaction)(implicit
@ -80,6 +82,15 @@ trait WalletCallbacks {
err))
}
def executeOnRescanComplete(logger: Logger)(implicit
ec: ExecutionContext): Future[Unit] = {
onRescanComplete.execute(
(),
(err: Throwable) =>
logger.error(s"${onRescanComplete.name} Callback failed with error: ",
err))
}
}
/** Callback for handling a processed transaction */
@ -93,6 +104,9 @@ trait OnNewAddressGenerated extends Callback[BitcoinAddress]
trait OnBlockProcessed extends Callback[Block]
/** Triggered when a rescan is */
trait OnRescanComplete extends Callback[Unit]
object WalletCallbacks {
private case class WalletCallbacksImpl(
@ -106,7 +120,8 @@ object WalletCallbacks {
onNewAddressGenerated: CallbackHandler[
BitcoinAddress,
OnNewAddressGenerated],
onBlockProcessed: CallbackHandler[Block, OnBlockProcessed]
onBlockProcessed: CallbackHandler[Block, OnBlockProcessed],
onRescanComplete: CallbackHandler[Unit, OnRescanComplete]
) extends WalletCallbacks {
override def +(other: WalletCallbacks): WalletCallbacks =
@ -118,7 +133,8 @@ object WalletCallbacks {
onReservedUtxos = onReservedUtxos ++ other.onReservedUtxos,
onNewAddressGenerated =
onNewAddressGenerated ++ other.onNewAddressGenerated,
onBlockProcessed = onBlockProcessed ++ other.onBlockProcessed
onBlockProcessed = onBlockProcessed ++ other.onBlockProcessed,
onRescanComplete = onRescanComplete ++ other.onRescanComplete
)
}
@ -144,14 +160,20 @@ object WalletCallbacks {
/** Empty callbacks that does nothing with the received data */
val empty: WalletCallbacks =
apply(Vector.empty, Vector.empty, Vector.empty, Vector.empty, Vector.empty)
apply(Vector.empty,
Vector.empty,
Vector.empty,
Vector.empty,
Vector.empty,
Vector.empty)
def apply(
onTransactionProcessed: Vector[OnTransactionProcessed] = Vector.empty,
onTransactionBroadcast: Vector[OnTransactionBroadcast] = Vector.empty,
onReservedUtxos: Vector[OnReservedUtxos] = Vector.empty,
onNewAddressGenerated: Vector[OnNewAddressGenerated] = Vector.empty,
onBlockProcessed: Vector[OnBlockProcessed] = Vector.empty
onBlockProcessed: Vector[OnBlockProcessed] = Vector.empty,
onRescanComplete: Vector[OnRescanComplete] = Vector.empty
): WalletCallbacks = {
WalletCallbacksImpl(
onTransactionProcessed =
@ -173,7 +195,10 @@ object WalletCallbacks {
onBlockProcessed = CallbackHandler[Block, OnBlockProcessed](
"onBlockProcessed",
onBlockProcessed
)
),
onRescanComplete =
CallbackHandler[Unit, OnRescanComplete]("onRescanComplete",
onRescanComplete)
)
}
}

View File

@ -77,9 +77,11 @@ private[wallet] trait RescanHandling extends WalletLogger {
_ <- clearUtxos(account)
_ <- doNeutrinoRescan(account, start, endOpt, addressBatchSize)
_ <- stateDescriptorDAO.updateRescanning(false)
_ <- walletCallbacks.executeOnRescanComplete(logger)
} yield {
logger.info(s"Finished rescanning the wallet. It took ${System
.currentTimeMillis() - startTime}ms")
RescanState.RescanDone
}