1
0
Fork 0
mirror of https://github.com/ACINQ/eclair.git synced 2025-02-22 22:25:26 +01:00

added fuzzy disconnection test (wip)

This commit is contained in:
pm47 2017-02-17 20:46:35 +01:00
parent 7e9d07b9b4
commit 3e1b0fe202
4 changed files with 209 additions and 12 deletions

View file

@ -429,7 +429,7 @@ class Channel(val r: ActorRef, val blockchain: ActorRef, router: ActorRef, relay
case Failure(cause) => handleCommandError(sender, cause)
}
case Event(fail@UpdateFailHtlc(_, id, reason), d@DATA_NORMAL(params, commitments, _)) =>
case Event(fail@UpdateFailHtlc(_, id, reason), d@DATA_NORMAL(params, _, _)) =>
Try(Commitments.receiveFail(d.commitments, fail)) match {
case Success((commitments1, htlc)) =>
relayer ! (htlc, fail)
@ -472,7 +472,14 @@ class Channel(val r: ActorRef, val blockchain: ActorRef, router: ActorRef, relay
case Success(commitments1) =>
// we forward HTLCs only when they have been committed by both sides
// it always happen when we receive a revocation, because, we always sign our changes before they sign them
val newlySignedHtlcs = d.commitments.remoteChanges.signed.collect { case htlc: UpdateAddHtlc => relayer ! htlc }
d.commitments.remoteChanges.signed.collect {
case htlc: UpdateAddHtlc =>
log.info(s"relaying $htlc")
relayer ! htlc
}
if (d.commitments.remoteNextCommitInfo.left.map(_.reSignAsap) == Left(true)) {
self ! CMD_SIGN
}
stay using d.copy(commitments = commitments1)
case Failure(cause) => handleLocalError(cause, d)
}
@ -614,6 +621,9 @@ class Channel(val r: ActorRef, val blockchain: ActorRef, router: ActorRef, relay
remote ! closingSigned
goto(NEGOTIATING) using DATA_NEGOTIATING(params, commitments1, localShutdown, remoteShutdown, closingSigned)
case Success(commitments1) =>
if (d.commitments.remoteNextCommitInfo.left.map(_.reSignAsap) == Left(true)) {
self ! CMD_SIGN
}
stay using d.copy(commitments = commitments1)
case Failure(cause) => handleLocalError(cause, d)
}
@ -768,21 +778,38 @@ class Channel(val r: ActorRef, val blockchain: ActorRef, router: ActorRef, relay
case Event(INPUT_RECONNECTED(r), d@DATA_NORMAL(_, commitments, _)) =>
remote = r
log.info(s"resuming in state NORMAL")
// first we reverse remote changes
log.info(s"reversing remote changes:")
commitments.remoteChanges.proposed.foreach(c => log.info(s"reversing $c"))
val remoteChanges1 = commitments.remoteChanges.copy(proposed = Nil)
val remoteNextHtlcId1 = commitments.remoteNextHtlcId - commitments.remoteChanges.proposed.count(_.isInstanceOf[UpdateAddHtlc])
commitments.remoteNextCommitInfo match {
case Left(WaitForRevocation(_, commitSig, _)) =>
// we had sent a CommitSig and didn't receive their RevokeAndAck
// first we re-send the changes included in the CommitSig
log.info(s"re-sending signed changes:")
commitments.localChanges.signed.foreach(c => log.info(s"re-sending $c"))
commitments.localChanges.signed.foreach(remote ! _)
// then we re-send the CommitSig
log.info(s"re-sending commit-sig containing ${commitSig.htlcSignatures.size} htlcs")
remote ! commitSig
case Right(_) =>
require(commitments.localChanges.signed == 0, "there can't be local signed changes if we haven't sent a CommitSig")
require(commitments.localChanges.signed.size == 0, "there can't be local signed changes if we haven't sent a CommitSig")
}
// and finally we re-send the updates that were not included in a CommitSig
log.info(s"re-sending unsigned changes:")
commitments.localChanges.proposed.foreach(c => log.info(s"re-sending $c"))
commitments.localChanges.proposed.foreach(remote ! _)
goto(NORMAL) using d.copy(commitments = commitments.copy(remoteChanges = remoteChanges1))
val commitments1 = commitments.copy(remoteChanges = remoteChanges1)
if (Commitments.localHasChanges(commitments1)) {
// if we have newly acknowledged changes let's sign them
self ! CMD_SIGN
}
log.info(s"resuming with commitments $commitments1")
log.info(s"local changes: ${commitments1.localChanges}")
log.info(s"remote changes: ${commitments1.remoteChanges}")
goto(NORMAL) using d.copy(commitments = commitments1)
}
when(ERR_INFORMATION_LEAK, stateTimeout = 10 seconds) {

View file

@ -1,27 +1,42 @@
package fr.acinq.eclair
import akka.actor.{Actor, ActorRef, Stash}
import fr.acinq.eclair.channel.{INPUT_DISCONNECTED, INPUT_RECONNECTED}
/**
* Created by PM on 26/04/2016.
* Handles a bi-directional path between 2 actors
* used to avoid the chicken-and-egg problem of:
* a = new Channel(b)
* b = new Channel(a)
*/
// handle a bi-directional path between 2 actors
// used to avoid the chicken-and-egg problem of:
// a = new Channel(b)
// b = new Channel(a)
class Pipe extends Actor with Stash {
def receive = {
case (a: ActorRef, b: ActorRef) =>
unstashAll()
context become ready(a, b)
context become connected(a, b)
case msg => stash()
}
def ready(a: ActorRef, b: ActorRef): Receive = {
def connected(a: ActorRef, b: ActorRef): Receive = {
case msg if sender() == a => b forward msg
case msg if sender() == b => a forward msg
case msg@INPUT_DISCONNECTED =>
// used for fuzzy testing (eg: send Disconnected messages)
b forward msg
a forward msg
context become disconnected(a, b)
}
def disconnected(a: ActorRef, b: ActorRef): Receive = {
case msg if sender() == a => {} // dropped
case msg if sender() == b => {} // dropped
case msg: INPUT_RECONNECTED =>
// used for fuzzy testing (eg: send Disconnected messages)
b forward msg
a forward msg
context become connected(a, b)
}
}

View file

@ -0,0 +1,134 @@
package fr.acinq.eclair.channel.states
import akka.actor.{Actor, ActorRef, Props}
import akka.testkit.{TestFSMRef, TestProbe}
import fr.acinq.bitcoin.{BinaryData, Crypto}
import fr.acinq.eclair.TestConstants.{Alice, Bob}
import fr.acinq.eclair.blockchain._
import fr.acinq.eclair.channel.{Data, State, _}
import fr.acinq.eclair.wire._
import fr.acinq.eclair.{Pipe, TestBitcoinClient, TestConstants, TestkitBaseClass}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import scala.concurrent.duration._
import scala.util.Random
/**
* Created by PM on 05/07/2016.
*/
@RunWith(classOf[JUnitRunner])
class NormalOfflineFuzzySpec extends TestkitBaseClass with StateTestsHelperMethods {
type FixtureParam = Tuple5[TestFSMRef[State, Data, Channel], TestFSMRef[State, Data, Channel], ActorRef, ActorRef, ActorRef]
override def withFixture(test: OneArgTest) = {
val pipe = system.actorOf(Props(new Pipe()))
val alice2blockchain = TestProbe()
val blockchainA = system.actorOf(Props(new PeerWatcher(new TestBitcoinClient())))
val bob2blockchain = TestProbe()
val relayerA = system.actorOf(Props(new FuzzyRelayer()))
val relayerB = system.actorOf(Props(new FuzzyRelayer()))
val router = TestProbe()
val alice: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(pipe, alice2blockchain.ref, router.ref, relayerA))
val bob: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(pipe, bob2blockchain.ref, router.ref, relayerB))
within(30 seconds) {
val aliceInit = Init(Alice.channelParams.globalFeatures, Alice.channelParams.localFeatures)
val bobInit = Init(Bob.channelParams.globalFeatures, Bob.channelParams.localFeatures)
relayerA ! alice
relayerB ! bob
alice ! INPUT_INIT_FUNDER(Bob.id, 0, TestConstants.fundingSatoshis, TestConstants.pushMsat, Alice.channelParams, bobInit)
bob ! INPUT_INIT_FUNDEE(Alice.id, 0, Bob.channelParams, aliceInit)
pipe ! (alice, bob)
alice2blockchain.expectMsgType[MakeFundingTx]
alice2blockchain.forward(blockchainA)
alice2blockchain.expectMsgType[WatchSpent]
alice2blockchain.expectMsgType[WatchConfirmed]
alice2blockchain.forward(blockchainA)
alice2blockchain.expectMsgType[PublishAsap]
alice2blockchain.forward(blockchainA)
bob2blockchain.expectMsgType[WatchSpent]
bob2blockchain.expectMsgType[WatchConfirmed]
bob ! WatchEventConfirmed(BITCOIN_FUNDING_DEPTHOK, 400000, 42)
alice2blockchain.expectMsgType[WatchLost]
bob2blockchain.expectMsgType[WatchLost]
awaitCond(alice.stateName == NORMAL)
awaitCond(bob.stateName == NORMAL)
awaitCond(alice.stateName == NORMAL)
awaitCond(bob.stateName == NORMAL)
}
test((alice, bob, pipe, relayerA, relayerB))
}
class FuzzyRelayer() extends Actor {
val paymentpreimage = BinaryData("42" * 32)
val paymentHash = Crypto.sha256(paymentpreimage)
override def receive: Receive = {
case channel: ActorRef => context become ready(channel)
}
def ready(channel: ActorRef): Receive = {
case 'start => context become main(sender, channel, 0, 0)
}
def main(origin: ActorRef, channel: ActorRef, htlcSent: Int, htlcInFlight: Int): Receive = {
case htlc: UpdateAddHtlc =>
val preimage = BinaryData("42" * 32)
sender ! CMD_FULFILL_HTLC(htlc.id, preimage)
sender ! CMD_SIGN
case (add: UpdateAddHtlc, fulfill: UpdateFulfillHtlc) =>
println(s"received fulfill for htlc ${fulfill.id}, htlcInFlight = ${htlcInFlight - 1}")
if (htlcInFlight <= 1) {
if (htlcSent < 100) {
self ! 'add
} else {
origin ! "done"
context stop self
}
}
context become main(origin, channel, htlcSent + 1, htlcInFlight - 1)
case 'add =>
val cmds = for (i <- 0 to Random.nextInt(10)) yield CMD_ADD_HTLC(Random.nextInt(1000000), paymentHash, 400144)
println(s"sending ${cmds.size} htlcs")
cmds.foreach(channel ! _)
channel ! CMD_SIGN
context become main(origin, channel, htlcSent + 1, htlcInFlight + cmds.size)
case 'sign =>
channel ! CMD_SIGN
case "ok" => {}
/*case paymentHash: BinaryData =>
for(i <- 0 until Random.nextInt(5))
channel ! CMD_ADD_HTLC(42000, paymentHash, 400144)
if (Random.nextInt(5) == 0) {
self ! 'sign
} else {
self ! 'add
}
context become main(htlcSent + 1)*/
}
}
test("fuzzy testing in NORMAL state with only one party sending HTLCs") {
case (alice, bob, pipe, relayerA, relayerB) =>
val sender = TestProbe()
sender.send(relayerA, 'start)
sender.send(relayerB, 'start)
relayerA ! 'add
/*import scala.concurrent.ExecutionContext.Implicits.global
system.scheduler.scheduleOnce(2 seconds, pipe, INPUT_DISCONNECTED)
system.scheduler.scheduleOnce(5 seconds, pipe, INPUT_RECONNECTED(pipe))*/
println(sender.expectMsgType[String](1 hour))
}
}

View file

@ -589,6 +589,27 @@ class NormalStateSpec extends TestkitBaseClass with StateTestsHelperMethods {
}
}
test("recv RevokeAndAck (with reSignAsap=true)") { case (alice, bob, alice2bob, bob2alice, alice2blockchain, _, _) =>
within(30 seconds) {
val sender = TestProbe()
val (r1, htlc1) = addHtlc(50000000, alice, bob, alice2bob, bob2alice)
awaitCond(alice.stateData.asInstanceOf[DATA_NORMAL].commitments.remoteNextCommitInfo.isRight)
sender.send(alice, CMD_SIGN)
sender.expectMsg("ok")
alice2bob.expectMsgType[CommitSig]
alice2bob.forward(bob)
val (r2, htlc2) = addHtlc(50000000, alice, bob, alice2bob, bob2alice)
sender.send(alice, CMD_SIGN)
sender.expectNoMsg(300 millis)
assert(alice.stateData.asInstanceOf[DATA_NORMAL].commitments.remoteNextCommitInfo.left.toOption.get.reSignAsap === true)
// actual test starts here
bob2alice.expectMsgType[RevokeAndAck]
bob2alice.forward(alice)
alice2bob.expectMsgType[CommitSig]
}
}
test("recv RevokeAndAck (invalid preimage)") { case (alice, bob, alice2bob, bob2alice, alice2blockchain, _, _) =>
within(30 seconds) {
val tx = alice.stateData.asInstanceOf[DATA_NORMAL].commitments.localCommit.publishableTxs.commitTx.tx