Fix race condition with BitcoindChainHandlerViaZmqTest (#2990)

Turn off logging again

Try to move things around even more to make sure zmq is started

Turn logging level down to ERROR to hopefully get some meaningful output

Adjust both valdiation & ChainHandler to be ERROR level logging

switch zmqpubrawblock to be the port we know is free

Reduce logging to WARN

Bump bitcoind max retries to 120

Fix rebase

Revert logging

Remove some noisy logs

Restore logs to trace level

Add explicit tag
This commit is contained in:
Chris Stewart 2021-05-01 13:11:13 -05:00 committed by GitHub
parent e7d34a9ba9
commit a2911f31ed
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 75 additions and 32 deletions

View file

@ -21,7 +21,7 @@ mdocExtraArguments := List("--no-link-hygiene")
// these variables gets passed to mdoc, and can be read
// from there
mdocVariables := Map(
"STABLE_VERSION" -> previousStableVersion.value.get.toString,
"STABLE_VERSION" -> "0.5.0",
"UNSTABLE_VERSION" -> version.value
)

View file

@ -145,7 +145,7 @@ trait Client
_ = isStartedFlag.set(true)
_ <- AsyncUtil.retryUntilSatisfiedF(() => isStartedF,
interval = 1.seconds,
maxTries = 60)
maxTries = 120)
} yield this.asInstanceOf[BitcoindRpcClient]
}

View file

@ -1,10 +1,12 @@
package org.bitcoins.chain.blockchain
import org.bitcoins.rpc.util.RpcUtil
import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.testkit.chain.ChainDbUnitTest
import org.bitcoins.testkit.chain.fixture.BitcoindChainHandlerViaZmq
import org.scalatest.FutureOutcome
import scala.concurrent.duration.DurationInt
class BitcoindChainHandlerViaZmqTest extends ChainDbUnitTest {
override type FixtureParam = BitcoindChainHandlerViaZmq
@ -20,11 +22,13 @@ class BitcoindChainHandlerViaZmqTest extends ChainDbUnitTest {
val chainHandler = bitcoindChainHandler.chainHandler
val bitcoindBlockCountF = bitcoind.getBlockCount
for {
_ <-
bitcoinSBlockCount <-
chainHandler
.getBlockCount()
.map(count => assert(count == 0))
bitcoindCount <- bitcoindBlockCountF
_ = assert(bitcoindCount == bitcoinSBlockCount)
address <- bitcoind.getNewAddress
hash +: _ <- bitcoind.generateToAddress(1, address)
_ <- {
@ -32,10 +36,10 @@ class BitcoindChainHandlerViaZmqTest extends ChainDbUnitTest {
//can't monitor processing flow for zmq
//so we just need to await until we
//have fully processed the header
RpcUtil.awaitConditionF(() =>
chainHandler.getHeader(hash).map(_.isDefined))
AsyncUtil.awaitConditionF(
() => chainHandler.getHeader(hash).map(_.isDefined),
interval = 250.millis)
}
header <- chainHandler.getHeader(hash)
} yield assert(header.get.hashBE == hash)
}

View file

@ -398,6 +398,8 @@ class BlockHeaderDAOTest extends ChainDbUnitTest {
blockchains <- blockchainsF
} yield {
assert(blockchains.length == 1)
assert(blockchains.head.length == 1)
assert(blockchains.head.headers.head.hashBE == genesisHeaderDb.hashBE)
}
}

View file

@ -975,6 +975,18 @@ object ChainHandler {
blockFilterCheckpoints = Map.empty)
}
def fromDatabase()(implicit
ec: ExecutionContext,
chainConfig: ChainAppConfig): ChainHandler = {
lazy val blockHeaderDAO = BlockHeaderDAO()
lazy val filterHeaderDAO = CompactFilterHeaderDAO()
lazy val filterDAO = CompactFilterDAO()
ChainHandler.fromDatabase(blockHeaderDAO = blockHeaderDAO,
filterHeaderDAO = filterHeaderDAO,
filterDAO = filterDAO)
}
/** Converts a [[ChainHandler]] to [[ChainHandlerCached]] by calling [[BlockHeaderDAO.getBlockchains()]] */
def toChainHandlerCached(chainHandler: ChainHandler)(implicit
ec: ExecutionContext): Future[ChainHandlerCached] = {

View file

@ -2,6 +2,7 @@ package org.bitcoins.testkit.chain
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory
import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.chain.ChainVerificationLogger
import org.bitcoins.chain.blockchain.sync.ChainSync
import org.bitcoins.chain.blockchain.{ChainHandler, ChainHandlerCached}
@ -26,7 +27,7 @@ import org.bitcoins.testkit.chain.models.{
import org.bitcoins.testkit.fixtures.BitcoinSFixture
import org.bitcoins.testkit.node.CachedChainAppConfig
import org.bitcoins.testkit.rpc.BitcoindRpcTestUtil
import org.bitcoins.testkit.util.{AkkaUtil, ScalaTestUtil}
import org.bitcoins.testkit.util.ScalaTestUtil
import org.bitcoins.testkit.{chain, BitcoinSTestAppConfig}
import org.bitcoins.zmq.ZMQSubscriber
import org.scalatest._
@ -231,18 +232,18 @@ trait ChainUnitTest
() => ChainUnitTest.destroyAllTables())(test)
}
def createChainHandlerWithBitcoindZmq(
bitcoind: BitcoindRpcClient): Future[(ChainHandler, ZMQSubscriber)] = {
val handlerWithGenesisHeaderF =
ChainUnitTest.setupHeaderTableWithGenesisHeader()
val chainHandlerF = handlerWithGenesisHeaderF.map(_._1)
def createChainHandlerWithBitcoindZmq(bitcoind: BitcoindRpcClient)(implicit
chainAppConfig: ChainAppConfig): Future[(ChainHandler, ZMQSubscriber)] = {
val zmqRawBlockUriOpt: Option[InetSocketAddress] =
bitcoind.instance.zmqConfig.rawBlock
val handleRawBlock: Block => Unit = { block: Block =>
chainHandlerF.flatMap(_.processHeader(block.blockHeader))
val chainApiF =
ChainHandler.fromDatabase()(executionContext, chainAppConfig)
val processF = chainApiF.processHeader(block.blockHeader)
processF.failed.foreach { err =>
logger.error(s"Failed to parse handleRawBlock callback", err)
}
()
}
@ -255,10 +256,25 @@ trait ChainUnitTest
rawBlockListener = Some(handleRawBlock))
zmqSubscriber.start()
for {
_ <- AkkaUtil.nonBlockingSleep(1.second)
val handlerWithGenesisHeaderF =
ChainUnitTest.setupHeaderTableWithGenesisHeader()(executionContext,
chainAppConfig)
val chainHandlerF = handlerWithGenesisHeaderF.map(_._1)
//generate a block and make sure we see it so we know the subscription is complete
val subscribedF = for {
chainHandler <- chainHandlerF
addr <- bitcoind.getNewAddress
hash +: _ <- bitcoind.generateToAddress(1, addr)
//wait until we see the hash, to make sure the subscription is started
_ <- AsyncUtil.retryUntilSatisfiedF(
() => {
chainHandler.getHeader(hash).map(_.isDefined)
},
250.millis)
} yield (chainHandler, zmqSubscriber)
subscribedF
}
def createBitcoindChainHandlerViaZmq(): Future[BitcoindChainHandlerViaZmq] = {
@ -288,12 +304,16 @@ trait ChainUnitTest
* @return
*/
def withBitcoindChainHandlerViaZmq(test: OneArgAsyncTest)(implicit
system: ActorSystem): FutureOutcome = {
system: ActorSystem,
chainAppConfig: ChainAppConfig): FutureOutcome = {
val builder: () => Future[BitcoindChainHandlerViaZmq] =
composeBuildersAndWrap(builder = () => BitcoinSFixture.createBitcoind(),
dependentBuilder =
createChainHandlerWithBitcoindZmq,
wrap = BitcoindChainHandlerViaZmq.apply)
composeBuildersAndWrap(
builder = () => BitcoinSFixture.createBitcoind(),
dependentBuilder = { rpc: BitcoindRpcClient =>
createChainHandlerWithBitcoindZmq(rpc)(chainAppConfig)
},
wrap = BitcoindChainHandlerViaZmq.apply
)
makeDependentFixture(builder, destroyBitcoindChainHandlerViaZmq)(test)
}

View file

@ -91,6 +91,9 @@ trait BitcoindRpcTestUtil extends Logging {
blockFilterIndex: Boolean = false): BitcoindConfig = {
val pass = FileUtil.randomDirName
val username = "random_user_name"
/* pruning and txindex are not compatible */
val txindex = if (pruneMode) 0 else 1
val conf = s"""
|regtest=1
|daemon=1
@ -103,8 +106,7 @@ trait BitcoindRpcTestUtil extends Logging {
|walletbroadcast=1
|peerbloomfilters=1
|fallbackfee=0.0002
|txindex=${if (pruneMode) 0
else 1 /* pruning and txindex are not compatible */}
|txindex=$txindex
|zmqpubhashtx=tcp://${zmqConfig.hashTx.get.getHostString}:${zmqConfig.hashTx.get.getPort}
|zmqpubhashblock=tcp://${zmqConfig.hashBlock.get.getHostString}:${zmqConfig.hashBlock.get.getPort}
|zmqpubrawtx=tcp://${zmqConfig.rawTx.get.getHostString}:${zmqConfig.rawTx.get.getPort}

View file

@ -455,7 +455,7 @@ private[wallet] trait TransactionProcessing extends WalletLogger {
newTags: Vector[AddressTag]): Future[Seq[SpendingInfoDb]] = {
getRelevantOutputs(transaction).flatMap {
case Nil =>
logger.debug(
logger.trace(
s"Found no outputs relevant to us in transaction${transaction.txIdBE.hex}")
Future.successful(Vector.empty)

View file

@ -193,7 +193,7 @@ private[wallet] trait UtxoHandling extends WalletLogger {
_ =
if (toUpdate.nonEmpty)
logger.info(s"${toUpdate.size} txos are now confirmed!")
else logger.debug("No txos to be confirmed")
else logger.trace("No txos to be confirmed")
updated <- spendingInfoDAO.upsertAllSpendingInfoDb(toUpdate.flatten)
} yield updated
}

View file

@ -3,6 +3,7 @@ package org.bitcoins.zmq
import grizzled.slf4j.Logging
import org.bitcoins.core.protocol.blockchain.Block
import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.core.util.StartStop
import org.bitcoins.crypto.DoubleSha256DigestBE
import org.zeromq.{SocketType, ZMQ, ZMQException, ZMsg}
import scodec.bits.ByteVector
@ -26,7 +27,8 @@ class ZMQSubscriber(
hashBlockListener: Option[DoubleSha256DigestBE => Unit],
rawTxListener: Option[Transaction => Unit],
rawBlockListener: Option[Block => Unit])
extends Logging {
extends Logging
with StartStop[Unit] {
private var running = true
private val context = ZMQ.context(1)
@ -96,7 +98,7 @@ class ZMQSubscriber(
s"ZMQSubscriber-thread-${System.currentTimeMillis()}")
subscriberThread.setDaemon(true)
def start(): Unit = {
override def start(): Unit = {
logger.info("starting zmq")
subscriberThread.start()
}
@ -104,7 +106,7 @@ class ZMQSubscriber(
/** Stops running the zmq subscriber and cleans up after zmq
* http://zguide.zeromq.org/java:psenvsub
*/
def stop(): Unit = {
override def stop(): Unit = {
logger.info(s"Stopping zmq")
//i think this could technically not work, because currently we are blocking
//on Zmsg.recvMsg in our while loop. If we don't get another message we won't
@ -113,6 +115,7 @@ class ZMQSubscriber(
subscriber.close()
logger.info("Attempting to terminate context")
context.term()
subscriberThread.interrupt()
logger.info(s"Done with closing zmq")
()
}