Refactor to hide the Message paramter that gets fed to the websocket queue, move it to a internal implementation detail (#4514)

This commit is contained in:
Chris Stewart 2022-07-18 07:32:18 -05:00 committed by GitHub
parent f6df30c45e
commit e3a7c0971f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 96 additions and 72 deletions

View file

@ -1,6 +1,7 @@
package org.bitcoins.commons.jsonmodels.ws
import org.bitcoins.commons.jsonmodels.bitcoind.GetBlockHeaderResult
import org.bitcoins.commons.serializers.WsPicklers
import org.bitcoins.core.api.dlc.wallet.db.IncomingDLCOfferDb
import org.bitcoins.core.api.wallet.db.SpendingInfoDb
import org.bitcoins.core.protocol.BitcoinAddress
@ -96,6 +97,7 @@ object TorWsType extends StringFactory[TorWsType] {
sealed trait WsNotification[T] {
def `type`: WsType
def payload: T
def json: ujson.Value
}
sealed trait ChainNotification[T] extends WsNotification[T] {
@ -115,41 +117,73 @@ object WalletNotification {
case class NewAddressNotification(payload: BitcoinAddress)
extends WalletNotification[BitcoinAddress] {
override val `type`: WalletWsType = WalletWsType.NewAddress
override val json: ujson.Value = {
upickle.default.writeJs(this)(WsPicklers.newAddressPickler)
}
}
case class TxProcessedNotification(payload: Transaction)
extends WalletNotification[Transaction] {
override val `type`: WalletWsType = WalletWsType.TxProcessed
override val json: ujson.Value = {
upickle.default.writeJs(this)(WsPicklers.txProcessedPickler)
}
}
case class TxBroadcastNotification(payload: Transaction)
extends WalletNotification[Transaction] {
override val `type`: WalletWsType = WalletWsType.TxBroadcast
override val json: ujson.Value = {
upickle.default.writeJs(this)(WsPicklers.txBroadcastPickler)
}
}
case class ReservedUtxosNotification(payload: Vector[SpendingInfoDb])
extends WalletNotification[Vector[SpendingInfoDb]] {
override val `type`: WalletWsType = WalletWsType.ReservedUtxos
override val json: ujson.Value = {
upickle.default.writeJs(this)(WsPicklers.reservedUtxosPickler)
}
}
case class DLCStateChangeNotification(payload: DLCStatus)
extends WalletNotification[DLCStatus] {
override val `type`: WalletWsType = WalletWsType.DLCStateChange
override val json: ujson.Value = {
upickle.default.writeJs(this)(WsPicklers.dlcStateChangePickler)
}
}
case class DLCOfferAddNotification(payload: IncomingDLCOfferDb)
extends WalletNotification[IncomingDLCOfferDb] {
override val `type`: WalletWsType = WalletWsType.DLCOfferAdd
override val json: ujson.Value = {
upickle.default.writeJs(this)(WsPicklers.dlcOfferAddPickler)
}
}
case class DLCOfferRemoveNotification(payload: Sha256Digest)
extends WalletNotification[Sha256Digest] {
override val `type`: WalletWsType = WalletWsType.DLCOfferRemove
override val json: ujson.Value = {
upickle.default.writeJs(this)(WsPicklers.dlcOfferRemovePickler)
}
}
case class RescanComplete(payload: String)
extends WalletNotification[String] {
override val `type`: WalletWsType = WalletWsType.RescanComplete
override val json: ujson.Value = {
upickle.default.writeJs(this)(WsPicklers.rescanPickler)
}
}
}
@ -158,11 +192,19 @@ object ChainNotification {
case class BlockProcessedNotification(payload: GetBlockHeaderResult)
extends ChainNotification[GetBlockHeaderResult] {
override val `type`: ChainWsType = ChainWsType.BlockProcessed
override val json: ujson.Value = {
upickle.default.writeJs(this)(WsPicklers.blockProcessedPickler)
}
}
case class SyncFlagChangedNotification(payload: Boolean)
extends ChainNotification[Boolean] {
override val `type`: ChainWsType = ChainWsType.SyncFlagChanged
override val json: ujson.Value = {
upickle.default.writeJs(this)(WsPicklers.syncFlagChangedPickler)
}
}
}
@ -171,5 +213,9 @@ object TorNotification {
case object TorStartedNotification extends TorNotification[Unit] {
override val `type`: TorWsType = TorWsType.TorStarted
override val payload: Unit = ()
override val json: ujson.Value = {
upickle.default.writeJs(this)(WsPicklers.torStartedPickler)
}
}
}

View file

@ -4,7 +4,7 @@ import akka.actor.ActorSystem
import akka.event.Logging
import akka.http.scaladsl._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws.Message
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server._
import akka.http.scaladsl.server.directives.Credentials.Missing
@ -16,10 +16,11 @@ import akka.http.scaladsl.server.directives.{
PathDirectives
}
import akka.http.scaladsl.unmarshalling.FromRequestUnmarshaller
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.{Done, NotUsed}
import de.heikoseeberger.akkahttpupickle.UpickleSupport._
import org.bitcoins.commons.config.AppConfig
import org.bitcoins.commons.jsonmodels.ws.WsNotification
import org.bitcoins.server.util.{ServerBindings, WsServerConfig}
import upickle.{default => up}
@ -32,7 +33,7 @@ case class Server(
rpcport: Int,
rpcPassword: String,
wsConfigOpt: Option[WsServerConfig],
wsSource: Source[Message, NotUsed])(implicit system: ActorSystem)
wsSource: Source[WsNotification[_], NotUsed])(implicit system: ActorSystem)
extends HttpLogger {
import system.dispatcher
@ -208,9 +209,19 @@ case class Server(
}
}
private val notificationToMsgFn: WsNotification[_] => Message = {
notification =>
val msg = TextMessage.Strict(notification.json.toString())
msg
}
private val notificationToMsgFlow = Flow.fromFunction(notificationToMsgFn)
private val msgSource: Source[Message, NotUsed] = {
wsSource.viaMat(notificationToMsgFlow)(Keep.right)
}
private def wsHandler: Flow[Message, Message, Any] = {
//we don't allow input, so use Sink.ignore
Flow.fromSinkAndSource(Sink.ignore, wsSource)
Flow.fromSinkAndSource(Sink.ignore, msgSource)
}
}

View file

@ -1,7 +1,6 @@
package org.bitcoins.server
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws.Message
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{
BroadcastHub,
@ -17,6 +16,7 @@ import org.bitcoins.chain.ChainCallbacks
import org.bitcoins.chain.blockchain.ChainHandler
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.commons.jsonmodels.bitcoind.GetBlockChainInfoResult
import org.bitcoins.commons.jsonmodels.ws.WsNotification
import org.bitcoins.commons.util.{DatadirParser, ServerArgParser}
import org.bitcoins.core.api.chain.ChainApi
import org.bitcoins.core.api.node.{
@ -162,8 +162,8 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
val tuple = buildWsSource
val wsQueue: SourceQueueWithComplete[Message] = tuple._1
val wsSource: Source[Message, NotUsed] = tuple._2
val wsQueue: SourceQueueWithComplete[WsNotification[_]] = tuple._1
val wsSource: Source[WsNotification[_], NotUsed] = tuple._2
val _ = buildNeutrinoCallbacks(wsQueue, chainApi)
val torCallbacks = WebsocketUtil.buildTorCallbacks(wsQueue)
torConf.addCallbacks(torCallbacks)
@ -224,7 +224,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
}
private def buildNeutrinoCallbacks(
wsQueue: SourceQueueWithComplete[Message],
wsQueue: SourceQueueWithComplete[WsNotification[_]],
chainApi: ChainApi): Unit = {
val chainCallbacks = WebsocketUtil.buildChainCallbacks(wsQueue, chainApi)
chainConf.addCallbacks(chainCallbacks)
@ -274,8 +274,8 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
_ <- client.start()
} yield client
val tuple = buildWsSource
val wsQueue: SourceQueueWithComplete[Message] = tuple._1
val wsSource: Source[Message, NotUsed] = tuple._2
val wsQueue: SourceQueueWithComplete[WsNotification[_]] = tuple._1
val wsSource: Source[WsNotification[_], NotUsed] = tuple._2
val torCallbacks = WebsocketUtil.buildTorCallbacks(wsQueue)
val _ = torConf.addCallbacks(torCallbacks)
val isTorStartedF = if (torConf.torProvided) {
@ -371,7 +371,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
dlcNodeF: Future[DLCNode],
torConfStarted: Future[Unit],
serverCmdLineArgs: ServerArgParser,
wsSource: Source[Message, NotUsed])(implicit
wsSource: Source[WsNotification[_], NotUsed])(implicit
system: ActorSystem,
conf: BitcoinSAppConfig): Future[Server] = {
implicit val nodeConf: NodeAppConfig = conf.nodeConf
@ -502,8 +502,8 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
* to create a flow that emits websocket messages
*/
private def buildWsSource: (
SourceQueueWithComplete[Message],
Source[Message, NotUsed]) = {
SourceQueueWithComplete[WsNotification[_]],
Source[WsNotification[_], NotUsed]) = {
val maxBufferSize: Int = 25
/** This will queue [[maxBufferSize]] elements in the queue. Once the buffer size is reached,
@ -514,7 +514,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
//the BroadcastHub.sink is needed to avoid these errors
// 'Websocket handler failed with Processor actor'
Source
.queue[Message](maxBufferSize, OverflowStrategy.dropHead)
.queue[WsNotification[_]](maxBufferSize, OverflowStrategy.dropHead)
.toMat(BroadcastHub.sink)(Keep.both)
.run()
}

View file

@ -1,6 +1,5 @@
package org.bitcoins.server.util
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.scaladsl.SourceQueueWithComplete
import grizzled.slf4j.Logging
import org.bitcoins.chain.{
@ -12,9 +11,10 @@ import org.bitcoins.commons.jsonmodels.ws.TorNotification.TorStartedNotification
import org.bitcoins.commons.jsonmodels.ws.{
ChainNotification,
WalletNotification,
WalletWsType
WalletWsType,
WsNotification
}
import org.bitcoins.commons.serializers.WsPicklers
import org.bitcoins.core.api.chain.ChainApi
import org.bitcoins.core.api.dlc.wallet.db.IncomingDLCOfferDb
import org.bitcoins.core.protocol.blockchain.BlockHeader
@ -36,7 +36,7 @@ import scala.concurrent.{ExecutionContext, Future}
object WebsocketUtil extends Logging {
def buildChainCallbacks(
queue: SourceQueueWithComplete[Message],
queue: SourceQueueWithComplete[WsNotification[_]],
chainApi: ChainApi)(implicit ec: ExecutionContext): ChainCallbacks = {
val onBlockProcessed: OnBlockHeaderConnected = {
case headersWithHeight: Vector[(Int, BlockHeader)] =>
@ -49,13 +49,7 @@ object WebsocketUtil extends Logging {
notifications =
results.map(result =>
ChainNotification.BlockProcessedNotification(result))
notificationsJson = notifications.map { notification =>
upickle.default.writeJs(notification)(
WsPicklers.blockProcessedPickler)
}
msgs = notificationsJson.map(n => TextMessage.Strict(n.toString()))
_ <- FutureUtil.sequentially(msgs) { case msg =>
_ <- FutureUtil.sequentially(notifications) { case msg =>
val x: Future[Unit] = queue
.offer(msg)
.map(_ => ())
@ -69,11 +63,8 @@ object WebsocketUtil extends Logging {
val onSyncFlagChanged: OnSyncFlagChanged = { syncing =>
val notification = ChainNotification.SyncFlagChangedNotification(syncing)
val notificationJson =
upickle.default.writeJs(notification)(WsPicklers.syncFlagChangedPickler)
val msg = TextMessage.Strict(notificationJson.toString())
for {
_ <- queue.offer(msg)
_ <- queue.offer(notification)
} yield ()
}
@ -83,14 +74,11 @@ object WebsocketUtil extends Logging {
/** Builds websocket callbacks for the wallet */
def buildWalletCallbacks(
walletQueue: SourceQueueWithComplete[Message],
walletQueue: SourceQueueWithComplete[WsNotification[_]],
walletName: String)(implicit ec: ExecutionContext): WalletCallbacks = {
val onAddressCreated: OnNewAddressGenerated = { addr =>
val notification = WalletNotification.NewAddressNotification(addr)
val json =
upickle.default.writeJs(notification)(WsPicklers.newAddressPickler)
val msg = TextMessage.Strict(json.toString())
val offerF = walletQueue.offer(msg)
val offerF = walletQueue.offer(notification)
offerF.map(_ => ())
}
@ -109,19 +97,13 @@ object WebsocketUtil extends Logging {
val onReservedUtxo: OnReservedUtxos = { utxos =>
val notification =
WalletNotification.ReservedUtxosNotification(utxos)
val notificationJson =
upickle.default.writeJs(notification)(WsPicklers.reservedUtxosPickler)
val msg = TextMessage.Strict(notificationJson.toString())
val offerF = walletQueue.offer(msg)
val offerF = walletQueue.offer(notification)
offerF.map(_ => ())
}
val onRescanComplete: OnRescanComplete = { _ =>
val notification = WalletNotification.RescanComplete(walletName)
val notificationJson =
upickle.default.writeJs(notification)(WsPicklers.rescanPickler)
val msg = TextMessage.Strict(notificationJson.toString())
val offerF = walletQueue.offer(msg)
val offerF = walletQueue.offer(notification)
offerF.map(_ => ())
}
@ -135,15 +117,11 @@ object WebsocketUtil extends Logging {
)
}
def buildTorCallbacks(queue: SourceQueueWithComplete[Message])(implicit
ec: ExecutionContext): TorCallbacks = {
def buildTorCallbacks(queue: SourceQueueWithComplete[WsNotification[_]])(
implicit ec: ExecutionContext): TorCallbacks = {
val onTorStarted: OnTorStarted = { _ =>
val notification = TorStartedNotification
val json =
upickle.default.writeJs(notification)(WsPicklers.torStartedPickler)
val msg = TextMessage.Strict(json.toString())
val offerF = queue.offer(msg)
val offerF = queue.offer(notification)
offerF.map(_ => ())
}
@ -153,53 +131,42 @@ object WebsocketUtil extends Logging {
private def buildTxNotification(
wsType: WalletWsType,
tx: Transaction,
walletQueue: SourceQueueWithComplete[Message])(implicit
walletQueue: SourceQueueWithComplete[WsNotification[_]])(implicit
ec: ExecutionContext): Future[Unit] = {
val json = wsType match {
val notification = wsType match {
case WalletWsType.TxProcessed =>
val notification = WalletNotification.TxProcessedNotification(tx)
upickle.default.writeJs(notification)(WsPicklers.txProcessedPickler)
WalletNotification.TxProcessedNotification(tx)
case WalletWsType.TxBroadcast =>
val notification = WalletNotification.TxBroadcastNotification(tx)
upickle.default.writeJs(notification)(WsPicklers.txBroadcastPickler)
WalletNotification.TxBroadcastNotification(tx)
case x @ (WalletWsType.NewAddress | WalletWsType.ReservedUtxos |
WalletWsType.DLCStateChange | WalletWsType.DLCOfferAdd |
WalletWsType.DLCOfferRemove | WalletWsType.RescanComplete) =>
sys.error(s"Cannot build tx notification for $x")
}
val msg = TextMessage.Strict(json.toString())
val offerF = walletQueue.offer(msg)
val offerF = walletQueue.offer(notification)
offerF.map(_ => ())
}
def buildDLCWalletCallbacks(walletQueue: SourceQueueWithComplete[Message])(
implicit ec: ExecutionContext): DLCWalletCallbacks = {
def buildDLCWalletCallbacks(
walletQueue: SourceQueueWithComplete[WsNotification[_]])(implicit
ec: ExecutionContext): DLCWalletCallbacks = {
val onStateChange: OnDLCStateChange = { status: DLCStatus =>
val notification = WalletNotification.DLCStateChangeNotification(status)
val json =
upickle.default.writeJs(notification)(WsPicklers.dlcStateChangePickler)
val msg = TextMessage.Strict(json.toString())
val offerF = walletQueue.offer(msg)
val offerF = walletQueue.offer(notification)
offerF.map(_ => ())
}
val onOfferAdd: OnDLCOfferAdd = { offerDb: IncomingDLCOfferDb =>
val notification = WalletNotification.DLCOfferAddNotification(offerDb)
val json =
upickle.default.writeJs(notification)(WsPicklers.dlcOfferAddPickler)
val msg = TextMessage.Strict(json.toString())
val offerF = walletQueue.offer(msg)
val offerF = walletQueue.offer(notification)
offerF.map(_ => ())
}
val onOfferRemove: OnDLCOfferRemove = { offerHash: Sha256Digest =>
val notification =
WalletNotification.DLCOfferRemoveNotification(offerHash)
val json =
upickle.default.writeJs(notification)(WsPicklers.dlcOfferRemovePickler)
val msg = TextMessage.Strict(json.toString())
val offerF = walletQueue.offer(msg)
val offerF = walletQueue.offer(notification)
offerF.map(_ => ())
}