diff --git a/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeWithUncachedBitcoindTest.scala b/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeWithUncachedBitcoindTest.scala index 624c6917c8..cc66d95d62 100644 --- a/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeWithUncachedBitcoindTest.scala +++ b/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeWithUncachedBitcoindTest.scala @@ -1,11 +1,11 @@ package org.bitcoins.node import org.bitcoins.asyncutil.AsyncUtil -import org.bitcoins.core.p2p.{GetHeadersMessage, HeadersMessage, NetworkMessage} +import org.bitcoins.core.p2p.{GetHeadersMessage, HeadersMessage} import org.bitcoins.core.protocol.blockchain.BlockHeader import org.bitcoins.core.util.FutureUtil import org.bitcoins.node.models.Peer -import org.bitcoins.node.networking.peer.{DataMessageWrapper, SendToPeer} +import org.bitcoins.node.networking.peer.DataMessageWrapper import org.bitcoins.server.BitcoinSAppConfig import org.bitcoins.testkit.BitcoinSTestAppConfig import org.bitcoins.testkit.node.fixture.NeutrinoNodeConnectedWithBitcoinds @@ -76,8 +76,11 @@ class NeutrinoNodeWithUncachedBitcoindTest extends NodeUnitTest with CachedTor { networkPayload = GetHeadersMessage(node.chainConfig.chain.genesisHash) //waiting for response to header query now - networkMessage = NetworkMessage(networkParam, networkPayload) - _ <- node.peerManager.offer(SendToPeer(networkMessage, Some(peer0))) + _ <- node.peerManager + .getPeerData(peer0) + .get + .peerMessageSenderApi + .sendMsg(networkPayload, Some(peer0)) nodeUri <- NodeTestUtil.getNodeURIFromBitcoind(bitcoinds(0)) _ <- bitcoinds(0).disconnectNode(nodeUri) _ = logger.debug( diff --git a/node/src/main/scala/org/bitcoins/node/PeerManager.scala b/node/src/main/scala/org/bitcoins/node/PeerManager.scala index 6549f80a59..be8125ca75 100644 --- a/node/src/main/scala/org/bitcoins/node/PeerManager.scala +++ b/node/src/main/scala/org/bitcoins/node/PeerManager.scala @@ -104,8 +104,46 @@ case class PeerManager( msg: NetworkPayload, peerOpt: Option[Peer]): Future[Unit] = { val networkMessage = NetworkMessage(nodeAppConfig.network, msg) - offer(SendToPeer(msg = networkMessage, peerOpt = peerOpt)) - .map(_ => ()) + + val sendToPeer = SendToPeer(networkMessage, peerOpt) + logger.debug( + s"Sending message ${sendToPeer.msg.payload.commandName} to peerOpt=${sendToPeer.peerOpt}") + val peerMsgSenderOptF: Future[Option[PeerMessageSender]] = + sendToPeer.peerOpt match { + case Some(peer) => + getPeerMsgSender(peer).flatMap { + case Some(peerMsgSender) => + Future.successful(Some(peerMsgSender)) + case None => + sendToPeer.msg.payload match { + case _: ControlPayload => + //peer may not be fully initialized, we may be doing the handshake with a peer + finderOpt.get + .getData(peer) + .map(_.peerMessageSender) match { + case Some(p) => p.map(Some(_)) + case None => FutureUtil.none + } + case _: DataPayload => + //peer must be fully initialized to send a data payload + val msg = + s"Cannot find peerOpt=${sendToPeer.peerOpt} to send message=${sendToPeer.msg.payload.commandName} to. It may have been disconnected, sending to another random peer." + logger.warn(msg) + randomPeerMsgSenderWithService(ServiceIdentifier.NODE_NETWORK) + } + } + case None => + randomPeerMsgSenderWithService(ServiceIdentifier.NODE_NETWORK) + } + + peerMsgSenderOptF.flatMap { + case Some(peerMsgSender) => + peerMsgSender + .sendMsg(sendToPeer.msg) + case None => + Future.failed(new RuntimeException( + s"Unable to find peer message sender to send msg=${sendToPeer.msg.header.commandName} to. This means we are not connected to any peers.")) + } } /** Gossips the given message to all peers except the excluded peer. If None given as excluded peer, gossip message to all peers */ @@ -711,53 +749,6 @@ case class PeerManager( StreamDataMessageWrapper, Future[DataMessageHandler]] = { Sink.foldAsync(initDmh) { - case (dmh, sendToPeer: SendToPeer) => - logger.debug( - s"Sending message ${sendToPeer.msg.payload.commandName} to peerOpt=${sendToPeer.peerOpt}") - val peerMsgSenderOptF: Future[Option[PeerMessageSender]] = - sendToPeer.peerOpt match { - case Some(peer) => - getPeerMsgSender(peer).flatMap { - case Some(peerMsgSender) => - Future.successful(Some(peerMsgSender)) - case None => - sendToPeer.msg.payload match { - case _: ControlPayload => - //peer may not be fully initialized, we may be doing the handshake with a peer - finderOpt.get - .getData(peer) - .map(_.peerMessageSender) match { - case Some(p) => p.map(Some(_)) - case None => FutureUtil.none - } - case _: DataPayload => - //peer must be fully initialized to send a data payload - val msg = - s"Cannot find peerOpt=${sendToPeer.peerOpt} to send message=${sendToPeer.msg.payload.commandName} to. It may have been disconnected, sending to another random peer." - logger.warn(msg) - randomPeerMsgSenderWithService( - ServiceIdentifier.NODE_NETWORK) - } - } - case None => - dmh.state match { - case s: SyncDataMessageHandlerState => - getPeerMsgSender(s.syncPeer) - case DoneSyncing | _: MisbehavingPeer | _: RemovePeers => - //pick a random peer to sync with - randomPeerMsgSenderWithService(ServiceIdentifier.NODE_NETWORK) - } - } - - peerMsgSenderOptF.flatMap { - case Some(peerMsgSender) => - peerMsgSender - .sendMsg(sendToPeer.msg) - .map(_ => handleDisconnectedPeer(sendToPeer, peerMsgSender, dmh)) - case None => - Future.failed(new RuntimeException( - s"Unable to find peer message sender to send msg=${sendToPeer.msg.header.commandName} to. This means we are not connected to any peers.")) - } case (dmh, DataMessageWrapper(payload, peer)) => logger.debug(s"Got ${payload.commandName} from peer=${peer} in stream") val peerMsgSenderOptF = getPeerMsgSender(peer) @@ -1011,38 +1002,6 @@ case class PeerManager( _ <- syncHelper(syncPeerOpt) } yield syncPeerOpt } - - /** Handles the case where we need to send a message to a peer, but that peer was disconnected - * We change the peer and the adjust the state in [[DataMessageHandler]] - * @param sendToPeer the peer we were originally sending the message to - * @param peerMessageSender the new peer we are going to send the message to - * @param dmh the data message handler we need to adjust state of - */ - private def handleDisconnectedPeer( - sendToPeer: SendToPeer, - peerMessageSender: PeerMessageSender, - dmh: DataMessageHandler): DataMessageHandler = { - val destination = peerMessageSender.peer - val newState: DataMessageHandlerState = sendToPeer.peerOpt match { - case Some(originalPeer) => - if (originalPeer != destination) { - //need to replace syncPeer with newSyncPeer - dmh.state match { - case s: SyncDataMessageHandlerState => - s.replaceSyncPeer(destination) - case m: MisbehavingPeer => m - case r: RemovePeers => r - case DoneSyncing => DoneSyncing - } - } else { - dmh.state - } - case None => - dmh.state - } - - dmh.copy(state = newState) - } } case class ResponseTimeout(payload: NetworkPayload) diff --git a/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala b/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala index e00b50b983..af1f42d9c7 100644 --- a/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala +++ b/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala @@ -898,4 +898,3 @@ case class SendResponseTimeout(peer: Peer, payload: NetworkPayload) extends StreamDataMessageWrapper case class SendToPeer(msg: NetworkMessage, peerOpt: Option[Peer]) - extends StreamDataMessageWrapper