mirror of
https://github.com/ACINQ/eclair.git
synced 2025-02-24 06:47:46 +01:00
Fix ZmqWatcher block timeout (#1989)
Unfortunately, `context.log` is *not* thread safe and shouldn't be used in future continuation. We should instead use `pipeToSelf` when we want to act on the results of a `Future`.
This commit is contained in:
parent
d6b46aed4d
commit
0621ccfe0c
1 changed files with 28 additions and 11 deletions
|
@ -24,13 +24,13 @@ import fr.acinq.eclair.blockchain.Monitoring.Metrics
|
|||
import fr.acinq.eclair.blockchain._
|
||||
import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinCoreClient
|
||||
import fr.acinq.eclair.blockchain.watchdogs.BlockchainWatchdog
|
||||
import fr.acinq.eclair.tor.Socks5ProxyParams
|
||||
import fr.acinq.eclair.wire.protocol.ChannelAnnouncement
|
||||
import fr.acinq.eclair.{KamonExt, NodeParams, ShortChannelId}
|
||||
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
import scala.util.{Failure, Success}
|
||||
|
||||
/**
|
||||
* Created by PM on 21/02/2016.
|
||||
|
@ -58,6 +58,9 @@ object ZmqWatcher {
|
|||
|
||||
private case object TickNewBlock extends Command
|
||||
private case object TickBlockTimeout extends Command
|
||||
private case class GetBlockCountFailed(t: Throwable) extends Command
|
||||
private case class CheckBlockCount(count: Long) extends Command
|
||||
private case class PublishBlockCount(count: Long) extends Command
|
||||
private case class ProcessNewBlock(blockHash: ByteVector32) extends Command
|
||||
private case class ProcessNewTransaction(tx: Transaction) extends Command
|
||||
|
||||
|
@ -247,20 +250,28 @@ private class ZmqWatcher(nodeParams: NodeParams, blockCount: AtomicLong, client:
|
|||
case TickBlockTimeout =>
|
||||
// we haven't received a block in a while, we check whether we're behind and restart the timer.
|
||||
timers.startSingleTimer(TickBlockTimeout, blockTimeout)
|
||||
val currentBlockCount = blockCount.get()
|
||||
client.getBlockCount.map { count =>
|
||||
if (count > currentBlockCount) {
|
||||
context.log.warn("new block wasn't received via ZMQ, you should verify your bitcoind node")
|
||||
context.self ! TickNewBlock
|
||||
}
|
||||
context.pipeToSelf(client.getBlockCount) {
|
||||
case Failure(t) => GetBlockCountFailed(t)
|
||||
case Success(count) => CheckBlockCount(count)
|
||||
}
|
||||
Behaviors.same
|
||||
|
||||
case GetBlockCountFailed(t) =>
|
||||
log.error("could not get block count from bitcoind", t)
|
||||
Behaviors.same
|
||||
|
||||
case CheckBlockCount(count) =>
|
||||
val current = blockCount.get()
|
||||
if (count > current) {
|
||||
log.warn("block {} wasn't received via ZMQ, you should verify that your bitcoind node is running", count)
|
||||
context.self ! TickNewBlock
|
||||
}
|
||||
Behaviors.same
|
||||
|
||||
case TickNewBlock =>
|
||||
client.getBlockCount.map { count =>
|
||||
log.debug("setting blockCount={}", count)
|
||||
blockCount.set(count)
|
||||
context.system.eventStream ! EventStream.Publish(CurrentBlockCount(count))
|
||||
context.pipeToSelf(client.getBlockCount) {
|
||||
case Failure(t) => GetBlockCountFailed(t)
|
||||
case Success(count) => PublishBlockCount(count)
|
||||
}
|
||||
// TODO: beware of the herd effect
|
||||
KamonExt.timeFuture(Metrics.NewBlockCheckConfirmedDuration.withoutTags()) {
|
||||
|
@ -268,6 +279,12 @@ private class ZmqWatcher(nodeParams: NodeParams, blockCount: AtomicLong, client:
|
|||
}
|
||||
Behaviors.same
|
||||
|
||||
case PublishBlockCount(count) =>
|
||||
log.debug("setting blockCount={}", count)
|
||||
blockCount.set(count)
|
||||
context.system.eventStream ! EventStream.Publish(CurrentBlockCount(count))
|
||||
Behaviors.same
|
||||
|
||||
case TriggerEvent(replyTo, watch, event) =>
|
||||
if (watches.contains(watch)) {
|
||||
log.info("triggering {}", watch)
|
||||
|
|
Loading…
Add table
Reference in a new issue