mirror of
https://github.com/ACINQ/eclair.git
synced 2025-02-23 06:35:11 +01:00
Improve startup error handling (#696)
* Improve startup error handling * minor changes to ZMQACtor Scheduler now sends messages instead of directly calling our checkXX methods. It will work the same but should fix the NPE we sometimes get when we stop the app.
This commit is contained in:
parent
8c075ee73a
commit
a85b6d6175
2 changed files with 31 additions and 12 deletions
|
@ -43,7 +43,7 @@ import grizzled.slf4j.Logging
|
||||||
import org.json4s.JsonAST.JArray
|
import org.json4s.JsonAST.JArray
|
||||||
|
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
|
import scala.concurrent._
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Setup eclair from a datadir.
|
* Setup eclair from a datadir.
|
||||||
|
@ -98,6 +98,8 @@ class Setup(datadir: File,
|
||||||
// Make sure wallet support is enabled in bitcoind.
|
// Make sure wallet support is enabled in bitcoind.
|
||||||
_ <- bitcoinClient.invoke("getbalance").recover { case _ => throw BitcoinWalletDisabledException }
|
_ <- bitcoinClient.invoke("getbalance").recover { case _ => throw BitcoinWalletDisabledException }
|
||||||
progress = (json \ "verificationprogress").extract[Double]
|
progress = (json \ "verificationprogress").extract[Double]
|
||||||
|
blocks = (json \ "blocks").extract[Long]
|
||||||
|
headers = (json \ "headers").extract[Long]
|
||||||
chainHash <- bitcoinClient.invoke("getblockhash", 0).map(_.extract[String]).map(BinaryData(_)).map(x => BinaryData(x.reverse))
|
chainHash <- bitcoinClient.invoke("getblockhash", 0).map(_.extract[String]).map(BinaryData(_)).map(x => BinaryData(x.reverse))
|
||||||
bitcoinVersion <- bitcoinClient.invoke("getnetworkinfo").map(json => (json \ "version")).map(_.extract[String])
|
bitcoinVersion <- bitcoinClient.invoke("getnetworkinfo").map(json => (json \ "version")).map(_.extract[String])
|
||||||
unspentAddresses <- bitcoinClient.invoke("listunspent").collect { case JArray(values) =>
|
unspentAddresses <- bitcoinClient.invoke("listunspent").collect { case JArray(values) =>
|
||||||
|
@ -105,15 +107,16 @@ class Setup(datadir: File,
|
||||||
.filter(value => (value \ "spendable").extract[Boolean])
|
.filter(value => (value \ "spendable").extract[Boolean])
|
||||||
.map(value => (value \ "address").extract[String])
|
.map(value => (value \ "address").extract[String])
|
||||||
}
|
}
|
||||||
} yield (progress, chainHash, bitcoinVersion, unspentAddresses)
|
} yield (progress, chainHash, bitcoinVersion, unspentAddresses, blocks, headers)
|
||||||
// blocking sanity checks
|
// blocking sanity checks
|
||||||
val (progress, chainHash, bitcoinVersion, unspentAddresses) = Await.result(future, 30 seconds)
|
val (progress, chainHash, bitcoinVersion, unspentAddresses, blocks, headers) = await(future, 30 seconds, "bicoind did not respond after 30 seconds")
|
||||||
assert(bitcoinVersion.startsWith("16"), "Eclair requires Bitcoin Core 0.16.0 or higher")
|
assert(bitcoinVersion.startsWith("16"), "Eclair requires Bitcoin Core 0.16.0 or higher")
|
||||||
assert(chainHash == nodeParams.chainHash, s"chainHash mismatch (conf=${nodeParams.chainHash} != bitcoind=$chainHash)")
|
assert(chainHash == nodeParams.chainHash, s"chainHash mismatch (conf=${nodeParams.chainHash} != bitcoind=$chainHash)")
|
||||||
if (chainHash != Block.RegtestGenesisBlock.hash) {
|
if (chainHash != Block.RegtestGenesisBlock.hash) {
|
||||||
assert(unspentAddresses.forall(address => !isPay2PubkeyHash(address)), "Make sure that all your UTXOS are segwit UTXOS and not p2pkh (check out our README for more details)")
|
assert(unspentAddresses.forall(address => !isPay2PubkeyHash(address)), "Make sure that all your UTXOS are segwit UTXOS and not p2pkh (check out our README for more details)")
|
||||||
}
|
}
|
||||||
assert(progress > 0.99, "bitcoind should be synchronized")
|
assert(progress > 0.999, s"bitcoind should be synchronized (progress=$progress")
|
||||||
|
assert(headers - blocks <= 1, s"bitcoind should be synchronized (headers=$headers blocks=$blocks")
|
||||||
// TODO: add a check on bitcoin version?
|
// TODO: add a check on bitcoin version?
|
||||||
|
|
||||||
Bitcoind(bitcoinClient)
|
Bitcoind(bitcoinClient)
|
||||||
|
@ -248,6 +251,14 @@ class Setup(datadir: File,
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private def await[T](awaitable: Awaitable[T], atMost: Duration, messageOnTimeout: => String): T = try {
|
||||||
|
Await.result(awaitable, atMost)
|
||||||
|
} catch {
|
||||||
|
case e: TimeoutException =>
|
||||||
|
logger.error(messageOnTimeout)
|
||||||
|
throw e
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// @formatter:off
|
// @formatter:off
|
||||||
|
|
|
@ -22,6 +22,7 @@ import fr.acinq.eclair.blockchain.{NewBlock, NewTransaction}
|
||||||
import org.zeromq.ZMQ.Event
|
import org.zeromq.ZMQ.Event
|
||||||
import org.zeromq.{ZContext, ZMQ, ZMsg}
|
import org.zeromq.{ZContext, ZMQ, ZMsg}
|
||||||
|
|
||||||
|
import scala.annotation.tailrec
|
||||||
import scala.concurrent.{ExecutionContext, Promise}
|
import scala.concurrent.{ExecutionContext, Promise}
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import scala.util.Try
|
import scala.util.Try
|
||||||
|
@ -47,26 +48,33 @@ class ZMQActor(address: String, connected: Option[Promise[Boolean]] = None) exte
|
||||||
implicit val ec: ExecutionContext = context.system.dispatcher
|
implicit val ec: ExecutionContext = context.system.dispatcher
|
||||||
|
|
||||||
// we check messages in a non-blocking manner with an interval, making sure to retrieve all messages before waiting again
|
// we check messages in a non-blocking manner with an interval, making sure to retrieve all messages before waiting again
|
||||||
def checkEvent: Unit = Option(Event.recv(monitor, ZMQ.DONTWAIT)) match {
|
@tailrec
|
||||||
|
final def checkEvent: Unit = Option(Event.recv(monitor, ZMQ.DONTWAIT)) match {
|
||||||
case Some(event) =>
|
case Some(event) =>
|
||||||
self ! event
|
self ! event
|
||||||
checkEvent
|
checkEvent
|
||||||
case None =>
|
case None => ()
|
||||||
context.system.scheduler.scheduleOnce(1 second)(checkEvent)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def checkMsg: Unit = Option(ZMsg.recvMsg(subscriber, ZMQ.DONTWAIT)) match {
|
@tailrec
|
||||||
|
final def checkMsg: Unit = Option(ZMsg.recvMsg(subscriber, ZMQ.DONTWAIT)) match {
|
||||||
case Some(msg) =>
|
case Some(msg) =>
|
||||||
self ! msg
|
self ! msg
|
||||||
checkMsg
|
checkMsg
|
||||||
case None =>
|
case None => ()
|
||||||
context.system.scheduler.scheduleOnce(1 second)(checkMsg)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
checkEvent
|
self ! 'checkEvent
|
||||||
checkMsg
|
self ! 'checkMsg
|
||||||
|
|
||||||
override def receive: Receive = {
|
override def receive: Receive = {
|
||||||
|
case 'checkEvent =>
|
||||||
|
checkEvent
|
||||||
|
context.system.scheduler.scheduleOnce(1 second, self ,'checkEvent)
|
||||||
|
|
||||||
|
case 'checkMsg =>
|
||||||
|
checkMsg
|
||||||
|
context.system.scheduler.scheduleOnce(1 second, self, 'checkMsg)
|
||||||
|
|
||||||
case event: Event => event.getEvent match {
|
case event: Event => event.getEvent match {
|
||||||
case ZMQ.EVENT_CONNECTED =>
|
case ZMQ.EVENT_CONNECTED =>
|
||||||
|
|
Loading…
Add table
Reference in a new issue