Start the process of refactoring our ChainHandler to be able to avoid… (#655)

* Start the process of refactoring our ChainHandler to be able to avoid database calls on TipValidation

WIP: Begin explicity passing state back and forth in return types of PeerMessageReceiver, P2PClient, , DataMessageHandler. This commit also implements the ability to keep our blockchain completely in memory. Previously when we were updating the tip of the chain, we had to make a database read to figure out what the best tips are. This is suboptimal for performance because a database read needs to be done for every block header we see, now we just keep the chain in memory

Fix bug in DataMessageHandler that pre-emptively sent a getheadersmsg to our peer. Make 'chainApiF' internal to our spvNode (not a parameter). This forces the chainApi to be created from disk everytime a new SpvNode is spun up. This keeps us in sync with the blockchain at disk at the cost of disk access and less modularity of SpvNode

Address torkel code review

Fix rebase issues

Address code review

Address nadav code review

* Rebase onto master, fix api changes
This commit is contained in:
Chris Stewart 2019-08-06 13:31:54 -05:00 committed by GitHub
parent c934d8efc2
commit b0b1c1cc42
20 changed files with 710 additions and 392 deletions

View file

@ -1,30 +1,22 @@
package org.bitcoins.server
import org.bitcoins.rpc.config.BitcoindInstance
import org.bitcoins.node.models.Peer
import org.bitcoins.rpc.client.common.BitcoindRpcClient
import akka.actor.ActorSystem
import scala.concurrent.Await
import scala.concurrent.duration._
import org.bitcoins.wallet.config.WalletAppConfig
import org.bitcoins.node.config.NodeAppConfig
import java.nio.file.Files
import scala.concurrent.Future
import org.bitcoins.wallet.LockedWallet
import org.bitcoins.wallet.Wallet
import org.bitcoins.wallet.api.InitializeWalletSuccess
import org.bitcoins.wallet.api.InitializeWalletError
import org.bitcoins.node.SpvNode
import org.bitcoins.chain.blockchain.ChainHandler
import akka.actor.ActorSystem
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.wallet.api.UnlockedWalletApi
import org.bitcoins.wallet.api.UnlockWalletSuccess
import org.bitcoins.wallet.api.UnlockWalletError
import org.bitcoins.node.networking.peer.DataMessageHandler
import org.bitcoins.node.SpvNodeCallbacks
import org.bitcoins.wallet.WalletStorage
import org.bitcoins.db.AppLoggers
import org.bitcoins.chain.models.BlockHeaderDAO
import org.bitcoins.node.{SpvNode, SpvNodeCallbacks}
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.Peer
import org.bitcoins.node.networking.peer.DataMessageHandler
import org.bitcoins.rpc.client.common.BitcoindRpcClient
import org.bitcoins.rpc.config.BitcoindInstance
import org.bitcoins.wallet.{LockedWallet, Wallet, WalletStorage}
import org.bitcoins.wallet.api._
import org.bitcoins.wallet.config.WalletAppConfig
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
object Main extends App {
implicit val conf = {
@ -107,20 +99,17 @@ object Main extends App {
SpvNodeCallbacks(onTxReceived = Seq(onTX))
}
val blockheaderDAO = BlockHeaderDAO()
val chain = ChainHandler(blockheaderDAO)
SpvNode(peer, chain, bloom, callbacks).start()
SpvNode(peer, bloom, callbacks).start()
}
_ = logger.info(s"Starting SPV node sync")
_ <- node.sync()
chainApi <- node.chainApiFromDb()
start <- {
val walletRoutes = WalletRoutes(wallet, node)
val nodeRoutes = NodeRoutes(node)
val chainRoutes = ChainRoutes(node.chainApi)
val server =
Server(nodeConf, // could use either of configurations
Seq(walletRoutes, nodeRoutes, chainRoutes))
val chainRoutes = ChainRoutes(chainApi)
val server = Server(nodeConf, Seq(walletRoutes, nodeRoutes, chainRoutes))
server.start()
}
} yield {

View file

@ -12,7 +12,7 @@ class BlockchainTest extends ChainUnitTest {
override def withFixture(test: OneArgAsyncTest): FutureOutcome =
withBlockHeaderDAO(test)
override implicit val system: ActorSystem = ActorSystem("BlockchainTest")
implicit override val system: ActorSystem = ActorSystem("BlockchainTest")
behavior of "Blockchain"
@ -26,7 +26,8 @@ class BlockchainTest extends ChainUnitTest {
BlockHeaderHelper.buildNextHeader(ChainUnitTest.genesisHeaderDb)
val connectTipF = Blockchain.connectTip(header = newHeader.blockHeader,
blockHeaderDAO = bhDAO)
blockHeaderDAO = bhDAO,
Vector(blockchain))
connectTipF.map {
case BlockchainUpdate.Successful(_, connectedHeader) =>

View file

@ -120,7 +120,9 @@ class ChainHandlerTest extends ChainUnitTest {
val createdF = chainHandler.blockHeaderDAO.createAll(firstThreeBlocks)
createdF.flatMap { _ =>
val processorF = Future.successful(chainHandler)
val blockchain = Blockchain.fromHeaders(firstThreeBlocks.reverse)
val handler = ChainHandler(chainHandler.blockHeaderDAO, blockchain)
val processorF = Future.successful(handler)
// Takes way too long to do all blocks
val blockHeadersToTest = blockHeaders.tail
.take(

View file

@ -44,6 +44,7 @@ case class Blockchain(headers: Vector[BlockHeaderDb])
object Blockchain extends ChainVerificationLogger {
def fromHeaders(headers: Vector[BlockHeaderDb]): Blockchain = {
Blockchain(headers)
}
@ -60,59 +61,63 @@ object Blockchain extends ChainVerificationLogger {
* we [[org.bitcoins.chain.blockchain.BlockchainUpdate.Successful successful]] connected the tip,
* or [[org.bitcoins.chain.blockchain.BlockchainUpdate.Failed Failed]] to connect to a tip
*/
def connectTip(header: BlockHeader, blockHeaderDAO: BlockHeaderDAO)(
def connectTip(
header: BlockHeader,
blockHeaderDAO: BlockHeaderDAO,
blockchains: Vector[Blockchain])(
implicit ec: ExecutionContext,
conf: ChainAppConfig): Future[BlockchainUpdate] = {
logger.debug(
s"Attempting to add new tip=${header.hashBE.hex} with prevhash=${header.previousBlockHashBE.hex} to chain")
//get all competing chains we have
val blockchainsF: Future[Vector[Blockchain]] =
blockHeaderDAO.getBlockchains()
val tipResultF: Future[BlockchainUpdate] = blockchainsF.flatMap {
blockchains =>
val nested: Vector[Future[BlockchainUpdate]] = blockchains.map {
blockchain =>
val prevBlockHeaderOpt =
blockchain.find(_.hashBE == header.previousBlockHashBE)
prevBlockHeaderOpt match {
case None =>
logger.warn(
s"No common ancestor found in the chain to connect header=${header.hashBE.hex}")
val err = TipUpdateResult.BadPreviousBlockHash(header)
val failed = BlockchainUpdate.Failed(blockchain = blockchain,
failedHeader = header,
tipUpdateFailure = err)
Future.successful(failed)
case Some(prevBlockHeader) =>
//found a header to connect to!
logger.trace(
s"Found ancestor=${prevBlockHeader.hashBE.hex} for header=${header.hashBE.hex}")
val tipResultF =
TipValidation.checkNewTip(newPotentialTip = header,
currentTip = prevBlockHeader,
blockHeaderDAO = blockHeaderDAO)
tipResultF.map { tipResult =>
tipResult match {
case TipUpdateResult.Success(headerDb) =>
logger.debug(
s"Successfully verified=${headerDb.hashBE.hex}, connecting to chain")
val newChain =
Blockchain.fromHeaders(headerDb +: blockchain.headers)
BlockchainUpdate.Successful(newChain, headerDb)
case fail: TipUpdateResult.Failure =>
logger.warn(
s"Could not verify header=${header.hashBE.hex}, reason=$fail")
BlockchainUpdate.Failed(blockchain, header, fail)
}
}
val tipResultF: Future[BlockchainUpdate] = {
val nested: Vector[Future[BlockchainUpdate]] = blockchains.map {
blockchain =>
val prevBlockHeaderIdxOpt =
blockchain.headers.zipWithIndex.find {
case (headerDb, _) =>
headerDb.hashBE == header.previousBlockHashBE
}
}
parseSuccessOrFailure(nested = nested)
prevBlockHeaderIdxOpt match {
case None =>
logger.warn(
s"No common ancestor found in the chain to connect to ${header.hashBE}")
val err = TipUpdateResult.BadPreviousBlockHash(header)
val failed = BlockchainUpdate.Failed(blockchain = blockchain,
failedHeader = header,
tipUpdateFailure = err)
Future.successful(failed)
case Some((prevBlockHeader, prevHeaderIdx)) =>
//found a header to connect to!
logger.debug(
s"Attempting to add new tip=${header.hashBE.hex} with prevhash=${header.previousBlockHashBE.hex} to chain")
val tipResultF =
TipValidation.checkNewTip(newPotentialTip = header,
currentTip = prevBlockHeader,
blockHeaderDAO = blockHeaderDAO)
tipResultF.map { tipResult =>
tipResult match {
case TipUpdateResult.Success(headerDb) =>
logger.debug(
s"Successfully verified=${headerDb.hashBE.hex}, connecting to chain")
val oldChain =
blockchain.takeRight(blockchain.length - prevHeaderIdx)
val newChain =
Blockchain.fromHeaders(headerDb +: oldChain)
BlockchainUpdate.Successful(newChain, headerDb)
case fail: TipUpdateResult.Failure =>
logger.warn(
s"Could not verify header=${header.hashBE.hex}, reason=$fail")
BlockchainUpdate.Failed(blockchain, header, fail)
}
}
}
}
parseSuccessOrFailure(nested = nested)
}
tipResultF
}

View file

@ -19,9 +19,11 @@ import org.bitcoins.chain.validation.TipUpdateResult
* of [[org.bitcoins.chain.api.ChainApi ChainApi]], this is the entry point in to the
* chain project.
*/
case class ChainHandler(blockHeaderDAO: BlockHeaderDAO)(
implicit private[chain] val chainConfig: ChainAppConfig
) extends ChainApi
case class ChainHandler(
blockHeaderDAO: BlockHeaderDAO,
blockchains: Vector[Blockchain])(
implicit private[chain] val chainConfig: ChainAppConfig)
extends ChainApi
with ChainVerificationLogger {
override def getBlockCount(implicit ec: ExecutionContext): Future[Long] = {
@ -49,18 +51,40 @@ case class ChainHandler(blockHeaderDAO: BlockHeaderDAO)(
logger.debug(
s"Processing header=${header.hashBE.hex}, previousHash=${header.previousBlockHashBE.hex}")
val blockchainUpdateF =
Blockchain.connectTip(header, blockHeaderDAO)
val blockchainUpdateF = Blockchain.connectTip(header = header,
blockHeaderDAO =
blockHeaderDAO,
blockchains = blockchains)
val newHandlerF = blockchainUpdateF.flatMap {
case BlockchainUpdate.Successful(_, updatedHeader) =>
case BlockchainUpdate.Successful(newChain, updatedHeader) =>
//now we have successfully connected the header, we need to insert
//it into the database
val createdF = blockHeaderDAO.create(updatedHeader)
createdF.map { header =>
logger.debug(
s"Connected new header to blockchain, height=${header.height} hash=${header.hashBE.hex}")
ChainHandler(blockHeaderDAO)
s"Connected new header to blockchain, height=${header.height} hash=${header.hashBE}")
val chainIdxOpt = blockchains.zipWithIndex.find {
case (chain, _) =>
val oldTip = newChain(1) //should be safe, even with genesis header as we just connected a tip
oldTip == chain.tip
}
val updatedChains = {
chainIdxOpt match {
case Some((_, idx)) =>
logger.trace(
s"Updating chain at idx=${idx} out of competing chains=${blockchains.length} with new tip=${header.hashBE.hex}")
blockchains.updated(idx, newChain)
case None =>
logger.info(
s"New competing blockchain with tip=${newChain.tip}")
blockchains.:+(newChain)
}
}
ChainHandler(blockHeaderDAO, updatedChains)
}
case BlockchainUpdate.Failed(_, _, reason) =>
val errMsg =
@ -117,11 +141,42 @@ case class ChainHandler(blockHeaderDAO: BlockHeaderDAO)(
//this does _not_ mean that it is on the chain that has the most work
//TODO: Enhance this in the future to return the "heaviest" header
//https://bitcoin.org/en/glossary/block-chain
blockHeaderDAO.chainTips.map { tips =>
val sorted = tips.sortBy(header => header.blockHeader.difficulty)
val hash = sorted.head.hashBE
logger.debug(s"getBestBlockHash result: hash=$hash")
hash
val groupedChains = blockchains.groupBy(_.tip.height)
val maxHeight = groupedChains.keys.max
val chains = groupedChains(maxHeight)
val hashBE: DoubleSha256DigestBE = chains match {
case Vector() =>
val errMsg = s"Did not find blockchain with height $maxHeight"
logger.error(errMsg)
throw new RuntimeException(errMsg)
case chain +: Vector() =>
chain.tip.hashBE
case chain +: rest =>
logger.warn(
s"We have multiple competing blockchains: ${(chain +: rest).map(_.tip.hashBE.hex).mkString(", ")}")
chain.tip.hashBE
}
Future.successful(hashBE)
}
}
object ChainHandler {
/** Constructs a [[ChainHandler chain handler]] from the state in the database
* This gives us the guaranteed latest state we have in the database
* */
def fromDatabase(blockHeaderDAO: BlockHeaderDAO)(
implicit ec: ExecutionContext,
chainConfig: ChainAppConfig): Future[ChainHandler] = {
val bestChainsF = blockHeaderDAO.getBlockchains()
bestChainsF.map(chains =>
new ChainHandler(blockHeaderDAO = blockHeaderDAO, blockchains = chains))
}
def apply(blockHeaderDAO: BlockHeaderDAO, blockchains: Blockchain)(
implicit chainConfig: ChainAppConfig): ChainHandler = {
new ChainHandler(blockHeaderDAO, Vector(blockchains))
}
}

View file

@ -23,4 +23,21 @@ object FutureUtil {
}
val unit: Future[Unit] = Future.successful(())
/**
* Folds over the given elements sequentially in a non-blocking async way
* @param init the initialized value for the accumulator
* @param items the items we are folding over
* @param fun the function we are applying to every element that returns a future
* @return
*/
def foldLeftAsync[T, U](init: T, items: Seq[U])(fun: (T, U) => Future[T])(
implicit ec: ExecutionContext): Future[T] = {
items.foldLeft(Future.successful(init)) {
case (accumF, elem) =>
accumF.flatMap { accum =>
fun(accum, elem)
}
}
}
}

View file

@ -58,16 +58,10 @@ class BroadcastTransactionTest extends BitcoinSWalletTest {
address <- rpc.getNewAddress
bloom <- wallet.getBloomFilter()
spv <- {
val peer = Peer.fromBitcoind(rpc.instance)
val chainHandler = {
val bhDao = BlockHeaderDAO()
ChainHandler(bhDao)
}
val spv =
SpvNode(peer, chainHandler, bloomFilter = bloom)
val spv = SpvNode(peer, bloomFilter = bloom)
spv.start()
}
_ <- spv.sync()

View file

@ -90,7 +90,7 @@ class NodeWithWalletTest extends NodeUnitTest {
bloom <- wallet.getBloomFilter()
address <- wallet.getNewAddress()
spv <- initSpv.start()
updatedBloom = spv.updateBloomFilter(address).bloomFilter
updatedBloom <- spv.updateBloomFilter(address).map(_.bloomFilter)
_ <- spv.sync()
_ <- NodeTestUtil.awaitSync(spv, rpc)

View file

@ -27,9 +27,10 @@ class SpvNodeTest extends NodeUnitTest {
val spvNode = spvNodeConnectedWithBitcoind.spvNode
val bitcoind = spvNodeConnectedWithBitcoind.bitcoind
assert(spvNode.isConnected)
assert(spvNode.isInitialized)
val assert1F = for {
_ <- spvNode.isConnected.map(assert(_))
a2 <- spvNode.isInitialized.map(assert(_))
} yield a2
val hashF: Future[DoubleSha256DigestBE] = {
bitcoind.generate(1).map(_.head)
@ -37,6 +38,7 @@ class SpvNodeTest extends NodeUnitTest {
//sync our spv node expecting to get that generated hash
val spvSyncF = for {
_ <- assert1F
_ <- hashF
sync <- spvNode.sync()
} yield sync
@ -62,7 +64,9 @@ class SpvNodeTest extends NodeUnitTest {
//as they happen with the 'sendheaders' message
//both our spv node and our bitcoind node _should_ both be at the genesis block (regtest)
//at this point so no actual syncing is happening
val initSyncF = gen1F.flatMap(_ => spvNode.sync())
val initSyncF = gen1F.flatMap { _ =>
spvNode.sync()
}
//start generating a block every 10 seconds with bitcoind
//this should result in 5 blocks
@ -76,7 +80,8 @@ class SpvNodeTest extends NodeUnitTest {
//we should expect 5 headers have been announced to us via
//the send headers message.
val has6BlocksF = RpcUtil.retryUntilSatisfiedF(
conditionF = () => spvNode.chainApi.getBlockCount.map(_ == 6),
conditionF =
() => spvNode.chainApiFromDb().flatMap(_.getBlockCount.map(_ == 6)),
duration = 1.seconds)
has6BlocksF.map(_ => succeed)

View file

@ -88,7 +88,7 @@ class UpdateBloomFilterTest extends NodeUnitTest with BeforeAndAfter {
addressFromWallet <- wallet.getNewAddress()
_ = addressFromWalletP.success(addressFromWallet)
spv <- initSpv.start()
_ = spv.updateBloomFilter(addressFromWallet)
_ <- spv.updateBloomFilter(addressFromWallet)
_ <- spv.sync()
_ <- rpc.sendToAddress(addressFromWallet, 1.bitcoin)
_ <- NodeTestUtil.awaitSync(spv, rpc)
@ -130,11 +130,11 @@ class UpdateBloomFilterTest extends NodeUnitTest with BeforeAndAfter {
5.bitcoin,
SatoshisPerByte(100.sats))
_ = txFromWalletP.success(tx)
spvNewBloom = spv.updateBloomFilter(tx)
updatedBloom <- spv.updateBloomFilter(tx).map(_.bloomFilter)
_ = spv.broadcastTransaction(tx)
_ <- spv.sync()
_ <- NodeTestUtil.awaitSync(spv, rpc)
_ = assert(spvNewBloom.bloomFilter.contains(tx.txId))
_ = assert(updatedBloom.contains(tx.txId))
_ = {
cancelable = Some {
system.scheduler.scheduleOnce(

View file

@ -2,6 +2,8 @@ package org.bitcoins.node.networking
import akka.io.Tcp
import akka.testkit.{TestActorRef, TestProbe}
import org.bitcoins.chain.db.ChainDbManagement
import org.bitcoins.node.SpvNodeCallbacks
import org.bitcoins.node.models.Peer
import org.bitcoins.node.networking.peer.PeerMessageReceiver
import org.bitcoins.node.networking.peer.PeerMessageReceiverState.Preconnection
@ -39,6 +41,7 @@ class P2PClientTest
BitcoinSTestAppConfig.getTestConfig()
implicit private val chainConf = config.chainConf
implicit private val nodeConf = config.nodeConf
implicit private val timeout = akka.util.Timeout(10.seconds)
implicit val np = config.chainConf.network
@ -126,8 +129,20 @@ class P2PClientTest
}
behavior of "P2PClient"
override def beforeAll(): Unit = {
ChainDbManagement.createHeaderTable()
}
override def afterAll(): Unit = {
ChainDbManagement.dropHeaderTable()
super.afterAll()
}
it must "establish a tcp connection with a bitcoin node" in {
bitcoindPeerF.flatMap(remote => connectAndDisconnect(remote))
bitcoindPeerF.flatMap { remote =>
println(s"Starting test")
connectAndDisconnect(remote)
}
}
it must "connect to two nodes" in {
@ -152,26 +167,32 @@ class P2PClientTest
def connectAndDisconnect(peer: Peer): Future[Assertion] = {
val probe = TestProbe()
val remote = peer.socket
val chainHandler = {
val dao = BlockHeaderDAO()
ChainHandler(dao)
val peerMessageReceiverF =
PeerMessageReceiver.preConnection(peer, SpvNodeCallbacks.empty)
val clientActorF: Future[TestActorRef[P2PClientActor]] =
peerMessageReceiverF.map { peerMsgRecv =>
TestActorRef(P2PClient.props(peer, peerMsgRecv), probe.ref)
}
val p2pClientF: Future[P2PClient] = clientActorF.map {
client: TestActorRef[P2PClientActor] =>
P2PClient(client, peer)
}
val peerMessageReceiver =
PeerMessageReceiver(state = Preconnection, chainHandler)
val client =
TestActorRef(P2PClient.props(peer, peerMessageReceiver), probe.ref)
client ! Tcp.Connect(remote)
val isConnectedF =
TestAsyncUtil.retryUntilSatisfied(peerMessageReceiver.isInitialized)
val isConnectedF = for {
p2pClient <- p2pClientF
_ = p2pClient.actor ! Tcp.Connect(remote)
isConnected <- TestAsyncUtil.retryUntilSatisfiedF(p2pClient.isConnected)
} yield isConnected
isConnectedF.flatMap { _ =>
//disconnect here
client ! Tcp.Abort
val isDisconnectedF =
TestAsyncUtil.retryUntilSatisfied(peerMessageReceiver.isDisconnected,
duration = 1.seconds)
val isDisconnectedF = for {
p2pClient <- p2pClientF
_ = p2pClient.actor ! Tcp.Abort
isDisconnected <- TestAsyncUtil.retryUntilSatisfiedF(
p2pClient.isDisconnected,
duration = 1.seconds)
} yield isDisconnected
isDisconnectedF.map { _ =>
succeed

View file

@ -22,32 +22,22 @@ class PeerMessageHandlerTest extends NodeUnitTest {
behavior of "PeerHandler"
it must "be able to fully initialize a PeerMessageReceiver" in { _ =>
val peerHandlerF = bitcoindPeerF.map(p => NodeUnitTest.buildPeerHandler(p))
val peerHandlerF =
bitcoindPeerF.flatMap(p => NodeUnitTest.buildPeerHandler(p))
val peerMsgSenderF = peerHandlerF.map(_.peerMsgSender)
val peerMsgRecvF = peerHandlerF.map(_.peerMsgRecv)
val p2pClientF = peerHandlerF.map(_.p2pClient)
val _ =
bitcoindPeerF.flatMap(p => peerHandlerF.map(_.peerMsgSender.connect()))
val isConnectedF = TestAsyncUtil.retryUntilSatisfiedF(
() => peerMsgRecvF.map(_.isConnected),
() => p2pClientF.flatMap(_.isConnected),
duration = 500.millis
)
val hasVersionMsgF = isConnectedF.flatMap { _ =>
TestAsyncUtil.retryUntilSatisfiedF(
conditionF = () => peerMsgRecvF.map(_.hasReceivedVersionMsg)
)
}
val hasVerackMsg = hasVersionMsgF.flatMap { _ =>
TestAsyncUtil.retryUntilSatisfiedF(
conditionF = () => peerMsgRecvF.map(_.hasReceivedVerackMsg)
)
}
val isInitF = hasVerackMsg.flatMap { _ =>
peerMsgRecvF.map(p => assert(p.isInitialized))
val isInitF = isConnectedF.flatMap { _ =>
TestAsyncUtil.retryUntilSatisfiedF(() =>
p2pClientF.flatMap(_.isInitialized()))
}
val disconnectF = isInitF.flatMap { _ =>
@ -56,7 +46,7 @@ class PeerMessageHandlerTest extends NodeUnitTest {
val isDisconnectedF = disconnectF.flatMap { _ =>
TestAsyncUtil.retryUntilSatisfiedF(() =>
peerMsgRecvF.map(_.isDisconnected))
p2pClientF.flatMap(_.isDisconnected()))
}

View file

@ -2,76 +2,115 @@ package org.bitcoins.node
import akka.actor.ActorSystem
import org.bitcoins.chain.api.ChainApi
import org.bitcoins.node.models.Peer
import org.bitcoins.chain.blockchain.ChainHandler
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.chain.models.BlockHeaderDAO
import org.bitcoins.core.bloom.BloomFilter
import org.bitcoins.core.p2p.NetworkPayload
import org.bitcoins.core.protocol.BitcoinAddress
import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.db.P2PLogger
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.{
BroadcastAbleTransaction,
BroadcastAbleTransactionDAO,
Peer
}
import org.bitcoins.node.networking.P2PClient
import org.bitcoins.node.networking.peer.{
PeerMessageReceiver,
PeerMessageSender
}
import org.bitcoins.rpc.util.AsyncUtil
import slick.jdbc.SQLiteProfile
import scala.concurrent.Future
import org.bitcoins.core.bloom.BloomFilter
import org.bitcoins.core.p2p.NetworkPayload
import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.node.models.BroadcastAbleTransaction
import org.bitcoins.node.models.BroadcastAbleTransactionDAO
import slick.jdbc.SQLiteProfile
import scala.util.Failure
import scala.util.Success
import org.bitcoins.db.P2PLogger
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.core.protocol.BitcoinAddress
import scala.concurrent.duration.DurationInt
import scala.util.{Failure, Success}
case class SpvNode(
peer: Peer,
chainApi: ChainApi,
bloomFilter: BloomFilter,
callbacks: SpvNodeCallbacks = SpvNodeCallbacks.empty
)(implicit system: ActorSystem, nodeAppConfig: NodeAppConfig)
)(
implicit system: ActorSystem,
nodeAppConfig: NodeAppConfig,
chainAppConfig: ChainAppConfig)
extends P2PLogger {
import system.dispatcher
/** This implicit is required for using the [[akka.pattern.ask akka ask]]
* to query what the state of our node is, like [[isConnected isConnected]]
* */
implicit private val timeout = akka.util.Timeout(10.seconds)
private val txDAO = BroadcastAbleTransactionDAO(SQLiteProfile)
private val peerMsgRecv =
PeerMessageReceiver.newReceiver(chainApi, callbacks)
/** This is constructing a chain api from disk every time we call this method
* This involves database calls which can be slow and expensive to construct
* our [[org.bitcoins.chain.blockchain.Blockchain Blockchain]]
* */
def chainApiFromDb(): Future[ChainApi] = {
ChainHandler.fromDatabase(BlockHeaderDAO())
}
private val client: P2PClient =
P2PClient(context = system, peer = peer, peerMessageReceiver = peerMsgRecv)
/** Unlike our chain api, this is cached inside our spv node
* object. Internally in [[org.bitcoins.node.networking.P2PClient p2p client]] you will see that
* the [[org.bitcoins.chain.api.ChainApi chain api]] is updated inside of the p2p client
* */
private val clientF: Future[P2PClient] = {
for {
chainApi <- chainApiFromDb()
} yield {
val peerMsgRecv: PeerMessageReceiver =
PeerMessageReceiver.newReceiver(chainApi = chainApi,
peer = peer,
callbacks = callbacks)
val p2p = P2PClient(context = system,
peer = peer,
peerMessageReceiver = peerMsgRecv)
p2p
}
}
private val peerMsgSender: PeerMessageSender = {
PeerMessageSender(client)
private val peerMsgSenderF: Future[PeerMessageSender] = {
clientF.map { client =>
PeerMessageSender(client)
}
}
/** Updates our bloom filter to match the given TX
*
* @return SPV node with the updated bloom filter
*/
def updateBloomFilter(transaction: Transaction): SpvNode = {
logger.info(s"Updating bloom filter with transaction=${transaction.txIdBE}")
def updateBloomFilter(transaction: Transaction): Future[SpvNode] = {
logger(nodeAppConfig).info(
s"Updating bloom filter with transaction=${transaction.txIdBE}")
val newBloom = bloomFilter.update(transaction)
// we could send filteradd messages, but we would
// then need to calculate all the new elements in
// the filter. this is easier:-)
peerMsgSender.sendFilterClearMessage()
peerMsgSender.sendFilterLoadMessage(newBloom)
val newBloomLoadF = peerMsgSenderF.map { p =>
p.sendFilterClearMessage()
p.sendFilterLoadMessage(newBloom)
}
copy(bloomFilter = newBloom)
newBloomLoadF.map(_ => copy(bloomFilter = newBloom))
}
/** Updates our bloom filter to match the given address
*
* @return SPV node with the updated bloom filter
*/
def updateBloomFilter(address: BitcoinAddress): SpvNode = {
logger.info(s"Updating bloom filter with address=$address")
def updateBloomFilter(address: BitcoinAddress): Future[SpvNode] = {
logger(nodeAppConfig).info(s"Updating bloom filter with address=$address")
val hash = address.hash
val newBloom = bloomFilter.insert(hash)
peerMsgSender.sendFilterAddMessage(hash)
val sentFilterAddF = peerMsgSenderF.map(_.sendFilterAddMessage(hash))
copy(bloomFilter = newBloom)
sentFilterAddF.map { _ =>
copy(bloomFilter = newBloom)
}
}
/**
@ -80,8 +119,8 @@ case class SpvNode(
* with P2P messages, therefore marked as
* `private[node]`.
*/
private[node] def send(msg: NetworkPayload): Unit = {
peerMsgSender.sendMsg(msg)
private[node] def send(msg: NetworkPayload): Future[Unit] = {
peerMsgSenderF.map(_.sendMsg(msg))
}
/** Starts our spv node */
@ -89,59 +128,72 @@ case class SpvNode(
for {
_ <- nodeAppConfig.initialize()
node <- {
peerMsgSender.connect()
val isInitializedF = for {
_ <- peerMsgSenderF.map(_.connect())
_ <- AsyncUtil.retryUntilSatisfiedF(() => isInitialized)
} yield ()
val isInitializedF =
AsyncUtil.retryUntilSatisfied(peerMsgRecv.isInitialized)
isInitializedF.failed.foreach(err =>
logger.error(s"Failed to connect with peer=$peer with err=${err}"))
isInitializedF.failed.foreach(
err =>
logger(nodeAppConfig).error(
s"Failed to connect with peer=$peer with err=${err}"))
isInitializedF.map { _ =>
logger.info(s"Our peer=${peer} has been initialized")
logger(nodeAppConfig).info(s"Our peer=${peer} has been initialized")
this
}
}
_ <- peerMsgSenderF.map(_.sendFilterLoadMessage(bloomFilter))
} yield {
logger.info(s"Sending bloomfilter=${bloomFilter.hex} to $peer")
val _ = peerMsgSender.sendFilterLoadMessage(bloomFilter)
logger(nodeAppConfig).info(
s"Sending bloomfilter=${bloomFilter.hex} to $peer")
node
}
}
/** Stops our spv node */
def stop(): Future[SpvNode] = {
peerMsgSender.disconnect()
logger(nodeAppConfig).info(s"Stopping spv node")
val disconnectF = peerMsgSenderF.map(_.disconnect())
val isStoppedF = AsyncUtil.retryUntilSatisfied(peerMsgRecv.isDisconnected)
val isStoppedF = disconnectF.flatMap { _ =>
logger(nodeAppConfig).info(s"Awaiting disconnect")
AsyncUtil.retryUntilSatisfiedF(() => isDisconnected)
}
isStoppedF.map(_ => this)
isStoppedF.map { _ =>
logger(nodeAppConfig).info(s"Spv node stopped!")
this
}
}
/** Broadcasts the given transaction over the P2P network */
def broadcastTransaction(transaction: Transaction): Unit = {
def broadcastTransaction(transaction: Transaction): Future[Unit] = {
val broadcastTx = BroadcastAbleTransaction(transaction)
txDAO.create(broadcastTx).onComplete {
case Failure(exception) =>
logger.error(s"Error when writing broadcastable TX to DB", exception)
logger(nodeAppConfig)
.error(s"Error when writing broadcastable TX to DB", exception)
case Success(written) =>
logger.debug(
logger(nodeAppConfig).debug(
s"Wrote tx=${written.transaction.txIdBE} to broadcastable table")
}
logger.info(s"Sending out inv for tx=${transaction.txIdBE}")
peerMsgSender.sendInventoryMessage(transaction)
logger(nodeAppConfig).info(s"Sending out inv for tx=${transaction.txIdBE}")
peerMsgSenderF.map(_.sendInventoryMessage(transaction))
}
/** Checks if we have a tcp connection with our peer */
def isConnected: Boolean = peerMsgRecv.isConnected
def isConnected: Future[Boolean] = clientF.flatMap(_.isConnected)
/** Checks if we are fully initialized with our peer and have executed the handshake
* This means we can now send arbitrary messages to our peer
* @return
*/
def isInitialized: Boolean = peerMsgRecv.isInitialized
def isInitialized: Future[Boolean] = clientF.flatMap(_.isInitialized)
def isDisconnected: Future[Boolean] = clientF.flatMap(_.isDisconnected)
/** Starts to sync our spv node with our peer
* If our local best block hash is the same as our peers
@ -151,13 +203,15 @@ case class SpvNode(
*/
def sync(): Future[Unit] = {
for {
chainApi <- chainApiFromDb()
hash <- chainApi.getBestBlockHash
header <- chainApi
.getHeader(hash)
.map(_.get) // .get is safe since this is an internal call
} yield {
peerMsgSender.sendGetHeadersMessage(hash.flip)
logger.info(s"Starting sync node, height=${header.height} hash=$hash")
peerMsgSenderF.map(_.sendGetHeadersMessage(hash.flip))
logger(nodeAppConfig).info(
s"Starting sync node, height=${header.height} hash=$hash")
}
}
}

View file

@ -2,21 +2,26 @@ package org.bitcoins.node.networking
import akka.actor.{Actor, ActorRef, ActorRefFactory, Props}
import akka.io.{IO, Tcp}
import akka.util.ByteString
import akka.pattern.AskTimeoutException
import akka.util.{ByteString, CompactByteString, Timeout}
import org.bitcoins.core.config.NetworkParameters
import org.bitcoins.core.p2p.NetworkMessage
import org.bitcoins.core.p2p.NetworkPayload
import org.bitcoins.core.util.FutureUtil
import org.bitcoins.node.models.Peer
import org.bitcoins.node.networking.peer.PeerMessageReceiver
import org.bitcoins.node.networking.peer.PeerMessageReceiver.NetworkMessageReceived
import org.bitcoins.node.util.BitcoinSpvNodeUtil
import scodec.bits.ByteVector
import org.bitcoins.node.config.NodeAppConfig
import akka.util.CompactByteString
import scala.annotation.tailrec
import scala.util._
import org.bitcoins.db.P2PLogger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.DurationInt
/**
* This actor is responsible for creating a connection,
* relaying messages and closing a connection to our peer on
@ -43,17 +48,19 @@ import org.bitcoins.db.P2PLogger
* CANNOT fit in a single TCP packet. This means we must cache
* the bytes and wait for the rest of them to be sent.
*
* @param peerMsgHandlerReceiver The place we send messages that we successfully parsed
* @param initPeerMsgHandlerReceiver The place we send messages that we successfully parsed
* from our peer on the P2P network. This is mostly likely
* a [[org.bitcoins.node.networking.peer.PeerMessageSender]]
*/
case class P2PClientActor(
peer: Peer,
peerMsgHandlerReceiver: PeerMessageReceiver
initPeerMsgHandlerReceiver: PeerMessageReceiver
)(implicit config: NodeAppConfig)
extends Actor
with P2PLogger {
private var currentPeerMsgHandlerRecv = initPeerMsgHandlerReceiver
/**
* The manager is an actor that handles the underlying low level I/O resources (selectors, channels)
* and instantiates workers for specific tasks, such as listening to incoming connections.
@ -65,6 +72,8 @@ case class P2PClientActor(
*/
val network: NetworkParameters = config.network
private val timeout = 10.seconds
/**
* TODO: this comment seems wrong?
*
@ -80,8 +89,12 @@ case class P2PClientActor(
self.forward(networkMsg)
case message: Tcp.Message =>
val newUnalignedBytes =
handleTcpMessage(message, Some(peer), unalignedBytes)
Await.result(handleTcpMessage(message, Some(peer), unalignedBytes),
timeout)
context.become(awaitNetworkRequest(peer, newUnalignedBytes))
case metaMsg: P2PClient.MetaMsg =>
sender ! handleMetaMsg(metaMsg)
}
/** This context is responsible for initializing a tcp connection with a peer on the bitcoin p2p network */
@ -92,16 +105,19 @@ case class P2PClientActor(
//after receiving Tcp.Connected we switch to the
//'awaitNetworkRequest' context. This is the main
//execution loop for the Client actor
val _ = handleCommand(cmd, peer = None)
handleCommand(cmd, peer = None)
case connected: Tcp.Connected =>
val _ = handleEvent(connected, unalignedBytes = ByteVector.empty)
Await.result(handleEvent(connected, unalignedBytes = ByteVector.empty),
timeout)
case msg: NetworkMessage =>
self.forward(msg.payload)
case payload: NetworkPayload =>
logger.error(
s"Cannot send a message to our peer when we are not connected! payload=${payload} peer=${peer}")
case metaMsg: P2PClient.MetaMsg =>
sender ! handleMetaMsg(metaMsg)
}
/**
@ -112,14 +128,14 @@ case class P2PClientActor(
private def handleTcpMessage(
message: Tcp.Message,
peer: Option[ActorRef],
unalignedBytes: ByteVector): ByteVector = {
unalignedBytes: ByteVector): Future[ByteVector] = {
message match {
case event: Tcp.Event =>
handleEvent(event, unalignedBytes)
handleEvent(event, unalignedBytes = unalignedBytes)
case command: Tcp.Command =>
handleCommand(command, peer)
unalignedBytes
Future.successful(unalignedBytes)
}
}
@ -128,18 +144,19 @@ case class P2PClientActor(
*/
private def handleEvent(
event: Tcp.Event,
unalignedBytes: ByteVector): ByteVector = {
unalignedBytes: ByteVector): Future[ByteVector] = {
import context.dispatcher
event match {
case Tcp.Bound(localAddress) =>
logger.debug(
s"Actor is now bound to the local address: ${localAddress}")
context.parent ! Tcp.Bound(localAddress)
unalignedBytes
Future.successful(unalignedBytes)
case Tcp.CommandFailed(command) =>
logger.debug(s"Client Command failed: ${command}")
unalignedBytes
Future.successful(unalignedBytes)
case Tcp.Connected(remote, local) =>
logger.debug(s"Tcp connection to: ${remote}")
logger.debug(s"Local: ${local}")
@ -149,22 +166,30 @@ case class P2PClientActor(
//our bitcoin peer will send all messages to this actor.
sender ! Tcp.Register(self)
val _ = peerMsgHandlerReceiver.connect(P2PClient(self, peer))
val newPeerMsgRecvF: Future[PeerMessageReceiver] =
currentPeerMsgHandlerRecv.connect(P2PClient(self, peer))
newPeerMsgRecvF.map { newPeerMsgRecv =>
currentPeerMsgHandlerRecv = newPeerMsgRecv
context.become(awaitNetworkRequest(sender, unalignedBytes))
unalignedBytes
}
context.become(awaitNetworkRequest(sender, ByteVector.empty))
unalignedBytes
case closeCmd @ (Tcp.ConfirmedClosed | Tcp.Closed | Tcp.Aborted |
Tcp.PeerClosed) =>
logger.debug(s"Closed command received: ${closeCmd}")
//tell our peer message handler we are disconnecting
val disconnectT = peerMsgHandlerReceiver.disconnect()
val newPeerMsgRecvF = currentPeerMsgHandlerRecv.disconnect()
disconnectT.failed.foreach(err =>
newPeerMsgRecvF.failed.foreach(err =>
logger.error(s"Failed to disconnect=${err}"))
context.stop(self)
unalignedBytes
newPeerMsgRecvF.map { newPeerMsgRecv =>
currentPeerMsgHandlerRecv = newPeerMsgRecv
context.stop(self)
unalignedBytes
}
case Tcp.Received(byteString: ByteString) =>
val byteVec = ByteVector(byteString.toArray)
logger.debug(s"Received ${byteVec.length} TCP bytes")
@ -194,17 +219,25 @@ case class P2PClientActor(
logger.trace(s"Unaligned bytes: ${newUnalignedBytes.toHex}")
}
//for the messages we successfully parsed above
//send them to 'context.parent' -- this is the
//PeerMessageHandler that is responsible for
//creating this Client Actor
messages.foreach { m =>
val msg = NetworkMessageReceived(m, P2PClient(self, peer))
peerMsgHandlerReceiver.handleNetworkMessageReceived(msg)
val f: (
PeerMessageReceiver,
NetworkMessage) => Future[PeerMessageReceiver] = {
case (peerMsgRecv: PeerMessageReceiver, m: NetworkMessage) =>
logger.trace(s"Processing message=${m}")
val msg = NetworkMessageReceived(m, P2PClient(self, peer))
val doneF = peerMsgRecv.handleNetworkMessageReceived(msg)
doneF
}
newUnalignedBytes
val newMsgReceiverF: Future[PeerMessageReceiver] = {
logger.trace(s"About to process ${messages.length} messages")
FutureUtil.foldLeftAsync(currentPeerMsgHandlerRecv, messages)(f)
}
newMsgReceiverF.map { newMsgReceiver =>
currentPeerMsgHandlerRecv = newMsgReceiver
newUnalignedBytes
}
}
}
@ -224,6 +257,17 @@ case class P2PClientActor(
manager ! bind
}
/**
* Returns the current state of our peer given the [[P2PClient.MetaMsg meta message]]
*/
private def handleMetaMsg(metaMsg: P2PClient.MetaMsg): Boolean = {
metaMsg match {
case P2PClient.IsConnected => currentPeerMsgHandlerRecv.isConnected
case P2PClient.IsInitialized => currentPeerMsgHandlerRecv.isInitialized
case P2PClient.IsDisconnected => currentPeerMsgHandlerRecv.isDisconnected
}
}
/**
* Sends a network request to our peer on the network
*/
@ -237,10 +281,60 @@ case class P2PClientActor(
}
case class P2PClient(actor: ActorRef, peer: Peer)
case class P2PClient(actor: ActorRef, peer: Peer) extends P2PLogger {
import akka.pattern.ask
def isConnected()(
implicit timeout: Timeout,
ec: ExecutionContext): Future[Boolean] = {
val isConnectedF = actor.ask(P2PClient.IsConnected).mapTo[Boolean]
isConnectedF.recoverWith {
case _: Throwable => Future.successful(false)
}
}
def isInitialized()(
implicit timeout: Timeout,
ec: ExecutionContext): Future[Boolean] = {
val isInitF = actor.ask(P2PClient.IsInitialized).mapTo[Boolean]
isInitF.recoverWith {
case _: Throwable => Future.successful(false)
}
}
def isDisconnected()(
implicit timeout: Timeout,
ec: ExecutionContext): Future[Boolean] = {
val isDisconnect: Future[Boolean] =
actor.ask(P2PClient.IsDisconnected).mapTo[Boolean]
//this future can be failed, as we stop the P2PClientActor if we send a disconnect
//if that actor has been killed, the peer _has_ to have been disconnected
isDisconnect.recoverWith {
case _: Throwable => Future.successful(true)
}
}
}
object P2PClient extends P2PLogger {
/** A message hierarchy that canbe sent to [[P2PClientActor P2P Client Actor]]
* to query about meta information of a peer
* */
sealed trait MetaMsg
/** A message that can be sent to [[P2PClient p2p client]] that returns true
* if the peer is connected, false if not */
final case object IsConnected extends MetaMsg
/** A message that can be sent to [[P2PClient p2p client]] that returns true
* if the peer is initialized (p2p handshake complete), false if not */
final case object IsInitialized extends MetaMsg
/** A message that can be sent to [[P2PClient p2p client]] that returns true
* if the peer is disconnected, false otherwise */
final case object IsDisconnected extends MetaMsg
def props(peer: Peer, peerMsgHandlerReceiver: PeerMessageReceiver)(
implicit config: NodeAppConfig
): Props =

View file

@ -1,31 +1,23 @@
package org.bitcoins.node.networking.peer
import org.bitcoins.chain.api.ChainApi
import org.bitcoins.core.util.FutureUtil
import org.bitcoins.core.p2p.{DataPayload, HeadersMessage, InventoryMessage}
import scala.concurrent.{ExecutionContext, Future}
import org.bitcoins.core.protocol.blockchain.Block
import org.bitcoins.core.protocol.blockchain.MerkleBlock
import org.bitcoins.core.p2p.{Inventory, MsgUnassigned, TypeIdentifier, _}
import org.bitcoins.core.protocol.blockchain.{Block, MerkleBlock}
import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.core.p2p.BlockMessage
import org.bitcoins.core.p2p.TransactionMessage
import org.bitcoins.core.p2p.MerkleBlockMessage
import org.bitcoins.db.P2PLogger
import org.bitcoins.node.SpvNodeCallbacks
import org.bitcoins.core.p2p.GetDataMessage
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.BroadcastAbleTransactionDAO
import slick.jdbc.SQLiteProfile
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.core.p2p.TypeIdentifier
import org.bitcoins.core.p2p.MsgUnassigned
import org.bitcoins.db.P2PLogger
import org.bitcoins.core.p2p.Inventory
import scala.concurrent.{ExecutionContext, Future}
/** This actor is meant to handle a [[org.bitcoins.core.p2p.DataPayload DataPayload]]
* that a peer to sent to us on the p2p network, for instance, if we a receive a
* [[org.bitcoins.core.p2p.HeadersMessage HeadersMessage]] we should store those headers in our database
*/
class DataMessageHandler(callbacks: SpvNodeCallbacks, chainHandler: ChainApi)(
class DataMessageHandler(chainApi: ChainApi, callbacks: SpvNodeCallbacks)(
implicit ec: ExecutionContext,
appConfig: NodeAppConfig)
extends P2PLogger {
@ -34,7 +26,7 @@ class DataMessageHandler(callbacks: SpvNodeCallbacks, chainHandler: ChainApi)(
def handleDataPayload(
payload: DataPayload,
peerMsgSender: PeerMessageSender): Future[Unit] = {
peerMsgSender: PeerMessageSender): Future[DataMessageHandler] = {
payload match {
case getData: GetDataMessage =>
@ -65,17 +57,17 @@ class DataMessageHandler(callbacks: SpvNodeCallbacks, chainHandler: ChainApi)(
}
}
FutureUtil.unit
Future.successful(this)
case HeadersMessage(count, headers) =>
logger.debug(s"Received headers message with ${count.toInt} headers")
logger.trace(
s"Received headers=${headers.map(_.hashBE.hex).mkString("[", ",", "]")}")
val chainApiF = chainHandler.processHeaders(headers)
val chainApiF = chainApi.processHeaders(headers)
logger.trace(s"Requesting data for headers=${headers.length}")
peerMsgSender.sendGetDataMessage(headers: _*)
chainApiF
val getHeadersF = chainApiF
.map { newApi =>
if (headers.nonEmpty) {
@ -100,27 +92,40 @@ class DataMessageHandler(callbacks: SpvNodeCallbacks, chainHandler: ChainApi)(
}
}
.failed
.map { err =>
logger.error(s"Error when processing headers message", err)
}
getHeadersF.failed.map { err =>
logger.error(s"Error when processing headers message", err)
}
for {
newApi <- chainApiF
_ <- getHeadersF
} yield {
new DataMessageHandler(newApi, callbacks)
}
case msg: BlockMessage =>
Future { callbacks.onBlockReceived.foreach(_.apply(msg.block)) }
Future {
callbacks.onBlockReceived.foreach(_.apply(msg.block))
this
}
case TransactionMessage(tx) =>
val belongsToMerkle =
MerkleBuffers.putTx(tx, callbacks.onMerkleBlockReceived)
if (belongsToMerkle) {
logger.trace(
s"Transaction=${tx.txIdBE} belongs to merkleblock, not calling callbacks")
FutureUtil.unit
Future.successful(this)
} else {
logger.trace(
s"Transaction=${tx.txIdBE} does not belong to merkleblock, processing given callbacks")
Future { callbacks.onTxReceived.foreach(_.apply(tx)) }
Future {
callbacks.onTxReceived.foreach(_.apply(tx))
this
}
}
case MerkleBlockMessage(merkleBlock) =>
MerkleBuffers.putMerkle(merkleBlock)
FutureUtil.unit
Future.successful(this)
case invMsg: InventoryMessage =>
handleInventoryMsg(invMsg = invMsg, peerMsgSender = peerMsgSender)
}
@ -128,7 +133,7 @@ class DataMessageHandler(callbacks: SpvNodeCallbacks, chainHandler: ChainApi)(
private def handleInventoryMsg(
invMsg: InventoryMessage,
peerMsgSender: PeerMessageSender): Future[Unit] = {
peerMsgSender: PeerMessageSender): Future[DataMessageHandler] = {
logger.info(s"Received inv=${invMsg}")
val getData = GetDataMessage(invMsg.inventories.map {
case Inventory(TypeIdentifier.MsgBlock, hash) =>
@ -136,7 +141,7 @@ class DataMessageHandler(callbacks: SpvNodeCallbacks, chainHandler: ChainApi)(
case other: Inventory => other
})
peerMsgSender.sendMsg(getData)
FutureUtil.unit
Future.successful(this)
}
}

View file

@ -1,5 +1,7 @@
package org.bitcoins.node.networking.peer
import org.bitcoins.node.networking.P2PClient
/*
abstract class PeerHandler extends BitcoinSLogger {
implicit val system: ActorSystem
@ -58,6 +60,4 @@ object PeerHandler {
}
*/
case class PeerHandler(
peerMsgRecv: PeerMessageReceiver,
peerMsgSender: PeerMessageSender)
case class PeerHandler(p2pClient: P2PClient, peerMsgSender: PeerMessageSender)

View file

@ -1,9 +1,14 @@
package org.bitcoins.node.networking.peer
import akka.actor.ActorRefFactory
import org.bitcoins.core.p2p.NetworkMessage
import org.bitcoins.chain.api.ChainApi
import org.bitcoins.chain.blockchain.ChainHandler
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.chain.models.BlockHeaderDAO
import org.bitcoins.core.p2p.{NetworkMessage, _}
import org.bitcoins.db.P2PLogger
import org.bitcoins.node.SpvNodeCallbacks
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.core.p2p._
import org.bitcoins.node.models.Peer
import org.bitcoins.node.networking.P2PClient
import org.bitcoins.node.networking.peer.PeerMessageReceiverState.{
@ -13,10 +18,7 @@ import org.bitcoins.node.networking.peer.PeerMessageReceiverState.{
Preconnection
}
import scala.util.{Failure, Success, Try}
import org.bitcoins.node.SpvNodeCallbacks
import org.bitcoins.db.P2PLogger
import org.bitcoins.chain.api.ChainApi
import scala.concurrent.Future
/**
* Responsible for receiving messages from a peer on the
@ -26,62 +28,57 @@ import org.bitcoins.chain.api.ChainApi
* [[org.bitcoins.core.p2p.NetworkMessage NetworkMessage]]
*/
class PeerMessageReceiver(
state: PeerMessageReceiverState,
callbacks: SpvNodeCallbacks,
chainHandler: ChainApi
)(implicit ref: ActorRefFactory, nodeAppConfig: NodeAppConfig)
dataMessageHandler: DataMessageHandler,
val state: PeerMessageReceiverState,
peer: Peer,
callbacks: SpvNodeCallbacks
)(
implicit ref: ActorRefFactory,
nodeAppConfig: NodeAppConfig,
chainAppConfig: ChainAppConfig)
extends P2PLogger {
import ref.dispatcher
//TODO: Really bad to just modify this internal state
//not async safe at all
private var internalState: PeerMessageReceiverState = state
/** The peer we are connected to. */
private var peerOpt: Option[Peer] = None
/** This method is called when we have received
* a [[akka.io.Tcp.Connected]] message from our peer
* This means we have opened a Tcp connection,
* but have NOT started the handshake
* This method will initiate the handshake
*/
protected[networking] def connect(client: P2PClient): Try[Unit] = {
protected[networking] def connect(
client: P2PClient): Future[PeerMessageReceiver] = {
internalState match {
state match {
case bad @ (_: Initializing | _: Normal | _: Disconnected) =>
Failure(
Future.failed(
new RuntimeException(s"Cannot call connect when in state=${bad}")
)
case Preconnection =>
peerOpt = Some(client.peer)
logger.info(s"Connection established with peer=${peerOpt.get}")
logger(nodeAppConfig).info(s"Connection established with peer=${peer}")
val newState = Preconnection.toInitializing(client)
val _ = toState(newState)
val peerMsgSender = PeerMessageSender(client)
peerMsgSender.sendVersionMessage()
Success(())
val newRecv = toState(newState)
Future.successful(newRecv)
}
}
protected[networking] def disconnect(): Try[Unit] = {
internalState match {
protected[networking] def disconnect(): Future[PeerMessageReceiver] = {
logger(nodeAppConfig).trace(s"Disconnecting with internalstate=${state}")
state match {
case bad @ (_: Initializing | _: Disconnected | Preconnection) =>
Failure(
Future.failed(
new RuntimeException(
s"Cannot disconnect from peer=${peerOpt.get} when in state=${bad}")
s"Cannot disconnect from peer=${peer} when in state=${bad}")
)
case good: Normal =>
logger.debug(s"Disconnected bitcoin peer=${peerOpt.get}")
logger(nodeAppConfig).debug(s"Disconnected bitcoin peer=${peer}")
val newState = Disconnected(
clientConnectP = good.clientConnectP,
clientDisconnectP = good.clientDisconnectP.success(()),
@ -89,37 +86,40 @@ class PeerMessageReceiver(
verackMsgP = good.verackMsgP
)
val _ = toState(newState)
Success(())
val newRecv = toState(newState)
Future.successful(newRecv)
}
}
def isConnected: Boolean = internalState.isConnected
private[networking] def isConnected: Boolean = state.isConnected
def isDisconnected: Boolean = internalState.isDisconnected
private[networking] def isDisconnected: Boolean = state.isDisconnected
def hasReceivedVersionMsg: Boolean =
internalState.hasReceivedVersionMsg.isCompleted
private[networking] def hasReceivedVersionMsg: Boolean =
state.hasReceivedVersionMsg.isCompleted
def hasReceivedVerackMsg: Boolean =
internalState.hasReceivedVerackMsg.isCompleted
private[networking] def hasReceivedVerackMsg: Boolean =
state.hasReceivedVerackMsg.isCompleted
def isInitialized: Boolean = internalState.isInitialized
private[networking] def isInitialized: Boolean = state.isInitialized
def handleNetworkMessageReceived(
networkMsgRecv: PeerMessageReceiver.NetworkMessageReceived): Unit = {
networkMsgRecv: PeerMessageReceiver.NetworkMessageReceived): Future[
PeerMessageReceiver] = {
val client = networkMsgRecv.client
//create a way to send a response if we need too
val peerMsgSender = PeerMessageSender(client)
logger.debug(
s"Received message=${networkMsgRecv.msg.header.commandName} from peer=${client.peer} ")
logger(nodeAppConfig).debug(
s"Received message=${networkMsgRecv.msg.header.commandName} from peer=${client.peer} state=${state} ")
networkMsgRecv.msg.payload match {
case controlPayload: ControlPayload =>
handleControlPayload(payload = controlPayload, sender = peerMsgSender)
()
val peerMsgRecvF =
handleControlPayload(payload = controlPayload, sender = peerMsgSender)
peerMsgRecvF
case dataPayload: DataPayload =>
handleDataPayload(payload = dataPayload, sender = peerMsgSender)
}
@ -135,12 +135,15 @@ class PeerMessageReceiver(
*/
private def handleDataPayload(
payload: DataPayload,
sender: PeerMessageSender): Unit = {
val dataMsgHandler = new DataMessageHandler(callbacks, chainHandler)
sender: PeerMessageSender): Future[PeerMessageReceiver] = {
//else it means we are receiving this data payload from a peer,
//we need to handle it
dataMsgHandler.handleDataPayload(payload, sender)
()
val newDataMessageHandlerF =
dataMessageHandler.handleDataPayload(payload, sender)
newDataMessageHandlerF.map { handler =>
new PeerMessageReceiver(handler, state, peer, callbacks)
}
}
/**
@ -152,21 +155,21 @@ class PeerMessageReceiver(
*/
private def handleControlPayload(
payload: ControlPayload,
sender: PeerMessageSender): Try[Unit] = {
sender: PeerMessageSender): Future[PeerMessageReceiver] = {
payload match {
case versionMsg: VersionMessage =>
logger.trace(
s"Received versionMsg=${versionMsg}from peer=${peerOpt.get}")
logger(nodeAppConfig).trace(
s"Received versionMsg=${versionMsg}from peer=${peer}")
internalState match {
state match {
case bad @ (_: Disconnected | _: Normal | Preconnection) =>
Failure(
Future.failed(
new RuntimeException(
s"Cannot handle version message while in state=${bad}"))
case good: Initializing =>
internalState = good.withVersionMsg(versionMsg)
val newState = good.withVersionMsg(versionMsg)
sender.sendVerackMessage()
@ -174,45 +177,52 @@ class PeerMessageReceiver(
//we don't want to have to request them manually
sender.sendHeadersMessage()
Success(())
val newRecv = toState(newState)
Future.successful(newRecv)
}
case VerAckMessage =>
internalState match {
state match {
case bad @ (_: Disconnected | _: Normal | Preconnection) =>
Failure(
Future.failed(
new RuntimeException(
s"Cannot handle version message while in state=${bad}"))
case good: Initializing =>
internalState = good.toNormal(VerAckMessage)
Success(())
val newState = good.toNormal(VerAckMessage)
val newRecv = toState(newState)
Future.successful(newRecv)
}
case ping: PingMessage =>
sender.sendPong(ping)
Success(())
Future.successful(this)
case SendHeadersMessage =>
//not implemented as of now
Success(())
Future.successful(this)
case _: AddrMessage =>
Success(())
Future.successful(this)
case _ @(_: FilterAddMessage | _: FilterLoadMessage |
FilterClearMessage) =>
Success(())
Future.successful(this)
case _ @(GetAddrMessage | _: PongMessage) =>
Success(())
Future.successful(this)
case _: RejectMessage =>
Success(())
Future.successful(this)
case _: FeeFilterMessage =>
Success(())
Future.successful(this)
}
}
private def toState(state: PeerMessageReceiverState): Unit = {
logger.debug(
s"PeerMessageReceiver changing state, oldState=$internalState, newState=$state")
internalState = state
/** Transitions our PeerMessageReceiver to a new state */
def toState(newState: PeerMessageReceiverState): PeerMessageReceiver = {
new PeerMessageReceiver(
dataMessageHandler = dataMessageHandler,
state = newState,
peer = peer,
callbacks = callbacks
)
}
}
@ -231,20 +241,52 @@ object PeerMessageReceiver {
def apply(
state: PeerMessageReceiverState,
chainHandler: ChainApi,
callbacks: SpvNodeCallbacks = SpvNodeCallbacks.empty)(
chainApi: ChainApi,
peer: Peer,
callbacks: SpvNodeCallbacks)(
implicit ref: ActorRefFactory,
nodeAppConfig: NodeAppConfig): PeerMessageReceiver = {
new PeerMessageReceiver(state, callbacks, chainHandler)
nodeAppConfig: NodeAppConfig,
chainAppConfig: ChainAppConfig
): PeerMessageReceiver = {
import ref.dispatcher
val dataHandler = new DataMessageHandler(chainApi, callbacks)
new PeerMessageReceiver(dataMessageHandler = dataHandler,
state = state,
peer = peer,
callbacks = callbacks)
}
def newReceiver(
chainHandler: ChainApi,
callbacks: SpvNodeCallbacks = SpvNodeCallbacks.empty)(
/**
* Creates a peer message receiver that is ready
* to be connected to a peer. This can be given to [[org.bitcoins.node.networking.P2PClient.props() P2PClient]]
* to connect to a peer on the network
*/
def preConnection(peer: Peer, callbacks: SpvNodeCallbacks)(
implicit ref: ActorRefFactory,
nodeAppConfig: NodeAppConfig,
chainAppConfig: ChainAppConfig
): Future[PeerMessageReceiver] = {
import ref.dispatcher
val blockHeaderDAO = BlockHeaderDAO()
val chainHandlerF =
ChainHandler.fromDatabase(blockHeaderDAO)
for {
chainHandler <- chainHandlerF
} yield {
PeerMessageReceiver(state = PeerMessageReceiverState.fresh(),
chainApi = chainHandler,
peer = peer,
callbacks = callbacks)
}
}
def newReceiver(chainApi: ChainApi, peer: Peer, callbacks: SpvNodeCallbacks)(
implicit nodeAppConfig: NodeAppConfig,
chainAppConfig: ChainAppConfig,
ref: ActorRefFactory): PeerMessageReceiver = {
new PeerMessageReceiver(state = PeerMessageReceiverState.fresh(),
callbacks,
chainHandler)
PeerMessageReceiver(state = PeerMessageReceiverState.fresh(),
chainApi = chainApi,
peer = peer,
callbacks = callbacks)
}
}

View file

@ -184,7 +184,8 @@ trait ChainUnitTest
def createPopulatedChainHandler(): Future[ChainHandler] = {
for {
blockHeaderDAO <- ChainUnitTest.createPopulatedBlockHeaderDAO()
} yield ChainHandler(blockHeaderDAO = blockHeaderDAO)
chainHandler <- ChainHandler.fromDatabase(blockHeaderDAO = blockHeaderDAO)
} yield chainHandler
}
def withPopulatedChainHandler(test: OneArgAsyncTest): FutureOutcome = {
@ -194,15 +195,17 @@ trait ChainUnitTest
def createChainHandlerWithBitcoindZmq(
bitcoind: BitcoindRpcClient): Future[(ChainHandler, ZMQSubscriber)] = {
val (chainHandler, genesisHeaderF) =
val handlerWithGenesisHeaderF =
ChainUnitTest.setupHeaderTableWithGenesisHeader()
val chainHandlerF = handlerWithGenesisHeaderF.map(_._1)
val zmqRawBlockUriOpt: Option[InetSocketAddress] =
bitcoind.instance.zmqConfig.rawBlock
val handleRawBlock: ByteVector => Unit = { bytes: ByteVector =>
val block = Block.fromBytes(bytes)
chainHandler.processHeader(block.blockHeader)
chainHandlerF.flatMap(_.processHeader(block.blockHeader))
()
}
@ -217,15 +220,19 @@ trait ChainUnitTest
zmqSubscriber.start()
Thread.sleep(1000)
genesisHeaderF.map(_ => (chainHandler, zmqSubscriber))
for {
chainHandler <- chainHandlerF
} yield (chainHandler, zmqSubscriber)
}
def createChainApiWithBitcoindRpc(
bitcoind: BitcoindRpcClient): Future[BitcoindChainHandlerViaRpc] = {
val (handler, genesisHeaderF) =
val handlerWithGenesisHeaderF =
ChainUnitTest.setupHeaderTableWithGenesisHeader()
genesisHeaderF.map { _ =>
val chainHandlerF = handlerWithGenesisHeaderF.map(_._1)
chainHandlerF.map { handler =>
chain.fixture.BitcoindChainHandlerViaRpc(bitcoind, handler)
}
@ -304,16 +311,21 @@ object ChainUnitTest extends BitcoinSLogger {
def createChainHandler()(
implicit ec: ExecutionContext,
appConfig: ChainAppConfig): Future[ChainHandler] = {
val (chainHandler, genesisHeaderF) = setupHeaderTableWithGenesisHeader()
genesisHeaderF.map(_ => chainHandler)
val handlerWithGenesisHeaderF =
ChainUnitTest.setupHeaderTableWithGenesisHeader()
val chainHandlerF = handlerWithGenesisHeaderF.map(_._1)
chainHandlerF
}
def createBlockHeaderDAO()(
implicit ec: ExecutionContext,
appConfig: ChainAppConfig): Future[BlockHeaderDAO] = {
val (chainHandler, genesisHeaderF) = setupHeaderTableWithGenesisHeader()
val handlerWithGenesisHeaderF =
ChainUnitTest.setupHeaderTableWithGenesisHeader()
genesisHeaderF.map(_ => chainHandler.blockHeaderDAO)
val chainHandlerF = handlerWithGenesisHeaderF.map(_._1)
chainHandlerF.map(_.blockHeaderDAO)
}
/** Creates and populates BlockHeaderTable with block headers 562375 to 571375 */
@ -365,17 +377,21 @@ object ChainUnitTest extends BitcoinSLogger {
dbHeaders = dbHeaders,
batchesSoFar = Vector.empty)
val chainHandler = ChainUnitTest.makeChainHandler()
val chainHandlerF = ChainUnitTest.makeChainHandler()
val insertedF = tableSetupF.flatMap { _ =>
batchedDbHeaders.foldLeft(
Future.successful[Vector[BlockHeaderDb]](Vector.empty)) {
case (fut, batch) =>
fut.flatMap(_ => chainHandler.blockHeaderDAO.createAll(batch))
for {
_ <- fut
chainHandler <- chainHandlerF
headers <- chainHandler.blockHeaderDAO.createAll(batch)
} yield headers
}
}
insertedF.map(_ => chainHandler.blockHeaderDAO)
insertedF.flatMap(_ => chainHandlerF.map(_.blockHeaderDAO))
}
}
@ -398,24 +414,31 @@ object ChainUnitTest extends BitcoinSLogger {
/** Creates the [[org.bitcoins.chain.models.BlockHeaderTable]] and inserts the genesis header */
def setupHeaderTableWithGenesisHeader()(
implicit ec: ExecutionContext,
appConfig: ChainAppConfig): (ChainHandler, Future[BlockHeaderDb]) = {
appConfig: ChainAppConfig): Future[(ChainHandler, BlockHeaderDb)] = {
val tableSetupF = setupHeaderTable()
val chainHandler = makeChainHandler()
val chainHandlerF = makeChainHandler()
val genesisHeaderF = tableSetupF.flatMap { _ =>
chainHandler.blockHeaderDAO.create(genesisHeaderDb)
for {
chainHandler <- chainHandlerF
genHeader <- chainHandler.blockHeaderDAO.create(genesisHeaderDb)
} yield genHeader
}
(chainHandler, genesisHeaderF)
for {
genHeader <- genesisHeaderF
chainHandler <- makeChainHandler()
} yield (chainHandler, genHeader)
}
def makeChainHandler()(
implicit appConfig: ChainAppConfig,
ec: ExecutionContext): ChainHandler = {
ec: ExecutionContext): Future[ChainHandler] = {
lazy val blockHeaderDAO = BlockHeaderDAO()
ChainHandler(blockHeaderDAO)
ChainHandler.fromDatabase(blockHeaderDAO = blockHeaderDAO)
}
}

View file

@ -107,9 +107,9 @@ abstract class NodeTestUtil extends BitcoinSLogger {
def isSameBestHash(node: SpvNode, rpc: BitcoindRpcClient)(
implicit ec: ExecutionContext): Future[Boolean] = {
val hashF = rpc.getBestBlockHash
val spvHashF = node.chainApi.getBestBlockHash
for {
spvBestHash <- spvHashF
chainApi <- node.chainApiFromDb()
spvBestHash <- chainApi.getBestBlockHash
hash <- hashF
} yield {
spvBestHash == hash
@ -122,9 +122,8 @@ abstract class NodeTestUtil extends BitcoinSLogger {
def isSameBlockCount(spv: SpvNode, rpc: BitcoindRpcClient)(
implicit ec: ExecutionContext): Future[Boolean] = {
val rpcCountF = rpc.getBlockCount
val spvCountF = spv.chainApi.getBlockCount
for {
spvCount <- spvCountF
spvCount <- spv.chainApiFromDb().flatMap(_.getBlockCount)
rpcCount <- rpcCountF
} yield rpcCount == spvCount
}

View file

@ -6,6 +6,7 @@ import akka.actor.ActorSystem
import org.bitcoins.chain.blockchain.ChainHandler
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.chain.models.BlockHeaderDAO
import org.bitcoins.chain.api.ChainApi
import org.bitcoins.core.config.NetworkParameters
import org.bitcoins.core.util.BitcoinSLogger
import org.bitcoins.db.AppConfig
@ -15,6 +16,7 @@ import org.bitcoins.node.models.Peer
import org.bitcoins.node.networking.peer.{
PeerHandler,
PeerMessageReceiver,
PeerMessageReceiverState,
PeerMessageSender
}
import org.bitcoins.rpc.client.common.BitcoindRpcClient
@ -154,6 +156,38 @@ object NodeUnitTest extends BitcoinSLogger {
wallet: UnlockedWalletApi,
bitcoindRpc: BitcoindRpcClient)
def buildPeerMessageReceiver(chainApi: ChainApi, peer: Peer)(
implicit appConfig: BitcoinSAppConfig,
system: ActorSystem): Future[PeerMessageReceiver] = {
val receiver =
PeerMessageReceiver(state = PeerMessageReceiverState.fresh(),
chainApi = chainApi,
peer = peer,
callbacks = SpvNodeCallbacks.empty)
Future.successful(receiver)
}
def buildPeerHandler(peer: Peer)(
implicit nodeAppConfig: NodeAppConfig,
chainAppConfig: ChainAppConfig,
system: ActorSystem): Future[PeerHandler] = {
import system.dispatcher
val chainApiF = ChainUnitTest.createChainHandler()
val peerMsgReceiverF = chainApiF.flatMap { _ =>
PeerMessageReceiver.preConnection(peer, SpvNodeCallbacks.empty)
}
//the problem here is the 'self', this needs to be an ordinary peer message handler
//that can handle the handshake
val peerHandlerF = for {
peerMsgReceiver <- peerMsgReceiverF
client = NodeTestUtil.client(peer, peerMsgReceiver)
peerMsgSender = PeerMessageSender(client)
} yield PeerHandler(client, peerMsgSender)
peerHandlerF
}
def destroySpvNode(spvNode: SpvNode)(
implicit config: BitcoinSAppConfig,
ec: ExecutionContext): Future[Unit] = {
@ -165,6 +199,7 @@ object NodeUnitTest extends BitcoinSLogger {
spvNodeConnectedWithBitcoind: SpvNodeConnectedWithBitcoind)(
implicit system: ActorSystem,
appConfig: BitcoinSAppConfig): Future[Unit] = {
logger.debug(s"Beggining tear down of spv node connected with bitcoind")
import system.dispatcher
val spvNode = spvNodeConnectedWithBitcoind.spvNode
val bitcoind = spvNodeConnectedWithBitcoind.bitcoind
@ -174,7 +209,10 @@ object NodeUnitTest extends BitcoinSLogger {
for {
_ <- spvNodeDestroyF
_ <- bitcoindDestroyF
} yield ()
} yield {
logger.debug(s"Done with teardown of spv node connected with bitcoind!")
()
}
}
/** Creates a spv node, a funded bitcoin-s wallet, all of which are connected to bitcoind */
@ -211,31 +249,16 @@ object NodeUnitTest extends BitcoinSLogger {
}
def buildPeerMessageReceiver()(
implicit system: ActorSystem,
def buildPeerMessageReceiver(chainApi: ChainApi, peer: Peer)(
implicit nodeAppConfig: NodeAppConfig,
chainAppConfig: ChainAppConfig,
nodeAppConfig: NodeAppConfig): PeerMessageReceiver = {
import system.dispatcher
val dao = BlockHeaderDAO()
val chainHandler = ChainHandler(dao)
system: ActorSystem): Future[PeerMessageReceiver] = {
val receiver =
PeerMessageReceiver.newReceiver(chainHandler, SpvNodeCallbacks.empty)
receiver
}
def buildPeerHandler(peer: Peer)(
implicit system: ActorSystem,
chainAppConfig: ChainAppConfig,
nodeAppConfig: NodeAppConfig): PeerHandler = {
val peerMsgReceiver = buildPeerMessageReceiver()
//the problem here is the 'self', this needs to be an ordinary peer message handler
//that can handle the handshake
val peerMsgSender: PeerMessageSender = {
val client = NodeTestUtil.client(peer, peerMsgReceiver)
PeerMessageSender(client)
}
PeerHandler(peerMsgReceiver, peerMsgSender)
PeerMessageReceiver(state = PeerMessageReceiverState.fresh(),
chainApi = chainApi,
peer = peer,
callbacks = SpvNodeCallbacks.empty)
Future.successful(receiver)
}
def peerSocketAddress(
@ -256,10 +279,9 @@ object NodeUnitTest extends BitcoinSLogger {
val chainApiF = ChainUnitTest.createChainHandler()
val peer = createPeer(bitcoind)
for {
chainApi <- chainApiF
_ <- chainApiF
} yield {
SpvNode(peer = peer,
chainApi = chainApi,
bloomFilter = NodeTestUtil.emptyBloomFilter,
callbacks = callbacks)
}