Implement torstarted websocket callback (#4476)

* Implement tor started websocket callback

* Implement tor callbacks when tor is provided
This commit is contained in:
Chris Stewart 2022-07-11 10:26:30 -05:00 committed by GitHub
parent b16a8ca6aa
commit 1b169d8fa0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 199 additions and 26 deletions

View File

@ -24,6 +24,7 @@ object WsType extends StringFactory[WsType] {
sealed trait WalletWsType extends WsType
sealed trait ChainWsType extends WsType
sealed trait TorWsType extends WsType
object WalletWsType extends StringFactory[WalletWsType] {
case object TxProcessed extends WalletWsType
@ -71,6 +72,21 @@ object ChainWsType extends StringFactory[ChainWsType] {
}
}
object TorWsType extends StringFactory[TorWsType] {
case object TorStarted extends TorWsType
private val all = Vector(TorStarted)
override def fromStringOpt(string: String): Option[TorWsType] = {
all.find(_.toString.toLowerCase() == string.toLowerCase)
}
override def fromString(string: String): TorWsType = {
fromStringOpt(string)
.getOrElse(sys.error(s"Cannot find chain ws type for string=$string"))
}
}
/** A notification that we send over the websocket.
* The type of the notification is indicated by [[WsType]].
* An example is [[org.bitcoins.commons.jsonmodels.ws.WalletNotification.NewAddressNotification]]
@ -89,6 +105,10 @@ sealed trait WalletNotification[T] extends WsNotification[T] {
override def `type`: WalletWsType
}
sealed trait TorNotification[T] extends WsNotification[T] {
override def `type`: TorWsType
}
object WalletNotification {
case class NewAddressNotification(payload: BitcoinAddress)
@ -139,3 +159,11 @@ object ChainNotification {
override val `type`: ChainWsType = ChainWsType.BlockProcessed
}
}
object TorNotification {
case object TorStartedNotification extends TorNotification[Unit] {
override val `type`: TorWsType = TorWsType.TorStarted
override val payload: Unit = ()
}
}

View File

@ -14,6 +14,8 @@ import org.bitcoins.commons.jsonmodels.ws.WalletNotification.{
import org.bitcoins.commons.jsonmodels.ws.{
ChainNotification,
ChainWsType,
TorNotification,
TorWsType,
WalletNotification,
WalletWsType
}
@ -32,6 +34,11 @@ object WsPicklers {
.bimap(_.toString.toLowerCase, str => WalletWsType.fromString(str.str))
}
implicit val torWsTypePickler: ReadWriter[TorWsType] = {
readwriter[ujson.Str]
.bimap(_.toString.toLowerCase, str => TorWsType.fromString(str.str))
}
private def writeChainNotification(
notification: ChainNotification[_]): ujson.Obj = {
val payloadJson: ujson.Value = notification match {
@ -122,6 +129,28 @@ object WsPicklers {
}
}
private def writeTorNotification(
notification: TorNotification[_]): ujson.Obj = {
val payloadJson = notification.`type` match {
case TorWsType.TorStarted =>
ujson.Null
}
val notificationObj = ujson.Obj(
PicklerKeys.typeKey -> writeJs(notification.`type`),
PicklerKeys.payloadKey -> payloadJson
)
notificationObj
}
private def readTorNotification(obj: ujson.Obj): TorNotification[_] = {
val typeObj = read[TorWsType](obj(PicklerKeys.typeKey))
typeObj match {
case TorWsType.TorStarted =>
TorNotification.TorStartedNotification
}
}
implicit val newAddressPickler: ReadWriter[NewAddressNotification] = {
readwriter[ujson.Obj].bimap(
writeWalletNotification(_),
@ -185,4 +214,13 @@ object WsPicklers {
writeWalletNotification(_),
readWalletNotification(_).asInstanceOf[DLCOfferRemoveNotification])
}
implicit val torStartedPickler: ReadWriter[
TorNotification.TorStartedNotification.type] = {
readwriter[ujson.Obj].bimap(
writeTorNotification(_),
readTorNotification(_)
.asInstanceOf[TorNotification.TorStartedNotification.type])
}
}

View File

@ -168,10 +168,19 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
val callbacksF =
chainApiF.map(chainApi => buildNeutrinoCallbacks(wsQueue, chainApi))
val torCallbacks = WebsocketUtil.buildTorCallbacks(wsQueue)
val _ = torConf.addCallbacks(torCallbacks)
val isTorStartedF = if (torConf.torProvided) {
//if tor is provided we need to execute the tor started callback immediately
torConf.callBacks.executeOnTorStarted()
} else {
Future.unit
}
val startedNodeF = {
//can't start connecting to peers until tor is done starting
for {
_ <- startedTorConfigF
_ <- isTorStartedF
started <- configuredNodeF.flatMap(_.start())
} yield started
}
@ -267,7 +276,14 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
val tuple = buildWsSource
val wsQueue: SourceQueueWithComplete[Message] = tuple._1
val wsSource: Source[Message, NotUsed] = tuple._2
val torCallbacks = WebsocketUtil.buildTorCallbacks(wsQueue)
val _ = torConf.addCallbacks(torCallbacks)
val isTorStartedF = if (torConf.torProvided) {
//if tor is provided we need to emit a tor started event immediately
torConf.callBacks.executeOnTorStarted()
} else {
Future.unit
}
val walletF = bitcoindF.flatMap { bitcoind =>
val feeProvider = FeeProviderFactory.getFeeProviderOrElse(
bitcoind,
@ -282,6 +298,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
feeRateApi = feeProvider)
val chainCallbacks = WebsocketUtil.buildChainCallbacks(wsQueue, bitcoind)
for {
_ <- isTorStartedF
tmpWallet <- tmpWalletF
wallet = BitcoindRpcBackendUtil.createDLCWalletWithBitcoindCallbacks(
bitcoind,
@ -327,6 +344,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
serverCmdLineArgs = serverArgParser,
wsSource = wsSource
)
walletCallbacks = WebsocketUtil.buildWalletCallbacks(
wsQueue,
walletConf.walletNameOpt)
@ -508,7 +526,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
.map { _ =>
BitcoindRpcBackendUtil
.startBitcoindMempoolPolling(wallet, bitcoind) { tx =>
nodeConf.nodeCallbacks
nodeConf.callBacks
.executeOnTxReceivedCallbacks(logger, tx)
}
()

View File

@ -4,6 +4,7 @@ import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.scaladsl.SourceQueueWithComplete
import grizzled.slf4j.Logging
import org.bitcoins.chain.{ChainCallbacks, OnBlockHeaderConnected}
import org.bitcoins.commons.jsonmodels.ws.TorNotification.TorStartedNotification
import org.bitcoins.commons.jsonmodels.ws.{
ChainNotification,
WalletNotification,
@ -23,6 +24,7 @@ import org.bitcoins.dlc.wallet.{
OnDLCOfferRemove,
OnDLCStateChange
}
import org.bitcoins.tor.{OnTorStarted, TorCallbacks}
import org.bitcoins.wallet._
import scala.concurrent.{ExecutionContext, Future}
@ -121,6 +123,21 @@ object WebsocketUtil extends Logging {
)
}
def buildTorCallbacks(queue: SourceQueueWithComplete[Message])(implicit
ec: ExecutionContext): TorCallbacks = {
val onTorStarted: OnTorStarted = { _ =>
val notification = TorStartedNotification
val json =
upickle.default.writeJs(notification)(WsPicklers.torStartedPickler)
val msg = TextMessage.Strict(json.toString())
val offerF = queue.offer(msg)
offerF.map(_ => ())
}
TorCallbacks(onTorStarted)
}
private def buildTxNotification(
wsType: WalletWsType,
tx: Transaction,

View File

@ -149,13 +149,13 @@ class ChainHandler(
blockFilterCheckpoints)
createdF.map { headers =>
if (chainConfig.chainCallbacks.onBlockHeaderConnected.nonEmpty) {
if (chainConfig.callBacks.onBlockHeaderConnected.nonEmpty) {
val headersWithHeight: Vector[(Int, BlockHeader)] = {
headersToBeCreated.reverseIterator.map(h =>
(h.height, h.blockHeader))
}.toVector
chainConfig.chainCallbacks
chainConfig.callBacks
.executeOnBlockHeaderConnectedCallbacks(logger, headersWithHeight)
}
chains.foreach { c =>

View File

@ -6,6 +6,7 @@ import org.bitcoins.chain.db.ChainDbManagement
import org.bitcoins.chain.models.BlockHeaderDAO
import org.bitcoins.chain.pow.Pow
import org.bitcoins.commons.config.{AppConfigFactory, ConfigOps}
import org.bitcoins.core.api.CallbackConfig
import org.bitcoins.core.api.chain.db.BlockHeaderDbHelper
import org.bitcoins.core.util.Mutable
import org.bitcoins.db._
@ -21,7 +22,8 @@ case class ChainAppConfig(baseDatadir: Path, configOverrides: Vector[Config])(
implicit override val ec: ExecutionContext)
extends DbAppConfig
with ChainDbManagement
with JdbcProfileComponent[ChainAppConfig] {
with JdbcProfileComponent[ChainAppConfig]
with CallbackConfig[ChainCallbacks] {
override protected[bitcoins] def moduleName: String =
ChainAppConfig.moduleName
@ -35,9 +37,9 @@ case class ChainAppConfig(baseDatadir: Path, configOverrides: Vector[Config])(
private val callbacks = new Mutable(ChainCallbacks.empty)
def chainCallbacks: ChainCallbacks = callbacks.atomicGet
override def callBacks: ChainCallbacks = callbacks.atomicGet
def addCallbacks(newCallbacks: ChainCallbacks): ChainCallbacks = {
override def addCallbacks(newCallbacks: ChainCallbacks): ChainCallbacks = {
callbacks.atomicUpdate(newCallbacks)(_ + _)
}

View File

@ -0,0 +1,7 @@
package org.bitcoins.core.api
trait CallbackConfig[T] {
def addCallbacks(newCallbacks: T): T
def callBacks: T
}

View File

@ -49,7 +49,7 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
def controlMessageHandler: ControlMessageHandler
def nodeCallbacks: NodeCallbacks = nodeAppConfig.nodeCallbacks
def nodeCallbacks: NodeCallbacks = nodeAppConfig.callBacks
lazy val txDAO: BroadcastAbleTransactionDAO = BroadcastAbleTransactionDAO()

View File

@ -9,6 +9,7 @@ import org.bitcoins.chain.models.{
CompactFilterDAO,
CompactFilterHeaderDAO
}
import org.bitcoins.core.api.CallbackConfig
import org.bitcoins.core.api.node.NodeType
import org.bitcoins.core.config.{MainNet, RegTest, SigNet, TestNet3}
import org.bitcoins.core.util.{Mutable, TimeUtil}
@ -35,7 +36,8 @@ case class NodeAppConfig(baseDatadir: Path, configOverrides: Vector[Config])(
implicit val system: ActorSystem)
extends DbAppConfig
with NodeDbManagement
with JdbcProfileComponent[NodeAppConfig] {
with JdbcProfileComponent[NodeAppConfig]
with CallbackConfig[NodeCallbacks] {
override protected[bitcoins] def moduleName: String = NodeAppConfig.moduleName
override protected[bitcoins] type ConfigType = NodeAppConfig
@ -49,9 +51,9 @@ case class NodeAppConfig(baseDatadir: Path, configOverrides: Vector[Config])(
private val callbacks = new Mutable(NodeCallbacks.empty)
def nodeCallbacks: NodeCallbacks = callbacks.atomicGet
override def callBacks: NodeCallbacks = callbacks.atomicGet
def addCallbacks(newCallbacks: NodeCallbacks): NodeCallbacks = {
override def addCallbacks(newCallbacks: NodeCallbacks): NodeCallbacks = {
callbacks.atomicUpdate(newCallbacks)(_ + _)
}

View File

@ -135,7 +135,7 @@ case class DataMessageHandler(
for {
newChainApi <- chainApi.processFilters(filterBatch)
_ <-
appConfig.nodeCallbacks
appConfig.callBacks
.executeOnCompactFiltersReceivedCallbacks(logger,
blockFilters)
} yield (Vector.empty, newChainApi)
@ -266,7 +266,7 @@ case class DataMessageHandler(
for {
newApi <- chainApiF
newSyncing <- getHeadersF
_ <- appConfig.nodeCallbacks.executeOnBlockHeadersReceivedCallbacks(
_ <- appConfig.callBacks.executeOnBlockHeadersReceivedCallbacks(
logger,
headers)
} yield {
@ -286,7 +286,7 @@ case class DataMessageHandler(
for {
processedApi <- chainApi.processHeader(block.blockHeader)
_ <-
appConfig.nodeCallbacks
appConfig.callBacks
.executeOnBlockHeadersReceivedCallbacks(
logger,
Vector(block.blockHeader))
@ -298,13 +298,13 @@ case class DataMessageHandler(
for {
newApi <- newApiF
_ <-
appConfig.nodeCallbacks
appConfig.callBacks
.executeOnBlockReceivedCallbacks(logger, block)
} yield {
this.copy(chainApi = newApi)
}
case TransactionMessage(tx) =>
MerkleBuffers.putTx(tx, appConfig.nodeCallbacks).flatMap {
MerkleBuffers.putTx(tx, appConfig.callBacks).flatMap {
belongsToMerkle =>
if (belongsToMerkle) {
logger.trace(
@ -313,7 +313,7 @@ case class DataMessageHandler(
} else {
logger.trace(
s"Transaction=${tx.txIdBE} does not belong to merkleblock, processing given callbacks")
appConfig.nodeCallbacks
appConfig.callBacks
.executeOnTxReceivedCallbacks(logger, tx)
.map(_ => this)
}

View File

@ -0,0 +1,45 @@
package org.bitcoins.tor
import grizzled.slf4j.Logging
import org.bitcoins.core.api.{Callback, CallbackHandler}
import scala.concurrent.{ExecutionContext, Future}
trait OnTorStarted extends Callback[Unit]
trait TorCallbacks extends Logging {
def onTorStarted: CallbackHandler[Unit, OnTorStarted]
def executeOnTorStarted()(implicit ec: ExecutionContext): Future[Unit] = {
onTorStarted.execute(
(),
(err: Throwable) =>
logger.error(s"${onTorStarted.name} Callback failed with", err))
}
def +(other: TorCallbacks): TorCallbacks
}
object TorCallbacks {
private case class TorCallbacksImpl(
onTorStarted: CallbackHandler[Unit, OnTorStarted]
) extends TorCallbacks {
override def +(other: TorCallbacks): TorCallbacks = {
copy(onTorStarted = onTorStarted ++ other.onTorStarted)
}
}
val empty = apply(Vector.empty)
def apply(onTorStarted: OnTorStarted): TorCallbacks = {
apply(Vector(onTorStarted))
}
def apply(onTorStarted: Vector[OnTorStarted]): TorCallbacks = {
val handler =
CallbackHandler[Unit, OnTorStarted]("onTorStarted", onTorStarted)
TorCallbacksImpl(handler)
}
}

View File

@ -3,10 +3,11 @@ package org.bitcoins.tor.config
import com.typesafe.config.Config
import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.commons.config.{AppConfig, AppConfigFactory, ConfigOps}
import org.bitcoins.core.util.NetworkUtil
import org.bitcoins.core.api.CallbackConfig
import org.bitcoins.core.util.{Mutable, NetworkUtil}
import org.bitcoins.tor.TorProtocolHandler.{Password, SafeCookie}
import org.bitcoins.tor.client.TorClient
import org.bitcoins.tor.{Socks5ProxyParams, TorParams}
import org.bitcoins.tor.{Socks5ProxyParams, TorCallbacks, TorParams}
import java.io.File
import java.net.{InetAddress, InetSocketAddress}
@ -23,7 +24,16 @@ case class TorAppConfig(
baseDatadir: Path,
private val subModuleNameOpt: Option[String],
configOverrides: Vector[Config])(implicit ec: ExecutionContext)
extends AppConfig {
extends AppConfig
with CallbackConfig[TorCallbacks] {
private val callbacks = new Mutable(TorCallbacks.empty)
override def callBacks: TorCallbacks = callbacks.atomicGet
override def addCallbacks(newCallbacks: TorCallbacks): TorCallbacks = {
callbacks.atomicUpdate(newCallbacks)(_ + _)
}
override protected[bitcoins] def moduleName: String = TorAppConfig.moduleName
override protected[bitcoins] type ConfigType = TorAppConfig
@ -171,6 +181,8 @@ case class TorAppConfig(
//see: https://github.com/bitcoin-s/bitcoin-s/pull/3558#issuecomment-899819698
AsyncUtil
.retryUntilSatisfied(checkIfLogExists, 1.second, 60)
//execute started callbacks
.flatMap(_ => callbacks.atomicGet.executeOnTorStarted())
.recover { case _: AsyncUtil.RpcRetryException =>
throw new RuntimeException(
s"Could not start tor, please try again in a few minutes")
@ -183,7 +195,9 @@ case class TorAppConfig(
val stream = Files.lines(torLogFile)
try {
stream
.filter((line: String) => line.contains(isBootstrappedLogLine))
.filter { line: String =>
line.contains(isBootstrappedLogLine)
}
.count() > 0
} finally if (stream != null) stream.close()
}

View File

@ -90,7 +90,7 @@ abstract class Wallet
val chainQueryApi: ChainQueryApi
val creationTime: Instant = keyManager.creationTime
def walletCallbacks: WalletCallbacks = walletConfig.walletCallbacks
def walletCallbacks: WalletCallbacks = walletConfig.callBacks
private def utxosWithMissingTx: Future[Vector[SpendingInfoDb]] = {
for {

View File

@ -3,6 +3,7 @@ package org.bitcoins.wallet.config
import com.typesafe.config.Config
import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.commons.config.{AppConfigFactory, ConfigOps}
import org.bitcoins.core.api.CallbackConfig
import org.bitcoins.core.api.chain.ChainQueryApi
import org.bitcoins.core.api.feeprovider.FeeRateApi
import org.bitcoins.core.api.node.NodeApi
@ -37,7 +38,8 @@ case class WalletAppConfig(baseDatadir: Path, configOverrides: Vector[Config])(
extends DbAppConfig
with WalletDbManagement
with JdbcProfileComponent[WalletAppConfig]
with DBMasterXPubApi {
with DBMasterXPubApi
with CallbackConfig[WalletCallbacks] {
override protected[bitcoins] def moduleName: String =
WalletAppConfig.moduleName
@ -70,9 +72,9 @@ case class WalletAppConfig(baseDatadir: Path, configOverrides: Vector[Config])(
private val callbacks = new Mutable(WalletCallbacks.empty)
def walletCallbacks: WalletCallbacks = callbacks.atomicGet
override def callBacks: WalletCallbacks = callbacks.atomicGet
def addCallbacks(newCallbacks: WalletCallbacks): WalletCallbacks = {
override def addCallbacks(newCallbacks: WalletCallbacks): WalletCallbacks = {
callbacks.atomicUpdate(newCallbacks)(_ + _)
}

View File

@ -85,7 +85,7 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
hash = block.blockHeader.hashBE
height <- heightF
_ <- stateDescriptorDAO.updateSyncHeight(hash, height.get)
_ <- walletConfig.walletCallbacks.executeOnBlockProcessed(logger, block)
_ <- walletConfig.callBacks.executeOnBlockProcessed(logger, block)
} yield {
res
}