1
0
mirror of https://github.com/ACINQ/eclair.git synced 2024-11-20 02:27:32 +01:00

Add server address in ElectrumReady object (#556)

* Added server address in ElectrumReady object

* Assigned remote address to variable to improve readability

* Checking that the master address exists in the addresses map
This commit is contained in:
Dominique 2018-05-30 18:53:07 +02:00 committed by Fabrice Drouin
parent d75788039a
commit e1b884c58d
5 changed files with 26 additions and 23 deletions

View File

@ -154,7 +154,7 @@ class ElectrumClient(serverAddress: InetSocketAddress)(implicit val ec: Executio
val response = parseResponse(new String(data.toArray)).right.get
val header = parseHeader(response.result)
log.debug(s"connected, tip = ${header.block_hash} $header")
statusListeners.map(_ ! ElectrumReady(header))
statusListeners.map(_ ! ElectrumReady(header, remote))
context become connected(connection, remote, header, "", Map())
case AddStatusListener(actor) => statusListeners += actor
@ -163,7 +163,7 @@ class ElectrumClient(serverAddress: InetSocketAddress)(implicit val ec: Executio
def connected(connection: ActorRef, remoteAddress: InetSocketAddress, tip: Header, buffer: String, requests: Map[String, (Request, ActorRef)]): Receive = {
case AddStatusListener(actor) =>
statusListeners += actor
actor ! ElectrumReady(tip)
actor ! ElectrumReady(tip, remoteAddress)
case HeaderSubscription(actor) =>
headerSubscriptions += actor
@ -304,7 +304,7 @@ object ElectrumClient {
case class ServerError(request: Request, error: Error) extends Response
sealed trait ElectrumEvent
case class ElectrumReady(tip: Header) extends ElectrumEvent
case class ElectrumReady(tip: Header, serverAddress: InetSocketAddress) extends ElectrumEvent
case object ElectrumDisconnected extends ElectrumEvent
// @formatter:on

View File

@ -43,7 +43,7 @@ class ElectrumClientPool(serverAddresses: Set[InetSocketAddress])(implicit val e
startWith(Disconnected, DisconnectedData)
when(Disconnected) {
case Event(ElectrumClient.ElectrumReady(tip), _) if addresses.contains(sender) =>
case Event(ElectrumClient.ElectrumReady(tip, _), _) if addresses.contains(sender) =>
sender ! ElectrumClient.HeaderSubscription(self)
handleHeader(sender, tip, None)
@ -59,7 +59,7 @@ class ElectrumClientPool(serverAddresses: Set[InetSocketAddress])(implicit val e
}
when(Connected) {
case Event(ElectrumClient.ElectrumReady(tip), d: ConnectedData) if addresses.contains(sender) =>
case Event(ElectrumClient.ElectrumReady(tip, _), d: ConnectedData) if addresses.contains(sender) =>
sender ! ElectrumClient.HeaderSubscription(self)
handleHeader(sender, tip, Some(d))
@ -70,9 +70,9 @@ class ElectrumClientPool(serverAddresses: Set[InetSocketAddress])(implicit val e
master forward request
stay
case Event(ElectrumClient.AddStatusListener(listener), d: ConnectedData) =>
case Event(ElectrumClient.AddStatusListener(listener), d: ConnectedData) if addresses.contains(d.master) =>
statusListeners += listener
listener ! ElectrumClient.ElectrumReady(d.tips(d.master))
listener ! ElectrumClient.ElectrumReady(d.tips(d.master), addresses(d.master))
stay
case Event(Terminated(actor), d: ConnectedData) =>
@ -118,29 +118,30 @@ class ElectrumClientPool(serverAddresses: Set[InetSocketAddress])(implicit val e
initialize()
private def handleHeader(connection: ActorRef, tip: ElectrumClient.Header, d: Option[ConnectedData]) = {
private def handleHeader(connection: ActorRef, tip: ElectrumClient.Header, data: Option[ConnectedData]) = {
val remoteAddress = addresses(connection)
// we update our block count even if it doesn't come from our current master
updateBlockCount(tip.block_height)
d match {
data match {
case None =>
// as soon as we have a connection to an electrum server, we select it as master
log.info(s"selecting master ${addresses(connection)} at $tip")
statusListeners.foreach(_ ! ElectrumClient.ElectrumReady(tip))
context.system.eventStream.publish(ElectrumClient.ElectrumReady(tip))
log.info(s"selecting master $remoteAddress} at $tip")
statusListeners.foreach(_ ! ElectrumClient.ElectrumReady(tip, remoteAddress))
context.system.eventStream.publish(ElectrumClient.ElectrumReady(tip, remoteAddress))
goto(Connected) using ConnectedData(connection, Map(connection -> tip))
case Some(d) if tip.block_height >= d.blockHeight + 2L =>
// we only switch to a new master if there is a significant difference with our current master, because
// we don't want to switch to a new master every time a new block arrives (some servers will be notified before others)
log.info(s"switching to master ${addresses(connection)} at $tip")
log.info(s"switching to master $remoteAddress at $tip")
// we've switched to a new master, treat this as a disconnection/reconnection
// so users (wallet, watcher, ...) will reset their subscriptions
statusListeners.foreach(_ ! ElectrumClient.ElectrumDisconnected)
context.system.eventStream.publish(ElectrumClient.ElectrumDisconnected)
statusListeners.foreach(_ ! ElectrumClient.ElectrumReady(tip))
context.system.eventStream.publish(ElectrumClient.ElectrumReady(tip))
statusListeners.foreach(_ ! ElectrumClient.ElectrumReady(tip, remoteAddress))
context.system.eventStream.publish(ElectrumClient.ElectrumReady(tip, remoteAddress))
goto(Connected) using d.copy(master = connection, tips = d.tips + (connection -> tip))
case Some(d) =>
log.debug(s"received tip from ${addresses(connection)} $tip")
log.debug(s"received tip from $remoteAddress} $tip")
stay using d.copy(tips = d.tips + (connection -> tip))
}
}

View File

@ -56,10 +56,10 @@ class ElectrumWallet(seed: BinaryData, client: ActorRef, params: ElectrumWallet.
client ! ElectrumClient.AddStatusListener(self)
// disconnected --> waitingForTip --> running --
// disconnected --> waitingForTip --> running --+
// ^ |
// | |
// --------------------------------------------
// +--------------------------------------------+
/**
* Send a notification if the wallet is ready and its ready message has not
@ -97,7 +97,7 @@ class ElectrumWallet(seed: BinaryData, client: ActorRef, params: ElectrumWallet.
})
when(DISCONNECTED) {
case Event(ElectrumClient.ElectrumReady(_), data) =>
case Event(ElectrumClient.ElectrumReady(_, _), data) =>
client ! ElectrumClient.HeaderSubscription(self)
goto(WAITING_FOR_TIP) using data
}

View File

@ -52,7 +52,7 @@ class ElectrumWatcher(client: ActorRef) extends Actor with Stash with ActorLoggi
def receive = disconnected(Set.empty, Nil, SortedMap.empty)
def disconnected(watches: Set[Watch], publishQueue: Seq[PublishAsap], block2tx: SortedMap[Long, Seq[Transaction]]): Receive = {
case ElectrumClient.ElectrumReady(_) =>
case ElectrumClient.ElectrumReady(_, _) =>
client ! ElectrumClient.HeaderSubscription(self)
case ElectrumClient.HeaderSubscriptionResponse(header) =>
watches.map(self ! _)
@ -220,7 +220,7 @@ object ElectrumWatcher extends App {
}
def receive = {
case ElectrumClient.ElectrumReady(_) =>
case ElectrumClient.ElectrumReady(_, _) =>
log.info(s"starting watcher")
context become running(context.actorOf(Props(new ElectrumWatcher(client)), "watcher"))
}

View File

@ -16,6 +16,8 @@
package fr.acinq.eclair.blockchain.electrum
import java.net.InetSocketAddress
import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.{TestFSMRef, TestKit, TestProbe}
import fr.acinq.bitcoin.{BinaryData, Block, MnemonicCode, Satoshi}
@ -58,7 +60,7 @@ class ElectrumWalletSimulatedClientSpec extends TestKit(ActorSystem("test")) wit
test("wait until wallet is ready") {
sender.send(wallet, ElectrumClient.ElectrumReady(header1))
sender.send(wallet, ElectrumClient.ElectrumReady(header1, InetSocketAddress.createUnresolved("0.0.0.0", 9735)))
sender.send(wallet, ElectrumClient.HeaderSubscriptionResponse(header1))
awaitCond(wallet.stateName == ElectrumWallet.RUNNING)
assert(listener.expectMsgType[WalletReady].timestamp == header1.timestamp)
@ -82,7 +84,7 @@ class ElectrumWalletSimulatedClientSpec extends TestKit(ActorSystem("test")) wit
awaitCond(wallet.stateName == ElectrumWallet.DISCONNECTED)
// reconnect wallet
sender.send(wallet, ElectrumClient.ElectrumReady(header3))
sender.send(wallet, ElectrumClient.ElectrumReady(header3, InetSocketAddress.createUnresolved("0.0.0.0", 9735)))
sender.send(wallet, ElectrumClient.HeaderSubscriptionResponse(header3))
awaitCond(wallet.stateName == ElectrumWallet.RUNNING)