From 5d309fbae0722c6a4b7c6be2dd01d666fdcc66a2 Mon Sep 17 00:00:00 2001 From: rorp Date: Tue, 19 Jul 2022 14:41:06 -0700 Subject: [PATCH] Fee rate WS notification (#4518) * Fee rate WS notification * make the test more CI friendly * fix the default config * always return errors * fix build * scalafmt * send notifications every time --- .../commons/jsonmodels/ws/WsModels.scala | 14 +++++++- .../commons/serializers/Picklers.scala | 18 +++++++++- .../commons/serializers/WsPicklers.scala | 12 +++++++ .../org/bitcoins/server/WebsocketTests.scala | 34 ++++++++++++++----- .../bitcoins/server/util/WebsocketUtil.scala | 12 +++++-- db-commons/src/main/resources/reference.conf | 5 +++ .../server/BitcoinSServerMainUtil.scala | 1 + .../scala/org/bitcoins/wallet/Wallet.scala | 28 +++++++++++++++ .../org/bitcoins/wallet/WalletCallbacks.scala | 32 ++++++++++++++--- .../wallet/config/WalletAppConfig.scala | 15 +++++++- 10 files changed, 154 insertions(+), 17 deletions(-) diff --git a/app-commons/src/main/scala/org/bitcoins/commons/jsonmodels/ws/WsModels.scala b/app-commons/src/main/scala/org/bitcoins/commons/jsonmodels/ws/WsModels.scala index 1fea590af5..d4870bc538 100644 --- a/app-commons/src/main/scala/org/bitcoins/commons/jsonmodels/ws/WsModels.scala +++ b/app-commons/src/main/scala/org/bitcoins/commons/jsonmodels/ws/WsModels.scala @@ -7,6 +7,7 @@ import org.bitcoins.core.api.wallet.db.SpendingInfoDb import org.bitcoins.core.protocol.BitcoinAddress import org.bitcoins.core.protocol.dlc.models.DLCStatus import org.bitcoins.core.protocol.transaction.Transaction +import org.bitcoins.core.wallet.fee.FeeUnit import org.bitcoins.crypto.{Sha256Digest, StringFactory} /** The event type being sent over the websocket. An example is [[WalletWsType.BlockProcessed]] */ @@ -37,6 +38,7 @@ object WalletWsType extends StringFactory[WalletWsType] { case object DLCOfferAdd extends WalletWsType case object DLCOfferRemove extends WalletWsType case object RescanComplete extends WalletWsType + case object FeeRateChange extends WalletWsType private val all = Vector(TxProcessed, @@ -46,7 +48,8 @@ object WalletWsType extends StringFactory[WalletWsType] { DLCStateChange, DLCOfferAdd, DLCOfferRemove, - RescanComplete) + RescanComplete, + FeeRateChange) override def fromStringOpt(string: String): Option[WalletWsType] = { all.find(_.toString.toLowerCase() == string.toLowerCase) @@ -185,6 +188,15 @@ object WalletNotification { upickle.default.writeJs(this)(WsPicklers.rescanPickler) } } + + case class FeeRateChange(payload: FeeUnit) + extends WalletNotification[FeeUnit] { + override val `type`: WalletWsType = WalletWsType.FeeRateChange + + override val json: ujson.Value = { + upickle.default.writeJs(this)(WsPicklers.feeRatePickler) + } + } } object ChainNotification { diff --git a/app-commons/src/main/scala/org/bitcoins/commons/serializers/Picklers.scala b/app-commons/src/main/scala/org/bitcoins/commons/serializers/Picklers.scala index 8a4bdfed82..7af3e06417 100644 --- a/app-commons/src/main/scala/org/bitcoins/commons/serializers/Picklers.scala +++ b/app-commons/src/main/scala/org/bitcoins/commons/serializers/Picklers.scala @@ -34,7 +34,7 @@ import org.bitcoins.core.psbt.PSBT import org.bitcoins.core.serializers.PicklerKeys import org.bitcoins.core.util.{NetworkUtil, TimeUtil} import org.bitcoins.core.util.TimeUtil._ -import org.bitcoins.core.wallet.fee.SatoshisPerVirtualByte +import org.bitcoins.core.wallet.fee.{FeeUnit, SatoshisPerVirtualByte} import org.bitcoins.core.wallet.utxo.{AddressLabelTag, TxoState} import org.bitcoins.crypto._ import scodec.bits.ByteVector @@ -1578,4 +1578,20 @@ object Picklers { private def readRescanComplete(value: ujson.Value): RescanComplete = { RescanComplete(value.str) } + + implicit val feeUnit: ReadWriter[FeeUnit] = { + readwriter[ujson.Value].bimap(writeFeeUnit(_), readFeeUnit(_)) + } + + private def writeFeeUnit(unit: FeeUnit): Value = unit match { + case SatoshisPerVirtualByte(currencyUnit) => + ujson.Num(currencyUnit.satoshis.toDouble) + case err: FeeUnit => + throw new RuntimeException(s"Unsupported fee unit type: `$err`") + } + + private def readFeeUnit(value: Value): FeeUnit = { + SatoshisPerVirtualByte.fromLong(value.num.toLong) + } + } diff --git a/app-commons/src/main/scala/org/bitcoins/commons/serializers/WsPicklers.scala b/app-commons/src/main/scala/org/bitcoins/commons/serializers/WsPicklers.scala index 04d0932c7e..0e712bbe85 100644 --- a/app-commons/src/main/scala/org/bitcoins/commons/serializers/WsPicklers.scala +++ b/app-commons/src/main/scala/org/bitcoins/commons/serializers/WsPicklers.scala @@ -8,6 +8,7 @@ import org.bitcoins.commons.jsonmodels.ws.WalletNotification.{ DLCOfferAddNotification, DLCOfferRemoveNotification, DLCStateChangeNotification, + FeeRateChange, NewAddressNotification, RescanComplete, ReservedUtxosNotification, @@ -93,6 +94,8 @@ object WsPicklers { upickle.default.writeJs(offerHash)(Picklers.dlcOfferRemoveW) case r: RescanComplete => upickle.default.writeJs(r)(Picklers.rescanComplete) + case FeeRateChange(feeRate) => + upickle.default.writeJs(feeRate)(Picklers.feeUnit) } val notificationObj = ujson.Obj( @@ -134,6 +137,8 @@ object WsPicklers { case WalletWsType.RescanComplete => val complete = upickle.default.read(payloadObj)(Picklers.rescanComplete) complete + case WalletWsType.FeeRateChange => + FeeRateChange(upickle.default.read(payloadObj)(Picklers.feeUnit)) } } @@ -190,6 +195,13 @@ object WsPicklers { ) } + implicit val feeRatePickler: ReadWriter[FeeRateChange] = { + readwriter[ujson.Obj].bimap( + writeWalletNotification(_), + readWalletNotification(_).asInstanceOf[FeeRateChange] + ) + } + implicit val walletNotificationPickler: ReadWriter[WalletNotification[_]] = { readwriter[ujson.Obj].bimap(writeWalletNotification, readWalletNotification) } diff --git a/app/server-test/src/test/scala/org/bitcoins/server/WebsocketTests.scala b/app/server-test/src/test/scala/org/bitcoins/server/WebsocketTests.scala index a149c0738a..b89b29d145 100644 --- a/app/server-test/src/test/scala/org/bitcoins/server/WebsocketTests.scala +++ b/app/server-test/src/test/scala/org/bitcoins/server/WebsocketTests.scala @@ -15,14 +15,7 @@ import org.bitcoins.commons.jsonmodels.ws.ChainNotification.{ BlockProcessedNotification, SyncFlagChangedNotification } -import org.bitcoins.commons.jsonmodels.ws.WalletNotification.{ - DLCOfferAddNotification, - DLCOfferRemoveNotification, - NewAddressNotification, - RescanComplete, - TxBroadcastNotification, - TxProcessedNotification -} +import org.bitcoins.commons.jsonmodels.ws.WalletNotification._ import org.bitcoins.commons.jsonmodels.ws.{ ChainNotification, WalletNotification, @@ -433,6 +426,30 @@ class WebsocketTests extends BitcoinSServerMainBitcoindFixture { } } + it must "receive updates when fee rate changes" in { serverWithBitcoind => + val ServerWithBitcoind(_, server) = serverWithBitcoind + + val req = buildReq(server.conf) + val tuple: ( + Future[WebSocketUpgradeResponse], + (Future[Seq[WsNotification[_]]], Promise[Option[Message]])) = { + Http() + .singleWebSocketRequest(req, websocketFlow) + } + val notificationsF = tuple._2._1 + val promise = tuple._2._2 + for { + _ <- AkkaUtil.nonBlockingSleep(2.seconds) + _ = promise.success(None) + notifications <- notificationsF + } yield { + val feeRateNotifications = + notifications.filter(_.isInstanceOf[FeeRateChange]) + assert(feeRateNotifications.nonEmpty) + } + } + + /* TODO implement a real test for this case it must "not queue things on the websocket while there is no one connected" in { serverWithBitcoind => val ServerWithBitcoind(_, server) = serverWithBitcoind @@ -454,4 +471,5 @@ class WebsocketTests extends BitcoinSServerMainBitcoindFixture { notifications <- notificationsF } yield assert(notifications.isEmpty) } + */ } diff --git a/app/server/src/main/scala/org/bitcoins/server/util/WebsocketUtil.scala b/app/server/src/main/scala/org/bitcoins/server/util/WebsocketUtil.scala index 2b048f1fce..d580181301 100644 --- a/app/server/src/main/scala/org/bitcoins/server/util/WebsocketUtil.scala +++ b/app/server/src/main/scala/org/bitcoins/server/util/WebsocketUtil.scala @@ -107,13 +107,20 @@ object WebsocketUtil extends Logging { offerF.map(_ => ()) } + val onFeeRate: OnFeeRateChanged = { feeRate => + val notification = WalletNotification.FeeRateChange(feeRate) + val offerF = walletQueue.offer(notification) + offerF.map(_ => ()) + } + WalletCallbacks( onTransactionProcessed = Vector(onTxProcessed), onTransactionBroadcast = Vector(onTxBroadcast), onReservedUtxos = Vector(onReservedUtxo), onNewAddressGenerated = Vector(onAddressCreated), onBlockProcessed = Vector.empty, - onRescanComplete = Vector(onRescanComplete) + onRescanComplete = Vector(onRescanComplete), + onFeeRateChanged = Vector(onFeeRate) ) } @@ -140,7 +147,8 @@ object WebsocketUtil extends Logging { WalletNotification.TxBroadcastNotification(tx) case x @ (WalletWsType.NewAddress | WalletWsType.ReservedUtxos | WalletWsType.DLCStateChange | WalletWsType.DLCOfferAdd | - WalletWsType.DLCOfferRemove | WalletWsType.RescanComplete) => + WalletWsType.DLCOfferRemove | WalletWsType.RescanComplete | + WalletWsType.FeeRateChange) => sys.error(s"Cannot build tx notification for $x") } diff --git a/db-commons/src/main/resources/reference.conf b/db-commons/src/main/resources/reference.conf index 0caee0546d..9b8d024130 100644 --- a/db-commons/src/main/resources/reference.conf +++ b/db-commons/src/main/resources/reference.conf @@ -218,6 +218,11 @@ bitcoin-s { enabled = false } } + + fee-provider { + poll-delay = 0s + poll-interval = 1m + } } akka { diff --git a/testkit/src/main/scala/org/bitcoins/testkit/server/BitcoinSServerMainUtil.scala b/testkit/src/main/scala/org/bitcoins/testkit/server/BitcoinSServerMainUtil.scala index 66968b1433..671091a571 100644 --- a/testkit/src/main/scala/org/bitcoins/testkit/server/BitcoinSServerMainUtil.scala +++ b/testkit/src/main/scala/org/bitcoins/testkit/server/BitcoinSServerMainUtil.scala @@ -41,6 +41,7 @@ object BitcoinSServerMainUtil { |bitcoin-s.server.rpcport = ${RpcUtil.randomPort} |bitcoin-s.server.wsport= ${RpcUtil.randomPort} |bitcoin-s.server.password=topsecret + |bitcoin-s.fee-provider.poll-interval = 1s | |""".stripMargin diff --git a/wallet/src/main/scala/org/bitcoins/wallet/Wallet.scala b/wallet/src/main/scala/org/bitcoins/wallet/Wallet.scala index c6f1529e3c..070d423123 100644 --- a/wallet/src/main/scala/org/bitcoins/wallet/Wallet.scala +++ b/wallet/src/main/scala/org/bitcoins/wallet/Wallet.scala @@ -40,7 +40,9 @@ import scodec.bits.ByteVector import slick.dbio.{DBIOAction, Effect, NoStream} import java.time.Instant +import java.util.concurrent.TimeUnit import scala.concurrent.{ExecutionContext, Future} +import scala.util.control.NonFatal import scala.util.{Failure, Random, Success} abstract class Wallet @@ -150,6 +152,7 @@ abstract class Wallet _ <- checkRootAccount _ <- downloadMissingUtxos _ = walletConfig.startRebroadcastTxsScheduler(this) + _ = startFeeRateCallbackScheduler() } yield { this } @@ -916,6 +919,31 @@ abstract class Wallet this } } + + def startFeeRateCallbackScheduler(): Unit = { + val feeRateChangedRunnable = new Runnable { + override def run(): Unit = { + getFeeRate() + .map(feeRate => Some(feeRate)) + .recover { case NonFatal(ex) => + logger.error("Cannot get fee rate ", ex) + None + } + .foreach { feeRateOpt => + walletCallbacks.executeOnFeeRateChanged( + logger, + feeRateOpt.getOrElse(SatoshisPerVirtualByte.negativeOne)) + } + () + } + } + + val _ = scheduler.scheduleAtFixedRate( + feeRateChangedRunnable, + walletConfig.feeRatePollDelay.toSeconds, + walletConfig.feeRatePollInterval.toSeconds, + TimeUnit.SECONDS) + } } // todo: create multiple wallets, need to maintain multiple databases diff --git a/wallet/src/main/scala/org/bitcoins/wallet/WalletCallbacks.scala b/wallet/src/main/scala/org/bitcoins/wallet/WalletCallbacks.scala index 3eda7f8eeb..9229e6eb14 100644 --- a/wallet/src/main/scala/org/bitcoins/wallet/WalletCallbacks.scala +++ b/wallet/src/main/scala/org/bitcoins/wallet/WalletCallbacks.scala @@ -7,6 +7,7 @@ import org.bitcoins.core.api.{Callback, CallbackHandler} import org.bitcoins.core.protocol.BitcoinAddress import org.bitcoins.core.protocol.blockchain.Block import org.bitcoins.core.protocol.transaction.Transaction +import org.bitcoins.core.wallet.fee.FeeUnit import scala.concurrent.{ExecutionContext, Future} @@ -33,6 +34,8 @@ trait WalletCallbacks extends ModuleCallbacks[WalletCallbacks] { def onRescanComplete: CallbackHandler[Unit, OnRescanComplete] + def onFeeRateChanged: CallbackHandler[FeeUnit, OnFeeRateChanged] + def +(other: WalletCallbacks): WalletCallbacks def executeOnTransactionProcessed(logger: Logger, tx: Transaction)(implicit @@ -92,6 +95,15 @@ trait WalletCallbacks extends ModuleCallbacks[WalletCallbacks] { err)) } + def executeOnFeeRateChanged(logger: Logger, feeRate: FeeUnit)(implicit + ec: ExecutionContext): Future[Unit] = { + onFeeRateChanged.execute( + feeRate, + (err: Throwable) => + logger.error(s"${onFeeRateChanged.name} Callback failed with error: ", + err)) + } + } /** Callback for handling a processed transaction */ @@ -108,6 +120,8 @@ trait OnBlockProcessed extends Callback[Block] /** Triggered when a rescan is */ trait OnRescanComplete extends Callback[Unit] +trait OnFeeRateChanged extends Callback[FeeUnit] + object WalletCallbacks extends CallbackFactory[WalletCallbacks] { private case class WalletCallbacksImpl( @@ -122,7 +136,8 @@ object WalletCallbacks extends CallbackFactory[WalletCallbacks] { BitcoinAddress, OnNewAddressGenerated], onBlockProcessed: CallbackHandler[Block, OnBlockProcessed], - onRescanComplete: CallbackHandler[Unit, OnRescanComplete] + onRescanComplete: CallbackHandler[Unit, OnRescanComplete], + onFeeRateChanged: CallbackHandler[FeeUnit, OnFeeRateChanged] ) extends WalletCallbacks { override def +(other: WalletCallbacks): WalletCallbacks = @@ -135,7 +150,8 @@ object WalletCallbacks extends CallbackFactory[WalletCallbacks] { onNewAddressGenerated = onNewAddressGenerated ++ other.onNewAddressGenerated, onBlockProcessed = onBlockProcessed ++ other.onBlockProcessed, - onRescanComplete = onRescanComplete ++ other.onRescanComplete + onRescanComplete = onRescanComplete ++ other.onRescanComplete, + onFeeRateChanged = onFeeRateChanged ++ other.onFeeRateChanged ) } @@ -159,6 +175,10 @@ object WalletCallbacks extends CallbackFactory[WalletCallbacks] { WalletCallbacks(onBlockProcessed = Vector(f)) } + def onFeeRateChanged(f: OnFeeRateChanged): WalletCallbacks = { + WalletCallbacks(onFeeRateChanged = Vector(f)) + } + /** Empty callbacks that does nothing with the received data */ override val empty: WalletCallbacks = apply(Vector.empty, @@ -174,7 +194,8 @@ object WalletCallbacks extends CallbackFactory[WalletCallbacks] { onReservedUtxos: Vector[OnReservedUtxos] = Vector.empty, onNewAddressGenerated: Vector[OnNewAddressGenerated] = Vector.empty, onBlockProcessed: Vector[OnBlockProcessed] = Vector.empty, - onRescanComplete: Vector[OnRescanComplete] = Vector.empty + onRescanComplete: Vector[OnRescanComplete] = Vector.empty, + onFeeRateChanged: Vector[OnFeeRateChanged] = Vector.empty ): WalletCallbacks = { WalletCallbacksImpl( onTransactionProcessed = @@ -199,7 +220,10 @@ object WalletCallbacks extends CallbackFactory[WalletCallbacks] { ), onRescanComplete = CallbackHandler[Unit, OnRescanComplete]("onRescanComplete", - onRescanComplete) + onRescanComplete), + onFeeRateChanged = + CallbackHandler[FeeUnit, OnFeeRateChanged]("onFeeRateChanged", + onFeeRateChanged) ) } } diff --git a/wallet/src/main/scala/org/bitcoins/wallet/config/WalletAppConfig.scala b/wallet/src/main/scala/org/bitcoins/wallet/config/WalletAppConfig.scala index c68fe0603f..473f2e91ab 100644 --- a/wallet/src/main/scala/org/bitcoins/wallet/config/WalletAppConfig.scala +++ b/wallet/src/main/scala/org/bitcoins/wallet/config/WalletAppConfig.scala @@ -25,7 +25,12 @@ import org.bitcoins.wallet.{Wallet, WalletCallbacks, WalletLogger} import java.nio.file.{Files, Path, Paths} import java.time.Instant import java.util.concurrent._ -import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration} +import scala.concurrent.duration.{ + Duration, + DurationInt, + DurationLong, + FiniteDuration +} import scala.concurrent.{Await, ExecutionContext, Future} /** Configuration for the Bitcoin-S wallet @@ -141,6 +146,14 @@ case class WalletAppConfig(baseDatadir: Path, configOverrides: Vector[Config])( lazy val feeProviderTargetOpt: Option[Int] = config.getIntOpt("bitcoin-s.fee-provider.target") + lazy val feeRatePollInterval: FiniteDuration = config + .getDuration("bitcoin-s.fee-provider.poll-interval") + .getSeconds + .seconds + + lazy val feeRatePollDelay: FiniteDuration = + config.getDuration("bitcoin-s.fee-provider.poll-delay").getSeconds.seconds + lazy val allowExternalDLCAddresses: Boolean = config.getBoolean("bitcoin-s.wallet.allowExternalDLCAddresses")