1
0
Fork 0
mirror of https://github.com/ACINQ/eclair.git synced 2025-02-23 06:35:11 +01:00

Use the replyTo pattern for Register messages (#1514)

* add replyTo to Register messages

In preparation of the migration to typed actors, we need to remove the
use of `sender`, which doesn't exist in typed actors (and even returns
dead letters when mixing typed/untyped actors).

This also means that, in the tests, we should stop using the
`TestProbe.send()` method, which also relies on the recipient replying
to `sender`. Instead, we should use `targetActor ! msg` which guarantees
that the sender is void.

Using the `replyTo` pattern doesn't mix well with the `ask` pattern,
because we don't know the reference to the temporary actor. To deal with
that, we set `replyTo = ActorRef.noSender` which is a bit hackish.

Co-authored-by: Bastien Teinturier <31281497+t-bast@users.noreply.github.com>
This commit is contained in:
Pierre-Marie Padiou 2020-09-01 16:43:15 +02:00 committed by GitHub
parent 153f82c8cf
commit ef1bf0b9a5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 155 additions and 121 deletions

View file

@ -364,8 +364,8 @@ class EclairImpl(appKit: Kit) extends Eclair {
* @param channel either a shortChannelId (BOLT encoded) or a channelId (32-byte hex encoded).
*/
private def sendToChannel[T: ClassTag](channel: ApiTypes.ChannelIdentifier, request: Any)(implicit timeout: Timeout): Future[T] = (channel match {
case Left(channelId) => appKit.register ? Forward(channelId, request)
case Right(shortChannelId) => appKit.register ? ForwardShortId(shortChannelId, request)
case Left(channelId) => appKit.register ? Forward(ActorRef.noSender, channelId, request)
case Right(shortChannelId) => appKit.register ? ForwardShortId(ActorRef.noSender, shortChannelId, request)
}).mapTo[T]
/**

View file

@ -62,16 +62,20 @@ class Register extends Actor with ActorLogging {
case Symbol("channelsTo") => sender ! channelsTo
case fwd@Forward(channelId, msg) =>
case fwd@Forward(replyTo, channelId, msg) =>
// for backward compatibility with legacy ask, we use the replyTo as sender
val compatReplyTo = if (replyTo == ActorRef.noSender) sender else replyTo
channels.get(channelId) match {
case Some(channel) => channel forward msg
case None => sender ! Failure(ForwardFailure(fwd))
case Some(channel) => channel.tell(msg, compatReplyTo)
case None => compatReplyTo ! Failure(ForwardFailure(fwd))
}
case fwd@ForwardShortId(shortChannelId, msg) =>
case fwd@ForwardShortId(replyTo, shortChannelId, msg) =>
// for backward compatibility with legacy ask, we use the replyTo as sender
val compatReplyTo = if (replyTo == ActorRef.noSender) sender else replyTo
shortIds.get(shortChannelId).flatMap(channels.get) match {
case Some(channel) => channel forward msg
case None => sender ! Failure(ForwardShortIdFailure(fwd))
case Some(channel) => channel.tell(msg, compatReplyTo)
case None => compatReplyTo ! Failure(ForwardShortIdFailure(fwd))
}
}
}
@ -79,10 +83,10 @@ class Register extends Actor with ActorLogging {
object Register {
// @formatter:off
case class Forward[T](channelId: ByteVector32, message: T)
case class ForwardShortId[T](shortChannelId: ShortChannelId, message: T)
case class Forward[T](replyTo: ActorRef, channelId: ByteVector32, message: T)
case class ForwardShortId[T](replyTo: ActorRef, shortChannelId: ShortChannelId, message: T)
case class ForwardFailure[T](fwd: Forward[T]) extends RuntimeException(s"channel ${fwd.channelId} not found")
case class ForwardShortIdFailure[T](fwd: ForwardShortId[T]) extends RuntimeException(s"channel ${fwd.shortChannelId} not found")
// @formatter:on
}
}

View file

@ -55,7 +55,7 @@ object PendingRelayDb {
* incoming htlcs, which would lead to unwanted channel closings.
*/
def safeSend(register: ActorRef, db: PendingRelayDb, channelId: ByteVector32, cmd: Command with HasHtlcId)(implicit ctx: ActorContext): Unit = {
register ! Register.Forward(channelId, cmd)
register ! Register.Forward(ctx.self, channelId, cmd)
// we store the command in a db (note that this happens *after* forwarding the command to the channel, so we don't add latency)
db.addPendingRelay(channelId, cmd)
}

View file

@ -53,10 +53,10 @@ class ChannelRelayer(nodeParams: NodeParams, relayer: ActorRef, register: ActorR
PendingRelayDb.safeSend(register, nodeParams.db.pendingRelay, r.add.channelId, cmdFail)
case RelaySuccess(selectedShortChannelId, cmdAdd) =>
log.info(s"forwarding htlc #${r.add.id} from channelId=${r.add.channelId} to shortChannelId=$selectedShortChannelId")
register ! Register.ForwardShortId(selectedShortChannelId, cmdAdd)
register ! Register.ForwardShortId(self, selectedShortChannelId, cmdAdd)
}
case Status.Failure(Register.ForwardShortIdFailure(Register.ForwardShortId(shortChannelId, CMD_ADD_HTLC(_, _, _, _, Upstream.Relayed(add), _, _)))) =>
case Status.Failure(Register.ForwardShortIdFailure(Register.ForwardShortId(_, shortChannelId, CMD_ADD_HTLC(_, _, _, _, Upstream.Relayed(add), _, _)))) =>
log.warning(s"couldn't resolve downstream channel $shortChannelId, failing htlc #${add.id}")
val cmdFail = CMD_FAIL_HTLC(add.id, Right(UnknownNextPeer), commit = true)
Metrics.recordPaymentRelayFailed(Tags.FailureType(cmdFail), Tags.RelayType.Channel)
@ -83,7 +83,7 @@ class ChannelRelayer(nodeParams: NodeParams, relayer: ActorRef, register: ActorR
override def mdc(currentMessage: Any): MDC = {
val paymentHash_opt = currentMessage match {
case relay: RelayHtlc => Some(relay.r.add.paymentHash)
case Status.Failure(Register.ForwardShortIdFailure(Register.ForwardShortId(_, c: CMD_ADD_HTLC))) => Some(c.paymentHash)
case Status.Failure(Register.ForwardShortIdFailure(Register.ForwardShortId(_, _, c: CMD_ADD_HTLC))) => Some(c.paymentHash)
case Status.Failure(addFailed: AddHtlcFailed) => Some(addFailed.paymentHash)
case _ => None
}

View file

@ -183,6 +183,8 @@ class Relayer(nodeParams: NodeParams, router: ActorRef, register: ActorRef, paym
}
case ChannelCommandResponse.Ok => () // ignoring responses from channels
case GetChildActors(replyTo) => replyTo ! ChildActors(postRestartCleaner, channelRelayer, nodeRelayer)
}
override def mdc(currentMessage: Any): MDC = {
@ -234,6 +236,10 @@ object Relayer extends Logging {
isPublic = commitments.announceChannel)
}
case class OutgoingChannels(channels: Seq[OutgoingChannel])
// internal classes, used for testing
private[payment] case class GetChildActors(replyTo: ActorRef)
private[payment] case class ChildActors(postRestartCleaner: ActorRef, channelRelayer: ActorRef, nodeRelayer: ActorRef)
// @formatter:on
}

View file

@ -103,7 +103,7 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
case Event(RouteResponse(route +: _), WaitingForRoute(s, c, failures, ignore)) =>
log.info(s"route found: attempt=${failures.size + 1}/${c.maxAttempts} route=${route.printNodes()} channels=${route.printChannels()}")
val (cmd, sharedSecrets) = OutgoingPacket.buildCommand(cfg.upstream, paymentHash, route.hops, c.finalPayload)
register ! Register.ForwardShortId(route.hops.head.lastUpdate.shortChannelId, cmd)
register ! Register.ForwardShortId(self, route.hops.head.lastUpdate.shortChannelId, cmd)
goto(WAITING_FOR_PAYMENT_COMPLETE) using WaitingForComplete(s, c, cmd, failures, sharedSecrets, ignore, route)
case Event(Status.Failure(t), WaitingForRoute(s, _, failures, _)) =>

View file

@ -18,6 +18,7 @@ package fr.acinq.eclair
import java.util.UUID
import akka.actor.ActorRef
import akka.testkit.TestProbe
import akka.util.Timeout
import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey}
@ -283,31 +284,31 @@ class EclairImplSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with I
val eclair = new EclairImpl(kit)
eclair.forceClose(Left(ByteVector32.Zeroes) :: Nil)
register.expectMsg(Register.Forward(ByteVector32.Zeroes, CMD_FORCECLOSE))
register.expectMsg(Register.Forward(ActorRef.noSender, ByteVector32.Zeroes, CMD_FORCECLOSE))
eclair.forceClose(Right(ShortChannelId("568749x2597x0")) :: Nil)
register.expectMsg(Register.ForwardShortId(ShortChannelId("568749x2597x0"), CMD_FORCECLOSE))
register.expectMsg(Register.ForwardShortId(ActorRef.noSender, ShortChannelId("568749x2597x0"), CMD_FORCECLOSE))
eclair.forceClose(Left(ByteVector32.Zeroes) :: Right(ShortChannelId("568749x2597x0")) :: Nil)
register.expectMsgAllOf(
Register.Forward(ByteVector32.Zeroes, CMD_FORCECLOSE),
Register.ForwardShortId(ShortChannelId("568749x2597x0"), CMD_FORCECLOSE)
Register.Forward(ActorRef.noSender, ByteVector32.Zeroes, CMD_FORCECLOSE),
Register.ForwardShortId(ActorRef.noSender, ShortChannelId("568749x2597x0"), CMD_FORCECLOSE)
)
eclair.close(Left(ByteVector32.Zeroes) :: Nil, None)
register.expectMsg(Register.Forward(ByteVector32.Zeroes, CMD_CLOSE(None)))
register.expectMsg(Register.Forward(ActorRef.noSender, ByteVector32.Zeroes, CMD_CLOSE(None)))
eclair.close(Right(ShortChannelId("568749x2597x0")) :: Nil, None)
register.expectMsg(Register.ForwardShortId(ShortChannelId("568749x2597x0"), CMD_CLOSE(None)))
register.expectMsg(Register.ForwardShortId(ActorRef.noSender, ShortChannelId("568749x2597x0"), CMD_CLOSE(None)))
eclair.close(Right(ShortChannelId("568749x2597x0")) :: Nil, Some(ByteVector.empty))
register.expectMsg(Register.ForwardShortId(ShortChannelId("568749x2597x0"), CMD_CLOSE(Some(ByteVector.empty))))
register.expectMsg(Register.ForwardShortId(ActorRef.noSender, ShortChannelId("568749x2597x0"), CMD_CLOSE(Some(ByteVector.empty))))
eclair.close(Right(ShortChannelId("568749x2597x0")) :: Left(ByteVector32.One) :: Right(ShortChannelId("568749x2597x1")) :: Nil, None)
register.expectMsgAllOf(
Register.ForwardShortId(ShortChannelId("568749x2597x0"), CMD_CLOSE(None)),
Register.Forward(ByteVector32.One, CMD_CLOSE(None)),
Register.ForwardShortId(ShortChannelId("568749x2597x1"), CMD_CLOSE(None))
Register.ForwardShortId(ActorRef.noSender, ShortChannelId("568749x2597x0"), CMD_CLOSE(None)),
Register.Forward(ActorRef.noSender, ByteVector32.One, CMD_CLOSE(None)),
Register.ForwardShortId(ActorRef.noSender, ShortChannelId("568749x2597x1"), CMD_CLOSE(None))
)
}

View file

@ -101,8 +101,8 @@ class FuzzySpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with StateT
}
def main(channel: ActorRef): Receive = {
case Register.Forward(_, msg) => channel forward msg
case Register.ForwardShortId(_, msg) => channel forward msg
case fwd: Register.Forward[_] => channel forward fwd.message
case fwd: Register.ForwardShortId[_] => channel forward fwd.message
}
}

View file

@ -32,7 +32,6 @@ import fr.acinq.eclair.blockchain.bitcoind.BitcoindService.BitcoinReq
import fr.acinq.eclair.blockchain.bitcoind.rpc.ExtendedBitcoinClient
import fr.acinq.eclair.blockchain.{Watch, WatchConfirmed}
import fr.acinq.eclair.channel.Channel.{BroadcastChannelUpdate, PeriodicRefresh}
import fr.acinq.eclair.channel.Register.{Forward, ForwardShortId}
import fr.acinq.eclair.channel._
import fr.acinq.eclair.crypto.Sphinx.DecryptedFailurePacket
import fr.acinq.eclair.crypto.TransportHandler
@ -300,18 +299,18 @@ class IntegrationSpec extends TestKitBaseClass with BitcoindService with AnyFunS
sender.send(fundeeChannel, CMD_GETSTATEDATA)
val channelId = sender.expectMsgType[HasCommitments].channelId
awaitCond({
sender.send(funder.register, Forward(channelId, CMD_GETSTATE))
funder.register ! Register.Forward(sender.ref, channelId, CMD_GETSTATE)
sender.expectMsgType[State] == WAIT_FOR_FUNDING_LOCKED
})
generateBlocks(bitcoincli, 6)
// after 8 blocks the fundee is still waiting for more confirmations
sender.send(fundee.register, Forward(channelId, CMD_GETSTATE))
fundee.register ! Register.Forward(sender.ref, channelId, CMD_GETSTATE)
assert(sender.expectMsgType[State] == WAIT_FOR_FUNDING_CONFIRMED)
// after 8 blocks the funder is still waiting for funding_locked from the fundee
sender.send(funder.register, Forward(channelId, CMD_GETSTATE))
funder.register ! Register.Forward(sender.ref, channelId, CMD_GETSTATE)
assert(sender.expectMsgType[State] == WAIT_FOR_FUNDING_LOCKED)
// simulate a disconnection
@ -319,9 +318,9 @@ class IntegrationSpec extends TestKitBaseClass with BitcoindService with AnyFunS
assert(sender.expectMsgType[String] == "disconnecting")
awaitCond({
sender.send(fundee.register, Forward(channelId, CMD_GETSTATE))
fundee.register ! Register.Forward(sender.ref, channelId, CMD_GETSTATE)
val fundeeState = sender.expectMsgType[State]
sender.send(funder.register, Forward(channelId, CMD_GETSTATE))
funder.register ! Register.Forward(sender.ref, channelId, CMD_GETSTATE)
val funderState = sender.expectMsgType[State]
fundeeState == OFFLINE && funderState == OFFLINE
})
@ -335,9 +334,9 @@ class IntegrationSpec extends TestKitBaseClass with BitcoindService with AnyFunS
))
sender.expectMsgAnyOf(10 seconds, PeerConnection.ConnectionResult.Connected, PeerConnection.ConnectionResult.AlreadyConnected)
sender.send(fundee.register, Forward(channelId, CMD_GETSTATE))
fundee.register ! Register.Forward(sender.ref, channelId, CMD_GETSTATE)
val fundeeState = sender.expectMsgType[State](max = 30 seconds)
sender.send(funder.register, Forward(channelId, CMD_GETSTATE))
funder.register ! Register.Forward(sender.ref, channelId, CMD_GETSTATE)
val funderState = sender.expectMsgType[State](max = 30 seconds)
fundeeState == WAIT_FOR_FUNDING_CONFIRMED && funderState == WAIT_FOR_FUNDING_LOCKED
}, max = 30 seconds, interval = 10 seconds)
@ -346,9 +345,9 @@ class IntegrationSpec extends TestKitBaseClass with BitcoindService with AnyFunS
generateBlocks(bitcoincli, 5)
awaitCond({
sender.send(fundee.register, Forward(channelId, CMD_GETSTATE))
fundee.register ! Register.Forward(sender.ref, channelId, CMD_GETSTATE)
val fundeeState = sender.expectMsgType[State]
sender.send(funder.register, Forward(channelId, CMD_GETSTATE))
funder.register ! Register.Forward(sender.ref, channelId, CMD_GETSTATE)
val funderState = sender.expectMsgType[State]
fundeeState == NORMAL && funderState == NORMAL
})
@ -376,7 +375,7 @@ class IntegrationSpec extends TestKitBaseClass with BitcoindService with AnyFunS
sender.send(nodes("B").router, Symbol("channels"))
val shortIdBC = sender.expectMsgType[Iterable[ChannelAnnouncement]].find(c => Set(c.nodeId1, c.nodeId2) == Set(nodes("B").nodeParams.nodeId, nodes("C").nodeParams.nodeId)).get.shortChannelId
// we also need the full commitment
sender.send(nodes("B").register, ForwardShortId(shortIdBC, CMD_GETINFO))
nodes("B").register ! Register.ForwardShortId(sender.ref, shortIdBC, CMD_GETINFO)
val commitmentBC = sender.expectMsgType[RES_GETINFO].data.asInstanceOf[DATA_NORMAL].commitments
// we then forge a new channel_update for B-C...
val channelUpdateBC = Announcements.makeChannelUpdate(Block.RegtestGenesisBlock.hash, nodes("B").nodeParams.privateKey, nodes("C").nodeParams.nodeId, shortIdBC, nodes("B").nodeParams.expiryDelta + 1, nodes("C").nodeParams.htlcMinimum, nodes("B").nodeParams.feeBase, nodes("B").nodeParams.feeProportionalMillionth, 500000000 msat)
@ -406,8 +405,8 @@ class IntegrationSpec extends TestKitBaseClass with BitcoindService with AnyFunS
// first let's wait 3 seconds to make sure the timestamp of the new channel_update will be strictly greater than the former
sender.expectNoMsg(3 seconds)
sender.send(nodes("B").register, ForwardShortId(shortIdBC, BroadcastChannelUpdate(PeriodicRefresh)))
sender.send(nodes("B").register, ForwardShortId(shortIdBC, CMD_GETINFO))
nodes("B").register ! Register.ForwardShortId(sender.ref, shortIdBC, BroadcastChannelUpdate(PeriodicRefresh))
nodes("B").register ! Register.ForwardShortId(sender.ref, shortIdBC, CMD_GETINFO)
val channelUpdateBC_new = sender.expectMsgType[RES_GETINFO].data.asInstanceOf[DATA_NORMAL].channelUpdate
logger.info(s"channelUpdateBC=$channelUpdateBC")
logger.info(s"channelUpdateBC_new=$channelUpdateBC_new")
@ -848,24 +847,24 @@ class IntegrationSpec extends TestKitBaseClass with BitcoindService with AnyFunS
assert(channels.head._2 === nodes("C").nodeParams.nodeId)
val channelId = channels.head._1
sender.send(nodes("C").register, Forward(channelId, CMD_GETSTATEDATA))
sender.send(nodes("C").register, Register.Forward(sender.ref, channelId, CMD_GETSTATEDATA))
val commitmentsC = sender.expectMsgType[DATA_NORMAL].commitments
val finalPubKeyScriptC = commitmentsC.localParams.defaultFinalScriptPubKey
val fundingOutpoint = commitmentsC.commitInput.outPoint
sender.send(nodes("F1").register, Forward(channelId, CMD_GETSTATEDATA))
sender.send(nodes("F1").register, Register.Forward(sender.ref, channelId, CMD_GETSTATEDATA))
val finalPubKeyScriptF = sender.expectMsgType[DATA_NORMAL].commitments.localParams.defaultFinalScriptPubKey
sender.send(nodes("F1").register, Forward(channelId, CMD_CLOSE(Some(finalPubKeyScriptF))))
sender.send(nodes("F1").register, Register.Forward(sender.ref, channelId, CMD_CLOSE(Some(finalPubKeyScriptF))))
sender.expectMsg(ChannelCommandResponse.Ok)
// we then wait for C and F to negotiate the closing fee
awaitCond({
sender.send(nodes("C").register, Forward(channelId, CMD_GETSTATE))
sender.send(nodes("C").register, Register.Forward(sender.ref, channelId, CMD_GETSTATE))
sender.expectMsgType[State] == CLOSING
}, max = 20 seconds, interval = 1 second)
generateBlocks(bitcoincli, 2)
awaitCond({
sender.send(nodes("C").register, Forward(channelId, CMD_GETSTATE))
sender.send(nodes("C").register, Register.Forward(sender.ref, channelId, CMD_GETSTATE))
sender.expectMsgType[State] == CLOSED
}, max = 20 seconds, interval = 1 second)
@ -888,7 +887,7 @@ class IntegrationSpec extends TestKitBaseClass with BitcoindService with AnyFunS
// retrieve the channelId of C <--> F2
val Some(channelId) = sender.expectMsgType[Map[ByteVector32, PublicKey]].find(_._2 == nodes("C").nodeParams.nodeId).map(_._1)
sender.send(nodes("F2").register, Forward(channelId, CMD_GETSTATEDATA))
sender.send(nodes("F2").register, Register.Forward(sender.ref, channelId, CMD_GETSTATEDATA))
val initialStateDataF2 = sender.expectMsgType[DATA_NORMAL]
val initialCommitmentIndex = initialStateDataF2.commitments.localCommit.index
@ -909,7 +908,7 @@ class IntegrationSpec extends TestKitBaseClass with BitcoindService with AnyFunS
val ps = sender.expectMsgType[PaymentSent](5 seconds)
assert(ps.id == paymentId)
sender.send(nodes("F2").register, Forward(channelId, CMD_GETSTATEDATA))
sender.send(nodes("F2").register, Register.Forward(sender.ref, channelId, CMD_GETSTATEDATA))
val stateDataF2 = sender.expectMsgType[DATA_NORMAL]
val commitmentIndex = stateDataF2.commitments.localCommit.index
val commitTx = stateDataF2.commitments.localCommit.publishableTxs.commitTx.tx
@ -923,11 +922,11 @@ class IntegrationSpec extends TestKitBaseClass with BitcoindService with AnyFunS
assert(toRemoteOutCNew.amount < toRemoteOutC.amount)
// now let's force close the channel and check the toRemote is what we had at the beginning
sender.send(nodes("F2").register, Forward(channelId, CMD_FORCECLOSE))
sender.send(nodes("F2").register, Register.Forward(sender.ref, channelId, CMD_FORCECLOSE))
sender.expectMsg(ChannelCommandResponse.Ok)
// we then wait for C to detect the unilateral close and go to CLOSING state
awaitCond({
sender.send(nodes("C").register, Forward(channelId, CMD_GETSTATE))
nodes("C").register ! Register.Forward(sender.ref, channelId, CMD_GETSTATE)
sender.expectMsgType[State] == CLOSING
}, max = 20 seconds, interval = 1 second)
@ -941,7 +940,7 @@ class IntegrationSpec extends TestKitBaseClass with BitcoindService with AnyFunS
// bury the unilateral close in a block, since there are no outputs to claim the channel can go to CLOSED state
generateBlocks(bitcoincli, 2)
awaitCond({
sender.send(nodes("C").register, Forward(channelId, CMD_GETSTATE))
nodes("C").register ! Register.Forward(sender.ref, channelId, CMD_GETSTATE)
sender.expectMsgType[State] == CLOSED
}, max = 20 seconds, interval = 1 second)
awaitAnnouncements(nodes.filterKeys(_ == "A").toMap, 5, 7, 16)
@ -992,7 +991,7 @@ class IntegrationSpec extends TestKitBaseClass with BitcoindService with AnyFunS
peers.head ! Peer.Disconnect(nodes("C").nodeParams.nodeId)
// we then wait for F to be in disconnected state
awaitCond({
sender.send(nodes(nodeF).register, Forward(channelId, CMD_GETSTATE))
sender.send(nodes(nodeF).register, Register.Forward(sender.ref, channelId, CMD_GETSTATE))
sender.expectMsgType[State] == OFFLINE
}, max = 20 seconds, interval = 1 second)
}
@ -1027,11 +1026,11 @@ class IntegrationSpec extends TestKitBaseClass with BitcoindService with AnyFunS
// F gets the htlc
val htlc = htlcReceiver.expectMsgType[IncomingPacket.FinalPacket].add
// now that we have the channel id, we retrieve channels default final addresses
sender.send(nodes("C").register, Forward(htlc.channelId, CMD_GETSTATEDATA))
sender.send(nodes("C").register, Register.Forward(sender.ref, htlc.channelId, CMD_GETSTATEDATA))
val dataC = sender.expectMsgType[DATA_NORMAL]
assert(dataC.commitments.commitmentFormat === commitmentFormat)
val finalAddressC = scriptPubKeyToAddress(dataC.commitments.localParams.defaultFinalScriptPubKey)
sender.send(nodes(nodeF).register, Forward(htlc.channelId, CMD_GETSTATEDATA))
sender.send(nodes(nodeF).register, Register.Forward(sender.ref, htlc.channelId, CMD_GETSTATEDATA))
val dataF = sender.expectMsgType[DATA_NORMAL]
assert(dataF.commitments.commitmentFormat === commitmentFormat)
val finalAddressF = scriptPubKeyToAddress(dataF.commitments.localParams.defaultFinalScriptPubKey)
@ -1047,17 +1046,17 @@ class IntegrationSpec extends TestKitBaseClass with BitcoindService with AnyFunS
// we then kill the connection between C and F
disconnectCF(nodeF, htlc.channelId, sender)
// we then have C unilaterally close the channel (which will make F redeem the htlc onchain)
sender.send(nodes("C").register, Forward(htlc.channelId, CMD_FORCECLOSE))
sender.send(nodes("C").register, Register.Forward(sender.ref, htlc.channelId, CMD_FORCECLOSE))
sender.expectMsg(ChannelCommandResponse.Ok)
// we then wait for F to detect the unilateral close and go to CLOSING state
awaitCond({
sender.send(nodes(nodeF).register, Forward(htlc.channelId, CMD_GETSTATE))
sender.send(nodes(nodeF).register, Register.Forward(sender.ref, htlc.channelId, CMD_GETSTATE))
sender.expectMsgType[State] == CLOSING
}, max = 20 seconds, interval = 1 second)
// we generate a few blocks to get the commit tx confirmed
generateBlocks(bitcoincli, 3, Some(minerAddress))
// we then fulfill the htlc, which will make F redeem it on-chain
sender.send(nodes(nodeF).register, Forward(htlc.channelId, CMD_FULFILL_HTLC(htlc.id, preimage)))
sender.send(nodes(nodeF).register, Register.Forward(sender.ref, htlc.channelId, CMD_FULFILL_HTLC(htlc.id, preimage)))
// we don't need to generate blocks to confirm the htlc-success; C should extract the preimage as soon as it enters
// the mempool and fulfill the payment upstream.
paymentSender.expectMsgType[PaymentSent](30 seconds)
@ -1075,7 +1074,7 @@ class IntegrationSpec extends TestKitBaseClass with BitcoindService with AnyFunS
// and we wait for the channel to close
awaitCond(stateListener.expectMsgType[ChannelStateChanged].currentState == CLOSED, max = 30 seconds)
awaitCond({
sender.send(nodes(nodeF).register, Forward(htlc.channelId, CMD_GETSTATE))
sender.send(nodes(nodeF).register, Register.Forward(sender.ref, htlc.channelId, CMD_GETSTATE))
sender.expectMsgType[State] == CLOSED
}, max = 20 seconds, interval = 1 second)
awaitAnnouncements(nodes.filterKeys(_ == "A").toMap, 5, 7, 16)
@ -1094,13 +1093,13 @@ class IntegrationSpec extends TestKitBaseClass with BitcoindService with AnyFunS
// we then kill the connection between C and F
disconnectCF(nodeF, htlc.channelId, sender)
// then we have F unilaterally close the channel
sender.send(nodes(nodeF).register, Forward(htlc.channelId, CMD_FORCECLOSE))
sender.send(nodes(nodeF).register, Register.Forward(sender.ref, htlc.channelId, CMD_FORCECLOSE))
sender.expectMsg(ChannelCommandResponse.Ok)
awaitCond(stateListener.expectMsgType[ChannelStateChanged].currentState == CLOSING, max = 30 seconds)
// we generate a few blocks to get the commit tx confirmed
generateBlocks(bitcoincli, 3, Some(minerAddress))
// we then fulfill the htlc (it won't be sent to C, and will be used to pull funds on-chain)
sender.send(nodes(nodeF).register, Forward(htlc.channelId, CMD_FULFILL_HTLC(htlc.id, preimage)))
sender.send(nodes(nodeF).register, Register.Forward(sender.ref, htlc.channelId, CMD_FULFILL_HTLC(htlc.id, preimage)))
// we don't need to generate blocks to confirm the htlc-success; C should extract the preimage as soon as it enters
// the mempool and fulfill the payment upstream.
paymentSender.expectMsgType[PaymentSent](30 seconds)
@ -1118,7 +1117,7 @@ class IntegrationSpec extends TestKitBaseClass with BitcoindService with AnyFunS
// and we wait for the channel to close
awaitCond(stateListener.expectMsgType[ChannelStateChanged].currentState == CLOSED, max = 30 seconds)
awaitCond({
sender.send(nodes(nodeF).register, Forward(htlc.channelId, CMD_GETSTATE))
sender.send(nodes(nodeF).register, Register.Forward(sender.ref, htlc.channelId, CMD_GETSTATE))
sender.expectMsgType[State] == CLOSED
}, max = 20 seconds, interval = 1 second)
awaitAnnouncements(nodes.filterKeys(_ == "A").toMap, 5, 7, 16)
@ -1137,7 +1136,7 @@ class IntegrationSpec extends TestKitBaseClass with BitcoindService with AnyFunS
// we generate enough blocks to reach the htlc timeout
generateBlocks(bitcoincli, (htlc.cltvExpiry.toLong - getBlockCount).toInt, Some(minerAddress))
awaitCond(stateListener.expectMsgType[ChannelStateChanged].currentState == CLOSING, max = 30 seconds)
sender.send(nodes("C").register, Forward(htlc.channelId, CMD_GETSTATEDATA))
sender.send(nodes("C").register, Register.Forward(sender.ref, htlc.channelId, CMD_GETSTATEDATA))
val Some(localCommit) = sender.expectMsgType[DATA_CLOSING].localCommitPublished
// we wait until the commit tx has been broadcast
val bitcoinClient = new ExtendedBitcoinClient(bitcoinrpcclient)
@ -1168,7 +1167,7 @@ class IntegrationSpec extends TestKitBaseClass with BitcoindService with AnyFunS
// and we wait for the channel to close
awaitCond(stateListener.expectMsgType[ChannelStateChanged].currentState == CLOSED, max = 30 seconds)
awaitCond({
sender.send(nodes(nodeF).register, Forward(htlc.channelId, CMD_GETSTATE))
sender.send(nodes(nodeF).register, Register.Forward(sender.ref, htlc.channelId, CMD_GETSTATE))
sender.expectMsgType[State] == CLOSED
}, max = 20 seconds, interval = 1 second)
awaitAnnouncements(nodes.filterKeys(_ == "A").toMap, 5, 7, 16)
@ -1185,17 +1184,17 @@ class IntegrationSpec extends TestKitBaseClass with BitcoindService with AnyFunS
// we retrieve transactions already received so that we don't take them into account when evaluating the outcome of this test
val previouslyReceivedByC = listReceivedByAddress(finalAddressC, sender)
// we ask F to unilaterally close the channel
sender.send(nodes(nodeF).register, Forward(htlc.channelId, CMD_FORCECLOSE))
sender.send(nodes(nodeF).register, Register.Forward(sender.ref, htlc.channelId, CMD_FORCECLOSE))
sender.expectMsg(ChannelCommandResponse.Ok)
// we wait for C to detect the unilateral close
awaitCond({
sender.send(nodes("C").register, Forward(htlc.channelId, CMD_GETSTATEDATA))
sender.send(nodes("C").register, Register.Forward(sender.ref, htlc.channelId, CMD_GETSTATEDATA))
sender.expectMsgType[Data] match {
case d: DATA_CLOSING if d.remoteCommitPublished.nonEmpty => true
case _ => false
}
}, max = 30 seconds, interval = 1 second)
sender.send(nodes("C").register, Forward(htlc.channelId, CMD_GETSTATEDATA))
sender.send(nodes("C").register, Register.Forward(sender.ref, htlc.channelId, CMD_GETSTATEDATA))
val Some(remoteCommit) = sender.expectMsgType[DATA_CLOSING].remoteCommitPublished
// we generate enough blocks to make the htlc timeout
generateBlocks(bitcoincli, (htlc.cltvExpiry.toLong - getBlockCount).toInt, Some(minerAddress))
@ -1224,7 +1223,7 @@ class IntegrationSpec extends TestKitBaseClass with BitcoindService with AnyFunS
// and we wait for the channel to close
awaitCond(stateListener.expectMsgType[ChannelStateChanged].currentState == CLOSED, max = 30 seconds)
awaitCond({
sender.send(nodes(nodeF).register, Forward(htlc.channelId, CMD_GETSTATE))
sender.send(nodes(nodeF).register, Register.Forward(sender.ref, htlc.channelId, CMD_GETSTATE))
sender.expectMsgType[State] == CLOSED
}, max = 20 seconds, interval = 1 second)
awaitAnnouncements(nodes.filterKeys(_ == "A").toMap, 5, 7, 16)
@ -1315,7 +1314,7 @@ class IntegrationSpec extends TestKitBaseClass with BitcoindService with AnyFunS
// we then generate blocks to make htlcs timeout (nothing will happen in the channel because all of them have already been fulfilled)
generateBlocks(bitcoincli, 40)
// we retrieve C's default final address
sender.send(nodes("C").register, Forward(commitmentsF.channelId, CMD_GETSTATEDATA))
sender.send(nodes("C").register, Register.Forward(sender.ref, commitmentsF.channelId, CMD_GETSTATEDATA))
val finalAddressC = scriptPubKeyToAddress(sender.expectMsgType[DATA_NORMAL].commitments.localParams.defaultFinalScriptPubKey)
// we prepare the revoked transactions F will publish
val revokedCommitTx = localCommitF.commitTx.tx

View file

@ -369,8 +369,8 @@ class MultiPartHandlerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
val commands = f.register.expectMsgType[Register.Forward[CMD_FAIL_HTLC]] :: f.register.expectMsgType[Register.Forward[CMD_FAIL_HTLC]] :: Nil
assert(commands.toSet === Set(
Register.Forward(ByteVector32.One, CMD_FAIL_HTLC(0, Right(PaymentTimeout), commit = true)),
Register.Forward(ByteVector32.One, CMD_FAIL_HTLC(1, Right(PaymentTimeout), commit = true))
Register.Forward(handler, ByteVector32.One, CMD_FAIL_HTLC(0, Right(PaymentTimeout), commit = true)),
Register.Forward(handler, ByteVector32.One, CMD_FAIL_HTLC(1, Right(PaymentTimeout), commit = true))
))
awaitCond({
f.sender.send(handler, GetPendingPayments)
@ -379,7 +379,7 @@ class MultiPartHandlerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
// Extraneous HTLCs should be failed.
f.sender.send(handler, MultiPartPaymentFSM.ExtraPaymentReceived(pr1.paymentHash, HtlcPart(1000 msat, UpdateAddHtlc(ByteVector32.One, 42, 200 msat, pr1.paymentHash, add1.cltvExpiry, add1.onionRoutingPacket)), Some(PaymentTimeout)))
f.register.expectMsg(Register.Forward(ByteVector32.One, CMD_FAIL_HTLC(42, Right(PaymentTimeout), commit = true)))
f.register.expectMsg(Register.Forward(handler, ByteVector32.One, CMD_FAIL_HTLC(42, Right(PaymentTimeout), commit = true)))
// The payment should still be pending in DB.
val Some(incomingPayment) = nodeParams.db.payments.getIncomingPayment(pr1.paymentHash)
@ -401,12 +401,12 @@ class MultiPartHandlerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
val add3 = add2.copy(id = 43)
f.sender.send(handler, IncomingPacket.FinalPacket(add3, Onion.createMultiPartPayload(add3.amountMsat, 1000 msat, add3.cltvExpiry, pr.paymentSecret.get)))
f.register.expectMsg(Register.Forward(add2.channelId, CMD_FAIL_HTLC(add2.id, Right(IncorrectOrUnknownPaymentDetails(1000 msat, nodeParams.currentBlockHeight)), commit = true)))
f.register.expectMsg(Register.Forward(handler, add2.channelId, CMD_FAIL_HTLC(add2.id, Right(IncorrectOrUnknownPaymentDetails(1000 msat, nodeParams.currentBlockHeight)), commit = true)))
val cmd1 = f.register.expectMsgType[Register.Forward[CMD_FULFILL_HTLC]]
assert(cmd1.message.id === add1.id)
assert(cmd1.channelId === add1.channelId)
assert(Crypto.sha256(cmd1.message.r) === pr.paymentHash)
f.register.expectMsg(Register.Forward(add3.channelId, CMD_FULFILL_HTLC(add3.id, cmd1.message.r, commit = true)))
f.register.expectMsg(Register.Forward(handler, add3.channelId, CMD_FULFILL_HTLC(add3.id, cmd1.message.r, commit = true)))
val paymentReceived = f.eventListener.expectMsgType[PaymentReceived]
assert(paymentReceived.copy(parts = paymentReceived.parts.map(_.copy(timestamp = 0))) === PaymentReceived(pr.paymentHash, PartialPayment(800 msat, ByteVector32.One, 0) :: PartialPayment(200 msat, ByteVector32.Zeroes, 0) :: Nil))
@ -420,7 +420,7 @@ class MultiPartHandlerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
// Extraneous HTLCs should be fulfilled.
f.sender.send(handler, MultiPartPaymentFSM.ExtraPaymentReceived(pr.paymentHash, HtlcPart(1000 msat, UpdateAddHtlc(ByteVector32.One, 44, 200 msat, pr.paymentHash, add1.cltvExpiry, add1.onionRoutingPacket)), None))
f.register.expectMsg(Register.Forward(ByteVector32.One, CMD_FULFILL_HTLC(44, cmd1.message.r, commit = true)))
f.register.expectMsg(Register.Forward(handler, ByteVector32.One, CMD_FULFILL_HTLC(44, cmd1.message.r, commit = true)))
assert(f.eventListener.expectMsgType[PaymentReceived].amount === 200.msat)
val received2 = nodeParams.db.payments.getIncomingPayment(pr.paymentHash)
assert(received2.get.status.asInstanceOf[IncomingPaymentStatus.Received].amount === 1200.msat)
@ -439,7 +439,7 @@ class MultiPartHandlerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
val add1 = UpdateAddHtlc(ByteVector32.One, 0, 800 msat, pr.paymentHash, f.defaultExpiry, TestConstants.emptyOnionPacket)
f.sender.send(handler, IncomingPacket.FinalPacket(add1, Onion.createMultiPartPayload(add1.amountMsat, 1000 msat, add1.cltvExpiry, pr.paymentSecret.get)))
f.register.expectMsg(Register.Forward(ByteVector32.One, CMD_FAIL_HTLC(0, Right(PaymentTimeout), commit = true)))
f.register.expectMsg(Register.Forward(handler, ByteVector32.One, CMD_FAIL_HTLC(0, Right(PaymentTimeout), commit = true)))
awaitCond({
f.sender.send(handler, GetPendingPayments)
f.sender.expectMsgType[PendingPayments].paymentHashes.isEmpty
@ -454,7 +454,7 @@ class MultiPartHandlerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
assert(cmd1.channelId === add2.channelId)
assert(cmd1.message.id === 2)
assert(Crypto.sha256(cmd1.message.r) === pr.paymentHash)
f.register.expectMsg(Register.Forward(add3.channelId, CMD_FULFILL_HTLC(5, cmd1.message.r, commit = true)))
f.register.expectMsg(Register.Forward(handler, add3.channelId, CMD_FULFILL_HTLC(5, cmd1.message.r, commit = true)))
val paymentReceived = f.eventListener.expectMsgType[PaymentReceived]
assert(paymentReceived.copy(parts = paymentReceived.parts.map(_.copy(timestamp = 0))) === PaymentReceived(pr.paymentHash, PartialPayment(300 msat, ByteVector32.One, 0) :: PartialPayment(700 msat, ByteVector32.Zeroes, 0) :: Nil))
@ -501,7 +501,7 @@ class MultiPartHandlerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
val add = UpdateAddHtlc(ByteVector32.One, 0, amountMsat, paymentHash, defaultExpiry, TestConstants.emptyOnionPacket)
sender.send(normalHandler, IncomingPacket.FinalPacket(add, payload))
f.register.expectMsg(Register.Forward(add.channelId, CMD_FAIL_HTLC(add.id, Right(IncorrectOrUnknownPaymentDetails(42000 msat, nodeParams.currentBlockHeight)), commit = true)))
f.register.expectMsg(Register.Forward(normalHandler, add.channelId, CMD_FAIL_HTLC(add.id, Right(IncorrectOrUnknownPaymentDetails(42000 msat, nodeParams.currentBlockHeight)), commit = true)))
assert(nodeParams.db.payments.getIncomingPayment(paymentHash) === None)
}
}

View file

@ -78,7 +78,7 @@ class NodeRelayerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
val parts = incomingMultiPart.dropRight(1).map(i => MultiPartPaymentFSM.HtlcPart(incomingAmount, i.add))
sender.send(nodeRelayer, MultiPartPaymentFSM.MultiPartPaymentFailed(paymentHash, PaymentTimeout, Queue(parts: _*)))
incomingMultiPart.dropRight(1).foreach(p => register.expectMsg(Register.Forward(p.add.channelId, CMD_FAIL_HTLC(p.add.id, Right(PaymentTimeout), commit = true))))
incomingMultiPart.dropRight(1).foreach(p => register.expectMsg(Register.Forward(nodeRelayer, p.add.channelId, CMD_FAIL_HTLC(p.add.id, Right(PaymentTimeout), commit = true))))
sender.expectNoMsg(100 millis)
outgoingPayFSM.expectNoMsg(100 millis)
}
@ -90,7 +90,7 @@ class NodeRelayerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
val partial = MultiPartPaymentFSM.HtlcPart(incomingAmount, UpdateAddHtlc(randomBytes32, 15, 100 msat, paymentHash, CltvExpiry(42000), TestConstants.emptyOnionPacket))
sender.send(nodeRelayer, MultiPartPaymentFSM.ExtraPaymentReceived(paymentHash, partial, Some(InvalidRealm)))
register.expectMsg(Register.Forward(partial.htlc.channelId, CMD_FAIL_HTLC(partial.htlc.id, Right(InvalidRealm), commit = true)))
register.expectMsg(Register.Forward(nodeRelayer, partial.htlc.channelId, CMD_FAIL_HTLC(partial.htlc.id, Right(InvalidRealm), commit = true)))
sender.expectNoMsg(100 millis)
outgoingPayFSM.expectNoMsg(100 millis)
}
@ -112,7 +112,7 @@ class NodeRelayerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
Onion.createNodeRelayPayload(outgoingAmount, outgoingExpiry, outgoingNodeId),
nextTrampolinePacket)
relayer.send(nodeRelayer, i1)
register.expectMsg(Register.Forward(i1.add.channelId, CMD_FAIL_HTLC(i1.add.id, Right(IncorrectOrUnknownPaymentDetails(1000 msat, nodeParams.currentBlockHeight)), commit = true)))
register.expectMsg(Register.Forward(nodeRelayer, i1.add.channelId, CMD_FAIL_HTLC(i1.add.id, Right(IncorrectOrUnknownPaymentDetails(1000 msat, nodeParams.currentBlockHeight)), commit = true)))
// Receive new HTLC with different details, but for the same payment hash.
val i2 = IncomingPacket.NodeRelayPacket(
@ -121,7 +121,7 @@ class NodeRelayerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
Onion.createNodeRelayPayload(1250 msat, outgoingExpiry, outgoingNodeId),
nextTrampolinePacket)
relayer.send(nodeRelayer, i2)
register.expectMsg(Register.Forward(i2.add.channelId, CMD_FAIL_HTLC(i2.add.id, Right(IncorrectOrUnknownPaymentDetails(1500 msat, nodeParams.currentBlockHeight)), commit = true)))
register.expectMsg(Register.Forward(nodeRelayer, i2.add.channelId, CMD_FAIL_HTLC(i2.add.id, Right(IncorrectOrUnknownPaymentDetails(1500 msat, nodeParams.currentBlockHeight)), commit = true)))
outgoingPayFSM.expectNoMsg(100 millis)
}
@ -135,7 +135,7 @@ class NodeRelayerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
relayer.send(nodeRelayer, p)
val failure = IncorrectOrUnknownPaymentDetails(2000000 msat, nodeParams.currentBlockHeight)
register.expectMsg(Register.Forward(p.add.channelId, CMD_FAIL_HTLC(p.add.id, Right(failure), commit = true)))
register.expectMsg(Register.Forward(nodeRelayer, p.add.channelId, CMD_FAIL_HTLC(p.add.id, Right(failure), commit = true)))
outgoingPayFSM.expectNoMsg(100 millis)
}
@ -150,7 +150,7 @@ class NodeRelayerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
relayer.send(nodeRelayer, p2)
val failure = IncorrectOrUnknownPaymentDetails(1000000 msat, nodeParams.currentBlockHeight)
register.expectMsg(Register.Forward(p2.add.channelId, CMD_FAIL_HTLC(p2.add.id, Right(failure), commit = true)))
register.expectMsg(Register.Forward(nodeRelayer, p2.add.channelId, CMD_FAIL_HTLC(p2.add.id, Right(failure), commit = true)))
register.expectNoMsg(100 millis)
outgoingPayFSM.expectNoMsg(100 millis)
}
@ -163,7 +163,7 @@ class NodeRelayerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
val p = createValidIncomingPacket(2000000 msat, 2000000 msat, expiryIn, 1000000 msat, expiryOut)
relayer.send(nodeRelayer, p)
register.expectMsg(Register.Forward(p.add.channelId, CMD_FAIL_HTLC(p.add.id, Right(TrampolineExpiryTooSoon), commit = true)))
register.expectMsg(Register.Forward(nodeRelayer, p.add.channelId, CMD_FAIL_HTLC(p.add.id, Right(TrampolineExpiryTooSoon), commit = true)))
register.expectNoMsg(100 millis)
outgoingPayFSM.expectNoMsg(100 millis)
}
@ -180,7 +180,7 @@ class NodeRelayerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
)
p.foreach(p => relayer.send(nodeRelayer, p))
p.foreach(p => register.expectMsg(Register.Forward(p.add.channelId, CMD_FAIL_HTLC(p.add.id, Right(TrampolineExpiryTooSoon), commit = true))))
p.foreach(p => register.expectMsg(Register.Forward(nodeRelayer, p.add.channelId, CMD_FAIL_HTLC(p.add.id, Right(TrampolineExpiryTooSoon), commit = true))))
register.expectNoMsg(100 millis)
outgoingPayFSM.expectNoMsg(100 millis)
}
@ -191,7 +191,7 @@ class NodeRelayerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
val p = createValidIncomingPacket(2000000 msat, 2000000 msat, CltvExpiry(500000), 1999000 msat, CltvExpiry(490000))
relayer.send(nodeRelayer, p)
register.expectMsg(Register.Forward(p.add.channelId, CMD_FAIL_HTLC(p.add.id, Right(TrampolineFeeInsufficient), commit = true)))
register.expectMsg(Register.Forward(nodeRelayer, p.add.channelId, CMD_FAIL_HTLC(p.add.id, Right(TrampolineFeeInsufficient), commit = true)))
register.expectNoMsg(100 millis)
outgoingPayFSM.expectNoMsg(100 millis)
}
@ -205,7 +205,7 @@ class NodeRelayerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
)
p.foreach(p => relayer.send(nodeRelayer, p))
p.foreach(p => register.expectMsg(Register.Forward(p.add.channelId, CMD_FAIL_HTLC(p.add.id, Right(TrampolineFeeInsufficient), commit = true))))
p.foreach(p => register.expectMsg(Register.Forward(nodeRelayer, p.add.channelId, CMD_FAIL_HTLC(p.add.id, Right(TrampolineFeeInsufficient), commit = true))))
register.expectNoMsg(100 millis)
outgoingPayFSM.expectNoMsg(100 millis)
}
@ -219,7 +219,7 @@ class NodeRelayerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
outgoingPayFSM.expectMsgType[SendMultiPartPayment]
outgoingPayFSM.send(nodeRelayer, PaymentFailed(outgoingPaymentId, paymentHash, LocalFailure(Nil, BalanceTooLow) :: Nil))
incomingMultiPart.foreach(p => register.expectMsg(Register.Forward(p.add.channelId, CMD_FAIL_HTLC(p.add.id, Right(TemporaryNodeFailure), commit = true))))
incomingMultiPart.foreach(p => register.expectMsg(Register.Forward(nodeRelayer, p.add.channelId, CMD_FAIL_HTLC(p.add.id, Right(TemporaryNodeFailure), commit = true))))
register.expectNoMsg(100 millis)
eventListener.expectNoMsg(100 millis)
}
@ -235,7 +235,7 @@ class NodeRelayerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
// If we're having a hard time finding routes, raising the fee/cltv will likely help.
val failures = LocalFailure(Nil, RouteNotFound) :: RemoteFailure(Nil, Sphinx.DecryptedFailurePacket(outgoingNodeId, PermanentNodeFailure)) :: LocalFailure(Nil, RouteNotFound) :: Nil
outgoingPayFSM.send(nodeRelayer, PaymentFailed(outgoingPaymentId, paymentHash, failures))
incomingMultiPart.foreach(p => register.expectMsg(Register.Forward(p.add.channelId, CMD_FAIL_HTLC(p.add.id, Right(TrampolineFeeInsufficient), commit = true))))
incomingMultiPart.foreach(p => register.expectMsg(Register.Forward(nodeRelayer, p.add.channelId, CMD_FAIL_HTLC(p.add.id, Right(TrampolineFeeInsufficient), commit = true))))
register.expectNoMsg(100 millis)
eventListener.expectNoMsg(100 millis)
}
@ -250,7 +250,7 @@ class NodeRelayerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
val failures = RemoteFailure(Nil, Sphinx.DecryptedFailurePacket(outgoingNodeId, FinalIncorrectHtlcAmount(42 msat))) :: UnreadableRemoteFailure(Nil) :: Nil
outgoingPayFSM.send(nodeRelayer, PaymentFailed(outgoingPaymentId, paymentHash, failures))
incomingMultiPart.foreach(p => register.expectMsg(Register.Forward(p.add.channelId, CMD_FAIL_HTLC(p.add.id, Right(FinalIncorrectHtlcAmount(42 msat)), commit = true))))
incomingMultiPart.foreach(p => register.expectMsg(Register.Forward(nodeRelayer, p.add.channelId, CMD_FAIL_HTLC(p.add.id, Right(FinalIncorrectHtlcAmount(42 msat)), commit = true))))
register.expectNoMsg(100 millis)
eventListener.expectNoMsg(100 millis)
}
@ -282,7 +282,7 @@ class NodeRelayerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
// A first downstream HTLC is fulfilled: we should immediately forward the fulfill upstream.
outgoingPayFSM.send(nodeRelayer, PreimageReceived(paymentHash, paymentPreimage))
incomingMultiPart.foreach(p => register.expectMsg(Register.Forward(p.add.channelId, CMD_FULFILL_HTLC(p.add.id, paymentPreimage, commit = true))))
incomingMultiPart.foreach(p => register.expectMsg(Register.Forward(nodeRelayer, p.add.channelId, CMD_FULFILL_HTLC(p.add.id, paymentPreimage, commit = true))))
// If the payment FSM sends us duplicate preimage events, we should not fulfill a second time upstream.
outgoingPayFSM.send(nodeRelayer, PreimageReceived(paymentHash, paymentPreimage))
@ -310,7 +310,7 @@ class NodeRelayerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
outgoingPayFSM.send(nodeRelayer, PreimageReceived(paymentHash, paymentPreimage))
val incomingAdd = incomingSinglePart.add
register.expectMsg(Register.Forward(incomingAdd.channelId, CMD_FULFILL_HTLC(incomingAdd.id, paymentPreimage, commit = true)))
register.expectMsg(Register.Forward(nodeRelayer, incomingAdd.channelId, CMD_FULFILL_HTLC(incomingAdd.id, paymentPreimage, commit = true)))
outgoingPayFSM.send(nodeRelayer, createSuccessEvent(outgoingCfg.id))
val relayEvent = eventListener.expectMsgType[TrampolinePaymentRelayed]
@ -343,7 +343,7 @@ class NodeRelayerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
assert(outgoingPayment.assistedRoutes === hints)
outgoingPayFSM.send(nodeRelayer, PreimageReceived(paymentHash, paymentPreimage))
incomingMultiPart.foreach(p => register.expectMsg(Register.Forward(p.add.channelId, CMD_FULFILL_HTLC(p.add.id, paymentPreimage, commit = true))))
incomingMultiPart.foreach(p => register.expectMsg(Register.Forward(nodeRelayer, p.add.channelId, CMD_FULFILL_HTLC(p.add.id, paymentPreimage, commit = true))))
outgoingPayFSM.send(nodeRelayer, createSuccessEvent(outgoingCfg.id))
val relayEvent = eventListener.expectMsgType[TrampolinePaymentRelayed]
@ -373,7 +373,7 @@ class NodeRelayerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
assert(outgoingPayment.assistedRoutes === hints)
outgoingPayFSM.send(nodeRelayer, PreimageReceived(paymentHash, paymentPreimage))
incomingMultiPart.foreach(p => register.expectMsg(Register.Forward(p.add.channelId, CMD_FULFILL_HTLC(p.add.id, paymentPreimage, commit = true))))
incomingMultiPart.foreach(p => register.expectMsg(Register.Forward(nodeRelayer, p.add.channelId, CMD_FULFILL_HTLC(p.add.id, paymentPreimage, commit = true))))
outgoingPayFSM.send(nodeRelayer, createSuccessEvent(outgoingCfg.id))
val relayEvent = eventListener.expectMsgType[TrampolinePaymentRelayed]

View file

@ -217,7 +217,7 @@ class PaymentLifecycleSpec extends BaseRouterSpec {
val WaitingForComplete(_, _, cmd1, Nil, _, ignore1, route) = paymentFSM.stateData
assert(ignore1.nodes.isEmpty)
register.expectMsg(ForwardShortId(channelId_ab, cmd1))
register.expectMsg(ForwardShortId(paymentFSM, channelId_ab, cmd1))
sender.send(paymentFSM, Relayer.ForwardRemoteFail(UpdateFailHtlc(ByteVector32.Zeroes, 0, randomBytes32), defaultOrigin, UpdateAddHtlc(ByteVector32.Zeroes, 0, defaultAmountMsat, defaultPaymentHash, defaultExpiry, TestConstants.emptyOnionPacket))) // unparsable message
// then the payment lifecycle will ask for a new route excluding all intermediate nodes
@ -229,7 +229,7 @@ class PaymentLifecycleSpec extends BaseRouterSpec {
val WaitingForComplete(_, _, cmd2, _, _, ignore2, _) = paymentFSM.stateData
assert(ignore2.nodes === Set(c))
// and reply a 2nd time with an unparsable failure
register.expectMsg(ForwardShortId(channelId_ab, cmd2))
register.expectMsg(ForwardShortId(paymentFSM, channelId_ab, cmd2))
sender.send(paymentFSM, UpdateFailHtlc(ByteVector32.Zeroes, 0, defaultPaymentHash)) // unparsable message
// we allow 2 tries, so we send a 2nd request to the router
@ -252,7 +252,7 @@ class PaymentLifecycleSpec extends BaseRouterSpec {
awaitCond(paymentFSM.stateName == WAITING_FOR_PAYMENT_COMPLETE)
val WaitingForComplete(_, _, cmd1, Nil, _, _, _) = paymentFSM.stateData
register.expectMsg(ForwardShortId(channelId_ab, cmd1))
register.expectMsg(ForwardShortId(paymentFSM, channelId_ab, cmd1))
sender.send(paymentFSM, Status.Failure(AddHtlcFailed(ByteVector32.Zeroes, defaultPaymentHash, ChannelUnavailable(ByteVector32.Zeroes), Local(id, Some(paymentFSM.underlying.self)), None, None)))
// then the payment lifecycle will ask for a new route excluding the channel
@ -275,7 +275,7 @@ class PaymentLifecycleSpec extends BaseRouterSpec {
awaitCond(paymentFSM.stateName == WAITING_FOR_PAYMENT_COMPLETE)
val WaitingForComplete(_, _, cmd1, Nil, _, _, _) = paymentFSM.stateData
register.expectMsg(ForwardShortId(channelId_ab, cmd1))
register.expectMsg(ForwardShortId(paymentFSM, channelId_ab, cmd1))
sender.send(paymentFSM, UpdateFailMalformedHtlc(ByteVector32.Zeroes, 0, randomBytes32, FailureMessageCodecs.BADONION))
// then the payment lifecycle will ask for a new route excluding the channel
@ -297,7 +297,7 @@ class PaymentLifecycleSpec extends BaseRouterSpec {
awaitCond(paymentFSM.stateName == WAITING_FOR_PAYMENT_COMPLETE)
val WaitingForComplete(_, _, cmd1, Nil, sharedSecrets1, _, route) = paymentFSM.stateData
register.expectMsg(ForwardShortId(channelId_ab, cmd1))
register.expectMsg(ForwardShortId(paymentFSM, channelId_ab, cmd1))
val failure = TemporaryChannelFailure(update_bc)
sender.send(paymentFSM, UpdateFailHtlc(ByteVector32.Zeroes, 0, Sphinx.FailurePacket.create(sharedSecrets1.head._1, failure)))
@ -327,7 +327,7 @@ class PaymentLifecycleSpec extends BaseRouterSpec {
routerForwarder.forward(routerFixture.router)
awaitCond(paymentFSM.stateName == WAITING_FOR_PAYMENT_COMPLETE)
val WaitingForComplete(_, _, cmd1, Nil, sharedSecrets1, _, route1) = paymentFSM.stateData
register.expectMsg(ForwardShortId(channelId_ab, cmd1))
register.expectMsg(ForwardShortId(paymentFSM, channelId_ab, cmd1))
// we change the cltv expiry
val channelUpdate_bc_modified = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_b, c, channelId_bc, CltvExpiryDelta(42), htlcMinimumMsat = update_bc.htlcMinimumMsat, feeBaseMsat = update_bc.feeBaseMsat, feeProportionalMillionths = update_bc.feeProportionalMillionths, htlcMaximumMsat = update_bc.htlcMaximumMsat.get)
@ -344,7 +344,7 @@ class PaymentLifecycleSpec extends BaseRouterSpec {
// router answers with a new route, taking into account the new update
awaitCond(paymentFSM.stateName == WAITING_FOR_PAYMENT_COMPLETE)
val WaitingForComplete(_, _, cmd2, _, sharedSecrets2, _, route2) = paymentFSM.stateData
register.expectMsg(ForwardShortId(channelId_ab, cmd2))
register.expectMsg(ForwardShortId(paymentFSM, channelId_ab, cmd2))
// we change the cltv expiry one more time
val channelUpdate_bc_modified_2 = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_b, c, channelId_bc, CltvExpiryDelta(43), htlcMinimumMsat = update_bc.htlcMinimumMsat, feeBaseMsat = update_bc.feeBaseMsat, feeProportionalMillionths = update_bc.feeProportionalMillionths, htlcMaximumMsat = update_bc.htlcMaximumMsat.get)
@ -377,7 +377,7 @@ class PaymentLifecycleSpec extends BaseRouterSpec {
routerForwarder.forward(routerFixture.router)
awaitCond(paymentFSM.stateName == WAITING_FOR_PAYMENT_COMPLETE)
val WaitingForComplete(_, _, cmd1, Nil, sharedSecrets1, _, _) = paymentFSM.stateData
register.expectMsg(ForwardShortId(channelId_ab, cmd1))
register.expectMsg(ForwardShortId(paymentFSM, channelId_ab, cmd1))
// the node replies with a temporary failure containing the same update as the one we already have (likely a balance issue)
val failure = TemporaryChannelFailure(update_bc)
@ -410,7 +410,7 @@ class PaymentLifecycleSpec extends BaseRouterSpec {
routerForwarder.forward(routerFixture.router)
awaitCond(paymentFSM.stateName == WAITING_FOR_PAYMENT_COMPLETE)
val WaitingForComplete(_, _, cmd1, Nil, sharedSecrets1, _, _) = paymentFSM.stateData
register.expectMsg(ForwardShortId(channelId_ab, cmd1))
register.expectMsg(ForwardShortId(paymentFSM, channelId_ab, cmd1))
// we change the cltv expiry
val channelUpdate_bc_modified = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_b, c, channelId_bc, CltvExpiryDelta(42), htlcMinimumMsat = update_bc.htlcMinimumMsat, feeBaseMsat = update_bc.feeBaseMsat, feeProportionalMillionths = update_bc.feeProportionalMillionths, htlcMaximumMsat = update_bc.htlcMaximumMsat.get)
@ -431,7 +431,7 @@ class PaymentLifecycleSpec extends BaseRouterSpec {
// router answers with a new route, taking into account the new update
awaitCond(paymentFSM.stateName == WAITING_FOR_PAYMENT_COMPLETE)
val WaitingForComplete(_, _, cmd2, _, _, _, _) = paymentFSM.stateData
register.expectMsg(ForwardShortId(channelId_ab, cmd2))
register.expectMsg(ForwardShortId(paymentFSM, channelId_ab, cmd2))
assert(cmd2.cltvExpiry > cmd1.cltvExpiry)
}
@ -450,7 +450,7 @@ class PaymentLifecycleSpec extends BaseRouterSpec {
routerForwarder.forward(routerFixture.router)
awaitCond(paymentFSM.stateName == WAITING_FOR_PAYMENT_COMPLETE)
val WaitingForComplete(_, _, cmd1, Nil, sharedSecrets1, _, _) = paymentFSM.stateData
register.expectMsg(ForwardShortId(channelId_ab, cmd1))
register.expectMsg(ForwardShortId(paymentFSM, channelId_ab, cmd1))
// we disable the channel
val channelUpdate_cd_disabled = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_c, d, channelId_cd, CltvExpiryDelta(42), update_cd.htlcMinimumMsat, update_cd.feeBaseMsat, update_cd.feeProportionalMillionths, update_cd.htlcMaximumMsat.get, enable = false)
@ -477,7 +477,7 @@ class PaymentLifecycleSpec extends BaseRouterSpec {
awaitCond(paymentFSM.stateName == WAITING_FOR_PAYMENT_COMPLETE)
val WaitingForComplete(_, _, cmd1, Nil, sharedSecrets1, _, route1) = paymentFSM.stateData
register.expectMsg(ForwardShortId(channelId_ab, cmd1))
register.expectMsg(ForwardShortId(paymentFSM, channelId_ab, cmd1))
sender.send(paymentFSM, UpdateFailHtlc(ByteVector32.Zeroes, 0, Sphinx.FailurePacket.create(sharedSecrets1.head._1, failure)))
// payment lifecycle forwards the embedded channelUpdate to the router

View file

@ -403,12 +403,16 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit
channel_upstream_1.expectNoMsg(100 millis)
channel_upstream_2.expectNoMsg(100 millis)
// we need a reference to the post-htlc-restart child actor
sender.send(relayer, Relayer.GetChildActors(sender.ref))
val postRestartHtlcCleaner = sender.expectMsgType[Relayer.ChildActors].postRestartCleaner
// Payment 2 should fulfill once we receive the preimage.
val origin_2 = Origin.TrampolineRelayed(upstream_2.adds.map(add => (add.channelId, add.id)).toList, None)
sender.send(relayer, Relayer.ForwardOnChainFulfill(preimage2, origin_2, htlc_2_2))
register.expectMsgAllOf(
Register.Forward(channelId_ab_1, CMD_FULFILL_HTLC(5, preimage2, commit = true)),
Register.Forward(channelId_ab_2, CMD_FULFILL_HTLC(9, preimage2, commit = true))
Register.Forward(replyTo = postRestartHtlcCleaner, channelId_ab_1, CMD_FULFILL_HTLC(5, preimage2, commit = true)),
Register.Forward(replyTo = postRestartHtlcCleaner, channelId_ab_2, CMD_FULFILL_HTLC(9, preimage2, commit = true))
)
// Payment 3 should not be failed: we are still waiting for on-chain confirmation.
@ -422,11 +426,15 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit
val relayer = f.createRelayer()
register.expectNoMsg(100 millis)
// we need a reference to the post-htlc-restart child actor
sender.send(relayer, Relayer.GetChildActors(sender.ref))
val postRestartHtlcCleaner = sender.expectMsgType[Relayer.ChildActors].postRestartCleaner
// This downstream HTLC has two upstream HTLCs.
sender.send(relayer, buildForwardFail(testCase.downstream_1_1, testCase.upstream_1))
val fails = register.expectMsgType[Register.Forward[CMD_FAIL_HTLC]] :: register.expectMsgType[Register.Forward[CMD_FAIL_HTLC]] :: Nil
assert(fails.toSet === testCase.upstream_1.origins.map {
case (channelId, htlcId) => Register.Forward(channelId, CMD_FAIL_HTLC(htlcId, Right(TemporaryNodeFailure), commit = true))
case (channelId, htlcId) => Register.Forward(postRestartHtlcCleaner, channelId, CMD_FAIL_HTLC(htlcId, Right(TemporaryNodeFailure), commit = true))
}.toSet)
sender.send(relayer, buildForwardFail(testCase.downstream_1_1, testCase.upstream_1))
@ -438,7 +446,7 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit
sender.send(relayer, buildForwardFail(testCase.downstream_2_3, testCase.upstream_2))
register.expectMsg(testCase.upstream_2.origins.map {
case (channelId, htlcId) => Register.Forward(channelId, CMD_FAIL_HTLC(htlcId, Right(TemporaryNodeFailure), commit = true))
case (channelId, htlcId) => Register.Forward(postRestartHtlcCleaner, channelId, CMD_FAIL_HTLC(htlcId, Right(TemporaryNodeFailure), commit = true))
}.head)
register.expectNoMsg(100 millis)
@ -452,11 +460,15 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit
val relayer = f.createRelayer()
register.expectNoMsg(100 millis)
// we need a reference to the post-htlc-restart child actor
sender.send(relayer, Relayer.GetChildActors(sender.ref))
val postRestartHtlcCleaner = sender.expectMsgType[Relayer.ChildActors].postRestartCleaner
// This downstream HTLC has two upstream HTLCs.
sender.send(relayer, buildForwardFulfill(testCase.downstream_1_1, testCase.upstream_1, preimage1))
val fails = register.expectMsgType[Register.Forward[CMD_FULFILL_HTLC]] :: register.expectMsgType[Register.Forward[CMD_FULFILL_HTLC]] :: Nil
assert(fails.toSet === testCase.upstream_1.origins.map {
case (channelId, htlcId) => Register.Forward(channelId, CMD_FULFILL_HTLC(htlcId, preimage1, commit = true))
case (channelId, htlcId) => Register.Forward(postRestartHtlcCleaner, channelId, CMD_FULFILL_HTLC(htlcId, preimage1, commit = true))
}.toSet)
sender.send(relayer, buildForwardFulfill(testCase.downstream_1_1, testCase.upstream_1, preimage1))
@ -465,7 +477,7 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit
// This payment has 3 downstream HTLCs, but we should fulfill upstream as soon as we receive the preimage.
sender.send(relayer, buildForwardFulfill(testCase.downstream_2_1, testCase.upstream_2, preimage2))
register.expectMsg(testCase.upstream_2.origins.map {
case (channelId, htlcId) => Register.Forward(channelId, CMD_FULFILL_HTLC(htlcId, preimage2, commit = true))
case (channelId, htlcId) => Register.Forward(postRestartHtlcCleaner, channelId, CMD_FULFILL_HTLC(htlcId, preimage2, commit = true))
}.head)
sender.send(relayer, buildForwardFulfill(testCase.downstream_2_2, testCase.upstream_2, preimage2))
@ -481,11 +493,15 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit
val relayer = f.createRelayer()
register.expectNoMsg(100 millis)
// we need a reference to the post-htlc-restart child actor
sender.send(relayer, Relayer.GetChildActors(sender.ref))
val postRestartHtlcCleaner = sender.expectMsgType[Relayer.ChildActors].postRestartCleaner
sender.send(relayer, buildForwardFail(testCase.downstream_2_1, testCase.upstream_2))
sender.send(relayer, buildForwardFulfill(testCase.downstream_2_2, testCase.upstream_2, preimage2))
register.expectMsg(testCase.upstream_2.origins.map {
case (channelId, htlcId) => Register.Forward(channelId, CMD_FULFILL_HTLC(htlcId, preimage2, commit = true))
case (channelId, htlcId) => Register.Forward(postRestartHtlcCleaner, channelId, CMD_FULFILL_HTLC(htlcId, preimage2, commit = true))
}.head)
sender.send(relayer, buildForwardFail(testCase.downstream_2_3, testCase.upstream_2))
@ -576,12 +592,12 @@ object PostRestartHtlcCleanerSpec {
* - the first one has 2 upstream HTLCs and 1 downstream HTLC
* - the second one has 1 upstream HTLC and 3 downstream HTLCs
* - the third one has 2 downstream HTLCs temporarily stuck in closing channels: the upstream HTLCs have been
* correctly resolved when the channel went to closing, so we should ignore that payment (downstream will eventually
* settle on-chain).
* correctly resolved when the channel went to closing, so we should ignore that payment (downstream will eventually
* settle on-chain).
*
* We also setup one normal relayed payment:
* - the downstream HTLC is stuck in a closing channel: the upstream HTLC has been correctly resolved, so we should
* ignore that payment (downstream will eventually settle on-chain).
* ignore that payment (downstream will eventually settle on-chain).
*/
def setupTrampolinePayments(nodeParams: NodeParams): TrampolinePaymentTest = {
// Upstream HTLCs.

View file

@ -210,9 +210,13 @@ class RelayerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
val fulfill_ba = UpdateFulfillHtlc(channelId_bc, 72, paymentPreimage)
sender.send(relayer, ForwardRemoteFulfill(fulfill_ba, origin2, add_bc))
// we need a reference to the node-relayer child actor
sender.send(relayer, Relayer.GetChildActors(sender.ref))
val nodeRelayer = sender.expectMsgType[Relayer.ChildActors].nodeRelayer
// it should trigger a fulfill on the upstream HTLCs
register.expectMsg(Register.Forward(channelId_ab, CMD_FULFILL_HTLC(561, paymentPreimage, commit = true)))
register.expectMsg(Register.Forward(channelId_ab, CMD_FULFILL_HTLC(565, paymentPreimage, commit = true)))
register.expectMsg(Register.Forward(nodeRelayer, channelId_ab, CMD_FULFILL_HTLC(561, paymentPreimage, commit = true)))
register.expectMsg(Register.Forward(nodeRelayer, channelId_ab, CMD_FULFILL_HTLC(565, paymentPreimage, commit = true)))
}
test("relay an htlc-add at the final node to the payment handler") { f =>
@ -553,16 +557,20 @@ class RelayerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
}
sender.send(relayer, forwardFulfill)
// we need a reference to the node-relayer child actor
sender.send(relayer, Relayer.GetChildActors(sender.ref))
val nodeRelayer = sender.expectMsgType[Relayer.ChildActors].nodeRelayer
// the FSM responsible for the payment should receive the fulfill and emit a preimage event.
payFSM.expectMsg(forwardFulfill)
system.actorSelection(relayer.path.child("node-relayer")).tell(PreimageReceived(paymentHash, preimage), payFSM.ref)
nodeRelayer.tell(PreimageReceived(paymentHash, preimage), payFSM.ref)
// the payment should be immediately fulfilled upstream.
val upstream1 = register.expectMsgType[Register.Forward[CMD_FULFILL_HTLC]]
val upstream2 = register.expectMsgType[Register.Forward[CMD_FULFILL_HTLC]]
assert(Set(upstream1, upstream2) === Set(
Register.Forward(channelId_ab, CMD_FULFILL_HTLC(561, preimage, commit = true)),
Register.Forward(channelId_ab, CMD_FULFILL_HTLC(565, preimage, commit = true))
Register.Forward(nodeRelayer, channelId_ab, CMD_FULFILL_HTLC(561, preimage, commit = true)),
Register.Forward(nodeRelayer, channelId_ab, CMD_FULFILL_HTLC(565, preimage, commit = true))
))
register.expectNoMsg(50 millis)