From 3433adcad03d6fc00266dc58e44bef3695d5e137 Mon Sep 17 00:00:00 2001 From: pm47 Date: Tue, 1 Sep 2015 20:41:58 +0200 Subject: [PATCH] improved blockchain watcher and closing flow --- .../fr/acinq/eclair/BlockchainWatcher.scala | 38 +++++++----- .../src/main/scala/fr/acinq/eclair/Boot.scala | 7 +-- .../src/main/scala/fr/acinq/eclair/Node.scala | 62 +++++++------------ 3 files changed, 49 insertions(+), 58 deletions(-) diff --git a/eclair-demo/src/main/scala/fr/acinq/eclair/BlockchainWatcher.scala b/eclair-demo/src/main/scala/fr/acinq/eclair/BlockchainWatcher.scala index 6f24677ae..147e438c5 100644 --- a/eclair-demo/src/main/scala/fr/acinq/eclair/BlockchainWatcher.scala +++ b/eclair-demo/src/main/scala/fr/acinq/eclair/BlockchainWatcher.scala @@ -2,14 +2,16 @@ package fr.acinq.eclair import akka.actor.{ActorRef, Actor, ActorLogging} import fr.acinq.bitcoin.{Transaction, BinaryData} -import fr.acinq.lightning._ +import scala.concurrent.duration._ import scala.collection.mutable // @formatter:off -final case class Watch(txId: BinaryData) -final case class Unwatch(txId: BinaryData) +sealed trait TxType +case object Anchor extends TxType +case object Final extends TxType +final case class Watch(channel: ActorRef, txId: BinaryData, t: TxType, minDepth: Int) final case class TxConfirmed(txId: BinaryData, blockId: BinaryData, confirmations: Int) final case class Publish(tx: Transaction) @@ -20,23 +22,29 @@ final case class Publish(tx: Transaction) */ class BlockchainWatcher extends Actor with ActorLogging { - val m = new mutable.HashMap[BinaryData, mutable.Set[ActorRef]] with mutable.MultiMap[BinaryData, ActorRef] - context.become(watching(m)) + context.become(watching(Set())) override def receive: Receive = ??? - def watching(m: mutable.MultiMap[BinaryData, ActorRef]): Receive = { - case Watch(txId) => + def watching(watches: Set[Watch]): Receive = { + case w@Watch(channel, txId, typ, minDepth) => log.info(s"watching tx $txId for $sender") - // instant confirmation for testing - (0 until 3) foreach(i => self ! TxConfirmed(txId, "5deedc4c7f4c8e3250a486f340e57a565cda908eef7b7df2c1cd61b8ad6b42e6", i)) - context.become(watching(m.addBinding(txId, sender))) + // TODO : for testing + import scala.concurrent.ExecutionContext.Implicits.global + (0 until 3) foreach(i => context.system.scheduler.scheduleOnce(i * 100 milliseconds, self, TxConfirmed(txId, "5deedc4c7f4c8e3250a486f340e57a565cda908eef7b7df2c1cd61b8ad6b42e6", i))) + context.become(watching(watches + w)) - case Unwatch(txId) => - context.become(watching(m.removeBinding(txId, sender))) - - case TxConfirmed(txId, blockId, confirmations) if m.contains(txId) => - m(txId).foreach(_ ! BITCOIN_TX_CONFIRMED(blockId, confirmations)) + case TxConfirmed(txId, blockId, confirmations) => + log.info(s"got $confirmations confirmation(s) for tx $txId") + watches.filter(_.txId == txId).foreach(w => w match { + case Watch(channel, _, Anchor, minDepth) if confirmations >= minDepth => + channel ! BITCOIN_ANCHOR_DEPTHOK + context.become(watching(watches - w)) + case Watch(channel, _, Final, minDepth) if confirmations >= minDepth => + channel ! BITCOIN_CLOSE_DONE + context.become(watching(watches - w)) + case _ => {} + }) case Publish(tx) => log.info(s"publishing tx $tx") diff --git a/eclair-demo/src/main/scala/fr/acinq/eclair/Boot.scala b/eclair-demo/src/main/scala/fr/acinq/eclair/Boot.scala index f91d5604e..b06ee5a76 100644 --- a/eclair-demo/src/main/scala/fr/acinq/eclair/Boot.scala +++ b/eclair-demo/src/main/scala/fr/acinq/eclair/Boot.scala @@ -52,10 +52,9 @@ object Boot extends App { alice ! CMD_CLOSE(0) - while (Await.result(alice ? CMD_GETSTATE, 5 seconds) != CLOSE_WAIT_CLOSE) Thread.sleep(200) - while (Await.result(bob ? CMD_GETSTATE, 5 seconds) != CLOSE_WAIT_CLOSE) Thread.sleep(200) + while (Await.result(alice ? CMD_GETSTATE, 5 seconds) != CLOSED) Thread.sleep(200) + while (Await.result(bob ? CMD_GETSTATE, 5 seconds) != CLOSED) Thread.sleep(200) - alice ! BITCOIN_CLOSE_DONE - bob ! BITCOIN_CLOSE_DONE + system.shutdown() } diff --git a/eclair-demo/src/main/scala/fr/acinq/eclair/Node.scala b/eclair-demo/src/main/scala/fr/acinq/eclair/Node.scala index b0c01c5a3..196089267 100644 --- a/eclair-demo/src/main/scala/fr/acinq/eclair/Node.scala +++ b/eclair-demo/src/main/scala/fr/acinq/eclair/Node.scala @@ -110,13 +110,13 @@ sealed trait LowPriority extends State case object INPUT_NONE sealed trait BlockchainEvent -final case class BITCOIN_TX_CONFIRMED(blockId: sha256_hash, confirmations: Int) extends BlockchainEvent -case object BITCOIN_CLOSE_DONE +case object BITCOIN_ANCHOR_DEPTHOK case object BITCOIN_ANCHOR_UNSPENT case object BITCOIN_ANCHOR_TIMEOUT case object BITCOIN_ANCHOR_THEIRSPEND case object BITCOIN_ANCHOR_OURCOMMIT_DELAYPASSED case object BITCOIN_ANCHOR_OTHERSPEND +case object BITCOIN_CLOSE_DONE sealed trait Command final case class CMD_SEND_HTLC_UPDATE(amount: Int, rHash: sha256_hash, expiry: locktime) extends Command @@ -146,6 +146,7 @@ final case class DATA_WAIT_FOR_UPDATE_ACCEPT(ourParams: ChannelParams, theirPara final case class DATA_WAIT_FOR_HTLC_ACCEPT(ourParams: ChannelParams, theirParams: ChannelParams, previousCommitmentTxSigned: CommitmentTx, updateProposal: UpdateProposal) extends Data final case class DATA_WAIT_FOR_UPDATE_SIG(ourParams: ChannelParams, theirParams: ChannelParams, previousCommitmentTxSigned: CommitmentTx, newCommitmentTxUnsigned: CommitmentTx) extends Data final case class DATA_WAIT_FOR_UPDATE_COMPLETE(ourParams: ChannelParams, theirParams: ChannelParams, previousCommitmentTxSigned: CommitmentTx, newCommitmentTxUnsigned: CommitmentTx) extends Data +final case class DATA_WAIT_FOR_CLOSE_ACK(finalTx: Transaction) extends Data // @formatter:on @@ -237,7 +238,7 @@ class Node(val blockchain: ActorRef, val commitPrivKey: BinaryData, val finalPri val theirCommitTx = makeCommitTx(theirParams.finalKey, ourParams.finalKey, ourParams.delay, anchorTxid, anchorOutputIndex, theirRevocationHash, state.reverse) val ourSigForThem = bin2signature(Transaction.signInput(theirCommitTx, 0, multiSig2of2(ourParams.commitKey, theirParams.commitKey), SIGHASH_ALL, commitPrivKey)) them ! open_commit_sig(ourSigForThem) - blockchain ! Watch(anchorTxid) + blockchain ! Watch(self, anchorTxid, Anchor, ourParams.minDepth) goto(OPEN_WAITING_THEIRANCHOR) using DATA_OPEN_WAITING(ourParams, theirParams, CommitmentTx(signedCommitTx, state, ourRevocationHashPreimage, theirRevocationHash)) case Event(CMD_CLOSE(_), _) => goto(CLOSED) @@ -251,7 +252,7 @@ class Node(val blockchain: ActorRef, val commitPrivKey: BinaryData, val finalPri val ok = Try(Transaction.correctlySpends(signedCommitTx, Map(OutPoint(anchorTx.hash, anchorOutputIndex) -> anchorPubkeyScript(ourParams.commitKey, theirParams.commitKey)), ScriptFlags.STANDARD_SCRIPT_VERIFY_FLAGS)).isSuccess // TODO : return Error and close channel if !ok if (!ok) log.error(s"invalid sig") - blockchain ! Watch(anchorTx.hash) + blockchain ! Watch(self, anchorTx.hash, Anchor, ourParams.minDepth) blockchain ! Publish(anchorTx) goto(OPEN_WAITING_OURANCHOR) using DATA_OPEN_WAITING(ourParams, theirParams, newCommitTx.copy(tx = signedCommitTx)) @@ -259,15 +260,10 @@ class Node(val blockchain: ActorRef, val commitPrivKey: BinaryData, val finalPri } when(OPEN_WAITING_THEIRANCHOR) { - case Event(BITCOIN_TX_CONFIRMED(blockId, confirmations), DATA_OPEN_WAITING(ourParams, _, _)) if confirmations < ourParams.minDepth => - log.info(s"got $confirmations confirmation(s) for anchor tx") - stay - - case Event(BITCOIN_TX_CONFIRMED(blockId, confirmations), d@DATA_OPEN_WAITING(ourParams, _, _)) if confirmations >= ourParams.minDepth => - log.info(s"got $confirmations confirmation(s) for anchor tx, minDepth reached") - them ! open_complete(Some(blockId)) + case Event(BITCOIN_ANCHOR_DEPTHOK, DATA_OPEN_WAITING(ourParams, theirParams, commitmentTx)) => + them ! open_complete(None) unstashAll() - goto(OPEN_WAIT_FOR_COMPLETE_THEIRANCHOR) using DATA_NORMAL(d.ourParams, d.theirParams, d.commitmentTx) + goto(OPEN_WAIT_FOR_COMPLETE_THEIRANCHOR) using DATA_NORMAL(ourParams, theirParams, commitmentTx) case Event(msg@open_complete(blockId_opt), d@DATA_OPEN_WAITING(ourParams, _, _)) => log.info(s"received their open_complete, deferring message") @@ -298,15 +294,10 @@ class Node(val blockchain: ActorRef, val commitPrivKey: BinaryData, val finalPri } when(OPEN_WAITING_OURANCHOR) { - case Event(BITCOIN_TX_CONFIRMED(blockId, confirmations), DATA_OPEN_WAITING(ourParams, _, _)) if confirmations < ourParams.minDepth => - log.info(s"got $confirmations confirmation(s) for anchor tx") - stay - - case Event(BITCOIN_TX_CONFIRMED(blockId, confirmations), d@DATA_OPEN_WAITING(ourParams, _, _)) if confirmations >= ourParams.minDepth => - log.info(s"got $confirmations confirmation(s) for anchor tx, minDepth reached") - them ! open_complete(Some(blockId)) + case Event(BITCOIN_ANCHOR_DEPTHOK, DATA_OPEN_WAITING(ourParams, theirParams, commitmentTx)) => + them ! open_complete(None) unstashAll() - goto(OPEN_WAIT_FOR_COMPLETE_OURANCHOR) using DATA_NORMAL(d.ourParams, d.theirParams, d.commitmentTx) + goto(OPEN_WAIT_FOR_COMPLETE_OURANCHOR) using DATA_NORMAL(ourParams, theirParams, commitmentTx) case Event(msg@open_complete(blockId_opt), d@DATA_OPEN_WAITING(ourParams, _, _)) => log.info(s"received their open_complete, deferring message") @@ -594,27 +585,19 @@ class Node(val blockchain: ActorRef, val commitPrivKey: BinaryData, val finalPri when(WAIT_FOR_UPDATE_COMPLETE_LOWPRIO)(WAIT_FOR_UPDATE_COMPLETE_handler) when(WAIT_FOR_CLOSE_COMPLETE) { - case Event(close_channel(theirSig, closeFee), DATA_NORMAL(ourParams, theirParams, CommitmentTx(commitmentTx, state, _, _))) => - //the only difference between their final tx and ours is the order of the outputs, because state is symmetric - val theirFinalTx = makeFinalTx(commitmentTx.txIn, ourParams.finalKey, theirParams.finalKey, state.reverse) - val ourSigForThem = bin2signature(Transaction.signInput(theirFinalTx, 0, multiSig2of2(ourParams.commitKey, theirParams.commitKey), SIGHASH_ALL, commitPrivKey)) - val ourFinalTx = makeFinalTx(commitmentTx.txIn, ourParams.finalKey, theirParams.finalKey, state) - val ourSig = bin2signature(Transaction.signInput(ourFinalTx, 0, multiSig2of2(ourParams.commitKey, theirParams.commitKey), SIGHASH_ALL, commitPrivKey)) - val signedFinaltx = ourFinalTx.updateSigScript(0, sigScript2of2(theirSig, ourSig, theirParams.commitKey, ourParams.commitKey)) - log.debug(s"final tx : ${Hex.toHexString(Transaction.write(signedFinaltx))}") - // ok now we can broadcast the final tx if we want - them ! close_channel_ack() - goto(CLOSE_WAIT_CLOSE) case Event(close_channel_complete(theirSig), DATA_NORMAL(ourParams, theirParams, CommitmentTx(commitmentTx, state, _, _))) => val finalTx = makeFinalTx(commitmentTx.txIn, ourParams.finalKey, theirParams.finalKey, state) val ourSig = Transaction.signInput(finalTx, 0, multiSig2of2(ourParams.commitKey, theirParams.commitKey), SIGHASH_ALL, commitPrivKey) val signedFinaltx = finalTx.updateSigScript(0, sigScript2of2(theirSig, ourSig, theirParams.commitKey, ourParams.commitKey)) log.debug(s"final tx : ${Hex.toHexString(Transaction.write(signedFinaltx))}") - // ok now we can broadcast the final tx if we want them ! close_channel_ack() + blockchain ! Watch(self, signedFinaltx.hash, Final, 1) + blockchain ! Publish(signedFinaltx) goto(CLOSE_WAIT_CLOSE) + case Event(BITCOIN_CLOSE_DONE, _) => goto(CLOSED) + case Event(BITCOIN_ANCHOR_THEIRSPEND, _) => them ! handle_btc_anchor_theirspend() goto(CLOSE_WAIT_SPENDTHEM_CLOSE) @@ -630,6 +613,8 @@ class Node(val blockchain: ActorRef, val commitPrivKey: BinaryData, val finalPri case Event(close_channel_ack(), _) => goto(CLOSE_WAIT_CLOSE) + case Event(BITCOIN_CLOSE_DONE, _) => goto(CLOSED) + case Event(pkt: error, _) => goto(CLOSE_WAIT_CLOSE) case Event(BITCOIN_ANCHOR_THEIRSPEND, _) => @@ -692,6 +677,7 @@ class Node(val blockchain: ActorRef, val commitPrivKey: BinaryData, val finalPri } when(CLOSE_WAIT_CLOSE_OURCOMMIT) { + case Event(BITCOIN_ANCHOR_UNSPENT, _) => goto(ERROR_ANCHOR_LOST) case Event(BITCOIN_ANCHOR_THEIRSPEND, _) => @@ -705,6 +691,8 @@ class Node(val blockchain: ActorRef, val commitPrivKey: BinaryData, val finalPri case Event(BITCOIN_ANCHOR_OTHERSPEND, _) => handle_btc_anchor_otherspend() goto(CLOSE_WAIT_STEAL_CLOSE_OURCOMMIT) + + case Event(BITCOIN_CLOSE_DONE, _) => goto(CLOSED) } when(CLOSE_WAIT_CLOSE_SPENDOURS) { @@ -904,15 +892,10 @@ class Node(val blockchain: ActorRef, val commitPrivKey: BinaryData, val finalPri } when(CLOSED) { - case _ => stay + case null => stay } whenUnhandled { - case Event(e@BITCOIN_TX_CONFIRMED(_, confirmations), _) => - log.debug(s"dropped $e") - // drops silently, we don't care for confirmations above minDepth - stay - case Event(CMD_GETSTATE, _) => sender ! stateName stay @@ -948,7 +931,8 @@ class Node(val blockchain: ActorRef, val commitPrivKey: BinaryData, val finalPri val ourSig = Transaction.signInput(ourFinalTx, 0, multiSig2of2(ourParams.commitKey, theirParams.commitKey), SIGHASH_ALL, commitPrivKey) val signedFinaltx = ourFinalTx.updateSigScript(0, sigScript2of2(pkt.sig, ourSig, theirParams.commitKey, ourParams.commitKey)) log.debug(s"*** final tx : ${Hex.toHexString(Transaction.write(signedFinaltx))}") - // ok now we can broadcast the final tx if we want + blockchain ! Watch(self, signedFinaltx.hash, Final, 1) + blockchain ! Publish(signedFinaltx) close_channel_complete(ourSigForThem) }