mirror of
https://github.com/bitcoin-s/bitcoin-s.git
synced 2025-03-26 21:42:48 +01:00
2023 05 29 peermanager peerfinder refactor (#5086)
* WIP * Reset PeerManager.dataMessageHandler to None on PeerManager.stop() * WIP2 * Always set dataMessageHandler.peerData we process a new message in the stream * remove PeerManager from PeerDatta * Make PeerFinder mutable inside of PeerManager, pass the queue into PeerFinder to decouple PeerManager and PeerFinder * Don't verify actor system shutdown for now
This commit is contained in:
parent
6c38a791d7
commit
61e142a631
12 changed files with 441 additions and 318 deletions
|
@ -8,7 +8,8 @@ import org.bitcoins.node.models.Peer
|
|||
import org.bitcoins.node.networking.P2PClient.ExpectResponseCommand
|
||||
import org.bitcoins.node.networking.peer.DataMessageHandlerState.{
|
||||
DoneSyncing,
|
||||
MisbehavingPeer
|
||||
MisbehavingPeer,
|
||||
RemovePeers
|
||||
}
|
||||
import org.bitcoins.node.networking.peer.{
|
||||
DataMessageHandlerState,
|
||||
|
@ -99,7 +100,7 @@ class NeutrinoNodeWithUncachedBitcoindTest extends NodeUnitTest with CachedTor {
|
|||
//old peer we were syncing with that just disconnected us
|
||||
oldSyncPeer = node.peerManager.getDataMessageHandler.state match {
|
||||
case state: SyncDataMessageHandlerState => state.syncPeer
|
||||
case DoneSyncing | _: MisbehavingPeer =>
|
||||
case DoneSyncing | _: MisbehavingPeer | _: RemovePeers =>
|
||||
sys.error(s"Cannot be in DOneSyncing state while awaiting sync")
|
||||
}
|
||||
_ <- NodeTestUtil.awaitAllSync(node, bitcoinds(1))
|
||||
|
|
|
@ -2,6 +2,7 @@ package org.bitcoins.node.networking
|
|||
|
||||
import akka.testkit.{TestActorRef, TestProbe}
|
||||
import org.bitcoins.chain.config.ChainAppConfig
|
||||
import org.bitcoins.node.Node
|
||||
import org.bitcoins.node.config.NodeAppConfig
|
||||
import org.bitcoins.node.models.Peer
|
||||
import org.bitcoins.node.networking.P2PClient.ConnectCommand
|
||||
|
@ -60,22 +61,34 @@ class P2PClientActorTest
|
|||
implicit val nodeConf = tuple._1.nodeConf
|
||||
for {
|
||||
peer <- bitcoindPeerF
|
||||
client <- buildP2PClient(peer)
|
||||
node <- NodeUnitTest.buildNode(peer = peer, walletCreationTimeOpt = None)
|
||||
client = buildP2PClient(peer, node)
|
||||
res <- connectAndDisconnect(client)
|
||||
_ <- node.stop()
|
||||
} yield res
|
||||
}
|
||||
|
||||
it must "connect to two nodes" in { tuple =>
|
||||
val try1 = for {
|
||||
peer <- bitcoindPeerF
|
||||
client <- buildP2PClient(peer)(tuple._1.chainConf, tuple._1.nodeConf)
|
||||
node <- NodeUnitTest.buildNode(peer = peer, walletCreationTimeOpt = None)(
|
||||
tuple._1.chainConf,
|
||||
tuple._1.nodeConf,
|
||||
system)
|
||||
client = buildP2PClient(peer, node)(tuple._1.chainConf, tuple._1.nodeConf)
|
||||
res <- connectAndDisconnect(client)
|
||||
_ <- node.stop()
|
||||
} yield res
|
||||
|
||||
val try2 = for {
|
||||
peer <- bitcoindPeer2F
|
||||
client <- buildP2PClient(peer)(tuple._2.chainConf, tuple._2.nodeConf)
|
||||
node <- NodeUnitTest.buildNode(peer = peer, walletCreationTimeOpt = None)(
|
||||
tuple._2.chainConf,
|
||||
tuple._2.nodeConf,
|
||||
system)
|
||||
client = buildP2PClient(peer, node)(tuple._2.chainConf, tuple._2.nodeConf)
|
||||
res <- connectAndDisconnect(client)
|
||||
_ <- node.stop()
|
||||
} yield res
|
||||
|
||||
try1.flatMap { _ =>
|
||||
|
@ -84,9 +97,12 @@ class P2PClientActorTest
|
|||
}
|
||||
|
||||
it must "close actor on disconnect" in { tuple =>
|
||||
implicit val chainConf = tuple._1.chainConf
|
||||
implicit val nodeConf = tuple._1.nodeConf
|
||||
for {
|
||||
peer <- bitcoindPeerF
|
||||
client <- buildP2PClient(peer)(tuple._1.chainConf, tuple._1.nodeConf)
|
||||
node <- NodeUnitTest.buildNode(peer = peer, walletCreationTimeOpt = None)
|
||||
client = buildP2PClient(peer, node)(tuple._1.chainConf, tuple._1.nodeConf)
|
||||
_ = probe.watch(client.actor)
|
||||
_ <- connectAndDisconnect(client)
|
||||
term = probe.expectTerminated(client.actor)
|
||||
|
@ -95,39 +111,32 @@ class P2PClientActorTest
|
|||
}
|
||||
}
|
||||
|
||||
def buildP2PClient(peer: Peer)(implicit
|
||||
def buildP2PClient(peer: Peer, node: Node)(implicit
|
||||
chainAppConfig: ChainAppConfig,
|
||||
nodeAppConfig: NodeAppConfig): Future[P2PClient] = {
|
||||
val peerMessageReceiverF =
|
||||
for {
|
||||
node <- NodeUnitTest.buildNode(peer, None)
|
||||
//piggy back off of node infra to setup p2p clients, but don't actually use
|
||||
//the node itself so stop it here an clean up resources allocated by it
|
||||
_ <- node.stop()
|
||||
controlMessageHandler = ControlMessageHandler(node.peerManager)
|
||||
} yield PeerMessageReceiver(controlMessageHandler = controlMessageHandler,
|
||||
dataMessageHandler =
|
||||
node.peerManager.getDataMessageHandler,
|
||||
peer = peer)
|
||||
nodeAppConfig: NodeAppConfig): P2PClient = {
|
||||
|
||||
val clientActorF: Future[TestActorRef[P2PClientActor]] =
|
||||
peerMessageReceiverF.map { peerMsgRecv =>
|
||||
TestActorRef(
|
||||
P2PClient.props(
|
||||
peer = peer,
|
||||
peerMsgHandlerReceiver = peerMsgRecv,
|
||||
peerMsgRecvState = PeerMessageReceiverState.fresh(),
|
||||
p2pClientCallbacks = P2PClientCallbacks.empty,
|
||||
maxReconnectionTries = 16
|
||||
),
|
||||
probe.ref
|
||||
)
|
||||
}
|
||||
val p2pClientF: Future[P2PClient] = clientActorF.map {
|
||||
client: TestActorRef[P2PClientActor] =>
|
||||
P2PClient(client, peer)
|
||||
}
|
||||
p2pClientF
|
||||
//piggy back off of node infra to setup p2p clients, but don't actually use
|
||||
//the node itself so stop it here an clean up resources allocated by it
|
||||
val controlMessageHandler = ControlMessageHandler(node.peerManager)
|
||||
val peerMsgRecv = PeerMessageReceiver(
|
||||
controlMessageHandler = controlMessageHandler,
|
||||
queue = node.peerManager.dataMessageQueueOpt.get,
|
||||
peer = peer)
|
||||
|
||||
val client: TestActorRef[P2PClientActor] =
|
||||
TestActorRef(
|
||||
P2PClient.props(
|
||||
peer = peer,
|
||||
peerMsgHandlerReceiver = peerMsgRecv,
|
||||
peerMsgRecvState = PeerMessageReceiverState.fresh(),
|
||||
P2PClientCallbacks.empty,
|
||||
maxReconnectionTries = 16
|
||||
),
|
||||
probe.ref
|
||||
)
|
||||
|
||||
val p2pClient: P2PClient = P2PClient(client, peer)
|
||||
p2pClient
|
||||
}
|
||||
|
||||
/** Helper method to connect to the
|
||||
|
|
|
@ -46,12 +46,17 @@ class DataMessageHandlerTest extends NodeTestWithCachedBitcoindNewest {
|
|||
val peer = node.peerManager.peers.head
|
||||
|
||||
val senderF = node.peerMsgSendersF.map(_.head)
|
||||
val peerManager = node.peerManager
|
||||
for {
|
||||
chainApi <- node.chainApiFromDb()
|
||||
_ = require(peerManager.getPeerData(peer).isDefined)
|
||||
dataMessageHandler = DataMessageHandler(
|
||||
chainApi = chainApi,
|
||||
walletCreationTimeOpt = None,
|
||||
peerManager = node.peerManager,
|
||||
queue = peerManager.dataMessageQueueOpt.get,
|
||||
peers = peerManager.peers,
|
||||
peerMessgeSenderApi = peerManager,
|
||||
peerDataOpt = peerManager.getPeerData(peer),
|
||||
state = HeaderSync(peer),
|
||||
filterBatchCache = Set.empty
|
||||
)(node.executionContext, node.nodeAppConfig, node.chainConfig)
|
||||
|
@ -85,6 +90,7 @@ class DataMessageHandlerTest extends NodeTestWithCachedBitcoindNewest {
|
|||
}
|
||||
}
|
||||
val senderF = node.peerMsgSendersF.map(_.head)
|
||||
val peerManager = node.peerManager
|
||||
|
||||
for {
|
||||
hash <- bitcoind.generateToAddress(blocks = 1, junkAddress).map(_.head)
|
||||
|
@ -95,14 +101,16 @@ class DataMessageHandlerTest extends NodeTestWithCachedBitcoindNewest {
|
|||
nodeCallbacks = NodeCallbacks.onBlockReceived(callback)
|
||||
_ = node.nodeAppConfig.addCallbacks(nodeCallbacks)
|
||||
chainApi <- node.chainApiFromDb()
|
||||
dataMessageHandler =
|
||||
DataMessageHandler(
|
||||
chainApi = chainApi,
|
||||
walletCreationTimeOpt = None,
|
||||
peerManager = node.peerManager,
|
||||
state = HeaderSync(peer),
|
||||
filterBatchCache = Set.empty
|
||||
)(node.executionContext, node.nodeAppConfig, node.chainConfig)
|
||||
dataMessageHandler = DataMessageHandler(
|
||||
chainApi = chainApi,
|
||||
walletCreationTimeOpt = None,
|
||||
peerManager.dataMessageQueueOpt.get,
|
||||
peerManager.peers,
|
||||
peerManager,
|
||||
peerManager.getPeerData(peer),
|
||||
state = HeaderSync(peer),
|
||||
filterBatchCache = Set.empty
|
||||
)(node.executionContext, node.nodeAppConfig, node.chainConfig)
|
||||
sender <- senderF
|
||||
_ <- dataMessageHandler.handleDataPayload(payload, sender, peer)
|
||||
result <- resultP.future
|
||||
|
@ -127,6 +135,7 @@ class DataMessageHandlerTest extends NodeTestWithCachedBitcoindNewest {
|
|||
}
|
||||
|
||||
val senderF = node.peerMsgSendersF.map(_.head)
|
||||
val peerManager = node.peerManager
|
||||
for {
|
||||
hash <- bitcoind.generateToAddress(blocks = 1, junkAddress).map(_.head)
|
||||
header <- bitcoind.getBlockHeaderRaw(hash)
|
||||
|
@ -137,15 +146,16 @@ class DataMessageHandlerTest extends NodeTestWithCachedBitcoindNewest {
|
|||
|
||||
_ = node.nodeAppConfig.addCallbacks(callbacks)
|
||||
chainApi <- node.chainApiFromDb()
|
||||
dataMessageHandler =
|
||||
DataMessageHandler(chainApi = chainApi,
|
||||
walletCreationTimeOpt = None,
|
||||
peerManager = node.peerManager,
|
||||
state = HeaderSync(peer),
|
||||
filterBatchCache = Set.empty)(
|
||||
node.executionContext,
|
||||
node.nodeAppConfig,
|
||||
node.chainConfig)
|
||||
dataMessageHandler = DataMessageHandler(
|
||||
chainApi = chainApi,
|
||||
walletCreationTimeOpt = None,
|
||||
peerManager.dataMessageQueueOpt.get,
|
||||
peerManager.peers,
|
||||
peerManager,
|
||||
peerManager.getPeerData(peer),
|
||||
state = HeaderSync(peer),
|
||||
filterBatchCache = Set.empty
|
||||
)(node.executionContext, node.nodeAppConfig, node.chainConfig)
|
||||
sender <- senderF
|
||||
_ <- dataMessageHandler.handleDataPayload(payload, sender, peer)
|
||||
result <- resultP.future
|
||||
|
@ -191,7 +201,7 @@ class DataMessageHandlerTest extends NodeTestWithCachedBitcoindNewest {
|
|||
}
|
||||
}
|
||||
val senderF = node.peerMsgSendersF.map(_.head)
|
||||
|
||||
val peerManager = node.peerManager
|
||||
for {
|
||||
|
||||
txId <- bitcoind.sendToAddress(junkAddress, 1.bitcoin)
|
||||
|
@ -202,14 +212,16 @@ class DataMessageHandlerTest extends NodeTestWithCachedBitcoindNewest {
|
|||
nodeCallbacks = NodeCallbacks.onTxReceived(callback)
|
||||
_ = node.nodeAppConfig.addCallbacks(nodeCallbacks)
|
||||
chainApi <- node.chainApiFromDb()
|
||||
dataMessageHandler =
|
||||
DataMessageHandler(
|
||||
chainApi = chainApi,
|
||||
walletCreationTimeOpt = None,
|
||||
peerManager = node.peerManager,
|
||||
state = HeaderSync(peer),
|
||||
filterBatchCache = Set.empty
|
||||
)(node.executionContext, node.nodeAppConfig, node.chainConfig)
|
||||
dataMessageHandler = DataMessageHandler(
|
||||
chainApi = chainApi,
|
||||
walletCreationTimeOpt = None,
|
||||
peerManager.dataMessageQueueOpt.get,
|
||||
peerManager.peers,
|
||||
peerManager,
|
||||
peerManager.getPeerData(peer),
|
||||
state = HeaderSync(peer),
|
||||
filterBatchCache = Set.empty
|
||||
)(node.executionContext, node.nodeAppConfig, node.chainConfig)
|
||||
sender <- senderF
|
||||
_ <- dataMessageHandler.handleDataPayload(payload, sender, peer)
|
||||
result <- resultP.future
|
||||
|
|
|
@ -18,7 +18,8 @@ import org.bitcoins.node.config.NodeAppConfig
|
|||
import org.bitcoins.node.models._
|
||||
import org.bitcoins.node.networking.peer.DataMessageHandlerState.{
|
||||
DoneSyncing,
|
||||
MisbehavingPeer
|
||||
MisbehavingPeer,
|
||||
RemovePeers
|
||||
}
|
||||
import org.bitcoins.node.networking.peer.{
|
||||
PeerMessageSender,
|
||||
|
@ -185,8 +186,8 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
|
|||
blockHashes: Vector[DoubleSha256Digest]): Future[Unit] = {
|
||||
if (isIBD) {
|
||||
val syncPeerOpt = peerManager.getDataMessageHandler.state match {
|
||||
case state: SyncDataMessageHandlerState => Some(state.syncPeer)
|
||||
case DoneSyncing | _: MisbehavingPeer => None
|
||||
case state: SyncDataMessageHandlerState => Some(state.syncPeer)
|
||||
case DoneSyncing | _: MisbehavingPeer | _: RemovePeers => None
|
||||
}
|
||||
syncPeerOpt match {
|
||||
case Some(peer) =>
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package org.bitcoins.node
|
||||
|
||||
import akka.actor.{ActorRef, ActorSystem}
|
||||
import akka.stream.scaladsl.SourceQueueWithComplete
|
||||
import org.bitcoins.chain.config.ChainAppConfig
|
||||
import org.bitcoins.core.p2p.ServiceIdentifier
|
||||
import org.bitcoins.node.config.NodeAppConfig
|
||||
|
@ -10,7 +11,8 @@ import org.bitcoins.node.networking.peer.{
|
|||
ControlMessageHandler,
|
||||
PeerMessageReceiver,
|
||||
PeerMessageReceiverState,
|
||||
PeerMessageSender
|
||||
PeerMessageSender,
|
||||
StreamDataMessageWrapper
|
||||
}
|
||||
|
||||
import scala.concurrent.Future
|
||||
|
@ -21,7 +23,7 @@ import scala.concurrent.duration.DurationInt
|
|||
case class PeerData(
|
||||
peer: Peer,
|
||||
controlMessageHandler: ControlMessageHandler,
|
||||
peerManager: PeerManager,
|
||||
queue: SourceQueueWithComplete[StreamDataMessageWrapper],
|
||||
p2pClientCallbacks: P2PClientCallbacks,
|
||||
supervisor: ActorRef
|
||||
)(implicit
|
||||
|
@ -36,9 +38,7 @@ case class PeerData(
|
|||
|
||||
private lazy val client: Future[P2PClient] = {
|
||||
val peerMessageReceiver =
|
||||
PeerMessageReceiver(controlMessageHandler,
|
||||
peerManager.getDataMessageHandler,
|
||||
peer)
|
||||
PeerMessageReceiver(controlMessageHandler, queue, peer)
|
||||
P2PClient(
|
||||
peer = peer,
|
||||
peerMessageReceiver = peerMessageReceiver,
|
||||
|
@ -70,6 +70,8 @@ case class PeerData(
|
|||
_invalidMessagesCount += 1
|
||||
}
|
||||
|
||||
def getInvalidMessageCount = _invalidMessagesCount
|
||||
|
||||
private var lastTimedOut: Long = 0
|
||||
|
||||
def updateLastFailureTime(): Unit = {
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package org.bitcoins.node
|
||||
|
||||
import akka.actor.{ActorRef, ActorSystem, Cancellable}
|
||||
import akka.stream.scaladsl.SourceQueueWithComplete
|
||||
import org.bitcoins.asyncutil.AsyncUtil
|
||||
import org.bitcoins.chain.config.ChainAppConfig
|
||||
import org.bitcoins.core.p2p.ServiceIdentifier
|
||||
|
@ -8,7 +9,10 @@ import org.bitcoins.core.util.{NetworkUtil, StartStopAsync}
|
|||
import org.bitcoins.node.config.NodeAppConfig
|
||||
import org.bitcoins.node.models.{Peer, PeerDAO, PeerDb}
|
||||
import org.bitcoins.node.networking.P2PClientCallbacks
|
||||
import org.bitcoins.node.networking.peer.ControlMessageHandler
|
||||
import org.bitcoins.node.networking.peer.{
|
||||
ControlMessageHandler,
|
||||
StreamDataMessageWrapper
|
||||
}
|
||||
|
||||
import java.net.{InetAddress, UnknownHostException}
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
|
@ -21,7 +25,7 @@ import scala.util.{Failure, Random, Success}
|
|||
case class PeerFinder(
|
||||
paramPeers: Vector[Peer],
|
||||
controlMessageHandler: ControlMessageHandler,
|
||||
peerManager: PeerManager,
|
||||
queue: SourceQueueWithComplete[StreamDataMessageWrapper],
|
||||
p2pClientCallbacks: P2PClientCallbacks,
|
||||
skipPeers: () => Vector[Peer],
|
||||
supervisor: ActorRef)(implicit
|
||||
|
@ -218,20 +222,22 @@ case class PeerFinder(
|
|||
_peerData.put(peer,
|
||||
PeerData(peer,
|
||||
controlMessageHandler,
|
||||
peerManager,
|
||||
queue,
|
||||
p2pClientCallbacks,
|
||||
supervisor))
|
||||
_peerData(peer).peerMessageSender.map(_.connect())
|
||||
|
||||
}
|
||||
|
||||
private def tryToReconnectPeer(peer: Peer): Future[Unit] = {
|
||||
_peerData.put(peer,
|
||||
PeerData(peer,
|
||||
controlMessageHandler,
|
||||
peerManager,
|
||||
queue,
|
||||
p2pClientCallbacks,
|
||||
supervisor))
|
||||
_peerData(peer).peerMessageSender.map(_.reconnect())
|
||||
|
||||
}
|
||||
|
||||
def removePeer(peer: Peer): Unit = {
|
||||
|
|
|
@ -72,32 +72,38 @@ case class PeerManager(
|
|||
sendResponseTimeout = sendResponseTimeout
|
||||
)
|
||||
|
||||
private val finder: PeerFinder =
|
||||
PeerFinder(
|
||||
paramPeers = paramPeers,
|
||||
controlMessageHandler = ControlMessageHandler(this),
|
||||
peerManager = this,
|
||||
p2pClientCallbacks = p2pClientCallbacks,
|
||||
skipPeers = () => peers,
|
||||
supervisor = supervisor
|
||||
)
|
||||
private var finderOpt: Option[PeerFinder] = {
|
||||
None
|
||||
}
|
||||
|
||||
def addPeerToTry(peers: Vector[Peer], priority: Int = 0): Unit = {
|
||||
finder.addToTry(peers, priority)
|
||||
finderOpt match {
|
||||
case Some(finder) => finder.addToTry(peers, priority)
|
||||
case None =>
|
||||
sys.error(
|
||||
s"Cannot addPeerToTry, finder not started. Call PeerManager.start()")
|
||||
}
|
||||
}
|
||||
|
||||
def connectedPeerCount: Int = _peerDataMap.size
|
||||
|
||||
def addPeer(peer: Peer): Future[Unit] = {
|
||||
require(finder.hasPeer(peer), s"Unknown $peer marked as usable")
|
||||
val curPeerData = finder.popFromCache(peer).get
|
||||
_peerDataMap.put(peer, curPeerData)
|
||||
val hasCf =
|
||||
if (curPeerData.serviceIdentifier.nodeCompactFilters) "with filters"
|
||||
else ""
|
||||
logger.info(
|
||||
s"Connected to peer $peer $hasCf. Connected peer count $connectedPeerCount")
|
||||
Future.unit
|
||||
private def addPeer(peer: Peer): Future[Unit] = {
|
||||
finderOpt match {
|
||||
case Some(finder) =>
|
||||
require(finder.hasPeer(peer), s"Unknown $peer marked as usable")
|
||||
val curPeerData = finder.popFromCache(peer).get
|
||||
_peerDataMap.put(peer, curPeerData)
|
||||
val hasCf =
|
||||
if (curPeerData.serviceIdentifier.nodeCompactFilters) "with filters"
|
||||
else ""
|
||||
logger.info(
|
||||
s"Connected to peer $peer $hasCf. Connected peer count $connectedPeerCount")
|
||||
Future.unit
|
||||
case None =>
|
||||
sys.error(
|
||||
s"Cannot addPeer, finder not started. Call PeerManager.start()")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
def peers: Vector[Peer] = _peerDataMap.keys.toVector
|
||||
|
@ -189,7 +195,7 @@ case class PeerManager(
|
|||
val peerMsgSender =
|
||||
peerDataMap(syncState.syncPeer).peerMessageSender
|
||||
Some(peerMsgSender)
|
||||
case DoneSyncing | _: MisbehavingPeer => None
|
||||
case DoneSyncing | _: MisbehavingPeer | _: RemovePeers => None
|
||||
}
|
||||
}
|
||||
val sendCompactFilterHeaderMsgF = syncPeerMsgSenderOptF match {
|
||||
|
@ -364,6 +370,15 @@ case class PeerManager(
|
|||
val (queue, doneF) = dataMessageStreamGraph.run()
|
||||
dataMessageQueueOpt = Some(queue)
|
||||
streamDoneFOpt = Some(doneF)
|
||||
val finder = PeerFinder(
|
||||
paramPeers = paramPeers,
|
||||
controlMessageHandler = ControlMessageHandler(this),
|
||||
queue = queue,
|
||||
p2pClientCallbacks = p2pClientCallbacks,
|
||||
skipPeers = () => peers,
|
||||
supervisor = supervisor
|
||||
)
|
||||
finderOpt = Some(finder)
|
||||
finder.start().map { _ =>
|
||||
logger.info("Done starting PeerManager")
|
||||
this
|
||||
|
@ -378,31 +393,36 @@ case class PeerManager(
|
|||
logger.info(s"Stopping PeerManager")
|
||||
val beganAt = System.currentTimeMillis()
|
||||
|
||||
val _ = dataMessageQueueOpt.map(_.complete())
|
||||
val watchQueueCompleteF = watchCompletion()
|
||||
val finderStopF = finder.stop()
|
||||
val finderStopF = finderOpt match {
|
||||
case Some(finder) => finder.stop()
|
||||
case None => Future.unit
|
||||
}
|
||||
|
||||
peerServicesQueries.foreach(_.cancel()) //reset the peerServicesQueries var?
|
||||
|
||||
val stopF = for {
|
||||
_ <- finderStopF
|
||||
_ <- watchQueueCompleteF
|
||||
_ = dataMessageQueueOpt.map(_.complete())
|
||||
_ <- {
|
||||
val finishedF = streamDoneFOpt match {
|
||||
case Some(f) => f
|
||||
case None => Future.successful(Done)
|
||||
}
|
||||
streamDoneFOpt = None
|
||||
finishedF
|
||||
}
|
||||
_ = {
|
||||
dataMessageQueueOpt = None //reset dataMessageQueue var
|
||||
}
|
||||
_ <- Future.traverse(peers)(removePeer)
|
||||
_ <- AsyncUtil.retryUntilSatisfied(
|
||||
_peerDataMap.isEmpty && waitingForDeletion.isEmpty,
|
||||
interval = 1.seconds,
|
||||
maxTries = 30)
|
||||
_ <- watchCompletion()
|
||||
_ = {
|
||||
//reset all variables
|
||||
dataMessageQueueOpt = None
|
||||
dataMessageHandlerOpt = None
|
||||
streamDoneFOpt = None
|
||||
finderOpt = None
|
||||
}
|
||||
} yield {
|
||||
logger.info(
|
||||
s"Stopped PeerManager. Took ${System.currentTimeMillis() - beganAt} ms ")
|
||||
|
@ -435,145 +455,177 @@ case class PeerManager(
|
|||
}
|
||||
|
||||
def onInitializationTimeout(peer: Peer): Future[Unit] = {
|
||||
require(!finder.hasPeer(peer) || !peerDataMap.contains(peer),
|
||||
s"$peer cannot be both a test and a persistent peer")
|
||||
finderOpt match {
|
||||
case Some(finder) =>
|
||||
require(!finder.hasPeer(peer) || !peerDataMap.contains(peer),
|
||||
s"$peer cannot be both a test and a persistent peer")
|
||||
|
||||
if (finder.hasPeer(peer)) {
|
||||
//one of the peers that we tried, failed to init within time, disconnect
|
||||
finder.getData(peer).get.stop()
|
||||
} else if (peerDataMap.contains(peer)) {
|
||||
//this is one of our persistent peers which must have been initialized earlier, this can happen in case of
|
||||
//a reconnection attempt, meaning it got connected but failed to initialize, disconnect
|
||||
peerDataMap(peer).stop()
|
||||
} else {
|
||||
//this should never happen
|
||||
logger.warn(s"onInitializationTimeout called for unknown $peer")
|
||||
Future.unit
|
||||
if (finder.hasPeer(peer)) {
|
||||
//one of the peers that we tried, failed to init within time, disconnect
|
||||
finder.getData(peer).get.stop()
|
||||
} else if (peerDataMap.contains(peer)) {
|
||||
//this is one of our persistent peers which must have been initialized earlier, this can happen in case of
|
||||
//a reconnection attempt, meaning it got connected but failed to initialize, disconnect
|
||||
peerDataMap(peer).stop()
|
||||
} else {
|
||||
//this should never happen
|
||||
logger.warn(s"onInitializationTimeout called for unknown $peer")
|
||||
Future.unit
|
||||
}
|
||||
case None =>
|
||||
logger.warn(
|
||||
s"Cannot execute onInitializationTimeout, finder not started")
|
||||
Future.unit
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
def onInitialization(peer: Peer): Future[Unit] = {
|
||||
require(!finder.hasPeer(peer) || !peerDataMap.contains(peer),
|
||||
s"$peer cannot be both a test and a persistent peer")
|
||||
finderOpt match {
|
||||
case Some(finder) =>
|
||||
require(!finder.hasPeer(peer) || !peerDataMap.contains(peer),
|
||||
s"$peer cannot be both a test and a persistent peer")
|
||||
|
||||
//this assumes neutrino and checks for compact filter support so should not be called for anything else
|
||||
require(nodeAppConfig.nodeType == NodeType.NeutrinoNode,
|
||||
s"Node cannot be ${nodeAppConfig.nodeType.shortName}")
|
||||
//this assumes neutrino and checks for compact filter support so should not be called for anything else
|
||||
require(nodeAppConfig.nodeType == NodeType.NeutrinoNode,
|
||||
s"Node cannot be ${nodeAppConfig.nodeType.shortName}")
|
||||
|
||||
if (finder.hasPeer(peer)) {
|
||||
//one of the peers we tries got initialized successfully
|
||||
val peerData = finder.getData(peer).get
|
||||
val serviceIdentifer = peerData.serviceIdentifier
|
||||
val hasCf = serviceIdentifer.nodeCompactFilters
|
||||
logger.debug(s"Initialized peer $peer with $hasCf")
|
||||
if (finder.hasPeer(peer)) {
|
||||
//one of the peers we tries got initialized successfully
|
||||
val peerData = finder.getData(peer).get
|
||||
val serviceIdentifer = peerData.serviceIdentifier
|
||||
val hasCf = serviceIdentifer.nodeCompactFilters
|
||||
logger.debug(s"Initialized peer $peer with $hasCf")
|
||||
|
||||
def sendAddrReq: Future[Unit] =
|
||||
finder
|
||||
.getData(peer)
|
||||
.get
|
||||
.peerMessageSender
|
||||
.flatMap(_.sendGetAddrMessage())
|
||||
def sendAddrReq: Future[Unit] =
|
||||
finder
|
||||
.getData(peer)
|
||||
.get
|
||||
.peerMessageSender
|
||||
.flatMap(_.sendGetAddrMessage())
|
||||
|
||||
def managePeerF(): Future[Unit] = {
|
||||
//if we have slots remaining, connect
|
||||
if (connectedPeerCount < nodeAppConfig.maxConnectedPeers) {
|
||||
addPeer(peer)
|
||||
} else {
|
||||
lazy val notCf = peerDataMap
|
||||
.filter(p => !p._2.serviceIdentifier.nodeCompactFilters)
|
||||
.keys
|
||||
def managePeerF(): Future[Unit] = {
|
||||
//if we have slots remaining, connect
|
||||
if (connectedPeerCount < nodeAppConfig.maxConnectedPeers) {
|
||||
addPeer(peer)
|
||||
} else {
|
||||
lazy val notCf = peerDataMap
|
||||
.filter(p => !p._2.serviceIdentifier.nodeCompactFilters)
|
||||
.keys
|
||||
|
||||
//try to drop another non compact filter connection for this
|
||||
if (hasCf && notCf.nonEmpty)
|
||||
replacePeer(replacePeer = notCf.head, withPeer = peer)
|
||||
else {
|
||||
//no use for this apart from writing in db
|
||||
//we do want to give it enough time to send addr messages
|
||||
AsyncUtil
|
||||
.nonBlockingSleep(duration = 10.seconds)
|
||||
.flatMap { _ =>
|
||||
//could have already been deleted in case of connection issues
|
||||
finder.getData(peer) match {
|
||||
case Some(p) => p.stop()
|
||||
case None => Future.unit
|
||||
}
|
||||
//try to drop another non compact filter connection for this
|
||||
if (hasCf && notCf.nonEmpty)
|
||||
replacePeer(replacePeer = notCf.head, withPeer = peer)
|
||||
else {
|
||||
//no use for this apart from writing in db
|
||||
//we do want to give it enough time to send addr messages
|
||||
AsyncUtil
|
||||
.nonBlockingSleep(duration = 10.seconds)
|
||||
.flatMap { _ =>
|
||||
//could have already been deleted in case of connection issues
|
||||
finder.getData(peer) match {
|
||||
case Some(p) => p.stop()
|
||||
case None => Future.unit
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for {
|
||||
_ <- sendAddrReq
|
||||
_ <- createInDb(peer, peerData.serviceIdentifier)
|
||||
_ <- managePeerF()
|
||||
} yield ()
|
||||
|
||||
} else if (peerDataMap.contains(peer)) {
|
||||
//one of the persistent peers initialized again, this can happen in case of a reconnection attempt
|
||||
//which succeeded which is all good, do nothing
|
||||
Future.unit
|
||||
} else {
|
||||
logger.warn(s"onInitialization called for unknown $peer")
|
||||
Future.unit
|
||||
}
|
||||
}
|
||||
|
||||
for {
|
||||
_ <- sendAddrReq
|
||||
_ <- createInDb(peer, peerData.serviceIdentifier)
|
||||
_ <- managePeerF()
|
||||
} yield ()
|
||||
|
||||
} else if (peerDataMap.contains(peer)) {
|
||||
//one of the persistent peers initialized again, this can happen in case of a reconnection attempt
|
||||
//which succeeded which is all good, do nothing
|
||||
Future.unit
|
||||
} else {
|
||||
logger.warn(s"onInitialization called for unknown $peer")
|
||||
Future.unit
|
||||
case None =>
|
||||
logger.warn(
|
||||
s"onInitialization cannot be run, PeerFinder was not started")
|
||||
Future.unit
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
def onP2PClientStopped(peer: Peer): Future[Unit] = {
|
||||
require(!finder.hasPeer(peer) || !peerDataMap.contains(peer),
|
||||
s"$peer cannot be both a test and a persistent peer")
|
||||
finderOpt match {
|
||||
case Some(finder) =>
|
||||
require(!finder.hasPeer(peer) || !peerDataMap.contains(peer),
|
||||
s"$peer cannot be both a test and a persistent peer")
|
||||
|
||||
logger.info(s"Client stopped for $peer peers=$peers")
|
||||
logger.info(s"Client stopped for $peer peers=$peers")
|
||||
|
||||
if (finder.hasPeer(peer)) {
|
||||
//client actor for one of the test peers stopped, can remove it from map now
|
||||
finder.removePeer(peer)
|
||||
Future.unit
|
||||
} else if (peerDataMap.contains(peer)) {
|
||||
//actor stopped for one of the persistent peers, can happen in case a reconnection attempt failed due to
|
||||
//reconnection tries exceeding the max limit in which the client was stopped to disconnect from it, remove it
|
||||
_peerDataMap.remove(peer)
|
||||
//getDataMesageHandler.state is already mutated from another thread
|
||||
//this will be set to the new sync peer not the old one.
|
||||
val state = getDataMessageHandler.state
|
||||
val syncPeerOpt = state match {
|
||||
case s: SyncDataMessageHandlerState =>
|
||||
Some(s.syncPeer)
|
||||
case m: MisbehavingPeer => Some(m.badPeer)
|
||||
case DoneSyncing =>
|
||||
None
|
||||
}
|
||||
if (peers.exists(_ != peer) && syncPeerOpt.isDefined) {
|
||||
node.syncFromNewPeer().map(_ => ())
|
||||
} else if (syncPeerOpt.isDefined) {
|
||||
//means we aren't syncing with anyone, so do nothing?
|
||||
val exn = new RuntimeException(
|
||||
s"No new peers to sync from, cannot start new sync. Terminated sync with peer=$peer current syncPeer=$syncPeerOpt state=${state} peers=$peers")
|
||||
Future.failed(exn)
|
||||
} else {
|
||||
finder.reconnect(peer)
|
||||
}
|
||||
} else if (waitingForDeletion.contains(peer)) {
|
||||
//a peer we wanted to disconnect has remove has stopped the client actor, finally mark this as deleted
|
||||
_waitingForDeletion.remove(peer)
|
||||
Future.unit
|
||||
} else {
|
||||
logger.warn(s"onP2PClientStopped called for unknown $peer")
|
||||
Future.unit
|
||||
if (finder.hasPeer(peer)) {
|
||||
//client actor for one of the test peers stopped, can remove it from map now
|
||||
finder.removePeer(peer)
|
||||
Future.unit
|
||||
} else if (peerDataMap.contains(peer)) {
|
||||
//actor stopped for one of the persistent peers, can happen in case a reconnection attempt failed due to
|
||||
//reconnection tries exceeding the max limit in which the client was stopped to disconnect from it, remove it
|
||||
_peerDataMap.remove(peer)
|
||||
//getDataMesageHandler.state is already mutated from another thread
|
||||
//this will be set to the new sync peer not the old one.
|
||||
val state = getDataMessageHandler.state
|
||||
val syncPeerOpt = state match {
|
||||
case s: SyncDataMessageHandlerState =>
|
||||
Some(s.syncPeer)
|
||||
case m: MisbehavingPeer => Some(m.badPeer)
|
||||
case DoneSyncing | _: RemovePeers =>
|
||||
None
|
||||
}
|
||||
if (peers.exists(_ != peer) && syncPeerOpt.isDefined) {
|
||||
node.syncFromNewPeer().map(_ => ())
|
||||
} else if (syncPeerOpt.isDefined) {
|
||||
//means we aren't syncing with anyone, so do nothing?
|
||||
val exn = new RuntimeException(
|
||||
s"No new peers to sync from, cannot start new sync. Terminated sync with peer=$peer current syncPeer=$syncPeerOpt state=${state} peers=$peers")
|
||||
Future.failed(exn)
|
||||
} else {
|
||||
finder.reconnect(peer)
|
||||
}
|
||||
} else if (waitingForDeletion.contains(peer)) {
|
||||
//a peer we wanted to disconnect has remove has stopped the client actor, finally mark this as deleted
|
||||
_waitingForDeletion.remove(peer)
|
||||
Future.unit
|
||||
} else {
|
||||
logger.warn(s"onP2PClientStopped called for unknown $peer")
|
||||
Future.unit
|
||||
}
|
||||
case None =>
|
||||
logger.warn(
|
||||
s"onP2PClientStopped cannot be run, PeerFinder was not started")
|
||||
Future.unit
|
||||
}
|
||||
}
|
||||
|
||||
def onVersionMessage(peer: Peer, versionMsg: VersionMessage): Unit = {
|
||||
require(!finder.hasPeer(peer) || !peerDataMap.contains(peer),
|
||||
s"$peer cannot be both a test and a persistent peer")
|
||||
finderOpt match {
|
||||
case Some(finder) =>
|
||||
require(!finder.hasPeer(peer) || !peerDataMap.contains(peer),
|
||||
s"$peer cannot be both a test and a persistent peer")
|
||||
|
||||
if (finder.hasPeer(peer)) {
|
||||
finder.getData(peer).get.setServiceIdentifier(versionMsg.services)
|
||||
} else if (peerDataMap.contains(peer)) {
|
||||
require(
|
||||
peerDataMap(peer).serviceIdentifier.bytes == versionMsg.services.bytes)
|
||||
} else {
|
||||
logger.warn(s"onVersionMessage called for unknown $peer")
|
||||
if (finder.hasPeer(peer)) {
|
||||
finder.getData(peer).get.setServiceIdentifier(versionMsg.services)
|
||||
} else if (peerDataMap.contains(peer)) {
|
||||
require(
|
||||
peerDataMap(
|
||||
peer).serviceIdentifier.bytes == versionMsg.services.bytes)
|
||||
} else {
|
||||
logger.warn(s"onVersionMessage called for unknown $peer")
|
||||
}
|
||||
case None =>
|
||||
logger.warn(
|
||||
s"onVersionMessage cannot be run, PeerFinder was not started")
|
||||
()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
def onQueryTimeout(payload: ExpectsResponse, peer: Peer): Future[Unit] = {
|
||||
|
@ -592,7 +644,7 @@ case class PeerManager(
|
|||
val syncPeer = getDataMessageHandler.state match {
|
||||
case syncState: SyncDataMessageHandlerState =>
|
||||
syncState.syncPeer
|
||||
case s @ (DoneSyncing | _: MisbehavingPeer) =>
|
||||
case s @ (DoneSyncing | _: MisbehavingPeer | _: RemovePeers) =>
|
||||
sys.error(s"Cannot have state=$s and have a query timeout")
|
||||
}
|
||||
if (peer == syncPeer)
|
||||
|
@ -608,7 +660,8 @@ case class PeerManager(
|
|||
|
||||
private def onHeaderRequestTimeout(
|
||||
peer: Peer,
|
||||
state: DataMessageHandlerState): Future[DataMessageHandler] = {
|
||||
state: DataMessageHandlerState,
|
||||
peerData: PeerData): Future[DataMessageHandler] = {
|
||||
logger.info(s"Header request timed out from $peer in state $state")
|
||||
state match {
|
||||
case HeaderSync(_) | MisbehavingPeer(_) =>
|
||||
|
@ -620,10 +673,10 @@ case class PeerManager(
|
|||
|
||||
if (newHeaderState.validated) {
|
||||
//re-review this
|
||||
fetchCompactFilterHeaders(newDmh)
|
||||
newDmh.fetchCompactFilterHeaders(newDmh, peerData = peerData)
|
||||
} else Future.successful(newDmh)
|
||||
|
||||
case DoneSyncing | _: FilterHeaderSync | _: FilterSync =>
|
||||
case DoneSyncing | _: FilterHeaderSync | _: FilterSync | _: RemovePeers =>
|
||||
Future.successful(getDataMessageHandler)
|
||||
}
|
||||
}
|
||||
|
@ -654,7 +707,10 @@ case class PeerManager(
|
|||
Future.failed(new RuntimeException(
|
||||
s"Couldn't find PeerMessageSender that corresponds with peer=$peer msg=${payload.commandName}. Was it disconnected?"))
|
||||
case Some(peerMsgSender) =>
|
||||
getDataMessageHandler
|
||||
val dmh = {
|
||||
getDataMessageHandler.copy(peerDataOpt = getPeerData(peer))
|
||||
}
|
||||
dmh
|
||||
.handleDataPayload(payload, peerMsgSender, peer)
|
||||
.flatMap { newDmh =>
|
||||
newDmh.state match {
|
||||
|
@ -665,6 +721,11 @@ case class PeerManager(
|
|||
_ <- removePeer(m.badPeer)
|
||||
_ <- node.syncFromNewPeer()
|
||||
} yield msg
|
||||
case removePeers: RemovePeers =>
|
||||
updateDataMessageHandler(newDmh)
|
||||
for {
|
||||
_ <- Future.traverse(removePeers.peers)(removePeer)
|
||||
} yield msg
|
||||
case _: SyncDataMessageHandlerState | DoneSyncing =>
|
||||
updateDataMessageHandler(newDmh)
|
||||
Future.successful(msg)
|
||||
|
@ -675,11 +736,22 @@ case class PeerManager(
|
|||
|
||||
case msg @ HeaderTimeoutWrapper(peer) =>
|
||||
logger.debug(s"Processing timeout header for $peer")
|
||||
onHeaderRequestTimeout(peer, getDataMessageHandler.state).map {
|
||||
newDmh =>
|
||||
updateDataMessageHandler(newDmh)
|
||||
logger.debug(s"Done processing timeout header for $peer")
|
||||
msg
|
||||
val peerDataOpt = getPeerData(peer)
|
||||
peerDataOpt match {
|
||||
case Some(peerData) =>
|
||||
for {
|
||||
msg <- {
|
||||
onHeaderRequestTimeout(peer,
|
||||
getDataMessageHandler.state,
|
||||
peerData).map { newDmh =>
|
||||
updateDataMessageHandler(newDmh)
|
||||
logger.debug(s"Done processing timeout header for $peer")
|
||||
msg
|
||||
}
|
||||
}
|
||||
} yield msg
|
||||
case None =>
|
||||
sys.error(s"Unkown peer timeing out header request, peer=$peer")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -702,7 +774,7 @@ case class PeerManager(
|
|||
.withAttributes(ActorAttributes.supervisionStrategy(decider))
|
||||
}
|
||||
|
||||
private var dataMessageQueueOpt: Option[
|
||||
private[bitcoins] var dataMessageQueueOpt: Option[
|
||||
SourceQueueWithComplete[StreamDataMessageWrapper]] = None
|
||||
|
||||
private var streamDoneFOpt: Option[Future[Done]] = None
|
||||
|
@ -724,41 +796,30 @@ case class PeerManager(
|
|||
}
|
||||
}
|
||||
|
||||
def fetchCompactFilterHeaders(
|
||||
currentDmh: DataMessageHandler): Future[DataMessageHandler] = {
|
||||
val syncPeer = currentDmh.state match {
|
||||
case s: SyncDataMessageHandlerState => s.syncPeer
|
||||
case state @ (DoneSyncing | _: MisbehavingPeer) =>
|
||||
sys.error(
|
||||
s"Cannot fetch compact filter headers when we are in state=$state")
|
||||
}
|
||||
logger.info(
|
||||
s"Now syncing filter headers from $syncPeer in state=${currentDmh.state}")
|
||||
for {
|
||||
sender <- peerDataMap(syncPeer).peerMessageSender
|
||||
newSyncingState <- PeerManager.sendFirstGetCompactFilterHeadersCommand(
|
||||
sender,
|
||||
currentDmh.chainApi)
|
||||
} yield {
|
||||
currentDmh.copy(state = newSyncingState)
|
||||
}
|
||||
private var dataMessageHandlerOpt: Option[DataMessageHandler] = {
|
||||
None
|
||||
}
|
||||
|
||||
private var dataMessageHandler: DataMessageHandler = {
|
||||
DataMessageHandler(
|
||||
chainApi = ChainHandler.fromDatabase(),
|
||||
walletCreationTimeOpt = walletCreationTimeOpt,
|
||||
peerManager = this,
|
||||
state = DoneSyncing,
|
||||
filterBatchCache = Set.empty
|
||||
)
|
||||
def getDataMessageHandler: DataMessageHandler = {
|
||||
if (dataMessageHandlerOpt.isDefined) {
|
||||
dataMessageHandlerOpt.get
|
||||
} else {
|
||||
DataMessageHandler(
|
||||
chainApi = ChainHandler.fromDatabase(),
|
||||
walletCreationTimeOpt = walletCreationTimeOpt,
|
||||
queue = dataMessageQueueOpt.get,
|
||||
peers,
|
||||
peerMessgeSenderApi = this,
|
||||
peerDataOpt = None,
|
||||
state = DoneSyncing,
|
||||
filterBatchCache = Set.empty
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
def getDataMessageHandler: DataMessageHandler = dataMessageHandler
|
||||
|
||||
def updateDataMessageHandler(
|
||||
dataMessageHandler: DataMessageHandler): PeerManager = {
|
||||
this.dataMessageHandler = dataMessageHandler
|
||||
this.dataMessageHandlerOpt = Some(dataMessageHandler.copy(peers = peers))
|
||||
this
|
||||
}
|
||||
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
package org.bitcoins.node.networking.peer
|
||||
|
||||
import akka.stream.scaladsl.SourceQueue
|
||||
import org.bitcoins.chain.blockchain.{DuplicateHeaders, InvalidBlockHeader}
|
||||
import org.bitcoins.chain.config.ChainAppConfig
|
||||
import org.bitcoins.chain.models.BlockHeaderDAO
|
||||
|
@ -13,7 +14,8 @@ import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE}
|
|||
import org.bitcoins.node.config.NodeAppConfig
|
||||
import org.bitcoins.node.models._
|
||||
import org.bitcoins.node.networking.peer.DataMessageHandlerState._
|
||||
import org.bitcoins.node.{P2PLogger, PeerManager}
|
||||
import org.bitcoins.node.util.PeerMessageSenderApi
|
||||
import org.bitcoins.node.{P2PLogger, PeerData, PeerManager}
|
||||
|
||||
import java.time.Instant
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
|
@ -29,7 +31,10 @@ import scala.util.control.NonFatal
|
|||
case class DataMessageHandler(
|
||||
chainApi: ChainApi,
|
||||
walletCreationTimeOpt: Option[Instant],
|
||||
peerManager: PeerManager,
|
||||
queue: SourceQueue[StreamDataMessageWrapper],
|
||||
peers: Vector[Peer],
|
||||
peerMessgeSenderApi: PeerMessageSenderApi,
|
||||
peerDataOpt: Option[PeerData],
|
||||
state: DataMessageHandlerState,
|
||||
filterBatchCache: Set[CompactFilterMessage])(implicit
|
||||
ec: ExecutionContext,
|
||||
|
@ -50,7 +55,7 @@ case class DataMessageHandler(
|
|||
|
||||
def addToStream(payload: DataPayload, peer: Peer): Future[Unit] = {
|
||||
val msg = DataMessageWrapper(payload, peer)
|
||||
peerManager
|
||||
queue
|
||||
.offer(msg)
|
||||
.map(_ => ())
|
||||
}
|
||||
|
@ -72,7 +77,8 @@ case class DataMessageHandler(
|
|||
//process messages from all peers
|
||||
resultF.failed.foreach { err =>
|
||||
logger.error(
|
||||
s"Failed to handle data payload=${payload} from $peer in state=$state errMsg=${err.getMessage}")
|
||||
s"Failed to handle data payload=${payload} from $peer in state=$state errMsg=${err.getMessage}",
|
||||
err)
|
||||
}
|
||||
resultF.recoverWith { case NonFatal(_) =>
|
||||
Future.successful(this)
|
||||
|
@ -89,7 +95,8 @@ case class DataMessageHandler(
|
|||
handleDataPayloadValidState(payload, peerMsgSender, peer)
|
||||
resultF.failed.foreach { err =>
|
||||
logger.error(
|
||||
s"Failed to handle data payload=${payload} from $peer in state=$state errMsg=${err.getMessage}")
|
||||
s"Failed to handle data payload=${payload} from $peer in state=$state errMsg=${err.getMessage}",
|
||||
err)
|
||||
}
|
||||
resultF.recoverWith { case NonFatal(_) =>
|
||||
Future.successful(this)
|
||||
|
@ -101,7 +108,8 @@ case class DataMessageHandler(
|
|||
|
||||
resultF.failed.foreach { err =>
|
||||
logger.error(
|
||||
s"Failed to handle data payload=${payload} from $peer in state=$state errMsg=${err.getMessage}")
|
||||
s"Failed to handle data payload=${payload} from $peer in state=$state errMsg=${err.getMessage}",
|
||||
err)
|
||||
}
|
||||
|
||||
resultF.recoverWith { case NonFatal(_) =>
|
||||
|
@ -120,6 +128,16 @@ case class DataMessageHandler(
|
|||
peerMsgSender,
|
||||
peer)
|
||||
}
|
||||
case RemovePeers(peers, _) =>
|
||||
if (peers.exists(_ == peer)) {
|
||||
Future.failed(new RuntimeException(
|
||||
s"Cannot continue processing p2p messages from peer we were suppose to remove, peer=$peer"))
|
||||
} else {
|
||||
copy(state = DoneSyncing).handleDataPayload(payload,
|
||||
peerMsgSender,
|
||||
peer)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -152,7 +170,8 @@ case class DataMessageHandler(
|
|||
case _: HeaderSync | _: ValidatingHeaders =>
|
||||
FilterHeaderSync(peer)
|
||||
case filterHeaderSync: FilterHeaderSync => filterHeaderSync
|
||||
case x @ (DoneSyncing | _: FilterSync | _: MisbehavingPeer) =>
|
||||
case x @ (DoneSyncing | _: FilterSync | _: MisbehavingPeer |
|
||||
_: RemovePeers) =>
|
||||
sys.error(
|
||||
s"Incorrect state for handling filter header messages, got=$x")
|
||||
}
|
||||
|
@ -307,11 +326,11 @@ case class DataMessageHandler(
|
|||
ValidatingHeaders(syncPeer = peer,
|
||||
inSyncWith = Set.empty,
|
||||
failedCheck = Set.empty,
|
||||
verifyingWith = peerManager.peers.toSet)
|
||||
verifyingWith = peers.toSet)
|
||||
}
|
||||
case v: ValidatingHeaders => v
|
||||
case x @ (_: FilterHeaderSync | _: FilterSync |
|
||||
_: MisbehavingPeer) =>
|
||||
case x @ (_: FilterHeaderSync | _: FilterSync | _: MisbehavingPeer |
|
||||
_: RemovePeers) =>
|
||||
sys.error(s"Invalid state to receive headers in, got=$x")
|
||||
}
|
||||
val chainApiHeaderProcessF: Future[DataMessageHandler] = for {
|
||||
|
@ -327,7 +346,8 @@ case class DataMessageHandler(
|
|||
dmh <- getHeaders(newDmh = newDmh,
|
||||
headers = headers,
|
||||
peerMsgSender = peerMsgSender,
|
||||
peer = peer)
|
||||
peer = peer,
|
||||
peerDataOpt = peerDataOpt)
|
||||
} yield dmh
|
||||
}
|
||||
val recoveredDmhF = getHeadersF.recoverWith {
|
||||
|
@ -338,7 +358,7 @@ case class DataMessageHandler(
|
|||
case _: InvalidBlockHeader =>
|
||||
logger.warn(
|
||||
s"Invalid headers of count $count sent from ${peer} in state=$state")
|
||||
recoverInvalidHeader(peerMsgSender)
|
||||
recoverInvalidHeader(peerMsgSender, peerDataOpt.get)
|
||||
case e: Throwable => throw e
|
||||
}
|
||||
|
||||
|
@ -472,24 +492,19 @@ case class DataMessageHandler(
|
|||
|
||||
/** Recover the data message handler if we received an invalid block header from a peer */
|
||||
private def recoverInvalidHeader(
|
||||
peerMsgSender: PeerMessageSender): Future[DataMessageHandler] = {
|
||||
peerMsgSender: PeerMessageSender,
|
||||
peerData: PeerData): Future[DataMessageHandler] = {
|
||||
val result = state match {
|
||||
case HeaderSync(peer) =>
|
||||
val peerDataOpt = peerManager.getPeerData(peer)
|
||||
val peerData = peerDataOpt match {
|
||||
case Some(peerData) => peerData
|
||||
case None =>
|
||||
sys.error(
|
||||
s"Cannot find peer we are syncing with in PeerManager, peer=$peer")
|
||||
}
|
||||
peerData.updateInvalidMessageCount()
|
||||
if (peerData.exceededMaxInvalidMessages && peerManager.peers.size > 1) {
|
||||
if (peerData.exceededMaxInvalidMessages && peers.size > 1) {
|
||||
logger.warn(
|
||||
s"$peer exceeded max limit of invalid messages. Disconnecting.")
|
||||
|
||||
Future.successful(copy(state = MisbehavingPeer(peer)))
|
||||
} else {
|
||||
logger.info(s"Re-querying headers from $peer.")
|
||||
logger.info(
|
||||
s"Re-querying headers from $peer. invalidMessages=${peerData.getInvalidMessageCount} peers.size=${peers.size}")
|
||||
for {
|
||||
blockchains <- BlockHeaderDAO().getBlockchains()
|
||||
cachedHeaders = blockchains
|
||||
|
@ -510,16 +525,14 @@ case class DataMessageHandler(
|
|||
if (newHeaderState.validated) {
|
||||
logger.info(
|
||||
s"Done validating headers, inSyncWith=${newHeaderState.inSyncWith}, failedCheck=${newHeaderState.failedCheck}")
|
||||
peerManager
|
||||
.fetchCompactFilterHeaders(newDmh)
|
||||
fetchCompactFilterHeaders(newDmh, peerData)
|
||||
} else {
|
||||
Future.successful(newDmh)
|
||||
}
|
||||
case DoneSyncing | _: FilterHeaderSync | _: FilterSync =>
|
||||
Future.successful(this)
|
||||
case m: MisbehavingPeer =>
|
||||
sys.error(
|
||||
s"Cannot recover invalid headers from already misbehaving peer, got=$m")
|
||||
case m @ (_: MisbehavingPeer | _: RemovePeers) =>
|
||||
sys.error(s"Cannot recover invalid headers, got=$m")
|
||||
}
|
||||
|
||||
result
|
||||
|
@ -714,7 +727,8 @@ case class DataMessageHandler(
|
|||
newDmh: DataMessageHandler,
|
||||
headers: Vector[BlockHeader],
|
||||
peerMsgSender: PeerMessageSender,
|
||||
peer: Peer): Future[DataMessageHandler] = {
|
||||
peer: Peer,
|
||||
peerDataOpt: Option[PeerData]): Future[DataMessageHandler] = {
|
||||
val state = newDmh.state
|
||||
val count = headers.length
|
||||
val getHeadersF: Future[DataMessageHandler] = {
|
||||
|
@ -740,24 +754,18 @@ case class DataMessageHandler(
|
|||
.map(_ => newDmh)
|
||||
|
||||
case ValidatingHeaders(_, inSyncWith, _, _) =>
|
||||
//In the validation stage, some peer sent max amount of valid headers, revert to HeaderSync with that peer as syncPeer
|
||||
//disconnect the ones that we have already checked since they are at least out of sync by 2000 headers
|
||||
val removeFs =
|
||||
inSyncWith.map(p => peerManager.removePeer(p))
|
||||
|
||||
//ask for more headers now
|
||||
val askF = peerMsgSender
|
||||
.sendGetHeadersMessage(lastHash)
|
||||
.map(_ => syncing)
|
||||
|
||||
for {
|
||||
_ <- Future.sequence(removeFs)
|
||||
_ <- askF
|
||||
} yield {
|
||||
newDmh.copy(state = HeaderSync(peer))
|
||||
newDmh.copy(state = RemovePeers(inSyncWith.toVector, true))
|
||||
}
|
||||
case x @ (_: FilterHeaderSync | _: FilterSync | DoneSyncing |
|
||||
_: MisbehavingPeer) =>
|
||||
_: MisbehavingPeer | _: RemovePeers) =>
|
||||
sys.error(s"Cannot be in state=$x while retrieving block headers")
|
||||
}
|
||||
|
||||
|
@ -776,12 +784,12 @@ case class DataMessageHandler(
|
|||
// headers are synced now with the current sync peer, now move to validating it for all peers
|
||||
require(syncPeer == peer, s"syncPeer=$syncPeer peer=$peer")
|
||||
|
||||
if (peerManager.peers.size > 1) {
|
||||
if (peers.size > 1) {
|
||||
val newState =
|
||||
ValidatingHeaders(
|
||||
syncPeer = peer,
|
||||
inSyncWith = Set(peer),
|
||||
verifyingWith = peerManager.peers.toSet,
|
||||
verifyingWith = peers.toSet,
|
||||
failedCheck = Set.empty[Peer]
|
||||
)
|
||||
|
||||
|
@ -790,15 +798,16 @@ case class DataMessageHandler(
|
|||
|
||||
val getHeadersAllF = {
|
||||
val msg = GetHeadersMessage(lastHash)
|
||||
peerManager.gossipMessage(msg, excludedPeerOpt = Some(peer))
|
||||
peerMessgeSenderApi.gossipMessage(msg,
|
||||
excludedPeerOpt =
|
||||
Some(peer))
|
||||
}
|
||||
|
||||
getHeadersAllF
|
||||
.map(_ => newDmh.copy(state = newState))
|
||||
} else {
|
||||
//if just one peer then can proceed ahead directly
|
||||
peerManager
|
||||
.fetchCompactFilterHeaders(newDmh)
|
||||
fetchCompactFilterHeaders(newDmh, peerDataOpt.get)
|
||||
}
|
||||
|
||||
case headerState @ ValidatingHeaders(_, inSyncWith, _, _) =>
|
||||
|
@ -814,14 +823,14 @@ case class DataMessageHandler(
|
|||
// so we also check if our cached filter heights have been set as well, if they haven't then
|
||||
// we probably need to sync filters
|
||||
|
||||
peerManager
|
||||
.fetchCompactFilterHeaders(newDmh2)
|
||||
fetchCompactFilterHeaders(currentDmh = newDmh2,
|
||||
peerData = peerDataOpt.get)
|
||||
} else {
|
||||
//do nothing, we are still waiting for some peers to send headers or timeout
|
||||
Future.successful(newDmh2)
|
||||
}
|
||||
case x @ (_: FilterHeaderSync | _: FilterSync | DoneSyncing |
|
||||
_: MisbehavingPeer) =>
|
||||
_: MisbehavingPeer | _: RemovePeers) =>
|
||||
sys.error(
|
||||
s"Cannot be in state=$x while we are about to begin syncing compact filter headers")
|
||||
}
|
||||
|
@ -834,8 +843,7 @@ case class DataMessageHandler(
|
|||
headerState.copy(inSyncWith = inSyncWith + peer)
|
||||
val newDmh2 = newDmh.copy(state = newHeaderState)
|
||||
if (newHeaderState.validated) {
|
||||
peerManager
|
||||
.fetchCompactFilterHeaders(newDmh2)
|
||||
fetchCompactFilterHeaders(newDmh2, peerDataOpt.get)
|
||||
} else {
|
||||
//do nothing, we are still waiting for some peers to send headers
|
||||
Future.successful(newDmh2)
|
||||
|
@ -844,13 +852,35 @@ case class DataMessageHandler(
|
|||
Future.successful(newDmh)
|
||||
case DoneSyncing =>
|
||||
Future.successful(newDmh)
|
||||
case x @ (_: FilterHeaderSync | _: FilterSync | _: MisbehavingPeer) =>
|
||||
case x @ (_: FilterHeaderSync | _: FilterSync | _: MisbehavingPeer |
|
||||
_: RemovePeers) =>
|
||||
sys.error(s"Invalid state to complete block header sync in, got=$x")
|
||||
}
|
||||
}
|
||||
}
|
||||
getHeadersF
|
||||
}
|
||||
|
||||
def fetchCompactFilterHeaders(
|
||||
currentDmh: DataMessageHandler,
|
||||
peerData: PeerData): Future[DataMessageHandler] = {
|
||||
val syncPeer = currentDmh.state match {
|
||||
case s: SyncDataMessageHandlerState => s.syncPeer
|
||||
case state @ (DoneSyncing | _: MisbehavingPeer | _: RemovePeers) =>
|
||||
sys.error(
|
||||
s"Cannot fetch compact filter headers when we are in state=$state")
|
||||
}
|
||||
logger.info(
|
||||
s"Now syncing filter headers from $syncPeer in state=${currentDmh.state}")
|
||||
for {
|
||||
sender <- peerData.peerMessageSender
|
||||
newSyncingState <- PeerManager.sendFirstGetCompactFilterHeadersCommand(
|
||||
sender,
|
||||
currentDmh.chainApi)
|
||||
} yield {
|
||||
currentDmh.copy(state = newSyncingState)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sealed trait StreamDataMessageWrapper
|
||||
|
|
|
@ -37,6 +37,9 @@ object DataMessageHandlerState {
|
|||
override val isSyncing: Boolean = false
|
||||
}
|
||||
|
||||
case class RemovePeers(peers: Vector[Peer], isSyncing: Boolean)
|
||||
extends DataMessageHandlerState
|
||||
|
||||
/** State to indicate we are not currently syncing with a peer */
|
||||
case object DoneSyncing extends DataMessageHandlerState {
|
||||
override val isSyncing: Boolean = false
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package org.bitcoins.node.networking.peer
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import akka.stream.scaladsl.SourceQueueWithComplete
|
||||
import org.bitcoins.core.api.node.NodeType
|
||||
import org.bitcoins.core.p2p._
|
||||
import org.bitcoins.node.config.NodeAppConfig
|
||||
|
@ -19,7 +20,7 @@ import scala.concurrent.Future
|
|||
*/
|
||||
case class PeerMessageReceiver(
|
||||
controlMessageHandler: ControlMessageHandler,
|
||||
dataMessageHandler: DataMessageHandler,
|
||||
queue: SourceQueueWithComplete[StreamDataMessageWrapper],
|
||||
peer: Peer
|
||||
)(implicit system: ActorSystem, nodeAppConfig: NodeAppConfig)
|
||||
extends P2PLogger {
|
||||
|
@ -88,12 +89,11 @@ case class PeerMessageReceiver(
|
|||
payload: DataPayload): Future[PeerMessageReceiver] = {
|
||||
//else it means we are receiving this data payload from a peer,
|
||||
//we need to handle it
|
||||
dataMessageHandler
|
||||
.addToStream(payload, peer)
|
||||
.map(_ =>
|
||||
new PeerMessageReceiver(controlMessageHandler,
|
||||
dataMessageHandler,
|
||||
peer))
|
||||
val wrapper = DataMessageWrapper(payload, peer)
|
||||
|
||||
queue
|
||||
.offer(wrapper)
|
||||
.map(_ => new PeerMessageReceiver(controlMessageHandler, queue, peer))
|
||||
}
|
||||
|
||||
/** Handles control payloads defined here https://bitcoin.org/en/developer-reference#control-messages
|
||||
|
|
|
@ -216,8 +216,7 @@ object NodeUnitTest extends P2PLogger {
|
|||
appConfig.nodeConf)
|
||||
val receiver =
|
||||
PeerMessageReceiver(controlMessageHandler = controlMessageHandler,
|
||||
dataMessageHandler =
|
||||
node.peerManager.getDataMessageHandler,
|
||||
queue = node.peerManager.dataMessageQueueOpt.get,
|
||||
peer = peer)(system, appConfig.nodeConf)
|
||||
Future.successful(receiver)
|
||||
}
|
||||
|
@ -375,8 +374,7 @@ object NodeUnitTest extends P2PLogger {
|
|||
ControlMessageHandler(node.peerManager)(system.dispatcher, nodeAppConfig)
|
||||
val receiver =
|
||||
PeerMessageReceiver(controlMessageHandler = controlMessageHandler,
|
||||
dataMessageHandler =
|
||||
node.peerManager.getDataMessageHandler,
|
||||
queue = node.peerManager.dataMessageQueueOpt.get,
|
||||
peer = peer)
|
||||
Future.successful(receiver)
|
||||
}
|
||||
|
|
|
@ -31,7 +31,7 @@ trait BitcoinSAkkaAsyncTest extends BaseAsyncTest with Logging {
|
|||
system.dispatcher
|
||||
|
||||
override def afterAll(): Unit = {
|
||||
TestKit.shutdownActorSystem(system, verifySystemShutdown = true)
|
||||
TestKit.shutdownActorSystem(system, verifySystemShutdown = false)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue