Mirror of https://github.com/bitcoin-s/bitcoin-s.git
2023 05 05 Make DataMessageHandler be accumulated in our akka stream (#5098)
* WIP
* WIP2
* Rebase with P2PClientCallbacks
* scalafmt
* Rework stream materialization in PeerManager to have DataMessageHandler encapsulated in the stream
* WIP: Fix compile
* Get everything compiling, ignore Uncached tests for now
* Increase queue size
* WIP: Make queue re-usable based on PeerManager.{start()/stop()}
* Get things compiling after rebase
* Try to handle case where we have SendToPeer in queue with peer that has been disconnected
* Empty commit to re-run CI
* Add sleep to let version/verack handshake complete in P2PClientActorTest
* Empty commit to re-run CI
* Reduce usage of bitcoind in P2PClientActorTest from 3 bitcoinds -> 2 bitcoinds
* Add error message to PeerFinder.stop() so we know what peer was not getting removed
* Cleanup error message
* Fix scalafmt, add state to log message
* Fix bug in PeerMessageReceiverState.stopReconnect() which didn't send DisconnectedPeer() to queue
* Empty commit to re-run CI
* Empty commit to re-run CI
* Reduce log level of onP2PClientDisconnected
* Empty commit to re-run CI
* Small cleanup
* scalafmt
* Get new reference to ChainHandler in more places node.syncFromNewPeer() is called
* Fix rebase
* Commit to run on CI
* Empty commit to run CI
* Empty commit to run CI
* Empty commit to re-run CI
* Empty commit to re-run CI
* Try to reproduce with logs on CI
* Empty commit to re-run CI
* WIP
* Rework onP2PClientDisconnected to return new DataMessageHandlerState
* Save comment about bug
* Add a helper method switchSyncToPeer to take into account the previous DataMessageHandlerState if we need to start a new sync because of disconnection
* Empty commit to re-run CI
* Empty commit to re-run CI
* Cleanup
* Fix case where we weren't sending getheaders to new peer when old peer was disconnected when in state DoneSyncing
* Revert logback-test.xml
* Remove comment
* Try using syncHelper() rather than getHeaderSyncHelper() to make sure we sync filters as well if needed
* Re-add log
* Fix bug where we weren't starting to sync filter headers
* Tighten dmhState type to SyncDataMessageHandlerState on syncFilters(), clean up unnecessary code
* Empty commit to re-run CI
* Empty commit to re-run CI
This commit is contained in:
parent 458a80d854
commit c3eed1e92b

8 changed files with 359 additions and 249 deletions
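The heart of this change, visible in the PeerManager hunks below, is replacing the mutable `dataMessageHandlerOpt` var (previously updated from stream callbacks via `updateDataMessageHandler`) with a `DataMessageHandler` that is folded through the akka stream itself: a pre-materialized `Source.queue` feeds a `Sink.foldAsync` whose accumulator is the handler. The following is a minimal, self-contained sketch of that pattern as this editor reads it; `Handler`, `StreamMsg`, and the buffer size are illustrative stand-ins, not the actual bitcoin-s types:

    import akka.actor.ActorSystem
    import akka.stream.OverflowStrategy
    import akka.stream.scaladsl.{Keep, Sink, Source}
    import scala.concurrent.Future

    object FoldAsyncSketch extends App {
      implicit val system: ActorSystem = ActorSystem("sketch")
      import system.dispatcher

      // Stand-ins for StreamDataMessageWrapper and DataMessageHandler.
      sealed trait StreamMsg
      case class DataMsg(payload: String) extends StreamMsg
      case class Disconnected(peer: String) extends StreamMsg
      case class Handler(processed: Int)

      // Pre-materialize the queue so it exists before the graph runs,
      // mirroring dataMessageStreamSource.preMaterialize() in start().
      val (queue, source) =
        Source
          .queue[StreamMsg](bufferSize = 16, OverflowStrategy.backpressure)
          .preMaterialize()

      // Sink.foldAsync threads the handler through every event, so the
      // handler state lives in the stream instead of a mutable var.
      val sink: Sink[StreamMsg, Future[Handler]] =
        Sink.foldAsync(Handler(0)) {
          case (h, DataMsg(_))      => Future.successful(Handler(h.processed + 1))
          case (h, Disconnected(_)) => Future.successful(h) // adjust state here
        }

      // Keep.right keeps the sink's materialized value: the final handler.
      val lastHandlerF: Future[Handler] = source.toMat(sink)(Keep.right).run()

      queue.offer(DataMsg("headers"))
      queue.offer(Disconnected("peer0"))
      queue.complete()
      lastHandlerF.foreach { h =>
        println(s"processed=${h.processed}") // processed=1
        system.terminate()
      }
    }

Because the fold owns the state, concurrent callers can only influence the handler by offering messages to the queue, which removes the cross-thread mutation the old comments in onP2PClientDisconnected warned about.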
NeutrinoNodeWithUncachedBitcoindTest.scala

@@ -80,7 +80,9 @@ class NeutrinoNodeWithUncachedBitcoindTest extends NodeUnitTest with CachedTor {
       _ <- node.peerManager.offer(SendToPeer(networkMessage, Some(peer0)))
       nodeUri <- NodeTestUtil.getNodeURIFromBitcoind(bitcoinds(0))
       _ <- bitcoinds(0).disconnectNode(nodeUri)
-      _ = logger.info(s"Disconnected $nodeUri from bitcoind")
+      _ = logger.debug(
+        s"Disconnected $nodeUri from bitcoind bitcoind(0).p2pPort=${peer0.socket.getPort} bitcoind(1).p2pPort=${bitcoinds(
+          1).instance.p2pPort}")
       //old peer we were syncing with that just disconnected us
       _ <- NodeTestUtil.awaitAllSync(node, bitcoinds(1))
     } yield {
P2PClientActorTest.scala

@@ -1,6 +1,7 @@
 package org.bitcoins.node.networking
 
 import akka.testkit.{TestActorRef, TestProbe}
+import org.bitcoins.asyncutil.AsyncUtil
 import org.bitcoins.chain.config.ChainAppConfig
 import org.bitcoins.node.Node
 import org.bitcoins.node.config.NodeAppConfig
@@ -41,7 +42,7 @@ class P2PClientActorTest
   }
 
   lazy val bitcoindRpc2F =
-    BitcoindRpcTestUtil.startedBitcoindRpcClient(clientAccum = clientAccum)
+    cachedBitcoindWithFundsF
 
   lazy val bitcoindPeer2F = bitcoindRpcF.flatMap { bitcoind =>
     NodeTestUtil.getBitcoindPeer(bitcoind)
@@ -150,6 +151,8 @@ class P2PClientActorTest
       isConnected <- TestAsyncUtil.retryUntilSatisfiedF(p2pClient.isConnected,
                                                         1.second,
                                                         15)
+      //let version/verack handshake complete before disconnecting immediately
+      _ <- AsyncUtil.nonBlockingSleep(1.second)
     } yield isConnected
 
     isConnectedF.flatMap { _ =>
PeerFinder.scala

@@ -208,13 +208,19 @@ case class PeerFinder(
     //delete try queue
     _peersToTry.clear()
 
-    for {
+    val stopF = for {
       _ <- Future.traverse(_peerData.map(_._2))(_.stop())
       _ <- AsyncUtil
         .retryUntilSatisfied(_peerData.isEmpty,
                              interval = 1.seconds,
                              maxTries = 30)
     } yield this
+
+    stopF.failed.foreach { e =>
+      logger.error(s"Failed to stop peer finder. Peers: ${_peerData.map(_._1)}",
+                   e)
+    }
+    stopF
   }
 
   /** creates and initialises a new test peer */
PeerManager.scala

@@ -1,7 +1,13 @@
 package org.bitcoins.node
 
-import akka.Done
+import akka.{Done, NotUsed}
 import akka.actor.{ActorRef, ActorSystem, Props}
+import akka.stream.{
+  ActorAttributes,
+  OverflowStrategy,
+  QueueOfferResult,
+  Supervision
+}
 import akka.stream.scaladsl.{
   Keep,
   RunnableGraph,
@@ -10,12 +16,6 @@ import akka.stream.scaladsl.{
   SourceQueue,
   SourceQueueWithComplete
 }
-import akka.stream.{
-  ActorAttributes,
-  OverflowStrategy,
-  QueueOfferResult,
-  Supervision
-}
 import grizzled.slf4j.Logging
 import org.bitcoins.asyncutil.AsyncUtil
 import org.bitcoins.chain.blockchain.ChainHandler
@@ -125,10 +125,18 @@ case class PeerManager(
         .map(_._1)
       case None => peerDataMap.map(_._1)
     }
 
-    Future
-      .traverse(gossipPeers)(p => sendMsg(msg, Some(p)))
-      .map(_ => ())
+    if (gossipPeers.isEmpty) {
+      logger.warn(
+        s"We have 0 peers to gossip message=${msg.commandName} to peerDataMap=${peerDataMap
+          .map(_._1)}.")
+      Future.unit
+    } else {
+      Future
+        .traverse(gossipPeers) { p =>
+          sendMsg(msg, Some(p))
+        }
+        .map(_ => ())
+    }
   }
 
   override def sendGetHeadersMessage(
@@ -141,7 +149,7 @@ case class PeerManager(
   override def gossipGetHeadersMessage(
       hashes: Vector[DoubleSha256DigestBE]): Future[Unit] = {
     val headersMsg = GetHeadersMessage(hashes.distinct.take(101).map(_.flip))
-    gossipMessage(headersMsg, None)
+    gossipMessage(msg = headersMsg, excludedPeerOpt = None)
   }
 
   override def sendGetDataMessages(
@@ -199,29 +207,21 @@ case class PeerManager(
   /** Starts sync compact filer headers.
     * Only starts syncing compact filters if our compact filter headers are in sync with block headers
     */
-  def syncCompactFilters(
+  private def syncCompactFilters(
       bestFilterHeader: CompactFilterHeaderDb,
       chainApi: ChainApi,
-      bestFilterOpt: Option[CompactFilterDb])(implicit
+      bestFilterOpt: Option[CompactFilterDb],
+      dmhState: SyncDataMessageHandlerState)(implicit
       chainAppConfig: ChainAppConfig): Future[Unit] = {
-    val syncPeerOptF = {
-      getDataMessageHandler.state match {
-        case syncState: SyncDataMessageHandlerState =>
-          Some(syncState.syncPeer)
-        case DoneSyncing | _: MisbehavingPeer | _: RemovePeers => None
-      }
-    }
-    val sendCompactFilterHeaderMsgF = syncPeerOptF match {
-      case Some(syncPeer) =>
-        PeerManager.sendNextGetCompactFilterHeadersCommand(
-          peerMessageSenderApi = this,
-          chainApi = chainApi,
-          peer = syncPeer,
-          filterHeaderBatchSize = chainAppConfig.filterHeaderBatchSize,
-          prevStopHash = bestFilterHeader.blockHashBE
-        )
-      case None => Future.successful(false)
-    }
+    val syncPeer = dmhState.syncPeer
+    val sendCompactFilterHeaderMsgF =
+      PeerManager.sendNextGetCompactFilterHeadersCommand(
+        peerMessageSenderApi = this,
+        chainApi = chainApi,
+        peer = syncPeer,
+        filterHeaderBatchSize = chainAppConfig.filterHeaderBatchSize,
+        prevStopHash = bestFilterHeader.blockHashBE
+      )
     sendCompactFilterHeaderMsgF.flatMap { isSyncFilterHeaders =>
       // If we have started syncing filters
       if (
@@ -229,23 +229,14 @@ case class PeerManager(
         bestFilterOpt.isDefined &&
         bestFilterOpt.get.hashBE != bestFilterHeader.filterHashBE
       ) {
-        syncPeerOptF match {
-          case Some(syncPeer) =>
-            //means we are not syncing filter headers, and our filters are NOT
-            //in sync with our compact filter headers
-            PeerManager
-              .sendNextGetCompactFilterCommand(
-                peerMessageSenderApi = this,
-                chainApi = chainApi,
-                filterBatchSize = chainAppConfig.filterBatchSize,
-                startHeight = bestFilterOpt.get.height,
-                peer = syncPeer)
-              .map(_ => ())
-          case None =>
-            logger.warn(
-              s"Not syncing compact filters since we do not have a syncPeer set, bestFilterOpt=$bestFilterOpt")
-            Future.unit
-        }
+        PeerManager
+          .sendNextGetCompactFilterCommand(
+            peerMessageSenderApi = this,
+            chainApi = chainApi,
+            filterBatchSize = chainAppConfig.filterBatchSize,
+            startHeight = bestFilterOpt.get.height,
+            peer = syncPeer)
+          .map(_ => ())
       } else {
         Future.unit
       }
@@ -343,9 +334,12 @@ case class PeerManager(
 
   override def start(): Future[PeerManager] = {
     logger.debug(s"Starting PeerManager")
-    val (queue, doneF) = dataMessageStreamGraph.run()
+    val (queue, source) = dataMessageStreamSource.preMaterialize()
+    val initDmh = buildStatelessDataMessagehandler(queue)
+    val graph = buildDataMessageStreamGraph(initDmh = initDmh, source = source)
     dataMessageQueueOpt = Some(queue)
-    streamDoneFOpt = Some(doneF)
+    val dmhF = graph.run()
+    streamDoneFOpt = Some(dmhF)
     val finder = PeerFinder(
       paramPeers = paramPeers,
       controlMessageHandler = ControlMessageHandler(this),
@@ -395,7 +389,6 @@ case class PeerManager(
       _ = {
         //reset all variables
         dataMessageQueueOpt = None
-        dataMessageHandlerOpt = None
         streamDoneFOpt = None
         finderOpt = None
       }
@@ -405,12 +398,6 @@ case class PeerManager(
       this
     }
 
-    stopF.failed.foreach { e =>
-      logger.error(
-        s"Failed to stop peer manager. Peers: ${_peerDataMap.map(_._1)}, waiting for deletion: $waitingForDeletion",
-        e)
-    }
-
     stopF
   }
 
|
@ -524,7 +511,6 @@ case class PeerManager(
|
|||
s"onInitialization cannot be run, PeerFinder was not started")
|
||||
Future.unit
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/** @param peer the peer we were disconencted from
|
||||
|
@@ -533,24 +519,26 @@ case class PeerManager(
     */
   private def onP2PClientDisconnected(
       peer: Peer,
-      forceReconnect: Boolean): Future[Unit] = {
+      forceReconnect: Boolean,
+      state: DataMessageHandlerState): Future[DataMessageHandlerState] = {
+    logger.debug(
+      s"Client stopped for $peer peers=$peers state=$state forceReconnect=$forceReconnect finder.isDefined=${finderOpt.isDefined} peerDataMap=${peerDataMap
+        .map(_._1)}")
     finderOpt match {
       case Some(finder) =>
         require(!finder.hasPeer(peer) || !peerDataMap.contains(peer),
                 s"$peer cannot be both a test and a persistent peer")
 
-        logger.info(s"Client stopped for $peer peers=$peers")
         if (finder.hasPeer(peer)) {
           //client actor for one of the test peers stopped, can remove it from map now
           finder.removePeer(peer)
-          Future.unit
+          Future.successful(state)
         } else if (peerDataMap.contains(peer)) {
           //actor stopped for one of the persistent peers, can happen in case a reconnection attempt failed due to
           //reconnection tries exceeding the max limit in which the client was stopped to disconnect from it, remove it
           _peerDataMap.remove(peer)
-          //getDataMesageHandler.state is already mutated from another thread
-          //this will be set to the new sync peer not the old one.
-          val state = getDataMessageHandler.state
           val syncPeerOpt = state match {
             case s: SyncDataMessageHandlerState =>
               Some(s.syncPeer)
@@ -560,11 +548,27 @@ case class PeerManager(
           }
           val shouldReconnect =
             (forceReconnect || connectedPeerCount == 0) && isStarted.get
-          if (peers.exists(_ != peer) && syncPeerOpt.isDefined) {
-            syncFromNewPeer().map(_ => ())
+          if (peers.exists(_ != peer)) {
+            val randomPeerOptF = randomPeerWithService(
+              ServiceIdentifier.NODE_COMPACT_FILTERS)
+            randomPeerOptF.flatMap {
+              case Some(peer) =>
+                state match {
+                  case syncState: SyncDataMessageHandlerState =>
+                    switchSyncToPeer(oldSyncState = syncState, newPeer = peer)
+                  case DoneSyncing =>
+                    //defensively try to sync with the new peer
+                    syncHelper(Some(peer)).map(_ => DoneSyncing)
+                  case x @ (_: MisbehavingPeer | _: RemovePeers) =>
+                    Future.successful(x)
+                }
+              case None =>
+                //if we have no new peers should we just switch to DoneSyncing?
+                Future.successful(state)
+            }
           } else if (syncPeerOpt.isDefined) {
             if (shouldReconnect) {
-              finder.reconnect(peer)
+              finder.reconnect(peer).map(_ => state)
             } else {
               val exn = new RuntimeException(
                 s"No new peers to sync from, cannot start new sync. Terminated sync with peer=$peer current syncPeer=$syncPeerOpt state=${state} peers=$peers")
@@ -572,23 +576,23 @@ case class PeerManager(
             }
           } else {
             if (shouldReconnect) {
-              finder.reconnect(peer)
+              finder.reconnect(peer).map(_ => state)
             } else {
-              Future.unit
+              Future.successful(state)
             }
           }
         } else if (waitingForDeletion.contains(peer)) {
           //a peer we wanted to disconnect has remove has stopped the client actor, finally mark this as deleted
           _waitingForDeletion.remove(peer)
-          Future.unit
+          Future.successful(state)
         } else {
           logger.warn(s"onP2PClientStopped called for unknown $peer")
-          Future.unit
+          Future.successful(state)
         }
       case None =>
         logger.warn(
           s"onP2PClientStopped cannot be run, PeerFinder was not started")
-        Future.unit
+        Future.successful(state)
     }
   }
 
@@ -617,8 +621,9 @@ case class PeerManager(
 
   private def onQueryTimeout(
       payload: ExpectsResponse,
-      peer: Peer): Future[Unit] = {
-    logger.debug(s"Query timeout out for $peer")
+      peer: Peer,
+      state: DataMessageHandlerState): Future[Unit] = {
+    logger.debug(s"Query timeout out for $peer with payload=${payload}")
 
     //if we are removing this peer and an existing query timed out because of that
     // peerData will not have this peer
@@ -630,7 +635,7 @@ case class PeerManager(
       case _: GetHeadersMessage =>
         offer(HeaderTimeoutWrapper(peer)).map(_ => ())
       case _ =>
-        val syncPeer = getDataMessageHandler.state match {
+        val syncPeer = state match {
          case syncState: SyncDataMessageHandlerState =>
            syncState.syncPeer
          case s @ (DoneSyncing | _: MisbehavingPeer | _: RemovePeers) =>
@@ -644,22 +649,22 @@ case class PeerManager(
 
   private def onHeaderRequestTimeout(
       peer: Peer,
-      state: DataMessageHandlerState): Future[DataMessageHandler] = {
+      dmh: DataMessageHandler): Future[DataMessageHandler] = {
+    val state = dmh.state
     logger.info(s"Header request timed out from $peer in state $state")
     state match {
-      case HeaderSync(_) | MisbehavingPeer(_) =>
-        syncFromNewPeer().map(_ => getDataMessageHandler)
-
+      case HeaderSync(_) | MisbehavingPeer(_) | DoneSyncing =>
+        syncFromNewPeer().map(_ => dmh)
       case headerState @ ValidatingHeaders(_, _, failedCheck, _) =>
         val newHeaderState = headerState.copy(failedCheck = failedCheck + peer)
-        val newDmh = getDataMessageHandler.copy(state = newHeaderState)
+        val newDmh = dmh.copy(state = newHeaderState)
 
         if (newHeaderState.validated) {
           PeerManager.fetchCompactFilterHeaders(newDmh, this)
         } else Future.successful(newDmh)
 
-      case DoneSyncing | _: FilterHeaderSync | _: FilterSync | _: RemovePeers =>
-        Future.successful(getDataMessageHandler)
+      case _: FilterHeaderSync | _: FilterSync | _: RemovePeers =>
+        Future.successful(dmh)
     }
   }
 
@@ -677,15 +682,36 @@ case class PeerManager(
     }
   }
 
+  private def buildStatelessDataMessagehandler(
+      queue: SourceQueueWithComplete[
+        StreamDataMessageWrapper]): DataMessageHandler = {
+    DataMessageHandler(
+      chainApi = ChainHandler.fromDatabase(),
+      walletCreationTimeOpt = walletCreationTimeOpt,
+      queue = queue,
+      peers = Vector.empty,
+      peerMessgeSenderApi = this,
+      peerDataOpt = None,
+      state = DoneSyncing,
+      filterBatchCache = Set.empty
+    )
+  }
+
   private val dataMessageStreamSource: Source[
     StreamDataMessageWrapper,
-    SourceQueueWithComplete[StreamDataMessageWrapper]] = Source
-    .queue[StreamDataMessageWrapper](
-      16 * nodeAppConfig.maxConnectedPeers,
-      overflowStrategy = OverflowStrategy.backpressure,
-      maxConcurrentOffers = nodeAppConfig.maxConnectedPeers)
-    .mapAsync(1) {
-      case sendToPeer: SendToPeer =>
+    SourceQueueWithComplete[StreamDataMessageWrapper]] = {
+    Source
+      .queue[StreamDataMessageWrapper](
+        16 * nodeAppConfig.maxConnectedPeers,
+        overflowStrategy = OverflowStrategy.backpressure,
+        maxConcurrentOffers = nodeAppConfig.maxConnectedPeers)
+  }
+
+  private def buildDataMessageStreamSink(initDmh: DataMessageHandler): Sink[
+    StreamDataMessageWrapper,
+    Future[DataMessageHandler]] = {
+    Sink.foldAsync(initDmh) {
+      case (dmh, sendToPeer: SendToPeer) =>
         logger.debug(
           s"Sending message ${sendToPeer.msg.payload.commandName} to peerOpt=${sendToPeer.peerOpt}")
         val peerMsgSenderOptF: Future[Option[PeerMessageSender]] =
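A note on the queue parameters above: with `OverflowStrategy.backpressure`, `offer()` returns a `Future[QueueOfferResult]` that completes only once the element is accepted rather than dropping it, and `maxConcurrentOffers` bounds how many callers may have an outstanding offer at once (here one per connected peer, with a 16x-deep buffer). A small sketch of those semantics, independent of the bitcoin-s types and assuming the Akka 2.6-style `Source.queue` overload:

    import akka.actor.ActorSystem
    import akka.stream.{OverflowStrategy, QueueOfferResult}
    import akka.stream.scaladsl.{Keep, Sink, Source}

    object QueueBackpressureSketch extends App {
      implicit val system: ActorSystem = ActorSystem("queue-sketch")
      import system.dispatcher

      val (queue, doneF) =
        Source
          .queue[Int](bufferSize = 4,
                      overflowStrategy = OverflowStrategy.backpressure,
                      maxConcurrentOffers = 2)
          .toMat(Sink.foreach(n => println(s"got $n")))(Keep.both)
          .run()

      // The offer future completes with Enqueued once the element is
      // accepted; under backpressure it is delayed, never dropped.
      queue.offer(1).foreach {
        case QueueOfferResult.Enqueued => println("enqueued")
        case other                     => println(s"not enqueued: $other")
      }
      queue.complete()
      doneF.foreach(_ => system.terminate())
    }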
@@ -706,12 +732,15 @@ case class PeerManager(
               }
             case _: DataPayload =>
               //peer must be fully initialized to send a data payload
-              Future.failed(new RuntimeException(
-                s"Cannot find peer message sender to send message=${sendToPeer.msg.payload.commandName} to peerOpt=${sendToPeer.peerOpt}"))
+              val msg =
+                s"Cannot find peerOpt=${sendToPeer.peerOpt} to send message=${sendToPeer.msg.payload.commandName} to. It may have been disconnected, sending to another random peer."
+              logger.warn(msg)
+              randomPeerMsgSenderWithService(
+                ServiceIdentifier.NODE_NETWORK)
           }
         }
       case None =>
-        getDataMessageHandler.state match {
+        dmh.state match {
           case s: SyncDataMessageHandlerState =>
             getPeerMsgSender(s.syncPeer)
           case DoneSyncing | _: MisbehavingPeer | _: RemovePeers =>
@@ -724,100 +753,88 @@ case class PeerManager(
           case Some(peerMsgSender) =>
             peerMsgSender
               .sendMsg(sendToPeer.msg)
-              .map(_ => sendToPeer)
+              .map(_ => handleDisconnectedPeer(sendToPeer, peerMsgSender, dmh))
           case None =>
             Future.failed(new RuntimeException(
-              s"Unable to find peer message sender to send msg=${sendToPeer.msg.header.commandName} to"))
+              s"Unable to find peer message sender to send msg=${sendToPeer.msg.header.commandName} to. This means we are not connected to any peers."))
         }
-      case msg @ DataMessageWrapper(payload, peer) =>
-        logger.debug(
-          s"Got ${payload.commandName} from peer=${peer} in stream state=${getDataMessageHandler.state}")
+      case (dmh, DataMessageWrapper(payload, peer)) =>
+        logger.debug(s"Got ${payload.commandName} from peer=${peer} in stream")
         val peerMsgSenderOptF = getPeerMsgSender(peer)
         peerMsgSenderOptF.flatMap {
           case None =>
             Future.failed(new RuntimeException(
              s"Couldn't find PeerMessageSender that corresponds with peer=$peer msg=${payload.commandName}. Was it disconnected?"))
           case Some(_) =>
-            val dmh = {
-              getDataMessageHandler.copy(peerDataOpt = getPeerData(peer))
-            }
-            dmh
+            val peerDmh = dmh.copy(peerDataOpt = getPeerData(peer))
+
+            val resultF = peerDmh
              .handleDataPayload(payload, peer)
              .flatMap { newDmh =>
                newDmh.state match {
                  case m: MisbehavingPeer =>
-                    updateDataMessageHandler(newDmh)
                    //disconnect the misbehaving peer
                    for {
                      _ <- removePeer(m.badPeer)
                      _ <- syncFromNewPeer()
-                    } yield msg
+                    } yield newDmh
                  case removePeers: RemovePeers =>
-                    updateDataMessageHandler(newDmh)
                    for {
                      _ <- Future.traverse(removePeers.peers)(removePeer)
-                    } yield msg
+                    } yield newDmh
                  case _: SyncDataMessageHandlerState | DoneSyncing =>
-                    updateDataMessageHandler(newDmh)
-                    Future.successful(msg)
+                    Future.successful(newDmh)
                }
-
              }
            resultF.map { r =>
              logger.debug(
                s"Done processing ${payload.commandName} in peer=${peer}")
              r
            }
        }
 
-      case msg @ HeaderTimeoutWrapper(peer) =>
+      case (dmh, HeaderTimeoutWrapper(peer)) =>
         logger.debug(s"Processing timeout header for $peer")
-        onHeaderRequestTimeout(peer, getDataMessageHandler.state).map {
-          newDmh =>
-            updateDataMessageHandler(newDmh)
-            logger.debug(s"Done processing timeout header for $peer")
-            msg
-        }
-
-      case d @ DisconnectedPeer(peer, forceReconnect) =>
-        onP2PClientDisconnected(peer, forceReconnect).map(_ => d)
-      case i: Initialized =>
-        onInitialization(i.peer).map(_ => i)
-      case i: InitializationTimeout =>
-        onInitializationTimeout(i.peer).map(_ => i)
-      case q: QueryTimeout =>
-        onQueryTimeout(q.payload, q.peer).map(_ => q)
-      case srt: SendResponseTimeout =>
-        sendResponseTimeout(srt.peer, srt.payload).map(_ => srt)
-    }
-
-  private val dataMessageStreamSink =
-    Sink.foreach[StreamDataMessageWrapper] {
-      case DataMessageWrapper(payload, peer) =>
-        logger.debug(
-          s"Done processing ${payload.commandName} in peer=${peer} state=${getDataMessageHandler.state}")
-      case HeaderTimeoutWrapper(_) =>
-      case DisconnectedPeer(_, _) =>
-      case Initialized(_) =>
-      case InitializationTimeout(_) =>
-      case QueryTimeout(_, _) =>
-      case SendResponseTimeout(_, _) =>
-      case stp: SendToPeer =>
-        logger.debug(
-          s"Done processing ${stp.msg.header.commandName} in peerOpt=${stp.peerOpt}")
+        for {
+          newDmh <- {
+            onHeaderRequestTimeout(peer, dmh).map { newDmh =>
+              logger.debug(s"Done processing timeout header for $peer")
+              newDmh
+            }
+          }
+        } yield newDmh
+      case (dmh, DisconnectedPeer(peer, forceReconnect)) =>
+        onP2PClientDisconnected(peer, forceReconnect, dmh.state)
+          .map(newState => dmh.copy(state = newState))
+      case (dmh, i: Initialized) =>
+        onInitialization(i.peer).map(_ => dmh)
+      case (dmh, i: InitializationTimeout) =>
+        onInitializationTimeout(i.peer).map(_ => dmh)
+      case (dmh, q: QueryTimeout) =>
+        onQueryTimeout(q.payload, q.peer, dmh.state).map(_ => dmh)
+      case (dmh, srt: SendResponseTimeout) =>
+        sendResponseTimeout(srt.peer, srt.payload).map(_ => dmh)
     }
+  }
 
   private val decider: Supervision.Decider = { case err: Throwable =>
     logger.error(s"Error occurred while processing p2p pipeline stream", err)
     Supervision.Resume
   }
 
-  private val dataMessageStreamGraph: RunnableGraph[
-    (SourceQueueWithComplete[StreamDataMessageWrapper], Future[Done])] = {
-    dataMessageStreamSource
-      .toMat(dataMessageStreamSink)(Keep.both)
+  private def buildDataMessageStreamGraph(
+      initDmh: DataMessageHandler,
+      source: Source[StreamDataMessageWrapper, NotUsed]): RunnableGraph[
+    Future[DataMessageHandler]] = {
+    val graph = source
+      .toMat(buildDataMessageStreamSink(initDmh))(Keep.right)
       .withAttributes(ActorAttributes.supervisionStrategy(decider))
+    graph
   }
 
   private[bitcoins] var dataMessageQueueOpt: Option[
     SourceQueueWithComplete[StreamDataMessageWrapper]] = None
 
-  private var streamDoneFOpt: Option[Future[Done]] = None
+  private var streamDoneFOpt: Option[Future[DataMessageHandler]] = None
 
   override def offer(
       elem: StreamDataMessageWrapper): Future[QueueOfferResult] = {
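The `Supervision.Resume` decider above is what keeps one bad message from tearing the whole pipeline down: on a failure the offending element is dropped and, for a fold stage, the last good accumulator is kept. A sketch of that behavior under akka-streams' documented supervision semantics (names here are illustrative, not from the diff):

    import akka.actor.ActorSystem
    import akka.stream.{ActorAttributes, Supervision}
    import akka.stream.scaladsl.{Keep, Sink, Source}
    import scala.concurrent.Future

    object ResumeSketch extends App {
      implicit val system: ActorSystem = ActorSystem("resume-sketch")
      import system.dispatcher

      // Log the error and resume: the failing element is skipped and the
      // fold keeps its last good state instead of failing the stream.
      val decider: Supervision.Decider = { err =>
        println(s"stream error, resuming: ${err.getMessage}")
        Supervision.Resume
      }

      val sumF: Future[Int] =
        Source(1 to 5)
          .toMat(Sink.foldAsync(0) { (acc, n) =>
            if (n == 3) Future.failed(new RuntimeException("boom"))
            else Future.successful(acc + n)
          })(Keep.right)
          .withAttributes(ActorAttributes.supervisionStrategy(decider))
          .run()

      sumF.foreach { sum =>
        println(s"sum=$sum") // 1 + 2 + 4 + 5 = 12
        system.terminate()
      }
    }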
@@ -836,66 +853,26 @@ case class PeerManager(
     }
   }
 
-  private var dataMessageHandlerOpt: Option[DataMessageHandler] = {
-    None
-  }
-
-  def fetchCompactFilterHeaders(
-      currentDmh: DataMessageHandler): Future[DataMessageHandler] = {
-    val syncPeer = currentDmh.state match {
-      case s: SyncDataMessageHandlerState => s.syncPeer
-      case state @ (DoneSyncing | _: MisbehavingPeer | _: RemovePeers) =>
-        sys.error(
-          s"Cannot fetch compact filter headers when we are in state=$state")
-    }
-    logger.info(
-      s"Now syncing filter headers from $syncPeer in state=${currentDmh.state}")
-    for {
-      newSyncingState <- PeerManager.sendFirstGetCompactFilterHeadersCommand(
-        this,
-        currentDmh.chainApi,
-        syncPeer)
-    } yield {
-      currentDmh.copy(state = newSyncingState)
+  private def switchSyncToPeer(
+      oldSyncState: SyncDataMessageHandlerState,
+      newPeer: Peer): Future[DataMessageHandlerState] = {
+    logger.debug(
+      s"switchSyncToPeer() oldSyncState=$oldSyncState newPeer=$newPeer")
+    val newState = oldSyncState.replaceSyncPeer(newPeer)
+    oldSyncState match {
+      case _: HeaderSync | _: ValidatingHeaders =>
+        syncHelper(Some(newPeer)).map(_ => newState)
+      case _: FilterHeaderSync | _: FilterSync =>
+        filterSyncHelper(ChainHandler.fromDatabase(), Some(newPeer)).map(_ =>
+          newState)
     }
   }
 
-  def getDataMessageHandler: DataMessageHandler = {
-    if (dataMessageHandlerOpt.isDefined) {
-      dataMessageHandlerOpt.get
-    } else {
-      DataMessageHandler(
-        chainApi = ChainHandler.fromDatabase(),
-        walletCreationTimeOpt = walletCreationTimeOpt,
-        queue = dataMessageQueueOpt.get,
-        peers,
-        peerMessgeSenderApi = this,
-        peerDataOpt = None,
-        state = DoneSyncing,
-        filterBatchCache = Set.empty
-      )
-    }
-  }
-
-  def updateDataMessageHandler(
-      dataMessageHandler: DataMessageHandler): PeerManager = {
-    this.dataMessageHandlerOpt = Some(dataMessageHandler.copy(peers = peers))
-    this
-  }
-
-  /** Helper method to sync the blockchain over the network
-    *
-    * @param syncPeerOpt if syncPeer is given, we send [[org.bitcoins.core.p2p.GetHeadersMessage]] to that peer. If None we gossip GetHeadersMessage to all peers
-    */
-  def syncHelper(syncPeerOpt: Option[Peer]): Future[Unit] = {
-    logger.info(s"Syncing with peerOpt=$syncPeerOpt")
-    val chainApi: ChainApi = ChainHandler.fromDatabase()
+  private def getHeaderSyncHelper(syncPeerOpt: Option[Peer]): Future[Unit] = {
     val blockchainsF =
       BlockHeaderDAO()(ec, chainAppConfig).getBlockchains()
 
     for {
-      header <- chainApi.getBestBlockHeader()
-      bestFilterHeaderOpt <- chainApi.getBestFilterHeader()
-      bestFilterOpt <- chainApi.getBestFilter()
       blockchains <- blockchainsF
       // Get all of our cached headers in case of a reorg
       cachedHeaders = blockchains.flatMap(_.headers).map(_.hashBE)
@@ -906,6 +883,17 @@ case class PeerManager(
         case None => gossipGetHeadersMessage(cachedHeaders)
       }
     } yield ()
   }
 
+  private def filterSyncHelper(
+      chainApi: ChainApi,
+      syncPeerOpt: Option[Peer]): Future[Unit] = {
+    for {
+      header <- chainApi.getBestBlockHeader()
+      bestFilterHeaderOpt <- chainApi.getBestFilterHeader()
+      bestFilterOpt <- chainApi.getBestFilter()
+
+      hasStaleTip <- chainApi.isTipStale()
+      _ <- {
+        if (hasStaleTip) {
@@ -913,15 +901,42 @@ case class PeerManager(
           //after we are done syncing block headers
           Future.unit
         } else {
-          syncFilters(bestFilterHeaderOpt = bestFilterHeaderOpt,
-                      bestFilterOpt = bestFilterOpt,
-                      bestBlockHeader = header,
-                      chainApi = chainApi)
+          val syncPeerOptF = syncPeerOpt match {
+            case Some(p) => Future.successful(Some(p))
+            case None =>
+              randomPeerWithService(ServiceIdentifier.NODE_COMPACT_FILTERS)
+          }
+          syncPeerOptF.flatMap {
+            case Some(p) =>
+              syncFilters(bestFilterHeaderOpt = bestFilterHeaderOpt,
+                          bestFilterOpt = bestFilterOpt,
+                          bestBlockHeader = header,
+                          chainApi = chainApi,
+                          dmhState = FilterHeaderSync(p))
+            case None =>
+              Future.failed(
+                new RuntimeException(
+                  "Could not find peer to sync filters with!"))
+          }
         }
       }
     } yield ()
   }
 
+  /** Helper method to sync the blockchain over the network
+    *
+    * @param syncPeerOpt if syncPeer is given, we send [[org.bitcoins.core.p2p.GetHeadersMessage]] to that peer. If None we gossip GetHeadersMessage to all peers
+    */
+  def syncHelper(syncPeerOpt: Option[Peer]): Future[Unit] = {
+    val chainApi: ChainApi = ChainHandler.fromDatabase()
+    val headerF = chainApi.getBestBlockHeader()
+    for {
+      _ <- getHeaderSyncHelper(syncPeerOpt)
+      _ <- filterSyncHelper(chainApi, syncPeerOpt)
+      header <- headerF
+    } yield {
       logger.info(
-        s"Starting sync node, height=${header.height} hash=${header.hashBE.hex}")
+        s"Starting sync node, height=${header.height} hash=${header.hashBE.hex} peerOpt=$syncPeerOpt")
+    }
+  }
|
@ -929,47 +944,105 @@ case class PeerManager(
|
|||
bestFilterHeaderOpt: Option[CompactFilterHeaderDb],
|
||||
bestFilterOpt: Option[CompactFilterDb],
|
||||
bestBlockHeader: BlockHeaderDb,
|
||||
chainApi: ChainApi): Future[Unit] = {
|
||||
// If we have started syncing filters headers
|
||||
(bestFilterHeaderOpt, bestFilterOpt) match {
|
||||
case (None, None) | (None, Some(_)) =>
|
||||
//do nothing if we haven't started syncing
|
||||
chainApi: ChainApi,
|
||||
dmhState: SyncDataMessageHandlerState): Future[Unit] = {
|
||||
val isTipStaleF = chainApi.isTipStale()
|
||||
isTipStaleF.flatMap { isTipStale =>
|
||||
if (isTipStale) {
|
||||
logger.error(
|
||||
s"Cannot start syncing filters while blockchain tip is stale")
|
||||
Future.unit
|
||||
case (Some(bestFilterHeader), Some(bestFilter)) =>
|
||||
val isFilterHeaderSynced =
|
||||
bestFilterHeader.blockHashBE == bestBlockHeader.hashBE
|
||||
val isFiltersSynced = {
|
||||
//check if we have started syncing filters,
|
||||
//and if so, see if filter headers and filters
|
||||
//were in sync
|
||||
bestFilter.hashBE == bestFilterHeader.filterHashBE
|
||||
} else {
|
||||
logger.debug(
|
||||
s"syncFilters() bestBlockHeader=$bestBlockHeader bestFilterHeaderOpt=$bestFilterHeaderOpt bestFilterOpt=$bestFilterOpt state=$dmhState")
|
||||
// If we have started syncing filters headers
|
||||
(bestFilterHeaderOpt, bestFilterOpt) match {
|
||||
case (None, None) | (None, Some(_)) =>
|
||||
dmhState match {
|
||||
case fhs: FilterHeaderSync =>
|
||||
PeerManager
|
||||
.sendFirstGetCompactFilterHeadersCommand(
|
||||
peerMessageSenderApi = this,
|
||||
chainApi = chainApi,
|
||||
peer = fhs.syncPeer)
|
||||
.map(_ => ())
|
||||
case x @ (_: FilterSync | _: HeaderSync | _: ValidatingHeaders) =>
|
||||
val exn = new RuntimeException(
|
||||
s"Invalid state to start syncing filter headers with, got=$x")
|
||||
Future.failed(exn)
|
||||
}
|
||||
|
||||
case (Some(bestFilterHeader), Some(bestFilter)) =>
|
||||
val isFilterHeaderSynced =
|
||||
bestFilterHeader.blockHashBE == bestBlockHeader.hashBE
|
||||
val isFiltersSynced = {
|
||||
//check if we have started syncing filters,
|
||||
//and if so, see if filter headers and filters
|
||||
//were in sync
|
||||
bestFilter.hashBE == bestFilterHeader.filterHashBE
|
||||
}
|
||||
if (isFilterHeaderSynced && isFiltersSynced) {
|
||||
//means we are in sync, with filter heads & block headers & filters
|
||||
//if there _both_ filter headers and block headers are on
|
||||
//an old tip, our event driven node will start syncing
|
||||
//filters after block headers are in sync
|
||||
//do nothing
|
||||
Future.unit
|
||||
} else {
|
||||
syncCompactFilters(bestFilterHeader = bestFilterHeader,
|
||||
chainApi = chainApi,
|
||||
bestFilterOpt = Some(bestFilter),
|
||||
dmhState = dmhState)
|
||||
}
|
||||
case (Some(bestFilterHeader), None) =>
|
||||
syncCompactFilters(bestFilterHeader = bestFilterHeader,
|
||||
chainApi = chainApi,
|
||||
bestFilterOpt = None,
|
||||
dmhState = dmhState)
|
||||
}
|
||||
if (isFilterHeaderSynced && isFiltersSynced) {
|
||||
//means we are in sync, with filter heads & block headers & filters
|
||||
//if there _both_ filter headers and block headers are on
|
||||
//an old tip, our event driven node will start syncing
|
||||
//filters after block headers are in sync
|
||||
//do nothing
|
||||
Future.unit
|
||||
} else {
|
||||
syncCompactFilters(bestFilterHeader = bestFilterHeader,
|
||||
chainApi = chainApi,
|
||||
bestFilterOpt = Some(bestFilter))
|
||||
}
|
||||
case (Some(bestFilterHeader), None) =>
|
||||
syncCompactFilters(bestFilterHeader = bestFilterHeader,
|
||||
chainApi = chainApi,
|
||||
bestFilterOpt = None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def syncFromNewPeer(): Future[Option[Peer]] = {
|
||||
private def syncFromNewPeer(): Future[Option[Peer]] = {
|
||||
for {
|
||||
syncPeerOpt <- randomPeerWithService(
|
||||
ServiceIdentifier.NODE_COMPACT_FILTERS)
|
||||
_ <- syncHelper(syncPeerOpt)
|
||||
} yield syncPeerOpt
|
||||
}
|
||||
|
||||
/** Handles the case where we need to send a message to a peer, but that peer was disconnected
|
||||
* We change the peer and the adjust the state in [[DataMessageHandler]]
|
||||
* @param sendToPeer the peer we were originally sending the message to
|
||||
* @param peerMessageSender the new peer we are going to send the message to
|
||||
* @param dmh the data message handler we need to adjust state of
|
||||
*/
|
||||
private def handleDisconnectedPeer(
|
||||
sendToPeer: SendToPeer,
|
||||
peerMessageSender: PeerMessageSender,
|
||||
dmh: DataMessageHandler): DataMessageHandler = {
|
||||
val destination = peerMessageSender.client.peer
|
||||
val newState: DataMessageHandlerState = sendToPeer.peerOpt match {
|
||||
case Some(originalPeer) =>
|
||||
if (originalPeer != destination) {
|
||||
//need to replace syncPeer with newSyncPeer
|
||||
dmh.state match {
|
||||
case s: SyncDataMessageHandlerState =>
|
||||
s.replaceSyncPeer(destination)
|
||||
case m: MisbehavingPeer => m
|
||||
case r: RemovePeers => r
|
||||
case DoneSyncing => DoneSyncing
|
||||
}
|
||||
} else {
|
||||
dmh.state
|
||||
}
|
||||
case None =>
|
||||
dmh.state
|
||||
}
|
||||
|
||||
dmh.copy(state = newState)
|
||||
}
|
||||
}
|
||||
|
||||
case class ResponseTimeout(payload: NetworkPayload)
|
||||
|
@@ -1022,7 +1095,7 @@ object PeerManager extends Logging {
       res <- filterSyncMarkerOpt match {
         case Some(filterSyncMarker) =>
           logger.info(
-            s"Requesting next compact filter headers from $filterSyncMarker")
+            s"Requesting next compact filter headers from $filterSyncMarker with peer=$peer")
           peerMessageSenderApi
             .sendGetCompactFilterHeadersMessage(filterSyncMarker, Some(peer))
             .map(_ => true)
@@ -1045,7 +1118,8 @@ object PeerManager extends Logging {
       chainApi.nextFilterHeaderBatchRange(startHeight, filterBatchSize)
       res <- filterSyncMarkerOpt match {
         case Some(filterSyncMarker) =>
-          logger.info(s"Requesting compact filters from $filterSyncMarker")
+          logger.info(
+            s"Requesting compact filters from $filterSyncMarker with peer=$peer")
 
           peerMessageSenderApi
             .sendGetCompactFiltersMessage(filterSyncMarker, peer)
P2PClientActor.scala

@@ -542,7 +542,8 @@ case class P2PClientActor(
           s"Attempting to disconnect peer that was not connected!")
       }
     case P2PClient.CloseAnyStateCommand =>
-      logger.info(s"Received close any state for $peer")
+      logger.info(
+        s"Received close any state for peer=$peer peerConnectionOpt=$peerConnectionOpt currentPeerMsgRecvState=$currentPeerMsgRecvState")
       peerConnectionOpt match {
         case Some(peerConnection) =>
           context become ignoreNetworkMessages(Some(peerConnection),
@@ -552,8 +553,11 @@ case class P2PClientActor(
           peerConnection ! Tcp.Close
         case None =>
           context become ignoreNetworkMessages(None, ByteVector.empty)
-          currentPeerMsgRecvState =
-            currentPeerMsgRecvState.stopReconnect(peer)
+          currentPeerMsgRecvState = Await.result(
+            currentPeerMsgRecvState.stopReconnect(
+              peer,
+              peerMsgHandlerReceiver.queue),
+            timeout)
           context.stop(self)
       }
   }
DataMessageHandler.scala

@@ -736,7 +736,8 @@ case class DataMessageHandler(
       newDmh: DataMessageHandler,
       headers: Vector[BlockHeader],
       peer: Peer): Future[DataMessageHandler] = {
-    logger.debug(s"getHeaders() newDmh.state=${newDmh.state} peer=$peer")
+    logger.debug(
+      s"getHeaders() newDmh.state=${newDmh.state} peer=$peer peers=$peer")
     val state = newDmh.state
     val count = headers.length
     val getHeadersF: Future[DataMessageHandler] = {
DataMessageHandlerState.scala

@@ -1,6 +1,12 @@
 package org.bitcoins.node.networking.peer
 
 import org.bitcoins.node.models.Peer
+import org.bitcoins.node.networking.peer.DataMessageHandlerState.{
+  FilterHeaderSync,
+  FilterSync,
+  HeaderSync,
+  ValidatingHeaders
+}
 
 sealed abstract class DataMessageHandlerState {
   def isSyncing: Boolean
@@ -13,6 +19,15 @@ sealed abstract class SyncDataMessageHandlerState
   override def isSyncing: Boolean = true
 
   def syncPeer: Peer
+
+  def replaceSyncPeer(newSyncPeer: Peer): SyncDataMessageHandlerState = {
+    this match {
+      case h: HeaderSync         => h.copy(syncPeer = newSyncPeer)
+      case fh: FilterHeaderSync  => fh.copy(syncPeer = newSyncPeer)
+      case fs: FilterSync        => fs.copy(syncPeer = newSyncPeer)
+      case vh: ValidatingHeaders => vh.copy(syncPeer = newSyncPeer)
+    }
+  }
 }
 
 object DataMessageHandlerState {
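`replaceSyncPeer` is the small piece that makes peer switching total over the sync states: every syncing state carries its `syncPeer`, so swapping in a replacement peer is a single `copy`. A stripped-down sketch of the same ADT shape (illustrative names, not the real hierarchy):

    object SyncStateSketch extends App {
      case class Peer(name: String)

      sealed trait State { def isSyncing: Boolean }
      case object Idle extends State { val isSyncing = false }

      sealed trait Syncing extends State {
        val isSyncing = true
        def syncPeer: Peer
        // One copy per concrete case; because the trait is sealed, the
        // compiler warns if a new sync state is missing from this match.
        def replaceSyncPeer(newPeer: Peer): Syncing = this match {
          case h: HeaderSync => h.copy(syncPeer = newPeer)
          case f: FilterSync => f.copy(syncPeer = newPeer)
        }
      }
      case class HeaderSync(syncPeer: Peer) extends Syncing
      case class FilterSync(syncPeer: Peer) extends Syncing

      val s: Syncing = HeaderSync(Peer("peer0"))
      println(s.replaceSyncPeer(Peer("peer1"))) // HeaderSync(Peer(peer1))
    }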
PeerMessageReceiverState.scala

@@ -320,7 +320,10 @@ sealed abstract class PeerMessageReceiverState extends Logging {
     }
   }
 
-  def stopReconnect(peer: Peer): PeerMessageReceiverState = {
+  def stopReconnect(
+      peer: Peer,
+      queue: SourceQueueWithComplete[StreamDataMessageWrapper])(implicit
+      ec: ExecutionContext): Future[PeerMessageReceiverState] = {
     this match {
       case Preconnection =>
         //when retry, state should be back to preconnection
@@ -328,15 +331,17 @@ sealed abstract class PeerMessageReceiverState extends Logging {
                     clientDisconnectP,
                     versionMsgP,
                     verackMsgP)
-        newState
+        val disconnectedPeer = DisconnectedPeer(peer, false)
+        queue.offer(disconnectedPeer).map(_ => newState)
       case _: StoppedReconnect =>
         logger.warn(
           s"Already stopping reconnect from peer=$peer, this is a noop")
-        this
+        Future.successful(this)
      case bad @ (_: Initializing | _: Normal | _: InitializedDisconnect |
            _: InitializedDisconnectDone | _: Disconnected | _: Waiting) =>
-        throw new RuntimeException(
+        val exn = new RuntimeException(
          s"Cannot stop reconnect from peer=$peer when in state=$bad")
+        Future.failed(exn)
    }
  }
}