Replace inactivity logic with Flow.idleTimeout (#5311)

* Replace inactivity logic with Flow.idleTimeout

* Fix bug where we were calling PeerConnection.connect() rather than PeerConnection.reconnect()
This commit is contained in:
Chris Stewart 2023-11-29 15:13:13 -06:00 committed by GitHub
parent c0e8d376eb
commit 47c433f365
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 11 additions and 58 deletions

View file

@ -191,9 +191,7 @@ case class NeutrinoNode(
val peers = peerManager.peers
logger.debug(s"Running inactivity checks for peers=${peers}")
val resultF = if (peers.nonEmpty) {
Future
.traverse(peers)(peerManager.inactivityChecks)
.map(_ => ())
Future.unit //do nothing?
} else if (isStarted.get) {
//stop and restart to get more peers
stop()

View file

@ -67,8 +67,6 @@ case class PersistentPeerData(
lastTimedOut = System.currentTimeMillis()
}
def isConnectionTimedOut: Boolean = peerConnection.isConnectionTimedOut
/** returns true if the peer has failed due to any reason within the past 30 minutes
*/
def hasFailedRecently: Boolean = {

View file

@ -238,10 +238,11 @@ case class PeerFinder(
private def tryToAttemptToConnectPeer(peer: Peer): Future[Unit] = {
logger.debug(s"tryToAttemptToConnectPeer=$peer")
val peerConnection = _peerData(peer).peerConnection
val peerMessageSender = PeerMessageSender(peerConnection)
_peerData.put(peer, AttemptToConnectPeerData(peer, peerMessageSender))
peerConnection.connect()
peerConnection.reconnect()
}
/** creates and initialises a new test peer */

View file

@ -349,8 +349,11 @@ case class PeerManager(
s"$peer cannot be both a test and a persistent peer")
if (finder.hasPeer(peer)) {
//client actor for one of the test peers stopped, can remove it from map now
finder.removePeer(peer).map(_ => state)
if (peers.isEmpty) {
finder.reconnect(peer).map(_ => state)
} else {
finder.removePeer(peer).map(_ => state)
}
} else if (peerDataMap.contains(peer)) {
val peerData = _peerDataMap(peer)
_peerDataMap.remove(peer)
@ -916,23 +919,6 @@ case class PeerManager(
Future.unit
}
}
private[node] def inactivityChecks(peer: Peer): Future[Unit] = {
val peerDataOpt = peerDataMap.get(peer)
peerDataOpt match {
case Some(peerData) =>
if (peerData.isConnectionTimedOut) {
val stopF = peerData.stop()
stopF
} else {
Future.unit
}
case None =>
logger.warn(
s"Could not find peerData to run inactivity check against for peer=$peer")
Future.unit
}
}
}
case class ResponseTimeout(payload: NetworkPayload)

View file

@ -4,7 +4,6 @@ import akka.actor.{ActorSystem, Cancellable}
import akka.event.Logging
import akka.io.Inet.SocketOption
import akka.io.Tcp.SO.KeepAlive
import akka.stream.scaladsl.SourceQueue
import akka.stream.scaladsl.{
BidiFlow,
Flow,
@ -13,6 +12,7 @@ import akka.stream.scaladsl.{
RunnableGraph,
Sink,
Source,
SourceQueue,
Tcp
}
import akka.stream.{Attributes, KillSwitches, UniqueKillSwitch}
@ -33,9 +33,7 @@ import org.bitcoins.tor.Socks5Connection
import scodec.bits.ByteVector
import java.net.InetSocketAddress
import java.time.Instant
import java.time.temporal.ChronoUnit
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Future, Promise}
case class PeerConnection(peer: Peer, queue: SourceQueue[NodeStreamMessage])(
@ -112,8 +110,6 @@ case class PeerConnection(peer: Peer, queue: SourceQueue[NodeStreamMessage])(
val (messages, newUnalignedBytes) =
NetworkUtil.parseIndividualMessages(bytes)
if (messages.nonEmpty) updateLastParsedMessageTime()
(ByteString.fromArray(newUnalignedBytes.toArray), messages)
}
@ -122,6 +118,7 @@ case class PeerConnection(peer: Peer, queue: SourceQueue[NodeStreamMessage])(
Vector[NetworkMessage],
NotUsed] = {
Flow[ByteString]
.idleTimeout(nodeAppConfig.inactivityTimeout)
.statefulMap(() => ByteString.empty)(parseHelper,
{ _: ByteString => None })
.log(
@ -370,33 +367,6 @@ case class PeerConnection(peer: Peer, queue: SourceQueue[NodeStreamMessage])(
}.map(_ => ())
sendMsgF
}
private[this] val INACTIVITY_TIMEOUT: FiniteDuration =
nodeAppConfig.inactivityTimeout
@volatile private[this] var lastSuccessfulParsedMsgOpt: Option[Long] = None
private def updateLastParsedMessageTime(): Unit = {
lastSuccessfulParsedMsgOpt = Some(System.currentTimeMillis())
()
}
def isConnectionTimedOut: Boolean = {
lastSuccessfulParsedMsgOpt match {
case Some(lastSuccessfulParsedMsg) =>
val timeoutInstant =
Instant.now().minus(INACTIVITY_TIMEOUT.toMillis, ChronoUnit.MILLIS)
val diff = Instant
.ofEpochMilli(lastSuccessfulParsedMsg)
.minus(timeoutInstant.toEpochMilli, ChronoUnit.MILLIS)
val isTimedOut = diff.toEpochMilli < 0
isTimedOut
case None => false //we are not initialized yet
}
}
}
object PeerConnection {