diff --git a/node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala b/node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala index 22d5e92c3c..19e66c66bd 100644 --- a/node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala +++ b/node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala @@ -202,7 +202,14 @@ case class NeutrinoNode( val peers = peerManager.peers logger.info(s"Running inactivity checks for peers=${peers}") val resultF = if (peers.nonEmpty) { - Future.unit //do nothing? + queueOpt match { + case Some(q) => + q.offer(NodeStreamMessage.PeerHealthCheck) + .map(_ => ()) + case None => + logger.warn(s"No queue defined for inactivity check") + Future.unit + } } else if (isStarted.get) { //stop and restart to get more peers stop() diff --git a/node/src/main/scala/org/bitcoins/node/NodeStreamMessage.scala b/node/src/main/scala/org/bitcoins/node/NodeStreamMessage.scala index 2322b08151..52a4d5714d 100644 --- a/node/src/main/scala/org/bitcoins/node/NodeStreamMessage.scala +++ b/node/src/main/scala/org/bitcoins/node/NodeStreamMessage.scala @@ -46,5 +46,8 @@ object NodeStreamMessage { case object NodeShutdown extends NodeStreamMessage + /** Checks our peers are healthy, for instance checking that we are peered with compact filter peers */ + case object PeerHealthCheck extends NodeStreamMessage + case class Initialized(peer: Peer) } diff --git a/node/src/main/scala/org/bitcoins/node/PeerManager.scala b/node/src/main/scala/org/bitcoins/node/PeerManager.scala index c4f33e5d36..93b7595e19 100644 --- a/node/src/main/scala/org/bitcoins/node/PeerManager.scala +++ b/node/src/main/scala/org/bitcoins/node/PeerManager.scala @@ -798,6 +798,15 @@ case class PeerManager( } + case (state, NodeStreamMessage.PeerHealthCheck) => + state match { + case s: NodeShuttingDown => + logger.trace(s"Ignorinng peer health check as we are shutting down") + Future.successful(s) + case r: NodeRunningState => + PeerManager.handleHealthCheck(r) + } + } } @@ -1345,6 +1354,22 @@ object PeerManager extends Logging { Future.successful(None) } } + } + def handleHealthCheck(runningState: NodeRunningState)(implicit + ec: ExecutionContext): Future[NodeRunningState] = { + val blockFilterPeers = runningState.peerDataMap.filter( + _._2.serviceIdentifier.hasServicesOf( + ServiceIdentifier.NODE_COMPACT_FILTERS)) + if (blockFilterPeers.nonEmpty) { + //do nothing + Future.successful(runningState) + } else { + val peerFinder = runningState.peerFinder + for { + _ <- peerFinder.stop() + _ <- peerFinder.start() + } yield runningState + } } }