1
0
Fork 0
mirror of https://github.com/ACINQ/eclair.git synced 2025-02-24 14:50:46 +01:00

Electrum Wallet: Send ready notification when reconnected (#324)

* Electrum: Add unix timestamp to WalletReady notification

* Electrum: don't send WalletReady when we get disconnected

* Electrum: check that WalletReady notification is sent on reconnection

* Electrum: don't same the same ready notification more than once
We don't use state transitions to send notifications anymore

* Electrum client: use tcp keep-alive

* factorized socket options
This commit is contained in:
Fabrice Drouin 2017-12-22 21:08:30 +01:00 committed by GitHub
parent 059f211916
commit 95bd4a12cb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 124 additions and 25 deletions

View file

@ -29,6 +29,7 @@ class ElectrumClient(serverAddresses: Seq[InetSocketAddress]) extends Actor with
val newline = "\n" val newline = "\n"
val connectionFailures = collection.mutable.HashMap.empty[InetSocketAddress, Long] val connectionFailures = collection.mutable.HashMap.empty[InetSocketAddress, Long]
val socketOptions = Tcp.SO.KeepAlive(true) :: Nil
val version = ServerVersion("2.1.7", "1.1") val version = ServerVersion("2.1.7", "1.1")
// we need to regularly send a ping in order not to get disconnected // we need to regularly send a ping in order not to get disconnected
@ -39,7 +40,7 @@ class ElectrumClient(serverAddresses: Seq[InetSocketAddress]) extends Actor with
case _: Tcp.ConnectionClosed => case _: Tcp.ConnectionClosed =>
val nextAddress = nextPeer() val nextAddress = nextPeer()
log.warning(s"connection failed, trying $nextAddress") log.warning(s"connection failed, trying $nextAddress")
self ! Tcp.Connect(nextAddress) self ! Tcp.Connect(nextAddress, options = socketOptions)
statusListeners.map(_ ! ElectrumDisconnected) statusListeners.map(_ ! ElectrumDisconnected)
context.system.eventStream.publish(ElectrumDisconnected) context.system.eventStream.publish(ElectrumDisconnected)
context become disconnected context become disconnected
@ -104,7 +105,7 @@ class ElectrumClient(serverAddresses: Seq[InetSocketAddress]) extends Actor with
val headerSubscriptions = collection.mutable.HashSet.empty[ActorRef] val headerSubscriptions = collection.mutable.HashSet.empty[ActorRef]
context.system.eventStream.publish(ElectrumDisconnected) context.system.eventStream.publish(ElectrumDisconnected)
self ! Tcp.Connect(serverAddresses.head) self ! Tcp.Connect(serverAddresses.head, options = socketOptions)
var reqId = 0L var reqId = 0L
@ -133,7 +134,7 @@ class ElectrumClient(serverAddresses: Seq[InetSocketAddress]) extends Actor with
connectionFailures.put(remoteAddress, connectionFailures.getOrElse(remoteAddress, 0L) + 1L) connectionFailures.put(remoteAddress, connectionFailures.getOrElse(remoteAddress, 0L) + 1L)
val count = connectionFailures.getOrElse(nextAddress, 0L) val count = connectionFailures.getOrElse(nextAddress, 0L)
val delay = Math.min(Math.pow(2.0, count), 60.0) seconds; val delay = Math.min(Math.pow(2.0, count), 60.0) seconds;
context.system.scheduler.scheduleOnce(delay, self, Tcp.Connect(nextAddress)) context.system.scheduler.scheduleOnce(delay, self, Tcp.Connect(nextAddress, options = socketOptions))
} }
def waitingForVersion(connection: ActorRef, remote: InetSocketAddress): Receive = { def waitingForVersion(connection: ActorRef, remote: InetSocketAddress): Receive = {

View file

@ -50,6 +50,28 @@ class ElectrumWallet(mnemonics: Seq[String], client: ActorRef, params: ElectrumW
// | | // | |
// -------------------------------------------- // --------------------------------------------
/**
* Send a notification if the wallet is ready and its ready message has not
* already been sent
* @param data wallet data
* @return the input data with an updated 'last ready message' if needed
*/
def notifyReady(data: ElectrumWallet.Data) : ElectrumWallet.Data = {
if(data.isReady(swipeRange)) {
data.lastReadyMessage match {
case Some(value) if value == data.readyMessage =>
log.debug(s"ready message $value has already been sent")
data
case _ =>
val ready = data.readyMessage
log.info(s"wallet is ready with $ready")
context.system.eventStream.publish(ready)
context.system.eventStream.publish(NewWalletReceiveAddress(data.currentReceiveAddress))
data.copy(lastReadyMessage = Some(ready))
}
} else data
}
startWith(DISCONNECTED, { startWith(DISCONNECTED, {
val header = chainHash match { val header = chainHash match {
case Block.RegtestGenesisBlock.hash => ElectrumClient.Header.RegtestGenesisHeader case Block.RegtestGenesisBlock.hash => ElectrumClient.Header.RegtestGenesisHeader
@ -72,7 +94,8 @@ class ElectrumWallet(mnemonics: Seq[String], client: ActorRef, params: ElectrumW
case Event(ElectrumClient.HeaderSubscriptionResponse(header), data) => case Event(ElectrumClient.HeaderSubscriptionResponse(header), data) =>
data.accountKeys.foreach(key => client ! ElectrumClient.ScriptHashSubscription(computeScriptHashFromPublicKey(key.publicKey), self)) data.accountKeys.foreach(key => client ! ElectrumClient.ScriptHashSubscription(computeScriptHashFromPublicKey(key.publicKey), self))
data.changeKeys.foreach(key => client ! ElectrumClient.ScriptHashSubscription(computeScriptHashFromPublicKey(key.publicKey), self)) data.changeKeys.foreach(key => client ! ElectrumClient.ScriptHashSubscription(computeScriptHashFromPublicKey(key.publicKey), self))
goto(RUNNING) using data.copy(tip = header) // make sure there is not last ready message
goto(RUNNING) using data.copy(tip = header, lastReadyMessage = None)
case Event(ElectrumClient.ElectrumDisconnected, data) => case Event(ElectrumClient.ElectrumDisconnected, data) =>
log.info(s"wallet got disconnected") log.info(s"wallet got disconnected")
@ -89,9 +112,10 @@ class ElectrumWallet(mnemonics: Seq[String], client: ActorRef, params: ElectrumW
val confirmations = computeDepth(header.block_height, height) val confirmations = computeDepth(header.block_height, height)
context.system.eventStream.publish(TransactionConfidenceChanged(txid, confirmations)) context.system.eventStream.publish(TransactionConfidenceChanged(txid, confirmations))
} }
stay using data.copy(tip = header) stay using notifyReady(data.copy(tip = header))
case Event(ElectrumClient.ScriptHashSubscriptionResponse(scriptHash, status), data) if data.status.get(scriptHash) == Some(status) => stay // we already have it case Event(ElectrumClient.ScriptHashSubscriptionResponse(scriptHash, status), data) if data.status.get(scriptHash) == Some(status) =>
stay using notifyReady(data)// we already have it
case Event(ElectrumClient.ScriptHashSubscriptionResponse(scriptHash, status), data) if !data.accountKeyMap.contains(scriptHash) && !data.changeKeyMap.contains(scriptHash) => case Event(ElectrumClient.ScriptHashSubscriptionResponse(scriptHash, status), data) if !data.accountKeyMap.contains(scriptHash) && !data.changeKeyMap.contains(scriptHash) =>
log.warning(s"received status=$status for scriptHash=$scriptHash which does not match any of our keys") log.warning(s"received status=$status for scriptHash=$scriptHash which does not match any of our keys")
@ -99,7 +123,7 @@ class ElectrumWallet(mnemonics: Seq[String], client: ActorRef, params: ElectrumW
case Event(ElectrumClient.ScriptHashSubscriptionResponse(scriptHash, status), data) if status == "" => case Event(ElectrumClient.ScriptHashSubscriptionResponse(scriptHash, status), data) if status == "" =>
val data1 = data.copy(status = data.status + (scriptHash -> status)) // empty status, nothing to do val data1 = data.copy(status = data.status + (scriptHash -> status)) // empty status, nothing to do
goto(stateName) using data1 stay using notifyReady(data1)
case Event(ElectrumClient.ScriptHashSubscriptionResponse(scriptHash, status), data) => case Event(ElectrumClient.ScriptHashSubscriptionResponse(scriptHash, status), data) =>
val key = data.accountKeyMap.getOrElse(scriptHash, data.changeKeyMap(scriptHash)) val key = data.accountKeyMap.getOrElse(scriptHash, data.changeKeyMap(scriptHash))
@ -127,7 +151,7 @@ class ElectrumWallet(mnemonics: Seq[String], client: ActorRef, params: ElectrumW
status = data.status + (scriptHash -> status), status = data.status + (scriptHash -> status),
pendingHistoryRequests = data.pendingHistoryRequests + scriptHash) pendingHistoryRequests = data.pendingHistoryRequests + scriptHash)
goto(stateName) using data1 // goto instead of stay because we want to fire transitions stay using notifyReady(data1)
case Event(ElectrumClient.GetScriptHashHistoryResponse(scriptHash, items), data) => case Event(ElectrumClient.GetScriptHashHistoryResponse(scriptHash, items), data) =>
log.debug(s"scriptHash=$scriptHash has history=$items") log.debug(s"scriptHash=$scriptHash has history=$items")
@ -166,7 +190,7 @@ class ElectrumWallet(mnemonics: Seq[String], client: ActorRef, params: ElectrumW
} }
} }
val data1 = data.copy(heights = heights1, history = data.history + (scriptHash -> items0), pendingHistoryRequests = data.pendingHistoryRequests - scriptHash, pendingTransactionRequests = pendingTransactionRequests1) val data1 = data.copy(heights = heights1, history = data.history + (scriptHash -> items0), pendingHistoryRequests = data.pendingHistoryRequests - scriptHash, pendingTransactionRequests = pendingTransactionRequests1)
goto(stateName) using data1 // goto instead of stay because we want to fire transitions stay using notifyReady(data1)
case Event(GetTransactionResponse(tx), data) => case Event(GetTransactionResponse(tx), data) =>
log.debug(s"received transaction ${tx.txid}") log.debug(s"received transaction ${tx.txid}")
@ -177,12 +201,12 @@ class ElectrumWallet(mnemonics: Seq[String], client: ActorRef, params: ElectrumW
// when we have successfully processed a new tx, we retry all pending txes to see if they can be added now // when we have successfully processed a new tx, we retry all pending txes to see if they can be added now
data.pendingTransactions.foreach(self ! GetTransactionResponse(_)) data.pendingTransactions.foreach(self ! GetTransactionResponse(_))
val data1 = data.copy(transactions = data.transactions + (tx.txid -> tx), pendingTransactionRequests = data.pendingTransactionRequests - tx.txid, pendingTransactions = Nil) val data1 = data.copy(transactions = data.transactions + (tx.txid -> tx), pendingTransactionRequests = data.pendingTransactionRequests - tx.txid, pendingTransactions = Nil)
goto(stateName) using data1 // goto instead of stay because we want to fire transitions stay using notifyReady(data1)
case None => case None =>
// missing parents // missing parents
log.info(s"couldn't connect txid=${tx.txid}") log.info(s"couldn't connect txid=${tx.txid}")
val data1 = data.copy(pendingTransactions = data.pendingTransactions :+ tx) val data1 = data.copy(pendingTransactions = data.pendingTransactions :+ tx)
stay using data1 stay using notifyReady(data1)
} }
case Event(CompleteTransaction(tx, feeRatePerKw), data) => case Event(CompleteTransaction(tx, feeRatePerKw), data) =>
@ -200,11 +224,11 @@ class ElectrumWallet(mnemonics: Seq[String], client: ActorRef, params: ElectrumW
val (received, sent, Some(fee)) = data.computeTransactionDelta(tx).get val (received, sent, Some(fee)) = data.computeTransactionDelta(tx).get
// we notify here because the tx won't be downloaded again (it has been added to the state at commit) // we notify here because the tx won't be downloaded again (it has been added to the state at commit)
context.system.eventStream.publish(TransactionReceived(tx, data1.computeTransactionDepth(tx.txid), received, sent, Some(fee))) context.system.eventStream.publish(TransactionReceived(tx, data1.computeTransactionDepth(tx.txid), received, sent, Some(fee)))
goto(stateName) using data1 replying CommitTransactionResponse(tx) // goto instead of stay because we want to fire transitions stay using notifyReady(data1) replying CommitTransactionResponse(tx) // goto instead of stay because we want to fire transitions
case Event(CancelTransaction(tx), data) => case Event(CancelTransaction(tx), data) =>
log.info(s"cancelling txid=${tx.txid}") log.info(s"cancelling txid=${tx.txid}")
stay using data.cancelTransaction(tx) replying CancelTransactionResponse(tx) stay using notifyReady(data.cancelTransaction(tx)) replying CancelTransactionResponse(tx)
case Event(bc@ElectrumClient.BroadcastTransaction(tx), _) => case Event(bc@ElectrumClient.BroadcastTransaction(tx), _) =>
log.info(s"broadcasting txid=${tx.txid}") log.info(s"broadcasting txid=${tx.txid}")
@ -230,14 +254,6 @@ class ElectrumWallet(mnemonics: Seq[String], client: ActorRef, params: ElectrumW
case Event(ElectrumClient.BroadcastTransaction(tx), _) => stay replying ElectrumClient.BroadcastTransactionResponse(tx, Some(Error(-1, "wallet is not connected"))) case Event(ElectrumClient.BroadcastTransaction(tx), _) => stay replying ElectrumClient.BroadcastTransactionResponse(tx, Some(Error(-1, "wallet is not connected")))
} }
onTransition {
case _ -> _ if nextStateData.isReady(params.swipeRange) =>
val ready = nextStateData.readyMessage
log.info(s"wallet is ready with $ready")
context.system.eventStream.publish(ready)
context.system.eventStream.publish(NewWalletReceiveAddress(nextStateData.currentReceiveAddress))
}
initialize() initialize()
} }
@ -316,7 +332,7 @@ object ElectrumWallet {
case class TransactionReceived(tx: Transaction, depth: Long, received: Satoshi, sent: Satoshi, feeOpt: Option[Satoshi]) extends WalletEvent case class TransactionReceived(tx: Transaction, depth: Long, received: Satoshi, sent: Satoshi, feeOpt: Option[Satoshi]) extends WalletEvent
case class TransactionConfidenceChanged(txid: BinaryData, depth: Long) extends WalletEvent case class TransactionConfidenceChanged(txid: BinaryData, depth: Long) extends WalletEvent
case class NewWalletReceiveAddress(address: String) extends WalletEvent case class NewWalletReceiveAddress(address: String) extends WalletEvent
case class WalletReady(confirmedBalance: Satoshi, unconfirmedBalance: Satoshi, height: Long) extends WalletEvent case class WalletReady(confirmedBalance: Satoshi, unconfirmedBalance: Satoshi, height: Long, timestamp: Long) extends WalletEvent
// @formatter:on // @formatter:on
/** /**
@ -432,7 +448,8 @@ object ElectrumWallet {
locks: Set[Transaction], locks: Set[Transaction],
pendingHistoryRequests: Set[BinaryData], pendingHistoryRequests: Set[BinaryData],
pendingTransactionRequests: Set[BinaryData], pendingTransactionRequests: Set[BinaryData],
pendingTransactions: Seq[Transaction]) extends Logging { pendingTransactions: Seq[Transaction],
lastReadyMessage: Option[WalletReady]) extends Logging {
lazy val accountKeyMap = accountKeys.map(key => computeScriptHashFromPublicKey(key.publicKey) -> key).toMap lazy val accountKeyMap = accountKeys.map(key => computeScriptHashFromPublicKey(key.publicKey) -> key).toMap
lazy val changeKeyMap = changeKeys.map(key => computeScriptHashFromPublicKey(key.publicKey) -> key).toMap lazy val changeKeyMap = changeKeys.map(key => computeScriptHashFromPublicKey(key.publicKey) -> key).toMap
@ -454,7 +471,7 @@ object ElectrumWallet {
def readyMessage: WalletReady = { def readyMessage: WalletReady = {
val (confirmed, unconfirmed) = balance val (confirmed, unconfirmed) = balance
WalletReady(confirmed, unconfirmed, tip.block_height) WalletReady(confirmed, unconfirmed, tip.block_height, tip.timestamp)
} }
/** /**
@ -739,7 +756,7 @@ object ElectrumWallet {
object Data { object Data {
def apply(params: ElectrumWallet.WalletParameters, tip: ElectrumClient.Header, accountKeys: Vector[ExtendedPrivateKey], changeKeys: Vector[ExtendedPrivateKey]): Data def apply(params: ElectrumWallet.WalletParameters, tip: ElectrumClient.Header, accountKeys: Vector[ExtendedPrivateKey], changeKeys: Vector[ExtendedPrivateKey]): Data
= Data(tip, accountKeys, changeKeys, Map(), Map(), Map(), Map(), Set(), Set(), Set(), Seq()) = Data(tip, accountKeys, changeKeys, Map(), Map(), Map(), Map(), Set(), Set(), Set(), Seq(), None)
} }
case class InfiniteLoopException(data: Data, tx: Transaction) extends Exception case class InfiniteLoopException(data: Data, tx: Transaction) extends Exception

View file

@ -0,0 +1,81 @@
package fr.acinq.eclair.blockchain.electrum
import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.{TestFSMRef, TestKit, TestProbe}
import fr.acinq.bitcoin.{BinaryData, Block, MnemonicCode, Satoshi}
import fr.acinq.eclair.blockchain.electrum.ElectrumClient.{ScriptHashSubscription, ScriptHashSubscriptionResponse}
import fr.acinq.eclair.blockchain.electrum.ElectrumWallet.{NewWalletReceiveAddress, WalletEvent, WalletParameters, WalletReady}
import org.junit.runner.RunWith
import org.scalatest.{BeforeAndAfterAll, FunSuite, FunSuiteLike}
import org.scalatest.junit.JUnitRunner
import scala.concurrent.duration._
@RunWith(classOf[JUnitRunner])
class ElectrumWalletSimulatedClientSpec extends TestKit(ActorSystem("test")) with FunSuiteLike {
val sender = TestProbe()
class SimulatedClient extends Actor {
def receive = {
case ScriptHashSubscription(scriptHash, replyTo) => replyTo ! ScriptHashSubscriptionResponse(scriptHash, "")
}
}
val entropy = BinaryData("01" * 32)
val mnemonics = MnemonicCode.toMnemonics(entropy)
val seed = MnemonicCode.toSeed(mnemonics, "")
val listener = TestProbe()
system.eventStream.subscribe(listener.ref, classOf[WalletEvent])
val wallet = TestFSMRef(new ElectrumWallet(mnemonics, system.actorOf(Props(new SimulatedClient())), WalletParameters(Block.RegtestGenesisBlock.hash, minimumFee = Satoshi(5000))))
// wallet sends a receive address notification as soon as it is created
listener.expectMsgType[NewWalletReceiveAddress]
val genesis = ElectrumClient.Header(1, 1, Block.RegtestGenesisBlock.hash, BinaryData("01" * 32), timestamp = 12346L, bits = 0, nonce = 0)
val header1 = makeHeader(genesis, 12345L)
val header2 = makeHeader(header1, 12346L)
val header3 = makeHeader(header2, 12347L)
val header4 = makeHeader(header3, 12348L)
def makeHeader(previousHeader: ElectrumClient.Header, timestamp: Long): ElectrumClient.Header = ElectrumClient.Header(previousHeader.block_height + 1, 1, previousHeader.block_hash, BinaryData("01" * 32), timestamp = timestamp, bits = 0, nonce = 0)
test("wait until wallet is ready") {
sender.send(wallet, ElectrumClient.ElectrumReady)
sender.send(wallet, ElectrumClient.HeaderSubscriptionResponse(header1))
awaitCond(wallet.stateName == ElectrumWallet.RUNNING)
assert(listener.expectMsgType[WalletReady].timestamp == header1.timestamp)
listener.expectMsgType[NewWalletReceiveAddress]
}
test("tell wallet is ready when a new block comes in, even if nothing else has changed") {
sender.send(wallet, ElectrumClient.HeaderSubscriptionResponse(header2))
assert(listener.expectMsgType[WalletReady].timestamp == header2.timestamp)
listener.expectMsgType[NewWalletReceiveAddress]
}
test("tell wallet is ready when it is reconnected, even if nothing has changed") {
// disconnect wallet
sender.send(wallet, ElectrumClient.ElectrumDisconnected)
awaitCond(wallet.stateName == ElectrumWallet.DISCONNECTED)
// reconnect wallet
sender.send(wallet, ElectrumClient.ElectrumReady)
sender.send(wallet, ElectrumClient.HeaderSubscriptionResponse(header3))
awaitCond(wallet.stateName == ElectrumWallet.RUNNING)
// listener should be notified
assert(listener.expectMsgType[WalletReady].timestamp == header3.timestamp)
listener.expectMsgType[NewWalletReceiveAddress]
}
test("don't send the same ready mnessage more then once") {
// listener should be notified
sender.send(wallet, ElectrumClient.HeaderSubscriptionResponse(header4))
assert(listener.expectMsgType[WalletReady].timestamp == header4.timestamp)
listener.expectMsgType[NewWalletReceiveAddress]
sender.send(wallet, ElectrumClient.HeaderSubscriptionResponse(header4))
listener.expectNoMsg(500 milliseconds)
}
}