2021 12 28 blockprocessed callback (#3946)

* WIP

* Get appServerTest passing with chain notifications

* Reorder where SourceQueue gets initialized so that we can use it to construct the wallet's callbacks

* DRY

* Fix missing callback for neutrino chaincallbacks

* Bump timeout

* Bump timeout

* When wallet is empty call doSync() with the last blockheader bitcoind has seen

* Revert skipping compact filters on IBD for bitcoind

* Revert logging
This commit is contained in:
Chris Stewart 2021-12-30 13:41:17 -06:00 committed by GitHub
parent c3d1b2ee12
commit d06b064b6b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 305 additions and 153 deletions

View File

@ -13,22 +13,27 @@ sealed trait WsType
object WsType extends StringFactory[WsType] {
override def fromString(string: String): WsType = {
WalletWsType.fromString(string)
ChainWsType.fromStringOpt(string) match {
case Some(t) => t
case None =>
WalletWsType.fromString(string)
}
}
}
sealed trait WalletWsType extends WsType
sealed trait ChainWsType extends WsType
object WalletWsType extends StringFactory[WalletWsType] {
case object TxProcessed extends WalletWsType
case object TxBroadcast extends WalletWsType
case object ReservedUtxos extends WalletWsType
case object NewAddress extends WalletWsType
case object BlockProcessed extends WalletWsType
case object DLCStateChange extends WalletWsType
private val all =
Vector(TxProcessed, TxBroadcast, ReservedUtxos, NewAddress, BlockProcessed)
Vector(TxProcessed, TxBroadcast, ReservedUtxos, NewAddress)
override def fromStringOpt(string: String): Option[WalletWsType] = {
all.find(_.toString.toLowerCase() == string.toLowerCase)
@ -40,6 +45,21 @@ object WalletWsType extends StringFactory[WalletWsType] {
}
}
object ChainWsType extends StringFactory[ChainWsType] {
case object BlockProcessed extends ChainWsType
private val all: Vector[ChainWsType] = Vector(BlockProcessed)
override def fromStringOpt(string: String): Option[ChainWsType] = {
all.find(_.toString.toLowerCase() == string.toLowerCase)
}
override def fromString(string: String): ChainWsType = {
fromStringOpt(string)
.getOrElse(sys.error(s"Cannot find chain ws type for string=$string"))
}
}
/** A notification that we send over the websocket.
* The type of the notification is indicated by [[WsType]].
* An example is [[org.bitcoins.commons.jsonmodels.ws.WalletNotification.NewAddressNotification]]
@ -50,6 +70,10 @@ sealed trait WsNotification[T] {
def payload: T
}
sealed trait ChainNotification[T] extends WsNotification[T] {
override def `type`: ChainWsType
}
sealed trait WalletNotification[T] extends WsNotification[T] {
override def `type`: WalletWsType
}
@ -76,13 +100,16 @@ object WalletNotification {
override val `type`: WalletWsType = WalletWsType.ReservedUtxos
}
case class BlockProcessedNotification(payload: GetBlockHeaderResult)
extends WalletNotification[GetBlockHeaderResult] {
override val `type`: WalletWsType = WalletWsType.BlockProcessed
}
case class DLCStateChangeNotification(payload: DLCStatus)
extends WalletNotification[DLCStatus] {
override val `type`: WalletWsType = WalletWsType.DLCStateChange
}
}
object ChainNotification {
case class BlockProcessedNotification(payload: GetBlockHeaderResult)
extends ChainNotification[GetBlockHeaderResult] {
override val `type`: ChainWsType = ChainWsType.BlockProcessed
}
}

View File

@ -1,24 +1,59 @@
package org.bitcoins.commons.serializers
import org.bitcoins.commons.jsonmodels.ws.ChainNotification.BlockProcessedNotification
import org.bitcoins.commons.jsonmodels.ws.WalletNotification.{
BlockProcessedNotification,
DLCStateChangeNotification,
NewAddressNotification,
ReservedUtxosNotification,
TxBroadcastNotification,
TxProcessedNotification
}
import org.bitcoins.commons.jsonmodels.ws.{WalletNotification, WalletWsType}
import org.bitcoins.commons.jsonmodels.ws.{
ChainNotification,
ChainWsType,
WalletNotification,
WalletWsType
}
import org.bitcoins.core.serializers.PicklerKeys
import upickle.default._
object WsPicklers {
implicit val chainWsTypePickler: ReadWriter[ChainWsType] = {
readwriter[ujson.Str]
.bimap(_.toString.toLowerCase, str => ChainWsType.fromString(str.str))
}
implicit val walletWsTypePickler: ReadWriter[WalletWsType] = {
readwriter[ujson.Str]
.bimap(_.toString.toLowerCase, str => WalletWsType.fromString(str.str))
}
private def writeChainNotification(
notification: ChainNotification[_]): ujson.Obj = {
val payloadJson: ujson.Value = notification match {
case BlockProcessedNotification(block) =>
upickle.default.writeJs(block)(Picklers.getBlockHeaderResultPickler)
}
val notificationObj = ujson.Obj(
PicklerKeys.typeKey -> writeJs(notification.`type`),
PicklerKeys.payloadKey -> payloadJson
)
notificationObj
}
private def readChainNotification(obj: ujson.Obj): ChainNotification[_] = {
val typeObj = read[ChainWsType](obj(PicklerKeys.typeKey))
val payloadObj = obj(PicklerKeys.payloadKey)
typeObj match {
case ChainWsType.BlockProcessed =>
val block =
upickle.default.read(payloadObj)(Picklers.getBlockHeaderResultPickler)
BlockProcessedNotification(block)
}
}
private def writeWalletNotification(
notification: WalletNotification[_]): ujson.Obj = {
val payloadJson: ujson.Value = notification match {
@ -32,8 +67,6 @@ object WsPicklers {
val vec = utxos.map(u =>
upickle.default.writeJs(u)(Picklers.spendingInfoDbPickler))
ujson.Arr.from(vec)
case BlockProcessedNotification(block) =>
upickle.default.writeJs(block)(Picklers.getBlockHeaderResultPickler)
case DLCStateChangeNotification(status) =>
upickle.default.writeJs(status)(Picklers.dlcStatusW)
}
@ -64,10 +97,6 @@ object WsPicklers {
upickle.default.read(utxoJson)(Picklers.spendingInfoDbPickler)
}
ReservedUtxosNotification(utxos)
case WalletWsType.BlockProcessed =>
val block =
upickle.default.read(payloadObj)(Picklers.getBlockHeaderResultPickler)
BlockProcessedNotification(block)
case WalletWsType.DLCStateChange =>
val status = upickle.default.read(payloadObj)(Picklers.dlcStatusR)
DLCStateChangeNotification(status)
@ -102,10 +131,14 @@ object WsPicklers {
readwriter[ujson.Obj].bimap(writeWalletNotification, readWalletNotification)
}
implicit val chainNotificationPickler: ReadWriter[ChainNotification[_]] = {
readwriter[ujson.Obj].bimap(writeChainNotification, readChainNotification)
}
implicit val blockProcessedPickler: ReadWriter[BlockProcessedNotification] = {
readwriter[ujson.Obj].bimap(
writeWalletNotification(_),
readWalletNotification(_).asInstanceOf[BlockProcessedNotification]
writeChainNotification(_),
readChainNotification(_).asInstanceOf[BlockProcessedNotification]
)
}

View File

@ -1,6 +1,7 @@
package org.bitcoins.oracle.server
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import org.bitcoins.commons.util.{DatadirParser, ServerArgParser}
import org.bitcoins.dlc.oracle.DLCOracle
import org.bitcoins.dlc.oracle.config.DLCOracleAppConfig
@ -33,13 +34,15 @@ class OracleServerMain(override val serverArgParser: ServerArgParser)(implicit
handlers = routes,
rpcbindOpt = bindConfOpt,
rpcport = rpcport,
None)
None,
Source.empty)
case None =>
Server(conf = conf,
handlers = routes,
rpcbindOpt = bindConfOpt,
rpcport = conf.rpcPort,
None)
None,
Source.empty)
}
_ <- server.start()

View File

@ -9,15 +9,7 @@ import akka.http.scaladsl.model.ws.Message
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server._
import akka.http.scaladsl.server.directives.DebuggingDirectives
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{
BroadcastHub,
Flow,
Keep,
Sink,
Source,
SourceQueueWithComplete
}
import akka.stream.scaladsl.{Flow, Sink, Source}
import de.heikoseeberger.akkahttpupickle.UpickleSupport._
import org.bitcoins.commons.config.AppConfig
import org.bitcoins.server.util.{ServerBindings, WsServerConfig}
@ -30,7 +22,8 @@ case class Server(
handlers: Seq[ServerRoute],
rpcbindOpt: Option[String],
rpcport: Int,
wsConfigOpt: Option[WsServerConfig])(implicit system: ActorSystem)
wsConfigOpt: Option[WsServerConfig],
wsSource: Source[Message, NotUsed])(implicit system: ActorSystem)
extends HttpLogger {
import system.dispatcher
@ -123,24 +116,6 @@ case class Server(
}
}
private val maxBufferSize: Int = 25
/** This will queue [[maxBufferSize]] elements in the queue. Once the buffer size is reached,
* we will drop the first element in the buffer
*/
private val tuple = {
//from: https://github.com/akka/akka-http/issues/3039#issuecomment-610263181
//the BroadcastHub.sink is needed to avoid these errors
// 'Websocket handler failed with Processor actor'
Source
.queue[Message](maxBufferSize, OverflowStrategy.dropHead)
.toMat(BroadcastHub.sink)(Keep.both)
.run()
}
def walletQueue: SourceQueueWithComplete[Message] = tuple._1
def source: Source[Message, NotUsed] = tuple._2
private val eventsRoute = "events"
private def wsRoutes: Route = {
@ -151,7 +126,7 @@ case class Server(
private def wsHandler: Flow[Message, Message, Any] = {
//we don't allow input, so use Sink.ignore
Flow.fromSinkAndSource(Sink.ignore, source)
Flow.fromSinkAndSource(Sink.ignore, wsSource)
}
}

View File

@ -9,13 +9,18 @@ import akka.http.scaladsl.model.ws.{
}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import org.bitcoins.cli.{CliCommand, Config, ConsoleCli}
import org.bitcoins.commons.jsonmodels.ws.{WalletNotification, WalletWsType}
import org.bitcoins.commons.jsonmodels.ws.ChainNotification.BlockProcessedNotification
import org.bitcoins.commons.jsonmodels.ws.WalletNotification.{
BlockProcessedNotification,
NewAddressNotification,
TxBroadcastNotification,
TxProcessedNotification
}
import org.bitcoins.commons.jsonmodels.ws.{
ChainNotification,
WalletNotification,
WalletWsType,
WsNotification
}
import org.bitcoins.commons.serializers.{Picklers, WsPicklers}
import org.bitcoins.core.currency.Bitcoins
import org.bitcoins.core.protocol.BitcoinAddress
@ -30,23 +35,27 @@ import org.bitcoins.testkit.util.AkkaUtil
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Future, Promise}
import scala.util.Try
class WebsocketTests extends BitcoinSServerMainBitcoindFixture {
behavior of "Websocket Tests"
val endSink: Sink[WalletNotification[_], Future[Seq[WalletNotification[_]]]] =
Sink.seq[WalletNotification[_]]
val endSink: Sink[WsNotification[_], Future[Seq[WsNotification[_]]]] =
Sink.seq[WsNotification[_]]
val sink: Sink[Message, Future[Seq[WalletNotification[_]]]] = Flow[Message]
val sink: Sink[Message, Future[Seq[WsNotification[_]]]] = Flow[Message]
.map {
case message: TextMessage.Strict =>
//we should be able to parse the address message
val text = message.text
val notification: WalletNotification[_] =
val walletNotificationOpt: Option[WalletNotification[_]] = Try(
upickle.default.read[WalletNotification[_]](text)(
WsPicklers.walletNotificationPickler)
notification
WsPicklers.walletNotificationPickler)).toOption
val chainNotificationOpt: Option[ChainNotification[_]] = Try(
upickle.default.read[ChainNotification[_]](text)(
WsPicklers.chainNotificationPickler)).toOption
walletNotificationOpt.getOrElse(chainNotificationOpt.get)
case msg =>
fail(s"Unexpected msg type received in the sink, msg=$msg")
}
@ -59,7 +68,7 @@ class WebsocketTests extends BitcoinSServerMainBitcoindFixture {
val websocketFlow: Flow[
Message,
Message,
(Future[Seq[WalletNotification[_]]], Promise[Option[Message]])] = {
(Future[Seq[WsNotification[_]]], Promise[Option[Message]])] = {
Flow
.fromSinkAndSourceCoupledMat(sink, Source.maybe[Message])(Keep.both)
}
@ -72,12 +81,12 @@ class WebsocketTests extends BitcoinSServerMainBitcoindFixture {
val req = buildReq(server.conf)
val notificationsF: (
Future[WebSocketUpgradeResponse],
(Future[Seq[WalletNotification[_]]], Promise[Option[Message]])) = {
(Future[Seq[WsNotification[_]]], Promise[Option[Message]])) = {
Http()
.singleWebSocketRequest(req, websocketFlow)
}
val walletNotificationsF: Future[Seq[WalletNotification[_]]] =
val walletNotificationsF: Future[Seq[WsNotification[_]]] =
notificationsF._2._1
val promise: Promise[Option[Message]] = notificationsF._2._2
@ -104,7 +113,7 @@ class WebsocketTests extends BitcoinSServerMainBitcoindFixture {
val req = buildReq(server.conf)
val tuple: (
Future[WebSocketUpgradeResponse],
(Future[Seq[WalletNotification[_]]], Promise[Option[Message]])) = {
(Future[Seq[WsNotification[_]]], Promise[Option[Message]])) = {
Http()
.singleWebSocketRequest(req, websocketFlow)
}
@ -142,7 +151,7 @@ class WebsocketTests extends BitcoinSServerMainBitcoindFixture {
val req = buildReq(server.conf)
val tuple: (
Future[WebSocketUpgradeResponse],
(Future[Seq[WalletNotification[_]]], Promise[Option[Message]])) = {
(Future[Seq[WsNotification[_]]], Promise[Option[Message]])) = {
Http()
.singleWebSocketRequest(req, websocketFlow)
}
@ -179,7 +188,7 @@ class WebsocketTests extends BitcoinSServerMainBitcoindFixture {
val req = buildReq(server.conf)
val tuple: (
Future[WebSocketUpgradeResponse],
(Future[Seq[WalletNotification[_]]], Promise[Option[Message]])) = {
(Future[Seq[WsNotification[_]]], Promise[Option[Message]])) = {
Http()
.singleWebSocketRequest(req, websocketFlow)
}
@ -188,7 +197,8 @@ class WebsocketTests extends BitcoinSServerMainBitcoindFixture {
val promise = tuple._2._2
val addressF = bitcoind.getNewAddress
val timeout =
15.seconds //any way we can remove this timeout and just check?
for {
address <- addressF
hashes <- bitcoind.generateToAddress(1, address)
@ -196,16 +206,13 @@ class WebsocketTests extends BitcoinSServerMainBitcoindFixture {
getBlockHeaderResultStr = ConsoleCli.exec(cmd, cliConfig)
getBlockHeaderResult = upickle.default.read(getBlockHeaderResultStr.get)(
Picklers.getBlockHeaderResultPickler)
block <- bitcoind.getBlockRaw(hashes.head)
wallet <- server.walletConf.createHDWallet(bitcoind, bitcoind, bitcoind)
_ <- wallet.processBlock(block)
_ <- AkkaUtil.nonBlockingSleep(500.millis)
_ <- AkkaUtil.nonBlockingSleep(timeout)
_ = promise.success(None)
notifications <- notificationsF
} yield {
assert(
notifications.exists(
_ == BlockProcessedNotification(getBlockHeaderResult)))
val count = notifications.count(
_ == BlockProcessedNotification(getBlockHeaderResult))
assert(count == 1, s"count=$count")
}
}
@ -217,12 +224,12 @@ class WebsocketTests extends BitcoinSServerMainBitcoindFixture {
val req = buildReq(server.conf)
val tuple: (
Future[WebSocketUpgradeResponse],
(Future[Seq[WalletNotification[_]]], Promise[Option[Message]])) = {
(Future[Seq[WsNotification[_]]], Promise[Option[Message]])) = {
Http()
.singleWebSocketRequest(req, websocketFlow)
}
val notificationsF: Future[Seq[WalletNotification[_]]] = tuple._2._1
val notificationsF: Future[Seq[WsNotification[_]]] = tuple._2._1
val promise = tuple._2._2
//lock all utxos

View File

@ -1,7 +1,16 @@
package org.bitcoins.server
import akka.NotUsed
import akka.actor.ActorSystem
import akka.dispatch.Dispatchers
import akka.http.scaladsl.model.ws.Message
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{
BroadcastHub,
Keep,
Source,
SourceQueueWithComplete
}
import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.chain.blockchain.ChainHandler
import org.bitcoins.chain.config.ChainAppConfig
@ -101,6 +110,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
}
def startBitcoinSBackend(): Future[Unit] = {
logger.info(s"startBitcoinSBackend()")
val start = System.currentTimeMillis()
if (nodeConf.peers.isEmpty) {
throw new IllegalArgumentException(
@ -156,6 +166,10 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
node = dlcNodeConf.createDLCNode(wallet)
} yield node
val tuple = buildWsSource
val wsQueue: SourceQueueWithComplete[Message] = tuple._1
val wsSource: Source[Message, NotUsed] = tuple._2
//start our http server now that we are synced
for {
node <- configuredNodeF
@ -174,16 +188,17 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
dlcNode <- dlcNodeF
_ <- dlcNode.start()
server <- startHttpServer(nodeApi = node,
chainApi = chainApi,
wallet = wallet,
dlcNode = dlcNode,
serverCmdLineArgs = serverArgParser)
walletCallbacks = WebsocketUtil.buildWalletCallbacks(server.walletQueue,
chainApi)
_ <- startHttpServer(nodeApi = node,
chainApi = chainApi,
wallet = wallet,
dlcNode = dlcNode,
serverCmdLineArgs = serverArgParser,
wsSource = wsSource)
chainCallbacks = WebsocketUtil.buildChainCallbacks(wsQueue, chainApi)
_ = chainConf.addCallbacks(chainCallbacks)
walletCallbacks = WebsocketUtil.buildWalletCallbacks(wsQueue)
_ = walletConf.addCallbacks(walletCallbacks)
dlcWalletCallbacks = WebsocketUtil.buildDLCWalletCallbacks(
server.walletQueue)
dlcWalletCallbacks = WebsocketUtil.buildDLCWalletCallbacks(wsQueue)
_ = dlcConf.addCallbacks(dlcWalletCallbacks)
_ = {
logger.info(
@ -218,7 +233,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
}
def startBitcoindBackend(): Future[Unit] = {
logger.info(s"startBitcoindBackend()")
val bitcoindF = for {
client <- bitcoindRpcConf.clientF
_ <- client.start()
@ -231,6 +246,10 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
feeRateApi = feeProvider)
}
val tuple = buildWsSource
val wsQueue: SourceQueueWithComplete[Message] = tuple._1
val wsSource: Source[Message, NotUsed] = tuple._2
for {
_ <- bitcoindRpcConf.start()
bitcoind <- bitcoindRpcConf.clientF
@ -242,10 +261,12 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
s"bitcoind ($bitcoindNetwork) on different network than wallet (${walletConf.network})")
_ = logger.info("Creating wallet")
chainCallbacks = WebsocketUtil.buildChainCallbacks(wsQueue, bitcoind)
tmpWallet <- tmpWalletF
wallet = BitcoindRpcBackendUtil.createDLCWalletWithBitcoindCallbacks(
bitcoind,
tmpWallet)
tmpWallet,
Some(chainCallbacks))
_ = logger.info("Starting wallet")
_ <- wallet.start().recoverWith {
//https://github.com/bitcoin-s/bitcoin-s/issues/2917
@ -255,22 +276,22 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
handleMissingSpendingInfoDb(err, wallet)
}
dlcNode = dlcNodeConf.createDLCNode(wallet)
_ <- dlcNode.start()
_ <- startHttpServer(nodeApi = bitcoind,
chainApi = bitcoind,
wallet = wallet,
dlcNode = dlcNode,
serverCmdLineArgs = serverArgParser,
wsSource = wsSource)
walletCallbacks = WebsocketUtil.buildWalletCallbacks(wsQueue)
_ = walletConf.addCallbacks(walletCallbacks)
//intentionally doesn't map on this otherwise we
//wait until we are done syncing the entire wallet
//which could take 1 hour
_ = syncWalletWithBitcoindAndStartPolling(bitcoind, wallet)
dlcNode = dlcNodeConf.createDLCNode(wallet)
_ <- dlcNode.start()
server <- startHttpServer(nodeApi = bitcoind,
chainApi = bitcoind,
wallet = wallet,
dlcNode = dlcNode,
serverCmdLineArgs = serverArgParser)
walletCallbacks = WebsocketUtil.buildWalletCallbacks(server.walletQueue,
bitcoind)
_ = walletConf.addCallbacks(walletCallbacks)
dlcWalletCallbacks = WebsocketUtil.buildDLCWalletCallbacks(
server.walletQueue)
dlcWalletCallbacks = WebsocketUtil.buildDLCWalletCallbacks(wsQueue)
_ = dlcConf.addCallbacks(dlcWalletCallbacks)
} yield {
logger.info(s"Done starting Main!")
@ -365,7 +386,8 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
chainApi: ChainApi,
wallet: DLCWallet,
dlcNode: DLCNode,
serverCmdLineArgs: ServerArgParser)(implicit
serverCmdLineArgs: ServerArgParser,
wsSource: Source[Message, NotUsed])(implicit
system: ActorSystem,
conf: BitcoinSAppConfig): Future[Server] = {
implicit val nodeConf: NodeAppConfig = conf.nodeConf
@ -411,13 +433,15 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
handlers = handlers,
rpcbindOpt = rpcBindConfOpt,
rpcport = rpcport,
wsConfigOpt = Some(wsServerConfig))
wsConfigOpt = Some(wsServerConfig),
wsSource)
case None =>
Server(conf = nodeConf,
handlers = handlers,
rpcbindOpt = rpcBindConfOpt,
rpcport = conf.rpcPort,
wsConfigOpt = Some(wsServerConfig))
wsConfigOpt = Some(wsServerConfig),
wsSource)
}
}
val bindingF = server.start()
@ -522,6 +546,31 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
logger.error(s"Error syncing bitcoin-s wallet with bitcoind", err))
f
}
/** Builds a websocket queue that you can feed elements to.
* The Source can be wired up with Directives.handleWebSocketMessages
* to create a flow that emits websocket messages
*/
private def buildWsSource: (
SourceQueueWithComplete[Message],
Source[Message, NotUsed]) = {
val maxBufferSize: Int = 25
/** This will queue [[maxBufferSize]] elements in the queue. Once the buffer size is reached,
* we will drop the first element in the buffer
*/
val tuple = {
//from: https://github.com/akka/akka-http/issues/3039#issuecomment-610263181
//the BroadcastHub.sink is needed to avoid these errors
// 'Websocket handler failed with Processor actor'
Source
.queue[Message](maxBufferSize, OverflowStrategy.dropHead)
.toMat(BroadcastHub.sink)(Keep.both)
.run()
}
tuple
}
}
object BitcoinSServerMain extends BitcoinSAppScalaDaemon {

View File

@ -4,6 +4,8 @@ import akka.Done
import akka.actor.{ActorSystem, Cancellable}
import akka.stream.scaladsl.{Keep, Sink, Source}
import grizzled.slf4j.Logging
import org.bitcoins.chain.{ChainCallbacks}
import org.bitcoins.core.api.node.NodeApi
import org.bitcoins.core.api.wallet.WalletApi
import org.bitcoins.core.gcs.FilterType
@ -32,6 +34,8 @@ object BitcoindRpcBackendUtil extends Logging {
for {
bitcoindHeight <- bitcoind.getBlockCount
walletStateOpt <- wallet.getSyncDescriptorOpt()
_ = logger.info(
s"bitcoindHeight=$bitcoindHeight walletStateOpt=$walletStateOpt")
_ <- walletStateOpt match {
case None =>
for {
@ -40,9 +44,10 @@ object BitcoindRpcBackendUtil extends Logging {
_ <- lastConfirmedOpt match {
case None =>
for {
header <- bitcoind.getBestBlockHeader()
_ <- wallet.stateDescriptorDAO.updateSyncHeight(header.hashBE,
header.height)
_ <- doSync(walletHeight = bitcoindHeight - 1,
bitcoindHeight = bitcoindHeight,
bitcoind = bitcoind,
wallet = wallet)
} yield ()
case Some(txDb) =>
for {
@ -100,9 +105,9 @@ object BitcoindRpcBackendUtil extends Logging {
hashes <- hashFs.map(_.toVector)
hasFilters <- hasFiltersF
_ <- {
if (hasFilters)
if (hasFilters) {
filterSync(hashes, bitcoind.asInstanceOf[V19BlockFilterRpc], wallet)
else wallet.nodeApi.downloadBlocks(hashes)
} else wallet.nodeApi.downloadBlocks(hashes)
}
} yield wallet
}
@ -110,7 +115,9 @@ object BitcoindRpcBackendUtil extends Logging {
def createWalletWithBitcoindCallbacks(
bitcoind: BitcoindRpcClient,
wallet: Wallet)(implicit system: ActorSystem): Wallet = {
wallet: Wallet,
chainCallbacksOpt: Option[ChainCallbacks])(implicit
system: ActorSystem): Wallet = {
// We need to create a promise so we can inject the wallet with the callback
// after we have created it into SyncUtil.getNodeApiWalletCallback
// so we don't lose the internal state of the wallet
@ -118,8 +125,9 @@ object BitcoindRpcBackendUtil extends Logging {
val pairedWallet = Wallet(
nodeApi =
BitcoindRpcBackendUtil.getNodeApiWalletCallback(bitcoind,
walletCallbackP.future),
BitcoindRpcBackendUtil.buildBitcoindNodeApi(bitcoind,
walletCallbackP.future,
chainCallbacksOpt),
chainQueryApi = bitcoind,
feeRateApi = wallet.feeRateApi
)(wallet.walletConfig, wallet.ec)
@ -169,7 +177,9 @@ object BitcoindRpcBackendUtil extends Logging {
def createDLCWalletWithBitcoindCallbacks(
bitcoind: BitcoindRpcClient,
wallet: DLCWallet)(implicit system: ActorSystem): DLCWallet = {
wallet: DLCWallet,
chainCallbacksOpt: Option[ChainCallbacks])(implicit
system: ActorSystem): DLCWallet = {
// We need to create a promise so we can inject the wallet with the callback
// after we have created it into SyncUtil.getNodeApiWalletCallback
// so we don't lose the internal state of the wallet
@ -177,8 +187,9 @@ object BitcoindRpcBackendUtil extends Logging {
val pairedWallet = DLCWallet(
nodeApi =
BitcoindRpcBackendUtil.getNodeApiWalletCallback(bitcoind,
walletCallbackP.future),
BitcoindRpcBackendUtil.buildBitcoindNodeApi(bitcoind,
walletCallbackP.future,
chainCallbacksOpt),
chainQueryApi = bitcoind,
feeRateApi = wallet.feeRateApi
)(wallet.walletConfig, wallet.dlcConfig, wallet.ec)
@ -216,9 +227,14 @@ object BitcoindRpcBackendUtil extends Logging {
}
}
private def getNodeApiWalletCallback(
/** Creates an anonymous [[NodeApi]] that downloads blocks using
* akka streams from bitcoind, and then calls [[Wallet.processBlock]]
*/
private def buildBitcoindNodeApi(
bitcoindRpcClient: BitcoindRpcClient,
walletF: Future[Wallet])(implicit system: ActorSystem): NodeApi = {
walletF: Future[Wallet],
chainCallbacksOpt: Option[ChainCallbacks])(implicit
system: ActorSystem): NodeApi = {
import system.dispatcher
new NodeApi {
@ -230,10 +246,29 @@ object BitcoindRpcBackendUtil extends Logging {
.flatMap { wallet =>
val runStream: Future[Done] = Source(blockHashes)
.mapAsync(parallelism = numParallelism) { hash =>
bitcoindRpcClient.getBlockRaw(hash)
val blockF = bitcoindRpcClient.getBlockRaw(hash)
val blockHeaderResultF = bitcoindRpcClient.getBlockHeader(hash)
for {
block <- blockF
blockHeaderResult <- blockHeaderResultF
} yield (block, blockHeaderResult)
}
.foldAsync(wallet) { case (wallet, block) =>
wallet.processBlock(block)
.foldAsync(wallet) { case (wallet, (block, blockHeaderResult)) =>
val blockProcessedF = wallet.processBlock(block)
val executeCallbackF: Future[Wallet] = blockProcessedF.flatMap {
wallet =>
chainCallbacksOpt match {
case None => Future.successful(wallet)
case Some(callback) =>
val f = callback
.executeOnBlockHeaderConnectedCallbacks(
logger,
blockHeaderResult.height,
blockHeaderResult.blockHeader)
f.map(_ => wallet)
}
}
executeCallbackF
}
.run()
runStream.map(_ => wallet)
@ -264,16 +299,21 @@ object BitcoindRpcBackendUtil extends Logging {
interval: FiniteDuration = 10.seconds)(implicit
system: ActorSystem,
ec: ExecutionContext): Future[Cancellable] = {
bitcoind.getBlockCount.map { startCount =>
val walletSyncStateF = wallet.getSyncState()
val resultF: Future[Cancellable] = for {
walletSyncState <- walletSyncStateF
} yield {
val numParallelism = Runtime.getRuntime.availableProcessors()
val atomicPrevCount: AtomicInteger = new AtomicInteger(startCount)
val atomicPrevCount: AtomicInteger = new AtomicInteger(
walletSyncState.height)
system.scheduler.scheduleWithFixedDelay(0.seconds, interval) { () =>
{
logger.debug("Polling bitcoind for block count")
logger.info("Polling bitcoind for block count")
bitcoind.getBlockCount.flatMap { count =>
val prevCount = atomicPrevCount.get()
if (prevCount < count) {
logger.debug("Bitcoind has new block(s), requesting...")
logger.info(
s"Bitcoind has new block(s), requesting... ${count - prevCount} blocks")
// use .tail so we don't process the previous block that we already did
val range = prevCount.to(count).tail
@ -306,11 +346,16 @@ object BitcoindRpcBackendUtil extends Logging {
} else if (prevCount > count) {
Future.failed(new RuntimeException(
s"Bitcoind is at a block height ($count) before the wallet's ($prevCount)"))
} else Future.unit
} else {
logger.info(s"In sync $prevCount count=$count")
Future.unit
}
}
()
}
}
}
resultF
}
}

View File

@ -2,14 +2,19 @@ package org.bitcoins.server.util
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.scaladsl.SourceQueueWithComplete
import org.bitcoins.commons.jsonmodels.ws.{WalletNotification, WalletWsType}
import grizzled.slf4j.Logging
import org.bitcoins.chain.{ChainCallbacks, OnBlockHeaderConnected}
import org.bitcoins.commons.jsonmodels.ws.{
ChainNotification,
WalletNotification,
WalletWsType
}
import org.bitcoins.commons.serializers.WsPicklers
import org.bitcoins.core.api.chain.ChainApi
import org.bitcoins.core.protocol.dlc.models.DLCStatus
import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.dlc.wallet.{DLCWalletCallbacks, OnDLCStateChange}
import org.bitcoins.wallet.{
OnBlockProcessed,
OnNewAddressGenerated,
OnReservedUtxos,
OnTransactionBroadcast,
@ -19,12 +24,35 @@ import org.bitcoins.wallet.{
import scala.concurrent.{ExecutionContext, Future}
object WebsocketUtil {
object WebsocketUtil extends Logging {
def buildChainCallbacks(
queue: SourceQueueWithComplete[Message],
chainApi: ChainApi)(implicit ec: ExecutionContext): ChainCallbacks = {
val onBlockProcessed: OnBlockHeaderConnected = { case (_, header) =>
val resultF =
ChainUtil.getBlockHeaderResult(header.hashBE, chainApi)
val f = for {
result <- resultF
notification =
ChainNotification.BlockProcessedNotification(result)
notificationJson =
upickle.default.writeJs(notification)(
WsPicklers.blockProcessedPickler)
msg = TextMessage.Strict(notificationJson.toString())
_ <- queue.offer(msg)
} yield {
()
}
f
}
ChainCallbacks.onBlockHeaderConnected(onBlockProcessed)
}
/** Builds websocket callbacks for the wallet */
def buildWalletCallbacks(
walletQueue: SourceQueueWithComplete[Message],
chainApi: ChainApi)(implicit ec: ExecutionContext): WalletCallbacks = {
def buildWalletCallbacks(walletQueue: SourceQueueWithComplete[Message])(
implicit ec: ExecutionContext): WalletCallbacks = {
val onAddressCreated: OnNewAddressGenerated = { addr =>
val notification = WalletNotification.NewAddressNotification(addr)
val json =
@ -56,31 +84,11 @@ object WebsocketUtil {
offerF.map(_ => ())
}
val onBlockProcessed: OnBlockProcessed = { block =>
val resultF =
ChainUtil.getBlockHeaderResult(block.blockHeader.hashBE, chainApi)
val f = for {
result <- resultF
notification =
WalletNotification.BlockProcessedNotification(result)
notificationJson =
upickle.default.writeJs(notification)(
WsPicklers.blockProcessedPickler)
msg = TextMessage.Strict(notificationJson.toString())
_ <- walletQueue.offer(msg)
} yield {
()
}
f
}
WalletCallbacks(
onTransactionProcessed = Vector(onTxProcessed),
onNewAddressGenerated = Vector(onAddressCreated),
onReservedUtxos = Vector(onReservedUtxo),
onTransactionBroadcast = Vector(onTxBroadcast),
onBlockProcessed = Vector(onBlockProcessed)
onTransactionBroadcast = Vector(onTxBroadcast)
)
}
@ -97,7 +105,7 @@ object WebsocketUtil {
val notification = WalletNotification.TxBroadcastNotification(tx)
upickle.default.writeJs(notification)(WsPicklers.txBroadcastPickler)
case x @ (WalletWsType.NewAddress | WalletWsType.ReservedUtxos |
WalletWsType.BlockProcessed | WalletWsType.DLCStateChange) =>
WalletWsType.DLCStateChange) =>
sys.error(s"Cannot build tx notification for $x")
}

View File

@ -235,7 +235,8 @@ object FundWalletUtil extends FundWalletUtil {
wallet = BitcoindRpcBackendUtil.createDLCWalletWithBitcoindCallbacks(
bitcoind,
tmp)
tmp,
None)(system)
funded1 <- fundAccountForWalletWithBitcoind(
BitcoinSWalletTest.defaultAcctAmts,

View File

@ -206,7 +206,8 @@ class BitcoindBackendTest extends WalletAppConfigWithBitcoindNewestFixtures {
bip39PasswordOpt = walletAppConfig.bip39PasswordOpt)
} yield {
BitcoindRpcBackendUtil.createWalletWithBitcoindCallbacks(bitcoind,
tmpWallet)
tmpWallet,
None)
}
}
}

View File

@ -27,7 +27,8 @@ class BitcoindBlockPollingTest
BitcoinSWalletTest.createDefaultWallet(bitcoind, bitcoind, None)
wallet =
BitcoindRpcBackendUtil.createWalletWithBitcoindCallbacks(bitcoind,
tmpWallet)
tmpWallet,
None)
// Assert wallet is empty
isEmpty <- wallet.isEmpty()
_ = assert(isEmpty)

View File

@ -46,8 +46,10 @@ class BitcoindZMQBackendTest extends WalletAppConfigWithBitcoindNewestFixtures {
tmpWallet <-
BitcoinSWalletTest.createDefaultWallet(bitcoind, bitcoind, None)
wallet =
BitcoindRpcBackendUtil.createWalletWithBitcoindCallbacks(bitcoind,
tmpWallet)
BitcoindRpcBackendUtil.createWalletWithBitcoindCallbacks(
bitcoind = bitcoind,
wallet = tmpWallet,
chainCallbacksOpt = None)
// Assert wallet is empty
isEmpty <- wallet.isEmpty()
_ = assert(isEmpty)