1
0
Fork 0
mirror of https://github.com/ACINQ/eclair.git synced 2025-02-22 22:25:26 +01:00

Use netty to connect to Electrum servers (#774)

* replaced akka.io by netty in electrum client and enabled ssl support

* updated docker-testkit to 0.9.8 so that electrum tests pass on windows

* use ssl port on testnet/mainnet

* removed experimental warning on electrum
This commit is contained in:
Pierre-Marie Padiou 2018-12-11 18:13:38 +01:00 committed by GitHub
parent 99566f525c
commit ff588b578f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 267 additions and 195 deletions

View file

@ -154,6 +154,12 @@
<artifactId>json4s_${scala.version.short}</artifactId>
<version>${sttp.version}</version>
</dependency>
<!-- TCP -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.32.Final</version>
</dependency>
<!-- BITCOIN -->
<dependency>
<groupId>fr.acinq</groupId>
@ -220,13 +226,13 @@
<dependency>
<groupId>com.whisk</groupId>
<artifactId>docker-testkit-scalatest_${scala.version.short}</artifactId>
<version>0.9.5</version>
<version>0.9.8</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.whisk</groupId>
<artifactId>docker-testkit-impl-spotify_${scala.version.short}</artifactId>
<version>0.9.5</version>
<version>0.9.8</version>
<scope>test</scope>
</dependency>
<dependency>

View file

@ -33,6 +33,8 @@ import fr.acinq.eclair.api.{GetInfoResponse, Service}
import fr.acinq.eclair.blockchain.bitcoind.rpc.{BasicBitcoinJsonRPCClient, BatchingBitcoinJsonRPCClient, ExtendedBitcoinClient}
import fr.acinq.eclair.blockchain.bitcoind.zmq.ZMQActor
import fr.acinq.eclair.blockchain.bitcoind.{BitcoinCoreWallet, ZmqWatcher}
import fr.acinq.eclair.blockchain.electrum.ElectrumClient.SSL
import fr.acinq.eclair.blockchain.electrum.ElectrumClientPool.ElectrumServerAddress
import fr.acinq.eclair.blockchain.electrum._
import fr.acinq.eclair.blockchain.fee.{ConstantFeeProvider, _}
import fr.acinq.eclair.blockchain.{EclairWallet, _}
@ -123,22 +125,26 @@ class Setup(datadir: File,
Bitcoind(bitcoinClient)
case ELECTRUM =>
logger.warn("EXPERIMENTAL ELECTRUM MODE ENABLED!!!")
val addresses = config.hasPath("eclair.electrum") match {
val addresses = config.hasPath("electrum") match {
case true =>
val host = config.getString("eclair.electrum.host")
val port = config.getInt("eclair.electrum.port")
val host = config.getString("electrum.host")
val port = config.getInt("electrum.port")
val ssl = config.getString("electrum.ssl") match {
case "off" => SSL.OFF
case "loose" => SSL.LOOSE
case _ => SSL.STRICT // strict mode is the default when we specify a custom electrum server, we don't want to be MITMed
}
val address = InetSocketAddress.createUnresolved(host, port)
logger.info(s"override electrum default with server=$address")
Set(address)
logger.info(s"override electrum default with server=$address ssl=$ssl")
Set(ElectrumServerAddress(address, ssl))
case false =>
val addressesFile = nodeParams.chainHash match {
case Block.RegtestGenesisBlock.hash => "/electrum/servers_regtest.json"
case Block.TestnetGenesisBlock.hash => "/electrum/servers_testnet.json"
case Block.LivenetGenesisBlock.hash => "/electrum/servers_mainnet.json"
val (addressesFile, sslEnabled) = nodeParams.chainHash match {
case Block.RegtestGenesisBlock.hash => ("/electrum/servers_regtest.json", false) // in regtest we connect in plaintext
case Block.TestnetGenesisBlock.hash => ("/electrum/servers_testnet.json", true)
case Block.LivenetGenesisBlock.hash => ("/electrum/servers_mainnet.json", true)
}
val stream = classOf[Setup].getResourceAsStream(addressesFile)
ElectrumClientPool.readServerAddresses(stream)
ElectrumClientPool.readServerAddresses(stream, sslEnabled)
}
val electrumClient = system.actorOf(SimpleSupervisor.props(Props(new ElectrumClientPool(addresses)), "electrum-client", SupervisorStrategy.Resume))
Electrum(electrumClient)

View file

@ -16,13 +16,23 @@
package fr.acinq.eclair.blockchain.electrum
import java.net.InetSocketAddress
import java.net.{InetSocketAddress, SocketAddress}
import java.util
import akka.actor.{Actor, ActorLogging, ActorRef, Props, Stash, Terminated}
import akka.io.{IO, Tcp}
import akka.util.ByteString
import akka.actor.{Actor, ActorLogging, ActorRef, Stash, Terminated}
import fr.acinq.bitcoin._
import fr.acinq.eclair.blockchain.bitcoind.rpc.{Error, JsonRPCRequest, JsonRPCResponse}
import fr.acinq.eclair.blockchain.electrum.ElectrumClient.SSL
import io.netty.bootstrap.Bootstrap
import io.netty.channel._
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.{SocketChannel, SocketChannelConfig}
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.string.{LineEncoder, StringDecoder}
import io.netty.handler.codec.{LineBasedFrameDecoder, MessageToMessageDecoder, MessageToMessageEncoder}
import io.netty.handler.ssl.SslContextBuilder
import io.netty.handler.ssl.util.InsecureTrustManagerFactory
import io.netty.util.CharsetUtil
import org.json4s.JsonAST._
import org.json4s.jackson.JsonMethods
import org.json4s.{DefaultFormats, JInt, JLong, JString}
@ -32,36 +42,157 @@ import scala.annotation.tailrec
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
class ElectrumClient(serverAddress: InetSocketAddress)(implicit val ec: ExecutionContext) extends Actor with Stash with ActorLogging {
class ElectrumClient(serverAddress: InetSocketAddress, ssl: SSL)(implicit val ec: ExecutionContext) extends Actor with Stash with ActorLogging {
import ElectrumClient._
import context.system
implicit val formats = DefaultFormats
val newline = "\n"
val socketOptions = Tcp.SO.KeepAlive(true) :: Nil
val workerGroup = new NioEventLoopGroup()
val b = new Bootstrap
b.group(workerGroup)
b.channel(classOf[NioSocketChannel])
b.option[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, true)
b.option[java.lang.Integer](ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
b.handler(new ChannelInitializer[SocketChannel]() {
override def initChannel(ch: SocketChannel): Unit = {
ssl match {
case SSL.OFF => ()
case SSL.STRICT =>
val sslCtx = SslContextBuilder.forClient.build
ch.pipeline.addLast(sslCtx.newHandler(ch.alloc(), serverAddress.getHostName, serverAddress.getPort))
case SSL.LOOSE =>
// INSECURE VERSION THAT DOESN'T CHECK CERTIFICATE
val sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build()
ch.pipeline.addLast(sslCtx.newHandler(ch.alloc(), serverAddress.getHostName, serverAddress.getPort))
}
// inbound handlers
ch.pipeline.addLast(new LineBasedFrameDecoder(Int.MaxValue, true, true)) // JSON messages are separated by a new line
ch.pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8))
ch.pipeline.addLast(new ElectrumResponseDecoder)
ch.pipeline.addLast(new ActorHandler(self))
// outbound handlers
ch.pipeline.addLast(new LineEncoder)
ch.pipeline.addLast(new JsonRPCRequestEncoder)
// error handler
ch.pipeline().addLast(new ExceptionHandler)
}
})
// Start the client.
log.info(s"connecting to $serverAddress")
val channelFuture = b.connect(serverAddress.getHostName, serverAddress.getPort)
def errorHandler(t: Throwable) = {
log.info(s"connection error (reason=${t.getMessage})")
statusListeners.map(_ ! ElectrumDisconnected)
context stop self
}
channelFuture.addListeners(new ChannelFutureListener {
override def operationComplete(future: ChannelFuture): Unit = {
if (!future.isSuccess) {
errorHandler(future.cause())
}
}
})
/**
* This error handler catches all exceptions and kill the actor
* See https://stackoverflow.com/questions/30994095/how-to-catch-all-exception-in-netty
*/
class ExceptionHandler extends ChannelDuplexHandler {
override def connect(ctx: ChannelHandlerContext, remoteAddress: SocketAddress, localAddress: SocketAddress, promise: ChannelPromise): Unit = {
ctx.connect(remoteAddress, localAddress, promise.addListener(new ChannelFutureListener() {
override def operationComplete(future: ChannelFuture): Unit = {
if (!future.isSuccess) {
errorHandler(future.cause())
}
}
}))
}
override def write(ctx: ChannelHandlerContext, msg: scala.Any, promise: ChannelPromise): Unit = {
ctx.write(msg, promise.addListener(new ChannelFutureListener() {
override def operationComplete(future: ChannelFuture): Unit = {
if (!future.isSuccess) {
errorHandler(future.cause())
}
}
}))
}
override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
errorHandler(cause)
}
}
/**
* A decoder ByteBuf -> Either[Response, JsonRPCResponse]
*/
class ElectrumResponseDecoder extends MessageToMessageDecoder[String] {
override def decode(ctx: ChannelHandlerContext, msg: String, out: util.List[AnyRef]): Unit = {
val s = msg.asInstanceOf[String]
val r = parseResponse(s)
out.add(r)
}
}
/**
* An encoder JsonRPCRequest -> ByteBuf
*/
class JsonRPCRequestEncoder extends MessageToMessageEncoder[JsonRPCRequest] {
override def encode(ctx: ChannelHandlerContext, request: JsonRPCRequest, out: util.List[AnyRef]): Unit = {
import org.json4s.JsonDSL._
import org.json4s._
import org.json4s.jackson.JsonMethods._
log.info(s"sending $request")
val json = ("method" -> request.method) ~ ("params" -> request.params.map {
case s: String => new JString(s)
case b: BinaryData => new JString(b.toString())
case t: Int => new JInt(t)
case t: Long => new JLong(t)
case t: Double => new JDouble(t)
}) ~ ("id" -> request.id) ~ ("jsonrpc" -> request.jsonrpc)
val serialized = compact(render(json))
out.add(serialized)
}
}
/**
* Forwards incoming messages to the underlying actor
*
* @param actor
*/
class ActorHandler(actor: ActorRef) extends ChannelInboundHandlerAdapter {
override def channelActive(ctx: ChannelHandlerContext): Unit = {
actor ! ctx
}
override def channelRead(ctx: ChannelHandlerContext, msg: Any): Unit = {
actor ! msg
}
}
var addressSubscriptions = Map.empty[String, Set[ActorRef]]
var scriptHashSubscriptions = Map.empty[BinaryData, Set[ActorRef]]
val headerSubscriptions = collection.mutable.HashSet.empty[ActorRef]
val version = ServerVersion("2.1.7", "1.1")
val version = ServerVersion("2.1.7", "1.2")
val statusListeners = collection.mutable.HashSet.empty[ActorRef]
val keepHeaders = 100
var reqId = 0L
self ! Tcp.Connect(serverAddress, options = socketOptions)
var reqId = 0
// we need to regularly send a ping in order not to get disconnected
val versionTrigger = context.system.scheduler.schedule(30 seconds, 30 seconds, self, version)
override def unhandled(message: Any): Unit = {
message match {
case _: Tcp.ConnectionClosed =>
log.info(s"connection to $serverAddress closed")
statusListeners.map(_ ! ElectrumDisconnected)
context stop self
case Terminated(deadActor) =>
addressSubscriptions = addressSubscriptions.mapValues(subscribers => subscribers - deadActor)
scriptHashSubscriptions = scriptHashSubscriptions.mapValues(subscribers => subscribers - deadActor)
@ -78,92 +209,62 @@ class ElectrumClient(serverAddress: InetSocketAddress)(implicit val ec: Executio
}
}
override def postStop(): Unit = {
versionTrigger.cancel()
super.postStop()
}
def send(connection: ActorRef, request: JsonRPCRequest): Unit = {
import org.json4s.JsonDSL._
import org.json4s._
import org.json4s.jackson.JsonMethods._
log.debug(s"sending $request")
val json = ("method" -> request.method) ~ ("params" -> request.params.map {
case s: String => new JString(s)
case b: BinaryData => new JString(b.toString())
case t: Int => new JInt(t)
case t: Long => new JLong(t)
case t: Double => new JDouble(t)
}) ~ ("id" -> request.id) ~ ("jsonrpc" -> request.jsonrpc)
val serialized = compact(render(json))
val bytes = (serialized + newline).getBytes
connection ! ByteString.fromArray(bytes)
}
/**
* send an electrum request to the server
* @param connection connection to the electrumx server
*
* @param ctx connection to the electrumx server
* @param request electrum request
* @return the request id used to send the request
*/
def send(connection: ActorRef, request: Request): String = {
def send(ctx: ChannelHandlerContext, request: Request): String = {
val electrumRequestId = "" + reqId
send(connection, makeRequest(request, electrumRequestId))
reqId = reqId + 1
ctx.channel().writeAndFlush(makeRequest(request, electrumRequestId))
reqId = reqId + 1
electrumRequestId
}
def receive = disconnected
def disconnected: Receive = {
case c: Tcp.Connect =>
log.info(s"connecting to $c")
IO(Tcp) ! c
case Tcp.Connected(remote, _) =>
log.info(s"connected to $remote")
val conn = sender()
conn ! Tcp.Register(self)
val connection = context.actorOf(Props(new WriteAckSender(conn)), name = "electrum-sender")
send(connection, version)
context become waitingForVersion(connection, remote)
case ctx: ChannelHandlerContext =>
log.info(s"connected to $serverAddress")
send(ctx, version)
context become waitingForVersion(ctx)
case AddStatusListener(actor) => statusListeners += actor
case Tcp.CommandFailed(Tcp.Connect(remoteAddress, _, _, _, _)) =>
context stop self
}
def waitingForVersion(connection: ActorRef, remote: InetSocketAddress): Receive = {
case Tcp.Received(data) =>
val response = parseResponse(new String(data.toArray)).right.get
val serverVersion = parseJsonResponse(version, response)
def waitingForVersion(ctx: ChannelHandlerContext): Receive = {
case Right(json: JsonRPCResponse) =>
val serverVersion = parseJsonResponse(version, json)
log.debug(s"serverVersion=$serverVersion")
send(connection, HeaderSubscription(self))
send(ctx, HeaderSubscription(self))
headerSubscriptions += self
log.debug("waiting for tip")
context become waitingForTip(connection, remote: InetSocketAddress)
context become waitingForTip(ctx)
case AddStatusListener(actor) => statusListeners += actor
}
def waitingForTip(connection: ActorRef, remote: InetSocketAddress): Receive = {
case Tcp.Received(data) =>
val response = parseResponse(new String(data.toArray)).right.get
val header = parseHeader(response.result)
def waitingForTip(ctx: ChannelHandlerContext): Receive = {
case Right(json: JsonRPCResponse) =>
val header = parseHeader(json.result)
log.debug(s"connected, tip = ${header.block_hash} $header")
statusListeners.map(_ ! ElectrumReady(header, remote))
context become connected(connection, remote, header, "", Map())
statusListeners.map(_ ! ElectrumReady(header, serverAddress))
context become connected(ctx, header, "", Map())
case AddStatusListener(actor) => statusListeners += actor
}
def connected(connection: ActorRef, remoteAddress: InetSocketAddress, tip: Header, buffer: String, requests: Map[String, (Request, ActorRef)]): Receive = {
def connected(ctx: ChannelHandlerContext, tip: Header, buffer: String, requests: Map[String, (Request, ActorRef)]): Receive = {
case AddStatusListener(actor) =>
statusListeners += actor
actor ! ElectrumReady(tip, remoteAddress)
actor ! ElectrumReady(tip, serverAddress)
case HeaderSubscription(actor) =>
headerSubscriptions += actor
@ -171,7 +272,7 @@ class ElectrumClient(serverAddress: InetSocketAddress)(implicit val ec: Executio
context watch actor
case request: Request =>
val curReqId = send(connection, request)
val curReqId = send(ctx, request)
request match {
case AddressSubscription(address, actor) =>
addressSubscriptions = addressSubscriptions.updated(address, addressSubscriptions.getOrElse(address, Set()) + actor)
@ -181,16 +282,7 @@ class ElectrumClient(serverAddress: InetSocketAddress)(implicit val ec: Executio
context watch actor
case _ => ()
}
context become connected(connection, remoteAddress, tip, buffer, requests + (curReqId -> (request, sender())))
case Tcp.Received(data) =>
val buffer1 = buffer + new String(data.toArray)
val (jsons, buffer2) = buffer1.split(newline) match {
case chunks if buffer1.endsWith(newline) => (chunks, "")
case chunks => (chunks.dropRight(1), chunks.last)
}
jsons.map(parseResponse(_)).map(self ! _)
context become connected(connection, remoteAddress, tip, buffer2, requests)
context become connected(ctx, tip, buffer, requests + (curReqId -> (request, sender())))
case Right(json: JsonRPCResponse) =>
requests.get(json.id) match {
@ -201,7 +293,7 @@ class ElectrumClient(serverAddress: InetSocketAddress)(implicit val ec: Executio
case None =>
log.warning(s"could not find requestor for reqId=${json.id} response=$json")
}
context become connected(connection, remoteAddress, tip, buffer, requests - json.id)
context become connected(ctx, tip, buffer, requests - json.id)
case Left(response: HeaderSubscriptionResponse) => headerSubscriptions.map(_ ! response)
@ -211,7 +303,7 @@ class ElectrumClient(serverAddress: InetSocketAddress)(implicit val ec: Executio
case HeaderSubscriptionResponse(newtip) =>
log.info(s"new tip $newtip")
context become connected(connection, remoteAddress, newtip, buffer, requests)
context become connected(ctx, newtip, buffer, requests)
}
}
@ -307,6 +399,13 @@ object ElectrumClient {
case class ElectrumReady(tip: Header, serverAddress: InetSocketAddress) extends ElectrumEvent
case object ElectrumDisconnected extends ElectrumEvent
sealed trait SSL
object SSL {
case object OFF extends SSL
case object STRICT extends SSL
case object LOOSE extends SSL
}
// @formatter:on
def parseResponse(input: String): Either[Response, JsonRPCResponse] = {

View file

@ -22,6 +22,8 @@ import java.net.InetSocketAddress
import akka.actor.{Actor, ActorRef, FSM, Props, Terminated}
import fr.acinq.eclair.Globals
import fr.acinq.eclair.blockchain.CurrentBlockCount
import fr.acinq.eclair.blockchain.electrum.ElectrumClient.SSL
import fr.acinq.eclair.blockchain.electrum.ElectrumClientPool.ElectrumServerAddress
import org.json4s.JsonAST.{JObject, JString}
import org.json4s.jackson.JsonMethods
@ -29,7 +31,7 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.util.Random
class ElectrumClientPool(serverAddresses: Set[InetSocketAddress])(implicit val ec: ExecutionContext) extends Actor with FSM[ElectrumClientPool.State, ElectrumClientPool.Data] {
class ElectrumClientPool(serverAddresses: Set[ElectrumServerAddress])(implicit val ec: ExecutionContext) extends Actor with FSM[ElectrumClientPool.State, ElectrumClientPool.Data] {
import ElectrumClientPool._
val statusListeners = collection.mutable.HashSet.empty[ActorRef]
@ -40,6 +42,8 @@ class ElectrumClientPool(serverAddresses: Set[InetSocketAddress])(implicit val e
// terminate if they cannot connect
(0 until MAX_CONNECTION_COUNT) foreach (_ => self ! Connect)
log.debug(s"starting electrum pool with serverAddresses={}", serverAddresses)
startWith(Disconnected, DisconnectedData)
when(Disconnected) {
@ -95,9 +99,9 @@ class ElectrumClientPool(serverAddresses: Set[InetSocketAddress])(implicit val e
whenUnhandled {
case Event(Connect, _) =>
Random.shuffle(serverAddresses.toSeq diff addresses.values.toSeq).headOption match {
case Some(address) =>
case Some(ElectrumServerAddress(address, ssl)) =>
val resolved = new InetSocketAddress(address.getHostName, address.getPort)
val client = context.actorOf(Props(new ElectrumClient(resolved)))
val client = context.actorOf(Props(new ElectrumClient(resolved, ssl)))
client ! ElectrumClient.AddStatusListener(self)
// we watch each electrum client, they will stop on disconnection
context watch client
@ -160,15 +164,36 @@ object ElectrumClientPool {
val MAX_CONNECTION_COUNT = 3
def readServerAddresses(stream: InputStream): Set[InetSocketAddress] = try {
case class ElectrumServerAddress(adress: InetSocketAddress, ssl: SSL)
/**
* Parses default electrum server list and extract addresses
*
* @param stream
* @param sslEnabled select plaintext/ssl ports
* @return
*/
def readServerAddresses(stream: InputStream, sslEnabled: Boolean): Set[ElectrumServerAddress] = try {
val JObject(values) = JsonMethods.parse(stream)
val addresses = values.flatMap {
case (name, fields) if !name.endsWith(".onion") =>
fields \ "t" match {
case JString(port) => Some(InetSocketAddress.createUnresolved(name, port.toInt))
case _ => None // we only support raw TCP (not SSL) connection to electrum servers for now
val addresses = values
.toMap
.filterKeys(!_.endsWith(".onion"))
.flatMap {
case (name, fields) =>
if (sslEnabled) {
// We don't authenticate seed servers (SSL.LOOSE), because:
// - we don't know them so authentication doesn't really bring anything
// - most of them have self-signed SSL certificates so it would always fail
fields \ "s" match {
case JString(port) => Some(ElectrumServerAddress(InetSocketAddress.createUnresolved(name, port.toInt), SSL.LOOSE))
case _ => None
}
} else {
fields \ "t" match {
case JString(port) => Some(ElectrumServerAddress(InetSocketAddress.createUnresolved(name, port.toInt), SSL.OFF))
case _ => None
}
}
case _ => None
}
addresses.toSet
} finally {

View file

@ -211,7 +211,7 @@ object ElectrumWatcher extends App {
import scala.concurrent.ExecutionContext.Implicits.global
class Root extends Actor with ActorLogging {
val client = context.actorOf(Props(new ElectrumClient(new InetSocketAddress("localhost", 51000))), "client")
val client = context.actorOf(Props(new ElectrumClient(new InetSocketAddress("localhost", 51000), ssl = SSL.OFF)), "client")
client ! ElectrumClient.AddStatusListener(self)
override def unhandled(message: Any): Unit = {

View file

@ -1,69 +0,0 @@
/*
* Copyright 2018 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package fr.acinq.eclair.blockchain.electrum
import akka.actor.{Actor, ActorLogging, ActorRef, PoisonPill, Terminated}
import akka.io.Tcp
import akka.util.ByteString
/**
* Simple ACK-based throttling mechanism for sending messages to a TCP connection
* See https://doc.akka.io/docs/akka/snapshot/scala/io-tcp.html#throttling-reads-and-writes
*/
class WriteAckSender(connection: ActorRef) extends Actor with ActorLogging {
// this actor will kill itself if connection dies
context watch connection
case object Ack extends Tcp.Event
override def receive = idle
def idle: Receive = {
case data: ByteString =>
connection ! Tcp.Write(data, Ack)
context become buffering(Vector.empty[ByteString])
}
def buffering(buffer: Vector[ByteString]): Receive = {
case _: ByteString if buffer.size > MAX_BUFFERED =>
log.warning(s"buffer overrun, closing connection")
connection ! PoisonPill
case data: ByteString =>
log.debug("buffering write {}", data)
context become buffering(buffer :+ data)
case Ack =>
buffer.headOption match {
case Some(data) =>
connection ! Tcp.Write(data, Ack)
context become buffering(buffer.drop(1))
case None =>
log.debug(s"got last ack, back to idle")
context become idle
}
}
override def unhandled(message: Any): Unit = message match {
case _: Tcp.ConnectionClosed => context stop self
case Terminated(_) => context stop self
case _ => log.warning(s"unhandled message $message")
}
val MAX_BUFFERED = 100000L
}

View file

@ -28,25 +28,26 @@ import scala.util.Random
class ElectrumClientPoolSpec extends TestKit(ActorSystem("test")) with FunSuiteLike with Logging with BeforeAndAfterAll {
var router: ActorRef = _
var pool: ActorRef = _
val probe = TestProbe()
val referenceTx = Transaction.read("0200000003947e307df3ab452d23f02b5a65f4ada1804ee733e168e6197b0bd6cc79932b6c010000006a473044022069346ec6526454a481690a3664609f9e8032c34553015cfa2e9b25ebb420a33002206998f21a2aa771ad92a0c1083f4181a3acdb0d42ca51d01be1309da2ffb9cecf012102b4568cc6ee751f6d39f4a908b1fcffdb878f5f784a26a48c0acb0acff9d88e3bfeffffff966d9d969cd5f95bfd53003a35fcc1a50f4fb51f211596e6472583fdc5d38470000000006b4830450221009c9757515009c5709b5b678d678185202b817ef9a69ffb954144615ab11762210220732216384da4bf79340e9c46d0effba6ba92982cca998adfc3f354cec7715f800121035f7c3e077108035026f4ebd5d6ca696ef088d4f34d45d94eab4c41202ec74f9bfefffffff8d5062f5b04455c6cfa7e3f250e5a4fb44308ba2b86baf77f9ad0d782f57071010000006a47304402207f9f7dd91fe537a26d5554105977e3949a5c8c4ef53a6a3bff6da2d36eff928f02202b9427bef487a1825fd0c3c6851d17d5f19e6d73dfee22bf06db591929a2044d012102b4568cc6ee751f6d39f4a908b1fcffdb878f5f784a26a48c0acb0acff9d88e3bfeffffff02809698000000000017a914c82753548fdf4be1c3c7b14872c90b5198e67eaa876e642500000000001976a914e2365ec29471b3e271388b22eadf0e7f54d307a788ac6f771200")
val scriptHash: BinaryData = Crypto.sha256(referenceTx.txOut(0).publicKeyScript).reverse
import scala.concurrent.ExecutionContext.Implicits.global
override protected def beforeAll(): Unit = {
val stream = classOf[ElectrumClientSpec].getResourceAsStream("/electrum/servers_testnet.json")
val addresses = Random.shuffle(ElectrumClientPool.readServerAddresses(stream))
stream.close()
router = system.actorOf(Props(new ElectrumClientPool(addresses)), "electrum-client")
}
override protected def afterAll(): Unit = {
TestKit.shutdownActorSystem(system)
}
test("init an electrumx connection pool") {
val stream = classOf[ElectrumClientSpec].getResourceAsStream("/electrum/servers_testnet.json")
val addresses = ElectrumClientPool.readServerAddresses(stream, sslEnabled = false)
assert(addresses.nonEmpty)
stream.close()
pool = system.actorOf(Props(new ElectrumClientPool(addresses)), "electrum-client")
}
test("connect to an electrumx testnet server") {
probe.send(router, AddStatusListener(probe.ref))
probe.send(pool, AddStatusListener(probe.ref))
// make sure our master is stable, if the first master that we select is behind the other servers we will switch
// during the first few seconds
awaitCond({
@ -55,13 +56,13 @@ class ElectrumClientPoolSpec extends TestKit(ActorSystem("test")) with FunSuiteL
}, max = 15 seconds, interval = 1000 millis) }
test("get transaction") {
probe.send(router, GetTransaction("c5efb5cbd35a44ba956b18100be0a91c9c33af4c7f31be20e33741d95f04e202"))
probe.send(pool, GetTransaction("c5efb5cbd35a44ba956b18100be0a91c9c33af4c7f31be20e33741d95f04e202"))
val GetTransactionResponse(tx) = probe.expectMsgType[GetTransactionResponse]
assert(tx.txid == BinaryData("c5efb5cbd35a44ba956b18100be0a91c9c33af4c7f31be20e33741d95f04e202"))
}
test("get merkle tree") {
probe.send(router, GetMerkle("c5efb5cbd35a44ba956b18100be0a91c9c33af4c7f31be20e33741d95f04e202", 1210223L))
probe.send(pool, GetMerkle("c5efb5cbd35a44ba956b18100be0a91c9c33af4c7f31be20e33741d95f04e202", 1210223L))
val response = probe.expectMsgType[GetMerkleResponse]
assert(response.txid == BinaryData("c5efb5cbd35a44ba956b18100be0a91c9c33af4c7f31be20e33741d95f04e202"))
assert(response.block_height == 1210223L)
@ -71,26 +72,26 @@ class ElectrumClientPoolSpec extends TestKit(ActorSystem("test")) with FunSuiteL
test("header subscription") {
val probe1 = TestProbe()
probe1.send(router, HeaderSubscription(probe1.ref))
probe1.send(pool, HeaderSubscription(probe1.ref))
val HeaderSubscriptionResponse(header) = probe1.expectMsgType[HeaderSubscriptionResponse]
logger.info(s"received header for block ${header.block_hash}")
}
test("scripthash subscription") {
val probe1 = TestProbe()
probe1.send(router, ScriptHashSubscription(scriptHash, probe1.ref))
probe1.send(pool, ScriptHashSubscription(scriptHash, probe1.ref))
val ScriptHashSubscriptionResponse(scriptHash1, status) = probe1.expectMsgType[ScriptHashSubscriptionResponse]
assert(status != "")
}
test("get scripthash history") {
probe.send(router, GetScriptHashHistory(scriptHash))
probe.send(pool, GetScriptHashHistory(scriptHash))
val GetScriptHashHistoryResponse(scriptHash1, history) = probe.expectMsgType[GetScriptHashHistoryResponse]
assert(history.contains((TransactionHistoryItem(1210224, "3903726806aa044fe59f40e42eed71bded068b43aaa9e2d716e38b7825412de0"))))
}
test("list script unspents") {
probe.send(router, ScriptHashListUnspent(scriptHash))
probe.send(pool, ScriptHashListUnspent(scriptHash))
val ScriptHashListUnspentResponse(scriptHash1, unspents) = probe.expectMsgType[ScriptHashListUnspentResponse]
assert(unspents.contains(UnspentItem("3903726806aa044fe59f40e42eed71bded068b43aaa9e2d716e38b7825412de0", 0, 10000000L, 1210224L)))
}

View file

@ -38,7 +38,7 @@ class ElectrumClientSpec extends TestKit(ActorSystem("test")) with FunSuiteLike
val scriptHash: BinaryData = Crypto.sha256(referenceTx.txOut(0).publicKeyScript).reverse
override protected def beforeAll(): Unit = {
client = system.actorOf(Props(new ElectrumClient(new InetSocketAddress("testnet.qtornado.com", 51001))), "electrum-client")
client = system.actorOf(Props(new ElectrumClient(new InetSocketAddress("testnet.qtornado.com", 51002), SSL.LOOSE)), "electrum-client")
}
override protected def afterAll(): Unit = {

View file

@ -25,10 +25,10 @@ import com.whisk.docker.DockerReadyChecker
import fr.acinq.bitcoin.{BinaryData, Block, Btc, DeterministicWallet, MnemonicCode, Satoshi, Transaction, TxOut}
import fr.acinq.eclair.blockchain.bitcoind.BitcoinCoreWallet.{FundTransactionResponse, SignTransactionResponse}
import fr.acinq.eclair.blockchain.bitcoind.{BitcoinCoreWallet, BitcoindService}
import fr.acinq.eclair.blockchain.electrum.ElectrumClient.{BroadcastTransaction, BroadcastTransactionResponse}
import fr.acinq.eclair.blockchain.electrum.ElectrumClient.{BroadcastTransaction, BroadcastTransactionResponse, SSL}
import fr.acinq.eclair.blockchain.electrum.ElectrumClientPool.ElectrumServerAddress
import grizzled.slf4j.Logging
import org.json4s.JsonAST.{JDecimal, JString, JValue}
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}
import scala.concurrent.Await
@ -85,7 +85,7 @@ class ElectrumWalletSpec extends TestKit(ActorSystem("test")) with FunSuiteLike
}
test("wait until wallet is ready") {
electrumClient = system.actorOf(Props(new ElectrumClientPool(Set(new InetSocketAddress("localhost", 50001)))))
electrumClient = system.actorOf(Props(new ElectrumClientPool(Set(ElectrumServerAddress(new InetSocketAddress("localhost", 50001), SSL.OFF)))))
wallet = system.actorOf(Props(new ElectrumWallet(seed, electrumClient, WalletParameters(Block.RegtestGenesisBlock.hash, minimumFee = Satoshi(5000)))), "wallet")
val probe = TestProbe()
awaitCond({

View file

@ -23,6 +23,8 @@ import akka.testkit.{TestKit, TestProbe}
import fr.acinq.bitcoin.Crypto.PrivateKey
import fr.acinq.bitcoin.{Base58, OutPoint, SIGHASH_ALL, Satoshi, Script, ScriptFlags, ScriptWitness, SigVersion, Transaction, TxIn, TxOut}
import fr.acinq.eclair.blockchain.bitcoind.BitcoindService
import fr.acinq.eclair.blockchain.electrum.ElectrumClient.SSL
import fr.acinq.eclair.blockchain.electrum.ElectrumClientPool.ElectrumServerAddress
import fr.acinq.eclair.blockchain.{WatchConfirmed, WatchEventConfirmed, WatchEventSpent, WatchSpent}
import fr.acinq.eclair.channel.{BITCOIN_FUNDING_DEPTHOK, BITCOIN_FUNDING_SPENT}
import grizzled.slf4j.Logging
@ -48,9 +50,11 @@ class ElectrumWatcherSpec extends TestKit(ActorSystem("test")) with FunSuiteLike
TestKit.shutdownActorSystem(system)
}
val electrumAddress = ElectrumServerAddress(new InetSocketAddress("localhost", 50001), SSL.OFF)
test("watch for confirmed transactions") {
val probe = TestProbe()
val electrumClient = system.actorOf(Props(new ElectrumClientPool(Set(new InetSocketAddress("localhost", 50001)))))
val electrumClient = system.actorOf(Props(new ElectrumClientPool(Set(electrumAddress))))
val watcher = system.actorOf(Props(new ElectrumWatcher(electrumClient)))
probe.send(bitcoincli, BitcoinReq("getnewaddress"))
@ -74,7 +78,7 @@ class ElectrumWatcherSpec extends TestKit(ActorSystem("test")) with FunSuiteLike
test("watch for spent transactions") {
val probe = TestProbe()
val electrumClient = system.actorOf(Props(new ElectrumClientPool(Set(new InetSocketAddress("localhost", 50001)))))
val electrumClient = system.actorOf(Props(new ElectrumClientPool(Set(electrumAddress))))
val watcher = system.actorOf(Props(new ElectrumWatcher(electrumClient)))
probe.send(bitcoincli, BitcoinReq("getnewaddress"))