diff --git a/eclair-demo/src/main/scala/fr/acinq/eclair/Boot.scala b/eclair-demo/src/main/scala/fr/acinq/eclair/Boot.scala index fffcf0bba..1e41015e8 100644 --- a/eclair-demo/src/main/scala/fr/acinq/eclair/Boot.scala +++ b/eclair-demo/src/main/scala/fr/acinq/eclair/Boot.scala @@ -7,7 +7,7 @@ import akka.io.IO import akka.util.Timeout import com.typesafe.config.ConfigFactory import fr.acinq.eclair.api.ServiceActor -import fr.acinq.eclair.blockchain.PollingWatcher +import fr.acinq.eclair.blockchain.{ExtendedBitcoinClient, PollingWatcher} import fr.acinq.eclair.channel.Register import fr.acinq.eclair.io.{Client, Server} import grizzled.slf4j.Logging @@ -34,7 +34,7 @@ object Boot extends App with Logging { val chain = Await.result(bitcoin_client.invoke("getblockchaininfo").map(json => (json \ "chain").extract[String]), 10 seconds) assert(chain == "testnet" || chain == "regtest" || chain == "segnet4", "you should be on testnet or regtest or segnet4") - val blockchain = system.actorOf(Props(new PollingWatcher(bitcoin_client)), name = "blockchain") + val blockchain = system.actorOf(Props(new PollingWatcher(new ExtendedBitcoinClient(bitcoin_client))), name = "blockchain") val register = system.actorOf(Props[Register], name = "register") val server = system.actorOf(Server.props(config.getString("eclair.server.address"), config.getInt("eclair.server.port")), "server") diff --git a/eclair-demo/src/main/scala/fr/acinq/eclair/blockchain/ExtendedBitcoinClient.scala b/eclair-demo/src/main/scala/fr/acinq/eclair/blockchain/ExtendedBitcoinClient.scala new file mode 100644 index 000000000..bf7d56211 --- /dev/null +++ b/eclair-demo/src/main/scala/fr/acinq/eclair/blockchain/ExtendedBitcoinClient.scala @@ -0,0 +1,115 @@ +package fr.acinq.eclair.blockchain + +import fr.acinq.bitcoin.{BinaryData, BitcoinJsonRPCClient, JsonRPCError, Transaction, TxIn, TxOut} +import fr.acinq.eclair.channel +import fr.acinq.eclair.channel.Scripts +import org.bouncycastle.util.encoders.Hex +import org.json4s.JsonAST._ + +import scala.concurrent.duration._ +import scala.concurrent.{Await, ExecutionContext, Future} + +/** + * Created by PM on 26/04/2016. + */ +class ExtendedBitcoinClient(client: BitcoinJsonRPCClient) { + + implicit val formats = org.json4s.DefaultFormats + + def getTxConfirmations(txId: String)(implicit ec: ExecutionContext): Future[Option[Int]] = + client.invoke("getrawtransaction", txId, 1) // we choose verbose output to get the number of confirmations + .map(json => Some((json \ "confirmations").extract[Int])) + .recover { + case t: JsonRPCError if t.code == -5 => None + } + + def isUnspent(txId: String, outputIndex: Int)(implicit ec: ExecutionContext): Future[Boolean] = + client.invoke("gettxout", txId, outputIndex, true) // mempool=true so that we are warned as soon as possible + .map(json => json != JNull) + + /** + * tell bitcoind to sent bitcoins from a specific local account + * + * @param account name of the local account to send bitcoins from + * @param destination destination address + * @param amount amount in BTC (not milliBTC, not Satoshis !!) + * @param ec execution context + * @return a Future[txid] where txid (a String) is the is of the tx that sends the bitcoins + */ + def sendFromAccount(account: String, destination: String, amount: Double)(implicit ec: ExecutionContext): Future[String] = + client.invoke("sendfrom", account, destination, amount).map { + case JString(txid) => txid + } + + def getTransaction(txId: String)(implicit ec: ExecutionContext): Future[Transaction] = + client.invoke("getrawtransaction", txId) map { + case JString(raw) => Transaction.read(raw) + } + + case class FundTransactionResponse(tx: Transaction, changepos: Int, fee: Double) + + def fundTransaction(hex: String)(implicit ec: ExecutionContext): Future[FundTransactionResponse] = { + client.invoke("fundrawtransaction", hex.take(4) + "0000" + hex.drop(4)).map(json => { + val JString(hex) = json \ "hex" + val JInt(changepos) = json \ "changepos" + val JDouble(fee) = json \ "fee" + FundTransactionResponse(Transaction.read(hex), changepos.intValue(), fee) + }) + } + + def fundTransaction(tx: Transaction)(implicit ec: ExecutionContext): Future[FundTransactionResponse] = + fundTransaction(Hex.toHexString(Transaction.write(tx))) + + case class SignTransactionResponse(tx: Transaction, complete: Boolean) + + def signTransaction(hex: String)(implicit ec: ExecutionContext): Future[SignTransactionResponse] = + client.invoke("signrawtransaction", hex).map(json => { + val JString(hex) = json \ "hex" + val JBool(complete) = json \ "complete" + SignTransactionResponse(Transaction.read(hex), complete) + }) + + def signTransaction(tx: Transaction)(implicit ec: ExecutionContext): Future[SignTransactionResponse] = + signTransaction(Hex.toHexString(Transaction.write(tx))) + + def publishTransaction(hex: String)(implicit ec: ExecutionContext): Future[String] = + client.invoke("sendrawtransaction", hex).map { + case JString(txid) => txid + } + + def publishTransaction(tx: Transaction)(implicit ec: ExecutionContext): Future[String] = + publishTransaction(Hex.toHexString(Transaction.write(tx))) + + // TODO : this is very dirty + // we only check the memory pool and the last block, and throw an error if tx was not found + def findSpendingTransaction(txid: String, outputIndex: Int)(implicit ec: ExecutionContext): Future[Transaction] = { + for { + mempool <- client.invoke("getrawmempool").map(_.extract[List[String]]) + bestblockhash <- client.invoke("getbestblockhash").map(_.extract[String]) + bestblock <- client.invoke("getblock", bestblockhash).map(b => (b \ "tx").extract[List[String]]) + txs <- Future { + for(txid <- mempool ++ bestblock) yield { + Await.result(client.invoke("getrawtransaction", txid).map(json => { + Transaction.read(json.extract[String]) + }).recover { + case t: Throwable => Transaction(0, Seq(), Seq(), 0) + }, 20 seconds) + } + } + tx = txs.find(tx => tx.txIn.exists(input => input.outPoint.txid == txid && input.outPoint.index == outputIndex)).getOrElse(throw new RuntimeException("tx not found!")) + } yield tx + } + + def makeAnchorTx(ourCommitPub: BinaryData, theirCommitPub: BinaryData, amount: Long)(implicit ec: ExecutionContext): Future[(Transaction, Int)] = { + val anchorOutputScript = channel.Scripts.anchorPubkeyScript(ourCommitPub, theirCommitPub) + val tx = Transaction(version = 1, txIn = Seq.empty[TxIn], txOut = TxOut(amount, anchorOutputScript) :: Nil, lockTime = 0) + val future = for { + FundTransactionResponse(tx1, changepos, fee) <- fundTransaction(tx) + SignTransactionResponse(anchorTx, true) <- signTransaction(tx1) + Some(pos) = Scripts.findPublicKeyScriptIndex(anchorTx, anchorOutputScript) + } yield (anchorTx, pos) + + future + } + +} diff --git a/eclair-demo/src/main/scala/fr/acinq/eclair/blockchain/PollingWatcher.scala b/eclair-demo/src/main/scala/fr/acinq/eclair/blockchain/PollingWatcher.scala index 23666b42e..de2d7cebb 100644 --- a/eclair-demo/src/main/scala/fr/acinq/eclair/blockchain/PollingWatcher.scala +++ b/eclair-demo/src/main/scala/fr/acinq/eclair/blockchain/PollingWatcher.scala @@ -4,11 +4,8 @@ package fr.acinq.eclair.blockchain import akka.actor.{Actor, ActorLogging, Cancellable} import akka.pattern.pipe import fr.acinq.bitcoin._ -import fr.acinq.eclair.{Globals, channel} -import fr.acinq.eclair.channel.{BITCOIN_ANCHOR_SPENT, Scripts} -import grizzled.slf4j.Logging +import fr.acinq.eclair.channel.{BITCOIN_ANCHOR_SPENT} import org.bouncycastle.util.encoders.Hex -import org.json4s.JsonAST._ import scala.concurrent.{Await, ExecutionContext, Future, Promise} import scala.concurrent.duration._ @@ -18,9 +15,7 @@ import scala.concurrent.duration._ * /!\ Obviously not scalable /!\ * Created by PM on 28/08/2015. */ -class PollingWatcher(client: BitcoinJsonRPCClient)(implicit ec: ExecutionContext = ExecutionContext.global) extends Actor with ActorLogging { - - import PollingWatcher._ +class PollingWatcher(client: ExtendedBitcoinClient)(implicit ec: ExecutionContext = ExecutionContext.global) extends Actor with ActorLogging { context.become(watching(Map())) @@ -34,7 +29,7 @@ class PollingWatcher(client: BitcoinJsonRPCClient)(implicit ec: ExecutionContext log.info(s"adding watch $w for $sender") val cancellable = context.system.scheduler.schedule(2 seconds, 10 seconds)(w match { case w@WatchConfirmed(channel, txId, minDepth, event) => - getTxConfirmations(client, txId.toString).map(_ match { + client.getTxConfirmations(txId.toString).map(_ match { case Some(confirmations) if confirmations >= minDepth => channel ! event self !('remove, w) @@ -42,12 +37,12 @@ class PollingWatcher(client: BitcoinJsonRPCClient)(implicit ec: ExecutionContext }) case w@WatchSpent(channel, txId, outputIndex, minDepth, event) => for { - conf <- getTxConfirmations(client, txId.toString) - unspent <- isUnspent(client, txId.toString, outputIndex) + conf <- client.getTxConfirmations(txId.toString) + unspent <- client.isUnspent(txId.toString, outputIndex) } yield { if (conf.isDefined && !unspent) { // NOTE : isSpent=!isUnspent only works if the parent transaction actually exists (which we assume to be true) - findSpendingTransaction(client, txId.toString(), outputIndex).map(tx => channel ! (BITCOIN_ANCHOR_SPENT, tx)) + client.findSpendingTransaction(txId.toString(), outputIndex).map(tx => channel ! (BITCOIN_ANCHOR_SPENT, tx)) self !('remove, w) } else {} } @@ -60,121 +55,11 @@ class PollingWatcher(client: BitcoinJsonRPCClient)(implicit ec: ExecutionContext case Publish(tx) => log.info(s"publishing tx $tx") - PollingWatcher.publishTransaction(client, tx).onFailure { + client.publishTransaction(tx).onFailure { case t: Throwable => log.error(t, s"cannot publish tx ${Hex.toHexString(Transaction.write(tx))}") } case MakeAnchor(ourCommitPub, theirCommitPub, amount) => - PollingWatcher.makeAnchorTx(client, ourCommitPub, theirCommitPub, amount).pipeTo(sender) + client.makeAnchorTx(ourCommitPub, theirCommitPub, amount).pipeTo(sender) } } - -object PollingWatcher extends Logging { - - implicit val formats = org.json4s.DefaultFormats - - def getTxConfirmations(client: BitcoinJsonRPCClient, txId: String)(implicit ec: ExecutionContext): Future[Option[Int]] = - client.invoke("getrawtransaction", txId, 1) // we choose verbose output to get the number of confirmations - .map(json => Some((json \ "confirmations").extract[Int])) - .recover { - case t: JsonRPCError if t.code == -5 => None - } - - def isUnspent(client: BitcoinJsonRPCClient, txId: String, outputIndex: Int)(implicit ec: ExecutionContext): Future[Boolean] = - client.invoke("gettxout", txId, outputIndex, true) // mempool=true so that we are warned as soon as possible - .map(json => json != JNull) - - /** - * tell bitcoind to sent bitcoins from a specific local account - * - * @param client bitcoind client - * @param account name of the local account to send bitcoins from - * @param destination destination address - * @param amount amount in BTC (not milliBTC, not Satoshis !!) - * @param ec execution context - * @return a Future[txid] where txid (a String) is the is of the tx that sends the bitcoins - */ - def sendFromAccount(client: BitcoinJsonRPCClient, account: String, destination: String, amount: Double)(implicit ec: ExecutionContext): Future[String] = - client.invoke("sendfrom", account, destination, amount).map { - case JString(txid) => txid - } - - def getTransaction(client: BitcoinJsonRPCClient, txId: String)(implicit ec: ExecutionContext): Future[Transaction] = - client.invoke("getrawtransaction", txId) map { - case JString(raw) => Transaction.read(raw) - } - - case class FundTransactionResponse(tx: Transaction, changepos: Int, fee: Double) - - def fundTransaction(client: BitcoinJsonRPCClient, hex: String)(implicit ec: ExecutionContext): Future[FundTransactionResponse] = { - client.invoke("fundrawtransaction", hex.take(4) + "0000" + hex.drop(4)).map(json => { - val JString(hex) = json \ "hex" - val JInt(changepos) = json \ "changepos" - val JDouble(fee) = json \ "fee" - FundTransactionResponse(Transaction.read(hex), changepos.intValue(), fee) - }) - } - - def fundTransaction(client: BitcoinJsonRPCClient, tx: Transaction)(implicit ec: ExecutionContext): Future[FundTransactionResponse] = - fundTransaction(client, Hex.toHexString(Transaction.write(tx))) - - case class SignTransactionResponse(tx: Transaction, complete: Boolean) - - def signTransaction(client: BitcoinJsonRPCClient, hex: String)(implicit ec: ExecutionContext): Future[SignTransactionResponse] = - client.invoke("signrawtransaction", hex).map(json => { - val JString(hex) = json \ "hex" - val JBool(complete) = json \ "complete" - SignTransactionResponse(Transaction.read(hex), complete) - }) - - def signTransaction(client: BitcoinJsonRPCClient, tx: Transaction)(implicit ec: ExecutionContext): Future[SignTransactionResponse] = - signTransaction(client, Hex.toHexString(Transaction.write(tx))) - - def publishTransaction(client: BitcoinJsonRPCClient, hex: String)(implicit ec: ExecutionContext): Future[String] = - client.invoke("sendrawtransaction", hex).map { - case JString(txid) => txid - } - - def publishTransaction(client: BitcoinJsonRPCClient, tx: Transaction)(implicit ec: ExecutionContext): Future[String] = - publishTransaction(client, Hex.toHexString(Transaction.write(tx))) - - // TODO : this is very dirty - // we only check the memory pool and the last block, and throw an error if tx was not found - def findSpendingTransaction(client: BitcoinJsonRPCClient, txid: String, outputIndex: Int)(implicit ec: ExecutionContext): Future[Transaction] = { - for { - mempool <- client.invoke("getrawmempool").map(_.extract[List[String]]) - bestblockhash <- client.invoke("getbestblockhash").map(_.extract[String]) - bestblock <- client.invoke("getblock", bestblockhash).map(b => (b \ "tx").extract[List[String]]) - txs <- Future { - for(txid <- mempool ++ bestblock) yield { - Await.result(client.invoke("getrawtransaction", txid).map(json => { - Transaction.read(json.extract[String]) - }).recover { - case t: Throwable => Transaction(0, Seq(), Seq(), 0) - }, 20 seconds) - } - } - tx = txs.find(tx => tx.txIn.exists(input => input.outPoint.txid == txid && input.outPoint.index == outputIndex)).getOrElse(throw new RuntimeException("tx not found!")) - } yield tx - } - - def makeAnchorTx(bitcoind: BitcoinJsonRPCClient, ourCommitPub: BinaryData, theirCommitPub: BinaryData, amount: Long)(implicit ec: ExecutionContext): Future[(Transaction, Int)] = { - val anchorOutputScript = channel.Scripts.anchorPubkeyScript(ourCommitPub, theirCommitPub) - val tx = Transaction(version = 1, txIn = Seq.empty[TxIn], txOut = TxOut(amount, anchorOutputScript) :: Nil, lockTime = 0) - val future = for { - FundTransactionResponse(tx1, changepos, fee) <- PollingWatcher.fundTransaction(bitcoind, tx) - SignTransactionResponse(anchorTx, true) <- PollingWatcher.signTransaction(bitcoind, tx1) - Some(pos) = Scripts.findPublicKeyScriptIndex(anchorTx, anchorOutputScript) - } yield (anchorTx, pos) - - future - } -} - -object MyTest extends App { - import ExecutionContext.Implicits.global - implicit val formats = org.json4s.DefaultFormats - - val client = new BitcoinJsonRPCClient("foo", "bar", port = 18332) - println(Await.result(PollingWatcher.makeAnchorTx(client, Globals.Node.publicKey, Globals.Node.publicKey, 1000000), 10 seconds)) -} \ No newline at end of file diff --git a/eclair-demo/src/main/scala/fr/acinq/eclair/channel/Channel.scala b/eclair-demo/src/main/scala/fr/acinq/eclair/channel/Channel.scala index f1980de71..2ef953df9 100644 --- a/eclair-demo/src/main/scala/fr/acinq/eclair/channel/Channel.scala +++ b/eclair-demo/src/main/scala/fr/acinq/eclair/channel/Channel.scala @@ -1,6 +1,6 @@ package fr.acinq.eclair.channel -import akka.actor.{ActorRef, LoggingFSM, Props, Stash} +import akka.actor.{ActorRef, LoggingFSM, Props} import com.google.protobuf.ByteString import fr.acinq.bitcoin._ import fr.acinq.eclair._ @@ -148,7 +148,7 @@ final case class DATA_OPEN_WAIT_FOR_OPEN (ack_in: Long, ack_out: Lo final case class DATA_OPEN_WITH_ANCHOR_WAIT_FOR_ANCHOR(ack_in: Long, ack_out: Long, ourParams: OurChannelParams, theirParams: TheirChannelParams, theirRevocationHash: BinaryData, theirNextRevocationHash: sha256_hash) extends Data final case class DATA_OPEN_WAIT_FOR_ANCHOR (ack_in: Long, ack_out: Long, ourParams: OurChannelParams, theirParams: TheirChannelParams, theirRevocationHash: sha256_hash, theirNextRevocationHash: sha256_hash) extends Data final case class DATA_OPEN_WAIT_FOR_COMMIT_SIG (ack_in: Long, ack_out: Long, ourParams: OurChannelParams, theirParams: TheirChannelParams, anchorTx: Transaction, anchorOutputIndex: Int, newCommitmentUnsigned: Commitment, theirNextRevocationHash: sha256_hash) extends Data -final case class DATA_OPEN_WAITING (ack_in: Long, ack_out: Long, ourParams: OurChannelParams, theirParams: TheirChannelParams, shaChain: ShaChain, commitment: Commitment, theirNextRevocationHash: sha256_hash) extends Data with CurrentCommitment +final case class DATA_OPEN_WAITING (ack_in: Long, ack_out: Long, ourParams: OurChannelParams, theirParams: TheirChannelParams, shaChain: ShaChain, commitment: Commitment, theirNextRevocationHash: sha256_hash, deferred: Option[open_complete]) extends Data with CurrentCommitment final case class DATA_NORMAL (ack_in: Long, ack_out: Long, ourParams: OurChannelParams, theirParams: TheirChannelParams, shaChain: ShaChain, htlcIdx: Long, staged: List[Change], commitment: Commitment, next: NextCommitment) extends Data with CurrentCommitment final case class DATA_WAIT_FOR_CLOSE_ACK (ack_in: Long, ack_out: Long, ourParams: OurChannelParams, theirParams: TheirChannelParams, shaChain: ShaChain, commitment: Commitment, mutualCloseTx: Transaction) extends Data with CurrentCommitment final case class DATA_CLOSING (ack_in: Long, ack_out: Long, ourParams: OurChannelParams, theirParams: TheirChannelParams, shaChain: ShaChain, commitment: Commitment, @@ -219,7 +219,7 @@ object Channel { } -class Channel(val them: ActorRef, val blockchain: ActorRef, val params: OurChannelParams, theirNodeId: String) extends LoggingFSM[State, Data] with Stash { +class Channel(val them: ActorRef, val blockchain: ActorRef, val params: OurChannelParams, theirNodeId: String) extends LoggingFSM[State, Data] { import Channel._ import context.dispatcher @@ -298,7 +298,7 @@ class Channel(val them: ActorRef, val blockchain: ActorRef, val params: OurChann them ! open_commit_sig(ourSigForThem) blockchain ! WatchConfirmed(self, anchorTxid, ourParams.minDepth, BITCOIN_ANCHOR_DEPTHOK) blockchain ! WatchSpent(self, anchorTxid, anchorOutputIndex, 0, BITCOIN_ANCHOR_SPENT) - goto(OPEN_WAITING_THEIRANCHOR) using DATA_OPEN_WAITING(ack_in + 1, ack_out + 1, ourParams, theirParams, ShaChain.init, Commitment(0, signedCommitTx, state, theirRevocationHash), theirNextRevocationHash) + goto(OPEN_WAITING_THEIRANCHOR) using DATA_OPEN_WAITING(ack_in + 1, ack_out + 1, ourParams, theirParams, ShaChain.init, Commitment(0, signedCommitTx, state, theirRevocationHash), theirNextRevocationHash, None) } case Event(CMD_CLOSE(_), _) => goto(CLOSED) @@ -316,25 +316,24 @@ class Channel(val them: ActorRef, val blockchain: ActorRef, val params: OurChann blockchain ! WatchConfirmed(self, anchorTx.txid, ourParams.minDepth, BITCOIN_ANCHOR_DEPTHOK) blockchain ! WatchSpent(self, anchorTx.txid, anchorOutputIndex, 0, BITCOIN_ANCHOR_SPENT) blockchain ! Publish(anchorTx) - goto(OPEN_WAITING_OURANCHOR) using DATA_OPEN_WAITING(ack_in + 1, ack_out, ourParams, theirParams, ShaChain.init, commitment.copy(tx = signedCommitTx), theirNextRevocationHash) + goto(OPEN_WAITING_OURANCHOR) using DATA_OPEN_WAITING(ack_in + 1, ack_out, ourParams, theirParams, ShaChain.init, commitment.copy(tx = signedCommitTx), theirNextRevocationHash, None) } case Event(CMD_CLOSE(_), _) => goto(CLOSED) } when(OPEN_WAITING_THEIRANCHOR) { - case Event(BITCOIN_ANCHOR_DEPTHOK, DATA_OPEN_WAITING(ack_in, ack_out, ourParams, theirParams, shaChain, commitment, theirNextRevocationHash)) => + case Event(BITCOIN_ANCHOR_DEPTHOK, DATA_OPEN_WAITING(ack_in, ack_out, ourParams, theirParams, shaChain, commitment, theirNextRevocationHash, deferred)) => val anchorTxId = commitment.tx.txIn(0).outPoint.txid // commit tx only has 1 input, which is the anchor blockchain ! WatchLost(self, anchorTxId, ourParams.minDepth, BITCOIN_ANCHOR_LOST) them ! open_complete(None) - unstashAll() + deferred.map(self ! _) //TODO htlcIdx should not be 0 when resuming connection goto(OPEN_WAIT_FOR_COMPLETE_THEIRANCHOR) using DATA_NORMAL(ack_in, ack_out + 1, ourParams, theirParams, shaChain, 0, Nil, commitment, ReadyForSig(theirNextRevocationHash)) case Event(msg@open_complete(blockId_opt), d: DATA_OPEN_WAITING) => log.info(s"received their open_complete, deferring message") - stash() - stay + stay using d.copy(deferred = Some(msg)) case Event(BITCOIN_ANCHOR_TIMEOUT, _) => them ! error(Some("Anchor timed out")) @@ -363,17 +362,16 @@ class Channel(val them: ActorRef, val blockchain: ActorRef, val params: OurChann } when(OPEN_WAITING_OURANCHOR) { - case Event(BITCOIN_ANCHOR_DEPTHOK, DATA_OPEN_WAITING(ack_in, ack_out, ourParams, theirParams, shaChain, commitment, theirNextRevocationHash)) => + case Event(BITCOIN_ANCHOR_DEPTHOK, DATA_OPEN_WAITING(ack_in, ack_out, ourParams, theirParams, shaChain, commitment, theirNextRevocationHash, deferred)) => val anchorTxId = commitment.tx.txIn(0).outPoint.txid // commit tx only has 1 input, which is the anchor blockchain ! WatchLost(self, anchorTxId, ourParams.minDepth, BITCOIN_ANCHOR_LOST) them ! open_complete(None) - unstashAll() + deferred.map(self ! _) goto(OPEN_WAIT_FOR_COMPLETE_OURANCHOR) using DATA_NORMAL(ack_in, ack_out + 1, ourParams, theirParams, shaChain, 0, Nil, commitment, ReadyForSig(theirNextRevocationHash)) case Event(msg@open_complete(blockId_opt), d: DATA_OPEN_WAITING) => log.info(s"received their open_complete, deferring message") - stash() - stay + stay using d.copy(deferred = Some(msg)) /*case Event(pkt: close_channel, d: CurrentCommitment) => val (finalTx, res) = handle_pkt_close(pkt, d.ourParams, d.theirParams, d.commitment) @@ -425,6 +423,7 @@ class Channel(val them: ActorRef, val blockchain: ActorRef, val params: OurChann } when(OPEN_WAIT_FOR_COMPLETE_OURANCHOR) { + case Event(open_complete(blockid_opt), d: DATA_NORMAL) => Register.create_alias(theirNodeId, d.commitment.anchorId) goto(NORMAL) using d.copy(ack_in = d.ack_in + 1) @@ -979,6 +978,7 @@ class Channel(val them: ActorRef, val blockchain: ActorRef, val params: OurChann } whenUnhandled { + case Event(BITCOIN_ANCHOR_LOST, _) => goto(ERR_ANCHOR_LOST) case Event(CMD_GETSTATE, _) => diff --git a/eclair-demo/src/test/resources/logback.xml b/eclair-demo/src/test/resources/logback-test.xml similarity index 100% rename from eclair-demo/src/test/resources/logback.xml rename to eclair-demo/src/test/resources/logback-test.xml diff --git a/eclair-demo/src/test/scala/fr/acinq/eclair/channel/ChannelSpec.scala b/eclair-demo/src/test/scala/fr/acinq/eclair/channel/ChannelSpec.scala deleted file mode 100644 index 66d3124dc..000000000 --- a/eclair-demo/src/test/scala/fr/acinq/eclair/channel/ChannelSpec.scala +++ /dev/null @@ -1,167 +0,0 @@ -package fr.acinq.eclair.channel - -import akka.actor.Actor.Receive -import akka.actor.{Actor, ActorRef, ActorSystem, Props, Stash} -import akka.actor.FSM.{CurrentState, SubscribeTransitionCallBack, Transition} -import akka.testkit.{ImplicitSender, TestKit, TestProbe} -import fr.acinq.bitcoin._ -import fr.acinq.eclair._ -import fr.acinq.eclair.blockchain._ -import lightning.{locktime, update_add_htlc} -import lightning.locktime.Locktime.Blocks -import org.junit.runner.RunWith -import org.scalatest.{FunSuite, FunSuiteLike} -import org.scalatest.junit.JUnitRunner - -import scala.concurrent.duration._ - -@RunWith(classOf[JUnitRunner]) -class ChannelSpec extends TestKit(ActorSystem("MySpec")) with ImplicitSender with FunSuiteLike { - - import ChannelSpec._ - - test("open channels with and without anchor") { - val testData = TestActors() - testData.open - TestActors.cleanup(testData) - } - - test("create and fulfill HTLCs") { - val testData = TestActors() - testData.open - import testData._ - val R: BinaryData = "0102030405060708010203040506070801020304050607080102030405060708" - val H = Crypto.sha256(R) - - alice ! CMD_ADD_HTLC(60000000, H, locktime(Blocks(4))) - - alice ! CMD_GETSTATEDATA - val DATA_NORMAL(_, _, _, _, _, _, List(Change(OUT, _, update_add_htlc(_, _, r1, _, _))), _, _) = receiveOne(5 seconds) - assert(r1 == bin2sha256(H)) - - bob ! CMD_GETSTATEDATA - val DATA_NORMAL(_, _, _, _, _, _, List(Change(IN, _, update_add_htlc(_, _, r2, _, _))), _, _) = receiveOne(5 seconds) - assert(r2 == bin2sha256(H)) - - bob ! CMD_SIGN -// val Transition(_, NORMAL, NORMAL_WAIT_FOR_REV) = waitForBobTransition - // FIXME: sigs are not valid -// val Transition(_, NORMAL, NORMAL_WAIT_FOR_REV_THEIRSIG) = waitForAliceTransition - //val Transition(_, NORMAL_WAIT_FOR_REV, NORMAL_WAIT_FOR_SIG) = waitForBobTransition - //val Transition(_, NORMAL_WAIT_FOR_SIG, NORMAL) = waitForBobTransition - //val Transition(_, NORMAL_WAIT_FOR_REV_THEIRSIG, NORMAL) = waitForAliceTransition - - TestActors.cleanup(testData) - } -} - -object ChannelSpec { - val anchorAmount = 100100000L - - // Alice is funder, Bob is not - - object Alice { - val (Base58.Prefix.SecretKeyTestnet, commitPrivKey) = Base58Check.decode("cQPmcNr6pwBQPyGfab3SksE9nTCtx9ism9T4dkS9dETNU2KKtJHk") - val (Base58.Prefix.SecretKeyTestnet, finalPrivKey) = Base58Check.decode("cUrAtLtV7GGddqdkhUxnbZVDWGJBTducpPoon3eKp9Vnr1zxs6BG") - val channelParams = OurChannelParams(locktime(Blocks(10)), commitPrivKey, finalPrivKey, 1, 100000, "alice-seed".getBytes(), Some(anchorAmount)) - } - - object Bob { - val (Base58.Prefix.SecretKeyTestnet, commitPrivKey) = Base58Check.decode("cSUwLtdZ2tht9ZmHhdQue48pfe7tY2GT2TGWJDtjoZgo6FHrubGk") - val (Base58.Prefix.SecretKeyTestnet, finalPrivKey) = Base58Check.decode("cPR7ZgXpUaDPA3GwGceMDS5pfnSm955yvks3yELf3wMJwegsdGTg") - val channelParams = OurChannelParams(locktime(Blocks(10)), commitPrivKey, finalPrivKey, 2, 100000, "bob-seed".getBytes(), None) - } - - case class TestActors(alice: ActorRef, monitorAlice: TestProbe, bob: ActorRef, monitorBob: TestProbe, blockchain: TestProbe, pipe: ActorRef) { - def waitForAliceTransition = monitorAlice.expectMsgClass(classOf[Transition[_]]) - - def waitForBobTransition = monitorBob.expectMsgClass(classOf[Transition[_]]) - - def open: Unit = { - val MakeAnchor(_, _, amount) = blockchain.expectMsgClass(classOf[MakeAnchor]) - val anchorTx = Transaction(version = 1, - txIn = Seq.empty[TxIn], - txOut = TxOut(amount, Scripts.anchorPubkeyScript(Alice.channelParams.commitPubKey, Bob.channelParams.commitPubKey)) :: Nil, - lockTime = 0 - ) - blockchain.reply((anchorTx, 0)) - blockchain.expectMsgClass(classOf[Publish]) - - val Transition(_, OPEN_WAIT_FOR_OPEN_WITHANCHOR, OPEN_WAIT_FOR_COMMIT_SIG) = waitForAliceTransition - val Transition(_, OPEN_WAIT_FOR_OPEN_NOANCHOR, OPEN_WAIT_FOR_ANCHOR) = waitForBobTransition - - val Transition(_, OPEN_WAIT_FOR_COMMIT_SIG, OPEN_WAITING_OURANCHOR) = waitForAliceTransition - val Transition(_, OPEN_WAIT_FOR_ANCHOR, OPEN_WAITING_THEIRANCHOR) = waitForBobTransition - - blockchain.send(alice, BITCOIN_ANCHOR_DEPTHOK) - - blockchain.send(bob, BITCOIN_ANCHOR_DEPTHOK) - - val Transition(_, OPEN_WAITING_OURANCHOR, OPEN_WAIT_FOR_COMPLETE_OURANCHOR) = waitForAliceTransition - val Transition(_, OPEN_WAITING_THEIRANCHOR, OPEN_WAIT_FOR_COMPLETE_THEIRANCHOR) = waitForBobTransition - - val Transition(_, OPEN_WAIT_FOR_COMPLETE_OURANCHOR, NORMAL) = waitForAliceTransition - val Transition(_, OPEN_WAIT_FOR_COMPLETE_THEIRANCHOR, NORMAL) = waitForBobTransition - } - } - - object TestActors { - def apply()(implicit system: ActorSystem): TestActors = { - val blockchain = TestProbe("blockchain") - blockchain.ignoreMsg { - case m: WatchConfirmed => true - case m: WatchSpent => true - case m: WatchLost => true - } - val pipe = system.actorOf(Props[ChannelSpec.Pipe]) - val alice = system.actorOf(Channel.props(pipe, blockchain.ref, Alice.channelParams), "Alice") - val bob = system.actorOf(Channel.props(pipe, blockchain.ref, Bob.channelParams), "Bob") - - val monitora = TestProbe() - val monitorb = TestProbe() - - alice ! SubscribeTransitionCallBack(monitora.ref) - val CurrentState(_, OPEN_WAIT_FOR_OPEN_WITHANCHOR) = monitora.expectMsgClass(classOf[CurrentState[_]]) - - bob ! SubscribeTransitionCallBack(monitorb.ref) - val CurrentState(_, OPEN_WAIT_FOR_OPEN_NOANCHOR) = monitorb.expectMsgClass(classOf[CurrentState[_]]) - - pipe ! alice - pipe ! bob - - new TestActors(alice, monitora, bob, monitorb, blockchain, pipe) - } - - def cleanup(data: TestActors)(implicit system: ActorSystem): Unit = { - system.stop(data.alice) - system.stop(data.bob) - system.stop(data.pipe) - Thread.sleep(100) - } - } - - - // handle a bi-directional path between 2 actors - // used to avoid the chicken-and-egg problem of: - // a = new Channel(b) - // b = new Channel(a) - class Pipe extends Actor with Stash { - - override def unhandled(message: Any): Unit = stash() - - def receive = { - case a: ActorRef => context become receive1(a) - } - - def receive1(a: ActorRef): Receive = { - case b: ActorRef => - unstashAll() - context become receive2(a, b) - } - - def receive2(a: ActorRef, b: ActorRef): Receive = { - case msg if sender() == a => b forward msg - case msg if sender() == b => a forward msg - } - } -} \ No newline at end of file diff --git a/eclair-demo/src/test/scala/fr/acinq/eclair/channel/NominalChannelSpec.scala b/eclair-demo/src/test/scala/fr/acinq/eclair/channel/NominalChannelSpec.scala new file mode 100644 index 000000000..d7f66bd04 --- /dev/null +++ b/eclair-demo/src/test/scala/fr/acinq/eclair/channel/NominalChannelSpec.scala @@ -0,0 +1,90 @@ +package fr.acinq.eclair.channel + +import akka.actor.ActorSystem +import akka.actor.FSM.{CurrentState, SubscribeTransitionCallBack, Transition} +import akka.testkit.{TestActorRef, TestFSMRef, TestKit, TestProbe} +import fr.acinq.bitcoin.{BinaryData, Crypto} +import fr.acinq.eclair._ +import fr.acinq.eclair.blockchain.PollingWatcher +import fr.acinq.eclair.channel.TestConstants.{Alice, Bob} +import lightning.{locktime, update_add_htlc} +import lightning.locktime.Locktime.Blocks +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuiteLike, Matchers} + +import scala.collection.generic.SeqFactory +import scala.concurrent.duration._ + +/** + * Created by PM on 26/04/2016. + */ +class NominalChannelSpec extends TestKit(ActorSystem("test")) with Matchers with FunSuiteLike with BeforeAndAfterAll { + + override def afterAll { + TestKit.shutdownActorSystem(system) + } + + test("open channel and reach normal state") { + val pipe = TestActorRef[Pipe] + val blockchainA = TestActorRef(new PollingWatcher(new TestBitcoinClient())) + val blockchainB = TestActorRef(new PollingWatcher(new TestBitcoinClient())) + val alice = TestFSMRef(new Channel(pipe, blockchainA, Alice.channelParams, "B")) + val bob = TestFSMRef(new Channel(pipe, blockchainB, Bob.channelParams, "A")) + + val monitorA = TestProbe() + alice ! SubscribeTransitionCallBack(monitorA.ref) + val CurrentState(_, OPEN_WAIT_FOR_OPEN_WITHANCHOR) = monitorA.expectMsgClass(classOf[CurrentState[_]]) + + val monitorB = TestProbe() + bob ! SubscribeTransitionCallBack(monitorB.ref) + val CurrentState(_, OPEN_WAIT_FOR_OPEN_NOANCHOR) = monitorB.expectMsgClass(classOf[CurrentState[_]]) + + pipe !(alice, bob) // this starts the communication between alice and bob + + within(1 minute) { + + val Transition(_, OPEN_WAIT_FOR_OPEN_WITHANCHOR, OPEN_WAIT_FOR_COMMIT_SIG) = monitorA.expectMsgClass(classOf[Transition[_]]) + val Transition(_, OPEN_WAIT_FOR_OPEN_NOANCHOR, OPEN_WAIT_FOR_ANCHOR) = monitorB.expectMsgClass(classOf[Transition[_]]) + + val Transition(_, OPEN_WAIT_FOR_COMMIT_SIG, OPEN_WAITING_OURANCHOR) = monitorA.expectMsgClass(classOf[Transition[_]]) + val Transition(_, OPEN_WAIT_FOR_ANCHOR, OPEN_WAITING_THEIRANCHOR) = monitorB.expectMsgClass(classOf[Transition[_]]) + + val Transition(_, OPEN_WAITING_OURANCHOR, OPEN_WAIT_FOR_COMPLETE_OURANCHOR) = monitorA.expectMsgClass(classOf[Transition[_]]) + val Transition(_, OPEN_WAITING_THEIRANCHOR, OPEN_WAIT_FOR_COMPLETE_THEIRANCHOR) = monitorB.expectMsgClass(classOf[Transition[_]]) + + val Transition(_, OPEN_WAIT_FOR_COMPLETE_OURANCHOR, NORMAL) = monitorA.expectMsgClass(classOf[Transition[_]]) + val Transition(_, OPEN_WAIT_FOR_COMPLETE_THEIRANCHOR, NORMAL) = monitorB.expectMsgClass(classOf[Transition[_]]) + } + } + + test("create and fulfill HTLCs") { + val pipe = TestActorRef[Pipe] + val blockchainA = TestActorRef(new PollingWatcher(new TestBitcoinClient())) + val blockchainB = TestActorRef(new PollingWatcher(new TestBitcoinClient())) + val alice = TestFSMRef(new Channel(pipe, blockchainA, Alice.channelParams, "B")) + val bob = TestFSMRef(new Channel(pipe, blockchainB, Bob.channelParams, "A")) + + pipe !(alice, bob) // this starts the communication between alice and bob + + within(1 minute) { + + awaitCond(alice.stateName == NORMAL) + awaitCond(bob.stateName == NORMAL) + + val R: BinaryData = "0102030405060708010203040506070801020304050607080102030405060708" + val H = Crypto.sha256(R) + + alice ! CMD_ADD_HTLC(60000000, H, locktime(Blocks(4))) + + awaitAssert({ + val DATA_NORMAL(_, _, _, _, _, _, List(Change(OUT, _, update_add_htlc(_, _, r1, _, _))), _, _) = alice.stateData + assert(r1 == bin2sha256(H)) + val DATA_NORMAL(_, _, _, _, _, _, List(Change(IN, _, update_add_htlc(_, _, r2, _, _))), _, _) = bob.stateData + assert(r2 == bin2sha256(H)) + }) + + bob ! CMD_SIGN + + } + } + +} diff --git a/eclair-demo/src/test/scala/fr/acinq/eclair/channel/Pipe.scala b/eclair-demo/src/test/scala/fr/acinq/eclair/channel/Pipe.scala new file mode 100644 index 000000000..2526f52c9 --- /dev/null +++ b/eclair-demo/src/test/scala/fr/acinq/eclair/channel/Pipe.scala @@ -0,0 +1,27 @@ +package fr.acinq.eclair.channel + +import akka.actor.{Actor, ActorRef, Stash} + +/** + * Created by PM on 26/04/2016. + */ + +// handle a bi-directional path between 2 actors +// used to avoid the chicken-and-egg problem of: +// a = new Channel(b) +// b = new Channel(a) +class Pipe extends Actor with Stash { + + def receive = { + case (a: ActorRef, b: ActorRef) => + unstashAll() + context become ready(a, b) + + case msg => stash() + } + + def ready(a: ActorRef, b: ActorRef): Receive = { + case msg if sender() == a => b forward msg + case msg if sender() == b => a forward msg + } +} diff --git a/eclair-demo/src/test/scala/fr/acinq/eclair/channel/TestBitcoinClient.scala b/eclair-demo/src/test/scala/fr/acinq/eclair/channel/TestBitcoinClient.scala new file mode 100644 index 000000000..1e1cfc46b --- /dev/null +++ b/eclair-demo/src/test/scala/fr/acinq/eclair/channel/TestBitcoinClient.scala @@ -0,0 +1,35 @@ +package fr.acinq.eclair.channel + +import fr.acinq.bitcoin.{BinaryData, BitcoinJsonRPCClient, Transaction, TxIn, TxOut} +import fr.acinq.eclair.blockchain.ExtendedBitcoinClient + +import scala.concurrent.{ExecutionContext, Future} + +/** + * Created by PM on 26/04/2016. + */ +class TestBitcoinClient extends ExtendedBitcoinClient(new BitcoinJsonRPCClient("", "", "", 0)) { + + override def makeAnchorTx(ourCommitPub: BinaryData, theirCommitPub: BinaryData, amount: Long)(implicit ec: ExecutionContext): Future[(Transaction, Int)] = { + val anchorTx = Transaction(version = 1, + txIn = Seq.empty[TxIn], + txOut = TxOut(amount, Scripts.anchorPubkeyScript(ourCommitPub, theirCommitPub)) :: Nil, + lockTime = 0 + ) + Future.successful((anchorTx, 0)) + } + + override def publishTransaction(tx: Transaction)(implicit ec: ExecutionContext): Future[String] = Future.successful(tx.txid.toString()) + + + override def getTxConfirmations(txId: String)(implicit ec: ExecutionContext): Future[Option[Int]] = Future.successful(Some(10)) + + override def isUnspent(txId: String, outputIndex: Int)(implicit ec: ExecutionContext): Future[Boolean] = ??? + + override def getTransaction(txId: String)(implicit ec: ExecutionContext): Future[Transaction] = ??? + + override def fundTransaction(tx: Transaction)(implicit ec: ExecutionContext): Future[FundTransactionResponse] = ??? + + override def signTransaction(tx: Transaction)(implicit ec: ExecutionContext): Future[SignTransactionResponse] = ??? + +} diff --git a/eclair-demo/src/test/scala/fr/acinq/eclair/channel/TestConstants.scala b/eclair-demo/src/test/scala/fr/acinq/eclair/channel/TestConstants.scala new file mode 100644 index 000000000..030937e50 --- /dev/null +++ b/eclair-demo/src/test/scala/fr/acinq/eclair/channel/TestConstants.scala @@ -0,0 +1,28 @@ +package fr.acinq.eclair.channel + +import fr.acinq.bitcoin.{Base58, Base58Check, Transaction, TxIn, TxOut} +import fr.acinq.eclair.blockchain.{MakeAnchor, Publish} +import lightning.locktime +import lightning.locktime.Locktime.Blocks + +/** + * Created by PM on 26/04/2016. + */ +object TestConstants { + val anchorAmount = 100100000L + + // Alice is funder, Bob is not + + object Alice { + val (Base58.Prefix.SecretKeyTestnet, commitPrivKey) = Base58Check.decode("cQPmcNr6pwBQPyGfab3SksE9nTCtx9ism9T4dkS9dETNU2KKtJHk") + val (Base58.Prefix.SecretKeyTestnet, finalPrivKey) = Base58Check.decode("cUrAtLtV7GGddqdkhUxnbZVDWGJBTducpPoon3eKp9Vnr1zxs6BG") + val channelParams = OurChannelParams(locktime(Blocks(10)), commitPrivKey, finalPrivKey, 1, 100000, "alice-seed".getBytes(), Some(anchorAmount)) + } + + object Bob { + val (Base58.Prefix.SecretKeyTestnet, commitPrivKey) = Base58Check.decode("cSUwLtdZ2tht9ZmHhdQue48pfe7tY2GT2TGWJDtjoZgo6FHrubGk") + val (Base58.Prefix.SecretKeyTestnet, finalPrivKey) = Base58Check.decode("cPR7ZgXpUaDPA3GwGceMDS5pfnSm955yvks3yELf3wMJwegsdGTg") + val channelParams = OurChannelParams(locktime(Blocks(10)), commitPrivKey, finalPrivKey, 2, 100000, "bob-seed".getBytes(), None) + } + +}