Fix PeerDb.lastSeen race condition in unit tests (#5435)

* Disconnect from bitcoind side rather than bitcoin-s side to make sure we don't run into a race condition for updating lastSeen

* Remove usage of CachedAppConfig, switch implicit param args for PeerDAO to be consistent with rest of code base

* Fix bug in getNodeURIFromBitcoind(), disconnect from bitcoind side in PeerManagerTest

* Add println to see why still fialing

* Add PeerConnection.getLocalAddress, add parameter localAddressBitcoinS  to NodeTestUtil.getNodeURIFromBitcoind() to make sure we are getting the correct URI

* Empty commit to run CI
This commit is contained in:
Chris Stewart 2024-02-28 10:33:01 -06:00 committed by GitHub
parent 42e5e87350
commit d1ecd35e63
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 96 additions and 38 deletions

View File

@ -139,7 +139,7 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair {
_ <- assertConnAndInit
ourPeers <- Future.sequence(
nodeConnectedWithBitcoind.bitcoinds.map(NodeTestUtil.getBitcoindPeer))
peerDbs <- PeerDAO()(executionContext, node.nodeAppConfig).findAll()
peerDbs <- PeerDAO()(node.nodeAppConfig, executionContext).findAll()
} yield {
val allInDb = ourPeers.forall { p =>

View File

@ -68,9 +68,17 @@ class PeerManagerTest extends NodeTestWithCachedBitcoindNewest {
_ <- node.start()
peer <- peerF
peerManager = node.peerManager
_ <- NodeTestUtil.awaitSyncAndIBD(node = node, bitcoind = bitcoind)
//disconnect
_ <- peerManager.disconnectPeer(peer)
address <- peerManager
.getPeerData(peer)
.get
.peerConnection
.getLocalAddress
.map(_.get)
nodeUri <- NodeTestUtil.getNodeURIFromBitcoind(bitcoind, address)
_ <- bitcoind.disconnectNode(nodeUri)
_ <- NodeTestUtil.awaitConnectionCount(node, 0)
_ <- node.peerManager.connectPeer(peer)
_ <- NodeTestUtil.awaitConnectionCount(node, 1)
@ -88,14 +96,20 @@ class PeerManagerTest extends NodeTestWithCachedBitcoindNewest {
for {
_ <- node.start()
peer <- peerF
peerManager = node.peerManager
_ <- NodeTestUtil.awaitSyncAndIBD(node = node, bitcoind = bitcoind)
//disconnect
timestamp = Instant.now()
_ <- peerManager.disconnectPeer(peer)
address <- node.peerManager
.getPeerData(peer)
.get
.peerConnection
.getLocalAddress
.map(_.get)
uri <- NodeTestUtil.getNodeURIFromBitcoind(bitcoind, address)
_ <- bitcoind.disconnectNode(uri)
_ <- NodeTestUtil.awaitConnectionCount(node, 0)
addrBytes = PeerDAOHelper.getAddrBytes(peer)
peerDb <- PeerDAO()(system.dispatcher, node.nodeConfig)
peerDb <- PeerDAO()(node.nodeConfig, system.dispatcher)
.read((addrBytes, peer.port))
.map(_.get)
} yield {

View File

@ -1,9 +1,11 @@
package org.bitcoins.node.models
import org.bitcoins.core.api.node.Peer
import org.bitcoins.core.p2p.{AddrV2Message, ServiceIdentifier}
import org.bitcoins.testkit.fixtures.NodeDAOFixture
import scodec.bits.ByteVector
import java.net.InetSocketAddress
import java.time.Instant
class PeerDAOTest extends NodeDAOFixture {
@ -35,4 +37,26 @@ class PeerDAOTest extends NodeDAOFixture {
)
}
}
it must "update last seen time" in { daos =>
val peerDAO = daos.peerDAO
val bytes = ByteVector(Array[Byte](127, 0, 0, 1))
val start = Instant.now
val peer = PeerDb(
address = bytes,
port = 8333,
lastSeen = start,
firstSeen = start,
networkId = AddrV2Message.IPV4_NETWORK_BYTE,
serviceBytes = ServiceIdentifier.NODE_COMPACT_FILTERS.bytes
)
for {
_ <- peerDAO.create(peer)
_ <- peerDAO.read((peer.address, peer.port))
socket = InetSocketAddress.createUnresolved("127.0.0.1", peer.port)
updated <- peerDAO.updateLastSeenTime(Peer(socket, None))
} yield {
assert(updated.get.lastSeen.isAfter(start))
}
}
}

View File

@ -790,6 +790,8 @@ case class PeerManager(
s"Shut down already requested, ignoring new shutdown request")
Future.successful(s)
case r: NodeRunningState =>
logger.info(
s"Received NodeShutdown message, beginning shutdown procedures")
val shutdownState =
NodeShuttingDown(peerDataMap = r.peerDataMap,
waitingForDisconnection =

View File

@ -2,7 +2,7 @@ package org.bitcoins.node.db
import org.bitcoins.db.{DbManagement, JdbcProfileComponent}
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.BroadcastAbleTransactionDAO
import org.bitcoins.node.models.{BroadcastAbleTransactionDAO, PeerDAO}
import scala.concurrent.ExecutionContext
@ -17,6 +17,12 @@ trait NodeDbManagement extends DbManagement {
BroadcastAbleTransactionDAO()(appConfig, ec).table
}
override lazy val allTables: List[TableQuery[Table[_]]] = List(txTable)
private lazy val peerTable: TableQuery[Table[_]] = {
PeerDAO()(appConfig, ec).table
}
override lazy val allTables: List[TableQuery[Table[_]]] = {
List(txTable, peerTable)
}
}

View File

@ -22,7 +22,7 @@ case class PeerDb(
serviceBytes: ByteVector
)
case class PeerDAO()(implicit ec: ExecutionContext, appConfig: NodeAppConfig)
case class PeerDAO()(implicit appConfig: NodeAppConfig, ec: ExecutionContext)
extends CRUD[PeerDb, (ByteVector, Int)]
with SlickUtil[PeerDb, (ByteVector, Int)] {
@ -93,7 +93,8 @@ case class PeerDAO()(implicit ec: ExecutionContext, appConfig: NodeAppConfig)
val action = findByPrimaryKey((address, port)).result.headOption
val updatedLastSeenA = action.flatMap {
case Some(peerDb) =>
updateAction(peerDb.copy(lastSeen = Instant.now()))
val now = Instant.now()
updateAction(peerDb.copy(lastSeen = now))
.map(Some(_))
case None => DBIOAction.successful(None)
}

View File

@ -220,6 +220,13 @@ case class PeerConnection(peer: Peer, queue: SourceQueue[NodeStreamMessage])(
runningStream
}
def getLocalAddress: Future[Option[InetSocketAddress]] = {
connectionGraphOpt match {
case Some(g) => g.connectionF.map(c => Some(c.localAddress))
case None => Future.successful(None)
}
}
@volatile private[this] var connectionGraphOpt: Option[ConnectionGraph] = None
/** Initiates a connection with the given peer */

View File

@ -1,58 +1,56 @@
package org.bitcoins.testkit.fixtures
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.{BroadcastAbleTransactionDAO, PeerDAO}
import org.bitcoins.server.BitcoinSAppConfig
import org.bitcoins.testkit.BitcoinSTestAppConfig
import org.bitcoins.testkit.node.{CachedBitcoinSAppConfig, NodeUnitTest}
import org.bitcoins.testkit.node.{NodeUnitTest}
import org.scalatest._
import scala.concurrent.Future
case class NodeDAOs(txDAO: BroadcastAbleTransactionDAO, peerDAO: PeerDAO)
case class NodeDAOs(
txDAO: BroadcastAbleTransactionDAO,
peerDAO: PeerDAO,
nodeAppConfig: NodeAppConfig)
/** Provides a fixture where all DAOs used by the node projects are provided */
trait NodeDAOFixture extends NodeUnitTest with CachedBitcoinSAppConfig {
trait NodeDAOFixture extends NodeUnitTest {
/** Wallet config with data directory set to user temp directory */
override protected def getFreshConfig: BitcoinSAppConfig =
BitcoinSTestAppConfig.getNeutrinoWithEmbeddedDbTestConfig(() => pgUrl(),
Vector.empty)
private lazy val daos = {
val tx = BroadcastAbleTransactionDAO()
val peerDao = PeerDAO()
NodeDAOs(tx, peerDao)
}
final override type FixtureParam = NodeDAOs
def withFixture(test: OneArgAsyncTest): FutureOutcome = {
makeFixture(build = () => {
for {
_ <- cachedChainConf.start()
_ <- cachedNodeConf.start()
} yield daos
},
destroy =
() => destroyAppConfig(cachedChainConf, cachedNodeConf))(test)
override def withFixture(test: OneArgAsyncTest): FutureOutcome = {
makeDependentFixture[NodeDAOs](
build = () => {
val config = getFreshConfig
val nodeConf = config.nodeConf
for {
_ <- nodeConf.start()
} yield {
val tx = BroadcastAbleTransactionDAO()(nodeConf, executionContext)
val peerDao = PeerDAO()(nodeConf, executionContext)
NodeDAOs(tx, peerDao, nodeConf)
}
},
destroy = { case dao: NodeDAOs =>
destroyAppConfig(dao.nodeAppConfig)
}
)(test)
}
private def destroyAppConfig(
chainConfig: ChainAppConfig,
nodeConfig: NodeAppConfig): Future[Unit] = {
private def destroyAppConfig(nodeConfig: NodeAppConfig): Future[Unit] = {
nodeConfig.clean()
for {
_ <- nodeConfig.dropAll()
_ <- nodeConfig.stop()
_ <- chainConfig.dropAll()
_ <- chainConfig.stop()
} yield ()
}
override def afterAll(): Unit = {
super[CachedBitcoinSAppConfig].afterAll()
super[NodeUnitTest].afterAll()
}
}

View File

@ -5,6 +5,7 @@ import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.core.api.node.Peer
import org.bitcoins.core.api.tor.Socks5ProxyParams
import org.bitcoins.crypto.DoubleSha256DigestBE
import org.bitcoins.node.constant.NodeConstants
import org.bitcoins.node.{NeutrinoNode, Node, P2PLogger}
import org.bitcoins.rpc.client.common.BitcoindRpcClient
import org.bitcoins.testkit.async.TestAsyncUtil
@ -263,11 +264,16 @@ abstract class NodeTestUtil extends P2PLogger {
/** get our neutrino node's uri from a test bitcoind instance to send rpc commands for our node.
* The peer must be initialized by the node.
*/
def getNodeURIFromBitcoind(bitcoind: BitcoindRpcClient)(implicit
def getNodeURIFromBitcoind(
bitcoind: BitcoindRpcClient,
localAddressBitcoinS: InetSocketAddress)(implicit
system: ActorSystem): Future[URI] = {
import system.dispatcher
bitcoind.getPeerInfo.map { peerInfo =>
val localFilter = peerInfo.filter(_.networkInfo.addrlocal.isDefined)
val localFilter = peerInfo.filter { p =>
p.networkInfo.addrlocal.isDefined && p.subver.contains(
NodeConstants.userAgent) && p.networkInfo.addr.getPort == localAddressBitcoinS.getPort
}
val result = localFilter.head.networkInfo.addr
result
}