diff --git a/app-commons/src/main/scala/org/bitcoins/commons/jsonmodels/ws/WsModels.scala b/app-commons/src/main/scala/org/bitcoins/commons/jsonmodels/ws/WsModels.scala index bed53b96fa..abafbfa1fa 100644 --- a/app-commons/src/main/scala/org/bitcoins/commons/jsonmodels/ws/WsModels.scala +++ b/app-commons/src/main/scala/org/bitcoins/commons/jsonmodels/ws/WsModels.scala @@ -13,22 +13,27 @@ sealed trait WsType object WsType extends StringFactory[WsType] { override def fromString(string: String): WsType = { - WalletWsType.fromString(string) + ChainWsType.fromStringOpt(string) match { + case Some(t) => t + case None => + WalletWsType.fromString(string) + } } } sealed trait WalletWsType extends WsType +sealed trait ChainWsType extends WsType object WalletWsType extends StringFactory[WalletWsType] { case object TxProcessed extends WalletWsType case object TxBroadcast extends WalletWsType case object ReservedUtxos extends WalletWsType case object NewAddress extends WalletWsType - case object BlockProcessed extends WalletWsType + case object DLCStateChange extends WalletWsType private val all = - Vector(TxProcessed, TxBroadcast, ReservedUtxos, NewAddress, BlockProcessed) + Vector(TxProcessed, TxBroadcast, ReservedUtxos, NewAddress) override def fromStringOpt(string: String): Option[WalletWsType] = { all.find(_.toString.toLowerCase() == string.toLowerCase) @@ -40,6 +45,21 @@ object WalletWsType extends StringFactory[WalletWsType] { } } +object ChainWsType extends StringFactory[ChainWsType] { + case object BlockProcessed extends ChainWsType + + private val all: Vector[ChainWsType] = Vector(BlockProcessed) + + override def fromStringOpt(string: String): Option[ChainWsType] = { + all.find(_.toString.toLowerCase() == string.toLowerCase) + } + + override def fromString(string: String): ChainWsType = { + fromStringOpt(string) + .getOrElse(sys.error(s"Cannot find chain ws type for string=$string")) + } +} + /** A notification that we send over the websocket. * The type of the notification is indicated by [[WsType]]. * An example is [[org.bitcoins.commons.jsonmodels.ws.WalletNotification.NewAddressNotification]] @@ -50,6 +70,10 @@ sealed trait WsNotification[T] { def payload: T } +sealed trait ChainNotification[T] extends WsNotification[T] { + override def `type`: ChainWsType +} + sealed trait WalletNotification[T] extends WsNotification[T] { override def `type`: WalletWsType } @@ -76,13 +100,16 @@ object WalletNotification { override val `type`: WalletWsType = WalletWsType.ReservedUtxos } - case class BlockProcessedNotification(payload: GetBlockHeaderResult) - extends WalletNotification[GetBlockHeaderResult] { - override val `type`: WalletWsType = WalletWsType.BlockProcessed - } - case class DLCStateChangeNotification(payload: DLCStatus) extends WalletNotification[DLCStatus] { override val `type`: WalletWsType = WalletWsType.DLCStateChange } } + +object ChainNotification { + + case class BlockProcessedNotification(payload: GetBlockHeaderResult) + extends ChainNotification[GetBlockHeaderResult] { + override val `type`: ChainWsType = ChainWsType.BlockProcessed + } +} diff --git a/app-commons/src/main/scala/org/bitcoins/commons/serializers/WsPicklers.scala b/app-commons/src/main/scala/org/bitcoins/commons/serializers/WsPicklers.scala index 90f2eab58d..98170686fe 100644 --- a/app-commons/src/main/scala/org/bitcoins/commons/serializers/WsPicklers.scala +++ b/app-commons/src/main/scala/org/bitcoins/commons/serializers/WsPicklers.scala @@ -1,24 +1,59 @@ package org.bitcoins.commons.serializers +import org.bitcoins.commons.jsonmodels.ws.ChainNotification.BlockProcessedNotification import org.bitcoins.commons.jsonmodels.ws.WalletNotification.{ - BlockProcessedNotification, DLCStateChangeNotification, NewAddressNotification, ReservedUtxosNotification, TxBroadcastNotification, TxProcessedNotification } -import org.bitcoins.commons.jsonmodels.ws.{WalletNotification, WalletWsType} +import org.bitcoins.commons.jsonmodels.ws.{ + ChainNotification, + ChainWsType, + WalletNotification, + WalletWsType +} import org.bitcoins.core.serializers.PicklerKeys import upickle.default._ object WsPicklers { + implicit val chainWsTypePickler: ReadWriter[ChainWsType] = { + readwriter[ujson.Str] + .bimap(_.toString.toLowerCase, str => ChainWsType.fromString(str.str)) + } + implicit val walletWsTypePickler: ReadWriter[WalletWsType] = { readwriter[ujson.Str] .bimap(_.toString.toLowerCase, str => WalletWsType.fromString(str.str)) } + private def writeChainNotification( + notification: ChainNotification[_]): ujson.Obj = { + val payloadJson: ujson.Value = notification match { + case BlockProcessedNotification(block) => + upickle.default.writeJs(block)(Picklers.getBlockHeaderResultPickler) + } + val notificationObj = ujson.Obj( + PicklerKeys.typeKey -> writeJs(notification.`type`), + PicklerKeys.payloadKey -> payloadJson + ) + notificationObj + } + + private def readChainNotification(obj: ujson.Obj): ChainNotification[_] = { + val typeObj = read[ChainWsType](obj(PicklerKeys.typeKey)) + val payloadObj = obj(PicklerKeys.payloadKey) + + typeObj match { + case ChainWsType.BlockProcessed => + val block = + upickle.default.read(payloadObj)(Picklers.getBlockHeaderResultPickler) + BlockProcessedNotification(block) + } + } + private def writeWalletNotification( notification: WalletNotification[_]): ujson.Obj = { val payloadJson: ujson.Value = notification match { @@ -32,8 +67,6 @@ object WsPicklers { val vec = utxos.map(u => upickle.default.writeJs(u)(Picklers.spendingInfoDbPickler)) ujson.Arr.from(vec) - case BlockProcessedNotification(block) => - upickle.default.writeJs(block)(Picklers.getBlockHeaderResultPickler) case DLCStateChangeNotification(status) => upickle.default.writeJs(status)(Picklers.dlcStatusW) } @@ -64,10 +97,6 @@ object WsPicklers { upickle.default.read(utxoJson)(Picklers.spendingInfoDbPickler) } ReservedUtxosNotification(utxos) - case WalletWsType.BlockProcessed => - val block = - upickle.default.read(payloadObj)(Picklers.getBlockHeaderResultPickler) - BlockProcessedNotification(block) case WalletWsType.DLCStateChange => val status = upickle.default.read(payloadObj)(Picklers.dlcStatusR) DLCStateChangeNotification(status) @@ -102,10 +131,14 @@ object WsPicklers { readwriter[ujson.Obj].bimap(writeWalletNotification, readWalletNotification) } + implicit val chainNotificationPickler: ReadWriter[ChainNotification[_]] = { + readwriter[ujson.Obj].bimap(writeChainNotification, readChainNotification) + } + implicit val blockProcessedPickler: ReadWriter[BlockProcessedNotification] = { readwriter[ujson.Obj].bimap( - writeWalletNotification(_), - readWalletNotification(_).asInstanceOf[BlockProcessedNotification] + writeChainNotification(_), + readChainNotification(_).asInstanceOf[BlockProcessedNotification] ) } diff --git a/app/oracle-server/src/main/scala/org/bitcoins/oracle/server/OracleServerMain.scala b/app/oracle-server/src/main/scala/org/bitcoins/oracle/server/OracleServerMain.scala index c7829fd23d..d39555675b 100644 --- a/app/oracle-server/src/main/scala/org/bitcoins/oracle/server/OracleServerMain.scala +++ b/app/oracle-server/src/main/scala/org/bitcoins/oracle/server/OracleServerMain.scala @@ -1,6 +1,7 @@ package org.bitcoins.oracle.server import akka.actor.ActorSystem +import akka.stream.scaladsl.Source import org.bitcoins.commons.util.{DatadirParser, ServerArgParser} import org.bitcoins.dlc.oracle.DLCOracle import org.bitcoins.dlc.oracle.config.DLCOracleAppConfig @@ -33,13 +34,15 @@ class OracleServerMain(override val serverArgParser: ServerArgParser)(implicit handlers = routes, rpcbindOpt = bindConfOpt, rpcport = rpcport, - None) + None, + Source.empty) case None => Server(conf = conf, handlers = routes, rpcbindOpt = bindConfOpt, rpcport = conf.rpcPort, - None) + None, + Source.empty) } _ <- server.start() diff --git a/app/server-routes/src/main/scala/org/bitcoins/server/routes/Server.scala b/app/server-routes/src/main/scala/org/bitcoins/server/routes/Server.scala index 26375a01cf..8e761666ee 100644 --- a/app/server-routes/src/main/scala/org/bitcoins/server/routes/Server.scala +++ b/app/server-routes/src/main/scala/org/bitcoins/server/routes/Server.scala @@ -9,15 +9,7 @@ import akka.http.scaladsl.model.ws.Message import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server._ import akka.http.scaladsl.server.directives.DebuggingDirectives -import akka.stream.OverflowStrategy -import akka.stream.scaladsl.{ - BroadcastHub, - Flow, - Keep, - Sink, - Source, - SourceQueueWithComplete -} +import akka.stream.scaladsl.{Flow, Sink, Source} import de.heikoseeberger.akkahttpupickle.UpickleSupport._ import org.bitcoins.commons.config.AppConfig import org.bitcoins.server.util.{ServerBindings, WsServerConfig} @@ -30,7 +22,8 @@ case class Server( handlers: Seq[ServerRoute], rpcbindOpt: Option[String], rpcport: Int, - wsConfigOpt: Option[WsServerConfig])(implicit system: ActorSystem) + wsConfigOpt: Option[WsServerConfig], + wsSource: Source[Message, NotUsed])(implicit system: ActorSystem) extends HttpLogger { import system.dispatcher @@ -123,24 +116,6 @@ case class Server( } } - private val maxBufferSize: Int = 25 - - /** This will queue [[maxBufferSize]] elements in the queue. Once the buffer size is reached, - * we will drop the first element in the buffer - */ - private val tuple = { - //from: https://github.com/akka/akka-http/issues/3039#issuecomment-610263181 - //the BroadcastHub.sink is needed to avoid these errors - // 'Websocket handler failed with Processor actor' - Source - .queue[Message](maxBufferSize, OverflowStrategy.dropHead) - .toMat(BroadcastHub.sink)(Keep.both) - .run() - } - - def walletQueue: SourceQueueWithComplete[Message] = tuple._1 - def source: Source[Message, NotUsed] = tuple._2 - private val eventsRoute = "events" private def wsRoutes: Route = { @@ -151,7 +126,7 @@ case class Server( private def wsHandler: Flow[Message, Message, Any] = { //we don't allow input, so use Sink.ignore - Flow.fromSinkAndSource(Sink.ignore, source) + Flow.fromSinkAndSource(Sink.ignore, wsSource) } } diff --git a/app/server-test/src/test/scala/org/bitcoins/server/WebsocketTests.scala b/app/server-test/src/test/scala/org/bitcoins/server/WebsocketTests.scala index 1da9e78c56..1fda9182a2 100644 --- a/app/server-test/src/test/scala/org/bitcoins/server/WebsocketTests.scala +++ b/app/server-test/src/test/scala/org/bitcoins/server/WebsocketTests.scala @@ -9,13 +9,18 @@ import akka.http.scaladsl.model.ws.{ } import akka.stream.scaladsl.{Flow, Keep, Sink, Source} import org.bitcoins.cli.{CliCommand, Config, ConsoleCli} -import org.bitcoins.commons.jsonmodels.ws.{WalletNotification, WalletWsType} +import org.bitcoins.commons.jsonmodels.ws.ChainNotification.BlockProcessedNotification import org.bitcoins.commons.jsonmodels.ws.WalletNotification.{ - BlockProcessedNotification, NewAddressNotification, TxBroadcastNotification, TxProcessedNotification } +import org.bitcoins.commons.jsonmodels.ws.{ + ChainNotification, + WalletNotification, + WalletWsType, + WsNotification +} import org.bitcoins.commons.serializers.{Picklers, WsPicklers} import org.bitcoins.core.currency.Bitcoins import org.bitcoins.core.protocol.BitcoinAddress @@ -30,23 +35,27 @@ import org.bitcoins.testkit.util.AkkaUtil import scala.concurrent.duration.DurationInt import scala.concurrent.{Future, Promise} +import scala.util.Try class WebsocketTests extends BitcoinSServerMainBitcoindFixture { behavior of "Websocket Tests" - val endSink: Sink[WalletNotification[_], Future[Seq[WalletNotification[_]]]] = - Sink.seq[WalletNotification[_]] + val endSink: Sink[WsNotification[_], Future[Seq[WsNotification[_]]]] = + Sink.seq[WsNotification[_]] - val sink: Sink[Message, Future[Seq[WalletNotification[_]]]] = Flow[Message] + val sink: Sink[Message, Future[Seq[WsNotification[_]]]] = Flow[Message] .map { case message: TextMessage.Strict => //we should be able to parse the address message val text = message.text - val notification: WalletNotification[_] = + val walletNotificationOpt: Option[WalletNotification[_]] = Try( upickle.default.read[WalletNotification[_]](text)( - WsPicklers.walletNotificationPickler) - notification + WsPicklers.walletNotificationPickler)).toOption + val chainNotificationOpt: Option[ChainNotification[_]] = Try( + upickle.default.read[ChainNotification[_]](text)( + WsPicklers.chainNotificationPickler)).toOption + walletNotificationOpt.getOrElse(chainNotificationOpt.get) case msg => fail(s"Unexpected msg type received in the sink, msg=$msg") } @@ -59,7 +68,7 @@ class WebsocketTests extends BitcoinSServerMainBitcoindFixture { val websocketFlow: Flow[ Message, Message, - (Future[Seq[WalletNotification[_]]], Promise[Option[Message]])] = { + (Future[Seq[WsNotification[_]]], Promise[Option[Message]])] = { Flow .fromSinkAndSourceCoupledMat(sink, Source.maybe[Message])(Keep.both) } @@ -72,12 +81,12 @@ class WebsocketTests extends BitcoinSServerMainBitcoindFixture { val req = buildReq(server.conf) val notificationsF: ( Future[WebSocketUpgradeResponse], - (Future[Seq[WalletNotification[_]]], Promise[Option[Message]])) = { + (Future[Seq[WsNotification[_]]], Promise[Option[Message]])) = { Http() .singleWebSocketRequest(req, websocketFlow) } - val walletNotificationsF: Future[Seq[WalletNotification[_]]] = + val walletNotificationsF: Future[Seq[WsNotification[_]]] = notificationsF._2._1 val promise: Promise[Option[Message]] = notificationsF._2._2 @@ -104,7 +113,7 @@ class WebsocketTests extends BitcoinSServerMainBitcoindFixture { val req = buildReq(server.conf) val tuple: ( Future[WebSocketUpgradeResponse], - (Future[Seq[WalletNotification[_]]], Promise[Option[Message]])) = { + (Future[Seq[WsNotification[_]]], Promise[Option[Message]])) = { Http() .singleWebSocketRequest(req, websocketFlow) } @@ -142,7 +151,7 @@ class WebsocketTests extends BitcoinSServerMainBitcoindFixture { val req = buildReq(server.conf) val tuple: ( Future[WebSocketUpgradeResponse], - (Future[Seq[WalletNotification[_]]], Promise[Option[Message]])) = { + (Future[Seq[WsNotification[_]]], Promise[Option[Message]])) = { Http() .singleWebSocketRequest(req, websocketFlow) } @@ -179,7 +188,7 @@ class WebsocketTests extends BitcoinSServerMainBitcoindFixture { val req = buildReq(server.conf) val tuple: ( Future[WebSocketUpgradeResponse], - (Future[Seq[WalletNotification[_]]], Promise[Option[Message]])) = { + (Future[Seq[WsNotification[_]]], Promise[Option[Message]])) = { Http() .singleWebSocketRequest(req, websocketFlow) } @@ -188,7 +197,8 @@ class WebsocketTests extends BitcoinSServerMainBitcoindFixture { val promise = tuple._2._2 val addressF = bitcoind.getNewAddress - + val timeout = + 15.seconds //any way we can remove this timeout and just check? for { address <- addressF hashes <- bitcoind.generateToAddress(1, address) @@ -196,16 +206,13 @@ class WebsocketTests extends BitcoinSServerMainBitcoindFixture { getBlockHeaderResultStr = ConsoleCli.exec(cmd, cliConfig) getBlockHeaderResult = upickle.default.read(getBlockHeaderResultStr.get)( Picklers.getBlockHeaderResultPickler) - block <- bitcoind.getBlockRaw(hashes.head) - wallet <- server.walletConf.createHDWallet(bitcoind, bitcoind, bitcoind) - _ <- wallet.processBlock(block) - _ <- AkkaUtil.nonBlockingSleep(500.millis) + _ <- AkkaUtil.nonBlockingSleep(timeout) _ = promise.success(None) notifications <- notificationsF } yield { - assert( - notifications.exists( - _ == BlockProcessedNotification(getBlockHeaderResult))) + val count = notifications.count( + _ == BlockProcessedNotification(getBlockHeaderResult)) + assert(count == 1, s"count=$count") } } @@ -217,12 +224,12 @@ class WebsocketTests extends BitcoinSServerMainBitcoindFixture { val req = buildReq(server.conf) val tuple: ( Future[WebSocketUpgradeResponse], - (Future[Seq[WalletNotification[_]]], Promise[Option[Message]])) = { + (Future[Seq[WsNotification[_]]], Promise[Option[Message]])) = { Http() .singleWebSocketRequest(req, websocketFlow) } - val notificationsF: Future[Seq[WalletNotification[_]]] = tuple._2._1 + val notificationsF: Future[Seq[WsNotification[_]]] = tuple._2._1 val promise = tuple._2._2 //lock all utxos diff --git a/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala b/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala index a921f829aa..24120850ba 100644 --- a/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala +++ b/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala @@ -1,7 +1,16 @@ package org.bitcoins.server +import akka.NotUsed import akka.actor.ActorSystem import akka.dispatch.Dispatchers +import akka.http.scaladsl.model.ws.Message +import akka.stream.OverflowStrategy +import akka.stream.scaladsl.{ + BroadcastHub, + Keep, + Source, + SourceQueueWithComplete +} import org.bitcoins.asyncutil.AsyncUtil import org.bitcoins.chain.blockchain.ChainHandler import org.bitcoins.chain.config.ChainAppConfig @@ -101,6 +110,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit } def startBitcoinSBackend(): Future[Unit] = { + logger.info(s"startBitcoinSBackend()") val start = System.currentTimeMillis() if (nodeConf.peers.isEmpty) { throw new IllegalArgumentException( @@ -156,6 +166,10 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit node = dlcNodeConf.createDLCNode(wallet) } yield node + val tuple = buildWsSource + + val wsQueue: SourceQueueWithComplete[Message] = tuple._1 + val wsSource: Source[Message, NotUsed] = tuple._2 //start our http server now that we are synced for { node <- configuredNodeF @@ -174,16 +188,17 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit dlcNode <- dlcNodeF _ <- dlcNode.start() - server <- startHttpServer(nodeApi = node, - chainApi = chainApi, - wallet = wallet, - dlcNode = dlcNode, - serverCmdLineArgs = serverArgParser) - walletCallbacks = WebsocketUtil.buildWalletCallbacks(server.walletQueue, - chainApi) + _ <- startHttpServer(nodeApi = node, + chainApi = chainApi, + wallet = wallet, + dlcNode = dlcNode, + serverCmdLineArgs = serverArgParser, + wsSource = wsSource) + chainCallbacks = WebsocketUtil.buildChainCallbacks(wsQueue, chainApi) + _ = chainConf.addCallbacks(chainCallbacks) + walletCallbacks = WebsocketUtil.buildWalletCallbacks(wsQueue) _ = walletConf.addCallbacks(walletCallbacks) - dlcWalletCallbacks = WebsocketUtil.buildDLCWalletCallbacks( - server.walletQueue) + dlcWalletCallbacks = WebsocketUtil.buildDLCWalletCallbacks(wsQueue) _ = dlcConf.addCallbacks(dlcWalletCallbacks) _ = { logger.info( @@ -218,7 +233,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit } def startBitcoindBackend(): Future[Unit] = { - + logger.info(s"startBitcoindBackend()") val bitcoindF = for { client <- bitcoindRpcConf.clientF _ <- client.start() @@ -231,6 +246,10 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit feeRateApi = feeProvider) } + val tuple = buildWsSource + val wsQueue: SourceQueueWithComplete[Message] = tuple._1 + val wsSource: Source[Message, NotUsed] = tuple._2 + for { _ <- bitcoindRpcConf.start() bitcoind <- bitcoindRpcConf.clientF @@ -242,10 +261,12 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit s"bitcoind ($bitcoindNetwork) on different network than wallet (${walletConf.network})") _ = logger.info("Creating wallet") + chainCallbacks = WebsocketUtil.buildChainCallbacks(wsQueue, bitcoind) tmpWallet <- tmpWalletF wallet = BitcoindRpcBackendUtil.createDLCWalletWithBitcoindCallbacks( bitcoind, - tmpWallet) + tmpWallet, + Some(chainCallbacks)) _ = logger.info("Starting wallet") _ <- wallet.start().recoverWith { //https://github.com/bitcoin-s/bitcoin-s/issues/2917 @@ -255,22 +276,22 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit handleMissingSpendingInfoDb(err, wallet) } + dlcNode = dlcNodeConf.createDLCNode(wallet) + _ <- dlcNode.start() + _ <- startHttpServer(nodeApi = bitcoind, + chainApi = bitcoind, + wallet = wallet, + dlcNode = dlcNode, + serverCmdLineArgs = serverArgParser, + wsSource = wsSource) + walletCallbacks = WebsocketUtil.buildWalletCallbacks(wsQueue) + _ = walletConf.addCallbacks(walletCallbacks) + //intentionally doesn't map on this otherwise we //wait until we are done syncing the entire wallet //which could take 1 hour _ = syncWalletWithBitcoindAndStartPolling(bitcoind, wallet) - dlcNode = dlcNodeConf.createDLCNode(wallet) - _ <- dlcNode.start() - server <- startHttpServer(nodeApi = bitcoind, - chainApi = bitcoind, - wallet = wallet, - dlcNode = dlcNode, - serverCmdLineArgs = serverArgParser) - walletCallbacks = WebsocketUtil.buildWalletCallbacks(server.walletQueue, - bitcoind) - _ = walletConf.addCallbacks(walletCallbacks) - dlcWalletCallbacks = WebsocketUtil.buildDLCWalletCallbacks( - server.walletQueue) + dlcWalletCallbacks = WebsocketUtil.buildDLCWalletCallbacks(wsQueue) _ = dlcConf.addCallbacks(dlcWalletCallbacks) } yield { logger.info(s"Done starting Main!") @@ -365,7 +386,8 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit chainApi: ChainApi, wallet: DLCWallet, dlcNode: DLCNode, - serverCmdLineArgs: ServerArgParser)(implicit + serverCmdLineArgs: ServerArgParser, + wsSource: Source[Message, NotUsed])(implicit system: ActorSystem, conf: BitcoinSAppConfig): Future[Server] = { implicit val nodeConf: NodeAppConfig = conf.nodeConf @@ -411,13 +433,15 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit handlers = handlers, rpcbindOpt = rpcBindConfOpt, rpcport = rpcport, - wsConfigOpt = Some(wsServerConfig)) + wsConfigOpt = Some(wsServerConfig), + wsSource) case None => Server(conf = nodeConf, handlers = handlers, rpcbindOpt = rpcBindConfOpt, rpcport = conf.rpcPort, - wsConfigOpt = Some(wsServerConfig)) + wsConfigOpt = Some(wsServerConfig), + wsSource) } } val bindingF = server.start() @@ -522,6 +546,31 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit logger.error(s"Error syncing bitcoin-s wallet with bitcoind", err)) f } + + /** Builds a websocket queue that you can feed elements to. + * The Source can be wired up with Directives.handleWebSocketMessages + * to create a flow that emits websocket messages + */ + private def buildWsSource: ( + SourceQueueWithComplete[Message], + Source[Message, NotUsed]) = { + val maxBufferSize: Int = 25 + + /** This will queue [[maxBufferSize]] elements in the queue. Once the buffer size is reached, + * we will drop the first element in the buffer + */ + val tuple = { + //from: https://github.com/akka/akka-http/issues/3039#issuecomment-610263181 + //the BroadcastHub.sink is needed to avoid these errors + // 'Websocket handler failed with Processor actor' + Source + .queue[Message](maxBufferSize, OverflowStrategy.dropHead) + .toMat(BroadcastHub.sink)(Keep.both) + .run() + } + + tuple + } } object BitcoinSServerMain extends BitcoinSAppScalaDaemon { diff --git a/app/server/src/main/scala/org/bitcoins/server/BitcoindRpcBackendUtil.scala b/app/server/src/main/scala/org/bitcoins/server/BitcoindRpcBackendUtil.scala index cfefbc3501..713d1634c3 100644 --- a/app/server/src/main/scala/org/bitcoins/server/BitcoindRpcBackendUtil.scala +++ b/app/server/src/main/scala/org/bitcoins/server/BitcoindRpcBackendUtil.scala @@ -4,6 +4,8 @@ import akka.Done import akka.actor.{ActorSystem, Cancellable} import akka.stream.scaladsl.{Keep, Sink, Source} import grizzled.slf4j.Logging +import org.bitcoins.chain.{ChainCallbacks} + import org.bitcoins.core.api.node.NodeApi import org.bitcoins.core.api.wallet.WalletApi import org.bitcoins.core.gcs.FilterType @@ -32,6 +34,8 @@ object BitcoindRpcBackendUtil extends Logging { for { bitcoindHeight <- bitcoind.getBlockCount walletStateOpt <- wallet.getSyncDescriptorOpt() + _ = logger.info( + s"bitcoindHeight=$bitcoindHeight walletStateOpt=$walletStateOpt") _ <- walletStateOpt match { case None => for { @@ -40,9 +44,10 @@ object BitcoindRpcBackendUtil extends Logging { _ <- lastConfirmedOpt match { case None => for { - header <- bitcoind.getBestBlockHeader() - _ <- wallet.stateDescriptorDAO.updateSyncHeight(header.hashBE, - header.height) + _ <- doSync(walletHeight = bitcoindHeight - 1, + bitcoindHeight = bitcoindHeight, + bitcoind = bitcoind, + wallet = wallet) } yield () case Some(txDb) => for { @@ -100,9 +105,9 @@ object BitcoindRpcBackendUtil extends Logging { hashes <- hashFs.map(_.toVector) hasFilters <- hasFiltersF _ <- { - if (hasFilters) + if (hasFilters) { filterSync(hashes, bitcoind.asInstanceOf[V19BlockFilterRpc], wallet) - else wallet.nodeApi.downloadBlocks(hashes) + } else wallet.nodeApi.downloadBlocks(hashes) } } yield wallet } @@ -110,7 +115,9 @@ object BitcoindRpcBackendUtil extends Logging { def createWalletWithBitcoindCallbacks( bitcoind: BitcoindRpcClient, - wallet: Wallet)(implicit system: ActorSystem): Wallet = { + wallet: Wallet, + chainCallbacksOpt: Option[ChainCallbacks])(implicit + system: ActorSystem): Wallet = { // We need to create a promise so we can inject the wallet with the callback // after we have created it into SyncUtil.getNodeApiWalletCallback // so we don't lose the internal state of the wallet @@ -118,8 +125,9 @@ object BitcoindRpcBackendUtil extends Logging { val pairedWallet = Wallet( nodeApi = - BitcoindRpcBackendUtil.getNodeApiWalletCallback(bitcoind, - walletCallbackP.future), + BitcoindRpcBackendUtil.buildBitcoindNodeApi(bitcoind, + walletCallbackP.future, + chainCallbacksOpt), chainQueryApi = bitcoind, feeRateApi = wallet.feeRateApi )(wallet.walletConfig, wallet.ec) @@ -169,7 +177,9 @@ object BitcoindRpcBackendUtil extends Logging { def createDLCWalletWithBitcoindCallbacks( bitcoind: BitcoindRpcClient, - wallet: DLCWallet)(implicit system: ActorSystem): DLCWallet = { + wallet: DLCWallet, + chainCallbacksOpt: Option[ChainCallbacks])(implicit + system: ActorSystem): DLCWallet = { // We need to create a promise so we can inject the wallet with the callback // after we have created it into SyncUtil.getNodeApiWalletCallback // so we don't lose the internal state of the wallet @@ -177,8 +187,9 @@ object BitcoindRpcBackendUtil extends Logging { val pairedWallet = DLCWallet( nodeApi = - BitcoindRpcBackendUtil.getNodeApiWalletCallback(bitcoind, - walletCallbackP.future), + BitcoindRpcBackendUtil.buildBitcoindNodeApi(bitcoind, + walletCallbackP.future, + chainCallbacksOpt), chainQueryApi = bitcoind, feeRateApi = wallet.feeRateApi )(wallet.walletConfig, wallet.dlcConfig, wallet.ec) @@ -216,9 +227,14 @@ object BitcoindRpcBackendUtil extends Logging { } } - private def getNodeApiWalletCallback( + /** Creates an anonymous [[NodeApi]] that downloads blocks using + * akka streams from bitcoind, and then calls [[Wallet.processBlock]] + */ + private def buildBitcoindNodeApi( bitcoindRpcClient: BitcoindRpcClient, - walletF: Future[Wallet])(implicit system: ActorSystem): NodeApi = { + walletF: Future[Wallet], + chainCallbacksOpt: Option[ChainCallbacks])(implicit + system: ActorSystem): NodeApi = { import system.dispatcher new NodeApi { @@ -230,10 +246,29 @@ object BitcoindRpcBackendUtil extends Logging { .flatMap { wallet => val runStream: Future[Done] = Source(blockHashes) .mapAsync(parallelism = numParallelism) { hash => - bitcoindRpcClient.getBlockRaw(hash) + val blockF = bitcoindRpcClient.getBlockRaw(hash) + val blockHeaderResultF = bitcoindRpcClient.getBlockHeader(hash) + for { + block <- blockF + blockHeaderResult <- blockHeaderResultF + } yield (block, blockHeaderResult) } - .foldAsync(wallet) { case (wallet, block) => - wallet.processBlock(block) + .foldAsync(wallet) { case (wallet, (block, blockHeaderResult)) => + val blockProcessedF = wallet.processBlock(block) + val executeCallbackF: Future[Wallet] = blockProcessedF.flatMap { + wallet => + chainCallbacksOpt match { + case None => Future.successful(wallet) + case Some(callback) => + val f = callback + .executeOnBlockHeaderConnectedCallbacks( + logger, + blockHeaderResult.height, + blockHeaderResult.blockHeader) + f.map(_ => wallet) + } + } + executeCallbackF } .run() runStream.map(_ => wallet) @@ -264,16 +299,21 @@ object BitcoindRpcBackendUtil extends Logging { interval: FiniteDuration = 10.seconds)(implicit system: ActorSystem, ec: ExecutionContext): Future[Cancellable] = { - bitcoind.getBlockCount.map { startCount => + val walletSyncStateF = wallet.getSyncState() + val resultF: Future[Cancellable] = for { + walletSyncState <- walletSyncStateF + } yield { val numParallelism = Runtime.getRuntime.availableProcessors() - val atomicPrevCount: AtomicInteger = new AtomicInteger(startCount) + val atomicPrevCount: AtomicInteger = new AtomicInteger( + walletSyncState.height) system.scheduler.scheduleWithFixedDelay(0.seconds, interval) { () => { - logger.debug("Polling bitcoind for block count") + logger.info("Polling bitcoind for block count") bitcoind.getBlockCount.flatMap { count => val prevCount = atomicPrevCount.get() if (prevCount < count) { - logger.debug("Bitcoind has new block(s), requesting...") + logger.info( + s"Bitcoind has new block(s), requesting... ${count - prevCount} blocks") // use .tail so we don't process the previous block that we already did val range = prevCount.to(count).tail @@ -306,11 +346,16 @@ object BitcoindRpcBackendUtil extends Logging { } else if (prevCount > count) { Future.failed(new RuntimeException( s"Bitcoind is at a block height ($count) before the wallet's ($prevCount)")) - } else Future.unit + } else { + logger.info(s"In sync $prevCount count=$count") + Future.unit + } } () } } } + + resultF } } diff --git a/app/server/src/main/scala/org/bitcoins/server/util/WebsocketUtil.scala b/app/server/src/main/scala/org/bitcoins/server/util/WebsocketUtil.scala index c08dcea8bc..44be4facd4 100644 --- a/app/server/src/main/scala/org/bitcoins/server/util/WebsocketUtil.scala +++ b/app/server/src/main/scala/org/bitcoins/server/util/WebsocketUtil.scala @@ -2,14 +2,19 @@ package org.bitcoins.server.util import akka.http.scaladsl.model.ws.{Message, TextMessage} import akka.stream.scaladsl.SourceQueueWithComplete -import org.bitcoins.commons.jsonmodels.ws.{WalletNotification, WalletWsType} +import grizzled.slf4j.Logging +import org.bitcoins.chain.{ChainCallbacks, OnBlockHeaderConnected} +import org.bitcoins.commons.jsonmodels.ws.{ + ChainNotification, + WalletNotification, + WalletWsType +} import org.bitcoins.commons.serializers.WsPicklers import org.bitcoins.core.api.chain.ChainApi import org.bitcoins.core.protocol.dlc.models.DLCStatus import org.bitcoins.core.protocol.transaction.Transaction import org.bitcoins.dlc.wallet.{DLCWalletCallbacks, OnDLCStateChange} import org.bitcoins.wallet.{ - OnBlockProcessed, OnNewAddressGenerated, OnReservedUtxos, OnTransactionBroadcast, @@ -19,12 +24,35 @@ import org.bitcoins.wallet.{ import scala.concurrent.{ExecutionContext, Future} -object WebsocketUtil { +object WebsocketUtil extends Logging { + + def buildChainCallbacks( + queue: SourceQueueWithComplete[Message], + chainApi: ChainApi)(implicit ec: ExecutionContext): ChainCallbacks = { + val onBlockProcessed: OnBlockHeaderConnected = { case (_, header) => + val resultF = + ChainUtil.getBlockHeaderResult(header.hashBE, chainApi) + val f = for { + result <- resultF + notification = + ChainNotification.BlockProcessedNotification(result) + notificationJson = + upickle.default.writeJs(notification)( + WsPicklers.blockProcessedPickler) + msg = TextMessage.Strict(notificationJson.toString()) + _ <- queue.offer(msg) + } yield { + () + } + f + } + + ChainCallbacks.onBlockHeaderConnected(onBlockProcessed) + } /** Builds websocket callbacks for the wallet */ - def buildWalletCallbacks( - walletQueue: SourceQueueWithComplete[Message], - chainApi: ChainApi)(implicit ec: ExecutionContext): WalletCallbacks = { + def buildWalletCallbacks(walletQueue: SourceQueueWithComplete[Message])( + implicit ec: ExecutionContext): WalletCallbacks = { val onAddressCreated: OnNewAddressGenerated = { addr => val notification = WalletNotification.NewAddressNotification(addr) val json = @@ -56,31 +84,11 @@ object WebsocketUtil { offerF.map(_ => ()) } - val onBlockProcessed: OnBlockProcessed = { block => - val resultF = - ChainUtil.getBlockHeaderResult(block.blockHeader.hashBE, chainApi) - val f = for { - result <- resultF - notification = - WalletNotification.BlockProcessedNotification(result) - notificationJson = - upickle.default.writeJs(notification)( - WsPicklers.blockProcessedPickler) - msg = TextMessage.Strict(notificationJson.toString()) - _ <- walletQueue.offer(msg) - } yield { - () - } - - f - } - WalletCallbacks( onTransactionProcessed = Vector(onTxProcessed), onNewAddressGenerated = Vector(onAddressCreated), onReservedUtxos = Vector(onReservedUtxo), - onTransactionBroadcast = Vector(onTxBroadcast), - onBlockProcessed = Vector(onBlockProcessed) + onTransactionBroadcast = Vector(onTxBroadcast) ) } @@ -97,7 +105,7 @@ object WebsocketUtil { val notification = WalletNotification.TxBroadcastNotification(tx) upickle.default.writeJs(notification)(WsPicklers.txBroadcastPickler) case x @ (WalletWsType.NewAddress | WalletWsType.ReservedUtxos | - WalletWsType.BlockProcessed | WalletWsType.DLCStateChange) => + WalletWsType.DLCStateChange) => sys.error(s"Cannot build tx notification for $x") } diff --git a/testkit/src/main/scala/org/bitcoins/testkit/wallet/FundWalletUtil.scala b/testkit/src/main/scala/org/bitcoins/testkit/wallet/FundWalletUtil.scala index 21c2dd8f81..7aceb301a4 100644 --- a/testkit/src/main/scala/org/bitcoins/testkit/wallet/FundWalletUtil.scala +++ b/testkit/src/main/scala/org/bitcoins/testkit/wallet/FundWalletUtil.scala @@ -235,7 +235,8 @@ object FundWalletUtil extends FundWalletUtil { wallet = BitcoindRpcBackendUtil.createDLCWalletWithBitcoindCallbacks( bitcoind, - tmp) + tmp, + None)(system) funded1 <- fundAccountForWalletWithBitcoind( BitcoinSWalletTest.defaultAcctAmts, diff --git a/wallet-test/src/test/scala/org/bitcoins/wallet/BitcoindBackendTest.scala b/wallet-test/src/test/scala/org/bitcoins/wallet/BitcoindBackendTest.scala index 5017908481..65e251880e 100644 --- a/wallet-test/src/test/scala/org/bitcoins/wallet/BitcoindBackendTest.scala +++ b/wallet-test/src/test/scala/org/bitcoins/wallet/BitcoindBackendTest.scala @@ -206,7 +206,8 @@ class BitcoindBackendTest extends WalletAppConfigWithBitcoindNewestFixtures { bip39PasswordOpt = walletAppConfig.bip39PasswordOpt) } yield { BitcoindRpcBackendUtil.createWalletWithBitcoindCallbacks(bitcoind, - tmpWallet) + tmpWallet, + None) } } } diff --git a/wallet-test/src/test/scala/org/bitcoins/wallet/BitcoindBlockPollingTest.scala b/wallet-test/src/test/scala/org/bitcoins/wallet/BitcoindBlockPollingTest.scala index 8ec1c4c6a1..5bca18588a 100644 --- a/wallet-test/src/test/scala/org/bitcoins/wallet/BitcoindBlockPollingTest.scala +++ b/wallet-test/src/test/scala/org/bitcoins/wallet/BitcoindBlockPollingTest.scala @@ -27,7 +27,8 @@ class BitcoindBlockPollingTest BitcoinSWalletTest.createDefaultWallet(bitcoind, bitcoind, None) wallet = BitcoindRpcBackendUtil.createWalletWithBitcoindCallbacks(bitcoind, - tmpWallet) + tmpWallet, + None) // Assert wallet is empty isEmpty <- wallet.isEmpty() _ = assert(isEmpty) diff --git a/wallet-test/src/test/scala/org/bitcoins/wallet/BitcoindZMQBackendTest.scala b/wallet-test/src/test/scala/org/bitcoins/wallet/BitcoindZMQBackendTest.scala index 0da45bd543..92f3ad2992 100644 --- a/wallet-test/src/test/scala/org/bitcoins/wallet/BitcoindZMQBackendTest.scala +++ b/wallet-test/src/test/scala/org/bitcoins/wallet/BitcoindZMQBackendTest.scala @@ -46,8 +46,10 @@ class BitcoindZMQBackendTest extends WalletAppConfigWithBitcoindNewestFixtures { tmpWallet <- BitcoinSWalletTest.createDefaultWallet(bitcoind, bitcoind, None) wallet = - BitcoindRpcBackendUtil.createWalletWithBitcoindCallbacks(bitcoind, - tmpWallet) + BitcoindRpcBackendUtil.createWalletWithBitcoindCallbacks( + bitcoind = bitcoind, + wallet = tmpWallet, + chainCallbacksOpt = None) // Assert wallet is empty isEmpty <- wallet.isEmpty() _ = assert(isEmpty)