From 47c433f365cfaf77d6cf2efef59185f9b8f523f1 Mon Sep 17 00:00:00 2001 From: Chris Stewart Date: Wed, 29 Nov 2023 15:13:13 -0600 Subject: [PATCH] Replace inactivity logic with `Flow.idleTimeout` (#5311) * Replace inactivity logic with Flow.idleTimeout * Fix bug where we were calling PeerConnection.connect() rather than PeerConnection.reconnect() --- .../org/bitcoins/node/NeutrinoNode.scala | 4 +-- .../scala/org/bitcoins/node/PeerData.scala | 2 -- .../scala/org/bitcoins/node/PeerFinder.scala | 3 +- .../scala/org/bitcoins/node/PeerManager.scala | 24 +++---------- .../node/networking/peer/PeerConnection.scala | 36 ++----------------- 5 files changed, 11 insertions(+), 58 deletions(-) diff --git a/node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala b/node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala index 0472bfc14b..0e8d4652c0 100644 --- a/node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala +++ b/node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala @@ -191,9 +191,7 @@ case class NeutrinoNode( val peers = peerManager.peers logger.debug(s"Running inactivity checks for peers=${peers}") val resultF = if (peers.nonEmpty) { - Future - .traverse(peers)(peerManager.inactivityChecks) - .map(_ => ()) + Future.unit //do nothing? } else if (isStarted.get) { //stop and restart to get more peers stop() diff --git a/node/src/main/scala/org/bitcoins/node/PeerData.scala b/node/src/main/scala/org/bitcoins/node/PeerData.scala index a10328eb76..086a35dc1d 100644 --- a/node/src/main/scala/org/bitcoins/node/PeerData.scala +++ b/node/src/main/scala/org/bitcoins/node/PeerData.scala @@ -67,8 +67,6 @@ case class PersistentPeerData( lastTimedOut = System.currentTimeMillis() } - def isConnectionTimedOut: Boolean = peerConnection.isConnectionTimedOut - /** returns true if the peer has failed due to any reason within the past 30 minutes */ def hasFailedRecently: Boolean = { diff --git a/node/src/main/scala/org/bitcoins/node/PeerFinder.scala b/node/src/main/scala/org/bitcoins/node/PeerFinder.scala index 7b4b2803c4..a13cae3275 100644 --- a/node/src/main/scala/org/bitcoins/node/PeerFinder.scala +++ b/node/src/main/scala/org/bitcoins/node/PeerFinder.scala @@ -238,10 +238,11 @@ case class PeerFinder( private def tryToAttemptToConnectPeer(peer: Peer): Future[Unit] = { logger.debug(s"tryToAttemptToConnectPeer=$peer") + val peerConnection = _peerData(peer).peerConnection val peerMessageSender = PeerMessageSender(peerConnection) _peerData.put(peer, AttemptToConnectPeerData(peer, peerMessageSender)) - peerConnection.connect() + peerConnection.reconnect() } /** creates and initialises a new test peer */ diff --git a/node/src/main/scala/org/bitcoins/node/PeerManager.scala b/node/src/main/scala/org/bitcoins/node/PeerManager.scala index 710f59986f..4e7d5098db 100644 --- a/node/src/main/scala/org/bitcoins/node/PeerManager.scala +++ b/node/src/main/scala/org/bitcoins/node/PeerManager.scala @@ -349,8 +349,11 @@ case class PeerManager( s"$peer cannot be both a test and a persistent peer") if (finder.hasPeer(peer)) { - //client actor for one of the test peers stopped, can remove it from map now - finder.removePeer(peer).map(_ => state) + if (peers.isEmpty) { + finder.reconnect(peer).map(_ => state) + } else { + finder.removePeer(peer).map(_ => state) + } } else if (peerDataMap.contains(peer)) { val peerData = _peerDataMap(peer) _peerDataMap.remove(peer) @@ -916,23 +919,6 @@ case class PeerManager( Future.unit } } - - private[node] def inactivityChecks(peer: Peer): Future[Unit] = { - val peerDataOpt = peerDataMap.get(peer) - peerDataOpt match { - case Some(peerData) => - if (peerData.isConnectionTimedOut) { - val stopF = peerData.stop() - stopF - } else { - Future.unit - } - case None => - logger.warn( - s"Could not find peerData to run inactivity check against for peer=$peer") - Future.unit - } - } } case class ResponseTimeout(payload: NetworkPayload) diff --git a/node/src/main/scala/org/bitcoins/node/networking/peer/PeerConnection.scala b/node/src/main/scala/org/bitcoins/node/networking/peer/PeerConnection.scala index 9e3fcf6d7f..d1d6e097c9 100644 --- a/node/src/main/scala/org/bitcoins/node/networking/peer/PeerConnection.scala +++ b/node/src/main/scala/org/bitcoins/node/networking/peer/PeerConnection.scala @@ -4,7 +4,6 @@ import akka.actor.{ActorSystem, Cancellable} import akka.event.Logging import akka.io.Inet.SocketOption import akka.io.Tcp.SO.KeepAlive -import akka.stream.scaladsl.SourceQueue import akka.stream.scaladsl.{ BidiFlow, Flow, @@ -13,6 +12,7 @@ import akka.stream.scaladsl.{ RunnableGraph, Sink, Source, + SourceQueue, Tcp } import akka.stream.{Attributes, KillSwitches, UniqueKillSwitch} @@ -33,9 +33,7 @@ import org.bitcoins.tor.Socks5Connection import scodec.bits.ByteVector import java.net.InetSocketAddress -import java.time.Instant -import java.time.temporal.ChronoUnit -import scala.concurrent.duration.{DurationInt, FiniteDuration} +import scala.concurrent.duration.DurationInt import scala.concurrent.{Future, Promise} case class PeerConnection(peer: Peer, queue: SourceQueue[NodeStreamMessage])( @@ -112,8 +110,6 @@ case class PeerConnection(peer: Peer, queue: SourceQueue[NodeStreamMessage])( val (messages, newUnalignedBytes) = NetworkUtil.parseIndividualMessages(bytes) - if (messages.nonEmpty) updateLastParsedMessageTime() - (ByteString.fromArray(newUnalignedBytes.toArray), messages) } @@ -122,6 +118,7 @@ case class PeerConnection(peer: Peer, queue: SourceQueue[NodeStreamMessage])( Vector[NetworkMessage], NotUsed] = { Flow[ByteString] + .idleTimeout(nodeAppConfig.inactivityTimeout) .statefulMap(() => ByteString.empty)(parseHelper, { _: ByteString => None }) .log( @@ -370,33 +367,6 @@ case class PeerConnection(peer: Peer, queue: SourceQueue[NodeStreamMessage])( }.map(_ => ()) sendMsgF } - - private[this] val INACTIVITY_TIMEOUT: FiniteDuration = - nodeAppConfig.inactivityTimeout - - @volatile private[this] var lastSuccessfulParsedMsgOpt: Option[Long] = None - - private def updateLastParsedMessageTime(): Unit = { - lastSuccessfulParsedMsgOpt = Some(System.currentTimeMillis()) - () - } - - def isConnectionTimedOut: Boolean = { - lastSuccessfulParsedMsgOpt match { - case Some(lastSuccessfulParsedMsg) => - val timeoutInstant = - Instant.now().minus(INACTIVITY_TIMEOUT.toMillis, ChronoUnit.MILLIS) - val diff = Instant - .ofEpochMilli(lastSuccessfulParsedMsg) - .minus(timeoutInstant.toEpochMilli, ChronoUnit.MILLIS) - - val isTimedOut = diff.toEpochMilli < 0 - - isTimedOut - case None => false //we are not initialized yet - } - - } } object PeerConnection {