diff --git a/node/src/main/scala/org/bitcoins/node/NodeState.scala b/node/src/main/scala/org/bitcoins/node/NodeState.scala index 9965b1d270..586cfafe21 100644 --- a/node/src/main/scala/org/bitcoins/node/NodeState.scala +++ b/node/src/main/scala/org/bitcoins/node/NodeState.scala @@ -2,6 +2,7 @@ package org.bitcoins.node import org.bitcoins.core.api.node.{Peer, PeerWithServices} import org.bitcoins.core.p2p.{CompactFilterMessage, ServiceIdentifier} +import org.bitcoins.node.NodeState.DoneSyncing import org.bitcoins.node.config.NodeAppConfig import org.bitcoins.node.networking.peer.{PeerConnection, PeerMessageSender} @@ -105,6 +106,16 @@ sealed trait NodeRunningState extends NodeState { getPeerMsgSender(p) } } + + def isConnected(peer: Peer): Boolean = { + peerDataMap.filter(_._1.peer == peer).nonEmpty || peerFinder.hasPeer(peer) + } + + def isDisconnected(peer: Peer): Boolean = !isConnected(peer) + + def toDoneSyncing: DoneSyncing = { + DoneSyncing(peerDataMap, waitingForDisconnection, peerFinder) + } } /** State to indicate that we are syncing the blockchain */ diff --git a/node/src/main/scala/org/bitcoins/node/PeerData.scala b/node/src/main/scala/org/bitcoins/node/PeerData.scala index b5db1cf882..647b4ac0ef 100644 --- a/node/src/main/scala/org/bitcoins/node/PeerData.scala +++ b/node/src/main/scala/org/bitcoins/node/PeerData.scala @@ -95,4 +95,11 @@ case class AttemptToConnectPeerData( override val system: ActorSystem, override val nodeAppConfig: NodeAppConfig, override val chainAppConfig: ChainAppConfig) - extends PeerData + extends PeerData { + + def toPersistentPeerData: PersistentPeerData = { + val p = PersistentPeerData(peer, peerMessageSender) + p.setServiceIdentifier(serviceIdentifier = serviceIdentifier) + p + } +} diff --git a/node/src/main/scala/org/bitcoins/node/PeerManager.scala b/node/src/main/scala/org/bitcoins/node/PeerManager.scala index cb3b05831b..052204e41f 100644 --- a/node/src/main/scala/org/bitcoins/node/PeerManager.scala +++ b/node/src/main/scala/org/bitcoins/node/PeerManager.scala @@ -192,18 +192,18 @@ case class PeerManager( stopF } - def isConnected(peer: Peer): Future[Boolean] = { + override def isConnected(peer: Peer): Future[Boolean] = { peerDataMap.get(peer) match { case None => Future.successful(false) case Some(p) => p.peerConnection.isConnected() } } - def isDisconnected(peer: Peer): Future[Boolean] = { + override def isDisconnected(peer: Peer): Future[Boolean] = { isConnected(peer).map(b => !b) } - def isInitialized(peer: Peer): Future[Boolean] = { + override def isInitialized(peer: Peer): Future[Boolean] = { Future.successful(peerDataMap.exists(_._1 == peer)) } @@ -340,41 +340,23 @@ case class PeerManager( s"$peer cannot be both a test and a persistent peer") if (finder.hasPeer(peer)) { - if (peers.isEmpty) { - finder.reconnect(peer).map(_ => state) - } else { - finder.removePeer(peer).map(_ => state) - } + finder.removePeer(peer) + Future.successful(state) } else if (peerDataMap.contains(peer)) { _peerDataMap.remove(peer) - val syncPeerOpt = state match { - case s: SyncNodeState => - Some(s.syncPeer) - case _: DoneSyncing | _: RemovePeers | _: NodeShuttingDown | - _: MisbehavingPeer => - None - } val isShuttingDown = state.isInstanceOf[NodeShuttingDown] if (state.peers.exists(_ != peer)) { state match { case s: SyncNodeState => switchSyncToRandomPeer(s, Some(peer)) case d: DoneSyncing => //defensively try to sync with the new peer - val hs = d.toHeaderSync(peer) - switchSyncToRandomPeer(hs, Some(peer)) + val hs = d.toHeaderSync + syncHelper(hs).map(_ => hs) case x @ (_: DoneSyncing | _: NodeShuttingDown | _: MisbehavingPeer | _: RemovePeers) => Future.successful(x) } - } else if (syncPeerOpt.isDefined) { - if (forceReconnect && !isShuttingDown) { - finder.reconnect(peer).map(_ => state) - } else { - logger.warn( - s"No new peers to sync from, cannot start new sync. Terminated sync with peer=$peer current syncPeer=$syncPeerOpt state=${state} peers=$peers") - Future.successful(state) - } } else { if (forceReconnect && !isShuttingDown) { finder.reconnect(peer).map(_ => state) @@ -642,7 +624,7 @@ case class PeerManager( peerManager = this, state = runningState ) - val resultF = dmh + val resultF: Future[NodeState] = dmh .handleDataPayload(payload, peerData) .flatMap { newDmh => newDmh.state match { @@ -650,22 +632,21 @@ case class PeerManager( //disconnect the misbehaving peer for { _ <- disconnectPeer(m.badPeer) - _ <- syncFromNewPeer(m) - } yield newDmh + } yield newDmh.state case removePeers: RemovePeers => for { _ <- Future.traverse(removePeers.peers)( disconnectPeer) - } yield newDmh + } yield newDmh.state case _: SyncNodeState | _: DoneSyncing | _: NodeShuttingDown => - Future.successful(newDmh) + Future.successful(newDmh.state) } } resultF.map { r => logger.debug( - s"Done processing ${payload.commandName} in peer=${peer} state=${r.state}") - r.state + s"Done processing ${payload.commandName} in peer=${peer} state=${r}") + r } } } @@ -1105,6 +1086,7 @@ case class PeerManager( } } + /** Attempts to start syncing from a new peer. Returns None if we have no new peers to sync with */ private def syncFromNewPeer( state: NodeRunningState): Future[Option[NodeRunningState]] = { val svcIdentifier = ServiceIdentifier.NODE_COMPACT_FILTERS