diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcher.scala b/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcher.scala index 0b0faaf31..e42d2e640 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcher.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcher.scala @@ -24,13 +24,13 @@ import fr.acinq.eclair.blockchain.Monitoring.Metrics import fr.acinq.eclair.blockchain._ import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinCoreClient import fr.acinq.eclair.blockchain.watchdogs.BlockchainWatchdog -import fr.acinq.eclair.tor.Socks5ProxyParams import fr.acinq.eclair.wire.protocol.ChannelAnnouncement import fr.acinq.eclair.{KamonExt, NodeParams, ShortChannelId} import java.util.concurrent.atomic.AtomicLong import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success} /** * Created by PM on 21/02/2016. @@ -58,6 +58,9 @@ object ZmqWatcher { private case object TickNewBlock extends Command private case object TickBlockTimeout extends Command + private case class GetBlockCountFailed(t: Throwable) extends Command + private case class CheckBlockCount(count: Long) extends Command + private case class PublishBlockCount(count: Long) extends Command private case class ProcessNewBlock(blockHash: ByteVector32) extends Command private case class ProcessNewTransaction(tx: Transaction) extends Command @@ -247,20 +250,28 @@ private class ZmqWatcher(nodeParams: NodeParams, blockCount: AtomicLong, client: case TickBlockTimeout => // we haven't received a block in a while, we check whether we're behind and restart the timer. timers.startSingleTimer(TickBlockTimeout, blockTimeout) - val currentBlockCount = blockCount.get() - client.getBlockCount.map { count => - if (count > currentBlockCount) { - context.log.warn("new block wasn't received via ZMQ, you should verify your bitcoind node") - context.self ! TickNewBlock - } + context.pipeToSelf(client.getBlockCount) { + case Failure(t) => GetBlockCountFailed(t) + case Success(count) => CheckBlockCount(count) + } + Behaviors.same + + case GetBlockCountFailed(t) => + log.error("could not get block count from bitcoind", t) + Behaviors.same + + case CheckBlockCount(count) => + val current = blockCount.get() + if (count > current) { + log.warn("block {} wasn't received via ZMQ, you should verify that your bitcoind node is running", count) + context.self ! TickNewBlock } Behaviors.same case TickNewBlock => - client.getBlockCount.map { count => - log.debug("setting blockCount={}", count) - blockCount.set(count) - context.system.eventStream ! EventStream.Publish(CurrentBlockCount(count)) + context.pipeToSelf(client.getBlockCount) { + case Failure(t) => GetBlockCountFailed(t) + case Success(count) => PublishBlockCount(count) } // TODO: beware of the herd effect KamonExt.timeFuture(Metrics.NewBlockCheckConfirmedDuration.withoutTags()) { @@ -268,6 +279,12 @@ private class ZmqWatcher(nodeParams: NodeParams, blockCount: AtomicLong, client: } Behaviors.same + case PublishBlockCount(count) => + log.debug("setting blockCount={}", count) + blockCount.set(count) + context.system.eventStream ! EventStream.Publish(CurrentBlockCount(count)) + Behaviors.same + case TriggerEvent(replyTo, watch, event) => if (watches.contains(watch)) { log.info("triggering {}", watch)