Update callbacks for LN support (#1938)

Update callbacks for LN support
This commit is contained in:
rorp 2020-09-09 11:02:31 -07:00 committed by GitHub
parent 4149271b77
commit c62be8b5f6
19 changed files with 196 additions and 59 deletions

View file

@ -132,7 +132,7 @@ object Main extends App with BitcoinSLogger {
//get a node that isn't started //get a node that isn't started
val nodeF = configInitializedF.flatMap { _ => val nodeF = configInitializedF.flatMap { _ =>
nodeConf.createNode(peer)(chainConf, system) nodeConf.createNode(peer, None)(chainConf, system)
} }
//get our wallet //get our wallet

View file

@ -0,0 +1,56 @@
package org.bitcoins.chain
import org.bitcoins.core.api.{Callback2, CallbackHandler}
import org.bitcoins.core.protocol.blockchain.BlockHeader
import org.slf4j.Logger
import scala.concurrent.{ExecutionContext, Future}
trait ChainCallbacks {
def onBlockHeaderConnected: CallbackHandler[
(Int, BlockHeader),
OnBlockHeaderConnected]
def +(other: ChainCallbacks): ChainCallbacks
def executeOnBlockHeaderConnectedCallbacks(
logger: Logger,
height: Int,
header: BlockHeader)(implicit ec: ExecutionContext): Future[Unit] = {
onBlockHeaderConnected.execute(logger, (height, header))
}
}
/** Callback for handling a received block header */
trait OnBlockHeaderConnected extends Callback2[Int, BlockHeader]
object ChainCallbacks {
private case class ChainCallbacksImpl(
onBlockHeaderConnected: CallbackHandler[
(Int, BlockHeader),
OnBlockHeaderConnected])
extends ChainCallbacks {
override def +(other: ChainCallbacks): ChainCallbacks =
copy(onBlockHeaderConnected =
onBlockHeaderConnected ++ other.onBlockHeaderConnected)
/** Constructs a set of callbacks that only acts on block headers connected */
def onBlockHeaderConnected(f: OnBlockHeaderConnected): ChainCallbacks =
ChainCallbacks(onBlockHeaderConnected = Vector(f))
}
lazy val empty: ChainCallbacks =
ChainCallbacks(onBlockHeaderConnected = Vector.empty)
def apply(
onBlockHeaderConnected: Vector[OnBlockHeaderConnected] =
Vector.empty): ChainCallbacks =
ChainCallbacksImpl(onBlockHeaderConnected =
CallbackHandler[(Int, BlockHeader), OnBlockHeaderConnected](
"onBlockHeaderConnected",
onBlockHeaderConnected))
}

View file

@ -117,7 +117,19 @@ case class ChainHandler(
val newChainHandler = this.copy(blockchains = chains) val newChainHandler = this.copy(blockchains = chains)
createdF.map { _ => createdF.map { headers =>
if (chainConfig.chainCallbacks.onBlockHeaderConnected.nonEmpty) {
headers.reverseIterator.foldLeft(FutureUtil.unit) { (acc, header) =>
for {
_ <- acc
_ <-
chainConfig.chainCallbacks
.executeOnBlockHeaderConnectedCallbacks(logger,
header.height,
header.blockHeader)
} yield ()
}
}
chains.foreach { c => chains.foreach { c =>
logger.info( logger.info(
s"Processed headers from height=${c.height - headers.length} to ${c.height}. Best hash=${c.tip.hashBE.hex}") s"Processed headers from height=${c.height - headers.length} to ${c.height}. Best hash=${c.tip.hashBE.hex}")

View file

@ -3,11 +3,12 @@ package org.bitcoins.chain.config
import java.nio.file.Path import java.nio.file.Path
import com.typesafe.config.{Config, ConfigException} import com.typesafe.config.{Config, ConfigException}
import org.bitcoins.chain.ChainCallbacks
import org.bitcoins.chain.db.ChainDbManagement import org.bitcoins.chain.db.ChainDbManagement
import org.bitcoins.chain.models.BlockHeaderDAO import org.bitcoins.chain.models.BlockHeaderDAO
import org.bitcoins.chain.pow.Pow import org.bitcoins.chain.pow.Pow
import org.bitcoins.core.api.chain.db.BlockHeaderDbHelper import org.bitcoins.core.api.chain.db.BlockHeaderDbHelper
import org.bitcoins.core.util.FutureUtil import org.bitcoins.core.util.{FutureUtil, Mutable}
import org.bitcoins.db._ import org.bitcoins.db._
import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.{ExecutionContext, Future}
@ -34,6 +35,14 @@ case class ChainAppConfig(
override lazy val appConfig: ChainAppConfig = this override lazy val appConfig: ChainAppConfig = this
private val callbacks = new Mutable(ChainCallbacks.empty)
def chainCallbacks: ChainCallbacks = callbacks.atomicGet
def addCallbacks(newCallbacks: ChainCallbacks): ChainCallbacks = {
callbacks.atomicUpdate(newCallbacks)(_ + _)
}
/** /**
* Checks whether or not the chain project is initialized by * Checks whether or not the chain project is initialized by
* trying to read the genesis block header from our block * trying to read the genesis block header from our block

View file

@ -108,7 +108,8 @@ val nodeF = for {
NeutrinoNode(nodePeer = peer, NeutrinoNode(nodePeer = peer,
nodeConfig = nodeConfig, nodeConfig = nodeConfig,
chainConfig = chainConfig, chainConfig = chainConfig,
actorSystem = system) actorSystem = system,
initialSyncDone = None)
} }
//let's start it //let's start it

View file

@ -2,7 +2,7 @@ package org.bitcoins.node
import org.bitcoins.core.currency._ import org.bitcoins.core.currency._
import org.bitcoins.core.protocol.script.MultiSignatureScriptPubKey import org.bitcoins.core.protocol.script.MultiSignatureScriptPubKey
import org.bitcoins.core.protocol.transaction.{Transaction, TransactionOutput} import org.bitcoins.core.protocol.transaction.TransactionOutput
import org.bitcoins.core.util.EnvUtil import org.bitcoins.core.util.EnvUtil
import org.bitcoins.core.wallet.fee.SatoshisPerByte import org.bitcoins.core.wallet.fee.SatoshisPerByte
import org.bitcoins.crypto.ECPublicKey import org.bitcoins.crypto.ECPublicKey
@ -17,7 +17,7 @@ import org.bitcoins.testkit.node.{
NodeUnitTest NodeUnitTest
} }
import org.bitcoins.testkit.wallet.BitcoinSWalletTest import org.bitcoins.testkit.wallet.BitcoinSWalletTest
import org.bitcoins.wallet.{OnTransactionProcessed, Wallet, WalletCallbacks} import org.bitcoins.wallet.Wallet
import org.scalatest.FutureOutcome import org.scalatest.FutureOutcome
import scala.concurrent.{Future, Promise} import scala.concurrent.{Future, Promise}

View file

@ -11,15 +11,8 @@ import org.bitcoins.crypto.DoubleSha256Digest
import org.bitcoins.node.NodeCallbacks import org.bitcoins.node.NodeCallbacks
import org.bitcoins.node.models.Peer import org.bitcoins.node.models.Peer
import org.bitcoins.node.networking.peer.PeerMessageReceiver import org.bitcoins.node.networking.peer.PeerMessageReceiver
import org.bitcoins.server.BitcoinSAppConfig
import org.bitcoins.testkit.BitcoinSTestAppConfig
import org.bitcoins.testkit.async.TestAsyncUtil import org.bitcoins.testkit.async.TestAsyncUtil
import org.bitcoins.testkit.node.{ import org.bitcoins.testkit.node.{CachedBitcoinSAppConfig, NodeTestUtil}
CachedAppConfig,
CachedBitcoinSAppConfig,
NodeTestUtil
}
import org.bitcoins.testkit.node.{CachedAppConfig, NodeTestUtil}
import org.bitcoins.testkit.rpc.BitcoindRpcTestUtil import org.bitcoins.testkit.rpc.BitcoindRpcTestUtil
import org.bitcoins.testkit.util.BitcoindRpcTest import org.bitcoins.testkit.util.BitcoindRpcTest
import org.scalatest._ import org.scalatest._
@ -167,7 +160,7 @@ class P2PClientTest extends BitcoindRpcTest with CachedBitcoinSAppConfig {
val probe = TestProbe() val probe = TestProbe()
val remote = peer.socket val remote = peer.socket
val peerMessageReceiverF = val peerMessageReceiverF =
PeerMessageReceiver.preConnection(peer, NodeCallbacks.empty) PeerMessageReceiver.preConnection(peer, NodeCallbacks.empty, None)
val clientActorF: Future[TestActorRef[P2PClientActor]] = val clientActorF: Future[TestActorRef[P2PClientActor]] =
peerMessageReceiverF.map { peerMsgRecv => peerMessageReceiverF.map { peerMsgRecv =>

View file

@ -9,7 +9,7 @@ import org.bitcoins.testkit.core.gen.{
BlockchainElementsGenerator, BlockchainElementsGenerator,
TransactionGenerators TransactionGenerators
} }
import org.bitcoins.testkit.node.{CachedAppConfig, CachedBitcoinSAppConfig} import org.bitcoins.testkit.node.CachedBitcoinSAppConfig
import org.bitcoins.testkit.util.BitcoinSAsyncTest import org.bitcoins.testkit.util.BitcoinSAsyncTest
import org.scalacheck.Gen import org.scalacheck.Gen

View file

@ -1,5 +1,6 @@
package org.bitcoins.node package org.bitcoins.node
import akka.Done
import akka.actor.ActorSystem import akka.actor.ActorSystem
import org.bitcoins.chain.config.ChainAppConfig import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.core.api.chain.ChainQueryApi.FilterResponse import org.bitcoins.core.api.chain.ChainQueryApi.FilterResponse
@ -7,12 +8,13 @@ import org.bitcoins.core.protocol.BlockStamp
import org.bitcoins.node.config.NodeAppConfig import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.Peer import org.bitcoins.node.models.Peer
import scala.concurrent.Future import scala.concurrent.{Future, Promise}
case class NeutrinoNode( case class NeutrinoNode(
nodePeer: Peer, nodePeer: Peer,
nodeConfig: NodeAppConfig, nodeConfig: NodeAppConfig,
chainConfig: ChainAppConfig, chainConfig: ChainAppConfig,
initialSyncDone: Option[Promise[Done]],
actorSystem: ActorSystem) actorSystem: ActorSystem)
extends Node { extends Node {
require( require(

View file

@ -1,5 +1,6 @@
package org.bitcoins.node package org.bitcoins.node
import akka.Done
import akka.actor.ActorSystem import akka.actor.ActorSystem
import org.bitcoins.chain.blockchain.ChainHandler import org.bitcoins.chain.blockchain.ChainHandler
import org.bitcoins.chain.config.ChainAppConfig import org.bitcoins.chain.config.ChainAppConfig
@ -28,7 +29,7 @@ import org.bitcoins.node.networking.peer.{
import org.bitcoins.rpc.util.AsyncUtil import org.bitcoins.rpc.util.AsyncUtil
import scala.concurrent.duration.DurationInt import scala.concurrent.duration.DurationInt
import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success} import scala.util.{Failure, Success}
/** /**
@ -46,6 +47,8 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
val peer: Peer val peer: Peer
protected val initialSyncDone: Option[Promise[Done]]
def nodeCallbacks: NodeCallbacks = nodeAppConfig.nodeCallbacks def nodeCallbacks: NodeCallbacks = nodeAppConfig.nodeCallbacks
lazy val txDAO: BroadcastAbleTransactionDAO = BroadcastAbleTransactionDAO() lazy val txDAO: BroadcastAbleTransactionDAO = BroadcastAbleTransactionDAO()
@ -72,7 +75,8 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
val peerMsgRecv: PeerMessageReceiver = val peerMsgRecv: PeerMessageReceiver =
PeerMessageReceiver.newReceiver(chainApi = chainApi, PeerMessageReceiver.newReceiver(chainApi = chainApi,
peer = peer, peer = peer,
callbacks = nodeCallbacks) callbacks = nodeCallbacks,
initialSyncDone = initialSyncDone)
val p2p = P2PClient(context = system, val p2p = P2PClient(context = system,
peer = peer, peer = peer,
peerMessageReceiver = peerMsgRecv) peerMessageReceiver = peerMsgRecv)

View file

@ -1,5 +1,6 @@
package org.bitcoins.node package org.bitcoins.node
import akka.Done
import akka.actor.ActorSystem import akka.actor.ActorSystem
import org.bitcoins.chain.config.ChainAppConfig import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.core.api.chain.ChainQueryApi.FilterResponse import org.bitcoins.core.api.chain.ChainQueryApi.FilterResponse
@ -10,12 +11,13 @@ import org.bitcoins.core.util.Mutable
import org.bitcoins.node.config.NodeAppConfig import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.Peer import org.bitcoins.node.models.Peer
import scala.concurrent.Future import scala.concurrent.{Future, Promise}
case class SpvNode( case class SpvNode(
nodePeer: Peer, nodePeer: Peer,
nodeConfig: NodeAppConfig, nodeConfig: NodeAppConfig,
chainConfig: ChainAppConfig, chainConfig: ChainAppConfig,
initialSyncDone: Option[Promise[Done]],
actorSystem: ActorSystem) actorSystem: ActorSystem)
extends Node { extends Node {
require(nodeConfig.nodeType == NodeType.SpvNode, require(nodeConfig.nodeType == NodeType.SpvNode,

View file

@ -2,6 +2,7 @@ package org.bitcoins.node.config
import java.nio.file.Path import java.nio.file.Path
import akka.Done
import akka.actor.ActorSystem import akka.actor.ActorSystem
import com.typesafe.config.Config import com.typesafe.config.Config
import org.bitcoins.chain.config.ChainAppConfig import org.bitcoins.chain.config.ChainAppConfig
@ -11,7 +12,7 @@ import org.bitcoins.node._
import org.bitcoins.node.db.NodeDbManagement import org.bitcoins.node.db.NodeDbManagement
import org.bitcoins.node.models.Peer import org.bitcoins.node.models.Peer
import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.{ExecutionContext, Future, Promise}
/** Configuration for the Bitcoin-S node /** Configuration for the Bitcoin-S node
* @param directory The data directory of the node * @param directory The data directory of the node
@ -71,10 +72,10 @@ case class NodeAppConfig(
} }
/** Creates either a neutrino node or a spv node based on the [[NodeAppConfig]] given */ /** Creates either a neutrino node or a spv node based on the [[NodeAppConfig]] given */
def createNode(peer: Peer)( def createNode(peer: Peer, initialSyncDone: Option[Promise[Done]])(
chainConf: ChainAppConfig, chainConf: ChainAppConfig,
system: ActorSystem): Future[Node] = { system: ActorSystem): Future[Node] = {
NodeAppConfig.createNode(peer)(this, chainConf, system) NodeAppConfig.createNode(peer, initialSyncDone)(this, chainConf, system)
} }
} }
@ -88,15 +89,17 @@ object NodeAppConfig extends AppConfigFactory[NodeAppConfig] {
NodeAppConfig(datadir, confs: _*) NodeAppConfig(datadir, confs: _*)
/** Creates either a neutrino node or a spv node based on the [[NodeAppConfig]] given */ /** Creates either a neutrino node or a spv node based on the [[NodeAppConfig]] given */
def createNode(peer: Peer)(implicit def createNode(peer: Peer, initialSyncDone: Option[Promise[Done]])(implicit
nodeConf: NodeAppConfig, nodeConf: NodeAppConfig,
chainConf: ChainAppConfig, chainConf: ChainAppConfig,
system: ActorSystem): Future[Node] = { system: ActorSystem): Future[Node] = {
nodeConf.nodeType match { nodeConf.nodeType match {
case NodeType.SpvNode => case NodeType.SpvNode =>
Future.successful(SpvNode(peer, nodeConf, chainConf, system)) Future.successful(
SpvNode(peer, nodeConf, chainConf, initialSyncDone, system))
case NodeType.NeutrinoNode => case NodeType.NeutrinoNode =>
Future.successful(NeutrinoNode(peer, nodeConf, chainConf, system)) Future.successful(
NeutrinoNode(peer, nodeConf, chainConf, initialSyncDone, system))
case NodeType.FullNode => case NodeType.FullNode =>
Future.failed(new RuntimeException("Not implemented")) Future.failed(new RuntimeException("Not implemented"))
} }

View file

@ -1,5 +1,6 @@
package org.bitcoins.node.networking.peer package org.bitcoins.node.networking.peer
import akka.Done
import org.bitcoins.chain.config.ChainAppConfig import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.core.api.chain.db.ChainApi import org.bitcoins.core.api.chain.db.ChainApi
import org.bitcoins.core.gcs.BlockFilter import org.bitcoins.core.gcs.BlockFilter
@ -9,7 +10,8 @@ import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.BroadcastAbleTransactionDAO import org.bitcoins.node.models.BroadcastAbleTransactionDAO
import org.bitcoins.node.{NodeCallbacks, NodeType, P2PLogger} import org.bitcoins.node.{NodeCallbacks, NodeType, P2PLogger}
import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Try
/** This actor is meant to handle a [[org.bitcoins.core.p2p.DataPayload DataPayload]] /** This actor is meant to handle a [[org.bitcoins.core.p2p.DataPayload DataPayload]]
* that a peer to sent to us on the p2p network, for instance, if we a receive a * that a peer to sent to us on the p2p network, for instance, if we a receive a
@ -21,6 +23,7 @@ import scala.concurrent.{ExecutionContext, Future}
case class DataMessageHandler( case class DataMessageHandler(
chainApi: ChainApi, chainApi: ChainApi,
callbacks: NodeCallbacks, callbacks: NodeCallbacks,
initialSyncDone: Option[Promise[Done]] = None,
currentFilterBatch: Vector[CompactFilterMessage] = Vector.empty, currentFilterBatch: Vector[CompactFilterMessage] = Vector.empty,
filterHeaderHeightOpt: Option[Int] = None, filterHeaderHeightOpt: Option[Int] = None,
filterHeightOpt: Option[Int] = None, filterHeightOpt: Option[Int] = None,
@ -66,10 +69,10 @@ case class DataMessageHandler(
logger.debug( logger.debug(
s"Received filter headers=${filterHeaders.size} in one message, " + s"Received filter headers=${filterHeaders.size} in one message, " +
"which is less than max. This means we are synced.") "which is less than max. This means we are synced.")
sendFirstGetCompactFilterCommand(peerMsgSender).map { synced => sendFirstGetCompactFilterCommand(peerMsgSender).map { syncing =>
if (!synced) if (!syncing)
logger.info("We are synced") logger.info("We are synced")
synced syncing
} }
} }
newFilterHeaderHeight <- filterHeaderHeightOpt match { newFilterHeaderHeight <- filterHeaderHeightOpt match {
@ -110,6 +113,7 @@ case class DataMessageHandler(
val syncing = newFilterHeight < newFilterHeaderHeight val syncing = newFilterHeight < newFilterHeaderHeight
if (!syncing) { if (!syncing) {
logger.info(s"We are synced") logger.info(s"We are synced")
Try(initialSyncDone.map(_.success(Done)))
} }
Future.successful(syncing) Future.successful(syncing)
} }
@ -219,12 +223,16 @@ case class DataMessageHandler(
filterHeightOpt.isEmpty)) filterHeightOpt.isEmpty))
) )
sendFirstGetCompactFilterHeadersCommand(peerMsgSender) sendFirstGetCompactFilterHeadersCommand(peerMsgSender)
else else {
Try(initialSyncDone.map(_.success(Done)))
Future.successful(syncing) Future.successful(syncing)
} }
} else }
} else {
Try(initialSyncDone.map(_.success(Done)))
Future.successful(syncing) Future.successful(syncing)
} }
}
getHeadersF.failed.map { err => getHeadersF.failed.map { err =>
logger.error(s"Error when processing headers message", err) logger.error(s"Error when processing headers message", err)

View file

@ -1,5 +1,6 @@
package org.bitcoins.node.networking.peer package org.bitcoins.node.networking.peer
import akka.Done
import akka.actor.ActorRefFactory import akka.actor.ActorRefFactory
import org.bitcoins.chain.blockchain.ChainHandler import org.bitcoins.chain.blockchain.ChainHandler
import org.bitcoins.chain.config.ChainAppConfig import org.bitcoins.chain.config.ChainAppConfig
@ -21,7 +22,7 @@ import org.bitcoins.node.networking.peer.PeerMessageReceiverState.{
} }
import org.bitcoins.node.{NodeCallbacks, NodeType, P2PLogger} import org.bitcoins.node.{NodeCallbacks, NodeType, P2PLogger}
import scala.concurrent.Future import scala.concurrent.{Future, Promise}
/** /**
* Responsible for receiving messages from a peer on the * Responsible for receiving messages from a peer on the
@ -253,13 +254,14 @@ object PeerMessageReceiver {
state: PeerMessageReceiverState, state: PeerMessageReceiverState,
chainApi: ChainApi, chainApi: ChainApi,
peer: Peer, peer: Peer,
callbacks: NodeCallbacks)(implicit callbacks: NodeCallbacks,
initialSyncDone: Option[Promise[Done]])(implicit
ref: ActorRefFactory, ref: ActorRefFactory,
nodeAppConfig: NodeAppConfig, nodeAppConfig: NodeAppConfig,
chainAppConfig: ChainAppConfig chainAppConfig: ChainAppConfig
): PeerMessageReceiver = { ): PeerMessageReceiver = {
import ref.dispatcher import ref.dispatcher
val dataHandler = new DataMessageHandler(chainApi, callbacks) val dataHandler = DataMessageHandler(chainApi, callbacks, initialSyncDone)
new PeerMessageReceiver(dataMessageHandler = dataHandler, new PeerMessageReceiver(dataMessageHandler = dataHandler,
state = state, state = state,
peer = peer, peer = peer,
@ -271,7 +273,10 @@ object PeerMessageReceiver {
* to be connected to a peer. This can be given to [[org.bitcoins.node.networking.P2PClient.props() P2PClient]] * to be connected to a peer. This can be given to [[org.bitcoins.node.networking.P2PClient.props() P2PClient]]
* to connect to a peer on the network * to connect to a peer on the network
*/ */
def preConnection(peer: Peer, callbacks: NodeCallbacks)(implicit def preConnection(
peer: Peer,
callbacks: NodeCallbacks,
initialSyncDone: Option[Promise[Done]])(implicit
ref: ActorRefFactory, ref: ActorRefFactory,
nodeAppConfig: NodeAppConfig, nodeAppConfig: NodeAppConfig,
chainAppConfig: ChainAppConfig chainAppConfig: ChainAppConfig
@ -288,18 +293,23 @@ object PeerMessageReceiver {
PeerMessageReceiver(state = PeerMessageReceiverState.fresh(), PeerMessageReceiver(state = PeerMessageReceiverState.fresh(),
chainApi = chainHandler, chainApi = chainHandler,
peer = peer, peer = peer,
callbacks = callbacks) callbacks = callbacks,
initialSyncDone = initialSyncDone)
} }
} }
def newReceiver(chainApi: ChainApi, peer: Peer, callbacks: NodeCallbacks)( def newReceiver(
implicit chainApi: ChainApi,
peer: Peer,
callbacks: NodeCallbacks,
initialSyncDone: Option[Promise[Done]])(implicit
nodeAppConfig: NodeAppConfig, nodeAppConfig: NodeAppConfig,
chainAppConfig: ChainAppConfig, chainAppConfig: ChainAppConfig,
ref: ActorRefFactory): PeerMessageReceiver = { ref: ActorRefFactory): PeerMessageReceiver = {
PeerMessageReceiver(state = PeerMessageReceiverState.fresh(), PeerMessageReceiver(state = PeerMessageReceiverState.fresh(),
chainApi = chainApi, chainApi = chainApi,
peer = peer, peer = peer,
callbacks = callbacks) callbacks = callbacks,
initialSyncDone = initialSyncDone)
} }
} }

View file

@ -325,7 +325,8 @@ object NodeUnitTest extends P2PLogger {
PeerMessageReceiver(state = PeerMessageReceiverState.fresh(), PeerMessageReceiver(state = PeerMessageReceiverState.fresh(),
chainApi = chainApi, chainApi = chainApi,
peer = peer, peer = peer,
callbacks = NodeCallbacks.empty) callbacks = NodeCallbacks.empty,
initialSyncDone = None)
Future.successful(receiver) Future.successful(receiver)
} }
@ -336,7 +337,7 @@ object NodeUnitTest extends P2PLogger {
import system.dispatcher import system.dispatcher
val chainApiF = ChainUnitTest.createChainHandler() val chainApiF = ChainUnitTest.createChainHandler()
val peerMsgReceiverF = chainApiF.flatMap { _ => val peerMsgReceiverF = chainApiF.flatMap { _ =>
PeerMessageReceiver.preConnection(peer, NodeCallbacks.empty) PeerMessageReceiver.preConnection(peer, NodeCallbacks.empty, None)
} }
//the problem here is the 'self', this needs to be an ordinary peer message handler //the problem here is the 'self', this needs to be an ordinary peer message handler
//that can handle the handshake //that can handle the handshake
@ -465,7 +466,8 @@ object NodeUnitTest extends P2PLogger {
PeerMessageReceiver(state = PeerMessageReceiverState.fresh(), PeerMessageReceiver(state = PeerMessageReceiverState.fresh(),
chainApi = chainApi, chainApi = chainApi,
peer = peer, peer = peer,
callbacks = NodeCallbacks.empty) callbacks = NodeCallbacks.empty,
initialSyncDone = None)
Future.successful(receiver) Future.successful(receiver)
} }
@ -506,7 +508,8 @@ object NodeUnitTest extends P2PLogger {
nodePeer = peer, nodePeer = peer,
nodeConfig = nodeAppConfig, nodeConfig = nodeAppConfig,
chainConfig = chainAppConfig, chainConfig = chainAppConfig,
actorSystem = system actorSystem = system,
initialSyncDone = None
).setBloomFilter(NodeTestUtil.emptyBloomFilter) ).setBloomFilter(NodeTestUtil.emptyBloomFilter)
} }
@ -539,7 +542,8 @@ object NodeUnitTest extends P2PLogger {
NeutrinoNode(nodePeer = peer, NeutrinoNode(nodePeer = peer,
nodeConfig = nodeAppConfig, nodeConfig = nodeAppConfig,
chainConfig = chainAppConfig, chainConfig = chainAppConfig,
actorSystem = system) actorSystem = system,
initialSyncDone = None)
} }
nodeF.flatMap(_.start()).flatMap(_ => nodeF) nodeF.flatMap(_.start()).flatMap(_ => nodeF)

View file

@ -73,4 +73,15 @@ class ScriptPubKeyDAOTest extends BitcoinSWalletTest with WalletDAOFixture {
recoverToSucceededIf[SQLException](insertF) recoverToSucceededIf[SQLException](insertF)
} }
it must "be able to create new scripts if they don't exist" in { daos =>
val scriptPubKeyDAO = daos.scriptPubKeyDAO
val pkh = P2PKHScriptPubKey(ECPublicKey.freshPublicKey)
for {
db1 <- scriptPubKeyDAO.createIfNotExists(ScriptPubKeyDb(pkh))
db2 <- scriptPubKeyDAO.createIfNotExists(ScriptPubKeyDb(pkh))
} yield {
assert(db1 == db2)
}
}
} }

View file

@ -109,7 +109,8 @@ private[wallet] trait AddressHandling extends WalletLogger {
override def watchScriptPubKey( override def watchScriptPubKey(
scriptPubKey: ScriptPubKey): Future[ScriptPubKeyDb] = scriptPubKey: ScriptPubKey): Future[ScriptPubKeyDb] =
scriptPubKeyDAO.create(ScriptPubKeyDb(scriptPubKey = scriptPubKey)) scriptPubKeyDAO.createIfNotExists(
ScriptPubKeyDb(scriptPubKey = scriptPubKey))
/** Enumerates the public keys in this wallet */ /** Enumerates the public keys in this wallet */
protected[wallet] def listPubkeys(): Future[Vector[ECPublicKey]] = protected[wallet] def listPubkeys(): Future[Vector[ECPublicKey]] =

View file

@ -42,7 +42,8 @@ private[wallet] trait TransactionProcessing extends WalletLogger {
override def processBlock(block: Block): Future[Wallet] = { override def processBlock(block: Block): Future[Wallet] = {
logger.info(s"Processing block=${block.blockHeader.hash.flip}") logger.info(s"Processing block=${block.blockHeader.hash.flip}")
val res = block.transactions.foldLeft(Future.successful(this)) { val res =
block.transactions.foldLeft(Future.successful(this)) {
(acc, transaction) => (acc, transaction) =>
for { for {
_ <- acc _ <- acc

View file

@ -5,8 +5,9 @@ import org.bitcoins.core.protocol.script.ScriptPubKey
import org.bitcoins.core.script.ScriptType import org.bitcoins.core.script.ScriptType
import org.bitcoins.db.CRUDAutoInc import org.bitcoins.db.CRUDAutoInc
import org.bitcoins.wallet.config.WalletAppConfig import org.bitcoins.wallet.config.WalletAppConfig
import slick.dbio.DBIOAction
import scala.concurrent.ExecutionContext import scala.concurrent.{ExecutionContext, Future}
case class ScriptPubKeyDAO()(implicit case class ScriptPubKeyDAO()(implicit
ec: ExecutionContext, ec: ExecutionContext,
@ -14,14 +15,33 @@ case class ScriptPubKeyDAO()(implicit
) extends CRUDAutoInc[ScriptPubKeyDb] { ) extends CRUDAutoInc[ScriptPubKeyDb] {
import profile.api._ import profile.api._
private val mappers = new org.bitcoins.db.DbCommonsColumnMappers(profile) private val mappers = new org.bitcoins.db.DbCommonsColumnMappers(profile)
import mappers._ import mappers._
override val table: profile.api.TableQuery[ScriptPubKeyTable] = override val table: profile.api.TableQuery[ScriptPubKeyTable] =
TableQuery[ScriptPubKeyTable] TableQuery[ScriptPubKeyTable]
/** Creates a new row in the database only if the given SPK (not ID) does not exists. */
def createIfNotExists(spkDb: ScriptPubKeyDb): Future[ScriptPubKeyDb] = {
val spkFind = table.filter(_.scriptPubKey === spkDb.scriptPubKey).result
val actions = for {
spkOpt <- spkFind.headOption
spk <- spkOpt match {
case Some(foundSpk) =>
DBIOAction.successful(foundSpk)
case None =>
for {
newSpkId <- (table returning table.map(_.id)) += spkDb
} yield {
spkDb.copyWithId(newSpkId)
}
}
} yield spk
database
.run(actions.transactionally)
}
case class ScriptPubKeyTable(tag: Tag) case class ScriptPubKeyTable(tag: Tag)
extends TableAutoInc[ScriptPubKeyDb](tag, schemaName, "pub_key_scripts") { extends TableAutoInc[ScriptPubKeyDb](tag, schemaName, "pub_key_scripts") {