diff --git a/app-commons/src/main/scala/org/bitcoins/commons/jsonmodels/ws/WsModels.scala b/app-commons/src/main/scala/org/bitcoins/commons/jsonmodels/ws/WsModels.scala index 6ab431822b..1fea590af5 100644 --- a/app-commons/src/main/scala/org/bitcoins/commons/jsonmodels/ws/WsModels.scala +++ b/app-commons/src/main/scala/org/bitcoins/commons/jsonmodels/ws/WsModels.scala @@ -1,6 +1,7 @@ package org.bitcoins.commons.jsonmodels.ws import org.bitcoins.commons.jsonmodels.bitcoind.GetBlockHeaderResult +import org.bitcoins.commons.serializers.WsPicklers import org.bitcoins.core.api.dlc.wallet.db.IncomingDLCOfferDb import org.bitcoins.core.api.wallet.db.SpendingInfoDb import org.bitcoins.core.protocol.BitcoinAddress @@ -96,6 +97,7 @@ object TorWsType extends StringFactory[TorWsType] { sealed trait WsNotification[T] { def `type`: WsType def payload: T + def json: ujson.Value } sealed trait ChainNotification[T] extends WsNotification[T] { @@ -115,41 +117,73 @@ object WalletNotification { case class NewAddressNotification(payload: BitcoinAddress) extends WalletNotification[BitcoinAddress] { override val `type`: WalletWsType = WalletWsType.NewAddress + + override val json: ujson.Value = { + upickle.default.writeJs(this)(WsPicklers.newAddressPickler) + } } case class TxProcessedNotification(payload: Transaction) extends WalletNotification[Transaction] { override val `type`: WalletWsType = WalletWsType.TxProcessed + + override val json: ujson.Value = { + upickle.default.writeJs(this)(WsPicklers.txProcessedPickler) + } } case class TxBroadcastNotification(payload: Transaction) extends WalletNotification[Transaction] { override val `type`: WalletWsType = WalletWsType.TxBroadcast + + override val json: ujson.Value = { + upickle.default.writeJs(this)(WsPicklers.txBroadcastPickler) + } } case class ReservedUtxosNotification(payload: Vector[SpendingInfoDb]) extends WalletNotification[Vector[SpendingInfoDb]] { override val `type`: WalletWsType = WalletWsType.ReservedUtxos + + override val json: ujson.Value = { + upickle.default.writeJs(this)(WsPicklers.reservedUtxosPickler) + } } case class DLCStateChangeNotification(payload: DLCStatus) extends WalletNotification[DLCStatus] { override val `type`: WalletWsType = WalletWsType.DLCStateChange + + override val json: ujson.Value = { + upickle.default.writeJs(this)(WsPicklers.dlcStateChangePickler) + } } case class DLCOfferAddNotification(payload: IncomingDLCOfferDb) extends WalletNotification[IncomingDLCOfferDb] { override val `type`: WalletWsType = WalletWsType.DLCOfferAdd + + override val json: ujson.Value = { + upickle.default.writeJs(this)(WsPicklers.dlcOfferAddPickler) + } } case class DLCOfferRemoveNotification(payload: Sha256Digest) extends WalletNotification[Sha256Digest] { override val `type`: WalletWsType = WalletWsType.DLCOfferRemove + + override val json: ujson.Value = { + upickle.default.writeJs(this)(WsPicklers.dlcOfferRemovePickler) + } } case class RescanComplete(payload: String) extends WalletNotification[String] { override val `type`: WalletWsType = WalletWsType.RescanComplete + + override val json: ujson.Value = { + upickle.default.writeJs(this)(WsPicklers.rescanPickler) + } } } @@ -158,11 +192,19 @@ object ChainNotification { case class BlockProcessedNotification(payload: GetBlockHeaderResult) extends ChainNotification[GetBlockHeaderResult] { override val `type`: ChainWsType = ChainWsType.BlockProcessed + + override val json: ujson.Value = { + upickle.default.writeJs(this)(WsPicklers.blockProcessedPickler) + } } case class SyncFlagChangedNotification(payload: Boolean) extends ChainNotification[Boolean] { override val `type`: ChainWsType = ChainWsType.SyncFlagChanged + + override val json: ujson.Value = { + upickle.default.writeJs(this)(WsPicklers.syncFlagChangedPickler) + } } } @@ -171,5 +213,9 @@ object TorNotification { case object TorStartedNotification extends TorNotification[Unit] { override val `type`: TorWsType = TorWsType.TorStarted override val payload: Unit = () + + override val json: ujson.Value = { + upickle.default.writeJs(this)(WsPicklers.torStartedPickler) + } } } diff --git a/app/server-routes/src/main/scala/org/bitcoins/server/routes/Server.scala b/app/server-routes/src/main/scala/org/bitcoins/server/routes/Server.scala index fe184190c6..0b6ae8a183 100644 --- a/app/server-routes/src/main/scala/org/bitcoins/server/routes/Server.scala +++ b/app/server-routes/src/main/scala/org/bitcoins/server/routes/Server.scala @@ -4,7 +4,7 @@ import akka.actor.ActorSystem import akka.event.Logging import akka.http.scaladsl._ import akka.http.scaladsl.model._ -import akka.http.scaladsl.model.ws.Message +import akka.http.scaladsl.model.ws.{Message, TextMessage} import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server._ import akka.http.scaladsl.server.directives.Credentials.Missing @@ -16,10 +16,11 @@ import akka.http.scaladsl.server.directives.{ PathDirectives } import akka.http.scaladsl.unmarshalling.FromRequestUnmarshaller -import akka.stream.scaladsl.{Flow, Sink, Source} +import akka.stream.scaladsl.{Flow, Keep, Sink, Source} import akka.{Done, NotUsed} import de.heikoseeberger.akkahttpupickle.UpickleSupport._ import org.bitcoins.commons.config.AppConfig +import org.bitcoins.commons.jsonmodels.ws.WsNotification import org.bitcoins.server.util.{ServerBindings, WsServerConfig} import upickle.{default => up} @@ -32,7 +33,7 @@ case class Server( rpcport: Int, rpcPassword: String, wsConfigOpt: Option[WsServerConfig], - wsSource: Source[Message, NotUsed])(implicit system: ActorSystem) + wsSource: Source[WsNotification[_], NotUsed])(implicit system: ActorSystem) extends HttpLogger { import system.dispatcher @@ -208,9 +209,19 @@ case class Server( } } + private val notificationToMsgFn: WsNotification[_] => Message = { + notification => + val msg = TextMessage.Strict(notification.json.toString()) + msg + } + private val notificationToMsgFlow = Flow.fromFunction(notificationToMsgFn) + + private val msgSource: Source[Message, NotUsed] = { + wsSource.viaMat(notificationToMsgFlow)(Keep.right) + } + private def wsHandler: Flow[Message, Message, Any] = { - //we don't allow input, so use Sink.ignore - Flow.fromSinkAndSource(Sink.ignore, wsSource) + Flow.fromSinkAndSource(Sink.ignore, msgSource) } } diff --git a/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala b/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala index bec26bcc70..1c31fd9c81 100644 --- a/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala +++ b/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala @@ -1,7 +1,6 @@ package org.bitcoins.server import akka.actor.ActorSystem -import akka.http.scaladsl.model.ws.Message import akka.stream.OverflowStrategy import akka.stream.scaladsl.{ BroadcastHub, @@ -17,6 +16,7 @@ import org.bitcoins.chain.ChainCallbacks import org.bitcoins.chain.blockchain.ChainHandler import org.bitcoins.chain.config.ChainAppConfig import org.bitcoins.commons.jsonmodels.bitcoind.GetBlockChainInfoResult +import org.bitcoins.commons.jsonmodels.ws.WsNotification import org.bitcoins.commons.util.{DatadirParser, ServerArgParser} import org.bitcoins.core.api.chain.ChainApi import org.bitcoins.core.api.node.{ @@ -162,8 +162,8 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit val tuple = buildWsSource - val wsQueue: SourceQueueWithComplete[Message] = tuple._1 - val wsSource: Source[Message, NotUsed] = tuple._2 + val wsQueue: SourceQueueWithComplete[WsNotification[_]] = tuple._1 + val wsSource: Source[WsNotification[_], NotUsed] = tuple._2 val _ = buildNeutrinoCallbacks(wsQueue, chainApi) val torCallbacks = WebsocketUtil.buildTorCallbacks(wsQueue) torConf.addCallbacks(torCallbacks) @@ -224,7 +224,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit } private def buildNeutrinoCallbacks( - wsQueue: SourceQueueWithComplete[Message], + wsQueue: SourceQueueWithComplete[WsNotification[_]], chainApi: ChainApi): Unit = { val chainCallbacks = WebsocketUtil.buildChainCallbacks(wsQueue, chainApi) chainConf.addCallbacks(chainCallbacks) @@ -274,8 +274,8 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit _ <- client.start() } yield client val tuple = buildWsSource - val wsQueue: SourceQueueWithComplete[Message] = tuple._1 - val wsSource: Source[Message, NotUsed] = tuple._2 + val wsQueue: SourceQueueWithComplete[WsNotification[_]] = tuple._1 + val wsSource: Source[WsNotification[_], NotUsed] = tuple._2 val torCallbacks = WebsocketUtil.buildTorCallbacks(wsQueue) val _ = torConf.addCallbacks(torCallbacks) val isTorStartedF = if (torConf.torProvided) { @@ -371,7 +371,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit dlcNodeF: Future[DLCNode], torConfStarted: Future[Unit], serverCmdLineArgs: ServerArgParser, - wsSource: Source[Message, NotUsed])(implicit + wsSource: Source[WsNotification[_], NotUsed])(implicit system: ActorSystem, conf: BitcoinSAppConfig): Future[Server] = { implicit val nodeConf: NodeAppConfig = conf.nodeConf @@ -502,8 +502,8 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit * to create a flow that emits websocket messages */ private def buildWsSource: ( - SourceQueueWithComplete[Message], - Source[Message, NotUsed]) = { + SourceQueueWithComplete[WsNotification[_]], + Source[WsNotification[_], NotUsed]) = { val maxBufferSize: Int = 25 /** This will queue [[maxBufferSize]] elements in the queue. Once the buffer size is reached, @@ -514,7 +514,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit //the BroadcastHub.sink is needed to avoid these errors // 'Websocket handler failed with Processor actor' Source - .queue[Message](maxBufferSize, OverflowStrategy.dropHead) + .queue[WsNotification[_]](maxBufferSize, OverflowStrategy.dropHead) .toMat(BroadcastHub.sink)(Keep.both) .run() } diff --git a/app/server/src/main/scala/org/bitcoins/server/util/WebsocketUtil.scala b/app/server/src/main/scala/org/bitcoins/server/util/WebsocketUtil.scala index 1c6a3f579e..2b048f1fce 100644 --- a/app/server/src/main/scala/org/bitcoins/server/util/WebsocketUtil.scala +++ b/app/server/src/main/scala/org/bitcoins/server/util/WebsocketUtil.scala @@ -1,6 +1,5 @@ package org.bitcoins.server.util -import akka.http.scaladsl.model.ws.{Message, TextMessage} import akka.stream.scaladsl.SourceQueueWithComplete import grizzled.slf4j.Logging import org.bitcoins.chain.{ @@ -12,9 +11,10 @@ import org.bitcoins.commons.jsonmodels.ws.TorNotification.TorStartedNotification import org.bitcoins.commons.jsonmodels.ws.{ ChainNotification, WalletNotification, - WalletWsType + WalletWsType, + WsNotification } -import org.bitcoins.commons.serializers.WsPicklers + import org.bitcoins.core.api.chain.ChainApi import org.bitcoins.core.api.dlc.wallet.db.IncomingDLCOfferDb import org.bitcoins.core.protocol.blockchain.BlockHeader @@ -36,7 +36,7 @@ import scala.concurrent.{ExecutionContext, Future} object WebsocketUtil extends Logging { def buildChainCallbacks( - queue: SourceQueueWithComplete[Message], + queue: SourceQueueWithComplete[WsNotification[_]], chainApi: ChainApi)(implicit ec: ExecutionContext): ChainCallbacks = { val onBlockProcessed: OnBlockHeaderConnected = { case headersWithHeight: Vector[(Int, BlockHeader)] => @@ -49,13 +49,7 @@ object WebsocketUtil extends Logging { notifications = results.map(result => ChainNotification.BlockProcessedNotification(result)) - notificationsJson = notifications.map { notification => - upickle.default.writeJs(notification)( - WsPicklers.blockProcessedPickler) - } - - msgs = notificationsJson.map(n => TextMessage.Strict(n.toString())) - _ <- FutureUtil.sequentially(msgs) { case msg => + _ <- FutureUtil.sequentially(notifications) { case msg => val x: Future[Unit] = queue .offer(msg) .map(_ => ()) @@ -69,11 +63,8 @@ object WebsocketUtil extends Logging { val onSyncFlagChanged: OnSyncFlagChanged = { syncing => val notification = ChainNotification.SyncFlagChangedNotification(syncing) - val notificationJson = - upickle.default.writeJs(notification)(WsPicklers.syncFlagChangedPickler) - val msg = TextMessage.Strict(notificationJson.toString()) for { - _ <- queue.offer(msg) + _ <- queue.offer(notification) } yield () } @@ -83,14 +74,11 @@ object WebsocketUtil extends Logging { /** Builds websocket callbacks for the wallet */ def buildWalletCallbacks( - walletQueue: SourceQueueWithComplete[Message], + walletQueue: SourceQueueWithComplete[WsNotification[_]], walletName: String)(implicit ec: ExecutionContext): WalletCallbacks = { val onAddressCreated: OnNewAddressGenerated = { addr => val notification = WalletNotification.NewAddressNotification(addr) - val json = - upickle.default.writeJs(notification)(WsPicklers.newAddressPickler) - val msg = TextMessage.Strict(json.toString()) - val offerF = walletQueue.offer(msg) + val offerF = walletQueue.offer(notification) offerF.map(_ => ()) } @@ -109,19 +97,13 @@ object WebsocketUtil extends Logging { val onReservedUtxo: OnReservedUtxos = { utxos => val notification = WalletNotification.ReservedUtxosNotification(utxos) - val notificationJson = - upickle.default.writeJs(notification)(WsPicklers.reservedUtxosPickler) - val msg = TextMessage.Strict(notificationJson.toString()) - val offerF = walletQueue.offer(msg) + val offerF = walletQueue.offer(notification) offerF.map(_ => ()) } val onRescanComplete: OnRescanComplete = { _ => val notification = WalletNotification.RescanComplete(walletName) - val notificationJson = - upickle.default.writeJs(notification)(WsPicklers.rescanPickler) - val msg = TextMessage.Strict(notificationJson.toString()) - val offerF = walletQueue.offer(msg) + val offerF = walletQueue.offer(notification) offerF.map(_ => ()) } @@ -135,15 +117,11 @@ object WebsocketUtil extends Logging { ) } - def buildTorCallbacks(queue: SourceQueueWithComplete[Message])(implicit - ec: ExecutionContext): TorCallbacks = { + def buildTorCallbacks(queue: SourceQueueWithComplete[WsNotification[_]])( + implicit ec: ExecutionContext): TorCallbacks = { val onTorStarted: OnTorStarted = { _ => val notification = TorStartedNotification - val json = - upickle.default.writeJs(notification)(WsPicklers.torStartedPickler) - - val msg = TextMessage.Strict(json.toString()) - val offerF = queue.offer(msg) + val offerF = queue.offer(notification) offerF.map(_ => ()) } @@ -153,53 +131,42 @@ object WebsocketUtil extends Logging { private def buildTxNotification( wsType: WalletWsType, tx: Transaction, - walletQueue: SourceQueueWithComplete[Message])(implicit + walletQueue: SourceQueueWithComplete[WsNotification[_]])(implicit ec: ExecutionContext): Future[Unit] = { - val json = wsType match { + val notification = wsType match { case WalletWsType.TxProcessed => - val notification = WalletNotification.TxProcessedNotification(tx) - upickle.default.writeJs(notification)(WsPicklers.txProcessedPickler) + WalletNotification.TxProcessedNotification(tx) case WalletWsType.TxBroadcast => - val notification = WalletNotification.TxBroadcastNotification(tx) - upickle.default.writeJs(notification)(WsPicklers.txBroadcastPickler) + WalletNotification.TxBroadcastNotification(tx) case x @ (WalletWsType.NewAddress | WalletWsType.ReservedUtxos | WalletWsType.DLCStateChange | WalletWsType.DLCOfferAdd | WalletWsType.DLCOfferRemove | WalletWsType.RescanComplete) => sys.error(s"Cannot build tx notification for $x") } - val msg = TextMessage.Strict(json.toString()) - val offerF = walletQueue.offer(msg) + val offerF = walletQueue.offer(notification) offerF.map(_ => ()) } - def buildDLCWalletCallbacks(walletQueue: SourceQueueWithComplete[Message])( - implicit ec: ExecutionContext): DLCWalletCallbacks = { + def buildDLCWalletCallbacks( + walletQueue: SourceQueueWithComplete[WsNotification[_]])(implicit + ec: ExecutionContext): DLCWalletCallbacks = { val onStateChange: OnDLCStateChange = { status: DLCStatus => val notification = WalletNotification.DLCStateChangeNotification(status) - val json = - upickle.default.writeJs(notification)(WsPicklers.dlcStateChangePickler) - val msg = TextMessage.Strict(json.toString()) - val offerF = walletQueue.offer(msg) + val offerF = walletQueue.offer(notification) offerF.map(_ => ()) } val onOfferAdd: OnDLCOfferAdd = { offerDb: IncomingDLCOfferDb => val notification = WalletNotification.DLCOfferAddNotification(offerDb) - val json = - upickle.default.writeJs(notification)(WsPicklers.dlcOfferAddPickler) - val msg = TextMessage.Strict(json.toString()) - val offerF = walletQueue.offer(msg) + val offerF = walletQueue.offer(notification) offerF.map(_ => ()) } val onOfferRemove: OnDLCOfferRemove = { offerHash: Sha256Digest => val notification = WalletNotification.DLCOfferRemoveNotification(offerHash) - val json = - upickle.default.writeJs(notification)(WsPicklers.dlcOfferRemovePickler) - val msg = TextMessage.Strict(json.toString()) - val offerF = walletQueue.offer(msg) + val offerF = walletQueue.offer(notification) offerF.map(_ => ()) }