1
0
mirror of https://github.com/ACINQ/eclair.git synced 2024-11-20 02:27:32 +01:00

Electrum: fixes and improvements (#924)

* Electrum: Update mainnet servers list

* Electrum: make pool address selection more readable

We connect to a random server we're not already connected to.

* Electrum Tests: increase "wait for ready" test timeout

If was a bit short and sometimes failed on travis.

* Electrum: better parsing of invalid responses

On testnet some Electrum servers are not compliant with the protocole version they advertise
and will return responses formatted with 1.0 rules.
This commit is contained in:
Fabrice Drouin 2019-03-29 18:56:00 +01:00 committed by GitHub
parent 5bed099206
commit c99026828c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 52 additions and 27 deletions

View File

@ -195,12 +195,6 @@
"t": "50001",
"version": "1.4"
},
"electrum3.hachre.de": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.4"
},
"electrumx.bot.nu": {
"pruning": "-",
"s": "50002",
@ -317,12 +311,6 @@
"t": "50001",
"version": "1.4"
},
"oneweek.duckdns.org": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.4"
},
"orannis.com": {
"pruning": "-",
"s": "50002",

View File

@ -42,6 +42,7 @@ import scodec.bits.ByteVector
import scala.annotation.tailrec
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}
/**
* For later optimizations, see http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html
@ -599,9 +600,15 @@ object ElectrumClient {
case _ => ScriptHashSubscriptionResponse(scriptHash, "")
}
case BroadcastTransaction(tx) =>
val JString(txid) = json.result
require(ByteVector32.fromValidHex(txid) == tx.txid)
BroadcastTransactionResponse(tx, None)
val JString(message) = json.result
// if we got here, it means that the server's response does not contain an error and message should be our
// transaction id. However, it seems that at least on testnet some servers still use an older version of the
// Electrum protocol and return an error message in the result field
Try(ByteVector32.fromValidHex(message)) match {
case Success(txid) if txid == tx.txid => BroadcastTransactionResponse(tx, None)
case Success(txid) => BroadcastTransactionResponse(tx, Some(Error(1, s"response txid $txid does not match request txid ${tx.txid}")))
case Failure(_) => BroadcastTransactionResponse(tx, Some(Error(1, message)))
}
case GetHeader(height) =>
val JString(hex) = json.result
GetHeaderResponse(height, BlockHeader.read(hex))

View File

@ -106,7 +106,7 @@ class ElectrumClientPool(serverAddresses: Set[ElectrumServerAddress])(implicit v
whenUnhandled {
case Event(Connect, _) =>
Random.shuffle(serverAddresses.toSeq diff addresses.values.toSeq).headOption match {
pickAddress(serverAddresses, addresses.values.toSet) match {
case Some(ElectrumServerAddress(address, ssl)) =>
val resolved = new InetSocketAddress(address.getHostName, address.getPort)
val client = context.actorOf(Props(new ElectrumClient(resolved, ssl)))
@ -211,6 +211,16 @@ object ElectrumClientPool {
stream.close()
}
/**
*
* @param serverAddresses all addresses to choose from
* @param usedAddresses current connections
* @return a random address that we're not connected to yet
*/
def pickAddress(serverAddresses: Set[ElectrumServerAddress], usedAddresses: Set[InetSocketAddress]): Option[ElectrumServerAddress] = {
Random.shuffle(serverAddresses.filterNot(a => usedAddresses.contains(a.adress)).toSeq).headOption
}
// @formatter:off
sealed trait State
case object Disconnected extends State

View File

@ -20,6 +20,7 @@ import java.net.InetSocketAddress
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.testkit.{TestKit, TestProbe}
import akka.util.Timeout
import fr.acinq.bitcoin.{ByteVector32, Crypto, Transaction}
import fr.acinq.eclair.blockchain.electrum.ElectrumClient._
import grizzled.slf4j.Logging
@ -27,6 +28,7 @@ import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}
import scodec.bits._
import scala.concurrent.duration._
import scala.util.Random
class ElectrumClientPoolSpec extends TestKit(ActorSystem("test")) with FunSuiteLike with Logging with BeforeAndAfterAll {
@ -35,17 +37,35 @@ class ElectrumClientPoolSpec extends TestKit(ActorSystem("test")) with FunSuiteL
// this is tx #2690 of block #500000
val referenceTx = Transaction.read("0200000001983c5b32ced1de5ae97d3ce9b7436f8bb0487d15bf81e5cae97b1e238dc395c6000000006a47304402205957c75766e391350eba2c7b752f0056cb34b353648ecd0992a8a81fc9bcfe980220629c286592842d152cdde71177cd83086619744a533f262473298cacf60193500121021b8b51f74dbf0ac1e766d162c8707b5e8d89fc59da0796f3b4505e7c0fb4cf31feffffff0276bd0101000000001976a914219de672ba773aa0bc2e15cdd9d2e69b734138fa88ac3e692001000000001976a914301706dede031e9fb4b60836e073a4761855f6b188ac09a10700")
val scriptHash = Crypto.sha256(referenceTx.txOut(0).publicKeyScript).reverse
import scala.concurrent.ExecutionContext.Implicits.global
val serverAddresses = {
val stream = classOf[ElectrumClientSpec].getResourceAsStream("/electrum/servers_mainnet.json")
val addresses = ElectrumClientPool.readServerAddresses(stream, sslEnabled = false)
stream.close()
addresses
}
implicit val timeout = 20 seconds
import concurrent.ExecutionContext.Implicits.global
override protected def afterAll(): Unit = {
TestKit.shutdownActorSystem(system)
}
test("pick a random, unused server address") {
val usedAddresses = Random.shuffle(serverAddresses.toSeq).take(serverAddresses.size / 2).map(_.adress).toSet
for(_ <- 1 to 10) {
val Some(pick) = ElectrumClientPool.pickAddress(serverAddresses, usedAddresses)
assert(!usedAddresses.contains(pick.adress))
}
}
test("init an electrumx connection pool") {
val random = new Random()
val stream = classOf[ElectrumClientSpec].getResourceAsStream("/electrum/servers_mainnet.json")
val addresses = ElectrumClientPool.readServerAddresses(stream, sslEnabled = false).take(2) + ElectrumClientPool.ElectrumServerAddress(new InetSocketAddress("electrum.acinq.co", 50002), SSL.STRICT)
assert(addresses.nonEmpty)
val addresses = random.shuffle(serverAddresses.toSeq).take(2).toSet + ElectrumClientPool.ElectrumServerAddress(new InetSocketAddress("electrum.acinq.co", 50002), SSL.STRICT)
stream.close()
assert(addresses.nonEmpty)
pool = system.actorOf(Props(new ElectrumClientPool(addresses)), "electrum-client")
}
@ -54,19 +74,19 @@ class ElectrumClientPoolSpec extends TestKit(ActorSystem("test")) with FunSuiteL
// make sure our master is stable, if the first master that we select is behind the other servers we will switch
// during the first few seconds
awaitCond({
probe.expectMsgType[ElectrumReady]
probe.expectMsgType[ElectrumReady](30 seconds)
probe.receiveOne(5 seconds) == null
}, max = 15 seconds, interval = 1000 millis) }
}, max = 60 seconds, interval = 1000 millis) }
test("get transaction") {
probe.send(pool, GetTransaction(referenceTx.txid))
val GetTransactionResponse(tx) = probe.expectMsgType[GetTransactionResponse]
val GetTransactionResponse(tx) = probe.expectMsgType[GetTransactionResponse](timeout)
assert(tx == referenceTx)
}
test("get merkle tree") {
probe.send(pool, GetMerkle(referenceTx.txid, 500000))
val response = probe.expectMsgType[GetMerkleResponse]
val response = probe.expectMsgType[GetMerkleResponse](timeout)
assert(response.txid == referenceTx.txid)
assert(response.block_height == 500000)
assert(response.pos == 2690)
@ -76,26 +96,26 @@ class ElectrumClientPoolSpec extends TestKit(ActorSystem("test")) with FunSuiteL
test("header subscription") {
val probe1 = TestProbe()
probe1.send(pool, HeaderSubscription(probe1.ref))
val HeaderSubscriptionResponse(_, header) = probe1.expectMsgType[HeaderSubscriptionResponse]
val HeaderSubscriptionResponse(_, header) = probe1.expectMsgType[HeaderSubscriptionResponse](timeout)
logger.info(s"received header for block ${header.blockId}")
}
test("scripthash subscription") {
val probe1 = TestProbe()
probe1.send(pool, ScriptHashSubscription(scriptHash, probe1.ref))
val ScriptHashSubscriptionResponse(scriptHash1, status) = probe1.expectMsgType[ScriptHashSubscriptionResponse]
val ScriptHashSubscriptionResponse(scriptHash1, status) = probe1.expectMsgType[ScriptHashSubscriptionResponse](timeout)
assert(status != "")
}
test("get scripthash history") {
probe.send(pool, GetScriptHashHistory(scriptHash))
val GetScriptHashHistoryResponse(scriptHash1, history) = probe.expectMsgType[GetScriptHashHistoryResponse]
val GetScriptHashHistoryResponse(scriptHash1, history) = probe.expectMsgType[GetScriptHashHistoryResponse](timeout)
assert(history.contains((TransactionHistoryItem(500000, referenceTx.txid))))
}
test("list script unspents") {
probe.send(pool, ScriptHashListUnspent(scriptHash))
val ScriptHashListUnspentResponse(scriptHash1, unspents) = probe.expectMsgType[ScriptHashListUnspentResponse]
val ScriptHashListUnspentResponse(scriptHash1, unspents) = probe.expectMsgType[ScriptHashListUnspentResponse](timeout)
assert(unspents.isEmpty)
}
}