From 98c5d816ac9fac10da020394377d060f58110cd7 Mon Sep 17 00:00:00 2001 From: Chris Stewart Date: Wed, 26 Jan 2022 13:16:15 -0600 Subject: [PATCH] 2022 01 25 issue 4014 (#4015) * Refactor testkit tests to use same nodeCallbacks that BitcoinSServerMain uses for neutrino * Add test case * Add logs for callbacks * Cleanup * Add test case for when funds are spent when we are offline * Turn off logging again * Cleanup more logs and comments --- .../bitcoins/server/BitcoinSServerMain.scala | 46 +------------ .../bitcoins/server/util/CallbackUtil.scala | 66 ++++++++++++++++++ .../node/NeutrinoNodeWithWalletTest.scala | 67 +++++++++++++++++++ .../main/scala/org/bitcoins/node/Node.scala | 22 +++--- .../scala/org/bitcoins/node/PeerManager.scala | 2 +- .../networking/peer/DataMessageHandler.scala | 4 +- .../.jvm/src/main/resources/logback-test.xml | 8 +-- .../testkit/wallet/BitcoinSWalletTest.scala | 34 +++------- .../wallet/internal/UtxoHandling.scala | 5 +- 9 files changed, 163 insertions(+), 91 deletions(-) create mode 100644 app/server/src/main/scala/org/bitcoins/server/util/CallbackUtil.scala diff --git a/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala b/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala index 5578dd5ef0..1db1eb1309 100644 --- a/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala +++ b/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala @@ -21,7 +21,6 @@ import org.bitcoins.commons.util.{DatadirParser, ServerArgParser} import org.bitcoins.core.api.chain.ChainApi import org.bitcoins.core.api.feeprovider.FeeRateApi import org.bitcoins.core.api.node.{ - ExternalImplementationNodeType, InternalImplementationNodeType, NodeApi, NodeType @@ -42,6 +41,7 @@ import org.bitcoins.rpc.config.{BitcoindRpcAppConfig, ZmqConfig} import org.bitcoins.server.routes.{BitcoinSServerRunner, CommonRoutes, Server} import org.bitcoins.server.util.{ BitcoinSAppScalaDaemon, + CallbackUtil, ServerBindings, WebsocketUtil, WsServerConfig @@ -130,7 +130,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit chainApi <- chainApiF _ = logger.info("Initialized chain api") wallet <- dlcConf.createDLCWallet(node, chainApi, feeProvider) - nodeCallbacks <- createCallbacks(wallet) + nodeCallbacks <- CallbackUtil.createNeutrinoNodeCallbacksForWallet(wallet) _ = nodeConf.addCallbacks(nodeCallbacks) } yield { logger.info( @@ -287,48 +287,6 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit } } - private def createCallbacks(wallet: Wallet)(implicit - nodeConf: NodeAppConfig, - ec: ExecutionContext): Future[NodeCallbacks] = { - lazy val onTx: OnTxReceived = { tx => - logger.debug(s"Receiving transaction txid=${tx.txIdBE.hex} as a callback") - wallet.processTransaction(tx, blockHashOpt = None).map(_ => ()) - } - lazy val onCompactFilters: OnCompactFiltersReceived = { blockFilters => - wallet - .processCompactFilters(blockFilters = blockFilters) - .map(_ => ()) - } - lazy val onBlock: OnBlockReceived = { block => - wallet.processBlock(block).map(_ => ()) - } - lazy val onHeaders: OnBlockHeadersReceived = { headers => - if (headers.isEmpty) { - Future.unit - } else { - wallet.updateUtxoPendingStates().map(_ => ()) - } - } - nodeConf.nodeType match { - case NodeType.SpvNode => - Future.successful( - NodeCallbacks(onTxReceived = Vector(onTx), - onBlockHeadersReceived = Vector(onHeaders))) - case NodeType.NeutrinoNode => - Future.successful( - NodeCallbacks(onTxReceived = Vector(onTx), - onBlockReceived = Vector(onBlock), - onCompactFiltersReceived = Vector(onCompactFilters), - onBlockHeadersReceived = Vector(onHeaders))) - case NodeType.FullNode => - Future.failed(new RuntimeException("Not yet implemented")) - case _: ExternalImplementationNodeType => - Future.failed( - new RuntimeException( - "Cannot create callbacks for an external implementation")) - } - } - private def setBloomFilter(node: Node, wallet: Wallet)(implicit ec: ExecutionContext): Future[Node] = { for { diff --git a/app/server/src/main/scala/org/bitcoins/server/util/CallbackUtil.scala b/app/server/src/main/scala/org/bitcoins/server/util/CallbackUtil.scala new file mode 100644 index 0000000000..a93ee0a393 --- /dev/null +++ b/app/server/src/main/scala/org/bitcoins/server/util/CallbackUtil.scala @@ -0,0 +1,66 @@ +package org.bitcoins.server.util + +import grizzled.slf4j.Logging +import org.bitcoins.core.api.node.{ExternalImplementationNodeType, NodeType} +import org.bitcoins.node.{ + NodeCallbacks, + OnBlockHeadersReceived, + OnBlockReceived, + OnCompactFiltersReceived, + OnTxReceived +} +import org.bitcoins.node.config.NodeAppConfig +import org.bitcoins.wallet.Wallet + +import scala.concurrent.{ExecutionContext, Future} + +object CallbackUtil extends Logging { + + def createNeutrinoNodeCallbacksForWallet(wallet: Wallet)(implicit + nodeConf: NodeAppConfig, + ec: ExecutionContext): Future[NodeCallbacks] = { + lazy val onTx: OnTxReceived = { tx => + logger.debug(s"Receiving transaction txid=${tx.txIdBE.hex} as a callback") + wallet.processTransaction(tx, blockHashOpt = None).map(_ => ()) + } + lazy val onCompactFilters: OnCompactFiltersReceived = { blockFilters => + logger.debug( + s"Executing onCompactFilters callback with filter count=${blockFilters.length}") + wallet + .processCompactFilters(blockFilters = blockFilters) + .map(_ => ()) + } + lazy val onBlock: OnBlockReceived = { block => + logger.debug( + s"Executing onBlock callback=${block.blockHeader.hashBE.hex}") + wallet.processBlock(block).map(_ => ()) + } + lazy val onHeaders: OnBlockHeadersReceived = { headers => + logger.debug( + s"Executing block header with header count=${headers.length}") + if (headers.isEmpty) { + Future.unit + } else { + wallet.updateUtxoPendingStates().map(_ => ()) + } + } + nodeConf.nodeType match { + case NodeType.SpvNode => + Future.successful( + NodeCallbacks(onTxReceived = Vector(onTx), + onBlockHeadersReceived = Vector(onHeaders))) + case NodeType.NeutrinoNode => + Future.successful( + NodeCallbacks(onTxReceived = Vector(onTx), + onBlockReceived = Vector(onBlock), + onCompactFiltersReceived = Vector(onCompactFilters), + onBlockHeadersReceived = Vector(onHeaders))) + case NodeType.FullNode => + Future.failed(new RuntimeException("Not yet implemented")) + case _: ExternalImplementationNodeType => + Future.failed( + new RuntimeException( + "Cannot create callbacks for an external implementation")) + } + } +} diff --git a/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeWithWalletTest.scala b/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeWithWalletTest.scala index 37fd35d065..84b52e62a4 100644 --- a/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeWithWalletTest.scala +++ b/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeWithWalletTest.scala @@ -210,4 +210,71 @@ class NeutrinoNodeWithWalletTest extends NodeTestWithCachedBitcoindNewest { _ <- AsyncUtil.awaitConditionF(condition) } yield succeed } + + it must "receive funds while the node is offline when we restart" in { + param => + val NeutrinoNodeFundedWalletBitcoind(node, wallet, bitcoind, _) = param + + val initBalanceF = wallet.getBalance() + val receivedAddrF = wallet.getNewAddress() + val bitcoindAddrF = bitcoind.getNewAddress + val sendAmt = Bitcoins.one + //stop the node to take us offline + val stopF = node.stop() + for { + initBalance <- initBalanceF + receiveAddr <- receivedAddrF + bitcoindAddr <- bitcoindAddrF + stoppedNode <- stopF + //send money and generate a block to confirm the funds while we are offline + _ <- bitcoind.sendToAddress(receiveAddr, sendAmt) + //generate a block to confirm the tx + _ <- bitcoind.generateToAddress(1, bitcoindAddr) + //restart the node now that we have received funds + startedNode <- stoppedNode.start() + _ <- startedNode.sync() + _ <- NodeTestUtil.awaitCompactFiltersSync(node = node, rpc = bitcoind) + _ <- AsyncUtil.retryUntilSatisfiedF(() => { + for { + balance <- wallet.getBalance() + } yield { + balance == initBalance + sendAmt + } + }) + balance <- wallet.getBalance() + } yield { + assert(balance > initBalance) + } + } + + it must "recognize funds were spent while we were offline" in { param => + //useful test for the case where we are in a DLC + //and the counterparty broadcasts the funding tx or a CET + val NeutrinoNodeFundedWalletBitcoind(node, wallet, bitcoind, _) = param + val initBalanceF = wallet.getBalance() + val bitcoindAddrF = bitcoind.getNewAddress + val sendAmt = Bitcoins.one + + //stop the node to take us offline + val stopF = node.stop() + for { + initBalance <- initBalanceF + bitcoindAddr <- bitcoindAddrF + stoppedNode <- stopF + + //create a transaction that spends to bitcoind with our wallet + tx <- wallet.sendToAddress(bitcoindAddr, sendAmt, SatoshisPerByte.one) + //broadcast tx + _ <- bitcoind.sendRawTransaction(tx) + _ <- bitcoind.generateToAddress(6, bitcoindAddr) + + //bring node back online + startedNode <- stoppedNode.start() + _ <- startedNode.sync() + _ <- NodeTestUtil.awaitCompactFiltersSync(node, bitcoind) + balanceAfterSpend <- wallet.getBalance() + } yield { + assert(balanceAfterSpend < initBalance) + } + } } diff --git a/node/src/main/scala/org/bitcoins/node/Node.scala b/node/src/main/scala/org/bitcoins/node/Node.scala index 23f18714dd..192fd0260c 100644 --- a/node/src/main/scala/org/bitcoins/node/Node.scala +++ b/node/src/main/scala/org/bitcoins/node/Node.scala @@ -118,13 +118,7 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger { logger.info("Starting node") val start = System.currentTimeMillis() - val startConfsF = for { - _ <- chainAppConfig.start() - _ <- nodeAppConfig.start() - } yield () - - val chainApiF = startConfsF.flatMap(_ => chainApiFromDb()) - + val chainApiF = chainApiFromDb() val startNodeF = for { peers <- peerManager.getPeers _ = peers.foreach(peerManager.addPeer) @@ -141,10 +135,7 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger { val filterCountF = chainApiF.flatMap(_.getFilterCount()) for { - _ <- startConfsF node <- startNodeF - - _ = logger.trace("Fetching node starting point") bestHash <- bestHashF bestHeight <- bestHeightF filterHeaderCount <- filterHeaderCountF @@ -164,7 +155,6 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger { val disconnectF = for { disconnect <- Future.sequence(disconnectFs) - _ <- nodeAppConfig.stop() } yield disconnect def isAllDisconnectedF: Future[Boolean] = { @@ -180,11 +170,17 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger { AsyncUtil.retryUntilSatisfiedF(() => isAllDisconnectedF, 500.millis) } - isStoppedF.failed.foreach { e => + val peers = peerManager.peers + val removedPeersF = for { + _ <- isStoppedF + _ <- Future.sequence(peers.map(peerManager.removePeer)) + } yield () + + removedPeersF.failed.foreach { e => logger.warn(s"Cannot stop node", e) } - isStoppedF.map { _ => + removedPeersF.map { _ => logger.info( s"Node stopped! It took=${System.currentTimeMillis() - start}ms") this diff --git a/node/src/main/scala/org/bitcoins/node/PeerManager.scala b/node/src/main/scala/org/bitcoins/node/PeerManager.scala index 9be628789f..acf626bf14 100644 --- a/node/src/main/scala/org/bitcoins/node/PeerManager.scala +++ b/node/src/main/scala/org/bitcoins/node/PeerManager.scala @@ -147,7 +147,7 @@ case class PeerManager(node: Node, configPeers: Vector[Peer] = Vector.empty)( } for { _ <- disconnectF - _ <- removePeer(peer) + _ = _peerData.remove(peer) } yield () } else { logger.debug(s"Key $peer not found in peerData") diff --git a/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala b/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala index 897eceab6d..78cf41c511 100644 --- a/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala +++ b/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala @@ -66,7 +66,7 @@ case class DataMessageHandler( this.copy(chainApi = newChainApi) } case filterHeader: CompactFilterHeadersMessage => - logger.info( + logger.debug( s"Got ${filterHeader.filterHashes.size} compact filter header hashes") val filterHeaders = filterHeader.filterHeaders for { @@ -75,7 +75,7 @@ case class DataMessageHandler( filterHeader.stopHash.flip) newSyncing <- if (filterHeaders.size == chainConfig.filterHeaderBatchSize) { - logger.info( + logger.debug( s"Received maximum amount of filter headers in one header message. This means we are not synced, requesting more") sendNextGetCompactFilterHeadersCommand( peerWithCompactFilters, diff --git a/testkit-core/.jvm/src/main/resources/logback-test.xml b/testkit-core/.jvm/src/main/resources/logback-test.xml index 06fbbf99e0..b3d0310cad 100644 --- a/testkit-core/.jvm/src/main/resources/logback-test.xml +++ b/testkit-core/.jvm/src/main/resources/logback-test.xml @@ -51,16 +51,16 @@ - + - + - + - + diff --git a/testkit/src/main/scala/org/bitcoins/testkit/wallet/BitcoinSWalletTest.scala b/testkit/src/main/scala/org/bitcoins/testkit/wallet/BitcoinSWalletTest.scala index ca635e550c..4f554ec5a7 100644 --- a/testkit/src/main/scala/org/bitcoins/testkit/wallet/BitcoinSWalletTest.scala +++ b/testkit/src/main/scala/org/bitcoins/testkit/wallet/BitcoinSWalletTest.scala @@ -15,15 +15,12 @@ import org.bitcoins.core.util.FutureUtil import org.bitcoins.core.wallet.fee._ import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE} import org.bitcoins.dlc.wallet.{DLCAppConfig, DLCWallet} -import org.bitcoins.node.{ - NodeCallbacks, - OnBlockReceived, - OnCompactFiltersReceived, - OnMerkleBlockReceived -} +import org.bitcoins.node.config.NodeAppConfig +import org.bitcoins.node.{NodeCallbacks, OnMerkleBlockReceived} import org.bitcoins.rpc.client.common.{BitcoindRpcClient, BitcoindVersion} import org.bitcoins.rpc.client.v19.BitcoindV19RpcClient import org.bitcoins.server.BitcoinSAppConfig +import org.bitcoins.server.util.CallbackUtil import org.bitcoins.testkit.EmbeddedPg import org.bitcoins.testkit.chain.SyncUtil import org.bitcoins.testkit.fixtures.BitcoinSFixture @@ -647,8 +644,10 @@ object BitcoinSWalletTest extends WalletLogger { chainQueryApi = chainQueryApi, bip39PasswordOpt = bip39PasswordOpt)(config.walletConf, system) //add callbacks for wallet - nodeCallbacks = - BitcoinSWalletTest.createNeutrinoNodeCallbacksForWallet(wallet) + nodeCallbacks <- + BitcoinSWalletTest.createNeutrinoNodeCallbacksForWallet(wallet)( + config.nodeConf, + system.dispatcher) _ = config.nodeConf.addCallbacks(nodeCallbacks) withBitcoind <- createWalletWithBitcoind(wallet, bitcoindRpcClient) funded <- FundWalletUtil.fundWalletWithBitcoind(withBitcoind) @@ -692,22 +691,9 @@ object BitcoinSWalletTest extends WalletLogger { /** Constructs callbacks for the wallet from the node to process blocks and compact filters */ def createNeutrinoNodeCallbacksForWallet(wallet: Wallet)(implicit - ec: ExecutionContext): NodeCallbacks = { - val onBlock: OnBlockReceived = { block => - for { - _ <- wallet.processBlock(block) - } yield () - } - val onCompactFilters: OnCompactFiltersReceived = { blockFilters => - for { - _ <- wallet.processCompactFilters(blockFilters) - } yield () - } - - NodeCallbacks( - onBlockReceived = Vector(onBlock), - onCompactFiltersReceived = Vector(onCompactFilters) - ) + nodeAppConfig: NodeAppConfig, + ec: ExecutionContext): Future[NodeCallbacks] = { + CallbackUtil.createNeutrinoNodeCallbacksForWallet(wallet) } /** Registers a callback to handle merkle blocks given to us by a spv node */ diff --git a/wallet/src/main/scala/org/bitcoins/wallet/internal/UtxoHandling.scala b/wallet/src/main/scala/org/bitcoins/wallet/internal/UtxoHandling.scala index b79244c04a..46460d7ae8 100644 --- a/wallet/src/main/scala/org/bitcoins/wallet/internal/UtxoHandling.scala +++ b/wallet/src/main/scala/org/bitcoins/wallet/internal/UtxoHandling.scala @@ -277,7 +277,7 @@ private[wallet] trait UtxoHandling extends WalletLogger { } yield { val writtenOut = written.outPoint logger.info( - s"Successfully inserted UTXO ${writtenOut.txIdBE.hex}:${writtenOut.vout.toInt} into DB") + s"Successfully inserted UTXO ${writtenOut.txIdBE.hex}:${writtenOut.vout.toInt} amt=${output.value} into DB") logger.debug(s"UTXO details: ${written.output}") written } @@ -297,8 +297,7 @@ private[wallet] trait UtxoHandling extends WalletLogger { } else { val output = transaction.outputs(vout.toInt) val outPoint = TransactionOutPoint(transaction.txId, vout) - logger.info( - s"Adding UTXO to wallet: ${transaction.txIdBE.hex}:${vout.toInt} amt=${output.value}") + // insert the UTXO into the DB val insertedUtxoEF: Either[AddUtxoError, Future[SpendingInfoDb]] = for { addressDb <- addressDbE