2024 01 02 encapsulate state p2psink (#5342)

* Move NodeState back to node module

* Refactor peerWithServicesDataMap into NodeState

* More usage of state.peerDataMap in stream

* Fix log message

* Fix compile

* Move PeerFinder into NodeState

* WIP: Move PeerFinder init into NeutrinoNode.start()

* Get things mostly working again after rebase

* Fix bug in handling of headers message where we wouldn't transition to DoneSyncing if a peer sent us 0 headers

* Move SendToPeer into stream

* scalafmt

* Empty commit to run CI

* Re-add DataMessageHandlerTest

* Renable disconnecting portion of NeutrinoNodeTest

* Empty commit to run CI

* Remove disconnection part of test again

* Empty commit to re-run CI

* Empty commit to re-run CI
This commit is contained in:
Chris Stewart 2024-01-13 17:22:01 -06:00 committed by GitHub
parent 56b1a557a8
commit 9cd60d5366
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 348 additions and 210 deletions

View File

@ -67,7 +67,7 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair {
}
}
it must "be able to connect, initialize and then disconnect from all peers" in {
it must "be able to connect, initialize all peers" in {
nodeConnectedWithBitcoind: NeutrinoNodeConnectedWithBitcoinds =>
val node = nodeConnectedWithBitcoind.node
def peerManager = node.peerManager
@ -89,24 +89,11 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair {
inits <- Future.sequence(peers.map(peerManager.isInitialized))
} yield assert(inits.forall(_ == true))
def allDisconn: Future[Unit] = AsyncUtil.retryUntilSatisfied(
peers
.map(p =>
!peerManager
.getPeerData(p)
.isDefined)
.forall(_ == true),
maxTries = 5,
interval = 1.second
)
for {
_ <- has2Peers
_ <- bothOurs
_ <- allConnected
_ <- allInitialized
_ <- Future.sequence(peers.map(peerManager.disconnectPeer))
_ <- allDisconn
} yield {
succeed
}
@ -396,9 +383,8 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair {
bestBlockHash0 <- bitcoind0.getBestBlockHash()
//invalidate blockhash to force a reorg when next block is generated
_ <- bitcoind0.invalidateBlock(bestBlockHash0)
_ <- AsyncUtil.retryUntilSatisfiedF(
() => node.getConnectionCount.map(_ == 1),
1.second)
_ <- AsyncUtil.retryUntilSatisfiedF(() =>
node.getConnectionCount.map(_ == 1))
//now generate a block, make sure we sync with them
hashes0 <- bitcoind0.generate(1)
chainApi <- node.chainApiFromDb()

View File

@ -4,19 +4,19 @@ import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.core.config.SigNet
import org.bitcoins.core.currency._
import org.bitcoins.core.gcs.{FilterType, GolombFilter}
import org.bitcoins.core.p2p._
import org.bitcoins.core.p2p.HeadersMessage
import org.bitcoins.core.protocol.blockchain.{Block, BlockHeader}
import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.crypto.DoubleSha256Digest
import org.bitcoins.node.NodeState.HeaderSync
import org.bitcoins.node._
import org.bitcoins.core.api.node.NodeState.HeaderSync
import org.bitcoins.server.BitcoinSAppConfig
import org.bitcoins.testkit.BitcoinSTestAppConfig
import org.bitcoins.testkit.node.fixture.NeutrinoNodeConnectedWithBitcoind
import org.bitcoins.testkit.node.{
NodeTestUtil,
NodeTestWithCachedBitcoindNewest
}
import org.bitcoins.testkit.node.fixture.NeutrinoNodeConnectedWithBitcoind
import org.scalatest.{FutureOutcome, Outcome}
import scala.concurrent.duration.DurationInt
@ -53,12 +53,21 @@ class DataMessageHandlerTest extends NodeTestWithCachedBitcoindNewest {
chainApi <- node.chainApiFromDb()
_ = require(peerManager.getPeerData(peer).isDefined)
peerMsgSender = peerManager.getPeerData(peer).get.peerMessageSender
peerFinder = PeerFinder(paramPeers = Vector.empty,
queue = node,
skipPeers = () => Set.empty)(system.dispatcher,
system,
node.nodeConfig,
node.chainConfig)
dataMessageHandler = DataMessageHandler(
chainApi = chainApi,
walletCreationTimeOpt = None,
peerMessageSenderApi = peerMsgSender,
peerManager = peerManager,
state = HeaderSync(peer, peerManager.peersWithServices, Set.empty)
state = HeaderSync(peer,
peerManager.peerWithServicesDataMap,
Set.empty,
peerFinder)
)(node.executionContext, node.nodeAppConfig, node.chainConfig)
// Use signet genesis block header, this should be invalid for regtest

View File

@ -18,8 +18,8 @@ import akka.{Done, NotUsed}
import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.core.api.chain.ChainQueryApi.FilterResponse
import org.bitcoins.core.api.node.NodeState.DoneSyncing
import org.bitcoins.core.api.node.{NodeState, NodeType, Peer}
import NodeState.DoneSyncing
import org.bitcoins.core.api.node.{NodeType, Peer}
import org.bitcoins.core.config.{MainNet, RegTest, SigNet, TestNet3}
import org.bitcoins.core.protocol.BlockStamp
import org.bitcoins.node.config.NodeAppConfig
@ -58,16 +58,10 @@ case class NeutrinoNode(
maxConcurrentOffers = Runtime.getRuntime.availableProcessors())
}
private lazy val peerFinder: PeerFinder = PeerFinder(paramPeers = paramPeers,
queue = this,
skipPeers =
() => Set.empty)
override lazy val peerManager: PeerManager = {
PeerManager(paramPeers = paramPeers,
walletCreationTimeOpt = walletCreationTimeOpt,
queue = this,
finder = peerFinder)
queue = this)
}
private[this] var queueOpt: Option[
@ -93,13 +87,18 @@ case class NeutrinoNode(
override def start(): Future[NeutrinoNode] = {
isStarted.set(true)
val initState =
DoneSyncing(peersWithServices = Set.empty,
waitingForDisconnection = Set.empty)
val (queue, source) =
dataMessageStreamSource.preMaterialize()
queueOpt = Some(queue)
val peerFinder: PeerFinder = PeerFinder(paramPeers = paramPeers,
queue = queue,
skipPeers = () => Set.empty)
val initState =
DoneSyncing(peerDataMap = Map.empty,
waitingForDisconnection = Set.empty,
peerFinder)
val graph =
buildDataMessageStreamGraph(initState = initState, source = source)
val stateF = graph.run()
@ -127,13 +126,15 @@ case class NeutrinoNode(
val start = System.currentTimeMillis()
inactivityCancellableOpt.map(_.cancel())
for {
_ <- peerFinder.stop()
_ <- peerManager.stop()
_ = queueOpt.map(_.complete())
_ <- {
val finishedF = streamDoneFOpt match {
case Some(f) => f
case None => Future.successful(Done)
case Some(f) =>
f.flatMap { case r: NodeRunningState =>
r.peerFinder.stop()
}
case None => Future.successful(Done)
}
finishedF
}

View File

@ -1,7 +1,9 @@
package org.bitcoins.core.api.node
package org.bitcoins.node
import org.bitcoins.core.api.node.NodeState.NodeShuttingDown
import org.bitcoins.core.api.node.{Peer, PeerWithServices}
import org.bitcoins.core.p2p.{CompactFilterMessage, ServiceIdentifier}
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.networking.peer.{PeerConnection, PeerMessageSender}
import scala.util.Random
@ -9,7 +11,9 @@ sealed abstract class NodeState
/** Means our node has been started and is running */
sealed trait NodeRunningState extends NodeState {
def peersWithServices: Set[PeerWithServices]
def peerDataMap: Map[PeerWithServices, PersistentPeerData]
def peersWithServices: Set[PeerWithServices] = peerDataMap.map(_._1).toSet
/** All peers the node is currently connected to */
def peers: Set[Peer] = peersWithServices.map(_.peer)
@ -17,23 +21,40 @@ sealed trait NodeRunningState extends NodeState {
def waitingForDisconnection: Set[Peer]
def isSyncing: Boolean
def peerFinder: PeerFinder
def getPeerConnection(peer: Peer): Option[PeerConnection] = {
peerDataMap.find(_._1.peer == peer).map(_._2.peerConnection) match {
case Some(peerConnection) => Some(peerConnection)
case None => None
}
}
def getPeerMsgSender(peer: Peer)(implicit
nodeAppConfig: NodeAppConfig): Option[PeerMessageSender] = {
val randomPeerOpt = getPeerConnection(peer)
randomPeerOpt.map(PeerMessageSender(_))
}
def replacePeers(
peerWithServices: Set[PeerWithServices]): NodeRunningState = {
peerDataMap: Map[
PeerWithServices,
PersistentPeerData]): NodeRunningState = {
this match {
case h: NodeState.HeaderSync =>
h.copy(peersWithServices = peerWithServices)
h.copy(peerDataMap = peerDataMap)
case fh: NodeState.FilterHeaderSync =>
fh.copy(peersWithServices = peerWithServices)
fh.copy(peerDataMap = peerDataMap)
case fs: NodeState.FilterSync =>
fs.copy(peersWithServices = peerWithServices)
fs.copy(peerDataMap = peerDataMap)
case d: NodeState.DoneSyncing =>
d.copy(peersWithServices = peerWithServices)
d.copy(peerDataMap = peerDataMap)
case rm: NodeState.RemovePeers =>
rm.copy(peersWithServices = peerWithServices)
rm.copy(peerDataMap = peerDataMap)
case m: NodeState.MisbehavingPeer =>
m.copy(peersWithServices = peerWithServices)
case s: NodeShuttingDown =>
s.copy(peersWithServices = peersWithServices)
m.copy(peerDataMap = peerDataMap)
case s: NodeState.NodeShuttingDown =>
s.copy(peerDataMap = peerDataMap)
}
}
@ -52,7 +73,7 @@ sealed trait NodeRunningState extends NodeState {
rm.copy(waitingForDisconnection = newWaitingForDisconnection)
case m: NodeState.MisbehavingPeer =>
m.copy(waitingForDisconnection = newWaitingForDisconnection)
case s: NodeShuttingDown =>
case s: NodeState.NodeShuttingDown =>
s.copy(waitingForDisconnection = newWaitingForDisconnection)
}
}
@ -75,6 +96,15 @@ sealed trait NodeRunningState extends NodeState {
}
peerOpt.map(_.peer)
}
def randomPeerMessageSender(
excludePeers: Set[Peer],
services: ServiceIdentifier)(implicit
nodeAppConfig: NodeAppConfig): Option[PeerMessageSender] = {
randomPeer(excludePeers, services).flatMap { p =>
getPeerMsgSender(p)
}
}
}
/** State to indicate that we are syncing the blockchain */
@ -99,21 +129,24 @@ object NodeState {
case class HeaderSync(
syncPeer: Peer,
peersWithServices: Set[PeerWithServices],
waitingForDisconnection: Set[Peer])
peerDataMap: Map[PeerWithServices, PersistentPeerData],
waitingForDisconnection: Set[Peer],
peerFinder: PeerFinder)
extends SyncNodeState
case class FilterHeaderSync(
syncPeer: Peer,
peersWithServices: Set[PeerWithServices],
waitingForDisconnection: Set[Peer])
peerDataMap: Map[PeerWithServices, PersistentPeerData],
waitingForDisconnection: Set[Peer],
peerFinder: PeerFinder)
extends SyncNodeState
case class FilterSync(
syncPeer: Peer,
peersWithServices: Set[PeerWithServices],
peerDataMap: Map[PeerWithServices, PersistentPeerData],
waitingForDisconnection: Set[Peer],
filterBatchCache: Set[CompactFilterMessage])
filterBatchCache: Set[CompactFilterMessage],
peerFinder: PeerFinder)
extends SyncNodeState {
override def toString: String = {
@ -123,8 +156,9 @@ object NodeState {
case class MisbehavingPeer(
badPeer: Peer,
peersWithServices: Set[PeerWithServices],
waitingForDisconnection: Set[Peer])
peerDataMap: Map[PeerWithServices, PersistentPeerData],
waitingForDisconnection: Set[Peer],
peerFinder: PeerFinder)
extends NodeRunningState {
if (peers.nonEmpty) {
//needed for the case where the last peer we are connected to is the bad peer
@ -138,9 +172,10 @@ object NodeState {
case class RemovePeers(
peersToRemove: Vector[Peer],
peersWithServices: Set[PeerWithServices],
peerDataMap: Map[PeerWithServices, PersistentPeerData],
waitingForDisconnection: Set[Peer],
isSyncing: Boolean)
isSyncing: Boolean,
peerFinder: PeerFinder)
extends NodeRunningState {
require(
peersToRemove.forall(rm => peers.exists(_ == rm)),
@ -149,16 +184,34 @@ object NodeState {
/** State to indicate we are not currently syncing with a peer */
case class DoneSyncing(
peersWithServices: Set[PeerWithServices],
waitingForDisconnection: Set[Peer])
peerDataMap: Map[PeerWithServices, PersistentPeerData],
waitingForDisconnection: Set[Peer],
peerFinder: PeerFinder)
extends NodeRunningState {
override val isSyncing: Boolean = false
/** selects a random peer and returns us a header sync state */
def toHeaderSync: HeaderSync = {
val syncPeerOpt =
randomPeer(Set.empty, ServiceIdentifier.NODE_COMPACT_FILTERS)
syncPeerOpt match {
case Some(p) => toHeaderSync(p)
case None =>
sys.error(
s"Could not find a peer to transition from DoneSyncing -> HeaderSync")
}
}
def toHeaderSync(syncPeer: Peer): HeaderSync = {
HeaderSync(syncPeer, peerDataMap, waitingForDisconnection, peerFinder)
}
}
/** means our node is in the process of shutting down */
case class NodeShuttingDown(
peersWithServices: Set[PeerWithServices],
waitingForDisconnection: Set[Peer])
peerDataMap: Map[PeerWithServices, PersistentPeerData],
waitingForDisconnection: Set[Peer],
peerFinder: PeerFinder)
extends NodeRunningState {
override val isSyncing: Boolean = false
}

View File

@ -37,7 +37,9 @@ object NodeStreamMessage {
extends NodeStreamMessage
case class StartSync(peerOpt: Option[Peer]) extends NodeStreamMessage
case class SendToPeer(msg: NetworkMessage, peerOpt: Option[Peer])
extends NodeStreamMessage
case class GossipMessage(msg: NetworkMessage, excludePeerOpt: Option[Peer])
extends NodeStreamMessage

View File

@ -13,7 +13,7 @@ import org.bitcoins.core.api.chain.db.{
CompactFilterDb,
CompactFilterHeaderDb
}
import org.bitcoins.core.api.node.NodeState._
import NodeState._
import org.bitcoins.core.api.node._
import org.bitcoins.core.p2p._
import org.bitcoins.core.util.{NetworkUtil, StartStopAsync}
@ -31,13 +31,11 @@ import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable
import scala.concurrent.duration.DurationInt
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Random
case class PeerManager(
paramPeers: Vector[Peer],
walletCreationTimeOpt: Option[Instant],
queue: SourceQueue[NodeStreamMessage],
finder: PeerFinder)(implicit
queue: SourceQueue[NodeStreamMessage])(implicit
ec: ExecutionContext,
system: ActorSystem,
nodeAppConfig: NodeAppConfig,
@ -62,6 +60,10 @@ case class PeerManager(
peerDataMap.map(_._2.peerWithServicesOpt).flatten.toSet
}
def peerWithServicesDataMap: Map[PeerWithServices, PersistentPeerData] = {
peerDataMap.map(t => (t._2.peerWithServicesOpt.get, t._2))
}
/** Starts sync compact filter headers.
* Only starts syncing compact filters if our compact filter headers are in sync with block headers
*/
@ -115,24 +117,6 @@ case class PeerManager(
}
}
private def randomPeerConnection(
services: ServiceIdentifier): Option[PeerConnection] = {
val potentialPeers =
peerDataMap.filter(_._2.serviceIdentifier.hasServicesOf(services))
val peerOpt = {
if (potentialPeers.nonEmpty) {
val randIdx = Random.nextInt(potentialPeers.size)
Some(potentialPeers.keys.toVector(randIdx))
} else {
None
}
}
val peerConnectionOpt = peerOpt.flatMap(getPeerConnection(_))
peerConnectionOpt
}
private def getPeerMsgSender(peer: Peer): Option[PeerMessageSender] = {
val randomPeerOpt = getPeerConnection(peer)
randomPeerOpt.map(PeerMessageSender(_))
@ -232,7 +216,10 @@ case class PeerManager(
Future.successful(peerDataMap.exists(_._1 == peer))
}
private def onInitializationTimeout(peer: Peer): Future[Unit] = {
private def onInitializationTimeout(
peer: Peer,
state: NodeRunningState): Future[Unit] = {
val finder = state.peerFinder
require(!finder.hasPeer(peer) || !peerDataMap.contains(peer),
s"$peer cannot be both a test and a persistent peer")
@ -272,7 +259,6 @@ case class PeerManager(
//try to drop another non compact filter connection for this
if (hasCf && notCf.nonEmpty)
replacePeer(replacePeer = notCf.head, withPeer = peer)
.flatMap(_ => syncHelper(peer))
else {
peerData
.stop()
@ -294,6 +280,7 @@ case class PeerManager(
private def onInitialization(
peer: Peer,
state: NodeRunningState): Future[NodeState] = {
val finder = state.peerFinder
val stateF: Future[NodeRunningState] = {
require(!finder.hasPeer(peer) || !peerDataMap.contains(peer),
s"$peer cannot be both a test and a persistent peer")
@ -321,7 +308,17 @@ case class PeerManager(
} else if (peerDataMap.contains(peer)) {
//one of the persistent peers initialized again, this can happen in case of a reconnection attempt
//which succeeded which is all good, do nothing
syncHelper(peer).map(_ => state) //does this state need to be modified?
state match {
case s: SyncNodeState =>
val x = s.replaceSyncPeer(peer)
syncHelper(x).map(_ => x)
case d: DoneSyncing =>
val h = d.toHeaderSync(peer)
syncHelper(h).map(_ => h)
case x @ (_: RemovePeers | _: MisbehavingPeer |
_: NodeShuttingDown) =>
Future.successful(x)
}
} else {
logger.warn(s"onInitialization called for unknown $peer")
Future.successful(state)
@ -329,7 +326,7 @@ case class PeerManager(
}
stateF.map { s =>
s.replacePeers(peersWithServices)
s.replacePeers(peerWithServicesDataMap)
}
}
@ -344,6 +341,7 @@ case class PeerManager(
logger.info(
s"Disconnected peer=$peer peers=$peers state=$state forceReconnect=$forceReconnect peerDataMap=${peerDataMap
.map(_._1)}")
val finder = state.peerFinder
val stateF = {
require(!finder.hasPeer(peer) || !peerDataMap.contains(peer),
s"$peer cannot be both a test and a persistent peer")
@ -369,6 +367,10 @@ case class PeerManager(
if (state.peers.exists(_ != peer)) {
state match {
case s: SyncNodeState => switchSyncToRandomPeer(s, Some(peer))
case d: DoneSyncing =>
//defensively try to sync with the new peer
val hs = d.toHeaderSync(peer)
switchSyncToRandomPeer(hs, Some(peer))
case x @ (_: DoneSyncing | _: NodeShuttingDown |
_: MisbehavingPeer | _: RemovePeers) =>
Future.successful(x)
@ -415,13 +417,16 @@ case class PeerManager(
case Some(p) => s.replaceSyncPeer(p)
case None =>
//switch to state DoneSyncing since we have no peers to sync from
DoneSyncing(peersWithServices, state.waitingForDisconnection)
DoneSyncing(peerDataMap = peerWithServicesDataMap,
waitingForDisconnection =
state.waitingForDisconnection,
peerFinder = s.peerFinder)
}
} else {
s.replacePeers(peersWithServices)
s.replacePeers(peerWithServicesDataMap)
}
case runningState: NodeRunningState =>
runningState.replacePeers(peersWithServices)
runningState.replacePeers(peerWithServicesDataMap)
}
}
@ -464,7 +469,9 @@ case class PeerManager(
state match {
case h: HeaderSync =>
syncFromNewPeer(h)
case x @ (_: MisbehavingPeer | _: DoneSyncing) =>
case d: DoneSyncing =>
syncFromNewPeer(d)
case x: MisbehavingPeer =>
syncFromNewPeer(x)
case _: FilterHeaderSync | _: FilterSync | _: RemovePeers |
@ -495,10 +502,6 @@ case class PeerManager(
}
}
private lazy val controlMessageHandler: ControlMessageHandler = {
finder.controlMessageHandler
}
def buildP2PMessageHandlerSink(
initState: NodeState): Sink[NodeStreamMessage, Future[NodeState]] = {
Sink.foldAsync(initState) {
@ -522,7 +525,10 @@ case class PeerManager(
Future.successful(Some(s))
case d: DoneSyncing =>
val h =
HeaderSync(p, d.peersWithServices, d.waitingForDisconnection)
HeaderSync(p,
d.peerDataMap,
d.waitingForDisconnection,
d.peerFinder)
syncFromNewPeer(h)
}
case None =>
@ -535,9 +541,11 @@ case class PeerManager(
d.randomPeer(Set.empty,
ServiceIdentifier.NODE_COMPACT_FILTERS) match {
case Some(p) =>
val h = HeaderSync(p,
d.peersWithServices,
d.waitingForDisconnection)
val h =
HeaderSync(p,
d.peerDataMap,
d.waitingForDisconnection,
d.peerFinder)
syncFromNewPeer(h)
case None =>
Future.successful(None)
@ -559,25 +567,41 @@ case class PeerManager(
Future.successful(s)
case runningState: NodeRunningState =>
val peer = c.peer
val curPeerData = finder.popFromCache(peer).get
val curPeerData = runningState.peerFinder.popFromCache(peer).get
_peerDataMap.put(peer, curPeerData)
val hasCf =
if (curPeerData.serviceIdentifier.nodeCompactFilters)
"with filters"
else ""
val newPeersWithSvcs =
runningState.peersWithServices + curPeerData.peerWithServicesOpt.get
val newState = runningState.replacePeers(newPeersWithSvcs)
val peerWithSvcs = curPeerData.peerWithServicesOpt.get
val newPdm =
runningState.peerDataMap.+((peerWithSvcs, curPeerData))
val replacePeers = runningState.replacePeers(newPdm)
logger.info(
s"Connected to peer $peer $hasCf. Connected peer count $connectedPeerCount")
syncHelper(c.peer).map(_ => newState)
s"Connected to peer $peer $hasCf. Connected peer count ${replacePeers.peerDataMap.size}")
replacePeers match {
case s: SyncNodeState =>
syncHelper(s).map(_ => s)
case d: DoneSyncing =>
val x = d.toHeaderSync(c.peer)
syncHelper(x).map(_ => x)
case x @ (_: MisbehavingPeer | _: RemovePeers |
_: NodeShuttingDown) =>
Future.successful(x)
}
}
case (state, i: InitializeDisconnect) =>
state match {
case r: NodeRunningState =>
val client: PeerData = peerDataMap(i.peer)
_peerDataMap.remove(i.peer)
val client: PeerData =
r.peerDataMap.find(_._1.peer == i.peer) match {
case Some((_, p)) => p
case None =>
sys.error(
s"Cannot find peer=${i.peer} for InitializeDisconnect=$i")
}
//so we need to remove if from the map for connected peers so no more request could be sent to it but we before
//the actor is stopped we don't delete it to ensure that no such case where peers is deleted but actor not stopped
//leading to a memory leak may happen
@ -585,7 +609,10 @@ case class PeerManager(
//now send request to stop actor which will be completed some time in future
client.stop().map { _ =>
val newWaiting = r.waitingForDisconnection.+(i.peer)
val newState = r.replaceWaitingForDisconnection(newWaiting)
val newPdm = r.peerDataMap.filterNot(_._1.peer == i.peer)
val newState = r
.replaceWaitingForDisconnection(newWaiting)
.replacePeers(newPdm)
newState
}
}
@ -594,7 +621,9 @@ case class PeerManager(
logger.debug(s"Got ${payload.commandName} from peer=${peer} in stream")
state match {
case runningState: NodeRunningState =>
val peerDataOpt = peerDataMap.get(peer)
val peerDataOpt = runningState.peerDataMap
.find(_._1.peer == peer)
.map(_._2)
peerDataOpt match {
case None =>
logger.warn(
@ -631,7 +660,7 @@ case class PeerManager(
}
resultF.map { r =>
logger.debug(
s"Done processing ${payload.commandName} in peer=${peer}")
s"Done processing ${payload.commandName} in peer=${peer} state=${r.state}")
r.state
}
}
@ -641,17 +670,17 @@ case class PeerManager(
state match {
case runningState: NodeRunningState =>
val peerMsgSenderApiOpt: Option[PeerMessageSenderApi] =
getPeerMsgSender(peer) match {
runningState.getPeerMsgSender(peer) match {
case Some(p) => Some(p)
case None =>
finder.getPeerData(peer) match {
runningState.peerFinder.getPeerData(peer) match {
case Some(p) => Some(p.peerMessageSender)
case None => None
}
}
peerMsgSenderApiOpt match {
case Some(peerMsgSenderApi) =>
val resultOptF = controlMessageHandler
val resultOptF = runningState.peerFinder.controlMessageHandler
.handleControlPayload(payload,
peerMsgSenderApi = peerMsgSenderApi)
resultOptF.flatMap {
@ -677,8 +706,9 @@ case class PeerManager(
case Some(s) => s
case None =>
//we don't have a state to represent no connected peers atm, so switch to DoneSyncing?
DoneSyncing(peersWithServices = Set.empty,
runningState.waitingForDisconnection)
DoneSyncing(peerDataMap = Map.empty,
runningState.waitingForDisconnection,
runningState.peerFinder)
}
}
} yield {
@ -694,7 +724,11 @@ case class PeerManager(
}
case (state, i: InitializationTimeout) =>
onInitializationTimeout(i.peer).map(_ => state)
state match {
case r: NodeRunningState =>
onInitializationTimeout(i.peer, r).map(_ => r)
}
case (state, q: QueryTimeout) =>
onQueryTimeout(q.payload, q.peer, state).map(_ => state)
case (state, srt: SendResponseTimeout) =>
@ -716,13 +750,13 @@ case class PeerManager(
} else {
Future
.traverse(gossipPeers) { p =>
getPeerConnection(p) match {
runningState.getPeerConnection(p) match {
case Some(pc) =>
val sender = PeerMessageSender(pc)
sender.sendMsg(msg)
case None =>
logger.warn(
s"Attempting to gossip to peer that is availble in state.peers, but not peerDataMap? state=$state peerDataMap=${peerDataMap
s"Attempting to gossip to peer that is available in state.peers, but not peerDataMap? state=$state peerDataMap=${peerDataMap
.map(_._1)}")
Future.unit
}
@ -730,6 +764,16 @@ case class PeerManager(
.map(_ => state)
}
}
case (state, stp: SendToPeer) =>
state match {
case _: NodeShuttingDown =>
logger.warn(
s"Cannot send to peer when we are shutting down! stp=$stp state=$state")
Future.successful(state)
case r: NodeRunningState =>
sendToPeerHelper(r, stp)
}
case (state, NodeShutdown) =>
state match {
case s: NodeShuttingDown =>
@ -738,7 +782,10 @@ case class PeerManager(
Future.successful(s)
case r: NodeRunningState =>
val shutdownState =
NodeShuttingDown(r.peersWithServices, r.waitingForDisconnection)
NodeShuttingDown(peerDataMap = r.peerDataMap,
waitingForDisconnection =
r.waitingForDisconnection,
peerFinder = r.peerFinder)
Future
.traverse(r.peers)(disconnectPeer(_))
.map(_ => shutdownState)
@ -763,6 +810,29 @@ case class PeerManager(
}
}
private def sendToPeerHelper(
state: NodeRunningState,
stp: SendToPeer): Future[NodeRunningState] = {
val peerMsgSenderOpt = stp.peerOpt match {
case Some(p) =>
state.getPeerMsgSender(p)
case None =>
state.randomPeerMessageSender(Set.empty,
ServiceIdentifier.NODE_COMPACT_FILTERS)
}
peerMsgSenderOpt match {
case Some(pms) =>
pms
.sendMsg(stp.msg.payload)
.map(_ => state)
case None =>
logger.warn(
s"Unable to find peer to send message=${stp.msg.payload} to, state=$state")
Future.successful(state)
}
}
private def switchSyncToPeer(
oldSyncState: SyncNodeState,
newPeer: Peer): Future[NodeState] = {
@ -772,7 +842,7 @@ case class PeerManager(
oldSyncState match {
case s: HeaderSync =>
if (s.syncPeer != newPeer) {
syncHelper(newPeer).map(_ => newState)
syncHelper(newState).map(_ => newState)
} else {
//if its same peer we don't need to switch
Future.successful(oldSyncState)
@ -780,7 +850,7 @@ case class PeerManager(
case s @ (_: FilterHeaderSync | _: FilterSync) =>
if (s.syncPeer != newPeer) {
filterSyncHelper(chainApi = ChainHandler.fromDatabase(),
syncPeer = newPeer).map(_ => newState)
syncNodeState = newState).map(_ => newState)
} else {
//if its same peer we don't need to switch
Future.successful(oldSyncState)
@ -790,7 +860,8 @@ case class PeerManager(
}
/** If [[syncPeerOpt]] is given, we send getheaders to only that peer, if no sync peer given we gossip getheaders to all our peers */
private def getHeaderSyncHelper(syncPeerOpt: Option[Peer]): Future[Unit] = {
private def getHeaderSyncHelper(
syncNodeState: SyncNodeState): Future[Unit] = {
val blockchainsF =
BlockHeaderDAO()(ec, chainAppConfig).getBlockchains()
@ -799,11 +870,11 @@ case class PeerManager(
// Get all of our cached headers in case of a reorg
cachedHeaders = blockchains.flatMap(_.headers).map(_.hashBE)
_ <- {
syncPeerOpt match {
case Some(peer) =>
val peerMsgSender = getPeerMsgSender(peer).get //check this .get
syncNodeState.getPeerMsgSender(syncNodeState.syncPeer) match {
case Some(peerMsgSender) =>
peerMsgSender.sendGetHeadersMessage(cachedHeaders)
case None => gossipGetHeadersMessage(cachedHeaders)
case None =>
gossipGetHeadersMessage(cachedHeaders)
}
}
} yield ()
@ -811,7 +882,7 @@ case class PeerManager(
private def filterSyncHelper(
chainApi: ChainApi,
syncPeer: Peer): Future[Unit] = {
syncNodeState: SyncNodeState): Future[Unit] = {
for {
header <- chainApi.getBestBlockHeader()
bestFilterHeaderOpt <- chainApi.getBestFilterHeader()
@ -824,9 +895,10 @@ case class PeerManager(
//after we are done syncing block headers
Future.unit
} else {
val fhs = FilterHeaderSync(syncPeer = syncPeer,
peersWithServices = peersWithServices,
waitingForDisconnection = Set.empty)
val fhs = FilterHeaderSync(syncPeer = syncNodeState.syncPeer,
peerDataMap = peerWithServicesDataMap,
waitingForDisconnection = Set.empty,
syncNodeState.peerFinder)
syncFilters(
bestFilterHeaderOpt = bestFilterHeaderOpt,
bestFilterOpt = bestFilterOpt,
@ -852,7 +924,8 @@ case class PeerManager(
*
* @param syncPeerOpt if syncPeer is given, we send [[org.bitcoins.core.p2p.GetHeadersMessage]] to that peer. If None we gossip GetHeadersMessage to all peers
*/
private def syncHelper(syncPeer: Peer): Future[Unit] = {
private def syncHelper(syncNodeState: SyncNodeState): Future[Unit] = {
val syncPeer = syncNodeState.syncPeer
logger.debug(
s"syncHelper() syncPeer=$syncPeer isStarted.get=${isStarted.get} syncFilterCancellableOpt.isDefined=${syncFilterCancellableOpt.isDefined}")
val chainApi: ChainApi = ChainHandler.fromDatabase()
@ -861,7 +934,7 @@ case class PeerManager(
val filterCountF = chainApi.getFilterCount()
for {
_ <- chainApi.setSyncing(true)
_ <- getHeaderSyncHelper(Some(syncPeer))
_ <- getHeaderSyncHelper(syncNodeState)
_ = {
if (isStarted.get) {
//in certain cases, we can schedule this job while the peer manager is attempting to shutdown
@ -877,7 +950,7 @@ case class PeerManager(
case s: Some[Cancellable] =>
s //do nothing as we already have a job scheduled
case None =>
val c = createFilterSyncJob(chainApi, syncPeer)
val c = createFilterSyncJob(chainApi, syncNodeState)
Some(c)
}
}
@ -893,7 +966,7 @@ case class PeerManager(
private def createFilterSyncJob(
chainApi: ChainApi,
syncPeer: Peer): Cancellable = {
syncNodeState: SyncNodeState): Cancellable = {
require(
syncFilterCancellableOpt.isEmpty,
s"Cannot schedule a syncFilterCancellable as one is already scheduled")
@ -924,7 +997,7 @@ case class PeerManager(
if (isOutOfSync) {
//if it hasn't started it, start it
filterSyncHelper(chainApi, syncPeer)
filterSyncHelper(chainApi, syncNodeState)
} else {
Future.unit
}
@ -1019,7 +1092,8 @@ case class PeerManager(
}
}
private def syncFromNewPeer(state: NodeState): Future[Option[NodeState]] = {
private def syncFromNewPeer(
state: NodeRunningState): Future[Option[NodeRunningState]] = {
val svcIdentifier = ServiceIdentifier.NODE_COMPACT_FILTERS
val syncPeerOpt = state match {
case s: SyncNodeState =>
@ -1032,28 +1106,28 @@ case class PeerManager(
d.randomPeer(Set.empty, svcIdentifier)
case _: NodeShuttingDown => None
}
for {
val newStateOptF: Future[Option[NodeRunningState]] = for {
newStateOpt <- syncPeerOpt match {
case Some(syncPeer) =>
syncHelper(syncPeer).map { _ =>
val newState = state match {
case s: SyncNodeState =>
s.replaceSyncPeer(syncPeer)
case d: DoneSyncing =>
HeaderSync(syncPeer,
d.peersWithServices,
d.waitingForDisconnection)
case x @ (_: MisbehavingPeer | _: RemovePeers |
_: NodeShuttingDown) =>
x
}
Some(newState)
state match {
case sns: SyncNodeState =>
val newState = sns.replaceSyncPeer(syncPeer)
syncHelper(newState).map(_ => Some(newState))
case d: DoneSyncing =>
val hs = HeaderSync(syncPeer,
d.peerDataMap,
d.waitingForDisconnection,
d.peerFinder)
syncHelper(hs).map(_ => Some(hs))
case x @ (_: MisbehavingPeer | _: RemovePeers |
_: NodeShuttingDown) =>
Future.successful(Some(x))
}
case None => Future.successful(None)
}
} yield {
newStateOpt
}
} yield newStateOpt
newStateOptF
}
/** Gossips the given message to all peers except the excluded peer. If None given as excluded peer, gossip message to all peers */
@ -1073,16 +1147,11 @@ case class PeerManager(
}
override def sendToRandomPeer(payload: NetworkPayload): Future[Unit] = {
val randomPeerOpt = randomPeerConnection(
ServiceIdentifier.NODE_COMPACT_FILTERS)
randomPeerOpt match {
case Some(p) =>
val peerMsgSender = PeerMessageSender(p)
peerMsgSender.sendMsg(payload)
case None =>
logger.warn(s"Cannot find peer to send message=${payload.commandName}")
Future.unit
}
val msg = NetworkMessage(nodeAppConfig.network, payload)
val stp = SendToPeer(msg, None)
queue
.offer(stp)
.map(_ => ())
}
}
@ -1135,9 +1204,10 @@ object PeerManager extends Logging {
.map(_ =>
Some(
FilterHeaderSync(syncPeer = state.syncPeer,
peersWithServices = state.peersWithServices,
peerDataMap = state.peerDataMap,
waitingForDisconnection =
state.waitingForDisconnection)))
state.waitingForDisconnection,
state.peerFinder)))
case None =>
logger.info(
s"Filter headers are synced! filterHeader.blockHashBE=$blockHash")

View File

@ -5,23 +5,17 @@ import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.chain.models.BlockHeaderDAO
import org.bitcoins.core.api.chain.ChainApi
import org.bitcoins.core.api.chain.db.CompactFilterHeaderDb
import org.bitcoins.core.api.node.{
NodeRunningState,
NodeState,
NodeType,
Peer,
SyncNodeState
}
import org.bitcoins.core.api.node.{NodeType, Peer}
import org.bitcoins.core.gcs.{BlockFilter, GolombFilter}
import org.bitcoins.core.p2p._
import org.bitcoins.core.protocol.CompactSizeUInt
import org.bitcoins.core.protocol.blockchain.BlockHeader
import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE}
import org.bitcoins.node.NodeState._
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models._
import org.bitcoins.core.api.node.NodeState._
import org.bitcoins.node.util.PeerMessageSenderApi
import org.bitcoins.node.{P2PLogger, PeerManager, PersistentPeerData}
import org.bitcoins.node._
import java.time.Instant
import scala.concurrent.{ExecutionContext, Future}
@ -97,7 +91,7 @@ case class DataMessageHandler(
case m: MisbehavingPeer =>
val badPeer = m.badPeer
val peersWithSvcs = m.peersWithServices
val pdm = m.peerDataMap
if (badPeer == peerData.peer) {
Future.failed(
new RuntimeException(
@ -105,17 +99,17 @@ case class DataMessageHandler(
} else {
//re-review this, we should probably pattern match on old state so we can continue syncing
//from where we left off?
val d = DoneSyncing(peersWithSvcs, m.waitingForDisconnection)
val d = DoneSyncing(pdm, m.waitingForDisconnection, m.peerFinder)
copy(state = d).handleDataPayload(payload, peerData)
}
case r: RemovePeers =>
val badPeers = r.peersToRemove
val peersWithSvcs = r.peersWithServices
val pdm = r.peerDataMap
if (badPeers.exists(_ == peerData.peer)) {
Future.failed(new RuntimeException(
s"Cannot continue processing p2p messages from peer we were suppose to remove, peer=${peerData.peer}"))
} else {
val d = DoneSyncing(peersWithSvcs, r.waitingForDisconnection)
val d = DoneSyncing(pdm, r.waitingForDisconnection, r.peerFinder)
copy(state = d).handleDataPayload(payload, peerData)
}
@ -153,8 +147,9 @@ case class DataMessageHandler(
state match {
case s @ (_: HeaderSync | _: DoneSyncing) =>
val filterHeaderSync = FilterHeaderSync(peer,
s.peersWithServices,
s.waitingForDisconnection)
s.peerDataMap,
s.waitingForDisconnection,
s.peerFinder)
handleFilterHeadersMessage(filterHeaderSync,
filterHeader,
chainApi,
@ -184,9 +179,10 @@ case class DataMessageHandler(
.map(s => copy(state = s))
case s @ (_: DoneSyncing | _: FilterHeaderSync) =>
val f = FilterSync(peer,
s.peersWithServices,
s.peerDataMap,
s.waitingForDisconnection,
Set.empty)
Set.empty,
s.peerFinder)
handleFilterMessage(f, filter)
.map(s => copy(state = s))
case x @ (_: HeaderSync | _: NodeShuttingDown) =>
@ -248,11 +244,20 @@ case class DataMessageHandler(
case d: DoneSyncing =>
val s = if (count.toInt != 0) {
//why do we sometimes get empty HeadersMessage?
HeaderSync(peer, d.peersWithServices, d.waitingForDisconnection)
} else DoneSyncing(d.peersWithServices, d.waitingForDisconnection)
d.toHeaderSync(peer)
} else {
DoneSyncing(d.peerDataMap,
d.waitingForDisconnection,
d.peerFinder)
}
Some(s)
case headerSync: HeaderSync =>
if (headerSync.syncPeer == peer) {
if (count.toInt == 0) {
val d = DoneSyncing(headerSync.peerDataMap,
headerSync.waitingForDisconnection,
headerSync.peerFinder)
Some(d)
} else if (headerSync.syncPeer == peer) {
Some(headerSync)
} else {
//means we received a headers message from a peer we aren't syncing with, so ignore for now
@ -269,7 +274,6 @@ case class DataMessageHandler(
case x @ (_: MisbehavingPeer | _: RemovePeers) =>
sys.error(s"Invalid state to receive headers in, got=$x")
}
newStateOpt match {
case Some(h: HeaderSync) =>
handleHeadersMessage(h, headers, peerData)
@ -353,8 +357,9 @@ case class DataMessageHandler(
logger.info(
s"Starting to fetch filter headers in data message handler")
val fhs = FilterHeaderSync(syncNodeState.syncPeer,
syncNodeState.peersWithServices,
syncNodeState.waitingForDisconnection)
syncNodeState.peerDataMap,
syncNodeState.waitingForDisconnection,
syncNodeState.peerFinder)
for {
syncingFilterHeadersState <- PeerManager
@ -365,8 +370,9 @@ case class DataMessageHandler(
stopBlockHeaderDb = bestBlockHeaderDb)
} yield {
syncingFilterHeadersState.getOrElse(
DoneSyncing(syncNodeState.peersWithServices,
syncNodeState.waitingForDisconnection))
DoneSyncing(syncNodeState.peerDataMap,
syncNodeState.waitingForDisconnection,
syncNodeState.peerFinder))
}
} else {
@ -380,8 +386,9 @@ case class DataMessageHandler(
//was ongoing, see: https://github.com/bitcoin-s/bitcoin-s/issues/5036
for {
bestBlockHash <- chainApi.getBestBlockHash()
d = DoneSyncing(syncNodeState.peersWithServices,
syncNodeState.waitingForDisconnection)
d = DoneSyncing(syncNodeState.peerDataMap,
syncNodeState.waitingForDisconnection,
syncNodeState.peerFinder)
newState <- {
peerManager
.gossipGetHeadersMessage(Vector(bestBlockHash))
@ -409,9 +416,10 @@ case class DataMessageHandler(
s"$peer exceeded max limit of invalid messages. Disconnecting. peers=${state.peers}")
val m = MisbehavingPeer(badPeer = peer,
peersWithServices = state.peersWithServices,
peerDataMap = state.peerDataMap,
waitingForDisconnection =
state.waitingForDisconnection)
state.waitingForDisconnection,
state.peerFinder)
Future.successful(m)
} else {
@ -427,8 +435,9 @@ case class DataMessageHandler(
peerManager.gossipGetHeadersMessage(cachedHeaders)
//switch to DoneSyncing state until we receive a valid header from our peers
val d =
DoneSyncing(state.peersWithServices,
state.waitingForDisconnection)
DoneSyncing(state.peerDataMap,
state.waitingForDisconnection,
state.peerFinder)
queryF.map(_ => d)
}
} yield newState
@ -477,9 +486,11 @@ case class DataMessageHandler(
if (isSyncing) {
val newState = NodeState.FilterSync(
syncPeer = fs.syncPeer,
peersWithServices = fs.peersWithServices,
peerDataMap = fs.peerDataMap,
waitingForDisconnection = fs.waitingForDisconnection,
filterBatchCache = fs.filterBatchCache)
filterBatchCache = fs.filterBatchCache,
peerFinder = fs.peerFinder
)
Some(newState)
} else None
}
@ -495,9 +506,10 @@ case class DataMessageHandler(
val fs = syncNodeState match {
case x @ (_: HeaderSync | _: FilterHeaderSync) =>
FilterSync(x.syncPeer,
x.peersWithServices,
x.peerDataMap,
x.waitingForDisconnection,
Set.empty)
Set.empty,
x.peerFinder)
case fs: FilterSync => fs
}
@ -683,8 +695,10 @@ case class DataMessageHandler(
case Some(s) => s
case None =>
//is this right? If we don't send cfheaders to our peers, are we done syncing?
DoneSyncing(state.peersWithServices,
state.waitingForDisconnection)
DoneSyncing(peerDataMap = state.peerDataMap,
waitingForDisconnection =
state.waitingForDisconnection,
peerFinder = state.peerFinder)
}
}
} else {
@ -720,8 +734,9 @@ case class DataMessageHandler(
val recoveredStateF: Future[NodeRunningState] = getHeadersF.recoverWith {
case _: DuplicateHeaders =>
logger.warn(s"Received duplicate headers from ${peer} in state=$state")
val d = DoneSyncing(headerSyncState.peersWithServices,
headerSyncState.waitingForDisconnection)
val d = DoneSyncing(headerSyncState.peerDataMap,
headerSyncState.waitingForDisconnection,
headerSyncState.peerFinder)
Future.successful(d)
case _: InvalidBlockHeader =>
logger.warn(
@ -786,8 +801,9 @@ case class DataMessageHandler(
case Some(filterSyncState) => filterSyncState
case None =>
val d =
DoneSyncing(filterHeaderSync.peersWithServices,
filterHeaderSync.waitingForDisconnection)
DoneSyncing(filterHeaderSync.peerDataMap,
filterHeaderSync.waitingForDisconnection,
filterHeaderSync.peerFinder)
d
}
}
@ -849,8 +865,9 @@ case class DataMessageHandler(
case Some(filterSyncState) =>
filterSyncState.copy(filterBatchCache = newBatch)
case None =>
val d = DoneSyncing(filterSyncState.peersWithServices,
filterSyncState.waitingForDisconnection)
val d = DoneSyncing(filterSyncState.peerDataMap,
filterSyncState.waitingForDisconnection,
filterSyncState.peerFinder)
d
}
Future.successful(res)