Change PeerManager to keep NodeState in Sink.foldAsync() rather than DataMessageHandler (#5255)

This commit is contained in:
Chris Stewart 2023-10-05 16:11:36 -05:00 committed by GitHub
parent 522821869d
commit 2863b3f5cc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -354,11 +354,13 @@ case class PeerManager(
override def start(): Future[PeerManager] = {
logger.debug(s"Starting PeerManager")
val (queue, source) = dataMessageStreamSource.preMaterialize()
val initDmh = buildStatelessDataMessagehandler(queue)
val graph = buildDataMessageStreamGraph(initDmh = initDmh, source = source)
val initState =
DoneSyncing(peers = peers, waitingForDisconnection = Set.empty)
val graph =
buildDataMessageStreamGraph(initState = initState, source = source)
dataMessageQueueOpt = Some(queue)
val dmhF = graph.run()
streamDoneFOpt = Some(dmhF)
val stateF = graph.run()
streamDoneFOpt = Some(stateF)
val finder = PeerFinder(
paramPeers = paramPeers,
controlMessageHandler = ControlMessageHandler(this),
@ -705,15 +707,14 @@ case class PeerManager(
private def onHeaderRequestTimeout(
peer: Peer,
dmh: DataMessageHandler): Future[DataMessageHandler] = {
val state = dmh.state
state: NodeState): Future[NodeState] = {
logger.info(s"Header request timed out from $peer in state $state")
state match {
case _: HeaderSync | _: MisbehavingPeer | _: DoneSyncing =>
syncFromNewPeer().map(_ => dmh)
syncFromNewPeer().map(_ => state)
case _: FilterHeaderSync | _: FilterSync | _: RemovePeers =>
Future.successful(dmh)
Future.successful(state)
}
}
@ -738,17 +739,6 @@ case class PeerManager(
}
}
private def buildStatelessDataMessagehandler(
queue: SourceQueueWithComplete[NodeStreamMessage]): DataMessageHandler = {
DataMessageHandler(
chainApi = ChainHandler.fromDatabase(),
walletCreationTimeOpt = walletCreationTimeOpt,
queue = queue,
peerMessageSenderApi = this,
state = DoneSyncing(peers = peers, waitingForDisconnection = Set.empty)
)
}
private val dataMessageStreamSource: Source[
NodeStreamMessage,
SourceQueueWithComplete[NodeStreamMessage]] = {
@ -759,13 +749,12 @@ case class PeerManager(
maxConcurrentOffers = Runtime.getRuntime.availableProcessors())
}
private def buildDataMessageStreamSink(initDmh: DataMessageHandler): Sink[
NodeStreamMessage,
Future[DataMessageHandler]] = {
Sink.foldAsync(initDmh) {
case (dmh, s: StartSync) =>
syncHelper(s.peerOpt).map(_ => dmh)
case (dmh, i: InitializeDisconnect) =>
private def buildP2PMessageHandlerSink(
initState: NodeState): Sink[NodeStreamMessage, Future[NodeState]] = {
Sink.foldAsync(initState) {
case (state, s: StartSync) =>
syncHelper(s.peerOpt).map(_ => state)
case (state, i: InitializeDisconnect) =>
val client: PeerData = peerDataMap(i.peer)
_peerDataMap.remove(i.peer)
//so we need to remove if from the map for connected peers so no more request could be sent to it but we before
@ -774,19 +763,25 @@ case class PeerManager(
//now send request to stop actor which will be completed some time in future
client.stop().map { _ =>
val newWaiting = dmh.state.waitingForDisconnection.+(i.peer)
val newState = dmh.state.replaceWaitingForDisconnection(newWaiting)
dmh.copy(state = newState)
val newWaiting = state.waitingForDisconnection.+(i.peer)
val newState = state.replaceWaitingForDisconnection(newWaiting)
newState
}
case (dmh, DataMessageWrapper(payload, peer)) =>
case (state, DataMessageWrapper(payload, peer)) =>
logger.debug(s"Got ${payload.commandName} from peer=${peer} in stream")
val peerDataOpt = peerDataMap.get(peer)
peerDataOpt match {
case None =>
logger.warn(
s"Ignoring received msg=${payload.commandName} from peer=$peer because it was disconnected, peers=$peers state=${dmh.state}")
Future.successful(dmh)
s"Ignoring received msg=${payload.commandName} from peer=$peer because it was disconnected, peers=$peers state=${state}")
Future.successful(state)
case Some(peerData) =>
val dmh = DataMessageHandler(chainApi = ChainHandler.fromDatabase(),
walletCreationTimeOpt =
walletCreationTimeOpt,
queue = dataMessageQueueOpt.get,
peerMessageSenderApi = this,
state = state)
val resultF = dmh
.handleDataPayload(payload, peerData)
.flatMap { newDmh =>
@ -808,35 +803,33 @@ case class PeerManager(
resultF.map { r =>
logger.debug(
s"Done processing ${payload.commandName} in peer=${peer}")
r
r.state
}
}
case (dmh, ControlMessageWrapper(payload, peer)) =>
case (state, ControlMessageWrapper(payload, peer)) =>
val controlMessageHandler = ControlMessageHandler(this)
controlMessageHandler.handleControlPayload(payload, peer).flatMap {
case Some(i) =>
onInitialization(i.peer, dmh.state).map(newState =>
dmh.copy(state = newState))
onInitialization(i.peer, state)
case None =>
Future.successful(dmh)
Future.successful(state)
}
case (dmh, HeaderTimeoutWrapper(peer)) =>
case (state, HeaderTimeoutWrapper(peer)) =>
logger.debug(s"Processing timeout header for $peer")
for {
newDmh <- {
onHeaderRequestTimeout(peer, dmh).map { newDmh =>
onHeaderRequestTimeout(peer, state).map { s =>
logger.debug(s"Done processing timeout header for $peer")
newDmh
s
}
}
} yield newDmh
case (dmh, DisconnectedPeer(peer, forceReconnect)) =>
onDisconnect(peer, forceReconnect, dmh.state)
.map(newState => dmh.copy(state = newState))
case (dmh, i: InitializationTimeout) =>
onInitializationTimeout(i.peer).map(_ => dmh)
case (dmh, q: QueryTimeout) =>
onQueryTimeout(q.payload, q.peer, dmh.state).map(_ => dmh)
case (state, DisconnectedPeer(peer, forceReconnect)) =>
onDisconnect(peer, forceReconnect, state)
case (state, i: InitializationTimeout) =>
onInitializationTimeout(i.peer).map(_ => state)
case (state, q: QueryTimeout) =>
onQueryTimeout(q.payload, q.peer, state).map(_ => state)
case (dmh, srt: SendResponseTimeout) =>
sendResponseTimeout(srt.peer, srt.payload).map(_ => dmh)
}
@ -848,11 +841,11 @@ case class PeerManager(
}
private def buildDataMessageStreamGraph(
initDmh: DataMessageHandler,
initState: NodeState,
source: Source[NodeStreamMessage, NotUsed]): RunnableGraph[
Future[DataMessageHandler]] = {
Future[NodeState]] = {
val graph = source
.toMat(buildDataMessageStreamSink(initDmh))(Keep.right)
.toMat(buildP2PMessageHandlerSink(initState))(Keep.right)
.withAttributes(ActorAttributes.supervisionStrategy(decider))
graph
}
@ -860,7 +853,7 @@ case class PeerManager(
private[bitcoins] var dataMessageQueueOpt: Option[
SourceQueueWithComplete[NodeStreamMessage]] = None
private var streamDoneFOpt: Option[Future[DataMessageHandler]] = None
private var streamDoneFOpt: Option[Future[NodeState]] = None
override def offer(elem: NodeStreamMessage): Future[QueueOfferResult] = {
dataMessageQueueOpt match {