1
0
Fork 0
mirror of https://github.com/ACINQ/eclair.git synced 2025-02-22 22:25:26 +01:00

improved blockchain watcher and closing flow

This commit is contained in:
pm47 2015-09-01 20:41:58 +02:00
parent ee0fc2575f
commit 3433adcad0
3 changed files with 49 additions and 58 deletions

View file

@ -2,14 +2,16 @@ package fr.acinq.eclair
import akka.actor.{ActorRef, Actor, ActorLogging}
import fr.acinq.bitcoin.{Transaction, BinaryData}
import fr.acinq.lightning._
import scala.concurrent.duration._
import scala.collection.mutable
// @formatter:off
final case class Watch(txId: BinaryData)
final case class Unwatch(txId: BinaryData)
sealed trait TxType
case object Anchor extends TxType
case object Final extends TxType
final case class Watch(channel: ActorRef, txId: BinaryData, t: TxType, minDepth: Int)
final case class TxConfirmed(txId: BinaryData, blockId: BinaryData, confirmations: Int)
final case class Publish(tx: Transaction)
@ -20,23 +22,29 @@ final case class Publish(tx: Transaction)
*/
class BlockchainWatcher extends Actor with ActorLogging {
val m = new mutable.HashMap[BinaryData, mutable.Set[ActorRef]] with mutable.MultiMap[BinaryData, ActorRef]
context.become(watching(m))
context.become(watching(Set()))
override def receive: Receive = ???
def watching(m: mutable.MultiMap[BinaryData, ActorRef]): Receive = {
case Watch(txId) =>
def watching(watches: Set[Watch]): Receive = {
case w@Watch(channel, txId, typ, minDepth) =>
log.info(s"watching tx $txId for $sender")
// instant confirmation for testing
(0 until 3) foreach(i => self ! TxConfirmed(txId, "5deedc4c7f4c8e3250a486f340e57a565cda908eef7b7df2c1cd61b8ad6b42e6", i))
context.become(watching(m.addBinding(txId, sender)))
// TODO : for testing
import scala.concurrent.ExecutionContext.Implicits.global
(0 until 3) foreach(i => context.system.scheduler.scheduleOnce(i * 100 milliseconds, self, TxConfirmed(txId, "5deedc4c7f4c8e3250a486f340e57a565cda908eef7b7df2c1cd61b8ad6b42e6", i)))
context.become(watching(watches + w))
case Unwatch(txId) =>
context.become(watching(m.removeBinding(txId, sender)))
case TxConfirmed(txId, blockId, confirmations) if m.contains(txId) =>
m(txId).foreach(_ ! BITCOIN_TX_CONFIRMED(blockId, confirmations))
case TxConfirmed(txId, blockId, confirmations) =>
log.info(s"got $confirmations confirmation(s) for tx $txId")
watches.filter(_.txId == txId).foreach(w => w match {
case Watch(channel, _, Anchor, minDepth) if confirmations >= minDepth =>
channel ! BITCOIN_ANCHOR_DEPTHOK
context.become(watching(watches - w))
case Watch(channel, _, Final, minDepth) if confirmations >= minDepth =>
channel ! BITCOIN_CLOSE_DONE
context.become(watching(watches - w))
case _ => {}
})
case Publish(tx) =>
log.info(s"publishing tx $tx")

View file

@ -52,10 +52,9 @@ object Boot extends App {
alice ! CMD_CLOSE(0)
while (Await.result(alice ? CMD_GETSTATE, 5 seconds) != CLOSE_WAIT_CLOSE) Thread.sleep(200)
while (Await.result(bob ? CMD_GETSTATE, 5 seconds) != CLOSE_WAIT_CLOSE) Thread.sleep(200)
while (Await.result(alice ? CMD_GETSTATE, 5 seconds) != CLOSED) Thread.sleep(200)
while (Await.result(bob ? CMD_GETSTATE, 5 seconds) != CLOSED) Thread.sleep(200)
alice ! BITCOIN_CLOSE_DONE
bob ! BITCOIN_CLOSE_DONE
system.shutdown()
}

View file

@ -110,13 +110,13 @@ sealed trait LowPriority extends State
case object INPUT_NONE
sealed trait BlockchainEvent
final case class BITCOIN_TX_CONFIRMED(blockId: sha256_hash, confirmations: Int) extends BlockchainEvent
case object BITCOIN_CLOSE_DONE
case object BITCOIN_ANCHOR_DEPTHOK
case object BITCOIN_ANCHOR_UNSPENT
case object BITCOIN_ANCHOR_TIMEOUT
case object BITCOIN_ANCHOR_THEIRSPEND
case object BITCOIN_ANCHOR_OURCOMMIT_DELAYPASSED
case object BITCOIN_ANCHOR_OTHERSPEND
case object BITCOIN_CLOSE_DONE
sealed trait Command
final case class CMD_SEND_HTLC_UPDATE(amount: Int, rHash: sha256_hash, expiry: locktime) extends Command
@ -146,6 +146,7 @@ final case class DATA_WAIT_FOR_UPDATE_ACCEPT(ourParams: ChannelParams, theirPara
final case class DATA_WAIT_FOR_HTLC_ACCEPT(ourParams: ChannelParams, theirParams: ChannelParams, previousCommitmentTxSigned: CommitmentTx, updateProposal: UpdateProposal) extends Data
final case class DATA_WAIT_FOR_UPDATE_SIG(ourParams: ChannelParams, theirParams: ChannelParams, previousCommitmentTxSigned: CommitmentTx, newCommitmentTxUnsigned: CommitmentTx) extends Data
final case class DATA_WAIT_FOR_UPDATE_COMPLETE(ourParams: ChannelParams, theirParams: ChannelParams, previousCommitmentTxSigned: CommitmentTx, newCommitmentTxUnsigned: CommitmentTx) extends Data
final case class DATA_WAIT_FOR_CLOSE_ACK(finalTx: Transaction) extends Data
// @formatter:on
@ -237,7 +238,7 @@ class Node(val blockchain: ActorRef, val commitPrivKey: BinaryData, val finalPri
val theirCommitTx = makeCommitTx(theirParams.finalKey, ourParams.finalKey, ourParams.delay, anchorTxid, anchorOutputIndex, theirRevocationHash, state.reverse)
val ourSigForThem = bin2signature(Transaction.signInput(theirCommitTx, 0, multiSig2of2(ourParams.commitKey, theirParams.commitKey), SIGHASH_ALL, commitPrivKey))
them ! open_commit_sig(ourSigForThem)
blockchain ! Watch(anchorTxid)
blockchain ! Watch(self, anchorTxid, Anchor, ourParams.minDepth)
goto(OPEN_WAITING_THEIRANCHOR) using DATA_OPEN_WAITING(ourParams, theirParams, CommitmentTx(signedCommitTx, state, ourRevocationHashPreimage, theirRevocationHash))
case Event(CMD_CLOSE(_), _) => goto(CLOSED)
@ -251,7 +252,7 @@ class Node(val blockchain: ActorRef, val commitPrivKey: BinaryData, val finalPri
val ok = Try(Transaction.correctlySpends(signedCommitTx, Map(OutPoint(anchorTx.hash, anchorOutputIndex) -> anchorPubkeyScript(ourParams.commitKey, theirParams.commitKey)), ScriptFlags.STANDARD_SCRIPT_VERIFY_FLAGS)).isSuccess
// TODO : return Error and close channel if !ok
if (!ok) log.error(s"invalid sig")
blockchain ! Watch(anchorTx.hash)
blockchain ! Watch(self, anchorTx.hash, Anchor, ourParams.minDepth)
blockchain ! Publish(anchorTx)
goto(OPEN_WAITING_OURANCHOR) using DATA_OPEN_WAITING(ourParams, theirParams, newCommitTx.copy(tx = signedCommitTx))
@ -259,15 +260,10 @@ class Node(val blockchain: ActorRef, val commitPrivKey: BinaryData, val finalPri
}
when(OPEN_WAITING_THEIRANCHOR) {
case Event(BITCOIN_TX_CONFIRMED(blockId, confirmations), DATA_OPEN_WAITING(ourParams, _, _)) if confirmations < ourParams.minDepth =>
log.info(s"got $confirmations confirmation(s) for anchor tx")
stay
case Event(BITCOIN_TX_CONFIRMED(blockId, confirmations), d@DATA_OPEN_WAITING(ourParams, _, _)) if confirmations >= ourParams.minDepth =>
log.info(s"got $confirmations confirmation(s) for anchor tx, minDepth reached")
them ! open_complete(Some(blockId))
case Event(BITCOIN_ANCHOR_DEPTHOK, DATA_OPEN_WAITING(ourParams, theirParams, commitmentTx)) =>
them ! open_complete(None)
unstashAll()
goto(OPEN_WAIT_FOR_COMPLETE_THEIRANCHOR) using DATA_NORMAL(d.ourParams, d.theirParams, d.commitmentTx)
goto(OPEN_WAIT_FOR_COMPLETE_THEIRANCHOR) using DATA_NORMAL(ourParams, theirParams, commitmentTx)
case Event(msg@open_complete(blockId_opt), d@DATA_OPEN_WAITING(ourParams, _, _)) =>
log.info(s"received their open_complete, deferring message")
@ -298,15 +294,10 @@ class Node(val blockchain: ActorRef, val commitPrivKey: BinaryData, val finalPri
}
when(OPEN_WAITING_OURANCHOR) {
case Event(BITCOIN_TX_CONFIRMED(blockId, confirmations), DATA_OPEN_WAITING(ourParams, _, _)) if confirmations < ourParams.minDepth =>
log.info(s"got $confirmations confirmation(s) for anchor tx")
stay
case Event(BITCOIN_TX_CONFIRMED(blockId, confirmations), d@DATA_OPEN_WAITING(ourParams, _, _)) if confirmations >= ourParams.minDepth =>
log.info(s"got $confirmations confirmation(s) for anchor tx, minDepth reached")
them ! open_complete(Some(blockId))
case Event(BITCOIN_ANCHOR_DEPTHOK, DATA_OPEN_WAITING(ourParams, theirParams, commitmentTx)) =>
them ! open_complete(None)
unstashAll()
goto(OPEN_WAIT_FOR_COMPLETE_OURANCHOR) using DATA_NORMAL(d.ourParams, d.theirParams, d.commitmentTx)
goto(OPEN_WAIT_FOR_COMPLETE_OURANCHOR) using DATA_NORMAL(ourParams, theirParams, commitmentTx)
case Event(msg@open_complete(blockId_opt), d@DATA_OPEN_WAITING(ourParams, _, _)) =>
log.info(s"received their open_complete, deferring message")
@ -594,27 +585,19 @@ class Node(val blockchain: ActorRef, val commitPrivKey: BinaryData, val finalPri
when(WAIT_FOR_UPDATE_COMPLETE_LOWPRIO)(WAIT_FOR_UPDATE_COMPLETE_handler)
when(WAIT_FOR_CLOSE_COMPLETE) {
case Event(close_channel(theirSig, closeFee), DATA_NORMAL(ourParams, theirParams, CommitmentTx(commitmentTx, state, _, _))) =>
//the only difference between their final tx and ours is the order of the outputs, because state is symmetric
val theirFinalTx = makeFinalTx(commitmentTx.txIn, ourParams.finalKey, theirParams.finalKey, state.reverse)
val ourSigForThem = bin2signature(Transaction.signInput(theirFinalTx, 0, multiSig2of2(ourParams.commitKey, theirParams.commitKey), SIGHASH_ALL, commitPrivKey))
val ourFinalTx = makeFinalTx(commitmentTx.txIn, ourParams.finalKey, theirParams.finalKey, state)
val ourSig = bin2signature(Transaction.signInput(ourFinalTx, 0, multiSig2of2(ourParams.commitKey, theirParams.commitKey), SIGHASH_ALL, commitPrivKey))
val signedFinaltx = ourFinalTx.updateSigScript(0, sigScript2of2(theirSig, ourSig, theirParams.commitKey, ourParams.commitKey))
log.debug(s"final tx : ${Hex.toHexString(Transaction.write(signedFinaltx))}")
// ok now we can broadcast the final tx if we want
them ! close_channel_ack()
goto(CLOSE_WAIT_CLOSE)
case Event(close_channel_complete(theirSig), DATA_NORMAL(ourParams, theirParams, CommitmentTx(commitmentTx, state, _, _))) =>
val finalTx = makeFinalTx(commitmentTx.txIn, ourParams.finalKey, theirParams.finalKey, state)
val ourSig = Transaction.signInput(finalTx, 0, multiSig2of2(ourParams.commitKey, theirParams.commitKey), SIGHASH_ALL, commitPrivKey)
val signedFinaltx = finalTx.updateSigScript(0, sigScript2of2(theirSig, ourSig, theirParams.commitKey, ourParams.commitKey))
log.debug(s"final tx : ${Hex.toHexString(Transaction.write(signedFinaltx))}")
// ok now we can broadcast the final tx if we want
them ! close_channel_ack()
blockchain ! Watch(self, signedFinaltx.hash, Final, 1)
blockchain ! Publish(signedFinaltx)
goto(CLOSE_WAIT_CLOSE)
case Event(BITCOIN_CLOSE_DONE, _) => goto(CLOSED)
case Event(BITCOIN_ANCHOR_THEIRSPEND, _) =>
them ! handle_btc_anchor_theirspend()
goto(CLOSE_WAIT_SPENDTHEM_CLOSE)
@ -630,6 +613,8 @@ class Node(val blockchain: ActorRef, val commitPrivKey: BinaryData, val finalPri
case Event(close_channel_ack(), _) =>
goto(CLOSE_WAIT_CLOSE)
case Event(BITCOIN_CLOSE_DONE, _) => goto(CLOSED)
case Event(pkt: error, _) => goto(CLOSE_WAIT_CLOSE)
case Event(BITCOIN_ANCHOR_THEIRSPEND, _) =>
@ -692,6 +677,7 @@ class Node(val blockchain: ActorRef, val commitPrivKey: BinaryData, val finalPri
}
when(CLOSE_WAIT_CLOSE_OURCOMMIT) {
case Event(BITCOIN_ANCHOR_UNSPENT, _) => goto(ERROR_ANCHOR_LOST)
case Event(BITCOIN_ANCHOR_THEIRSPEND, _) =>
@ -705,6 +691,8 @@ class Node(val blockchain: ActorRef, val commitPrivKey: BinaryData, val finalPri
case Event(BITCOIN_ANCHOR_OTHERSPEND, _) =>
handle_btc_anchor_otherspend()
goto(CLOSE_WAIT_STEAL_CLOSE_OURCOMMIT)
case Event(BITCOIN_CLOSE_DONE, _) => goto(CLOSED)
}
when(CLOSE_WAIT_CLOSE_SPENDOURS) {
@ -904,15 +892,10 @@ class Node(val blockchain: ActorRef, val commitPrivKey: BinaryData, val finalPri
}
when(CLOSED) {
case _ => stay
case null => stay
}
whenUnhandled {
case Event(e@BITCOIN_TX_CONFIRMED(_, confirmations), _) =>
log.debug(s"dropped $e")
// drops silently, we don't care for confirmations above minDepth
stay
case Event(CMD_GETSTATE, _) =>
sender ! stateName
stay
@ -948,7 +931,8 @@ class Node(val blockchain: ActorRef, val commitPrivKey: BinaryData, val finalPri
val ourSig = Transaction.signInput(ourFinalTx, 0, multiSig2of2(ourParams.commitKey, theirParams.commitKey), SIGHASH_ALL, commitPrivKey)
val signedFinaltx = ourFinalTx.updateSigScript(0, sigScript2of2(pkt.sig, ourSig, theirParams.commitKey, ourParams.commitKey))
log.debug(s"*** final tx : ${Hex.toHexString(Transaction.write(signedFinaltx))}")
// ok now we can broadcast the final tx if we want
blockchain ! Watch(self, signedFinaltx.hash, Final, 1)
blockchain ! Publish(signedFinaltx)
close_channel_complete(ourSigForThem)
}