Implement logic to restart PeerManager in inactivity checks when we have 0 peers (#5171)

* Implement logic to restart PeerManager in inactivity checks when we have 0 peers

* Revert logback-test.xml

* cleanup
This commit is contained in:
Chris Stewart 2023-08-01 10:07:40 -05:00 committed by GitHub
parent 4d5488f193
commit 9b85838823
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 87 additions and 33 deletions

View File

@ -3,7 +3,7 @@ package org.bitcoins.core.api.node
import org.bitcoins.core.api.db.DbRowAutoInc
import org.bitcoins.core.api.tor.Socks5ProxyParams
import java.net.InetSocketAddress
import java.net.{InetSocketAddress, URI}
case class Peer(
socket: InetSocketAddress,
@ -27,4 +27,11 @@ object Peer {
socks5ProxyParams: Option[Socks5ProxyParams]): Peer = {
Peer(socket, socks5ProxyParams = socks5ProxyParams)
}
def fromURI(
uri: URI,
socks5ProxyParamsOpt: Option[Socks5ProxyParams]): Peer = {
val socket = new InetSocketAddress(uri.getHost, uri.getPort)
fromSocket(socket, socks5ProxyParamsOpt)
}
}

View File

@ -12,6 +12,7 @@ import org.bitcoins.testkit.node.{
}
import org.scalatest.FutureOutcome
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
class ReConnectionTest extends NodeTestWithCachedBitcoindNewest {
@ -60,32 +61,8 @@ class ReConnectionTest extends NodeTestWithCachedBitcoindNewest {
it must "disconnect a peer after a period of inactivity" in {
nodeConnectedWithBitcoind: NeutrinoNodeConnectedWithBitcoind =>
//val bitcoind = nodeConnectedWithBitcoind.bitcoind
val initNode = nodeConnectedWithBitcoind.node
//make a custom config, set the inactivity timeout very low
//so we will disconnect our peer organically
val config =
ConfigFactory.parseString("bitcoin-s.node.inactivity-timeout=5s")
val stoppedConfigF = initNode.nodeConfig.stop()
val newNodeAppConfigF =
stoppedConfigF.map(_ => initNode.nodeConfig.withOverrides(config))
val nodeF = {
for {
newNodeAppConfig <- newNodeAppConfigF
_ <- newNodeAppConfig.start()
} yield {
NeutrinoNode(
walletCreationTimeOpt = initNode.walletCreationTimeOpt,
nodeConfig = newNodeAppConfig,
chainConfig = initNode.chainAppConfig,
actorSystem = initNode.system,
paramPeers = initNode.paramPeers
)
}
}
val startedF = nodeF.flatMap(_.start())
val startedF =
getSmallInactivityCheckNeutrinoNode(nodeConnectedWithBitcoind.node)
for {
started <- startedF
_ <- AsyncUtil.retryUntilSatisfiedF(() =>
@ -99,6 +76,63 @@ class ReConnectionTest extends NodeTestWithCachedBitcoindNewest {
} yield {
succeed
}
}
it must "reconnect a peer when inactivity checks run and we have 0 peers" in {
nodeConnectedWithBitcoind: NeutrinoNodeConnectedWithBitcoind =>
//see: https://github.com/bitcoin-s/bitcoin-s/issues/5162
val bitcoind = nodeConnectedWithBitcoind.bitcoind
val startedF =
getSmallInactivityCheckNeutrinoNode(nodeConnectedWithBitcoind.node)
for {
started <- startedF
_ <- AsyncUtil.retryUntilSatisfiedF(() =>
started.getConnectionCount.map(_ == 1))
//explicitly disconnect it
bitcoindPeer <- NodeTestUtil.getBitcoindPeer(bitcoind)
_ <- started.peerManager.disconnectPeer(bitcoindPeer)
//wait until we have zero connections
_ <- AsyncUtil.retryUntilSatisfiedF(
() => started.getConnectionCount.map(_ == 0),
1.second)
//wait until there is a timeout for inactivity
_ <- AsyncUtil.retryUntilSatisfiedF(
() => started.getConnectionCount.map(_ == 1),
1.second)
_ <- started.stop()
_ <- started.nodeConfig.stop()
} yield {
succeed
}
}
private def getSmallInactivityCheckNeutrinoNode(
initNode: NeutrinoNode): Future[NeutrinoNode] = {
//make a custom config, set the inactivity timeout very low
//so we will disconnect our peer organically
val config =
ConfigFactory.parseString("bitcoin-s.node.inactivity-timeout=5s")
val stoppedConfigF = initNode.nodeConfig.stop()
val newNodeAppConfigF =
stoppedConfigF.map(_ => initNode.nodeConfig.withOverrides(config))
val nodeF = {
for {
newNodeAppConfig <- newNodeAppConfigF
_ <- newNodeAppConfig.start()
} yield {
NeutrinoNode(
walletCreationTimeOpt = initNode.walletCreationTimeOpt,
nodeConfig = newNodeAppConfig,
chainConfig = initNode.chainAppConfig,
actorSystem = initNode.system,
paramPeers = initNode.paramPeers
)
}
}
val startedF = nodeF.flatMap(_.start())
startedF
}
}

View File

@ -1087,21 +1087,34 @@ case class PeerManager(
@volatile private[this] var inactivityCancellableOpt: Option[Cancellable] =
None
private def inactivityChecks(peerData: PeerData): Unit = {
private def inactivityChecks(peerData: PeerData): Future[Unit] = {
if (peerData.isConnectionTimedOut) {
val stopF = peerData.stop()
stopF.failed.foreach(err =>
logger.error(s"Failed to stop node inside of inactivityChecks()", err))
()
stopF
} else {
()
Future.unit
}
}
private def inactivityChecksRunnable(): Runnable = { () =>
logger.debug(
s"Running inactivity checks for peers=${peerDataMap.map(_._1)}")
peerDataMap.map(_._2).map(inactivityChecks)
val resultF = if (peerDataMap.nonEmpty) {
Future
.traverse(peerDataMap.map(_._2))(inactivityChecks)
.map(_ => ())
} else if (isStarted.get) {
//stop and restart to get more peers
stop()
.flatMap(_.start())
.map(_ => ())
} else {
start().map(_ => ())
}
resultF.failed.foreach(err =>
logger.error(s"Failed to run inactivity checks for peers=${peers}", err))
()
}