Fee rate WS notification (#4518)

* Fee rate WS notification

* make the test more CI friendly

* fix the default config

* always return errors

* fix build

* scalafmt

* send notifications every time
This commit is contained in:
rorp 2022-07-19 14:41:06 -07:00 committed by GitHub
parent 2e309086d5
commit 5d309fbae0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 154 additions and 17 deletions

View File

@ -7,6 +7,7 @@ import org.bitcoins.core.api.wallet.db.SpendingInfoDb
import org.bitcoins.core.protocol.BitcoinAddress
import org.bitcoins.core.protocol.dlc.models.DLCStatus
import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.core.wallet.fee.FeeUnit
import org.bitcoins.crypto.{Sha256Digest, StringFactory}
/** The event type being sent over the websocket. An example is [[WalletWsType.BlockProcessed]] */
@ -37,6 +38,7 @@ object WalletWsType extends StringFactory[WalletWsType] {
case object DLCOfferAdd extends WalletWsType
case object DLCOfferRemove extends WalletWsType
case object RescanComplete extends WalletWsType
case object FeeRateChange extends WalletWsType
private val all =
Vector(TxProcessed,
@ -46,7 +48,8 @@ object WalletWsType extends StringFactory[WalletWsType] {
DLCStateChange,
DLCOfferAdd,
DLCOfferRemove,
RescanComplete)
RescanComplete,
FeeRateChange)
override def fromStringOpt(string: String): Option[WalletWsType] = {
all.find(_.toString.toLowerCase() == string.toLowerCase)
@ -185,6 +188,15 @@ object WalletNotification {
upickle.default.writeJs(this)(WsPicklers.rescanPickler)
}
}
case class FeeRateChange(payload: FeeUnit)
extends WalletNotification[FeeUnit] {
override val `type`: WalletWsType = WalletWsType.FeeRateChange
override val json: ujson.Value = {
upickle.default.writeJs(this)(WsPicklers.feeRatePickler)
}
}
}
object ChainNotification {

View File

@ -34,7 +34,7 @@ import org.bitcoins.core.psbt.PSBT
import org.bitcoins.core.serializers.PicklerKeys
import org.bitcoins.core.util.{NetworkUtil, TimeUtil}
import org.bitcoins.core.util.TimeUtil._
import org.bitcoins.core.wallet.fee.SatoshisPerVirtualByte
import org.bitcoins.core.wallet.fee.{FeeUnit, SatoshisPerVirtualByte}
import org.bitcoins.core.wallet.utxo.{AddressLabelTag, TxoState}
import org.bitcoins.crypto._
import scodec.bits.ByteVector
@ -1578,4 +1578,20 @@ object Picklers {
private def readRescanComplete(value: ujson.Value): RescanComplete = {
RescanComplete(value.str)
}
implicit val feeUnit: ReadWriter[FeeUnit] = {
readwriter[ujson.Value].bimap(writeFeeUnit(_), readFeeUnit(_))
}
private def writeFeeUnit(unit: FeeUnit): Value = unit match {
case SatoshisPerVirtualByte(currencyUnit) =>
ujson.Num(currencyUnit.satoshis.toDouble)
case err: FeeUnit =>
throw new RuntimeException(s"Unsupported fee unit type: `$err`")
}
private def readFeeUnit(value: Value): FeeUnit = {
SatoshisPerVirtualByte.fromLong(value.num.toLong)
}
}

View File

@ -8,6 +8,7 @@ import org.bitcoins.commons.jsonmodels.ws.WalletNotification.{
DLCOfferAddNotification,
DLCOfferRemoveNotification,
DLCStateChangeNotification,
FeeRateChange,
NewAddressNotification,
RescanComplete,
ReservedUtxosNotification,
@ -93,6 +94,8 @@ object WsPicklers {
upickle.default.writeJs(offerHash)(Picklers.dlcOfferRemoveW)
case r: RescanComplete =>
upickle.default.writeJs(r)(Picklers.rescanComplete)
case FeeRateChange(feeRate) =>
upickle.default.writeJs(feeRate)(Picklers.feeUnit)
}
val notificationObj = ujson.Obj(
@ -134,6 +137,8 @@ object WsPicklers {
case WalletWsType.RescanComplete =>
val complete = upickle.default.read(payloadObj)(Picklers.rescanComplete)
complete
case WalletWsType.FeeRateChange =>
FeeRateChange(upickle.default.read(payloadObj)(Picklers.feeUnit))
}
}
@ -190,6 +195,13 @@ object WsPicklers {
)
}
implicit val feeRatePickler: ReadWriter[FeeRateChange] = {
readwriter[ujson.Obj].bimap(
writeWalletNotification(_),
readWalletNotification(_).asInstanceOf[FeeRateChange]
)
}
implicit val walletNotificationPickler: ReadWriter[WalletNotification[_]] = {
readwriter[ujson.Obj].bimap(writeWalletNotification, readWalletNotification)
}

View File

@ -15,14 +15,7 @@ import org.bitcoins.commons.jsonmodels.ws.ChainNotification.{
BlockProcessedNotification,
SyncFlagChangedNotification
}
import org.bitcoins.commons.jsonmodels.ws.WalletNotification.{
DLCOfferAddNotification,
DLCOfferRemoveNotification,
NewAddressNotification,
RescanComplete,
TxBroadcastNotification,
TxProcessedNotification
}
import org.bitcoins.commons.jsonmodels.ws.WalletNotification._
import org.bitcoins.commons.jsonmodels.ws.{
ChainNotification,
WalletNotification,
@ -433,6 +426,30 @@ class WebsocketTests extends BitcoinSServerMainBitcoindFixture {
}
}
it must "receive updates when fee rate changes" in { serverWithBitcoind =>
val ServerWithBitcoind(_, server) = serverWithBitcoind
val req = buildReq(server.conf)
val tuple: (
Future[WebSocketUpgradeResponse],
(Future[Seq[WsNotification[_]]], Promise[Option[Message]])) = {
Http()
.singleWebSocketRequest(req, websocketFlow)
}
val notificationsF = tuple._2._1
val promise = tuple._2._2
for {
_ <- AkkaUtil.nonBlockingSleep(2.seconds)
_ = promise.success(None)
notifications <- notificationsF
} yield {
val feeRateNotifications =
notifications.filter(_.isInstanceOf[FeeRateChange])
assert(feeRateNotifications.nonEmpty)
}
}
/* TODO implement a real test for this case
it must "not queue things on the websocket while there is no one connected" in {
serverWithBitcoind =>
val ServerWithBitcoind(_, server) = serverWithBitcoind
@ -454,4 +471,5 @@ class WebsocketTests extends BitcoinSServerMainBitcoindFixture {
notifications <- notificationsF
} yield assert(notifications.isEmpty)
}
*/
}

View File

@ -107,13 +107,20 @@ object WebsocketUtil extends Logging {
offerF.map(_ => ())
}
val onFeeRate: OnFeeRateChanged = { feeRate =>
val notification = WalletNotification.FeeRateChange(feeRate)
val offerF = walletQueue.offer(notification)
offerF.map(_ => ())
}
WalletCallbacks(
onTransactionProcessed = Vector(onTxProcessed),
onTransactionBroadcast = Vector(onTxBroadcast),
onReservedUtxos = Vector(onReservedUtxo),
onNewAddressGenerated = Vector(onAddressCreated),
onBlockProcessed = Vector.empty,
onRescanComplete = Vector(onRescanComplete)
onRescanComplete = Vector(onRescanComplete),
onFeeRateChanged = Vector(onFeeRate)
)
}
@ -140,7 +147,8 @@ object WebsocketUtil extends Logging {
WalletNotification.TxBroadcastNotification(tx)
case x @ (WalletWsType.NewAddress | WalletWsType.ReservedUtxos |
WalletWsType.DLCStateChange | WalletWsType.DLCOfferAdd |
WalletWsType.DLCOfferRemove | WalletWsType.RescanComplete) =>
WalletWsType.DLCOfferRemove | WalletWsType.RescanComplete |
WalletWsType.FeeRateChange) =>
sys.error(s"Cannot build tx notification for $x")
}

View File

@ -218,6 +218,11 @@ bitcoin-s {
enabled = false
}
}
fee-provider {
poll-delay = 0s
poll-interval = 1m
}
}
akka {

View File

@ -41,6 +41,7 @@ object BitcoinSServerMainUtil {
|bitcoin-s.server.rpcport = ${RpcUtil.randomPort}
|bitcoin-s.server.wsport= ${RpcUtil.randomPort}
|bitcoin-s.server.password=topsecret
|bitcoin-s.fee-provider.poll-interval = 1s
|
|""".stripMargin

View File

@ -40,7 +40,9 @@ import scodec.bits.ByteVector
import slick.dbio.{DBIOAction, Effect, NoStream}
import java.time.Instant
import java.util.concurrent.TimeUnit
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal
import scala.util.{Failure, Random, Success}
abstract class Wallet
@ -150,6 +152,7 @@ abstract class Wallet
_ <- checkRootAccount
_ <- downloadMissingUtxos
_ = walletConfig.startRebroadcastTxsScheduler(this)
_ = startFeeRateCallbackScheduler()
} yield {
this
}
@ -916,6 +919,31 @@ abstract class Wallet
this
}
}
def startFeeRateCallbackScheduler(): Unit = {
val feeRateChangedRunnable = new Runnable {
override def run(): Unit = {
getFeeRate()
.map(feeRate => Some(feeRate))
.recover { case NonFatal(ex) =>
logger.error("Cannot get fee rate ", ex)
None
}
.foreach { feeRateOpt =>
walletCallbacks.executeOnFeeRateChanged(
logger,
feeRateOpt.getOrElse(SatoshisPerVirtualByte.negativeOne))
}
()
}
}
val _ = scheduler.scheduleAtFixedRate(
feeRateChangedRunnable,
walletConfig.feeRatePollDelay.toSeconds,
walletConfig.feeRatePollInterval.toSeconds,
TimeUnit.SECONDS)
}
}
// todo: create multiple wallets, need to maintain multiple databases

View File

@ -7,6 +7,7 @@ import org.bitcoins.core.api.{Callback, CallbackHandler}
import org.bitcoins.core.protocol.BitcoinAddress
import org.bitcoins.core.protocol.blockchain.Block
import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.core.wallet.fee.FeeUnit
import scala.concurrent.{ExecutionContext, Future}
@ -33,6 +34,8 @@ trait WalletCallbacks extends ModuleCallbacks[WalletCallbacks] {
def onRescanComplete: CallbackHandler[Unit, OnRescanComplete]
def onFeeRateChanged: CallbackHandler[FeeUnit, OnFeeRateChanged]
def +(other: WalletCallbacks): WalletCallbacks
def executeOnTransactionProcessed(logger: Logger, tx: Transaction)(implicit
@ -92,6 +95,15 @@ trait WalletCallbacks extends ModuleCallbacks[WalletCallbacks] {
err))
}
def executeOnFeeRateChanged(logger: Logger, feeRate: FeeUnit)(implicit
ec: ExecutionContext): Future[Unit] = {
onFeeRateChanged.execute(
feeRate,
(err: Throwable) =>
logger.error(s"${onFeeRateChanged.name} Callback failed with error: ",
err))
}
}
/** Callback for handling a processed transaction */
@ -108,6 +120,8 @@ trait OnBlockProcessed extends Callback[Block]
/** Triggered when a rescan is */
trait OnRescanComplete extends Callback[Unit]
trait OnFeeRateChanged extends Callback[FeeUnit]
object WalletCallbacks extends CallbackFactory[WalletCallbacks] {
private case class WalletCallbacksImpl(
@ -122,7 +136,8 @@ object WalletCallbacks extends CallbackFactory[WalletCallbacks] {
BitcoinAddress,
OnNewAddressGenerated],
onBlockProcessed: CallbackHandler[Block, OnBlockProcessed],
onRescanComplete: CallbackHandler[Unit, OnRescanComplete]
onRescanComplete: CallbackHandler[Unit, OnRescanComplete],
onFeeRateChanged: CallbackHandler[FeeUnit, OnFeeRateChanged]
) extends WalletCallbacks {
override def +(other: WalletCallbacks): WalletCallbacks =
@ -135,7 +150,8 @@ object WalletCallbacks extends CallbackFactory[WalletCallbacks] {
onNewAddressGenerated =
onNewAddressGenerated ++ other.onNewAddressGenerated,
onBlockProcessed = onBlockProcessed ++ other.onBlockProcessed,
onRescanComplete = onRescanComplete ++ other.onRescanComplete
onRescanComplete = onRescanComplete ++ other.onRescanComplete,
onFeeRateChanged = onFeeRateChanged ++ other.onFeeRateChanged
)
}
@ -159,6 +175,10 @@ object WalletCallbacks extends CallbackFactory[WalletCallbacks] {
WalletCallbacks(onBlockProcessed = Vector(f))
}
def onFeeRateChanged(f: OnFeeRateChanged): WalletCallbacks = {
WalletCallbacks(onFeeRateChanged = Vector(f))
}
/** Empty callbacks that does nothing with the received data */
override val empty: WalletCallbacks =
apply(Vector.empty,
@ -174,7 +194,8 @@ object WalletCallbacks extends CallbackFactory[WalletCallbacks] {
onReservedUtxos: Vector[OnReservedUtxos] = Vector.empty,
onNewAddressGenerated: Vector[OnNewAddressGenerated] = Vector.empty,
onBlockProcessed: Vector[OnBlockProcessed] = Vector.empty,
onRescanComplete: Vector[OnRescanComplete] = Vector.empty
onRescanComplete: Vector[OnRescanComplete] = Vector.empty,
onFeeRateChanged: Vector[OnFeeRateChanged] = Vector.empty
): WalletCallbacks = {
WalletCallbacksImpl(
onTransactionProcessed =
@ -199,7 +220,10 @@ object WalletCallbacks extends CallbackFactory[WalletCallbacks] {
),
onRescanComplete =
CallbackHandler[Unit, OnRescanComplete]("onRescanComplete",
onRescanComplete)
onRescanComplete),
onFeeRateChanged =
CallbackHandler[FeeUnit, OnFeeRateChanged]("onFeeRateChanged",
onFeeRateChanged)
)
}
}

View File

@ -25,7 +25,12 @@ import org.bitcoins.wallet.{Wallet, WalletCallbacks, WalletLogger}
import java.nio.file.{Files, Path, Paths}
import java.time.Instant
import java.util.concurrent._
import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration}
import scala.concurrent.duration.{
Duration,
DurationInt,
DurationLong,
FiniteDuration
}
import scala.concurrent.{Await, ExecutionContext, Future}
/** Configuration for the Bitcoin-S wallet
@ -141,6 +146,14 @@ case class WalletAppConfig(baseDatadir: Path, configOverrides: Vector[Config])(
lazy val feeProviderTargetOpt: Option[Int] =
config.getIntOpt("bitcoin-s.fee-provider.target")
lazy val feeRatePollInterval: FiniteDuration = config
.getDuration("bitcoin-s.fee-provider.poll-interval")
.getSeconds
.seconds
lazy val feeRatePollDelay: FiniteDuration =
config.getDuration("bitcoin-s.fee-provider.poll-delay").getSeconds.seconds
lazy val allowExternalDLCAddresses: Boolean =
config.getBoolean("bitcoin-s.wallet.allowExternalDLCAddresses")