diff --git a/node/src/main/scala/org/bitcoins/node/NodeState.scala b/node/src/main/scala/org/bitcoins/node/NodeState.scala index 0cb67b4bf3..50935e0e85 100644 --- a/node/src/main/scala/org/bitcoins/node/NodeState.scala +++ b/node/src/main/scala/org/bitcoins/node/NodeState.scala @@ -31,6 +31,10 @@ sealed trait NodeRunningState extends NodeState { def connectedPeerCount: Int = peers.size + def getPeerData(peer: Peer): Option[PersistentPeerData] = { + peerDataMap.find(_._1.peer == peer).map(_._2) + } + def getPeerConnection(peer: Peer): Option[PeerConnection] = { peerDataMap.find(_._1.peer == peer).map(_._2.peerConnection) match { case Some(peerConnection) => Some(peerConnection) diff --git a/node/src/main/scala/org/bitcoins/node/PeerManager.scala b/node/src/main/scala/org/bitcoins/node/PeerManager.scala index 8a4eef10bd..b29bbf2f2d 100644 --- a/node/src/main/scala/org/bitcoins/node/PeerManager.scala +++ b/node/src/main/scala/org/bitcoins/node/PeerManager.scala @@ -128,8 +128,6 @@ case class PeerManager( private def replacePeer(replacePeer: Peer, withPeer: Peer): Future[Unit] = { logger.debug(s"Replacing $replacePeer with $withPeer") - require(!peerDataMap(replacePeer).serviceIdentifier.nodeCompactFilters, - s"$replacePeer has cf") for { _ <- disconnectPeer(replacePeer) _ <- connectPeer(withPeer) @@ -201,16 +199,18 @@ case class PeerManager( state: NodeRunningState): Future[Unit] = { logger.debug(s"onInitializationTimeout() peer=$peer state=$state") val finder = state.peerFinder - require(!finder.hasPeer(peer) || !peerDataMap.contains(peer), + require(!finder.hasPeer(peer) || !state.getPeerData(peer).isDefined, s"$peer cannot be both a test and a persistent peer") if (finder.hasPeer(peer)) { //one of the peers that we tried, failed to init within time, disconnect finder.getPeerData(peer).get.stop().map(_ => ()) - } else if (peerDataMap.contains(peer)) { + } else if (state.getPeerData(peer).isDefined) { //this is one of our persistent peers which must have been initialized earlier, this can happen in case of //a reconnection attempt, meaning it got connected but failed to initialize, disconnect - peerDataMap(peer) + state + .getPeerData(peer) + .get .stop() .map(_ => ()) } else { @@ -276,14 +276,14 @@ case class PeerManager( _ <- createInDb(peer, peerData.serviceIdentifier) newState <- managePeerAfterInitialization(state, peer) } yield { - require(!finder.hasPeer(peer) || !peerDataMap.contains(peer), + require(!finder.hasPeer(peer) || !state.getPeerData(peer).isDefined, s"$peer cannot be both a test and a persistent peer") logger.debug( s"Initialized peer $peer with compactFilter support=$hasCf") newState } - } else if (peerDataMap.contains(peer)) { + } else if (state.peers.contains(peer)) { //one of the persistent peers initialized again, this can happen in case of a reconnection attempt //which succeeded which is all good, do nothing state match { @@ -315,19 +315,18 @@ case class PeerManager( forceReconnect: Boolean, state: NodeRunningState): Future[NodeState] = { logger.info( - s"Disconnected peer=$peer peers=$peers state=$state forceReconnect=$forceReconnect peerDataMap=${peerDataMap - .map(_._1)}") + s"Disconnected peer=$peer peers=$peers state=$state forceReconnect=$forceReconnect") val finder = state.peerFinder val _ = onDisconnectSyncFiltersJob(peer) val updateLastSeenF = PeerDAO().updateLastSeenTime(peer) - val stateF: Future[NodeState] = { - require(!finder.hasPeer(peer) || !peerDataMap.contains(peer), + val stateF: Future[NodeRunningState] = { + require(!finder.hasPeer(peer) || !state.getPeerData(peer).isDefined, s"$peer cannot be both a test and a persistent peer") if (finder.hasPeer(peer)) { finder.removePeer(peer) Future.successful(state) - } else if (peerDataMap.contains(peer)) { + } else if (state.peers.contains(peer)) { _peerDataMap.remove(peer) onDisconnectNodeStateUpdate(state = state, disconnectedPeer = peer, @@ -362,10 +361,10 @@ case class PeerManager( val isShuttingDown = state.isInstanceOf[NodeShuttingDown] val finder = state.peerFinder if (state.peers.exists(_ != disconnectedPeer)) { - state match { + val rm = state.removePeer(disconnectedPeer) + rm match { case s: SyncNodeState => - switchSyncToRandomPeer(state = s, - excludePeerOpt = Some(disconnectedPeer)) + syncHelper(s).map(_ => s) case d: DoneSyncing => //defensively try to sync with the new peer //this headerSync is not safe, need to exclude peer we are disconnencting @@ -382,7 +381,7 @@ case class PeerManager( case x @ (_: DoneSyncing | _: NodeShuttingDown | _: MisbehavingPeer | _: RemovePeers) => - Future.successful(x.removePeer(disconnectedPeer)) + Future.successful(x) } } else { //no new peers to try to sync from, transition to done syncing? @@ -409,14 +408,12 @@ case class PeerManager( private def onQueryTimeout( payload: ExpectsResponse, peer: Peer, - state: NodeState): Future[Unit] = { + state: NodeRunningState): Future[Unit] = { logger.debug(s"Query timeout out for $peer with payload=${payload}") //if we are removing this peer and an existing query timed out because of that // peerData will not have this peer - if (peerDataMap.contains(peer)) { - peerDataMap(peer).updateLastFailureTime() - } + state.getPeerData(peer).map(_.updateLastFailureTime()) payload match { case _: GetHeadersMessage => @@ -458,10 +455,11 @@ case class PeerManager( private def sendResponseTimeout( peer: Peer, - payload: NetworkPayload): Future[Unit] = { + payload: NetworkPayload, + state: NodeRunningState): Future[Unit] = { logger.debug( s"Sending response timeout for ${payload.commandName} to $peer") - if (peerDataMap.contains(peer)) { + if (state.getPeerData(peer).isDefined) { payload match { case e: ExpectsResponse => queue @@ -584,7 +582,7 @@ case class PeerManager( //now send request to stop actor which will be completed some time in future val _ = _peerDataMap.remove(i.peer) - + val _ = onDisconnectSyncFiltersJob(i.peer) val newStateF = onDisconnectNodeStateUpdate(state = running, disconnectedPeer = i.peer, @@ -733,9 +731,17 @@ case class PeerManager( } case (state, q: QueryTimeout) => - onQueryTimeout(q.payload, q.peer, state).map(_ => state) + state match { + case running: NodeRunningState => + onQueryTimeout(q.payload, q.peer, running).map(_ => state) + } + case (state, srt: SendResponseTimeout) => - sendResponseTimeout(srt.peer, srt.payload).map(_ => state) + state match { + case running: NodeRunningState => + sendResponseTimeout(srt.peer, srt.payload, running).map(_ => state) + } + case (state, gossipMessage: GossipMessage) => state match { case runningState: NodeRunningState => @@ -759,8 +765,7 @@ case class PeerManager( sender.sendMsg(msg) case None => logger.warn( - s"Attempting to gossip to peer that is available in state.peers, but not peerDataMap? state=$state peerDataMap=${peerDataMap - .map(_._1)}") + s"Attempting to gossip to peer that is available in state.peers, but not peerDataMap? state=$state") Future.unit } } @@ -809,21 +814,6 @@ case class PeerManager( } } - private def switchSyncToRandomPeer( - state: SyncNodeState, - excludePeerOpt: Option[Peer]): Future[SyncNodeState] = { - val randomPeerOpt = - state.randomPeer(excludePeers = excludePeerOpt.toSet, - ServiceIdentifier.NODE_COMPACT_FILTERS) - randomPeerOpt match { - case Some(peer) => - switchSyncToPeer(oldSyncState = state, newPeer = peer) - case None => - //if we have no new peers should we just switch to DoneSyncing? - Future.successful(state) - } - } - private def sendToPeerHelper( state: NodeRunningState, stp: SendToPeer): Future[NodeRunningState] = {