diff --git a/node-test/src/test/scala/org/bitcoins/node/networking/peer/DataMessageHandlerTest.scala b/node-test/src/test/scala/org/bitcoins/node/networking/peer/DataMessageHandlerTest.scala index cdba5aea44..9932676e78 100644 --- a/node-test/src/test/scala/org/bitcoins/node/networking/peer/DataMessageHandlerTest.scala +++ b/node-test/src/test/scala/org/bitcoins/node/networking/peer/DataMessageHandlerTest.scala @@ -52,7 +52,6 @@ class DataMessageHandlerTest extends NodeTestWithCachedBitcoindNewest { peer = node.peerManager.peers.head chainApi <- node.chainApiFromDb() _ = require(peerManager.getPeerData(peer).isDefined) - peerMsgSender = peerManager.getPeerData(peer).get.peerMessageSender peerFinder = PeerFinder(peerManagerApi = peerManager, paramPeers = Vector.empty, queue = node)(system.dispatcher, @@ -62,7 +61,6 @@ class DataMessageHandlerTest extends NodeTestWithCachedBitcoindNewest { dataMessageHandler = DataMessageHandler( chainApi = chainApi, walletCreationTimeOpt = None, - peerMessageSenderApi = peerMsgSender, peerManager = peerManager, state = HeaderSync(peer, peerManager.peerWithServicesDataMap, diff --git a/node/src/main/scala/org/bitcoins/node/PeerManager.scala b/node/src/main/scala/org/bitcoins/node/PeerManager.scala index e0b2dc71dc..273aed1610 100644 --- a/node/src/main/scala/org/bitcoins/node/PeerManager.scala +++ b/node/src/main/scala/org/bitcoins/node/PeerManager.scala @@ -616,11 +616,9 @@ case class PeerManager( s"Ignoring received msg=${payload.commandName} from peer=$peer because it was disconnected, peers=$peers state=${state}") Future.successful(state) case Some(peerData) => - val peerMsgSender = PeerMessageSender(peerData.peerConnection) val dmh = DataMessageHandler( chainApi = ChainHandler.fromDatabase(), walletCreationTimeOpt = walletCreationTimeOpt, - peerMessageSenderApi = peerMsgSender, peerManager = this, state = runningState ) diff --git a/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala b/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala index ad0dffeec0..aad0bb4c07 100644 --- a/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala +++ b/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala @@ -31,7 +31,6 @@ import scala.util.control.NonFatal case class DataMessageHandler( chainApi: ChainApi, walletCreationTimeOpt: Option[Instant], - peerMessageSenderApi: PeerMessageSenderApi, peerManager: PeerManager, state: NodeRunningState)(implicit ec: ExecutionContext, @@ -148,16 +147,18 @@ case class DataMessageHandler( s.peerDataMap, s.waitingForDisconnection, s.peerFinder) - handleFilterHeadersMessage(filterHeaderSync, - filterHeader, - chainApi, - peer) + handleFilterHeadersMessage(filterHeaderSync = filterHeaderSync, + filterHeader = filterHeader, + chainApi = chainApi, + peerMessageSenderApi = + peerData.peerMessageSender) .map(s => copy(state = s)) case filterHeaderSync: FilterHeaderSync => - handleFilterHeadersMessage(filterHeaderSync, - filterHeader, - chainApi, - peer) + handleFilterHeadersMessage(filterHeaderSync = filterHeaderSync, + filterHeader = filterHeader, + chainApi = chainApi, + peerMessageSenderApi = + peerData.peerMessageSender) .map(s => copy(state = s)) case x @ (_: NodeShuttingDown | _: FilterSync) => logger.warn( @@ -173,7 +174,10 @@ case class DataMessageHandler( s"Received ${filter.commandName}, filter.blockHash=${filter.blockHash.flip} state=$state") state match { case f: FilterSync => - handleFilterMessage(f, filter) + handleFilterMessage(filterSyncState = f, + filter = filter, + peerMessageSenderApi = + peerData.peerMessageSender) .map(s => copy(state = s)) case s @ (_: DoneSyncing | _: FilterHeaderSync) => val f = FilterSync(peer, @@ -181,7 +185,10 @@ case class DataMessageHandler( s.waitingForDisconnection, Set.empty, s.peerFinder) - handleFilterMessage(f, filter) + handleFilterMessage(filterSyncState = f, + filter = filter, + peerMessageSenderApi = + peerData.peerMessageSender) .map(s => copy(state = s)) case x @ (_: HeaderSync | _: NodeShuttingDown) => logger.warn( @@ -213,8 +220,8 @@ case class DataMessageHandler( tx.toBaseTx } else tx // send normal serialization - peerMessageSenderApi.sendTransactionMessage(transaction = - txToBroadcast) + peerData.peerMessageSender + .sendTransactionMessage(transaction = txToBroadcast) case None => logger.warn( s"Got request to send data with hash=${inv.hash}, but found nothing") @@ -329,7 +336,8 @@ case class DataMessageHandler( logger.warn(s"Merkleblock is not supported") Future.successful(this) case invMsg: InventoryMessage => - handleInventoryMsg(invMsg = invMsg) + handleInventoryMsg(invMsg = invMsg, + peerMessageSenderApi = peerData.peerMessageSender) } } @@ -338,7 +346,8 @@ case class DataMessageHandler( /** syncs filter headers in case the header chain is still ahead post filter sync */ private def syncIfHeadersAhead( - syncNodeState: SyncNodeState): Future[NodeRunningState] = { + syncNodeState: SyncNodeState, + peerMessageSenderApi: PeerMessageSenderApi): Future[NodeRunningState] = { val bestBlockHeaderDbF = chainApi.getBestBlockHeader() for { headerHeight <- chainApi.getBestHashBlockHeight() @@ -518,7 +527,9 @@ case class DataMessageHandler( } private def handleInventoryMsg( - invMsg: InventoryMessage): Future[DataMessageHandler] = { + invMsg: InventoryMessage, + peerMessageSenderApi: PeerMessageSenderApi): Future[ + DataMessageHandler] = { logger.debug(s"Received inv=${invMsg}") val invsOptF: Future[Seq[Option[Inventory]]] = Future.traverse(invMsg.inventories) { @@ -655,8 +666,9 @@ case class DataMessageHandler( private def getHeaders( state: HeaderSync, headers: Vector[BlockHeader], - peer: Peer, + peerMessageSenderApi: PeerMessageSenderApi, chainApi: ChainApi): Future[NodeRunningState] = { + val peer = peerMessageSenderApi.peer logger.debug(s"getHeaders() newDmh.state=${state} peer=$peer peers=$peer") val count = headers.length val getHeadersF: Future[NodeRunningState] = { @@ -739,7 +751,7 @@ case class DataMessageHandler( newDmh <- chainApiHeaderProcessF dmh <- getHeaders(state = headerSyncState, headers = headers, - peer = peer, + peerMessageSenderApi = peerData.peerMessageSender, newDmh.chainApi) } yield dmh } @@ -780,7 +792,8 @@ case class DataMessageHandler( filterHeaderSync: FilterHeaderSync, filterHeader: CompactFilterHeadersMessage, chainApi: ChainApi, - peer: Peer): Future[NodeRunningState] = { + peerMessageSenderApi: PeerMessageSenderApi): Future[NodeRunningState] = { + val peer = peerMessageSenderApi.peer val filterHeaders = filterHeader.filterHeaders val blockCountF = chainApi.getBlockCount() val bestBlockHashF = chainApi.getBestBlockHash() @@ -828,7 +841,8 @@ case class DataMessageHandler( private def handleFilterMessage( filterSyncState: FilterSync, - filter: CompactFilterMessage): Future[NodeRunningState] = { + filter: CompactFilterMessage, + peerMessageSenderApi: PeerMessageSenderApi): Future[NodeRunningState] = { val filterBatch = filterSyncState.filterBatchCache.+(filter) val batchSizeFull: Boolean = filterBatch.size == chainConfig.filterBatchSize @@ -871,7 +885,7 @@ case class DataMessageHandler( } else Future.successful(Some(filterSyncState)) newDmhState <- { if (isFiltersSynced) { - syncIfHeadersAhead(filterSyncState) + syncIfHeadersAhead(filterSyncState, peerMessageSenderApi) } else { val res = filterHeaderSyncStateOpt match { case Some(filterSyncState) =>