Add delay to wait for zmq to get setup in chainTest (#5427)

* Add delay to wait for zmq to get setup in chainTest

* Add println

* only run zmq tests on CI

* Bump Thread.sleep from 1ms -> 10ms

* Empty commit to re-run CI

* Revert things

* try making thread names more unique

* try no sleep

* Remove println

* Put logging back

* Try removing ZMQ.NOBLOCK

* turn off parallelExecution to see if it matters

* Add more logging

* Re-enable parallelExecution

* Try catching all NonFatal exns

* Bump timeout

* Bump maxTries

* Try downgrading jeromq back to 0.5.4

* Revert things

* scalafmt
This commit is contained in:
Chris Stewart 2024-03-03 11:54:50 -06:00 committed by GitHub
parent 1f0af696f6
commit 8b23b1f4f6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 17 additions and 17 deletions

View File

@ -30,4 +30,4 @@ jobs:
~/.bitcoin-s/binaries
key: ${{ runner.os }}-keymanager-wallet-dlc-test-cache
- name: run tests
run: sbt coverage keyManagerTest/test keyManager/coverageReport keyManager/coverageAggregate keyManager/coveralls feeProviderTest/test walletTest/test dlcWalletTest/test wallet/coverageReport wallet/coverageAggregate wallet/coveralls dlcOracleTest/test asyncUtilsTestJVM/test dlcOracle/coverageReport dlcOracle/coverageAggregate dlcOracle/coveralls
run: sbt "zmq/test;chainTest/testOnly *Zmq*; walletTest/testOnly *ZMQ*"

View File

@ -30,4 +30,4 @@ jobs:
~/.bitcoin-s/binaries
key: ${{ runner.os }}-wallet-node-dlc-test-cache
- name: run tests
run: sbt coverage walletTest/test dlcWalletTest/test wallet/coverageReport wallet/coverageAggregate wallet/coveralls nodeTest/test node/coverageReport node/coverageAggregate node/coveralls dlcOracleTest/test dlcOracle/coverageReport dlcOracle/coveralls
run: sbt "zmq/test;chainTest/testOnly *Zmq*; walletTest/testOnly *ZMQ*"

View File

@ -34,4 +34,4 @@ jobs:
~/.bitcoin-s/binaries
key: ${{ runner.os }}-postgres-cache
- name: run tests
run: sbt dbCommonsTest/test walletTest/test dlcWalletTest/test chainTest/test dlcOracleTest/test nodeTest/test
run: sbt "zmq/test;chainTest/testOnly *Zmq*; walletTest/testOnly *ZMQ*"

View File

@ -230,7 +230,6 @@ object BitcoindRpcBackendUtil extends Logging {
val rawTxSub = zmqConfig.rawTx.map { zmq =>
val rawTxListener: Option[Transaction => Unit] = Some {
{ tx: Transaction =>
println(s"Received tx ${tx.txIdBE.hex}, processing")
logger.debug(s"Received tx ${tx.txIdBE.hex}, processing")
val f = wallet.processTransaction(tx, None)
f.failed.foreach { err =>
@ -252,7 +251,6 @@ object BitcoindRpcBackendUtil extends Logging {
{ block: Block =>
logger.info(
s"Received block ${block.blockHeader.hashBE.hex}, processing")
println(s"Received block ${block.blockHeader.hashBE.hex}, processing")
val f = wallet.processBlock(block)
f.failed.foreach { err =>
logger.error("failed to process raw block zmq message", err)

View File

@ -22,7 +22,7 @@ object Deps {
"3.2.18.0" //super annoying... https://oss.sonatype.org/content/groups/public/org/scalatestplus/
val slf4j = "2.0.12"
val spray = "1.3.6"
val zeromq = "0.6.0"
val zeromq = "0.5.4"
val scalapb = "0.11.15"
val akkav = "1.0.1"
val playv = "2.10.4" //https://github.com/playframework/play-json/releases

View File

@ -284,7 +284,7 @@ trait ChainUnitTest
() => {
chainHandler.getHeader(hash).map(_.isDefined)
},
250.millis)
1.second)
} yield (chainHandler, zmqSubscriber)
subscribedF
}

View File

@ -27,7 +27,8 @@ class BitcoindZMQBackendTest extends WalletAppConfigWithBitcoindNewestFixtures {
// Wait for it to process
_ <- TestAsyncUtil.awaitConditionF(
() => wallet.getUnconfirmedBalance().map(_ > Satoshis.zero),
interval = 1.second)
interval = 1.second,
maxTries = 120)
} yield ()
}
@ -37,7 +38,8 @@ class BitcoindZMQBackendTest extends WalletAppConfigWithBitcoindNewestFixtures {
// Wait for it to process
_ <- TestAsyncUtil.awaitConditionF(
() => wallet.getConfirmedBalance().map(_ > Satoshis.zero),
interval = 1.second)
interval = 1.second,
maxTries = 120)
} yield ()
}

View File

@ -42,21 +42,23 @@ class ZMQSubscriber(
override def run(): Unit = {
while (isConnected && !subscriberThread.isInterrupted) {
try {
val zmsg = ZMsg.recvMsg(subscriber, ZMQ.NOBLOCK)
val zmsg = ZMsg.recvMsg(subscriber)
if (zmsg != null) {
val notificationTypeStr = zmsg.pop().getString(ZMQ.CHARSET)
val body = zmsg.pop().getData
processMsg(notificationTypeStr, body)
} else {
Thread.sleep(1)
()
}
} catch {
case e: ZMQException if e.getErrorCode == ZMQ.Error.ETERM.getCode =>
context.term()
logger.info(s"Done terminating zmq context msg=${e.getMessage}")
case e: Exception =>
case scala.util.control.NonFatal(e) =>
context.term()
logger.info(s"Done terminating zmq context msg=${e.getMessage}")
logger.error(
s"Failed to terminate zmq context gracefully msg=${e.getMessage}",
e)
}
}
@ -64,8 +66,8 @@ class ZMQSubscriber(
}
private val subscriberThread = new Thread(SubscriberRunnable)
subscriberThread.setName(
s"ZMQSubscriber-thread-${System.currentTimeMillis()}")
subscriberThread.setName(s"ZMQSubscriber-thread-${System
.currentTimeMillis()}")
subscriberThread.setDaemon(true)
override def start(): Unit = {
@ -113,10 +115,8 @@ class ZMQSubscriber(
//be able toe evaluate the while loop again. Moving forward with this for now.
isConnected = false
subscriber.close()
logger.info("Attempting to terminate context")
context.term()
subscriberThread.interrupt()
logger.info(s"Done with closing zmq")
()
}