diff --git a/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala b/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala
index 5578dd5ef0..1db1eb1309 100644
--- a/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala
+++ b/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala
@@ -21,7 +21,6 @@ import org.bitcoins.commons.util.{DatadirParser, ServerArgParser}
import org.bitcoins.core.api.chain.ChainApi
import org.bitcoins.core.api.feeprovider.FeeRateApi
import org.bitcoins.core.api.node.{
- ExternalImplementationNodeType,
InternalImplementationNodeType,
NodeApi,
NodeType
@@ -42,6 +41,7 @@ import org.bitcoins.rpc.config.{BitcoindRpcAppConfig, ZmqConfig}
import org.bitcoins.server.routes.{BitcoinSServerRunner, CommonRoutes, Server}
import org.bitcoins.server.util.{
BitcoinSAppScalaDaemon,
+ CallbackUtil,
ServerBindings,
WebsocketUtil,
WsServerConfig
@@ -130,7 +130,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
chainApi <- chainApiF
_ = logger.info("Initialized chain api")
wallet <- dlcConf.createDLCWallet(node, chainApi, feeProvider)
- nodeCallbacks <- createCallbacks(wallet)
+ nodeCallbacks <- CallbackUtil.createNeutrinoNodeCallbacksForWallet(wallet)
_ = nodeConf.addCallbacks(nodeCallbacks)
} yield {
logger.info(
@@ -287,48 +287,6 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
}
}
- private def createCallbacks(wallet: Wallet)(implicit
- nodeConf: NodeAppConfig,
- ec: ExecutionContext): Future[NodeCallbacks] = {
- lazy val onTx: OnTxReceived = { tx =>
- logger.debug(s"Receiving transaction txid=${tx.txIdBE.hex} as a callback")
- wallet.processTransaction(tx, blockHashOpt = None).map(_ => ())
- }
- lazy val onCompactFilters: OnCompactFiltersReceived = { blockFilters =>
- wallet
- .processCompactFilters(blockFilters = blockFilters)
- .map(_ => ())
- }
- lazy val onBlock: OnBlockReceived = { block =>
- wallet.processBlock(block).map(_ => ())
- }
- lazy val onHeaders: OnBlockHeadersReceived = { headers =>
- if (headers.isEmpty) {
- Future.unit
- } else {
- wallet.updateUtxoPendingStates().map(_ => ())
- }
- }
- nodeConf.nodeType match {
- case NodeType.SpvNode =>
- Future.successful(
- NodeCallbacks(onTxReceived = Vector(onTx),
- onBlockHeadersReceived = Vector(onHeaders)))
- case NodeType.NeutrinoNode =>
- Future.successful(
- NodeCallbacks(onTxReceived = Vector(onTx),
- onBlockReceived = Vector(onBlock),
- onCompactFiltersReceived = Vector(onCompactFilters),
- onBlockHeadersReceived = Vector(onHeaders)))
- case NodeType.FullNode =>
- Future.failed(new RuntimeException("Not yet implemented"))
- case _: ExternalImplementationNodeType =>
- Future.failed(
- new RuntimeException(
- "Cannot create callbacks for an external implementation"))
- }
- }
-
private def setBloomFilter(node: Node, wallet: Wallet)(implicit
ec: ExecutionContext): Future[Node] = {
for {
diff --git a/app/server/src/main/scala/org/bitcoins/server/util/CallbackUtil.scala b/app/server/src/main/scala/org/bitcoins/server/util/CallbackUtil.scala
new file mode 100644
index 0000000000..a93ee0a393
--- /dev/null
+++ b/app/server/src/main/scala/org/bitcoins/server/util/CallbackUtil.scala
@@ -0,0 +1,66 @@
+package org.bitcoins.server.util
+
+import grizzled.slf4j.Logging
+import org.bitcoins.core.api.node.{ExternalImplementationNodeType, NodeType}
+import org.bitcoins.node.{
+ NodeCallbacks,
+ OnBlockHeadersReceived,
+ OnBlockReceived,
+ OnCompactFiltersReceived,
+ OnTxReceived
+}
+import org.bitcoins.node.config.NodeAppConfig
+import org.bitcoins.wallet.Wallet
+
+import scala.concurrent.{ExecutionContext, Future}
+
+object CallbackUtil extends Logging {
+
+ def createNeutrinoNodeCallbacksForWallet(wallet: Wallet)(implicit
+ nodeConf: NodeAppConfig,
+ ec: ExecutionContext): Future[NodeCallbacks] = {
+ lazy val onTx: OnTxReceived = { tx =>
+ logger.debug(s"Receiving transaction txid=${tx.txIdBE.hex} as a callback")
+ wallet.processTransaction(tx, blockHashOpt = None).map(_ => ())
+ }
+ lazy val onCompactFilters: OnCompactFiltersReceived = { blockFilters =>
+ logger.debug(
+ s"Executing onCompactFilters callback with filter count=${blockFilters.length}")
+ wallet
+ .processCompactFilters(blockFilters = blockFilters)
+ .map(_ => ())
+ }
+ lazy val onBlock: OnBlockReceived = { block =>
+ logger.debug(
+ s"Executing onBlock callback=${block.blockHeader.hashBE.hex}")
+ wallet.processBlock(block).map(_ => ())
+ }
+ lazy val onHeaders: OnBlockHeadersReceived = { headers =>
+ logger.debug(
+ s"Executing block header with header count=${headers.length}")
+ if (headers.isEmpty) {
+ Future.unit
+ } else {
+ wallet.updateUtxoPendingStates().map(_ => ())
+ }
+ }
+ nodeConf.nodeType match {
+ case NodeType.SpvNode =>
+ Future.successful(
+ NodeCallbacks(onTxReceived = Vector(onTx),
+ onBlockHeadersReceived = Vector(onHeaders)))
+ case NodeType.NeutrinoNode =>
+ Future.successful(
+ NodeCallbacks(onTxReceived = Vector(onTx),
+ onBlockReceived = Vector(onBlock),
+ onCompactFiltersReceived = Vector(onCompactFilters),
+ onBlockHeadersReceived = Vector(onHeaders)))
+ case NodeType.FullNode =>
+ Future.failed(new RuntimeException("Not yet implemented"))
+ case _: ExternalImplementationNodeType =>
+ Future.failed(
+ new RuntimeException(
+ "Cannot create callbacks for an external implementation"))
+ }
+ }
+}
diff --git a/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeWithWalletTest.scala b/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeWithWalletTest.scala
index 37fd35d065..84b52e62a4 100644
--- a/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeWithWalletTest.scala
+++ b/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeWithWalletTest.scala
@@ -210,4 +210,71 @@ class NeutrinoNodeWithWalletTest extends NodeTestWithCachedBitcoindNewest {
_ <- AsyncUtil.awaitConditionF(condition)
} yield succeed
}
+
+ it must "receive funds while the node is offline when we restart" in {
+ param =>
+ val NeutrinoNodeFundedWalletBitcoind(node, wallet, bitcoind, _) = param
+
+ val initBalanceF = wallet.getBalance()
+ val receivedAddrF = wallet.getNewAddress()
+ val bitcoindAddrF = bitcoind.getNewAddress
+ val sendAmt = Bitcoins.one
+ //stop the node to take us offline
+ val stopF = node.stop()
+ for {
+ initBalance <- initBalanceF
+ receiveAddr <- receivedAddrF
+ bitcoindAddr <- bitcoindAddrF
+ stoppedNode <- stopF
+ //send money and generate a block to confirm the funds while we are offline
+ _ <- bitcoind.sendToAddress(receiveAddr, sendAmt)
+ //generate a block to confirm the tx
+ _ <- bitcoind.generateToAddress(1, bitcoindAddr)
+ //restart the node now that we have received funds
+ startedNode <- stoppedNode.start()
+ _ <- startedNode.sync()
+ _ <- NodeTestUtil.awaitCompactFiltersSync(node = node, rpc = bitcoind)
+ _ <- AsyncUtil.retryUntilSatisfiedF(() => {
+ for {
+ balance <- wallet.getBalance()
+ } yield {
+ balance == initBalance + sendAmt
+ }
+ })
+ balance <- wallet.getBalance()
+ } yield {
+ assert(balance > initBalance)
+ }
+ }
+
+ it must "recognize funds were spent while we were offline" in { param =>
+ //useful test for the case where we are in a DLC
+ //and the counterparty broadcasts the funding tx or a CET
+ val NeutrinoNodeFundedWalletBitcoind(node, wallet, bitcoind, _) = param
+ val initBalanceF = wallet.getBalance()
+ val bitcoindAddrF = bitcoind.getNewAddress
+ val sendAmt = Bitcoins.one
+
+ //stop the node to take us offline
+ val stopF = node.stop()
+ for {
+ initBalance <- initBalanceF
+ bitcoindAddr <- bitcoindAddrF
+ stoppedNode <- stopF
+
+ //create a transaction that spends to bitcoind with our wallet
+ tx <- wallet.sendToAddress(bitcoindAddr, sendAmt, SatoshisPerByte.one)
+ //broadcast tx
+ _ <- bitcoind.sendRawTransaction(tx)
+ _ <- bitcoind.generateToAddress(6, bitcoindAddr)
+
+ //bring node back online
+ startedNode <- stoppedNode.start()
+ _ <- startedNode.sync()
+ _ <- NodeTestUtil.awaitCompactFiltersSync(node, bitcoind)
+ balanceAfterSpend <- wallet.getBalance()
+ } yield {
+ assert(balanceAfterSpend < initBalance)
+ }
+ }
}
diff --git a/node/src/main/scala/org/bitcoins/node/Node.scala b/node/src/main/scala/org/bitcoins/node/Node.scala
index 23f18714dd..192fd0260c 100644
--- a/node/src/main/scala/org/bitcoins/node/Node.scala
+++ b/node/src/main/scala/org/bitcoins/node/Node.scala
@@ -118,13 +118,7 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
logger.info("Starting node")
val start = System.currentTimeMillis()
- val startConfsF = for {
- _ <- chainAppConfig.start()
- _ <- nodeAppConfig.start()
- } yield ()
-
- val chainApiF = startConfsF.flatMap(_ => chainApiFromDb())
-
+ val chainApiF = chainApiFromDb()
val startNodeF = for {
peers <- peerManager.getPeers
_ = peers.foreach(peerManager.addPeer)
@@ -141,10 +135,7 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
val filterCountF = chainApiF.flatMap(_.getFilterCount())
for {
- _ <- startConfsF
node <- startNodeF
-
- _ = logger.trace("Fetching node starting point")
bestHash <- bestHashF
bestHeight <- bestHeightF
filterHeaderCount <- filterHeaderCountF
@@ -164,7 +155,6 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
val disconnectF = for {
disconnect <- Future.sequence(disconnectFs)
- _ <- nodeAppConfig.stop()
} yield disconnect
def isAllDisconnectedF: Future[Boolean] = {
@@ -180,11 +170,17 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
AsyncUtil.retryUntilSatisfiedF(() => isAllDisconnectedF, 500.millis)
}
- isStoppedF.failed.foreach { e =>
+ val peers = peerManager.peers
+ val removedPeersF = for {
+ _ <- isStoppedF
+ _ <- Future.sequence(peers.map(peerManager.removePeer))
+ } yield ()
+
+ removedPeersF.failed.foreach { e =>
logger.warn(s"Cannot stop node", e)
}
- isStoppedF.map { _ =>
+ removedPeersF.map { _ =>
logger.info(
s"Node stopped! It took=${System.currentTimeMillis() - start}ms")
this
diff --git a/node/src/main/scala/org/bitcoins/node/PeerManager.scala b/node/src/main/scala/org/bitcoins/node/PeerManager.scala
index 9be628789f..acf626bf14 100644
--- a/node/src/main/scala/org/bitcoins/node/PeerManager.scala
+++ b/node/src/main/scala/org/bitcoins/node/PeerManager.scala
@@ -147,7 +147,7 @@ case class PeerManager(node: Node, configPeers: Vector[Peer] = Vector.empty)(
}
for {
_ <- disconnectF
- _ <- removePeer(peer)
+ _ = _peerData.remove(peer)
} yield ()
} else {
logger.debug(s"Key $peer not found in peerData")
diff --git a/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala b/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala
index 897eceab6d..78cf41c511 100644
--- a/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala
+++ b/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala
@@ -66,7 +66,7 @@ case class DataMessageHandler(
this.copy(chainApi = newChainApi)
}
case filterHeader: CompactFilterHeadersMessage =>
- logger.info(
+ logger.debug(
s"Got ${filterHeader.filterHashes.size} compact filter header hashes")
val filterHeaders = filterHeader.filterHeaders
for {
@@ -75,7 +75,7 @@ case class DataMessageHandler(
filterHeader.stopHash.flip)
newSyncing <-
if (filterHeaders.size == chainConfig.filterHeaderBatchSize) {
- logger.info(
+ logger.debug(
s"Received maximum amount of filter headers in one header message. This means we are not synced, requesting more")
sendNextGetCompactFilterHeadersCommand(
peerWithCompactFilters,
diff --git a/testkit-core/.jvm/src/main/resources/logback-test.xml b/testkit-core/.jvm/src/main/resources/logback-test.xml
index 06fbbf99e0..b3d0310cad 100644
--- a/testkit-core/.jvm/src/main/resources/logback-test.xml
+++ b/testkit-core/.jvm/src/main/resources/logback-test.xml
@@ -51,16 +51,16 @@
-
+
-
+
-
+
-
+
diff --git a/testkit/src/main/scala/org/bitcoins/testkit/wallet/BitcoinSWalletTest.scala b/testkit/src/main/scala/org/bitcoins/testkit/wallet/BitcoinSWalletTest.scala
index ca635e550c..4f554ec5a7 100644
--- a/testkit/src/main/scala/org/bitcoins/testkit/wallet/BitcoinSWalletTest.scala
+++ b/testkit/src/main/scala/org/bitcoins/testkit/wallet/BitcoinSWalletTest.scala
@@ -15,15 +15,12 @@ import org.bitcoins.core.util.FutureUtil
import org.bitcoins.core.wallet.fee._
import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE}
import org.bitcoins.dlc.wallet.{DLCAppConfig, DLCWallet}
-import org.bitcoins.node.{
- NodeCallbacks,
- OnBlockReceived,
- OnCompactFiltersReceived,
- OnMerkleBlockReceived
-}
+import org.bitcoins.node.config.NodeAppConfig
+import org.bitcoins.node.{NodeCallbacks, OnMerkleBlockReceived}
import org.bitcoins.rpc.client.common.{BitcoindRpcClient, BitcoindVersion}
import org.bitcoins.rpc.client.v19.BitcoindV19RpcClient
import org.bitcoins.server.BitcoinSAppConfig
+import org.bitcoins.server.util.CallbackUtil
import org.bitcoins.testkit.EmbeddedPg
import org.bitcoins.testkit.chain.SyncUtil
import org.bitcoins.testkit.fixtures.BitcoinSFixture
@@ -647,8 +644,10 @@ object BitcoinSWalletTest extends WalletLogger {
chainQueryApi = chainQueryApi,
bip39PasswordOpt = bip39PasswordOpt)(config.walletConf, system)
//add callbacks for wallet
- nodeCallbacks =
- BitcoinSWalletTest.createNeutrinoNodeCallbacksForWallet(wallet)
+ nodeCallbacks <-
+ BitcoinSWalletTest.createNeutrinoNodeCallbacksForWallet(wallet)(
+ config.nodeConf,
+ system.dispatcher)
_ = config.nodeConf.addCallbacks(nodeCallbacks)
withBitcoind <- createWalletWithBitcoind(wallet, bitcoindRpcClient)
funded <- FundWalletUtil.fundWalletWithBitcoind(withBitcoind)
@@ -692,22 +691,9 @@ object BitcoinSWalletTest extends WalletLogger {
/** Constructs callbacks for the wallet from the node to process blocks and compact filters */
def createNeutrinoNodeCallbacksForWallet(wallet: Wallet)(implicit
- ec: ExecutionContext): NodeCallbacks = {
- val onBlock: OnBlockReceived = { block =>
- for {
- _ <- wallet.processBlock(block)
- } yield ()
- }
- val onCompactFilters: OnCompactFiltersReceived = { blockFilters =>
- for {
- _ <- wallet.processCompactFilters(blockFilters)
- } yield ()
- }
-
- NodeCallbacks(
- onBlockReceived = Vector(onBlock),
- onCompactFiltersReceived = Vector(onCompactFilters)
- )
+ nodeAppConfig: NodeAppConfig,
+ ec: ExecutionContext): Future[NodeCallbacks] = {
+ CallbackUtil.createNeutrinoNodeCallbacksForWallet(wallet)
}
/** Registers a callback to handle merkle blocks given to us by a spv node */
diff --git a/wallet/src/main/scala/org/bitcoins/wallet/internal/UtxoHandling.scala b/wallet/src/main/scala/org/bitcoins/wallet/internal/UtxoHandling.scala
index b79244c04a..46460d7ae8 100644
--- a/wallet/src/main/scala/org/bitcoins/wallet/internal/UtxoHandling.scala
+++ b/wallet/src/main/scala/org/bitcoins/wallet/internal/UtxoHandling.scala
@@ -277,7 +277,7 @@ private[wallet] trait UtxoHandling extends WalletLogger {
} yield {
val writtenOut = written.outPoint
logger.info(
- s"Successfully inserted UTXO ${writtenOut.txIdBE.hex}:${writtenOut.vout.toInt} into DB")
+ s"Successfully inserted UTXO ${writtenOut.txIdBE.hex}:${writtenOut.vout.toInt} amt=${output.value} into DB")
logger.debug(s"UTXO details: ${written.output}")
written
}
@@ -297,8 +297,7 @@ private[wallet] trait UtxoHandling extends WalletLogger {
} else {
val output = transaction.outputs(vout.toInt)
val outPoint = TransactionOutPoint(transaction.txId, vout)
- logger.info(
- s"Adding UTXO to wallet: ${transaction.txIdBE.hex}:${vout.toInt} amt=${output.value}")
+
// insert the UTXO into the DB
val insertedUtxoEF: Either[AddUtxoError, Future[SpendingInfoDb]] = for {
addressDb <- addressDbE