2022 01 25 issue 4014 (#4015)

* Refactor testkit tests to use same nodeCallbacks that BitcoinSServerMain uses for neutrino

* Add test case

* Add logs for callbacks

* Cleanup

* Add test case for when funds are spent when we are offline

* Turn off logging again

* Cleanup more logs and comments
This commit is contained in:
Chris Stewart 2022-01-26 13:16:15 -06:00 committed by GitHub
parent d983ad14f3
commit 98c5d816ac
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 163 additions and 91 deletions

View file

@ -21,7 +21,6 @@ import org.bitcoins.commons.util.{DatadirParser, ServerArgParser}
import org.bitcoins.core.api.chain.ChainApi import org.bitcoins.core.api.chain.ChainApi
import org.bitcoins.core.api.feeprovider.FeeRateApi import org.bitcoins.core.api.feeprovider.FeeRateApi
import org.bitcoins.core.api.node.{ import org.bitcoins.core.api.node.{
ExternalImplementationNodeType,
InternalImplementationNodeType, InternalImplementationNodeType,
NodeApi, NodeApi,
NodeType NodeType
@ -42,6 +41,7 @@ import org.bitcoins.rpc.config.{BitcoindRpcAppConfig, ZmqConfig}
import org.bitcoins.server.routes.{BitcoinSServerRunner, CommonRoutes, Server} import org.bitcoins.server.routes.{BitcoinSServerRunner, CommonRoutes, Server}
import org.bitcoins.server.util.{ import org.bitcoins.server.util.{
BitcoinSAppScalaDaemon, BitcoinSAppScalaDaemon,
CallbackUtil,
ServerBindings, ServerBindings,
WebsocketUtil, WebsocketUtil,
WsServerConfig WsServerConfig
@ -130,7 +130,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
chainApi <- chainApiF chainApi <- chainApiF
_ = logger.info("Initialized chain api") _ = logger.info("Initialized chain api")
wallet <- dlcConf.createDLCWallet(node, chainApi, feeProvider) wallet <- dlcConf.createDLCWallet(node, chainApi, feeProvider)
nodeCallbacks <- createCallbacks(wallet) nodeCallbacks <- CallbackUtil.createNeutrinoNodeCallbacksForWallet(wallet)
_ = nodeConf.addCallbacks(nodeCallbacks) _ = nodeConf.addCallbacks(nodeCallbacks)
} yield { } yield {
logger.info( logger.info(
@ -287,48 +287,6 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
} }
} }
private def createCallbacks(wallet: Wallet)(implicit
nodeConf: NodeAppConfig,
ec: ExecutionContext): Future[NodeCallbacks] = {
lazy val onTx: OnTxReceived = { tx =>
logger.debug(s"Receiving transaction txid=${tx.txIdBE.hex} as a callback")
wallet.processTransaction(tx, blockHashOpt = None).map(_ => ())
}
lazy val onCompactFilters: OnCompactFiltersReceived = { blockFilters =>
wallet
.processCompactFilters(blockFilters = blockFilters)
.map(_ => ())
}
lazy val onBlock: OnBlockReceived = { block =>
wallet.processBlock(block).map(_ => ())
}
lazy val onHeaders: OnBlockHeadersReceived = { headers =>
if (headers.isEmpty) {
Future.unit
} else {
wallet.updateUtxoPendingStates().map(_ => ())
}
}
nodeConf.nodeType match {
case NodeType.SpvNode =>
Future.successful(
NodeCallbacks(onTxReceived = Vector(onTx),
onBlockHeadersReceived = Vector(onHeaders)))
case NodeType.NeutrinoNode =>
Future.successful(
NodeCallbacks(onTxReceived = Vector(onTx),
onBlockReceived = Vector(onBlock),
onCompactFiltersReceived = Vector(onCompactFilters),
onBlockHeadersReceived = Vector(onHeaders)))
case NodeType.FullNode =>
Future.failed(new RuntimeException("Not yet implemented"))
case _: ExternalImplementationNodeType =>
Future.failed(
new RuntimeException(
"Cannot create callbacks for an external implementation"))
}
}
private def setBloomFilter(node: Node, wallet: Wallet)(implicit private def setBloomFilter(node: Node, wallet: Wallet)(implicit
ec: ExecutionContext): Future[Node] = { ec: ExecutionContext): Future[Node] = {
for { for {

View file

@ -0,0 +1,66 @@
package org.bitcoins.server.util
import grizzled.slf4j.Logging
import org.bitcoins.core.api.node.{ExternalImplementationNodeType, NodeType}
import org.bitcoins.node.{
NodeCallbacks,
OnBlockHeadersReceived,
OnBlockReceived,
OnCompactFiltersReceived,
OnTxReceived
}
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.wallet.Wallet
import scala.concurrent.{ExecutionContext, Future}
object CallbackUtil extends Logging {
def createNeutrinoNodeCallbacksForWallet(wallet: Wallet)(implicit
nodeConf: NodeAppConfig,
ec: ExecutionContext): Future[NodeCallbacks] = {
lazy val onTx: OnTxReceived = { tx =>
logger.debug(s"Receiving transaction txid=${tx.txIdBE.hex} as a callback")
wallet.processTransaction(tx, blockHashOpt = None).map(_ => ())
}
lazy val onCompactFilters: OnCompactFiltersReceived = { blockFilters =>
logger.debug(
s"Executing onCompactFilters callback with filter count=${blockFilters.length}")
wallet
.processCompactFilters(blockFilters = blockFilters)
.map(_ => ())
}
lazy val onBlock: OnBlockReceived = { block =>
logger.debug(
s"Executing onBlock callback=${block.blockHeader.hashBE.hex}")
wallet.processBlock(block).map(_ => ())
}
lazy val onHeaders: OnBlockHeadersReceived = { headers =>
logger.debug(
s"Executing block header with header count=${headers.length}")
if (headers.isEmpty) {
Future.unit
} else {
wallet.updateUtxoPendingStates().map(_ => ())
}
}
nodeConf.nodeType match {
case NodeType.SpvNode =>
Future.successful(
NodeCallbacks(onTxReceived = Vector(onTx),
onBlockHeadersReceived = Vector(onHeaders)))
case NodeType.NeutrinoNode =>
Future.successful(
NodeCallbacks(onTxReceived = Vector(onTx),
onBlockReceived = Vector(onBlock),
onCompactFiltersReceived = Vector(onCompactFilters),
onBlockHeadersReceived = Vector(onHeaders)))
case NodeType.FullNode =>
Future.failed(new RuntimeException("Not yet implemented"))
case _: ExternalImplementationNodeType =>
Future.failed(
new RuntimeException(
"Cannot create callbacks for an external implementation"))
}
}
}

View file

@ -210,4 +210,71 @@ class NeutrinoNodeWithWalletTest extends NodeTestWithCachedBitcoindNewest {
_ <- AsyncUtil.awaitConditionF(condition) _ <- AsyncUtil.awaitConditionF(condition)
} yield succeed } yield succeed
} }
it must "receive funds while the node is offline when we restart" in {
param =>
val NeutrinoNodeFundedWalletBitcoind(node, wallet, bitcoind, _) = param
val initBalanceF = wallet.getBalance()
val receivedAddrF = wallet.getNewAddress()
val bitcoindAddrF = bitcoind.getNewAddress
val sendAmt = Bitcoins.one
//stop the node to take us offline
val stopF = node.stop()
for {
initBalance <- initBalanceF
receiveAddr <- receivedAddrF
bitcoindAddr <- bitcoindAddrF
stoppedNode <- stopF
//send money and generate a block to confirm the funds while we are offline
_ <- bitcoind.sendToAddress(receiveAddr, sendAmt)
//generate a block to confirm the tx
_ <- bitcoind.generateToAddress(1, bitcoindAddr)
//restart the node now that we have received funds
startedNode <- stoppedNode.start()
_ <- startedNode.sync()
_ <- NodeTestUtil.awaitCompactFiltersSync(node = node, rpc = bitcoind)
_ <- AsyncUtil.retryUntilSatisfiedF(() => {
for {
balance <- wallet.getBalance()
} yield {
balance == initBalance + sendAmt
}
})
balance <- wallet.getBalance()
} yield {
assert(balance > initBalance)
}
}
it must "recognize funds were spent while we were offline" in { param =>
//useful test for the case where we are in a DLC
//and the counterparty broadcasts the funding tx or a CET
val NeutrinoNodeFundedWalletBitcoind(node, wallet, bitcoind, _) = param
val initBalanceF = wallet.getBalance()
val bitcoindAddrF = bitcoind.getNewAddress
val sendAmt = Bitcoins.one
//stop the node to take us offline
val stopF = node.stop()
for {
initBalance <- initBalanceF
bitcoindAddr <- bitcoindAddrF
stoppedNode <- stopF
//create a transaction that spends to bitcoind with our wallet
tx <- wallet.sendToAddress(bitcoindAddr, sendAmt, SatoshisPerByte.one)
//broadcast tx
_ <- bitcoind.sendRawTransaction(tx)
_ <- bitcoind.generateToAddress(6, bitcoindAddr)
//bring node back online
startedNode <- stoppedNode.start()
_ <- startedNode.sync()
_ <- NodeTestUtil.awaitCompactFiltersSync(node, bitcoind)
balanceAfterSpend <- wallet.getBalance()
} yield {
assert(balanceAfterSpend < initBalance)
}
}
} }

View file

@ -118,13 +118,7 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
logger.info("Starting node") logger.info("Starting node")
val start = System.currentTimeMillis() val start = System.currentTimeMillis()
val startConfsF = for { val chainApiF = chainApiFromDb()
_ <- chainAppConfig.start()
_ <- nodeAppConfig.start()
} yield ()
val chainApiF = startConfsF.flatMap(_ => chainApiFromDb())
val startNodeF = for { val startNodeF = for {
peers <- peerManager.getPeers peers <- peerManager.getPeers
_ = peers.foreach(peerManager.addPeer) _ = peers.foreach(peerManager.addPeer)
@ -141,10 +135,7 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
val filterCountF = chainApiF.flatMap(_.getFilterCount()) val filterCountF = chainApiF.flatMap(_.getFilterCount())
for { for {
_ <- startConfsF
node <- startNodeF node <- startNodeF
_ = logger.trace("Fetching node starting point")
bestHash <- bestHashF bestHash <- bestHashF
bestHeight <- bestHeightF bestHeight <- bestHeightF
filterHeaderCount <- filterHeaderCountF filterHeaderCount <- filterHeaderCountF
@ -164,7 +155,6 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
val disconnectF = for { val disconnectF = for {
disconnect <- Future.sequence(disconnectFs) disconnect <- Future.sequence(disconnectFs)
_ <- nodeAppConfig.stop()
} yield disconnect } yield disconnect
def isAllDisconnectedF: Future[Boolean] = { def isAllDisconnectedF: Future[Boolean] = {
@ -180,11 +170,17 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
AsyncUtil.retryUntilSatisfiedF(() => isAllDisconnectedF, 500.millis) AsyncUtil.retryUntilSatisfiedF(() => isAllDisconnectedF, 500.millis)
} }
isStoppedF.failed.foreach { e => val peers = peerManager.peers
val removedPeersF = for {
_ <- isStoppedF
_ <- Future.sequence(peers.map(peerManager.removePeer))
} yield ()
removedPeersF.failed.foreach { e =>
logger.warn(s"Cannot stop node", e) logger.warn(s"Cannot stop node", e)
} }
isStoppedF.map { _ => removedPeersF.map { _ =>
logger.info( logger.info(
s"Node stopped! It took=${System.currentTimeMillis() - start}ms") s"Node stopped! It took=${System.currentTimeMillis() - start}ms")
this this

View file

@ -147,7 +147,7 @@ case class PeerManager(node: Node, configPeers: Vector[Peer] = Vector.empty)(
} }
for { for {
_ <- disconnectF _ <- disconnectF
_ <- removePeer(peer) _ = _peerData.remove(peer)
} yield () } yield ()
} else { } else {
logger.debug(s"Key $peer not found in peerData") logger.debug(s"Key $peer not found in peerData")

View file

@ -66,7 +66,7 @@ case class DataMessageHandler(
this.copy(chainApi = newChainApi) this.copy(chainApi = newChainApi)
} }
case filterHeader: CompactFilterHeadersMessage => case filterHeader: CompactFilterHeadersMessage =>
logger.info( logger.debug(
s"Got ${filterHeader.filterHashes.size} compact filter header hashes") s"Got ${filterHeader.filterHashes.size} compact filter header hashes")
val filterHeaders = filterHeader.filterHeaders val filterHeaders = filterHeader.filterHeaders
for { for {
@ -75,7 +75,7 @@ case class DataMessageHandler(
filterHeader.stopHash.flip) filterHeader.stopHash.flip)
newSyncing <- newSyncing <-
if (filterHeaders.size == chainConfig.filterHeaderBatchSize) { if (filterHeaders.size == chainConfig.filterHeaderBatchSize) {
logger.info( logger.debug(
s"Received maximum amount of filter headers in one header message. This means we are not synced, requesting more") s"Received maximum amount of filter headers in one header message. This means we are not synced, requesting more")
sendNextGetCompactFilterHeadersCommand( sendNextGetCompactFilterHeadersCommand(
peerWithCompactFilters, peerWithCompactFilters,

View file

@ -51,16 +51,16 @@
<!-- ╚═════════════════╝ --> <!-- ╚═════════════════╝ -->
<!-- See incoming message names and the peer it's sent from --> <!-- See incoming message names and the peer it's sent from -->
<logger name="org.bitcoins.node.networking.peer.PeerMessageReceiver" level="WARN"/> <logger name="org.bitcoins.node.networking.peer.PeerMessageReceiver" level="INFO"/>
<!-- See outgoing message names and the peer it's sent to --> <!-- See outgoing message names and the peer it's sent to -->
<logger name="org.bitcoins.node.networking.peer.PeerMessageSender" level="WARN"/> <logger name="org.bitcoins.node.networking.peer.PeerMessageSender" level="INFO"/>
<!-- Inspect handling of headers and inventory messages --> <!-- Inspect handling of headers and inventory messages -->
<logger name="org.bitcoins.node.networking.peer.DataMessageHandler" level="WARN"/> <logger name="org.bitcoins.node.networking.peer.DataMessageHandler" level="INFO"/>
<!-- inspect TCP details --> <!-- inspect TCP details -->
<logger name="org.bitcoins.node.networking.P2PClientActor" level="WARN"/> <logger name="org.bitcoins.node.networking.P2PClientActor" level="INFO"/>
<!-- ╔════════════════════╗ --> <!-- ╔════════════════════╗ -->
<!-- ║ Chain module ║ --> <!-- ║ Chain module ║ -->

View file

@ -15,15 +15,12 @@ import org.bitcoins.core.util.FutureUtil
import org.bitcoins.core.wallet.fee._ import org.bitcoins.core.wallet.fee._
import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE} import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE}
import org.bitcoins.dlc.wallet.{DLCAppConfig, DLCWallet} import org.bitcoins.dlc.wallet.{DLCAppConfig, DLCWallet}
import org.bitcoins.node.{ import org.bitcoins.node.config.NodeAppConfig
NodeCallbacks, import org.bitcoins.node.{NodeCallbacks, OnMerkleBlockReceived}
OnBlockReceived,
OnCompactFiltersReceived,
OnMerkleBlockReceived
}
import org.bitcoins.rpc.client.common.{BitcoindRpcClient, BitcoindVersion} import org.bitcoins.rpc.client.common.{BitcoindRpcClient, BitcoindVersion}
import org.bitcoins.rpc.client.v19.BitcoindV19RpcClient import org.bitcoins.rpc.client.v19.BitcoindV19RpcClient
import org.bitcoins.server.BitcoinSAppConfig import org.bitcoins.server.BitcoinSAppConfig
import org.bitcoins.server.util.CallbackUtil
import org.bitcoins.testkit.EmbeddedPg import org.bitcoins.testkit.EmbeddedPg
import org.bitcoins.testkit.chain.SyncUtil import org.bitcoins.testkit.chain.SyncUtil
import org.bitcoins.testkit.fixtures.BitcoinSFixture import org.bitcoins.testkit.fixtures.BitcoinSFixture
@ -647,8 +644,10 @@ object BitcoinSWalletTest extends WalletLogger {
chainQueryApi = chainQueryApi, chainQueryApi = chainQueryApi,
bip39PasswordOpt = bip39PasswordOpt)(config.walletConf, system) bip39PasswordOpt = bip39PasswordOpt)(config.walletConf, system)
//add callbacks for wallet //add callbacks for wallet
nodeCallbacks = nodeCallbacks <-
BitcoinSWalletTest.createNeutrinoNodeCallbacksForWallet(wallet) BitcoinSWalletTest.createNeutrinoNodeCallbacksForWallet(wallet)(
config.nodeConf,
system.dispatcher)
_ = config.nodeConf.addCallbacks(nodeCallbacks) _ = config.nodeConf.addCallbacks(nodeCallbacks)
withBitcoind <- createWalletWithBitcoind(wallet, bitcoindRpcClient) withBitcoind <- createWalletWithBitcoind(wallet, bitcoindRpcClient)
funded <- FundWalletUtil.fundWalletWithBitcoind(withBitcoind) funded <- FundWalletUtil.fundWalletWithBitcoind(withBitcoind)
@ -692,22 +691,9 @@ object BitcoinSWalletTest extends WalletLogger {
/** Constructs callbacks for the wallet from the node to process blocks and compact filters */ /** Constructs callbacks for the wallet from the node to process blocks and compact filters */
def createNeutrinoNodeCallbacksForWallet(wallet: Wallet)(implicit def createNeutrinoNodeCallbacksForWallet(wallet: Wallet)(implicit
ec: ExecutionContext): NodeCallbacks = { nodeAppConfig: NodeAppConfig,
val onBlock: OnBlockReceived = { block => ec: ExecutionContext): Future[NodeCallbacks] = {
for { CallbackUtil.createNeutrinoNodeCallbacksForWallet(wallet)
_ <- wallet.processBlock(block)
} yield ()
}
val onCompactFilters: OnCompactFiltersReceived = { blockFilters =>
for {
_ <- wallet.processCompactFilters(blockFilters)
} yield ()
}
NodeCallbacks(
onBlockReceived = Vector(onBlock),
onCompactFiltersReceived = Vector(onCompactFilters)
)
} }
/** Registers a callback to handle merkle blocks given to us by a spv node */ /** Registers a callback to handle merkle blocks given to us by a spv node */

View file

@ -277,7 +277,7 @@ private[wallet] trait UtxoHandling extends WalletLogger {
} yield { } yield {
val writtenOut = written.outPoint val writtenOut = written.outPoint
logger.info( logger.info(
s"Successfully inserted UTXO ${writtenOut.txIdBE.hex}:${writtenOut.vout.toInt} into DB") s"Successfully inserted UTXO ${writtenOut.txIdBE.hex}:${writtenOut.vout.toInt} amt=${output.value} into DB")
logger.debug(s"UTXO details: ${written.output}") logger.debug(s"UTXO details: ${written.output}")
written written
} }
@ -297,8 +297,7 @@ private[wallet] trait UtxoHandling extends WalletLogger {
} else { } else {
val output = transaction.outputs(vout.toInt) val output = transaction.outputs(vout.toInt)
val outPoint = TransactionOutPoint(transaction.txId, vout) val outPoint = TransactionOutPoint(transaction.txId, vout)
logger.info(
s"Adding UTXO to wallet: ${transaction.txIdBE.hex}:${vout.toInt} amt=${output.value}")
// insert the UTXO into the DB // insert the UTXO into the DB
val insertedUtxoEF: Either[AddUtxoError, Future[SpendingInfoDb]] = for { val insertedUtxoEF: Either[AddUtxoError, Future[SpendingInfoDb]] = for {
addressDb <- addressDbE addressDb <- addressDbE