diff --git a/node-test/src/test/scala/org/bitcoins/node/networking/P2PClientActorTest.scala b/node-test/src/test/scala/org/bitcoins/node/networking/P2PClientActorTest.scala index 3350940ae9..1533d7d573 100644 --- a/node-test/src/test/scala/org/bitcoins/node/networking/P2PClientActorTest.scala +++ b/node-test/src/test/scala/org/bitcoins/node/networking/P2PClientActorTest.scala @@ -100,7 +100,10 @@ class P2PClientActorTest val peerMessageReceiverF = for { node <- NodeUnitTest.buildNode(peer, None) - } yield PeerMessageReceiver(node, peer) + } yield PeerMessageReceiver( + controlMessageHandler = node.controlMessageHandler, + dataMessageHandler = node.getDataMessageHandler, + peer = peer) val clientActorF: Future[TestActorRef[P2PClientActor]] = peerMessageReceiverF.map { peerMsgRecv => diff --git a/node-test/src/test/scala/org/bitcoins/node/networking/peer/DataMessageHandlerTest.scala b/node-test/src/test/scala/org/bitcoins/node/networking/peer/DataMessageHandlerTest.scala index e3836d2690..f7f819e64a 100644 --- a/node-test/src/test/scala/org/bitcoins/node/networking/peer/DataMessageHandlerTest.scala +++ b/node-test/src/test/scala/org/bitcoins/node/networking/peer/DataMessageHandlerTest.scala @@ -51,7 +51,7 @@ class DataMessageHandlerTest extends NodeTestWithCachedBitcoindNewest { dataMessageHandler = DataMessageHandler( chainApi = chainApi, walletCreationTimeOpt = None, - node = node, + peerManager = node.peerManager, state = HeaderSync, initialSyncDone = None, filterBatchCache = Set.empty, @@ -101,7 +101,7 @@ class DataMessageHandlerTest extends NodeTestWithCachedBitcoindNewest { DataMessageHandler( chainApi = chainApi, walletCreationTimeOpt = None, - node = node, + peerManager = node.peerManager, state = HeaderSync, initialSyncDone = None, filterBatchCache = Set.empty, @@ -145,7 +145,7 @@ class DataMessageHandlerTest extends NodeTestWithCachedBitcoindNewest { DataMessageHandler( chainApi = chainApi, walletCreationTimeOpt = None, - node = node, + peerManager = node.peerManager, state = HeaderSync, initialSyncDone = None, filterBatchCache = Set.empty, @@ -211,7 +211,7 @@ class DataMessageHandlerTest extends NodeTestWithCachedBitcoindNewest { DataMessageHandler( chainApi = chainApi, walletCreationTimeOpt = None, - node = node, + peerManager = node.peerManager, state = HeaderSync, initialSyncDone = None, filterBatchCache = Set.empty, diff --git a/node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala b/node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala index 597e24a560..d2634078f9 100644 --- a/node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala +++ b/node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala @@ -52,7 +52,7 @@ case class NeutrinoNode( DataMessageHandler( chainApi = chainApi, walletCreationTimeOpt = walletCreationTimeOpt, - node = this, + peerManager = peerManager, state = HeaderSync, initialSyncDone = None, filterBatchCache = Set.empty, @@ -71,7 +71,7 @@ case class NeutrinoNode( this } - override val peerManager: PeerManager = PeerManager(paramPeers, this) + override lazy val peerManager: PeerManager = PeerManager(paramPeers, this) override def start(): Future[NeutrinoNode] = { val res = for { diff --git a/node/src/main/scala/org/bitcoins/node/Node.scala b/node/src/main/scala/org/bitcoins/node/Node.scala index fe66799d9b..75c1586690 100644 --- a/node/src/main/scala/org/bitcoins/node/Node.scala +++ b/node/src/main/scala/org/bitcoins/node/Node.scala @@ -37,7 +37,7 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger { implicit def executionContext: ExecutionContext = system.dispatcher - val peerManager: PeerManager + def peerManager: PeerManager /** The current data message handler. * It should be noted that the dataMessageHandler contains diff --git a/node/src/main/scala/org/bitcoins/node/PeerData.scala b/node/src/main/scala/org/bitcoins/node/PeerData.scala index cffb10bea4..479df9d3da 100644 --- a/node/src/main/scala/org/bitcoins/node/PeerData.scala +++ b/node/src/main/scala/org/bitcoins/node/PeerData.scala @@ -33,7 +33,9 @@ case class PeerData( private lazy val client: Future[P2PClient] = { val peerMessageReceiver = - PeerMessageReceiver(node, peer) + PeerMessageReceiver(node.controlMessageHandler, + node.getDataMessageHandler, + peer) P2PClient( peer = peer, peerMessageReceiver = peerMessageReceiver, diff --git a/node/src/main/scala/org/bitcoins/node/PeerManager.scala b/node/src/main/scala/org/bitcoins/node/PeerManager.scala index de5a878487..c2d49bb8a8 100644 --- a/node/src/main/scala/org/bitcoins/node/PeerManager.scala +++ b/node/src/main/scala/org/bitcoins/node/PeerManager.scala @@ -5,9 +5,11 @@ import akka.stream.OverflowStrategy import akka.stream.scaladsl.{Sink, Source, SourceQueueWithComplete} import org.bitcoins.asyncutil.AsyncUtil import org.bitcoins.chain.config.ChainAppConfig +import org.bitcoins.core.api.chain.ChainApi import org.bitcoins.core.api.node.NodeType import org.bitcoins.core.p2p._ import org.bitcoins.core.util.{NetworkUtil, StartStopAsync} +import org.bitcoins.crypto.DoubleSha256DigestBE import org.bitcoins.node.config.NodeAppConfig import org.bitcoins.node.models.{Peer, PeerDAO, PeerDb} import org.bitcoins.node.networking.peer._ @@ -468,6 +470,62 @@ case class PeerManager( val dataMessageStream: SourceQueueWithComplete[StreamDataMessageWrapper] = dataMessageStreamSource.to(dataMessageStreamSink).run() + + def fetchCompactFilterHeaders( + currentDmh: DataMessageHandler): Future[DataMessageHandler] = { + for { + peer <- randomPeerWithService(ServiceIdentifier.NODE_COMPACT_FILTERS) + newDmh = currentDmh.copy(syncPeer = Some(peer)) + _ = logger.info(s"Now syncing filter headers from $peer") + sender <- peerDataMap(peer).peerMessageSender + newSyncing <- PeerManager.sendFirstGetCompactFilterHeadersCommand( + sender, + currentDmh.chainApi) + } yield { + val syncPeerOpt = if (newSyncing) { + Some(peer) + } else { + None + } + newDmh.copy(syncPeer = syncPeerOpt) + } + } + } case class ResponseTimeout(payload: NetworkPayload) + +object PeerManager { + + def sendFirstGetCompactFilterHeadersCommand( + peerMsgSender: PeerMessageSender, + chainApi: ChainApi)(implicit + ec: ExecutionContext, + chainConfig: ChainAppConfig): Future[Boolean] = { + + for { + bestFilterHeaderOpt <- + chainApi + .getBestFilterHeader() + filterCount <- chainApi.getFilterCount() + blockHash = bestFilterHeaderOpt match { + case Some(filterHeaderDb) => + filterHeaderDb.blockHashBE + case None => + DoubleSha256DigestBE.empty + } + hashHeightOpt <- chainApi.nextBlockHeaderBatchRange( + prevStopHash = blockHash, + batchSize = chainConfig.filterHeaderBatchSize) + res <- hashHeightOpt match { + case Some(filterSyncMarker) => + peerMsgSender + .sendGetCompactFilterHeadersMessage(filterSyncMarker) + .map(_ => true) + case None => + sys.error( + s"Could not find block header in database to sync filter headers from! It's likely your database is corrupted blockHash=$blockHash bestFilterHeaderOpt=$bestFilterHeaderOpt filterCount=$filterCount") + } + } yield res + } +} diff --git a/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala b/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala index e277727165..fe0c0cc967 100644 --- a/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala +++ b/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala @@ -13,7 +13,7 @@ import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE} import org.bitcoins.node.config.NodeAppConfig import org.bitcoins.node.models._ import org.bitcoins.node.networking.peer.DataMessageHandlerState._ -import org.bitcoins.node.{Node, P2PLogger, PeerManager} +import org.bitcoins.node.{P2PLogger, PeerManager} import java.time.Instant import scala.concurrent.{ExecutionContext, Future, Promise} @@ -30,7 +30,7 @@ import scala.util.control.NonFatal case class DataMessageHandler( chainApi: ChainApi, walletCreationTimeOpt: Option[Instant], - node: Node, + peerManager: PeerManager, state: DataMessageHandlerState, initialSyncDone: Option[Promise[Done]], filterBatchCache: Set[CompactFilterMessage], @@ -51,14 +51,12 @@ case class DataMessageHandler( syncPeer = None, state = HeaderSync) - def manager: PeerManager = node.peerManager - def addToStream( payload: DataPayload, peerMsgSender: PeerMessageSender, peer: Peer): Future[Unit] = { val msg = DataMessageWrapper(payload, peerMsgSender, peer) - manager.dataMessageStream.offer(msg).map(_ => ()) + peerManager.dataMessageStream.offer(msg).map(_ => ()) } private def isChainIBD: Future[Boolean] = { @@ -300,7 +298,8 @@ case class DataMessageHandler( case ValidatingHeaders(inSyncWith, _, _) => //In the validation stage, some peer sent max amount of valid headers, revert to HeaderSync with that peer as syncPeer //disconnect the ones that we have already checked since they are at least out of sync by 2000 headers - val removeFs = inSyncWith.map(p => manager.removePeer(p)) + val removeFs = + inSyncWith.map(p => peerManager.removePeer(p)) val newSyncPeer = Some(peer) @@ -340,16 +339,17 @@ case class DataMessageHandler( // headers are synced now with the current sync peer, now move to validating it for all peers assert(syncPeer.get == peer) - if (manager.peers.size > 1) { + if (peerManager.peers.size > 1) { val newState = ValidatingHeaders(inSyncWith = Set(peer), - verifyingWith = manager.peers.toSet, + verifyingWith = + peerManager.peers.toSet, failedCheck = Set.empty[Peer]) logger.info( s"Starting to validate headers now. Verifying with ${newState.verifyingWith}") - val getHeadersAllF = manager.peerDataMap + val getHeadersAllF = peerManager.peerDataMap .filter(_._1 != peer) .map( _._2.peerMessageSender.flatMap( @@ -361,8 +361,9 @@ case class DataMessageHandler( .map(_ => newDmh.copy(state = newState)) } else { //if just one peer then can proceed ahead directly - fetchCompactFilterHeaders(newDmh).map( - _.copy(state = PostHeaderSync)) + peerManager + .fetchCompactFilterHeaders(newDmh) + .map(_.copy(state = PostHeaderSync)) } case headerState @ ValidatingHeaders(inSyncWith, _, _) => @@ -377,8 +378,9 @@ case class DataMessageHandler( // so we also check if our cached filter heights have been set as well, if they haven't then // we probably need to sync filters - fetchCompactFilterHeaders(newDmh2).map( - _.copy(state = PostHeaderSync)) + peerManager + .fetchCompactFilterHeaders(newDmh2) + .map(_.copy(state = PostHeaderSync)) } else { //do nothing, we are still waiting for some peers to send headers or timeout Future.successful(newDmh2) @@ -389,7 +391,9 @@ case class DataMessageHandler( logger.info( s"Starting to fetch filter headers in data message handler") val newSyncingF = - sendFirstGetCompactFilterHeadersCommand(peerMsgSender) + PeerManager.sendFirstGetCompactFilterHeadersCommand( + peerMsgSender, + chainApi) newSyncingF.map { newSyncing => val syncPeerOpt = if (newSyncing) { syncPeer @@ -408,8 +412,9 @@ case class DataMessageHandler( headerState.copy(inSyncWith = inSyncWith + peer) val newDmh2 = newDmh.copy(state = newHeaderState) if (newHeaderState.validated) { - fetchCompactFilterHeaders(newDmh2).map( - _.copy(state = PostHeaderSync)) + peerManager + .fetchCompactFilterHeaders(newDmh2) + .map(_.copy(state = PostHeaderSync)) } else { //do nothing, we are still waiting for some peers to send headers Future.successful(newDmh2) @@ -517,7 +522,8 @@ case class DataMessageHandler( if (headerHeight > filterHeaderCount) { logger.info( s"Starting to fetch filter headers in data message handler") - sendFirstGetCompactFilterHeadersCommand(peerMessageSender) + PeerManager.sendFirstGetCompactFilterHeadersCommand(peerMessageSender, + chainApi) } else { require( headerHeight == filterHeaderCount, @@ -550,17 +556,17 @@ case class DataMessageHandler( peerMsgSender: PeerMessageSender): Future[DataMessageHandler] = { state match { case HeaderSync => - manager.peerDataMap(peer).updateInvalidMessageCount() + peerManager.peerDataMap(peer).updateInvalidMessageCount() if ( - manager + peerManager .peerDataMap(peer) - .exceededMaxInvalidMessages && manager.peers.size > 1 + .exceededMaxInvalidMessages && peerManager.peers.size > 1 ) { logger.info( s"$peer exceeded max limit of invalid messages. Disconnecting.") for { - _ <- manager.removePeer(peer) - newDmh <- manager.syncFromNewPeer() + _ <- peerManager.removePeer(peer) + newDmh <- peerManager.syncFromNewPeer() } yield newDmh.copy(state = HeaderSync) } else { logger.info(s"Re-querying headers from $peer.") @@ -584,7 +590,9 @@ case class DataMessageHandler( if (newHeaderState.validated) { logger.info( s"Done validating headers, inSyncWith=${newHeaderState.inSyncWith}, failedCheck=${newHeaderState.failedCheck}") - fetchCompactFilterHeaders(newDmh).map(_.copy(state = PostHeaderSync)) + peerManager + .fetchCompactFilterHeaders(newDmh) + .map(_.copy(state = PostHeaderSync)) } else { Future.successful(newDmh) } @@ -594,40 +602,23 @@ case class DataMessageHandler( } } - private def fetchCompactFilterHeaders( - currentDmh: DataMessageHandler): Future[DataMessageHandler] = { - for { - peer <- manager.randomPeerWithService( - ServiceIdentifier.NODE_COMPACT_FILTERS) - newDmh = currentDmh.copy(syncPeer = Some(peer)) - _ = logger.info(s"Now syncing filter headers from $peer") - sender <- manager.peerDataMap(peer).peerMessageSender - newSyncing <- sendFirstGetCompactFilterHeadersCommand(sender) - } yield { - val syncPeerOpt = if (newSyncing) { - Some(peer) - } else { - None - } - newDmh.copy(syncPeer = syncPeerOpt) - } - } - def onHeaderRequestTimeout(peer: Peer): Future[DataMessageHandler] = { logger.info(s"Header request timed out from $peer in state $state") state match { case HeaderSync => - manager.syncFromNewPeer() + peerManager.syncFromNewPeer() case headerState @ ValidatingHeaders(_, failedCheck, _) => val newHeaderState = headerState.copy(failedCheck = failedCheck + peer) val newDmh = copy(state = newHeaderState) if (newHeaderState.validated) { - fetchCompactFilterHeaders(newDmh).map(_.copy(state = PostHeaderSync)) + peerManager + .fetchCompactFilterHeaders(newDmh) + .map(_.copy(state = PostHeaderSync)) } else Future.successful(newDmh) - case _: DataMessageHandlerState => Future.successful(this) + case PostHeaderSync => Future.successful(this) } } @@ -639,35 +630,6 @@ case class DataMessageHandler( filterHeaderBatchSize = chainConfig.filterHeaderBatchSize, prevStopHash = prevStopHash) - private def sendFirstGetCompactFilterHeadersCommand( - peerMsgSender: PeerMessageSender): Future[Boolean] = { - - for { - bestFilterHeaderOpt <- - chainApi - .getBestFilterHeader() - filterCount <- chainApi.getFilterCount() - blockHash = bestFilterHeaderOpt match { - case Some(filterHeaderDb) => - filterHeaderDb.blockHashBE - case None => - DoubleSha256DigestBE.empty - } - hashHeightOpt <- chainApi.nextBlockHeaderBatchRange( - prevStopHash = blockHash, - batchSize = chainConfig.filterHeaderBatchSize) - res <- hashHeightOpt match { - case Some(filterSyncMarker) => - peerMsgSender - .sendGetCompactFilterHeadersMessage(filterSyncMarker) - .map(_ => true) - case None => - sys.error( - s"Could not find block header in database to sync filter headers from! It's likely your database is corrupted blockHash=$blockHash bestFilterHeaderOpt=$bestFilterHeaderOpt filterCount=$filterCount") - } - } yield res - } - private def sendNextGetCompactFilterCommand( peerMsgSender: PeerMessageSender, startHeight: Int): Future[Boolean] = diff --git a/node/src/main/scala/org/bitcoins/node/networking/peer/PeerMessageReceiver.scala b/node/src/main/scala/org/bitcoins/node/networking/peer/PeerMessageReceiver.scala index 920806642c..32b2d7d023 100644 --- a/node/src/main/scala/org/bitcoins/node/networking/peer/PeerMessageReceiver.scala +++ b/node/src/main/scala/org/bitcoins/node/networking/peer/PeerMessageReceiver.scala @@ -7,7 +7,7 @@ import org.bitcoins.node.config.NodeAppConfig import org.bitcoins.node.models.Peer import org.bitcoins.node.networking.P2PClient import org.bitcoins.node.networking.peer.PeerMessageReceiverState._ -import org.bitcoins.node.{Node, P2PLogger} +import org.bitcoins.node.P2PLogger import scala.concurrent.Future @@ -17,8 +17,9 @@ import scala.concurrent.Future * operations. This is the entry point for handling all received * [[org.bitcoins.core.p2p.NetworkMessage NetworkMessage]] */ -class PeerMessageReceiver( - node: Node, +case class PeerMessageReceiver( + controlMessageHandler: ControlMessageHandler, + dataMessageHandler: DataMessageHandler, peer: Peer )(implicit system: ActorSystem, nodeAppConfig: NodeAppConfig) extends P2PLogger { @@ -88,9 +89,12 @@ class PeerMessageReceiver( sender: PeerMessageSender): Future[PeerMessageReceiver] = { //else it means we are receiving this data payload from a peer, //we need to handle it - node.getDataMessageHandler + dataMessageHandler .addToStream(payload, sender, peer) - .map(_ => new PeerMessageReceiver(node, peer)) + .map(_ => + new PeerMessageReceiver(controlMessageHandler, + dataMessageHandler, + peer)) } /** Handles control payloads defined here https://bitcoin.org/en/developer-reference#control-messages @@ -104,7 +108,7 @@ class PeerMessageReceiver( sender: PeerMessageSender, curReceiverState: PeerMessageReceiverState): Future[ PeerMessageReceiverState] = { - node.controlMessageHandler + controlMessageHandler .handleControlPayload(payload, sender, peer, curReceiverState) } } @@ -121,11 +125,4 @@ object PeerMessageReceiver { case class NetworkMessageReceived(msg: NetworkMessage, client: P2PClient) extends PeerMessageReceiverMsg - - def apply(node: Node, peer: Peer)(implicit - system: ActorSystem, - nodeAppConfig: NodeAppConfig - ): PeerMessageReceiver = { - new PeerMessageReceiver(node = node, peer = peer) - } } diff --git a/testkit/src/main/scala/org/bitcoins/testkit/node/NodeUnitTest.scala b/testkit/src/main/scala/org/bitcoins/testkit/node/NodeUnitTest.scala index cdd0d00af5..994467999c 100644 --- a/testkit/src/main/scala/org/bitcoins/testkit/node/NodeUnitTest.scala +++ b/testkit/src/main/scala/org/bitcoins/testkit/node/NodeUnitTest.scala @@ -207,13 +207,14 @@ object NodeUnitTest extends P2PLogger { walletCreationTimeOpt: Option[Instant])(implicit appConfig: BitcoinSAppConfig, system: ActorSystem): Future[PeerMessageReceiver] = { + val node = buildNode(peer, chainApi, walletCreationTimeOpt)( + appConfig.chainConf, + appConfig.nodeConf, + system) val receiver = - PeerMessageReceiver( - node = - buildNode(peer, chainApi, walletCreationTimeOpt)(appConfig.chainConf, - appConfig.nodeConf, - system), - peer = peer)(system, appConfig.nodeConf) + PeerMessageReceiver(controlMessageHandler = node.controlMessageHandler, + dataMessageHandler = node.getDataMessageHandler, + peer = peer)(system, appConfig.nodeConf) Future.successful(receiver) } @@ -388,9 +389,10 @@ object NodeUnitTest extends P2PLogger { nodeAppConfig: NodeAppConfig, chainAppConfig: ChainAppConfig, system: ActorSystem): Future[PeerMessageReceiver] = { + val node = buildNode(peer, chainApi, walletCreationTimeOpt) val receiver = - PeerMessageReceiver(node = - buildNode(peer, chainApi, walletCreationTimeOpt), + PeerMessageReceiver(controlMessageHandler = node.controlMessageHandler, + dataMessageHandler = node.getDataMessageHandler, peer = peer) Future.successful(receiver) }