diff --git a/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeTest.scala b/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeTest.scala index f2cc8125fa..d33cec5163 100644 --- a/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeTest.scala +++ b/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeTest.scala @@ -99,8 +99,9 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair { def allDisconn: Future[Unit] = AsyncUtil.retryUntilSatisfied( peers .map(p => - !peerManager.peerDataMap.contains( - p) && !peerManager.waitingForDeletion + !peerManager + .getPeerData(p) + .isDefined && !peerManager.waitingForDeletion .contains(p)) .forall(_ == true), maxTries = 5, diff --git a/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeWithUncachedBitcoindTest.scala b/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeWithUncachedBitcoindTest.scala index 0ac1174de0..0db3f83d5c 100644 --- a/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeWithUncachedBitcoindTest.scala +++ b/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeWithUncachedBitcoindTest.scala @@ -3,6 +3,7 @@ package org.bitcoins.node import org.bitcoins.asyncutil.AsyncUtil import org.bitcoins.core.p2p.{GetHeadersMessage, HeadersMessage} import org.bitcoins.core.protocol.blockchain.BlockHeader +import org.bitcoins.core.util.FutureUtil import org.bitcoins.node.models.Peer import org.bitcoins.node.networking.P2PClient.ExpectResponseCommand import org.bitcoins.node.networking.peer.DataMessageHandlerState.{ @@ -89,9 +90,8 @@ class NeutrinoNodeWithUncachedBitcoindTest extends NodeUnitTest with CachedTor { GetHeadersMessage(node.chainConfig.chain.genesisHash)) //waiting for response to header query now client <- node.peerManager - .peerDataMap(bitcoindPeers(0)) - .peerMessageSender - .map(_.client) + .getPeerMsgSender(bitcoindPeers(0)) + .map(_.get.client) _ = client.actor ! expectHeaders nodeUri <- NodeTestUtil.getNodeURIFromBitcoind(bitcoinds(0)) _ <- bitcoinds(0).disconnectNode(nodeUri) @@ -160,11 +160,8 @@ class NeutrinoNodeWithUncachedBitcoindTest extends NodeUnitTest with CachedTor { node.chainConfig)) invalidHeaderMessage = HeadersMessage(headers = Vector(invalidHeader)) - sender <- node.peerManager.peerDataMap(peer).peerMessageSender - _ <- node.peerManager.getDataMessageHandler.addToStream( - invalidHeaderMessage, - sender, - peer) + _ <- node.peerManager.getDataMessageHandler + .addToStream(invalidHeaderMessage, peer) bestChain = bitcoinds(1) _ <- NodeTestUtil.awaitSync(node, bestChain) } yield { @@ -180,17 +177,16 @@ class NeutrinoNodeWithUncachedBitcoindTest extends NodeUnitTest with CachedTor { def sendInvalidHeaders(peer: Peer): Future[Unit] = { val invalidHeaderMessage = HeadersMessage(headers = Vector(invalidHeader)) - val senderF = node.peerManager.peerDataMap(peer).peerMessageSender - - for { - sender <- senderF - sendFs = 1 + val sendFs = { + val count = 1 .to(node.nodeConfig.maxInvalidResponsesAllowed + 1) - .map(_ => - node.peerManager.getDataMessageHandler - .addToStream(invalidHeaderMessage, sender, peer)) - _ <- Future.sequence(sendFs) - } yield () + FutureUtil.sequentially[Int, Unit](count) { _ => + node.peerManager.getDataMessageHandler + .addToStream(invalidHeaderMessage, peer) + } + } + + sendFs.map(_ => ()) } for { diff --git a/node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala b/node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala index 07a6cf1c6f..be46830b06 100644 --- a/node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala +++ b/node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala @@ -15,14 +15,9 @@ import org.bitcoins.core.p2p.ServiceIdentifier import org.bitcoins.core.protocol.BlockStamp import org.bitcoins.node.config.NodeAppConfig import org.bitcoins.node.models.Peer -import org.bitcoins.node.networking.peer.DataMessageHandlerState.{ - DoneSyncing, - MisbehavingPeer -} import org.bitcoins.node.networking.peer.{ ControlMessageHandler, - DataMessageHandlerState, - SyncDataMessageHandlerState + DataMessageHandlerState } import java.time.Instant @@ -90,14 +85,13 @@ case class NeutrinoNode( peerManager.getDataMessageHandler.copy(state = DataMessageHandlerState.HeaderSync(syncPeer))) for { - peerMsgSender <- peerManager.peerDataMap(syncPeer).peerMessageSender header <- chainApi.getBestBlockHeader() bestFilterHeaderOpt <- chainApi.getBestFilterHeader() bestFilterOpt <- chainApi.getBestFilter() blockchains <- blockchainsF // Get all of our cached headers in case of a reorg - cachedHeaders = blockchains.flatMap(_.headers).map(_.hashBE.flip) - _ <- peerMsgSender.sendGetHeadersMessage(cachedHeaders) + cachedHeaders = blockchains.flatMap(_.headers).map(_.hashBE) + _ <- peerManager.sendGetHeadersMessage(cachedHeaders, Some(syncPeer)) hasStaleTip <- chainApi.isTipStale() _ <- { if (hasStaleTip) { @@ -144,10 +138,12 @@ case class NeutrinoNode( //do nothing Future.unit } else { - syncCompactFilters(bestFilterHeader, chainApi, Some(bestFilter)) + peerManager.syncCompactFilters(bestFilterHeader, + chainApi, + Some(bestFilter)) } case (Some(bestFilterHeader), None) => - syncCompactFilters(bestFilterHeader, chainApi, None) + peerManager.syncCompactFilters(bestFilterHeader, chainApi, None) } } @@ -159,63 +155,6 @@ case class NeutrinoNode( } yield () } - /** Starts sync compact filer headers. - * Only starts syncing compact filters if our compact filter headers are in sync with block headers - */ - private def syncCompactFilters( - bestFilterHeader: CompactFilterHeaderDb, - chainApi: ChainApi, - bestFilterOpt: Option[CompactFilterDb]): Future[Unit] = { - val syncPeerMsgSenderOptF = { - peerManager.getDataMessageHandler.state match { - case syncState: SyncDataMessageHandlerState => - val peerMsgSender = - peerManager.peerDataMap(syncState.syncPeer).peerMessageSender - Some(peerMsgSender) - case DoneSyncing | _: MisbehavingPeer => None - } - } - val sendCompactFilterHeaderMsgF = syncPeerMsgSenderOptF match { - case Some(syncPeerMsgSenderF) => - syncPeerMsgSenderF.flatMap( - _.sendNextGetCompactFilterHeadersCommand( - chainApi = chainApi, - filterHeaderBatchSize = chainConfig.filterHeaderBatchSize, - prevStopHash = bestFilterHeader.blockHashBE) - ) - case None => Future.successful(false) - } - sendCompactFilterHeaderMsgF.flatMap { isSyncFilterHeaders => - // If we have started syncing filters - if ( - !isSyncFilterHeaders && - bestFilterOpt.isDefined && - bestFilterOpt.get.hashBE != bestFilterHeader.filterHashBE - ) { - syncPeerMsgSenderOptF match { - case Some(syncPeerMsgSenderF) => - //means we are not syncing filter headers, and our filters are NOT - //in sync with our compact filter headers - syncPeerMsgSenderF.flatMap { sender => - sender - .sendNextGetCompactFilterCommand(chainApi = chainApi, - filterBatchSize = - chainConfig.filterBatchSize, - startHeight = - bestFilterOpt.get.height) - .map(_ => ()) - } - case None => - logger.warn( - s"Not syncing compact filters since we do not have a syncPeer set, bestFilterOpt=$bestFilterOpt") - Future.unit - } - } else { - Future.unit - } - } - } - /** Gets the number of compact filters in the database */ override def getFilterCount(): Future[Int] = chainApiFromDb().flatMap(_.getFilterCount()) diff --git a/node/src/main/scala/org/bitcoins/node/Node.scala b/node/src/main/scala/org/bitcoins/node/Node.scala index b77f631f34..5f801968c8 100644 --- a/node/src/main/scala/org/bitcoins/node/Node.scala +++ b/node/src/main/scala/org/bitcoins/node/Node.scala @@ -70,8 +70,7 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger { * `private[node]`. */ def send(msg: NetworkPayload, peer: Peer): Future[Unit] = { - val senderF = peerManager.peerDataMap(peer).peerMessageSender - senderF.flatMap(_.sendMsg(msg)) + peerManager.sendMsg(msg, Some(peer)) } /** Starts our node */ @@ -195,20 +194,19 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger { } syncPeerOpt match { case Some(peer) => - peerManager - .peerDataMap(peer) - .peerMessageSender - .flatMap(_.sendGetDataMessage(TypeIdentifier.MsgWitnessBlock, - blockHashes: _*)) + peerManager.sendGetDataMessages(typeIdentifier = + TypeIdentifier.MsgWitnessBlock, + hashes = blockHashes.map(_.flip), + peerOpt = Some(peer)) case None => throw new RuntimeException( "IBD not started yet. Cannot query for blocks.") } } else { - val peerMsgSenderF = peerManager.randomPeerMsgSenderWithService( - ServiceIdentifier.NODE_NETWORK) - peerMsgSenderF.flatMap( - _.sendGetDataMessage(TypeIdentifier.MsgWitnessBlock, blockHashes: _*)) + peerManager.sendGetDataMessages(typeIdentifier = + TypeIdentifier.MsgWitnessBlock, + hashes = blockHashes.map(_.flip), + peerOpt = None) } } diff --git a/node/src/main/scala/org/bitcoins/node/PeerManager.scala b/node/src/main/scala/org/bitcoins/node/PeerManager.scala index 816b455e79..0d6021f47c 100644 --- a/node/src/main/scala/org/bitcoins/node/PeerManager.scala +++ b/node/src/main/scala/org/bitcoins/node/PeerManager.scala @@ -1,12 +1,13 @@ package org.bitcoins.node import akka.actor.{ActorRef, ActorSystem, Cancellable, Props} -import akka.stream.OverflowStrategy +import akka.stream.{ActorAttributes, OverflowStrategy, Supervision} import akka.stream.scaladsl.{Sink, Source, SourceQueueWithComplete} import org.bitcoins.asyncutil.AsyncUtil import org.bitcoins.chain.blockchain.ChainHandler import org.bitcoins.chain.config.ChainAppConfig import org.bitcoins.core.api.chain.ChainApi +import org.bitcoins.core.api.chain.db.{CompactFilterDb, CompactFilterHeaderDb} import org.bitcoins.core.api.node.NodeType import org.bitcoins.core.p2p._ import org.bitcoins.core.util.{NetworkUtil, StartStopAsync} @@ -16,7 +17,7 @@ import org.bitcoins.node.models.{Peer, PeerDAO, PeerDb} import org.bitcoins.node.networking.peer._ import org.bitcoins.node.networking.P2PClientSupervisor import org.bitcoins.node.networking.peer.DataMessageHandlerState._ -import org.bitcoins.node.util.BitcoinSNodeUtil +import org.bitcoins.node.util.{BitcoinSNodeUtil, PeerMessageSenderApi} import scodec.bits.ByteVector import java.net.InetAddress @@ -35,6 +36,7 @@ case class PeerManager( nodeAppConfig: NodeAppConfig, chainAppConfig: ChainAppConfig) extends StartStopAsync[PeerManager] + with PeerMessageSenderApi with P2PLogger { private val _peerDataMap: mutable.Map[Peer, PeerData] = mutable.Map.empty @@ -80,6 +82,130 @@ case class PeerManager( .map(_.toVector) } + override def sendMsg( + msg: NetworkPayload, + peerOpt: Option[Peer]): Future[Unit] = { + val peerMsgSenderF = peerOpt match { + case Some(peer) => + val peerMsgSenderF = peerDataMap(peer).peerMessageSender + peerMsgSenderF + case None => + val peerMsgSenderF = randomPeerMsgSenderWithService( + ServiceIdentifier.NODE_NETWORK) + peerMsgSenderF + } + peerMsgSenderF.flatMap(_.sendMsg(msg)) + } + + /** Gossips the given message to all peers except the excluded peer. If None given as excluded peer, gossip message to all peers */ + override def gossipMessage( + msg: NetworkPayload, + excludedPeerOpt: Option[Peer]): Future[Unit] = { + val gossipPeers = excludedPeerOpt match { + case Some(excludedPeer) => + peerDataMap + .filterNot(_._1 == excludedPeer) + .map(_._1) + case None => peerDataMap.map(_._1) + } + + Future + .traverse(gossipPeers)(p => sendMsg(msg, Some(p))) + .map(_ => ()) + } + + override def sendGetHeadersMessage( + hashes: Vector[DoubleSha256DigestBE], + peerOpt: Option[Peer]): Future[Unit] = { + val peerMsgSenderF = peerOpt match { + case Some(peer) => + val peerMsgSenderF = peerDataMap(peer).peerMessageSender + peerMsgSenderF + case None => + val peerMsgSenderF = randomPeerMsgSenderWithService( + ServiceIdentifier.NODE_NETWORK) + peerMsgSenderF + } + peerMsgSenderF.flatMap(_.sendGetHeadersMessage(hashes.map(_.flip))) + } + + override def sendGetDataMessages( + typeIdentifier: TypeIdentifier, + hashes: Vector[DoubleSha256DigestBE], + peerOpt: Option[Peer]): Future[Unit] = { + peerOpt match { + case Some(peer) => + val peerMsgSenderF = peerDataMap(peer).peerMessageSender + val flip = hashes.map(_.flip) + peerMsgSenderF + .flatMap(_.sendGetDataMessage(typeIdentifier, flip: _*)) + case None => + val peerMsgSenderF = randomPeerMsgSenderWithService( + ServiceIdentifier.NODE_NETWORK) + peerMsgSenderF.flatMap( + _.sendGetDataMessage(TypeIdentifier.MsgWitnessBlock, + hashes.map(_.flip): _*)) + + } + } + + /** Starts sync compact filer headers. + * Only starts syncing compact filters if our compact filter headers are in sync with block headers + */ + def syncCompactFilters( + bestFilterHeader: CompactFilterHeaderDb, + chainApi: ChainApi, + bestFilterOpt: Option[CompactFilterDb])(implicit + chainAppConfig: ChainAppConfig): Future[Unit] = { + val syncPeerMsgSenderOptF = { + getDataMessageHandler.state match { + case syncState: SyncDataMessageHandlerState => + val peerMsgSender = + peerDataMap(syncState.syncPeer).peerMessageSender + Some(peerMsgSender) + case DoneSyncing | _: MisbehavingPeer => None + } + } + val sendCompactFilterHeaderMsgF = syncPeerMsgSenderOptF match { + case Some(syncPeerMsgSenderF) => + syncPeerMsgSenderF.flatMap( + _.sendNextGetCompactFilterHeadersCommand( + chainApi = chainApi, + filterHeaderBatchSize = chainAppConfig.filterHeaderBatchSize, + prevStopHash = bestFilterHeader.blockHashBE) + ) + case None => Future.successful(false) + } + sendCompactFilterHeaderMsgF.flatMap { isSyncFilterHeaders => + // If we have started syncing filters + if ( + !isSyncFilterHeaders && + bestFilterOpt.isDefined && + bestFilterOpt.get.hashBE != bestFilterHeader.filterHashBE + ) { + syncPeerMsgSenderOptF match { + case Some(syncPeerMsgSenderF) => + //means we are not syncing filter headers, and our filters are NOT + //in sync with our compact filter headers + syncPeerMsgSenderF.flatMap { sender => + sender + .sendNextGetCompactFilterCommand( + chainApi = chainApi, + filterBatchSize = chainAppConfig.filterBatchSize, + startHeight = bestFilterOpt.get.height) + .map(_ => ()) + } + case None => + logger.warn( + s"Not syncing compact filters since we do not have a syncPeer set, bestFilterOpt=$bestFilterOpt") + Future.unit + } + } else { + Future.unit + } + } + } + def getPeerMsgSender(peer: Peer): Future[Option[PeerMessageSender]] = { _peerDataMap.find(_._1 == peer).map(_._2.peerMessageSender) match { case Some(peerMsgSender) => peerMsgSender.map(Some(_)) @@ -223,7 +349,9 @@ case class PeerManager( } } - def peerDataMap: Map[Peer, PeerData] = _peerDataMap.toMap + private def peerDataMap: Map[Peer, PeerData] = _peerDataMap.toMap + + def getPeerData(peer: Peer): Option[PeerData] = peerDataMap.get(peer) override def stop(): Future[PeerManager] = { logger.info(s"Stopping PeerManager") @@ -475,29 +603,38 @@ case class PeerManager( } private val dataMessageStreamSource = Source - .queue[StreamDataMessageWrapper](1500, - overflowStrategy = - OverflowStrategy.backpressure) + .queue[StreamDataMessageWrapper]( + 8, + overflowStrategy = OverflowStrategy.backpressure, + maxConcurrentOffers = nodeAppConfig.maxConnectedPeers) .mapAsync(1) { - case msg @ DataMessageWrapper(payload, peerMsgSender, peer) => + case msg @ DataMessageWrapper(payload, peer) => logger.debug(s"Got ${payload.commandName} from peer=${peer} in stream") - getDataMessageHandler - .handleDataPayload(payload, peerMsgSender, peer) - .flatMap { newDmh => - newDmh.state match { - case m: MisbehavingPeer => - updateDataMessageHandler(newDmh) - //disconnect the misbehaving peer - for { - _ <- removePeer(m.badPeer) - _ <- node.syncFromNewPeer() - } yield msg - case _: SyncDataMessageHandlerState | DoneSyncing => - updateDataMessageHandler(newDmh) - Future.successful(msg) - } + val peerMsgSenderOptF = getPeerMsgSender(peer) + peerMsgSenderOptF.flatMap { + case None => + Future.failed(new RuntimeException( + s"Couldn't find PeerMessageSender that corresponds with peer=$peer msg=${payload.commandName}. Was it disconnected?")) + case Some(peerMsgSender) => + getDataMessageHandler + .handleDataPayload(payload, peerMsgSender, peer) + .flatMap { newDmh => + newDmh.state match { + case m: MisbehavingPeer => + updateDataMessageHandler(newDmh) + //disconnect the misbehaving peer + for { + _ <- removePeer(m.badPeer) + _ <- node.syncFromNewPeer() + } yield msg + case _: SyncDataMessageHandlerState | DoneSyncing => + updateDataMessageHandler(newDmh) + Future.successful(msg) + } + + } + } - } case msg @ HeaderTimeoutWrapper(peer) => logger.debug(s"Processing timeout header for $peer") onHeaderRequestTimeout(peer, getDataMessageHandler.state).map { @@ -510,13 +647,21 @@ case class PeerManager( private val dataMessageStreamSink = Sink.foreach[StreamDataMessageWrapper] { - case DataMessageWrapper(payload, _, peer) => + case DataMessageWrapper(payload, peer) => logger.debug(s"Done processing ${payload.commandName} in peer=${peer}") case HeaderTimeoutWrapper(_) => } + private val decider: Supervision.Decider = { case err: Throwable => + logger.error(s"Error occurred while processing p2p pipeline stream", err) + Supervision.Resume + } + val dataMessageStream: SourceQueueWithComplete[StreamDataMessageWrapper] = - dataMessageStreamSource.to(dataMessageStreamSink).run() + dataMessageStreamSource + .to(dataMessageStreamSink) + .withAttributes(ActorAttributes.supervisionStrategy(decider)) + .run() def fetchCompactFilterHeaders( currentDmh: DataMessageHandler): Future[DataMessageHandler] = { diff --git a/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala b/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala index ce29950172..2388bc5615 100644 --- a/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala +++ b/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala @@ -48,12 +48,11 @@ case class DataMessageHandler( copy(filterBatchCache = Set.empty, state = DoneSyncing) } - def addToStream( - payload: DataPayload, - peerMsgSender: PeerMessageSender, - peer: Peer): Future[Unit] = { - val msg = DataMessageWrapper(payload, peerMsgSender, peer) - peerManager.dataMessageStream.offer(msg).map(_ => ()) + def addToStream(payload: DataPayload, peer: Peer): Future[Unit] = { + val msg = DataMessageWrapper(payload, peer) + peerManager.dataMessageStream + .offer(msg) + .map(_ => ()) } private def isChainIBD: Future[Boolean] = { @@ -476,12 +475,15 @@ case class DataMessageHandler( peerMsgSender: PeerMessageSender): Future[DataMessageHandler] = { val result = state match { case HeaderSync(peer) => - peerManager.peerDataMap(peer).updateInvalidMessageCount() - if ( - peerManager - .peerDataMap(peer) - .exceededMaxInvalidMessages && peerManager.peers.size > 1 - ) { + val peerDataOpt = peerManager.getPeerData(peer) + val peerData = peerDataOpt match { + case Some(peerData) => peerData + case None => + sys.error( + s"Cannot find peer we are syncing with in PeerManager, peer=$peer") + } + peerData.updateInvalidMessageCount() + if (peerData.exceededMaxInvalidMessages && peerManager.peers.size > 1) { logger.warn( s"$peer exceeded max limit of invalid messages. Disconnecting.") @@ -786,15 +788,12 @@ case class DataMessageHandler( logger.info( s"Starting to validate headers now. Verifying with ${newState.verifyingWith}") - val getHeadersAllF = peerManager.peerDataMap - .filter(_._1 != peer) - .map( - _._2.peerMessageSender.flatMap( - _.sendGetHeadersMessage(lastHash)) - ) + val getHeadersAllF = { + val msg = GetHeadersMessage(lastHash) + peerManager.gossipMessage(msg, excludedPeerOpt = Some(peer)) + } - Future - .sequence(getHeadersAllF) + getHeadersAllF .map(_ => newDmh.copy(state = newState)) } else { //if just one peer then can proceed ahead directly @@ -856,10 +855,7 @@ case class DataMessageHandler( sealed trait StreamDataMessageWrapper -case class DataMessageWrapper( - payload: DataPayload, - peerMsgSender: PeerMessageSender, - peer: Peer) +case class DataMessageWrapper(payload: DataPayload, peer: Peer) extends StreamDataMessageWrapper case class HeaderTimeoutWrapper(peer: Peer) extends StreamDataMessageWrapper diff --git a/node/src/main/scala/org/bitcoins/node/networking/peer/PeerMessageReceiver.scala b/node/src/main/scala/org/bitcoins/node/networking/peer/PeerMessageReceiver.scala index 32b2d7d023..4f8b1916e5 100644 --- a/node/src/main/scala/org/bitcoins/node/networking/peer/PeerMessageReceiver.scala +++ b/node/src/main/scala/org/bitcoins/node/networking/peer/PeerMessageReceiver.scala @@ -72,7 +72,7 @@ case class PeerMessageReceiver( sender = peerMsgSender, curReceiverState = curState) case dataPayload: DataPayload => - handleDataPayload(payload = dataPayload, sender = peerMsgSender) + handleDataPayload(payload = dataPayload) .map(_ => curState) } } @@ -85,12 +85,11 @@ case class PeerMessageReceiver( * @param sender */ private def handleDataPayload( - payload: DataPayload, - sender: PeerMessageSender): Future[PeerMessageReceiver] = { + payload: DataPayload): Future[PeerMessageReceiver] = { //else it means we are receiving this data payload from a peer, //we need to handle it dataMessageHandler - .addToStream(payload, sender, peer) + .addToStream(payload, peer) .map(_ => new PeerMessageReceiver(controlMessageHandler, dataMessageHandler, diff --git a/node/src/main/scala/org/bitcoins/node/util/PeerMessageSenderApi.scala b/node/src/main/scala/org/bitcoins/node/util/PeerMessageSenderApi.scala new file mode 100644 index 0000000000..6584ea5719 --- /dev/null +++ b/node/src/main/scala/org/bitcoins/node/util/PeerMessageSenderApi.scala @@ -0,0 +1,34 @@ +package org.bitcoins.node.util + +import org.bitcoins.core.p2p.{NetworkPayload, TypeIdentifier} +import org.bitcoins.crypto.{DoubleSha256DigestBE} +import org.bitcoins.node.models.Peer + +import scala.concurrent.Future + +trait PeerMessageSenderApi { + + def sendGetDataMessage( + typeIdentifier: TypeIdentifier, + hash: DoubleSha256DigestBE, + peerOpt: Option[Peer]): Future[Unit] = { + sendGetDataMessages(typeIdentifier, Vector(hash), peerOpt) + } + + def sendGetDataMessages( + typeIdentifier: TypeIdentifier, + hashes: Vector[DoubleSha256DigestBE], + peerOpt: Option[Peer]): Future[Unit] + + def sendGetHeadersMessage( + hashes: Vector[DoubleSha256DigestBE], + peerOpt: Option[Peer]): Future[Unit] + + /** Gossips the given message to all peers except the excluded peer. If None given as excluded peer, gossip message to all peers */ + def gossipMessage( + msg: NetworkPayload, + excludedPeerOpt: Option[Peer]): Future[Unit] + + def sendMsg(msg: NetworkPayload, peerOpt: Option[Peer]): Future[Unit] + +}