1
0
mirror of https://github.com/ACINQ/eclair.git synced 2024-11-19 09:54:02 +01:00

Tor support for blockchain watchdogs (#1907)

We now query blockchain watchdogs over Tor when it's activated.
Some watchdogs are automatically disabled because they have no
support for Tor.

We also let users change the list of watchdogs that should be run
from their `eclair.conf`.

Co-authored-by: Bastien Teinturier <31281497+t-bast@users.noreply.github.com>
This commit is contained in:
rorp 2021-09-01 00:09:10 -07:00 committed by GitHub
parent 54fa208c7d
commit bca2a83218
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 250 additions and 65 deletions

View File

@ -98,8 +98,8 @@ You can see what onion address is assigned using `eclair-cli`:
```shell
eclair-cli getinfo
```
Eclair saves the Tor endpoint's private key in `~/.eclair/tor_pk`, so that it can recreate the endpoint address after
a restart. If you remove the private key eclair will regenerate the endpoint address.
Eclair saves the Tor endpoint's private key in `~/.eclair/tor.dat`, so that it can recreate the endpoint address after
a restart. If you remove the private key Eclair will regenerate the endpoint address.
There are two possible values for `protocol-version`:
@ -140,3 +140,30 @@ eclair.socks5.randomize-credentials = true
features from using Tor, use both.
Note, that bitcoind should be configured to use Tor as well (https://en.bitcoin.it/wiki/Setting_up_a_Tor_hidden_service).
### Blockchain watchdogs
Eclair version 0.5.0 introduced blockchain watchdogs, that fetch bitcoin headers from various sources in
order to detect whether the node is being eclipsed. Eclair supports four sources at the moment:
* blockchainheaders.net
* blockcypher.com
* blockstream.info
* mempool.space
Once `eclair.socks5.enabled` is set to `true` blockchain watchdogs connect to their respective sources over Tor.
The most Tor-friendly sources are `blockstream.info` and `mempool.space` since they have native onion endpoints for their APIs.
Tor support for `blockchainheaders.net` is not implemented (yet), so it gets automatically disabled when `eclair.socks5.enabled = true` to protect user's privacy.
`blockcypher.com` can be flaky when used over Tor. It imposes rate limits and sometimes (rather often in fact) requires solving CAPTCHA.
If you experience similar troubles with `blockcypher.com` use this config parameter to disable it:
```
eclair.blockchain-watchdog.sources = [
"bitcoinheaders.net",
"blockstream.info",
"mempool.space"
]
```

View File

@ -305,6 +305,17 @@ eclair {
// override this with a script/exe that will be called everytime a new database backup has been created
# notify-script = "/absolute/path/to/script.sh"
}
blockchain-watchdog {
// all available blockchain watchdog sources
// you can remove items from this list to disable failing or non-needed sources
sources = [
"bitcoinheaders.net",
"blockcypher.com",
"blockstream.info",
"mempool.space"
]
}
}
akka {

View File

@ -90,7 +90,8 @@ case class NodeParams(nodeKeyManager: NodeKeyManager,
socksProxy_opt: Option[Socks5ProxyParams],
maxPaymentAttempts: Int,
enableTrampolinePayment: Boolean,
balanceCheckInterval: FiniteDuration) {
balanceCheckInterval: FiniteDuration,
blockchainWatchdogSources: Seq[String]) {
val privateKey: Crypto.PrivateKey = nodeKeyManager.nodeKey.privateKey
val nodeId: PublicKey = nodeKeyManager.nodeId
@ -411,7 +412,8 @@ object NodeParams extends Logging {
socksProxy_opt = socksProxy_opt,
maxPaymentAttempts = config.getInt("max-payment-attempts"),
enableTrampolinePayment = config.getBoolean("trampoline-payments-enable"),
balanceCheckInterval = FiniteDuration(config.getDuration("balance-check-interval").getSeconds, TimeUnit.SECONDS)
balanceCheckInterval = FiniteDuration(config.getDuration("balance-check-interval").getSeconds, TimeUnit.SECONDS),
blockchainWatchdogSources = config.getStringList("blockchain-watchdog.sources").asScala.toSeq
)
}
}

View File

@ -29,8 +29,8 @@ import fr.acinq.eclair.balance.{BalanceActor, ChannelsListener}
import fr.acinq.eclair.blockchain.bitcoind.rpc.{BasicBitcoinJsonRPCClient, BatchingBitcoinJsonRPCClient, ExtendedBitcoinClient}
import fr.acinq.eclair.blockchain.bitcoind.zmq.ZMQActor
import fr.acinq.eclair.blockchain.bitcoind.{BitcoinCoreWallet, ZmqWatcher}
import fr.acinq.eclair.blockchain.fee.{ConstantFeeProvider, _}
import fr.acinq.eclair.blockchain.{EclairWallet, _}
import fr.acinq.eclair.blockchain.fee._
import fr.acinq.eclair.blockchain._
import fr.acinq.eclair.channel.{Channel, Register}
import fr.acinq.eclair.crypto.WeakEntropyPool
import fr.acinq.eclair.crypto.keymanager.{LocalChannelKeyManager, LocalNodeKeyManager}
@ -244,7 +244,7 @@ class Setup(val datadir: File,
watcher = {
system.actorOf(SimpleSupervisor.props(Props(new ZMQActor(config.getString("bitcoind.zmqblock"), ZMQActor.Topics.HashBlock, Some(zmqBlockConnected))), "zmqblock", SupervisorStrategy.Restart))
system.actorOf(SimpleSupervisor.props(Props(new ZMQActor(config.getString("bitcoind.zmqtx"), ZMQActor.Topics.RawTx, Some(zmqTxConnected))), "zmqtx", SupervisorStrategy.Restart))
system.spawn(Behaviors.supervise(ZmqWatcher(nodeParams.chainHash, blockCount, extendedBitcoinClient)).onFailure(typed.SupervisorStrategy.resume), "watcher")
system.spawn(Behaviors.supervise(ZmqWatcher(nodeParams, blockCount, extendedBitcoinClient)).onFailure(typed.SupervisorStrategy.resume), "watcher")
}
router = system.actorOf(SimpleSupervisor.props(Router.props(nodeParams, watcher, Some(routerInitialized)), "router", SupervisorStrategy.Resume))

View File

@ -24,8 +24,9 @@ import fr.acinq.eclair.blockchain.Monitoring.Metrics
import fr.acinq.eclair.blockchain._
import fr.acinq.eclair.blockchain.bitcoind.rpc.ExtendedBitcoinClient
import fr.acinq.eclair.blockchain.watchdogs.BlockchainWatchdog
import fr.acinq.eclair.tor.Socks5ProxyParams
import fr.acinq.eclair.wire.protocol.ChannelAnnouncement
import fr.acinq.eclair.{KamonExt, ShortChannelId}
import fr.acinq.eclair.{KamonExt, NodeParams, ShortChannelId}
import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.duration._
@ -162,7 +163,7 @@ object ZmqWatcher {
private case object Ignore extends AddWatchResult
// @formatter:on
def apply(chainHash: ByteVector32, blockCount: AtomicLong, client: ExtendedBitcoinClient): Behavior[Command] =
def apply(nodeParams: NodeParams, blockCount: AtomicLong, client: ExtendedBitcoinClient): Behavior[Command] =
Behaviors.setup { context =>
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[NewBlock](b => ProcessNewBlock(b.blockHash)))
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[NewTransaction](t => ProcessNewTransaction(t.tx)))
@ -171,7 +172,7 @@ object ZmqWatcher {
timers.startSingleTimer(TickNewBlock, 1 second)
// we start a timer in case we don't receive ZMQ block events
timers.startSingleTimer(TickBlockTimeout, blockTimeout)
new ZmqWatcher(chainHash, blockCount, client, context, timers).watching(Set.empty[GenericWatch], Map.empty[OutPoint, Set[GenericWatch]])
new ZmqWatcher(nodeParams, blockCount, client, context, timers).watching(Set.empty[GenericWatch], Map.empty[OutPoint, Set[GenericWatch]])
}
}
@ -209,13 +210,13 @@ object ZmqWatcher {
}
private class ZmqWatcher(chainHash: ByteVector32, blockCount: AtomicLong, client: ExtendedBitcoinClient, context: ActorContext[ZmqWatcher.Command], timers: TimerScheduler[ZmqWatcher.Command])(implicit ec: ExecutionContext = ExecutionContext.global) {
private class ZmqWatcher(nodeParams: NodeParams, blockCount: AtomicLong, client: ExtendedBitcoinClient, context: ActorContext[ZmqWatcher.Command], timers: TimerScheduler[ZmqWatcher.Command])(implicit ec: ExecutionContext = ExecutionContext.global) {
import ZmqWatcher._
private val log = context.log
private val watchdog = context.spawn(Behaviors.supervise(BlockchainWatchdog(chainHash, 150 seconds)).onFailure(SupervisorStrategy.resume), "blockchain-watchdog")
private val watchdog = context.spawn(Behaviors.supervise(BlockchainWatchdog(nodeParams, 150 seconds)).onFailure(SupervisorStrategy.resume), "blockchain-watchdog")
private def watching(watches: Set[GenericWatch], watchedUtxos: Map[OutPoint, Set[GenericWatch]]): Behavior[Command] = {
Behaviors.receiveMessage {

View File

@ -19,9 +19,11 @@ package fr.acinq.eclair.blockchain.watchdogs
import akka.actor.typed.Behavior
import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.scaladsl.Behaviors
import fr.acinq.bitcoin.{BlockHeader, ByteVector32}
import fr.acinq.bitcoin.BlockHeader
import fr.acinq.eclair.NodeParams
import fr.acinq.eclair.blockchain.CurrentBlockCount
import fr.acinq.eclair.blockchain.watchdogs.Monitoring.{Metrics, Tags}
import fr.acinq.eclair.tor.Socks5ProxyParams
import java.util.UUID
import scala.concurrent.duration.{DurationInt, FiniteDuration}
@ -38,6 +40,11 @@ object BlockchainWatchdog {
case class BlockHeaderAt(blockCount: Long, blockHeader: BlockHeader)
case object NoBlockReceivedTimer
trait SupportsTor {
/** Tor proxy connection parameters */
def socksProxy_opt: Option[Socks5ProxyParams]
}
sealed trait BlockchainWatchdogEvent
/**
* We are missing too many blocks compared to one of our blockchain watchdogs.
@ -55,13 +62,36 @@ object BlockchainWatchdog {
// @formatter:on
/**
* @param chainHash chain we're interested in.
* @param nodeParams provides the chain we're interested in, connection parameters and the list of enabled sources
* @param maxRandomDelay to avoid the herd effect whenever a block is created, we add a random delay before we query
* secondary blockchain sources. This parameter specifies the maximum delay we'll allow.
*/
def apply(chainHash: ByteVector32, maxRandomDelay: FiniteDuration, blockTimeout: FiniteDuration = 15 minutes): Behavior[Command] = {
def apply(nodeParams: NodeParams, maxRandomDelay: FiniteDuration, blockTimeout: FiniteDuration = 15 minutes): Behavior[Command] = {
Behaviors.setup { context =>
implicit val sttpBackend = ExplorerApi.createSttpBackend(nodeParams.socksProxy_opt)
val chainHash = nodeParams.chainHash
val socksProxy_opt = nodeParams.socksProxy_opt
val sources = nodeParams.blockchainWatchdogSources
val explorers = Seq(
ExplorerApi.BlockstreamExplorer(socksProxy_opt),
ExplorerApi.BlockcypherExplorer(socksProxy_opt),
ExplorerApi.MempoolSpaceExplorer(socksProxy_opt)).filter { e =>
val enabled = sources.contains(e.name)
if (!enabled) {
context.log.warn(s"blockchain watchdog ${e.name} is disabled")
}
enabled
}
val headersOverDnsEnabled = socksProxy_opt.isEmpty && sources.contains(HeadersOverDns.Source)
if (!headersOverDnsEnabled) {
context.log.warn(s"blockchain watchdog ${HeadersOverDns.Source} is disabled")
}
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[CurrentBlockCount](cbc => WrappedCurrentBlockCount(cbc.blockCount)))
Behaviors.withTimers { timers =>
// We start a timer to check blockchain watchdogs regularly even when we don't receive any block.
timers.startSingleTimer(NoBlockReceivedTimer, NoBlockReceivedSince(0), blockTimeout)
@ -77,10 +107,12 @@ object BlockchainWatchdog {
Behaviors.same
case CheckLatestHeaders(blockCount) =>
val id = UUID.randomUUID()
if (headersOverDnsEnabled) {
context.spawn(HeadersOverDns(chainHash, blockCount), s"${HeadersOverDns.Source}-$blockCount-$id") ! HeadersOverDns.CheckLatestHeaders(context.self)
context.spawn(ExplorerApi(chainHash, blockCount, ExplorerApi.BlockstreamExplorer()), s"blockstream-$blockCount-$id") ! ExplorerApi.CheckLatestHeaders(context.self)
context.spawn(ExplorerApi(chainHash, blockCount, ExplorerApi.BlockcypherExplorer()), s"blockcypher-$blockCount-$id") ! ExplorerApi.CheckLatestHeaders(context.self)
context.spawn(ExplorerApi(chainHash, blockCount, ExplorerApi.MempoolSpaceExplorer()), s"mempool.space-$blockCount-$id") ! ExplorerApi.CheckLatestHeaders(context.self)
}
explorers.foreach { explorer =>
context.spawn(ExplorerApi(chainHash, blockCount, explorer), s"${explorer.name}-$blockCount-$id") ! ExplorerApi.CheckLatestHeaders(context.self)
}
Behaviors.same
case headers@LatestHeaders(blockCount, blockHeaders, source) =>
val missingBlocks = blockHeaders match {

View File

@ -16,22 +16,23 @@
package fr.acinq.eclair.blockchain.watchdogs
import java.time.OffsetDateTime
import akka.actor.ActorSystem
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
import akka.actor.typed.{ActorRef, Behavior}
import akka.pattern.after
import com.softwaremill.sttp.json4s.asJson
import com.softwaremill.sttp.okhttp.OkHttpFutureBackend
import com.softwaremill.sttp.{StatusCodes, SttpBackend, Uri, UriContext, sttp}
import com.softwaremill.sttp.{StatusCodes, SttpBackend, SttpBackendOptions, Uri, UriContext, sttp}
import fr.acinq.bitcoin.{Block, BlockHeader, ByteVector32}
import fr.acinq.eclair.blockchain.watchdogs.BlockchainWatchdog.{BlockHeaderAt, LatestHeaders}
import fr.acinq.eclair.blockchain.watchdogs.BlockchainWatchdog.{BlockHeaderAt, LatestHeaders, SupportsTor}
import fr.acinq.eclair.blockchain.watchdogs.Monitoring.{Metrics, Tags}
import fr.acinq.eclair.randomBytes
import fr.acinq.eclair.tor.Socks5ProxyParams
import org.json4s.JsonAST.{JArray, JInt, JObject, JString}
import org.json4s.jackson.Serialization
import org.json4s.{DefaultFormats, Serialization}
import java.time.OffsetDateTime
import scala.concurrent.duration.DurationInt
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
@ -45,7 +46,6 @@ object ExplorerApi {
implicit val formats: DefaultFormats = DefaultFormats
implicit val serialization: Serialization = Serialization
implicit val sttpBackend: SttpBackend[Future, Nothing] = OkHttpFutureBackend()
sealed trait Explorer {
// @formatter:off
@ -92,11 +92,26 @@ object ExplorerApi {
}
}
def createSttpBackend(socksProxy_opt: Option[Socks5ProxyParams]): SttpBackend[Future, Nothing] = {
val options = SttpBackendOptions(connectionTimeout = 30.seconds, proxy = None)
val sttpBackendOptions = socksProxy_opt match {
case Some(proxy) =>
val host = proxy.address.getHostString
val port = proxy.address.getPort
if (proxy.randomizeCredentials)
options.socksProxy(host, port, username = randomBytes(16).toHex, password = randomBytes(16).toHex)
else
options.socksProxy(host, port)
case None => options
}
OkHttpFutureBackend(sttpBackendOptions)
}
/**
* Query https://blockcypher.com/ to fetch block headers.
* See https://www.blockcypher.com/dev/bitcoin/#introduction.
*/
case class BlockcypherExplorer() extends Explorer {
case class BlockcypherExplorer(socksProxy_opt: Option[Socks5ProxyParams])(implicit val sb: SttpBackend[Future, Nothing]) extends Explorer with SupportsTor {
override val name = "blockcypher.com"
override val baseUris = Map(
Block.TestnetGenesisBlock.hash -> uri"https://api.blockcypher.com/v1/btc/test3",
@ -115,8 +130,10 @@ object ExplorerApi {
} yield headers
}
private def getTip(baseUri: Uri)(implicit ec: ExecutionContext): Future[Long] = for {
private def getTip(baseUri: Uri)(implicit ec: ExecutionContext, sb: SttpBackend[Future, Nothing]): Future[Long] = {
for {
tip <- sttp.readTimeout(30 seconds).get(baseUri)
.headers(Socks5ProxyParams.FakeFirefoxHeaders)
.response(asJson[JObject])
.send()
.map(r => {
@ -124,9 +141,11 @@ object ExplorerApi {
latestHeight.toLong
})
} yield tip
}
private def getHeader(baseUri: Uri, blockCount: Long)(implicit ec: ExecutionContext): Future[Seq[BlockHeaderAt]] = for {
private def getHeader(baseUri: Uri, blockCount: Long)(implicit ec: ExecutionContext, sb: SttpBackend[Future, Nothing]): Future[Seq[BlockHeaderAt]] = for {
header <- sttp.readTimeout(30 seconds).get(baseUri.path(baseUri.path :+ "blocks" :+ blockCount.toString))
.headers(Socks5ProxyParams.FakeFirefoxHeaders)
.response(asJson[JObject])
.send()
.map(r => r.code match {
@ -152,7 +171,9 @@ object ExplorerApi {
}
/** Explorer API based on Esplora: see https://github.com/Blockstream/esplora/blob/master/API.md. */
sealed trait Esplora extends Explorer {
sealed trait Esplora extends Explorer with SupportsTor {
implicit val sb: SttpBackend[Future, Nothing]
override def getLatestHeaders(baseUri: Uri, currentBlockCount: Long)(implicit context: ActorContext[Command]): Future[LatestHeaders] = {
implicit val ec: ExecutionContext = context.system.executionContext
for {
@ -181,21 +202,37 @@ object ExplorerApi {
}
/** Query https://blockstream.info/ to fetch block headers. */
case class BlockstreamExplorer() extends Esplora {
case class BlockstreamExplorer(socksProxy_opt: Option[Socks5ProxyParams])(implicit val sb: SttpBackend[Future, Nothing]) extends Esplora {
override val name = "blockstream.info"
override val baseUris = Map(
override val baseUris = socksProxy_opt match {
case Some(_) =>
Map(
Block.TestnetGenesisBlock.hash -> uri"http://explorerzydxu5ecjrkwceayqybizmpjjznk5izmitf2modhcusuqlid.onion/testnet/api",
Block.LivenetGenesisBlock.hash -> uri"http://explorerzydxu5ecjrkwceayqybizmpjjznk5izmitf2modhcusuqlid.onion/api"
)
case None =>
Map(
Block.TestnetGenesisBlock.hash -> uri"https://blockstream.info/testnet/api",
Block.LivenetGenesisBlock.hash -> uri"https://blockstream.info/api"
)
}
}
/** Query https://mempool.space/ to fetch block headers. */
case class MempoolSpaceExplorer() extends Esplora {
case class MempoolSpaceExplorer(socksProxy_opt: Option[Socks5ProxyParams])(implicit val sb: SttpBackend[Future, Nothing]) extends Esplora {
override val name = "mempool.space"
override val baseUris = Map(
override val baseUris = socksProxy_opt match {
case Some(_) =>
Map(
Block.TestnetGenesisBlock.hash -> uri"http://mempoolhqx4isw62xs7abwphsq7ldayuidyx2v2oethdhhj6mlo2r6ad.onion/testnet/api",
Block.LivenetGenesisBlock.hash -> uri"http://mempoolhqx4isw62xs7abwphsq7ldayuidyx2v2oethdhhj6mlo2r6ad.onion/api"
)
case None =>
Map(
Block.TestnetGenesisBlock.hash -> uri"https://mempool.space/testnet/api",
Block.LivenetGenesisBlock.hash -> uri"https://mempool.space/api"
)
}
}
}

View File

@ -16,8 +16,6 @@
package fr.acinq.eclair.tor
import java.net.{Inet4Address, Inet6Address, InetAddress, InetSocketAddress}
import akka.actor.{Actor, ActorLogging, ActorRef, Props, Terminated}
import akka.io.Tcp
import akka.util.ByteString
@ -25,6 +23,8 @@ import fr.acinq.eclair.randomBytes
import fr.acinq.eclair.tor.Socks5Connection.{Credentials, Socks5Connect}
import fr.acinq.eclair.wire.protocol._
import java.net._
/**
* Simple socks 5 client. It should be given a new connection, and will
@ -221,6 +221,16 @@ case class Socks5ProxyParams(address: InetSocketAddress, credentials_opt: Option
object Socks5ProxyParams {
val FakeFirefoxHeaders = Map(
"User-Agent" -> "Mozilla/5.0 (Windows NT 10.0; rv:78.0) Gecko/20100101 Firefox/78.0",
"Accept" -> "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language" -> "en-US,en;q=0.5",
"Connection" -> "keep-alive",
"Upgrade-Insecure-Requests" -> "1",
"Cache-Control" -> "max-age=0"
)
def proxyAddress(socketAddress: InetSocketAddress, proxyParams: Socks5ProxyParams): Option[InetSocketAddress] =
NodeAddress.fromParts(socketAddress.getHostString, socketAddress.getPort).toOption collect {
case _: IPv4 if proxyParams.useForIPv4 => proxyParams.address
@ -236,4 +246,5 @@ object Socks5ProxyParams {
} else {
proxyParams.credentials_opt
}
}

View File

@ -70,6 +70,14 @@ object TestConstants {
// @formatter:on
}
val blockchainWatchdogSources = Seq(
"bitcoinheaders.net",
"blockcypher.com",
"blockstream.info",
"mempool.space"
)
object Alice {
val seed: ByteVector32 = ByteVector32(ByteVector.fill(32)(1))
val nodeKeyManager = new LocalNodeKeyManager(seed, Block.RegtestGenesisBlock.hash)
@ -177,7 +185,8 @@ object TestConstants {
maxPaymentAttempts = 5,
enableTrampolinePayment = true,
instanceId = UUID.fromString("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"),
balanceCheckInterval = 1 hour
balanceCheckInterval = 1 hour,
blockchainWatchdogSources = blockchainWatchdogSources
)
def channelParams: LocalParams = Peer.makeChannelParams(
@ -295,7 +304,8 @@ object TestConstants {
maxPaymentAttempts = 5,
enableTrampolinePayment = true,
instanceId = UUID.fromString("bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb"),
balanceCheckInterval = 1 hour
balanceCheckInterval = 1 hour,
blockchainWatchdogSources = blockchainWatchdogSources
)
def channelParams: LocalParams = Peer.makeChannelParams(

View File

@ -29,7 +29,7 @@ import fr.acinq.eclair.blockchain.bitcoind.rpc.ExtendedBitcoinClient.{FundTransa
import fr.acinq.eclair.blockchain.bitcoind.zmq.ZMQActor
import fr.acinq.eclair.blockchain.fee.FeeratePerKw
import fr.acinq.eclair.blockchain.{CurrentBlockCount, NewTransaction}
import fr.acinq.eclair.{ShortChannelId, TestKitBaseClass, randomBytes32}
import fr.acinq.eclair.{ShortChannelId, TestConstants, TestKitBaseClass, randomBytes32}
import grizzled.slf4j.Logging
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuiteLike
@ -76,7 +76,8 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind
system.eventStream.subscribe(listener.ref, classOf[CurrentBlockCount])
val bitcoinClient = new ExtendedBitcoinClient(bitcoinrpcclient)
val bitcoinWallet = new BitcoinCoreWallet(bitcoinrpcclient)
val watcher = system.spawn(ZmqWatcher(Block.RegtestGenesisBlock.hash, blockCount, bitcoinClient), UUID.randomUUID().toString)
val nodeParams = TestConstants.Alice.nodeParams.copy(chainHash = Block.RegtestGenesisBlock.hash)
val watcher = system.spawn(ZmqWatcher(nodeParams, blockCount, bitcoinClient), UUID.randomUUID().toString)
try {
testFun(Fixture(blockCount, bitcoinClient, bitcoinWallet, watcher, probe, listener))
} finally {

View File

@ -20,44 +20,58 @@ import akka.actor.testkit.typed.scaladsl.{ScalaTestWithActorTestKit, TestProbe}
import akka.actor.typed.eventstream.EventStream
import com.typesafe.config.ConfigFactory
import fr.acinq.bitcoin.Block
import fr.acinq.eclair.TestTags
import fr.acinq.eclair.blockchain.watchdogs.BlockchainWatchdog.{DangerousBlocksSkew, WrappedCurrentBlockCount}
import fr.acinq.eclair.tor.Socks5ProxyParams
import fr.acinq.eclair.{NodeParams, TestConstants, TestTags}
import org.scalatest.funsuite.AnyFunSuiteLike
import java.net.{InetSocketAddress, Socket}
import scala.concurrent.duration.DurationInt
import scala.util.Try
class BlockchainWatchdogSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("application")) with AnyFunSuiteLike {
// blockcypher.com is very flaky - it either imposes rate limits or requires captcha
// but sometimes it works. If want to check whether you're lucky uncomment these lines:
// val nodeParamsLivenet = TestConstants.Alice.nodeParams.copy(chainHash = Block.LivenetGenesisBlock.hash)
// val nodeParamsTestnet = TestConstants.Alice.nodeParams.copy(chainHash = Block.TestnetGenesisBlock.hash)
// and comment these:
val nodeParamsLivenet = removeBlockcypher(TestConstants.Alice.nodeParams.copy(chainHash = Block.LivenetGenesisBlock.hash))
val nodeParamsTestnet = removeBlockcypher(TestConstants.Alice.nodeParams.copy(chainHash = Block.TestnetGenesisBlock.hash))
test("fetch block headers from four sources on mainnet", TestTags.ExternalApi) {
val eventListener = TestProbe[DangerousBlocksSkew]()
system.eventStream ! EventStream.Subscribe(eventListener.ref)
val watchdog = testKit.spawn(BlockchainWatchdog(Block.LivenetGenesisBlock.hash, 1 second))
val watchdog = testKit.spawn(BlockchainWatchdog(nodeParamsLivenet, 1 second))
watchdog ! WrappedCurrentBlockCount(630561)
val events = Seq(
eventListener.expectMessageType[DangerousBlocksSkew],
eventListener.expectMessageType[DangerousBlocksSkew],
eventListener.expectMessageType[DangerousBlocksSkew],
eventListener.expectMessageType[DangerousBlocksSkew]
// eventListener.expectMessageType[DangerousBlocksSkew]
)
eventListener.expectNoMessage(100 millis)
assert(events.map(_.recentHeaders.source).toSet === Set("bitcoinheaders.net", "blockcypher.com", "blockstream.info", "mempool.space"))
// assert(events.map(_.recentHeaders.source).toSet === Set("bitcoinheaders.net", "blockcypher.com", "blockstream.info", "mempool.space"))
assert(events.map(_.recentHeaders.source).toSet === Set("bitcoinheaders.net", "blockstream.info", "mempool.space"))
testKit.stop(watchdog)
}
test("fetch block headers from three sources on testnet", TestTags.ExternalApi) {
val eventListener = TestProbe[DangerousBlocksSkew]()
system.eventStream ! EventStream.Subscribe(eventListener.ref)
val watchdog = testKit.spawn(BlockchainWatchdog(Block.TestnetGenesisBlock.hash, 1 second))
val watchdog = testKit.spawn(BlockchainWatchdog(nodeParamsTestnet, 1 second))
watchdog ! WrappedCurrentBlockCount(500000)
val events = Seq(
eventListener.expectMessageType[DangerousBlocksSkew],
eventListener.expectMessageType[DangerousBlocksSkew],
eventListener.expectMessageType[DangerousBlocksSkew]
// eventListener.expectMessageType[DangerousBlocksSkew]
)
eventListener.expectNoMessage(100 millis)
assert(events.map(_.recentHeaders.source).toSet === Set("blockcypher.com", "blockstream.info", "mempool.space"))
// assert(events.map(_.recentHeaders.source).toSet === Set("blockcypher.com", "blockstream.info", "mempool.space"))
assert(events.map(_.recentHeaders.source).toSet === Set("blockstream.info", "mempool.space"))
testKit.stop(watchdog)
}
@ -65,25 +79,62 @@ class BlockchainWatchdogSpec extends ScalaTestWithActorTestKit(ConfigFactory.loa
val eventListener = TestProbe[DangerousBlocksSkew]()
system.eventStream ! EventStream.Subscribe(eventListener.ref)
val blockTimeout = 5 seconds
val watchdog = testKit.spawn(BlockchainWatchdog(Block.TestnetGenesisBlock.hash, 1 second, blockTimeout))
val watchdog = testKit.spawn(BlockchainWatchdog(nodeParamsTestnet, 1 second, blockTimeout))
watchdog ! WrappedCurrentBlockCount(500000)
assert(eventListener.expectMessageType[DangerousBlocksSkew].recentHeaders.currentBlockCount === 500000)
assert(eventListener.expectMessageType[DangerousBlocksSkew].recentHeaders.currentBlockCount === 500000)
assert(eventListener.expectMessageType[DangerousBlocksSkew].recentHeaders.currentBlockCount === 500000)
// assert(eventListener.expectMessageType[DangerousBlocksSkew].recentHeaders.currentBlockCount === 500000)
eventListener.expectNoMessage(100 millis)
// If we don't receive blocks, we check blockchain sources.
assert(eventListener.expectMessageType[DangerousBlocksSkew].recentHeaders.currentBlockCount === 500000)
assert(eventListener.expectMessageType[DangerousBlocksSkew].recentHeaders.currentBlockCount === 500000)
assert(eventListener.expectMessageType[DangerousBlocksSkew].recentHeaders.currentBlockCount === 500000)
// assert(eventListener.expectMessageType[DangerousBlocksSkew].recentHeaders.currentBlockCount === 500000)
eventListener.expectNoMessage(100 millis)
// And we keep checking blockchain sources until we receive a block.
assert(eventListener.expectMessageType[DangerousBlocksSkew].recentHeaders.currentBlockCount === 500000)
assert(eventListener.expectMessageType[DangerousBlocksSkew].recentHeaders.currentBlockCount === 500000)
assert(eventListener.expectMessageType[DangerousBlocksSkew].recentHeaders.currentBlockCount === 500000)
// assert(eventListener.expectMessageType[DangerousBlocksSkew].recentHeaders.currentBlockCount === 500000)
eventListener.expectNoMessage(100 millis)
}
test("fetch block headers on mainnet over Tor", TestTags.ExternalApi) {
val proxyParams = Socks5ProxyParams(new InetSocketAddress("127.0.0.1", 9050),
credentials_opt = None,
randomizeCredentials = true,
useForIPv4 = true,
useForIPv6 = true,
useForTor = true)
if (proxyAcceptsConnections(proxyParams)) {
val eventListener = TestProbe[DangerousBlocksSkew]()
system.eventStream ! EventStream.Subscribe(eventListener.ref)
val nodeParams = nodeParamsLivenet.copy(socksProxy_opt = Some(proxyParams))
val watchdog = testKit.spawn(BlockchainWatchdog(nodeParams, 1 second))
watchdog ! WrappedCurrentBlockCount(630561)
val events = Seq(
eventListener.expectMessageType[DangerousBlocksSkew],
eventListener.expectMessageType[DangerousBlocksSkew],
// eventListener.expectMessageType[DangerousBlocksSkew]
)
eventListener.expectNoMessage(100 millis)
assert(events.map(_.recentHeaders.source).toSet === Set("blockstream.info", "mempool.space"))
testKit.stop(watchdog)
} else {
cancel("Tor daemon is not up and running")
}
}
private def proxyAcceptsConnections(proxyParams: Socks5ProxyParams): Boolean = Try {
val s = new Socket(proxyParams.address.getAddress, proxyParams.address.getPort)
s.close()
}.isSuccess
private def removeBlockcypher(nodeParams: NodeParams): NodeParams = {
nodeParams.copy(blockchainWatchdogSources = nodeParams.blockchainWatchdogSources.filterNot(_ == "blockcypher.com"))
}
}

View File

@ -26,7 +26,9 @@ import org.scalatest.funsuite.AnyFunSuiteLike
class ExplorerApiSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("application")) with AnyFunSuiteLike {
val explorers = Seq(BlockcypherExplorer(), BlockstreamExplorer(), MempoolSpaceExplorer())
implicit val sttpBackend = ExplorerApi.createSttpBackend(None)
val explorers = Seq(BlockcypherExplorer(None), BlockstreamExplorer(None), MempoolSpaceExplorer(None))
test("fetch latest block headers", TestTags.ExternalApi) {
for (explorer <- explorers) {