1
0
Fork 0
mirror of https://github.com/ACINQ/eclair.git synced 2025-02-23 06:35:11 +01:00

Remove the command buffer (#1476)

When we receive an `UpdateFulfillHtlc` from a downstream channel, it is
critical that we don't lose it because that is what allows us to pull
funds from the corresponding upstream channel(s). But this(ese) upstream
channel(s) may very well be currently OFFLINE and unable to update the
commitment right away, so we need to remember it for later. Same applies
to an `UpdateFailHtlc` (although it is less critical), because we don't
want the upstream htlc to timeout and cause a channel to force-close.

We were previously relying on a `CommandBuffer` actor, that uses a
"pending relay" database to store commands. Once the command is processed
by the target channel, it sends back an acknowledgment to the
`CommandBuffer`, which then removes the command from the database.
Unacknowledged commands are replayed at each reconnection or app restart.
This works well, but the flow is a little cumbersome and not easy to
understand.

With this PR, the sender (channel, payment handler, ...) is responsible for
storing commands to the pending relay db, instead of the command buffer,
which is completely removed. The target channel will acknowledge the
message and remove it from the pending relay db.

In the end, the logic is the same as before, we have just dropped the
intermediate `CommandBuffer`.
This commit is contained in:
Pierre-Marie Padiou 2020-07-01 16:41:27 +02:00 committed by GitHub
parent b63c4aa5a4
commit ae3d396413
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 325 additions and 310 deletions

View file

@ -46,7 +46,7 @@ import fr.acinq.eclair.db.{Databases, FileBackupHandler}
import fr.acinq.eclair.io.{ClientSpawner, Server, Switchboard}
import fr.acinq.eclair.payment.Auditor
import fr.acinq.eclair.payment.receive.PaymentHandler
import fr.acinq.eclair.payment.relay.{CommandBuffer, Relayer}
import fr.acinq.eclair.payment.relay.Relayer
import fr.acinq.eclair.payment.send.{Autoprobe, PaymentInitiator}
import fr.acinq.eclair.router._
import fr.acinq.eclair.tor.TorProtocolHandler.OnionServiceVersion
@ -298,9 +298,8 @@ class Setup(datadir: File,
}
audit = system.actorOf(SimpleSupervisor.props(Auditor.props(nodeParams), "auditor", SupervisorStrategy.Resume))
register = system.actorOf(SimpleSupervisor.props(Props(new Register), "register", SupervisorStrategy.Resume))
commandBuffer = system.actorOf(SimpleSupervisor.props(Props(new CommandBuffer(nodeParams, register)), "command-buffer", SupervisorStrategy.Resume))
paymentHandler = system.actorOf(SimpleSupervisor.props(PaymentHandler.props(nodeParams, commandBuffer), "payment-handler", SupervisorStrategy.Resume))
relayer = system.actorOf(SimpleSupervisor.props(Relayer.props(nodeParams, router, register, commandBuffer, paymentHandler, Some(postRestartCleanUpInitialized)), "relayer", SupervisorStrategy.Resume))
paymentHandler = system.actorOf(SimpleSupervisor.props(PaymentHandler.props(nodeParams, register), "payment-handler", SupervisorStrategy.Resume))
relayer = system.actorOf(SimpleSupervisor.props(Relayer.props(nodeParams, router, register, paymentHandler, Some(postRestartCleanUpInitialized)), "relayer", SupervisorStrategy.Resume))
// Before initializing the switchboard (which re-connects us to the network) and the user-facing parts of the system,
// we want to make sure the handler for post-restart broken HTLCs has finished initializing.
_ <- postRestartCleanUpInitialized.future
@ -316,7 +315,6 @@ class Setup(datadir: File,
watcher = watcher,
paymentHandler = paymentHandler,
register = register,
commandBuffer = commandBuffer,
relayer = relayer,
router = router,
switchboard = switchboard,
@ -382,7 +380,6 @@ case class Kit(nodeParams: NodeParams,
watcher: ActorRef,
paymentHandler: ActorRef,
register: ActorRef,
commandBuffer: ActorRef,
relayer: ActorRef,
router: ActorRef,
switchboard: ActorRef,

View file

@ -28,9 +28,10 @@ import fr.acinq.eclair.blockchain._
import fr.acinq.eclair.channel.Helpers.{Closing, Funding}
import fr.acinq.eclair.channel.Monitoring.{Metrics, Tags}
import fr.acinq.eclair.crypto.ShaChain
import fr.acinq.eclair.db.PendingRelayDb
import fr.acinq.eclair.io.Peer
import fr.acinq.eclair.payment._
import fr.acinq.eclair.payment.relay.{CommandBuffer, Origin, Relayer}
import fr.acinq.eclair.payment.relay.{Origin, Relayer}
import fr.acinq.eclair.router.Announcements
import fr.acinq.eclair.transactions._
import fr.acinq.eclair.wire._
@ -88,12 +89,6 @@ object Channel {
// we will receive this message when we waited too long for a revocation for that commit number (NB: we explicitly specify the peer to allow for testing)
case class RevocationTimeout(remoteCommitNumber: Long, peer: ActorRef)
def ackPendingFailsAndFulfills(updates: List[UpdateMessage], relayer: ActorRef): Unit = updates.collect {
case u: UpdateFailMalformedHtlc => relayer ! CommandBuffer.CommandAck(u.channelId, u.id)
case u: UpdateFulfillHtlc => relayer ! CommandBuffer.CommandAck(u.channelId, u.id)
case u: UpdateFailHtlc => relayer ! CommandBuffer.CommandAck(u.channelId, u.id)
}
/**
* Outgoing messages go through the [[Peer]] for logging purposes.
*
@ -660,8 +655,8 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
context.system.eventStream.publish(AvailableBalanceChanged(self, d.channelId, d.shortChannelId, commitments1))
handleCommandSuccess(sender, d.copy(commitments = commitments1)) sending fulfill
case Failure(cause) =>
// we can clean up the command right away in case of failure
relayer ! CommandBuffer.CommandAck(d.channelId, c.id)
// we acknowledge the command right away in case of failure
PendingRelayDb.ackCommand(nodeParams.db.pendingRelay, d.channelId, c)
handleCommandError(cause, c)
}
@ -681,8 +676,8 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
context.system.eventStream.publish(AvailableBalanceChanged(self, d.channelId, d.shortChannelId, commitments1))
handleCommandSuccess(sender, d.copy(commitments = commitments1)) sending fail
case Failure(cause) =>
// we can clean up the command right away in case of failure
relayer ! CommandBuffer.CommandAck(d.channelId, c.id)
// we acknowledge the command right away in case of failure
PendingRelayDb.ackCommand(nodeParams.db.pendingRelay, d.channelId, c)
handleCommandError(cause, c)
}
@ -693,8 +688,8 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
context.system.eventStream.publish(AvailableBalanceChanged(self, d.channelId, d.shortChannelId, commitments1))
handleCommandSuccess(sender, d.copy(commitments = commitments1)) sending fail
case Failure(cause) =>
// we can clean up the command right away in case of failure
relayer ! CommandBuffer.CommandAck(d.channelId, c.id)
// we acknowledge the command right away in case of failure
PendingRelayDb.ackCommand(nodeParams.db.pendingRelay, d.channelId, c)
handleCommandError(cause, c)
}
@ -734,7 +729,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
Commitments.sendCommit(d.commitments, keyManager) match {
case Success((commitments1, commit)) =>
log.debug("sending a new sig, spec:\n{}", Commitments.specs2String(commitments1))
ackPendingFailsAndFulfills(commitments1.localChanges.signed, relayer)
PendingRelayDb.ackPendingFailsAndFulfills(nodeParams.db.pendingRelay, commitments1.localChanges.signed)
val nextRemoteCommit = commitments1.remoteNextCommitInfo.left.get.nextRemoteCommit
val nextCommitNumber = nextRemoteCommit.index
// we persist htlc data in order to be able to claim htlc outputs in case a revoked tx is published by our
@ -1007,8 +1002,8 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
if (c.commit) self ! CMD_SIGN
handleCommandSuccess(sender, d.copy(commitments = commitments1)) sending fulfill
case Failure(cause) =>
// we can clean up the command right away in case of failure
relayer ! CommandBuffer.CommandAck(d.channelId, c.id)
// we acknowledge the command right away in case of failure
PendingRelayDb.ackCommand(nodeParams.db.pendingRelay, d.channelId, c)
handleCommandError(cause, c)
}
@ -1027,8 +1022,8 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
if (c.commit) self ! CMD_SIGN
handleCommandSuccess(sender, d.copy(commitments = commitments1)) sending fail
case Failure(cause) =>
// we can clean up the command right away in case of failure
relayer ! CommandBuffer.CommandAck(d.channelId, c.id)
// we acknowledge the command right away in case of failure
PendingRelayDb.ackCommand(nodeParams.db.pendingRelay, d.channelId, c)
handleCommandError(cause, c)
}
@ -1038,8 +1033,8 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
if (c.commit) self ! CMD_SIGN
handleCommandSuccess(sender, d.copy(commitments = commitments1)) sending fail
case Failure(cause) =>
// we can clean up the command right away in case of failure
relayer ! CommandBuffer.CommandAck(d.channelId, c.id)
// we acknowledge the command right away in case of failure
PendingRelayDb.ackCommand(nodeParams.db.pendingRelay, d.channelId, c)
handleCommandError(cause, c)
}
@ -1079,7 +1074,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
Commitments.sendCommit(d.commitments, keyManager) match {
case Success((commitments1, commit)) =>
log.debug("sending a new sig, spec:\n{}", Commitments.specs2String(commitments1))
ackPendingFailsAndFulfills(commitments1.localChanges.signed, relayer)
PendingRelayDb.ackPendingFailsAndFulfills(nodeParams.db.pendingRelay, commitments1.localChanges.signed)
context.system.eventStream.publish(ChannelSignatureSent(self, commitments1))
// we expect a quick response from our peer
setTimer(RevocationTimeout.toString, RevocationTimeout(commitments1.remoteCommit.index, peer), timeout = nodeParams.revocationTimeout, repeat = false)
@ -1759,6 +1754,26 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
if (nextState != WAIT_FOR_INIT_INTERNAL) Metrics.ChannelsCount.withTag(Tags.State, nextState.toString).increment()
}
onTransition {
case _ -> CLOSING =>
PendingRelayDb.getPendingFailsAndFulfills(nodeParams.db.pendingRelay, nextStateData.asInstanceOf[HasCommitments].channelId) match {
case Nil =>
log.debug("nothing to replay")
case cmds =>
log.info("replaying {} unacked fulfills/fails", cmds.size)
cmds.foreach(self ! _) // they all have commit = false
}
case SYNCING -> (NORMAL | SHUTDOWN) =>
PendingRelayDb.getPendingFailsAndFulfills(nodeParams.db.pendingRelay, nextStateData.asInstanceOf[HasCommitments].channelId) match {
case Nil =>
log.debug("nothing to replay")
case cmds =>
log.info("replaying {} unacked fulfills/fails", cmds.size)
cmds.foreach(self ! _) // they all have commit = false
self ! CMD_SIGN // so we can sign all of them at once
}
}
/*
888 888 d8888 888b 888 8888888b. 888 8888888888 8888888b. .d8888b.
888 888 d88888 8888b 888 888 "Y88b 888 888 888 Y88b d88P Y88b

View file

@ -16,6 +16,7 @@
package fr.acinq.eclair.channel
import akka.actor.{ActorContext, ActorRef}
import akka.event.{DiagnosticLoggingAdapter, LoggingAdapter}
import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey, ripemd160, sha256}
import fr.acinq.bitcoin.Script._
@ -24,7 +25,7 @@ import fr.acinq.eclair.blockchain.EclairWallet
import fr.acinq.eclair.blockchain.fee.{FeeEstimator, FeeTargets, FeerateTolerance}
import fr.acinq.eclair.channel.Channel.REFRESH_CHANNEL_UPDATE_INTERVAL
import fr.acinq.eclair.crypto.{Generators, KeyManager}
import fr.acinq.eclair.db.ChannelsDb
import fr.acinq.eclair.db.{ChannelsDb, PendingRelayDb}
import fr.acinq.eclair.transactions.DirectedHtlc._
import fr.acinq.eclair.transactions.Scripts._
import fr.acinq.eclair.transactions.Transactions._

View file

@ -18,8 +18,11 @@ package fr.acinq.eclair.db
import java.io.Closeable
import akka.actor.{ActorContext, ActorRef}
import akka.event.LoggingAdapter
import fr.acinq.bitcoin.ByteVector32
import fr.acinq.eclair.channel.{Command, HasHtlcId}
import fr.acinq.eclair.channel.{CMD_FAIL_HTLC, CMD_FAIL_MALFORMED_HTLC, CMD_FULFILL_HTLC, Command, HasHtlcId, Register}
import fr.acinq.eclair.wire.{UpdateFailHtlc, UpdateFailMalformedHtlc, UpdateFulfillHtlc, UpdateMessage}
/**
* This database stores CMD_FULFILL_HTLC and CMD_FAIL_HTLC that we have received from downstream
@ -43,4 +46,37 @@ trait PendingRelayDb extends Closeable {
def listPendingRelay(): Set[(ByteVector32, Long)]
}
object PendingRelayDb {
/**
* We store [[CMD_FULFILL_HTLC]]/[[CMD_FAIL_HTLC]]/[[CMD_FAIL_MALFORMED_HTLC]]
* in a database because we don't want to lose preimages, or to forget to fail
* incoming htlcs, which would lead to unwanted channel closings.
*/
def safeSend(register: ActorRef, db: PendingRelayDb, channelId: ByteVector32, cmd: Command with HasHtlcId)(implicit ctx: ActorContext): Unit = {
register ! Register.Forward(channelId, cmd)
// we store the command in a db (note that this happens *after* forwarding the command to the channel, so we don't add latency)
db.addPendingRelay(channelId, cmd)
}
def ackCommand(db: PendingRelayDb, channelId: ByteVector32, cmd: Command with HasHtlcId): Unit = {
db.removePendingRelay(channelId, cmd.id)
}
def ackPendingFailsAndFulfills(db: PendingRelayDb, updates: List[UpdateMessage])(implicit log: LoggingAdapter): Unit = updates.collect {
case u: UpdateFulfillHtlc =>
log.debug(s"fulfill acked for htlcId=${u.id}")
db.removePendingRelay(u.channelId, u.id)
case u: UpdateFailHtlc =>
log.debug(s"fail acked for htlcId=${u.id}")
db.removePendingRelay(u.channelId, u.id)
case u: UpdateFailMalformedHtlc =>
log.debug(s"fail-malformed acked for htlcId=${u.id}")
db.removePendingRelay(u.channelId, u.id)
}
def getPendingFailsAndFulfills(db: PendingRelayDb, channelId: ByteVector32)(implicit log: LoggingAdapter): Seq[Command with HasHtlcId] = {
db.listPendingRelay(channelId)
}
}

View file

@ -21,10 +21,9 @@ import akka.actor.{ActorContext, ActorRef, PoisonPill, Status}
import akka.event.{DiagnosticLoggingAdapter, LoggingAdapter}
import fr.acinq.bitcoin.{ByteVector32, Crypto}
import fr.acinq.eclair.channel.{CMD_FAIL_HTLC, CMD_FULFILL_HTLC, Channel, ChannelCommandResponse}
import fr.acinq.eclair.db.{IncomingPayment, IncomingPaymentStatus, IncomingPaymentsDb, PaymentType}
import fr.acinq.eclair.db._
import fr.acinq.eclair.payment.Monitoring.{Metrics, Tags}
import fr.acinq.eclair.payment.PaymentRequest.ExtraHop
import fr.acinq.eclair.payment.relay.CommandBuffer
import fr.acinq.eclair.payment.{IncomingPacket, PaymentReceived, PaymentRequest}
import fr.acinq.eclair.wire._
import fr.acinq.eclair.{CltvExpiry, Features, Logs, MilliSatoshi, NodeParams, randomBytes32}
@ -36,7 +35,7 @@ import scala.util.{Failure, Success, Try}
*
* Created by PM on 17/06/2016.
*/
class MultiPartHandler(nodeParams: NodeParams, db: IncomingPaymentsDb, commandBuffer: ActorRef) extends ReceiveHandler {
class MultiPartHandler(nodeParams: NodeParams, register: ActorRef, db: IncomingPaymentsDb) extends ReceiveHandler {
import MultiPartHandler._
@ -82,7 +81,7 @@ class MultiPartHandler(nodeParams: NodeParams, db: IncomingPaymentsDb, commandBu
case Some(record) => validatePayment(p, record, nodeParams.currentBlockHeight) match {
case Some(cmdFail) =>
Metrics.PaymentFailed.withTag(Tags.Direction, Tags.Directions.Received).withTag(Tags.Failure, Tags.FailureType(cmdFail)).increment()
commandBuffer ! CommandBuffer.CommandSend(p.add.channelId, cmdFail)
PendingRelayDb.safeSend(register, nodeParams.db.pendingRelay, p.add.channelId, cmdFail)
case None =>
log.info("received payment for amount={} totalAmount={}", p.add.amountMsat, p.payload.totalAmount)
pendingPayments.get(p.add.paymentHash) match {
@ -97,7 +96,7 @@ class MultiPartHandler(nodeParams: NodeParams, db: IncomingPaymentsDb, commandBu
case None =>
Metrics.PaymentFailed.withTag(Tags.Direction, Tags.Directions.Received).withTag(Tags.Failure, "InvoiceNotFound").increment()
val cmdFail = CMD_FAIL_HTLC(p.add.id, Right(IncorrectOrUnknownPaymentDetails(p.payload.totalAmount, nodeParams.currentBlockHeight)), commit = true)
commandBuffer ! CommandBuffer.CommandSend(p.add.channelId, cmdFail)
PendingRelayDb.safeSend(register, nodeParams.db.pendingRelay, p.add.channelId, cmdFail)
}
}
@ -107,7 +106,7 @@ class MultiPartHandler(nodeParams: NodeParams, db: IncomingPaymentsDb, commandBu
log.warning("payment with paidAmount={} failed ({})", parts.map(_.amount).sum, failure)
pendingPayments.get(paymentHash).foreach { case (_, handler: ActorRef) => handler ! PoisonPill }
parts.collect {
case p: MultiPartPaymentFSM.HtlcPart => commandBuffer ! CommandBuffer.CommandSend(p.htlc.channelId, CMD_FAIL_HTLC(p.htlc.id, Right(failure), commit = true))
case p: MultiPartPaymentFSM.HtlcPart => PendingRelayDb.safeSend(register, nodeParams.db.pendingRelay, p.htlc.channelId, CMD_FAIL_HTLC(p.htlc.id, Right(failure), commit = true))
}
pendingPayments = pendingPayments - paymentHash
}
@ -127,13 +126,13 @@ class MultiPartHandler(nodeParams: NodeParams, db: IncomingPaymentsDb, commandBu
Logs.withMdc(log)(Logs.mdc(paymentHash_opt = Some(paymentHash))) {
failure match {
case Some(failure) => p match {
case p: MultiPartPaymentFSM.HtlcPart => commandBuffer ! CommandBuffer.CommandSend(p.htlc.channelId, CMD_FAIL_HTLC(p.htlc.id, Right(failure), commit = true))
case p: MultiPartPaymentFSM.HtlcPart => PendingRelayDb.safeSend(register, nodeParams.db.pendingRelay, p.htlc.channelId, CMD_FAIL_HTLC(p.htlc.id, Right(failure), commit = true))
}
case None => p match {
// NB: this case shouldn't happen unless the sender violated the spec, so it's ok that we take a slightly more
// expensive code path by fetching the preimage from DB.
case p: MultiPartPaymentFSM.HtlcPart => db.getIncomingPayment(paymentHash).foreach(record => {
commandBuffer ! CommandBuffer.CommandSend(p.htlc.channelId, CMD_FULFILL_HTLC(p.htlc.id, record.paymentPreimage, commit = true))
PendingRelayDb.safeSend(register, nodeParams.db.pendingRelay, p.htlc.channelId, CMD_FULFILL_HTLC(p.htlc.id, record.paymentPreimage, commit = true))
val received = PaymentReceived(paymentHash, PaymentReceived.PartialPayment(p.amount, p.htlc.channelId) :: Nil)
db.receiveIncomingPayment(paymentHash, p.amount, received.timestamp)
ctx.system.eventStream.publish(received)
@ -150,7 +149,7 @@ class MultiPartHandler(nodeParams: NodeParams, db: IncomingPaymentsDb, commandBu
})
db.receiveIncomingPayment(paymentHash, received.amount, received.timestamp)
parts.collect {
case p: MultiPartPaymentFSM.HtlcPart => commandBuffer ! CommandBuffer.CommandSend(p.htlc.channelId, CMD_FULFILL_HTLC(p.htlc.id, preimage, commit = true))
case p: MultiPartPaymentFSM.HtlcPart => PendingRelayDb.safeSend(register, nodeParams.db.pendingRelay, p.htlc.channelId, CMD_FULFILL_HTLC(p.htlc.id, preimage, commit = true))
}
postFulfill(received)
ctx.system.eventStream.publish(received)
@ -158,8 +157,6 @@ class MultiPartHandler(nodeParams: NodeParams, db: IncomingPaymentsDb, commandBu
case GetPendingPayments => ctx.sender ! PendingPayments(pendingPayments.keySet)
case ack: CommandBuffer.CommandAck => commandBuffer forward ack
case ChannelCommandResponse.Ok => // ignoring responses from channels
}

View file

@ -29,10 +29,10 @@ trait ReceiveHandler {
/**
* Generic payment handler that delegates handling of incoming messages to a list of handlers.
*/
class PaymentHandler(nodeParams: NodeParams, commandBuffer: ActorRef) extends Actor with DiagnosticActorLogging {
class PaymentHandler(nodeParams: NodeParams, register: ActorRef) extends Actor with DiagnosticActorLogging {
// we do this instead of sending it to ourselves, otherwise there is no guarantee that this would be the first processed message
private val defaultHandler = new MultiPartHandler(nodeParams, nodeParams.db.payments, commandBuffer)
private val defaultHandler = new MultiPartHandler(nodeParams, register, nodeParams.db.payments)
override def receive: Receive = normal(defaultHandler.handle(context, log))
@ -47,5 +47,5 @@ class PaymentHandler(nodeParams: NodeParams, commandBuffer: ActorRef) extends Ac
}
object PaymentHandler {
def props(nodeParams: NodeParams, commandBuffer: ActorRef): Props = Props(new PaymentHandler(nodeParams, commandBuffer))
def props(nodeParams: NodeParams, register: ActorRef): Props = Props(new PaymentHandler(nodeParams, register))
}

View file

@ -21,6 +21,7 @@ import akka.event.Logging.MDC
import akka.event.LoggingAdapter
import fr.acinq.bitcoin.ByteVector32
import fr.acinq.eclair.channel._
import fr.acinq.eclair.db.PendingRelayDb
import fr.acinq.eclair.payment.IncomingPacket
import fr.acinq.eclair.payment.Monitoring.{Metrics, Tags}
import fr.acinq.eclair.payment.relay.Relayer.{ChannelUpdates, NodeChannels, OutgoingChannel}
@ -36,7 +37,7 @@ import fr.acinq.eclair.{Logs, NodeParams, ShortChannelId, nodeFee}
* The Channel Relayer is used to relay a single upstream HTLC to a downstream channel.
* It selects the best channel to use to relay and retries using other channels in case a local failure happens.
*/
class ChannelRelayer(nodeParams: NodeParams, relayer: ActorRef, register: ActorRef, commandBuffer: ActorRef) extends Actor with DiagnosticActorLogging {
class ChannelRelayer(nodeParams: NodeParams, relayer: ActorRef, register: ActorRef) extends Actor with DiagnosticActorLogging {
import ChannelRelayer._
@ -49,7 +50,7 @@ class ChannelRelayer(nodeParams: NodeParams, relayer: ActorRef, register: ActorR
case RelayFailure(cmdFail) =>
Metrics.recordPaymentRelayFailed(Tags.FailureType(cmdFail), Tags.RelayType.Channel)
log.info(s"rejecting htlc #${r.add.id} from channelId=${r.add.channelId} to shortChannelId=${r.payload.outgoingChannelId} reason=${cmdFail.reason}")
commandBuffer ! CommandBuffer.CommandSend(r.add.channelId, cmdFail)
PendingRelayDb.safeSend(register, nodeParams.db.pendingRelay, r.add.channelId, cmdFail)
case RelaySuccess(selectedShortChannelId, cmdAdd) =>
log.info(s"forwarding htlc #${r.add.id} from channelId=${r.add.channelId} to shortChannelId=$selectedShortChannelId")
register ! Register.ForwardShortId(selectedShortChannelId, cmdAdd)
@ -59,7 +60,7 @@ class ChannelRelayer(nodeParams: NodeParams, relayer: ActorRef, register: ActorR
log.warning(s"couldn't resolve downstream channel $shortChannelId, failing htlc #${add.id}")
val cmdFail = CMD_FAIL_HTLC(add.id, Right(UnknownNextPeer), commit = true)
Metrics.recordPaymentRelayFailed(Tags.FailureType(cmdFail), Tags.RelayType.Channel)
commandBuffer ! CommandBuffer.CommandSend(add.channelId, cmdFail)
PendingRelayDb.safeSend(register, nodeParams.db.pendingRelay, add.channelId, cmdFail)
case Status.Failure(addFailed: AddHtlcFailed) => addFailed.origin match {
case Origin.Relayed(originChannelId, originHtlcId, _, _) => addFailed.originalCommand match {
@ -71,13 +72,11 @@ class ChannelRelayer(nodeParams: NodeParams, relayer: ActorRef, register: ActorR
val cmdFail = CMD_FAIL_HTLC(originHtlcId, Right(failure), commit = true)
Metrics.recordPaymentRelayFailed(Tags.FailureType(cmdFail), Tags.RelayType.Channel)
log.info(s"rejecting htlc #$originHtlcId from channelId=$originChannelId reason=${cmdFail.reason}")
commandBuffer ! CommandBuffer.CommandSend(originChannelId, cmdFail)
PendingRelayDb.safeSend(register, nodeParams.db.pendingRelay, originChannelId, cmdFail)
}
case _ => throw new IllegalArgumentException(s"channel relayer received unexpected failure: $addFailed")
}
case ack: CommandBuffer.CommandAck => commandBuffer forward ack
case ChannelCommandResponse.Ok => // ignoring responses from channels
}
@ -94,7 +93,7 @@ class ChannelRelayer(nodeParams: NodeParams, relayer: ActorRef, register: ActorR
object ChannelRelayer {
def props(nodeParams: NodeParams, relayer: ActorRef, register: ActorRef, commandBuffer: ActorRef) = Props(classOf[ChannelRelayer], nodeParams, relayer, register, commandBuffer)
def props(nodeParams: NodeParams, relayer: ActorRef, register: ActorRef) = Props(new ChannelRelayer(nodeParams, relayer, register))
case class RelayHtlc(r: IncomingPacket.ChannelRelayPacket, previousFailures: Seq[AddHtlcFailed], channelUpdates: ChannelUpdates, node2channels: NodeChannels)

View file

@ -1,69 +0,0 @@
/*
* Copyright 2019 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package fr.acinq.eclair.payment.relay
import akka.actor.{Actor, ActorLogging, ActorRef}
import fr.acinq.bitcoin.ByteVector32
import fr.acinq.eclair.NodeParams
import fr.acinq.eclair.channel._
/**
* We store [[CMD_FULFILL_HTLC]]/[[CMD_FAIL_HTLC]]/[[CMD_FAIL_MALFORMED_HTLC]]
* in a database because we don't want to lose preimages, or to forget to fail
* incoming htlcs, which would lead to unwanted channel closings.
*/
class CommandBuffer(nodeParams: NodeParams, register: ActorRef) extends Actor with ActorLogging {
import CommandBuffer._
val db = nodeParams.db.pendingRelay
context.system.eventStream.subscribe(self, classOf[ChannelStateChanged])
override def receive: Receive = {
case CommandSend(channelId, cmd) =>
register forward Register.Forward(channelId, cmd)
// we store the command in a db (note that this happens *after* forwarding the command to the channel, so we don't add latency)
db.addPendingRelay(channelId, cmd)
case CommandAck(channelId, htlcId) =>
log.debug(s"fulfill/fail acked for channelId=$channelId htlcId=$htlcId")
db.removePendingRelay(channelId, htlcId)
case ChannelStateChanged(channel, _, _, WAIT_FOR_INIT_INTERNAL | OFFLINE | SYNCING, NORMAL | SHUTDOWN | CLOSING, d: HasCommitments) =>
db.listPendingRelay(d.channelId) match {
case Nil => ()
case cmds =>
log.info(s"re-sending ${cmds.size} unacked fulfills/fails to channel ${d.channelId}")
cmds.foreach(channel ! _) // they all have commit = false
channel ! CMD_SIGN // so we can sign all of them at once
}
case _: ChannelStateChanged => () // ignored
}
}
object CommandBuffer {
case class CommandSend[T <: Command with HasHtlcId](channelId: ByteVector32, cmd: T)
case class CommandAck(channelId: ByteVector32, htlcId: Long)
}

View file

@ -23,6 +23,7 @@ import akka.event.Logging.MDC
import fr.acinq.bitcoin.ByteVector32
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.eclair.channel.{CMD_FAIL_HTLC, CMD_FULFILL_HTLC, Upstream}
import fr.acinq.eclair.db.PendingRelayDb
import fr.acinq.eclair.payment.Monitoring.{Metrics, Tags}
import fr.acinq.eclair.payment._
import fr.acinq.eclair.payment.receive.MultiPartPaymentFSM
@ -46,7 +47,7 @@ import scala.collection.immutable.Queue
* It aggregates incoming HTLCs (in case multi-part was used upstream) and then forwards the requested amount (using the
* router to find a route to the remote node and potentially splitting the payment using multi-part).
*/
class NodeRelayer(nodeParams: NodeParams, router: ActorRef, commandBuffer: ActorRef, register: ActorRef) extends Actor with DiagnosticActorLogging {
class NodeRelayer(nodeParams: NodeParams, router: ActorRef, register: ActorRef) extends Actor with DiagnosticActorLogging {
import NodeRelayer._
@ -135,9 +136,6 @@ class NodeRelayer(nodeParams: NodeParams, router: ActorRef, commandBuffer: Actor
rejectPayment(p.upstream, translateError(failures, p.nextPayload.outgoingNodeId))
})
context become main(pendingIncoming, pendingOutgoing - paymentHash)
case ack: CommandBuffer.CommandAck => commandBuffer forward ack
}
def spawnOutgoingPayFSM(cfg: SendPaymentConfig, multiPart: Boolean): ActorRef = {
@ -179,7 +177,7 @@ class NodeRelayer(nodeParams: NodeParams, router: ActorRef, commandBuffer: Actor
private def rejectHtlc(htlcId: Long, channelId: ByteVector32, amount: MilliSatoshi, failure: Option[FailureMessage] = None): Unit = {
val failureMessage = failure.getOrElse(IncorrectOrUnknownPaymentDetails(amount, nodeParams.currentBlockHeight))
commandBuffer ! CommandBuffer.CommandSend(channelId, CMD_FAIL_HTLC(htlcId, Right(failureMessage), commit = true))
PendingRelayDb.safeSend(register, nodeParams.db.pendingRelay, channelId, CMD_FAIL_HTLC(htlcId, Right(failureMessage), commit = true))
}
private def rejectPayment(upstream: Upstream.TrampolineRelayed, failure: Option[FailureMessage]): Unit = {
@ -189,7 +187,7 @@ class NodeRelayer(nodeParams: NodeParams, router: ActorRef, commandBuffer: Actor
private def fulfillPayment(upstream: Upstream.TrampolineRelayed, paymentPreimage: ByteVector32): Unit = upstream.adds.foreach(add => {
val cmdFulfill = CMD_FULFILL_HTLC(add.id, paymentPreimage, commit = true)
commandBuffer ! CommandBuffer.CommandSend(add.channelId, cmdFulfill)
PendingRelayDb.safeSend(register, nodeParams.db.pendingRelay, add.channelId, cmdFulfill)
})
override def mdc(currentMessage: Any): MDC = {
@ -209,7 +207,7 @@ class NodeRelayer(nodeParams: NodeParams, router: ActorRef, commandBuffer: Actor
object NodeRelayer {
def props(nodeParams: NodeParams, router: ActorRef, commandBuffer: ActorRef, register: ActorRef) = Props(new NodeRelayer(nodeParams, router, commandBuffer, register))
def props(nodeParams: NodeParams, router: ActorRef, register: ActorRef) = Props(new NodeRelayer(nodeParams, router, register))
/**
* We start by aggregating an incoming HTLC set. Once we received the whole set, we will compute a route to the next

View file

@ -30,9 +30,7 @@ import fr.acinq.eclair.transactions.DirectedHtlc.outgoing
import fr.acinq.eclair.transactions.OutgoingHtlc
import fr.acinq.eclair.wire.{TemporaryNodeFailure, UpdateAddHtlc}
import fr.acinq.eclair.{Features, LongToBtcAmount, NodeParams}
import scodec.bits.ByteVector
import scala.compat.Platform
import scala.concurrent.Promise
import scala.util.Try
@ -51,7 +49,7 @@ import scala.util.Try
* payment (because of multi-part): we have lost the intermediate state necessary to retry that payment, so we need to
* wait for the partial HTLC set sent downstream to either fail or fulfill the payment in our DB.
*/
class PostRestartHtlcCleaner(nodeParams: NodeParams, commandBuffer: ActorRef, initialized: Option[Promise[Done]] = None) extends Actor with ActorLogging {
class PostRestartHtlcCleaner(nodeParams: NodeParams, register: ActorRef, initialized: Option[Promise[Done]] = None) extends Actor with ActorLogging {
import PostRestartHtlcCleaner._
@ -118,8 +116,6 @@ class PostRestartHtlcCleaner(nodeParams: NodeParams, commandBuffer: ActorRef, in
case GetBrokenHtlcs => sender ! brokenHtlcs
case ack: CommandBuffer.CommandAck => commandBuffer forward ack
case ChannelCommandResponse.Ok => // ignoring responses from channels
}
@ -161,7 +157,7 @@ class PostRestartHtlcCleaner(nodeParams: NodeParams, commandBuffer: ActorRef, in
log.info(s"received preimage for paymentHash=${fulfilledHtlc.paymentHash}: fulfilling ${origins.length} HTLCs upstream")
origins.foreach { case (channelId, htlcId) =>
Metrics.Resolved.withTag(Tags.Success, value = true).withTag(Metrics.Relayed, value = true).increment()
commandBuffer ! CommandBuffer.CommandSend(channelId, CMD_FULFILL_HTLC(htlcId, paymentPreimage, commit = true))
PendingRelayDb.safeSend(register, nodeParams.db.pendingRelay, channelId, CMD_FULFILL_HTLC(htlcId, paymentPreimage, commit = true))
}
}
val relayedOut1 = relayedOut diff Set((fulfilledHtlc.channelId, fulfilledHtlc.id))
@ -210,7 +206,7 @@ class PostRestartHtlcCleaner(nodeParams: NodeParams, commandBuffer: ActorRef, in
Metrics.Resolved.withTag(Tags.Success, value = false).withTag(Metrics.Relayed, value = true).increment()
// We don't bother decrypting the downstream failure to forward a more meaningful error upstream, it's
// very likely that it won't be actionable anyway because of our node restart.
commandBuffer ! CommandBuffer.CommandSend(channelId, CMD_FAIL_HTLC(htlcId, Right(TemporaryNodeFailure), commit = true))
PendingRelayDb.safeSend(register, nodeParams.db.pendingRelay, channelId, CMD_FAIL_HTLC(htlcId, Right(TemporaryNodeFailure), commit = true))
}
case _: Origin.Relayed =>
Metrics.Unhandled.withTag(Metrics.Hint, origin.getClass.getSimpleName).increment()
@ -232,7 +228,7 @@ class PostRestartHtlcCleaner(nodeParams: NodeParams, commandBuffer: ActorRef, in
object PostRestartHtlcCleaner {
def props(nodeParams: NodeParams, commandBuffer: ActorRef, initialized: Option[Promise[Done]] = None) = Props(classOf[PostRestartHtlcCleaner], nodeParams, commandBuffer, initialized)
def props(nodeParams: NodeParams, register: ActorRef, initialized: Option[Promise[Done]] = None) = Props(new PostRestartHtlcCleaner(nodeParams, register, initialized))
case object GetBrokenHtlcs
@ -375,7 +371,7 @@ object PostRestartHtlcCleaner {
/**
* We store [[CMD_FULFILL_HTLC]]/[[CMD_FAIL_HTLC]]/[[CMD_FAIL_MALFORMED_HTLC]] in a database
* (see [[fr.acinq.eclair.payment.relay.CommandBuffer]]) because we don't want to lose preimages, or to forget to fail
* (see [[fr.acinq.eclair.db.PendingRelayDb]]) because we don't want to lose preimages, or to forget to fail
* incoming htlcs, which would lead to unwanted channel closings.
*
* Because of the way our watcher works, in a scenario where a downstream channel has gone to the blockchain, it may

View file

@ -25,6 +25,7 @@ import akka.event.LoggingAdapter
import fr.acinq.bitcoin.ByteVector32
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.eclair.channel._
import fr.acinq.eclair.db.PendingRelayDb
import fr.acinq.eclair.payment.Monitoring.{Metrics, Tags}
import fr.acinq.eclair.payment._
import fr.acinq.eclair.router.Announcements
@ -65,7 +66,7 @@ object Origin {
* It also receives channel HTLC events (fulfill / failed) and relays those to the appropriate handlers.
* It also maintains an up-to-date view of local channel balances.
*/
class Relayer(nodeParams: NodeParams, router: ActorRef, register: ActorRef, commandBuffer: ActorRef, paymentHandler: ActorRef, initialized: Option[Promise[Done]] = None) extends Actor with DiagnosticActorLogging {
class Relayer(nodeParams: NodeParams, router: ActorRef, register: ActorRef, paymentHandler: ActorRef, initialized: Option[Promise[Done]] = None) extends Actor with DiagnosticActorLogging {
import Relayer._
@ -77,9 +78,9 @@ class Relayer(nodeParams: NodeParams, router: ActorRef, register: ActorRef, comm
context.system.eventStream.subscribe(self, classOf[AvailableBalanceChanged])
context.system.eventStream.subscribe(self, classOf[ShortChannelIdAssigned])
private val postRestartCleaner = context.actorOf(PostRestartHtlcCleaner.props(nodeParams, commandBuffer, initialized), "post-restart-htlc-cleaner")
private val channelRelayer = context.actorOf(ChannelRelayer.props(nodeParams, self, register, commandBuffer), "channel-relayer")
private val nodeRelayer = context.actorOf(NodeRelayer.props(nodeParams, router, commandBuffer, register), "node-relayer")
private val postRestartCleaner = context.actorOf(PostRestartHtlcCleaner.props(nodeParams, register, initialized), "post-restart-htlc-cleaner")
private val channelRelayer = context.actorOf(ChannelRelayer.props(nodeParams, self, register), "channel-relayer")
private val nodeRelayer = context.actorOf(NodeRelayer.props(nodeParams, router, register), "node-relayer")
override def receive: Receive = main(Map.empty, mutable.MultiDict.empty[PublicKey, ShortChannelId])
@ -132,7 +133,7 @@ class Relayer(nodeParams: NodeParams, router: ActorRef, register: ActorRef, comm
case Right(r: IncomingPacket.NodeRelayPacket) =>
if (!nodeParams.enableTrampolinePayment) {
log.warning(s"rejecting htlc #${add.id} from channelId=${add.channelId} to nodeId=${r.innerPayload.outgoingNodeId} reason=trampoline disabled")
commandBuffer ! CommandBuffer.CommandSend(add.channelId, CMD_FAIL_HTLC(add.id, Right(RequiredNodeFeatureMissing), commit = true))
PendingRelayDb.safeSend(register, nodeParams.db.pendingRelay, add.channelId, CMD_FAIL_HTLC(add.id, Right(RequiredNodeFeatureMissing), commit = true))
} else {
nodeRelayer forward r
}
@ -140,11 +141,11 @@ class Relayer(nodeParams: NodeParams, router: ActorRef, register: ActorRef, comm
log.warning(s"couldn't parse onion: reason=${badOnion.message}")
val cmdFail = CMD_FAIL_MALFORMED_HTLC(add.id, badOnion.onionHash, badOnion.code, commit = true)
log.warning(s"rejecting htlc #${add.id} from channelId=${add.channelId} reason=malformed onionHash=${cmdFail.onionHash} failureCode=${cmdFail.failureCode}")
commandBuffer ! CommandBuffer.CommandSend(add.channelId, cmdFail)
PendingRelayDb.safeSend(register, nodeParams.db.pendingRelay, add.channelId, cmdFail)
case Left(failure) =>
log.warning(s"rejecting htlc #${add.id} from channelId=${add.channelId} reason=$failure")
val cmdFail = CMD_FAIL_HTLC(add.id, Right(failure), commit = true)
commandBuffer ! CommandBuffer.CommandSend(add.channelId, cmdFail)
PendingRelayDb.safeSend(register, nodeParams.db.pendingRelay, add.channelId, cmdFail)
}
case Status.Failure(addFailed: AddHtlcFailed) => addFailed.origin match {
@ -160,7 +161,7 @@ class Relayer(nodeParams: NodeParams, router: ActorRef, register: ActorRef, comm
case Origin.Local(_, Some(sender)) => sender ! ff
case Origin.Relayed(originChannelId, originHtlcId, amountIn, amountOut) =>
val cmd = CMD_FULFILL_HTLC(originHtlcId, ff.paymentPreimage, commit = true)
commandBuffer ! CommandBuffer.CommandSend(originChannelId, cmd)
PendingRelayDb.safeSend(register, nodeParams.db.pendingRelay, originChannelId, cmd)
context.system.eventStream.publish(ChannelPaymentRelayed(amountIn, amountOut, ff.htlc.paymentHash, originChannelId, ff.htlc.channelId))
case Origin.TrampolineRelayed(_, None) => postRestartCleaner forward ff
case Origin.TrampolineRelayed(_, Some(paymentSender)) => paymentSender ! ff
@ -176,13 +177,11 @@ class Relayer(nodeParams: NodeParams, router: ActorRef, register: ActorRef, comm
case ForwardRemoteFailMalformed(fail, _, _) => CMD_FAIL_MALFORMED_HTLC(originHtlcId, fail.onionHash, fail.failureCode, commit = true)
case _: ForwardOnChainFail => CMD_FAIL_HTLC(originHtlcId, Right(PermanentChannelFailure), commit = true)
}
commandBuffer ! CommandBuffer.CommandSend(originChannelId, cmd)
PendingRelayDb.safeSend(register, nodeParams.db.pendingRelay, originChannelId, cmd)
case Origin.TrampolineRelayed(_, None) => postRestartCleaner forward ff
case Origin.TrampolineRelayed(_, Some(paymentSender)) => paymentSender ! ff
}
case ack: CommandBuffer.CommandAck => commandBuffer forward ack
case ChannelCommandResponse.Ok => () // ignoring responses from channels
}
@ -201,8 +200,8 @@ class Relayer(nodeParams: NodeParams, router: ActorRef, register: ActorRef, comm
object Relayer extends Logging {
def props(nodeParams: NodeParams, router: ActorRef, register: ActorRef, commandBuffer: ActorRef, paymentHandler: ActorRef, initialized: Option[Promise[Done]] = None) =
Props(new Relayer(nodeParams, router, register, commandBuffer, paymentHandler, initialized))
def props(nodeParams: NodeParams, router: ActorRef, register: ActorRef, paymentHandler: ActorRef, initialized: Option[Promise[Done]] = None) =
Props(new Relayer(nodeParams, router, register, paymentHandler, initialized))
type ChannelUpdates = Map[ShortChannelId, OutgoingChannel]
type NodeChannels = mutable.MultiDict[PublicKey, ShortChannelId]

View file

@ -67,7 +67,6 @@ class EclairImplSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with I
watcher.ref,
paymentHandler.ref,
register.ref,
commandBuffer.ref,
relayer.ref,
router.ref,
switchboard.ref,

View file

@ -30,7 +30,7 @@ import fr.acinq.eclair.channel.states.StateTestsHelperMethods
import fr.acinq.eclair.payment._
import fr.acinq.eclair.payment.receive.MultiPartHandler.ReceivePayment
import fr.acinq.eclair.payment.receive.PaymentHandler
import fr.acinq.eclair.payment.relay.{CommandBuffer, Relayer}
import fr.acinq.eclair.payment.relay.Relayer
import fr.acinq.eclair.router.Router.ChannelHop
import fr.acinq.eclair.wire.Onion.FinalLegacyPayload
import fr.acinq.eclair.wire._
@ -53,6 +53,8 @@ class FuzzySpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with StateT
override def withFixture(test: OneArgTest): Outcome = {
val fuzzy = test.tags.contains("fuzzy")
val pipe = system.actorOf(Props(new FuzzyPipe(fuzzy)))
val aliceParams = Alice.nodeParams
val bobParams = Bob.nodeParams
val alicePeer = TestProbe()
val bobPeer = TestProbe()
TestUtils.forwardOutgoingToPipe(alicePeer, pipe)
@ -61,15 +63,13 @@ class FuzzySpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with StateT
val bob2blockchain = TestProbe()
val registerA = system.actorOf(Props(new TestRegister()))
val registerB = system.actorOf(Props(new TestRegister()))
val commandBufferA = system.actorOf(Props(new TestCommandBuffer(Alice.nodeParams, registerA)))
val commandBufferB = system.actorOf(Props(new TestCommandBuffer(Bob.nodeParams, registerB)))
val paymentHandlerA = system.actorOf(Props(new PaymentHandler(Alice.nodeParams, commandBufferA)))
val paymentHandlerB = system.actorOf(Props(new PaymentHandler(Bob.nodeParams, commandBufferB)))
val relayerA = system.actorOf(Relayer.props(Alice.nodeParams, TestProbe().ref, registerA, commandBufferA, paymentHandlerA))
val relayerB = system.actorOf(Relayer.props(Bob.nodeParams, TestProbe().ref, registerB, commandBufferB, paymentHandlerB))
val paymentHandlerA = system.actorOf(Props(new PaymentHandler(aliceParams, registerA)))
val paymentHandlerB = system.actorOf(Props(new PaymentHandler(bobParams, registerB)))
val relayerA = system.actorOf(Relayer.props(aliceParams, TestProbe().ref, registerA, paymentHandlerA))
val relayerB = system.actorOf(Relayer.props(bobParams, TestProbe().ref, registerB, paymentHandlerB))
val wallet = new TestWallet
val alice: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(Alice.nodeParams, wallet, Bob.nodeParams.nodeId, alice2blockchain.ref, relayerA), alicePeer.ref)
val bob: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(Bob.nodeParams, wallet, Alice.nodeParams.nodeId, bob2blockchain.ref, relayerB), bobPeer.ref)
val alice: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(aliceParams, wallet, bobParams.nodeId, alice2blockchain.ref, relayerA), alicePeer.ref)
val bob: TestFSMRef[State, Data, Channel] = TestFSMRef(new Channel(bobParams, wallet, aliceParams.nodeId, bob2blockchain.ref, relayerB), bobPeer.ref)
within(30 seconds) {
val aliceInit = Init(Alice.channelParams.features)
val bobInit = Init(Bob.channelParams.features)
@ -106,16 +106,6 @@ class FuzzySpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with StateT
}
}
class TestCommandBuffer(nodeParams: NodeParams, register: ActorRef) extends CommandBuffer(nodeParams, register) {
def filterRemoteEvents: Receive = {
// This is needed because we use the same actor system for A and B's CommandBuffers. If A receives an event from
// B's channel and it has a pending relay with the same htlcId as one of B's htlc, it may mess up B's state.
case ChannelStateChanged(_, _, remoteNodeId, _, _, _) if remoteNodeId == nodeParams.nodeId => ()
}
override def receive: Receive = filterRemoteEvents orElse super.receive
}
class SenderActor(sendChannel: TestFSMRef[State, Data, Channel], paymentHandler: ActorRef, latch: CountDownLatch, count: Int) extends Actor with ActorLogging {
// we don't want to be below htlcMinimumMsat

View file

@ -27,7 +27,7 @@ import fr.acinq.eclair.TestConstants.{Alice, Bob}
import fr.acinq.eclair._
import fr.acinq.eclair.blockchain._
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher
import fr.acinq.eclair.payment.relay.{CommandBuffer, Relayer}
import fr.acinq.eclair.payment.relay.Relayer
import fr.acinq.eclair.wire.{Init, UpdateAddHtlc}
import org.scalatest.funsuite.AnyFunSuite
@ -64,10 +64,8 @@ class ThroughputSpec extends AnyFunSuite {
}), "payment-handler")
val registerA = TestProbe()
val registerB = TestProbe()
val commandBufferA = system.actorOf(Props(new CommandBuffer(Alice.nodeParams, registerA.ref)))
val commandBufferB = system.actorOf(Props(new CommandBuffer(Bob.nodeParams, registerB.ref)))
val relayerA = system.actorOf(Relayer.props(Alice.nodeParams, TestProbe().ref, registerA.ref, commandBufferA, paymentHandler))
val relayerB = system.actorOf(Relayer.props(Bob.nodeParams, TestProbe().ref, registerB.ref, commandBufferB, paymentHandler))
val relayerA = system.actorOf(Relayer.props(Alice.nodeParams, TestProbe().ref, registerA.ref, paymentHandler))
val relayerB = system.actorOf(Relayer.props(Bob.nodeParams, TestProbe().ref, registerB.ref, paymentHandler))
val wallet = new TestWallet
val alice = system.actorOf(Channel.props(Alice.nodeParams, wallet, Bob.nodeParams.nodeId, blockchain, relayerA, None), "a")
val bob = system.actorOf(Channel.props(Bob.nodeParams, wallet, Alice.nodeParams.nodeId, blockchain, relayerB, None), "b")

View file

@ -35,7 +35,7 @@ import fr.acinq.eclair.channel.{ChannelErrorOccurred, _}
import fr.acinq.eclair.crypto.Sphinx
import fr.acinq.eclair.io.Peer
import fr.acinq.eclair.payment.relay.Relayer._
import fr.acinq.eclair.payment.relay.{CommandBuffer, Origin}
import fr.acinq.eclair.payment.relay.Origin
import fr.acinq.eclair.router.Announcements
import fr.acinq.eclair.transactions.DirectedHtlc.{incoming, outgoing}
import fr.acinq.eclair.transactions.Transactions
@ -1238,7 +1238,7 @@ class NormalStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
sender.send(bob, CMD_FULFILL_HTLC(42, randomBytes32)) // this will fail
sender.expectMsg(Failure(UnknownHtlcId(channelId(bob), 42)))
relayerB.expectMsg(CommandBuffer.CommandAck(initialState.channelId, 42))
awaitCond(bob.underlyingActor.nodeParams.db.pendingRelay.listPendingRelay(initialState.channelId).isEmpty)
}
private def testUpdateFulfillHtlc(f: FixtureParam) = {
@ -1363,12 +1363,11 @@ class NormalStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
test("recv CMD_FAIL_HTLC (acknowledge in case of failure)") { f =>
import f._
val sender = TestProbe()
val r = randomBytes32
val initialState = bob.stateData.asInstanceOf[DATA_NORMAL]
sender.send(bob, CMD_FAIL_HTLC(42, Right(PermanentChannelFailure))) // this will fail
sender.expectMsg(Failure(UnknownHtlcId(channelId(bob), 42)))
relayerB.expectMsg(CommandBuffer.CommandAck(initialState.channelId, 42))
awaitCond(bob.underlyingActor.nodeParams.db.pendingRelay.listPendingRelay(initialState.channelId).isEmpty)
}
test("recv CMD_FAIL_MALFORMED_HTLC") { f =>
@ -1413,7 +1412,7 @@ class NormalStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
sender.send(bob, CMD_FAIL_MALFORMED_HTLC(42, ByteVector32.Zeroes, FailureMessageCodecs.BADONION)) // this will fail
sender.expectMsg(Failure(UnknownHtlcId(channelId(bob), 42)))
relayerB.expectMsg(CommandBuffer.CommandAck(initialState.channelId, 42))
awaitCond(bob.underlyingActor.nodeParams.db.pendingRelay.listPendingRelay(initialState.channelId).isEmpty)
}
private def testUpdateFailHtlc(f: FixtureParam) = {

View file

@ -27,7 +27,6 @@ import fr.acinq.eclair.blockchain._
import fr.acinq.eclair.blockchain.fee.FeeratesPerKw
import fr.acinq.eclair.channel._
import fr.acinq.eclair.channel.states.StateTestsHelperMethods
import fr.acinq.eclair.payment.relay.CommandBuffer
import fr.acinq.eclair.router.Announcements
import fr.acinq.eclair.transactions.Transactions.HtlcSuccessTx
import fr.acinq.eclair.wire._
@ -404,11 +403,80 @@ class OfflineStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
assert(!Announcements.isEnabled(update.channelUpdate.channelFlags))
}
test("replay pending commands when going back to NORMAL") { f =>
import f._
val sender = TestProbe()
val (r, htlc) = addHtlc(50000000 msat, alice, bob, alice2bob, bob2alice)
crossSign(alice, bob, alice2bob, bob2alice)
val initialState = bob.stateData.asInstanceOf[DATA_NORMAL]
sender.send(alice, INPUT_DISCONNECTED)
sender.send(bob, INPUT_DISCONNECTED)
awaitCond(alice.stateName == OFFLINE)
awaitCond(bob.stateName == OFFLINE)
// We simulate a pending fulfill
bob.underlyingActor.nodeParams.db.pendingRelay.addPendingRelay(initialState.channelId, CMD_FULFILL_HTLC(htlc.id, r, commit = true))
// then we reconnect them
sender.send(alice, INPUT_RECONNECTED(alice2bob.ref, aliceInit, bobInit))
sender.send(bob, INPUT_RECONNECTED(bob2alice.ref, bobInit, aliceInit))
// peers exchange channel_reestablish messages
alice2bob.expectMsgType[ChannelReestablish]
bob2alice.expectMsgType[ChannelReestablish]
alice2bob.forward(bob)
bob2alice.forward(alice)
bob2alice.expectMsgType[UpdateFulfillHtlc]
}
test("replay pending commands when going back to SHUTDOWN") { f =>
import f._
val sender = TestProbe()
val (r, htlc) = addHtlc(50000000 msat, alice, bob, alice2bob, bob2alice)
crossSign(alice, bob, alice2bob, bob2alice)
val initialState = bob.stateData.asInstanceOf[DATA_NORMAL]
// We initiate a mutual close
sender.send(alice, CMD_CLOSE(None))
alice2bob.expectMsgType[Shutdown]
alice2bob.forward(bob)
bob2alice.expectMsgType[Shutdown]
bob2alice.forward(alice)
sender.send(alice, INPUT_DISCONNECTED)
sender.send(bob, INPUT_DISCONNECTED)
awaitCond(alice.stateName == OFFLINE)
awaitCond(bob.stateName == OFFLINE)
// We simulate a pending fulfill
bob.underlyingActor.nodeParams.db.pendingRelay.addPendingRelay(initialState.channelId, CMD_FULFILL_HTLC(htlc.id, r, commit = true))
// then we reconnect them
sender.send(alice, INPUT_RECONNECTED(alice2bob.ref, aliceInit, bobInit))
sender.send(bob, INPUT_RECONNECTED(bob2alice.ref, bobInit, aliceInit))
// peers exchange channel_reestablish messages
alice2bob.expectMsgType[ChannelReestablish]
bob2alice.expectMsgType[ChannelReestablish]
alice2bob.forward(bob)
bob2alice.forward(alice)
// peers re-exchange shutdown messages
alice2bob.expectMsgType[Shutdown]
bob2alice.expectMsgType[Shutdown]
alice2bob.forward(bob)
bob2alice.forward(alice)
bob2alice.expectMsgType[UpdateFulfillHtlc]
}
test("pending non-relayed fulfill htlcs will timeout upstream") { f =>
import f._
val sender = TestProbe()
val register = TestProbe()
val commandBuffer = TestActorRef(new CommandBuffer(bob.underlyingActor.nodeParams, register.ref))
val (r, htlc) = addHtlc(50000000 msat, alice, bob, alice2bob, bob2alice)
crossSign(alice, bob, alice2bob, bob2alice)
@ -426,7 +494,7 @@ class OfflineStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
// We simulate a pending fulfill on that HTLC but not relayed.
// When it is close to expiring upstream, we should close the channel.
sender.send(commandBuffer, CommandBuffer.CommandSend(htlc.channelId, CMD_FULFILL_HTLC(htlc.id, r, commit = true)))
bob.underlyingActor.nodeParams.db.pendingRelay.addPendingRelay(initialState.channelId, CMD_FULFILL_HTLC(htlc.id, r, commit = true))
sender.send(bob, CurrentBlockCount((htlc.cltvExpiry - bob.underlyingActor.nodeParams.fulfillSafetyBeforeTimeoutBlocks).toLong))
val ChannelErrorOccurred(_, _, _, _, LocalError(err), isFatal) = listener.expectMsgType[ChannelErrorOccurred]
@ -448,8 +516,6 @@ class OfflineStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
test("pending non-relayed fail htlcs will timeout upstream") { f =>
import f._
val sender = TestProbe()
val register = TestProbe()
val commandBuffer = TestActorRef(new CommandBuffer(bob.underlyingActor.nodeParams, register.ref))
val (_, htlc) = addHtlc(50000000 msat, alice, bob, alice2bob, bob2alice)
crossSign(alice, bob, alice2bob, bob2alice)
@ -460,7 +526,7 @@ class OfflineStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
// We simulate a pending failure on that HTLC.
// Even if we get close to expiring upstream we shouldn't close the channel, because we have nothing to lose.
sender.send(commandBuffer, CommandBuffer.CommandSend(htlc.channelId, CMD_FAIL_HTLC(htlc.id, Right(IncorrectOrUnknownPaymentDetails(0 msat, 0)))))
sender.send(bob, CMD_FAIL_HTLC(htlc.id, Right(IncorrectOrUnknownPaymentDetails(0 msat, 0))))
sender.send(bob, CurrentBlockCount((htlc.cltvExpiry - bob.underlyingActor.nodeParams.fulfillSafetyBeforeTimeoutBlocks).toLong))
bob2blockchain.expectNoMsg(250 millis)

View file

@ -28,7 +28,7 @@ import fr.acinq.eclair.channel._
import fr.acinq.eclair.channel.states.StateTestsHelperMethods
import fr.acinq.eclair.payment._
import fr.acinq.eclair.payment.relay.Relayer._
import fr.acinq.eclair.payment.relay.{CommandBuffer, Origin}
import fr.acinq.eclair.payment.relay.Origin
import fr.acinq.eclair.router.Router.ChannelHop
import fr.acinq.eclair.wire.Onion.FinalLegacyPayload
import fr.acinq.eclair.wire.{CommitSig, Error, FailureMessageCodecs, PermanentChannelFailure, RevokeAndAck, Shutdown, UpdateAddHtlc, UpdateFailHtlc, UpdateFailMalformedHtlc, UpdateFee, UpdateFulfillHtlc}
@ -151,7 +151,7 @@ class ShutdownStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wit
sender.send(bob, CMD_FULFILL_HTLC(42, randomBytes32)) // this will fail
sender.expectMsg(Failure(UnknownHtlcId(channelId(bob), 42)))
relayerB.expectMsg(CommandBuffer.CommandAck(initialState.channelId, 42))
awaitCond(bob.underlyingActor.nodeParams.db.pendingRelay.listPendingRelay(initialState.channelId).isEmpty)
}
test("recv UpdateFulfillHtlc") { f =>
@ -223,7 +223,7 @@ class ShutdownStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wit
val initialState = bob.stateData.asInstanceOf[DATA_SHUTDOWN]
sender.send(bob, CMD_FAIL_HTLC(42, Right(PermanentChannelFailure))) // this will fail
sender.expectMsg(Failure(UnknownHtlcId(channelId(bob), 42)))
relayerB.expectMsg(CommandBuffer.CommandAck(initialState.channelId, 42))
awaitCond(bob.underlyingActor.nodeParams.db.pendingRelay.listPendingRelay(initialState.channelId).isEmpty)
}
test("recv CMD_FAIL_MALFORMED_HTLC") { f =>
@ -262,7 +262,7 @@ class ShutdownStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wit
val initialState = bob.stateData.asInstanceOf[DATA_SHUTDOWN]
sender.send(bob, CMD_FAIL_MALFORMED_HTLC(42, randomBytes32, FailureMessageCodecs.BADONION)) // this will fail
sender.expectMsg(Failure(UnknownHtlcId(channelId(bob), 42)))
relayerB.expectMsg(CommandBuffer.CommandAck(initialState.channelId, 42))
awaitCond(bob.underlyingActor.nodeParams.db.pendingRelay.listPendingRelay(initialState.channelId).isEmpty)
}
test("recv UpdateFailHtlc") { f =>

View file

@ -30,7 +30,7 @@ import fr.acinq.eclair.channel._
import fr.acinq.eclair.channel.states.StateTestsHelperMethods
import fr.acinq.eclair.payment._
import fr.acinq.eclair.payment.relay.Relayer._
import fr.acinq.eclair.payment.relay.{CommandBuffer, Origin}
import fr.acinq.eclair.payment.relay.Origin
import fr.acinq.eclair.transactions.{Scripts, Transactions}
import fr.acinq.eclair.wire._
import fr.acinq.eclair.{CltvExpiry, LongToBtcAmount, TestConstants, TestKitBaseClass, randomBytes32}
@ -99,7 +99,7 @@ class ClosingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
relayerA.expectMsgType[ForwardFulfill]
crossSign(bob, alice, bob2alice, alice2bob)
// bob confirms that it has forwarded the fulfill to alice
relayerB.expectMsgType[CommandBuffer.CommandAck]
awaitCond(bob.underlyingActor.nodeParams.db.pendingRelay.listPendingRelay(htlc.channelId).isEmpty)
val bobCommitTx2 = bob.stateData.asInstanceOf[DATA_NORMAL].commitments.localCommit.publishableTxs
bobCommitTx1 :: bobCommitTx2 :: Nil
}).flatten

View file

@ -181,7 +181,7 @@ class IntegrationSpec extends TestKitBaseClass with BitcoindService with AnyFunS
instantiateEclairNode("G", ConfigFactory.parseMap(Map("eclair.node-alias" -> "G", "eclair.expiry-delta-blocks" -> 141, "eclair.server.port" -> 29741, "eclair.api.port" -> 28091, "eclair.fee-base-msat" -> 1010, "eclair.fee-proportional-millionths" -> 102, "eclair.trampoline-payments-enable" -> true).asJava).withFallback(commonConfig))
// by default C has a normal payment handler, but this can be overridden in tests
val paymentHandlerC = nodes("C").system.actorOf(PaymentHandler.props(nodes("C").nodeParams, nodes("C").commandBuffer))
val paymentHandlerC = nodes("C").system.actorOf(PaymentHandler.props(nodes("C").nodeParams, nodes("C").register))
nodes("C").paymentHandler ! paymentHandlerC
}
@ -1221,8 +1221,8 @@ class IntegrationSpec extends TestKitBaseClass with BitcoindService with AnyFunS
val forwardHandlerF = TestProbe()
nodes("F5").paymentHandler ! new ForwardHandler(forwardHandlerF.ref)
// this is the actual payment handler that we will forward requests to
val paymentHandlerC = nodes("C").system.actorOf(PaymentHandler.props(nodes("C").nodeParams, nodes("C").commandBuffer))
val paymentHandlerF = nodes("F5").system.actorOf(PaymentHandler.props(nodes("F5").nodeParams, nodes("F5").commandBuffer))
val paymentHandlerC = nodes("C").system.actorOf(PaymentHandler.props(nodes("C").nodeParams, nodes("C").register))
val paymentHandlerF = nodes("F5").system.actorOf(PaymentHandler.props(nodes("F5").nodeParams, nodes("F5").register))
// first we make sure we are in sync with current blockchain height
val currentBlockCount = getBlockCount
awaitCond(getBlockCount == currentBlockCount, max = 20 seconds, interval = 1 second)

View file

@ -22,14 +22,13 @@ import fr.acinq.bitcoin.{ByteVector32, Crypto}
import fr.acinq.eclair.FeatureSupport.Optional
import fr.acinq.eclair.Features.{BasicMultiPartPayment, ChannelRangeQueries, ChannelRangeQueriesExtended, InitialRoutingSync, OptionDataLossProtect, PaymentSecret, VariableLengthOnion}
import fr.acinq.eclair.TestConstants.Alice
import fr.acinq.eclair.channel.{CMD_FAIL_HTLC, CMD_FULFILL_HTLC}
import fr.acinq.eclair.channel.{CMD_FAIL_HTLC, CMD_FULFILL_HTLC, Register}
import fr.acinq.eclair.db.IncomingPaymentStatus
import fr.acinq.eclair.payment.PaymentReceived.PartialPayment
import fr.acinq.eclair.payment.PaymentRequest.ExtraHop
import fr.acinq.eclair.payment.receive.MultiPartHandler.{GetPendingPayments, PendingPayments, ReceivePayment}
import fr.acinq.eclair.payment.receive.MultiPartPaymentFSM.HtlcPart
import fr.acinq.eclair.payment.receive.{MultiPartPaymentFSM, PaymentHandler}
import fr.acinq.eclair.payment.relay.CommandBuffer
import fr.acinq.eclair.wire._
import fr.acinq.eclair.{ActivatedFeature, CltvExpiry, CltvExpiryDelta, Features, LongToBtcAmount, NodeParams, ShortChannelId, TestConstants, TestKitBaseClass, randomKey}
import org.scalatest.Outcome
@ -53,18 +52,18 @@ class MultiPartHandlerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
ActivatedFeature(BasicMultiPartPayment, Optional)
))
case class FixtureParam(nodeParams: NodeParams, defaultExpiry: CltvExpiry, commandBuffer: TestProbe, eventListener: TestProbe, sender: TestProbe) {
lazy val normalHandler = TestActorRef[PaymentHandler](PaymentHandler.props(nodeParams, commandBuffer.ref))
lazy val mppHandler = TestActorRef[PaymentHandler](PaymentHandler.props(nodeParams.copy(features = featuresWithMpp), commandBuffer.ref))
case class FixtureParam(nodeParams: NodeParams, defaultExpiry: CltvExpiry, register: TestProbe, eventListener: TestProbe, sender: TestProbe) {
lazy val normalHandler = TestActorRef[PaymentHandler](PaymentHandler.props(nodeParams, register.ref))
lazy val mppHandler = TestActorRef[PaymentHandler](PaymentHandler.props(nodeParams.copy(features = featuresWithMpp), register.ref))
}
override def withFixture(test: OneArgTest): Outcome = {
within(30 seconds) {
val nodeParams = Alice.nodeParams
val commandBuffer = TestProbe()
val register = TestProbe()
val eventListener = TestProbe()
system.eventStream.subscribe(eventListener.ref, classOf[PaymentEvent])
withFixture(test.toNoArgTest(FixtureParam(nodeParams, CltvExpiryDelta(12).toCltvExpiry(nodeParams.currentBlockHeight), commandBuffer, eventListener, TestProbe())))
withFixture(test.toNoArgTest(FixtureParam(nodeParams, CltvExpiryDelta(12).toCltvExpiry(nodeParams.currentBlockHeight), register, eventListener, TestProbe())))
}
}
@ -84,7 +83,7 @@ class MultiPartHandlerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
val add = UpdateAddHtlc(ByteVector32.One, 0, amountMsat, pr.paymentHash, defaultExpiry, TestConstants.emptyOnionPacket)
sender.send(normalHandler, IncomingPacket.FinalPacket(add, Onion.FinalLegacyPayload(add.amountMsat, add.cltvExpiry)))
commandBuffer.expectMsgType[CommandBuffer.CommandSend[CMD_FULFILL_HTLC]]
register.expectMsgType[Register.Forward[CMD_FULFILL_HTLC]]
val paymentReceived = eventListener.expectMsgType[PaymentReceived]
assert(paymentReceived.copy(parts = paymentReceived.parts.map(_.copy(timestamp = 0))) === PaymentReceived(add.paymentHash, PartialPayment(amountMsat, add.channelId, timestamp = 0) :: Nil))
@ -103,7 +102,7 @@ class MultiPartHandlerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
val add = UpdateAddHtlc(ByteVector32.One, 0, amountMsat, pr.paymentHash, defaultExpiry, TestConstants.emptyOnionPacket)
sender.send(mppHandler, IncomingPacket.FinalPacket(add, Onion.FinalLegacyPayload(add.amountMsat, add.cltvExpiry)))
commandBuffer.expectMsgType[CommandBuffer.CommandSend[CMD_FULFILL_HTLC]]
register.expectMsgType[Register.Forward[CMD_FULFILL_HTLC]]
val paymentReceived = eventListener.expectMsgType[PaymentReceived]
assert(paymentReceived.copy(parts = paymentReceived.parts.map(_.copy(timestamp = 0))) === PaymentReceived(add.paymentHash, PartialPayment(amountMsat, add.channelId, timestamp = 0) :: Nil))
@ -121,7 +120,7 @@ class MultiPartHandlerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
val add = UpdateAddHtlc(ByteVector32.One, 0, amountMsat, pr.paymentHash, CltvExpiryDelta(3).toCltvExpiry(nodeParams.currentBlockHeight), TestConstants.emptyOnionPacket)
sender.send(normalHandler, IncomingPacket.FinalPacket(add, Onion.FinalLegacyPayload(add.amountMsat, add.cltvExpiry)))
val cmd = commandBuffer.expectMsgType[CommandBuffer.CommandSend[CMD_FAIL_HTLC]].cmd
val cmd = register.expectMsgType[Register.Forward[CMD_FAIL_HTLC]].message
assert(cmd.reason == Right(IncorrectOrUnknownPaymentDetails(amountMsat, nodeParams.currentBlockHeight)))
assert(nodeParams.db.payments.getIncomingPayment(pr.paymentHash).get.status === IncomingPaymentStatus.Pending)
@ -231,7 +230,7 @@ class MultiPartHandlerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
val add = UpdateAddHtlc(ByteVector32.One, 0, 1000 msat, pr.paymentHash, defaultExpiry, TestConstants.emptyOnionPacket)
sender.send(normalHandler, IncomingPacket.FinalPacket(add, Onion.FinalLegacyPayload(add.amountMsat, add.cltvExpiry)))
commandBuffer.expectMsgType[CommandBuffer.CommandSend[CMD_FAIL_HTLC]]
register.expectMsgType[Register.Forward[CMD_FAIL_HTLC]]
val Some(incoming) = nodeParams.db.payments.getIncomingPayment(pr.paymentHash)
assert(incoming.paymentRequest.isExpired && incoming.status === IncomingPaymentStatus.Expired)
}
@ -246,7 +245,7 @@ class MultiPartHandlerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
val add = UpdateAddHtlc(ByteVector32.One, 0, 800 msat, pr.paymentHash, defaultExpiry, TestConstants.emptyOnionPacket)
sender.send(mppHandler, IncomingPacket.FinalPacket(add, Onion.createMultiPartPayload(add.amountMsat, 1000 msat, add.cltvExpiry, pr.paymentSecret.get)))
val cmd = commandBuffer.expectMsgType[CommandBuffer.CommandSend[CMD_FAIL_HTLC]].cmd
val cmd = register.expectMsgType[Register.Forward[CMD_FAIL_HTLC]].message
assert(cmd.reason == Right(IncorrectOrUnknownPaymentDetails(1000 msat, nodeParams.currentBlockHeight)))
val Some(incoming) = nodeParams.db.payments.getIncomingPayment(pr.paymentHash)
assert(incoming.paymentRequest.isExpired && incoming.status === IncomingPaymentStatus.Expired)
@ -261,7 +260,7 @@ class MultiPartHandlerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
val add = UpdateAddHtlc(ByteVector32.One, 0, 800 msat, pr.paymentHash, defaultExpiry, TestConstants.emptyOnionPacket)
sender.send(normalHandler, IncomingPacket.FinalPacket(add, Onion.createMultiPartPayload(add.amountMsat, 1000 msat, add.cltvExpiry, pr.paymentSecret.get)))
val cmd = commandBuffer.expectMsgType[CommandBuffer.CommandSend[CMD_FAIL_HTLC]].cmd
val cmd = register.expectMsgType[Register.Forward[CMD_FAIL_HTLC]].message
assert(cmd.reason == Right(IncorrectOrUnknownPaymentDetails(1000 msat, nodeParams.currentBlockHeight)))
assert(nodeParams.db.payments.getIncomingPayment(pr.paymentHash).get.status === IncomingPaymentStatus.Pending)
}
@ -275,7 +274,7 @@ class MultiPartHandlerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
val add = UpdateAddHtlc(ByteVector32.One, 0, 800 msat, pr.paymentHash, CltvExpiryDelta(1).toCltvExpiry(nodeParams.currentBlockHeight), TestConstants.emptyOnionPacket)
sender.send(mppHandler, IncomingPacket.FinalPacket(add, Onion.createMultiPartPayload(add.amountMsat, 1000 msat, add.cltvExpiry, pr.paymentSecret.get)))
val cmd = commandBuffer.expectMsgType[CommandBuffer.CommandSend[CMD_FAIL_HTLC]].cmd
val cmd = register.expectMsgType[Register.Forward[CMD_FAIL_HTLC]].message
assert(cmd.reason == Right(IncorrectOrUnknownPaymentDetails(1000 msat, nodeParams.currentBlockHeight)))
assert(nodeParams.db.payments.getIncomingPayment(pr.paymentHash).get.status === IncomingPaymentStatus.Pending)
}
@ -289,7 +288,7 @@ class MultiPartHandlerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
val add = UpdateAddHtlc(ByteVector32.One, 0, 800 msat, pr.paymentHash.reverse, defaultExpiry, TestConstants.emptyOnionPacket)
sender.send(mppHandler, IncomingPacket.FinalPacket(add, Onion.createMultiPartPayload(add.amountMsat, 1000 msat, add.cltvExpiry, pr.paymentSecret.get)))
val cmd = commandBuffer.expectMsgType[CommandBuffer.CommandSend[CMD_FAIL_HTLC]].cmd
val cmd = register.expectMsgType[Register.Forward[CMD_FAIL_HTLC]].message
assert(cmd.reason == Right(IncorrectOrUnknownPaymentDetails(1000 msat, nodeParams.currentBlockHeight)))
assert(nodeParams.db.payments.getIncomingPayment(pr.paymentHash).get.status === IncomingPaymentStatus.Pending)
}
@ -303,7 +302,7 @@ class MultiPartHandlerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
val add = UpdateAddHtlc(ByteVector32.One, 0, 800 msat, pr.paymentHash, defaultExpiry, TestConstants.emptyOnionPacket)
sender.send(mppHandler, IncomingPacket.FinalPacket(add, Onion.createMultiPartPayload(add.amountMsat, 999 msat, add.cltvExpiry, pr.paymentSecret.get)))
val cmd = commandBuffer.expectMsgType[CommandBuffer.CommandSend[CMD_FAIL_HTLC]].cmd
val cmd = register.expectMsgType[Register.Forward[CMD_FAIL_HTLC]].message
assert(cmd.reason == Right(IncorrectOrUnknownPaymentDetails(999 msat, nodeParams.currentBlockHeight)))
assert(nodeParams.db.payments.getIncomingPayment(pr.paymentHash).get.status === IncomingPaymentStatus.Pending)
}
@ -317,7 +316,7 @@ class MultiPartHandlerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
val add = UpdateAddHtlc(ByteVector32.One, 0, 800 msat, pr.paymentHash, defaultExpiry, TestConstants.emptyOnionPacket)
sender.send(mppHandler, IncomingPacket.FinalPacket(add, Onion.createMultiPartPayload(add.amountMsat, 2001 msat, add.cltvExpiry, pr.paymentSecret.get)))
val cmd = commandBuffer.expectMsgType[CommandBuffer.CommandSend[CMD_FAIL_HTLC]].cmd
val cmd = register.expectMsgType[Register.Forward[CMD_FAIL_HTLC]].message
assert(cmd.reason == Right(IncorrectOrUnknownPaymentDetails(2001 msat, nodeParams.currentBlockHeight)))
assert(nodeParams.db.payments.getIncomingPayment(pr.paymentHash).get.status === IncomingPaymentStatus.Pending)
}
@ -332,14 +331,14 @@ class MultiPartHandlerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
// Invalid payment secret.
val add = UpdateAddHtlc(ByteVector32.One, 0, 800 msat, pr.paymentHash, defaultExpiry, TestConstants.emptyOnionPacket)
sender.send(mppHandler, IncomingPacket.FinalPacket(add, Onion.createMultiPartPayload(add.amountMsat, 1000 msat, add.cltvExpiry, pr.paymentSecret.get.reverse)))
val cmd = commandBuffer.expectMsgType[CommandBuffer.CommandSend[CMD_FAIL_HTLC]].cmd
val cmd = register.expectMsgType[Register.Forward[CMD_FAIL_HTLC]].message
assert(cmd.reason == Right(IncorrectOrUnknownPaymentDetails(1000 msat, nodeParams.currentBlockHeight)))
assert(nodeParams.db.payments.getIncomingPayment(pr.paymentHash).get.status === IncomingPaymentStatus.Pending)
}
test("PaymentHandler should handle multi-part payment timeout") { f =>
val nodeParams = Alice.nodeParams.copy(multiPartPaymentExpiry = 200 millis, features = featuresWithMpp)
val handler = TestActorRef[PaymentHandler](PaymentHandler.props(nodeParams, f.commandBuffer.ref))
val handler = TestActorRef[PaymentHandler](PaymentHandler.props(nodeParams, f.register.ref))
// Partial payment missing additional parts.
f.sender.send(handler, ReceivePayment(Some(1000 msat), "1 slow coffee"))
@ -356,10 +355,10 @@ class MultiPartHandlerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
f.sender.send(handler, GetPendingPayments)
assert(f.sender.expectMsgType[PendingPayments].paymentHashes.nonEmpty)
val commands = f.commandBuffer.expectMsgType[CommandBuffer.CommandSend[CMD_FAIL_HTLC]] :: f.commandBuffer.expectMsgType[CommandBuffer.CommandSend[CMD_FAIL_HTLC]] :: Nil
val commands = f.register.expectMsgType[Register.Forward[CMD_FAIL_HTLC]] :: f.register.expectMsgType[Register.Forward[CMD_FAIL_HTLC]] :: Nil
assert(commands.toSet === Set(
CommandBuffer.CommandSend(ByteVector32.One, CMD_FAIL_HTLC(0, Right(PaymentTimeout), commit = true)),
CommandBuffer.CommandSend(ByteVector32.One, CMD_FAIL_HTLC(1, Right(PaymentTimeout), commit = true))
Register.Forward(ByteVector32.One, CMD_FAIL_HTLC(0, Right(PaymentTimeout), commit = true)),
Register.Forward(ByteVector32.One, CMD_FAIL_HTLC(1, Right(PaymentTimeout), commit = true))
))
awaitCond({
f.sender.send(handler, GetPendingPayments)
@ -368,7 +367,7 @@ class MultiPartHandlerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
// Extraneous HTLCs should be failed.
f.sender.send(handler, MultiPartPaymentFSM.ExtraPaymentReceived(pr1.paymentHash, HtlcPart(1000 msat, UpdateAddHtlc(ByteVector32.One, 42, 200 msat, pr1.paymentHash, add1.cltvExpiry, add1.onionRoutingPacket)), Some(PaymentTimeout)))
f.commandBuffer.expectMsg(CommandBuffer.CommandSend(ByteVector32.One, CMD_FAIL_HTLC(42, Right(PaymentTimeout), commit = true)))
f.register.expectMsg(Register.Forward(ByteVector32.One, CMD_FAIL_HTLC(42, Right(PaymentTimeout), commit = true)))
// The payment should still be pending in DB.
val Some(incomingPayment) = nodeParams.db.payments.getIncomingPayment(pr1.paymentHash)
@ -377,7 +376,7 @@ class MultiPartHandlerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
test("PaymentHandler should handle multi-part payment success") { f =>
val nodeParams = Alice.nodeParams.copy(multiPartPaymentExpiry = 500 millis, features = featuresWithMpp)
val handler = TestActorRef[PaymentHandler](PaymentHandler.props(nodeParams, f.commandBuffer.ref))
val handler = TestActorRef[PaymentHandler](PaymentHandler.props(nodeParams, f.register.ref))
f.sender.send(handler, ReceivePayment(Some(1000 msat), "1 fast coffee"))
val pr = f.sender.expectMsgType[PaymentRequest]
@ -390,15 +389,12 @@ class MultiPartHandlerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
val add3 = add2.copy(id = 43)
f.sender.send(handler, IncomingPacket.FinalPacket(add3, Onion.createMultiPartPayload(add3.amountMsat, 1000 msat, add3.cltvExpiry, pr.paymentSecret.get)))
f.commandBuffer.expectMsg(CommandBuffer.CommandSend(add2.channelId, CMD_FAIL_HTLC(add2.id, Right(IncorrectOrUnknownPaymentDetails(1000 msat, nodeParams.currentBlockHeight)), commit = true)))
val cmd1 = f.commandBuffer.expectMsgType[CommandBuffer.CommandSend[CMD_FULFILL_HTLC]]
assert(cmd1.cmd.id === add1.id)
f.register.expectMsg(Register.Forward(add2.channelId, CMD_FAIL_HTLC(add2.id, Right(IncorrectOrUnknownPaymentDetails(1000 msat, nodeParams.currentBlockHeight)), commit = true)))
val cmd1 = f.register.expectMsgType[Register.Forward[CMD_FULFILL_HTLC]]
assert(cmd1.message.id === add1.id)
assert(cmd1.channelId === add1.channelId)
assert(Crypto.sha256(cmd1.cmd.r) === pr.paymentHash)
f.commandBuffer.expectMsg(CommandBuffer.CommandSend(add3.channelId, CMD_FULFILL_HTLC(add3.id, cmd1.cmd.r, commit = true)))
f.sender.send(handler, CommandBuffer.CommandAck(add1.channelId, add1.id))
f.commandBuffer.expectMsg(CommandBuffer.CommandAck(add1.channelId, add1.id))
assert(Crypto.sha256(cmd1.message.r) === pr.paymentHash)
f.register.expectMsg(Register.Forward(add3.channelId, CMD_FULFILL_HTLC(add3.id, cmd1.message.r, commit = true)))
val paymentReceived = f.eventListener.expectMsgType[PaymentReceived]
assert(paymentReceived.copy(parts = paymentReceived.parts.map(_.copy(timestamp = 0))) === PaymentReceived(pr.paymentHash, PartialPayment(800 msat, ByteVector32.One, 0) :: PartialPayment(200 msat, ByteVector32.Zeroes, 0) :: Nil))
@ -412,7 +408,7 @@ class MultiPartHandlerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
// Extraneous HTLCs should be fulfilled.
f.sender.send(handler, MultiPartPaymentFSM.ExtraPaymentReceived(pr.paymentHash, HtlcPart(1000 msat, UpdateAddHtlc(ByteVector32.One, 44, 200 msat, pr.paymentHash, add1.cltvExpiry, add1.onionRoutingPacket)), None))
f.commandBuffer.expectMsg(CommandBuffer.CommandSend(ByteVector32.One, CMD_FULFILL_HTLC(44, cmd1.cmd.r, commit = true)))
f.register.expectMsg(Register.Forward(ByteVector32.One, CMD_FULFILL_HTLC(44, cmd1.message.r, commit = true)))
assert(f.eventListener.expectMsgType[PaymentReceived].amount === 200.msat)
val received2 = nodeParams.db.payments.getIncomingPayment(pr.paymentHash)
assert(received2.get.status.asInstanceOf[IncomingPaymentStatus.Received].amount === 1200.msat)
@ -423,7 +419,7 @@ class MultiPartHandlerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
test("PaymentHandler should handle multi-part payment timeout then success") { f =>
val nodeParams = Alice.nodeParams.copy(multiPartPaymentExpiry = 250 millis, features = featuresWithMpp)
val handler = TestActorRef[PaymentHandler](PaymentHandler.props(nodeParams, f.commandBuffer.ref))
val handler = TestActorRef[PaymentHandler](PaymentHandler.props(nodeParams, f.register.ref))
f.sender.send(handler, ReceivePayment(Some(1000 msat), "1 coffee, no sugar"))
val pr = f.sender.expectMsgType[PaymentRequest]
@ -431,7 +427,7 @@ class MultiPartHandlerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
val add1 = UpdateAddHtlc(ByteVector32.One, 0, 800 msat, pr.paymentHash, f.defaultExpiry, TestConstants.emptyOnionPacket)
f.sender.send(handler, IncomingPacket.FinalPacket(add1, Onion.createMultiPartPayload(add1.amountMsat, 1000 msat, add1.cltvExpiry, pr.paymentSecret.get)))
f.commandBuffer.expectMsg(CommandBuffer.CommandSend(ByteVector32.One, CMD_FAIL_HTLC(0, Right(PaymentTimeout), commit = true)))
f.register.expectMsg(Register.Forward(ByteVector32.One, CMD_FAIL_HTLC(0, Right(PaymentTimeout), commit = true)))
awaitCond({
f.sender.send(handler, GetPendingPayments)
f.sender.expectMsgType[PendingPayments].paymentHashes.isEmpty
@ -442,11 +438,11 @@ class MultiPartHandlerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
val add3 = UpdateAddHtlc(ByteVector32.Zeroes, 5, 700 msat, pr.paymentHash, f.defaultExpiry, TestConstants.emptyOnionPacket)
f.sender.send(handler, IncomingPacket.FinalPacket(add3, Onion.createMultiPartPayload(add3.amountMsat, 1000 msat, add3.cltvExpiry, pr.paymentSecret.get)))
val cmd1 = f.commandBuffer.expectMsgType[CommandBuffer.CommandSend[CMD_FULFILL_HTLC]]
val cmd1 = f.register.expectMsgType[Register.Forward[CMD_FULFILL_HTLC]]
assert(cmd1.channelId === add2.channelId)
assert(cmd1.cmd.id === 2)
assert(Crypto.sha256(cmd1.cmd.r) === pr.paymentHash)
f.commandBuffer.expectMsg(CommandBuffer.CommandSend(add3.channelId, CMD_FULFILL_HTLC(5, cmd1.cmd.r, commit = true)))
assert(cmd1.message.id === 2)
assert(Crypto.sha256(cmd1.message.r) === pr.paymentHash)
f.register.expectMsg(Register.Forward(add3.channelId, CMD_FULFILL_HTLC(5, cmd1.message.r, commit = true)))
val paymentReceived = f.eventListener.expectMsgType[PaymentReceived]
assert(paymentReceived.copy(parts = paymentReceived.parts.map(_.copy(timestamp = 0))) === PaymentReceived(pr.paymentHash, PartialPayment(300 msat, ByteVector32.One, 0) :: PartialPayment(700 msat, ByteVector32.Zeroes, 0) :: Nil))

View file

@ -22,11 +22,11 @@ import akka.actor.ActorRef
import akka.testkit.{TestActorRef, TestProbe}
import fr.acinq.bitcoin.{Block, Crypto}
import fr.acinq.eclair.Features._
import fr.acinq.eclair.channel.{CMD_FAIL_HTLC, CMD_FULFILL_HTLC, Upstream}
import fr.acinq.eclair.channel.{CMD_FAIL_HTLC, CMD_FULFILL_HTLC, Register, Upstream}
import fr.acinq.eclair.crypto.Sphinx
import fr.acinq.eclair.payment.PaymentRequest.{ExtraHop, PaymentRequestFeatures}
import fr.acinq.eclair.payment.receive.MultiPartPaymentFSM
import fr.acinq.eclair.payment.relay.{CommandBuffer, NodeRelayer}
import fr.acinq.eclair.payment.relay.NodeRelayer
import fr.acinq.eclair.payment.send.MultiPartPaymentLifecycle.{PreimageReceived, SendMultiPartPayment}
import fr.acinq.eclair.payment.send.PaymentInitiator.SendPaymentConfig
import fr.acinq.eclair.payment.send.PaymentLifecycle.SendPayment
@ -49,22 +49,22 @@ class NodeRelayerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
import NodeRelayerSpec._
case class FixtureParam(nodeParams: NodeParams, nodeRelayer: TestActorRef[NodeRelayer], relayer: TestProbe, outgoingPayFSM: TestProbe, commandBuffer: TestProbe, eventListener: TestProbe)
case class FixtureParam(nodeParams: NodeParams, nodeRelayer: TestActorRef[NodeRelayer], relayer: TestProbe, register: TestProbe, outgoingPayFSM: TestProbe, eventListener: TestProbe)
override def withFixture(test: OneArgTest): Outcome = {
within(30 seconds) {
val nodeParams = TestConstants.Bob.nodeParams
val outgoingPayFSM = TestProbe()
val (router, commandBuffer, register, eventListener) = (TestProbe(), TestProbe(), TestProbe(), TestProbe())
val (router, relayer, register, eventListener) = (TestProbe(), TestProbe(), TestProbe(), TestProbe())
system.eventStream.subscribe(eventListener.ref, classOf[PaymentEvent])
class TestNodeRelayer extends NodeRelayer(nodeParams, router.ref, commandBuffer.ref, register.ref) {
class TestNodeRelayer extends NodeRelayer(nodeParams, router.ref, register.ref) {
override def spawnOutgoingPayFSM(cfg: SendPaymentConfig, multiPart: Boolean): ActorRef = {
outgoingPayFSM.ref ! cfg
outgoingPayFSM.ref
}
}
val nodeRelayer = TestActorRef(new TestNodeRelayer().asInstanceOf[NodeRelayer])
withFixture(test.toNoArgTest(FixtureParam(nodeParams, nodeRelayer, TestProbe(), outgoingPayFSM, commandBuffer, eventListener)))
withFixture(test.toNoArgTest(FixtureParam(nodeParams, nodeRelayer, relayer, register, outgoingPayFSM, eventListener)))
}
}
@ -78,7 +78,7 @@ class NodeRelayerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
val parts = incomingMultiPart.dropRight(1).map(i => MultiPartPaymentFSM.HtlcPart(incomingAmount, i.add))
sender.send(nodeRelayer, MultiPartPaymentFSM.MultiPartPaymentFailed(paymentHash, PaymentTimeout, Queue(parts: _*)))
incomingMultiPart.dropRight(1).foreach(p => commandBuffer.expectMsg(CommandBuffer.CommandSend(p.add.channelId, CMD_FAIL_HTLC(p.add.id, Right(PaymentTimeout), commit = true))))
incomingMultiPart.dropRight(1).foreach(p => register.expectMsg(Register.Forward(p.add.channelId, CMD_FAIL_HTLC(p.add.id, Right(PaymentTimeout), commit = true))))
sender.expectNoMsg(100 millis)
outgoingPayFSM.expectNoMsg(100 millis)
}
@ -90,7 +90,7 @@ class NodeRelayerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
val partial = MultiPartPaymentFSM.HtlcPart(incomingAmount, UpdateAddHtlc(randomBytes32, 15, 100 msat, paymentHash, CltvExpiry(42000), TestConstants.emptyOnionPacket))
sender.send(nodeRelayer, MultiPartPaymentFSM.ExtraPaymentReceived(paymentHash, partial, Some(InvalidRealm)))
commandBuffer.expectMsg(CommandBuffer.CommandSend(partial.htlc.channelId, CMD_FAIL_HTLC(partial.htlc.id, Right(InvalidRealm), commit = true)))
register.expectMsg(Register.Forward(partial.htlc.channelId, CMD_FAIL_HTLC(partial.htlc.id, Right(InvalidRealm), commit = true)))
sender.expectNoMsg(100 millis)
outgoingPayFSM.expectNoMsg(100 millis)
}
@ -112,7 +112,7 @@ class NodeRelayerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
Onion.createNodeRelayPayload(outgoingAmount, outgoingExpiry, outgoingNodeId),
nextTrampolinePacket)
relayer.send(nodeRelayer, i1)
commandBuffer.expectMsg(CommandBuffer.CommandSend(i1.add.channelId, CMD_FAIL_HTLC(i1.add.id, Right(IncorrectOrUnknownPaymentDetails(1000 msat, nodeParams.currentBlockHeight)), commit = true)))
register.expectMsg(Register.Forward(i1.add.channelId, CMD_FAIL_HTLC(i1.add.id, Right(IncorrectOrUnknownPaymentDetails(1000 msat, nodeParams.currentBlockHeight)), commit = true)))
// Receive new HTLC with different details, but for the same payment hash.
val i2 = IncomingPacket.NodeRelayPacket(
@ -121,7 +121,7 @@ class NodeRelayerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
Onion.createNodeRelayPayload(1250 msat, outgoingExpiry, outgoingNodeId),
nextTrampolinePacket)
relayer.send(nodeRelayer, i2)
commandBuffer.expectMsg(CommandBuffer.CommandSend(i2.add.channelId, CMD_FAIL_HTLC(i2.add.id, Right(IncorrectOrUnknownPaymentDetails(1500 msat, nodeParams.currentBlockHeight)), commit = true)))
register.expectMsg(Register.Forward(i2.add.channelId, CMD_FAIL_HTLC(i2.add.id, Right(IncorrectOrUnknownPaymentDetails(1500 msat, nodeParams.currentBlockHeight)), commit = true)))
outgoingPayFSM.expectNoMsg(100 millis)
}
@ -135,7 +135,7 @@ class NodeRelayerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
relayer.send(nodeRelayer, p)
val failure = IncorrectOrUnknownPaymentDetails(2000000 msat, nodeParams.currentBlockHeight)
commandBuffer.expectMsg(CommandBuffer.CommandSend(p.add.channelId, CMD_FAIL_HTLC(p.add.id, Right(failure), commit = true)))
register.expectMsg(Register.Forward(p.add.channelId, CMD_FAIL_HTLC(p.add.id, Right(failure), commit = true)))
outgoingPayFSM.expectNoMsg(100 millis)
}
@ -150,8 +150,8 @@ class NodeRelayerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
relayer.send(nodeRelayer, p2)
val failure = IncorrectOrUnknownPaymentDetails(1000000 msat, nodeParams.currentBlockHeight)
commandBuffer.expectMsg(CommandBuffer.CommandSend(p2.add.channelId, CMD_FAIL_HTLC(p2.add.id, Right(failure), commit = true)))
commandBuffer.expectNoMsg(100 millis)
register.expectMsg(Register.Forward(p2.add.channelId, CMD_FAIL_HTLC(p2.add.id, Right(failure), commit = true)))
register.expectNoMsg(100 millis)
outgoingPayFSM.expectNoMsg(100 millis)
}
@ -163,8 +163,8 @@ class NodeRelayerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
val p = createValidIncomingPacket(2000000 msat, 2000000 msat, expiryIn, 1000000 msat, expiryOut)
relayer.send(nodeRelayer, p)
commandBuffer.expectMsg(CommandBuffer.CommandSend(p.add.channelId, CMD_FAIL_HTLC(p.add.id, Right(TrampolineExpiryTooSoon), commit = true)))
commandBuffer.expectNoMsg(100 millis)
register.expectMsg(Register.Forward(p.add.channelId, CMD_FAIL_HTLC(p.add.id, Right(TrampolineExpiryTooSoon), commit = true)))
register.expectNoMsg(100 millis)
outgoingPayFSM.expectNoMsg(100 millis)
}
@ -180,8 +180,8 @@ class NodeRelayerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
)
p.foreach(p => relayer.send(nodeRelayer, p))
p.foreach(p => commandBuffer.expectMsg(CommandBuffer.CommandSend(p.add.channelId, CMD_FAIL_HTLC(p.add.id, Right(TrampolineExpiryTooSoon), commit = true))))
commandBuffer.expectNoMsg(100 millis)
p.foreach(p => register.expectMsg(Register.Forward(p.add.channelId, CMD_FAIL_HTLC(p.add.id, Right(TrampolineExpiryTooSoon), commit = true))))
register.expectNoMsg(100 millis)
outgoingPayFSM.expectNoMsg(100 millis)
}
@ -191,8 +191,8 @@ class NodeRelayerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
val p = createValidIncomingPacket(2000000 msat, 2000000 msat, CltvExpiry(500000), 1999000 msat, CltvExpiry(490000))
relayer.send(nodeRelayer, p)
commandBuffer.expectMsg(CommandBuffer.CommandSend(p.add.channelId, CMD_FAIL_HTLC(p.add.id, Right(TrampolineFeeInsufficient), commit = true)))
commandBuffer.expectNoMsg(100 millis)
register.expectMsg(Register.Forward(p.add.channelId, CMD_FAIL_HTLC(p.add.id, Right(TrampolineFeeInsufficient), commit = true)))
register.expectNoMsg(100 millis)
outgoingPayFSM.expectNoMsg(100 millis)
}
@ -205,8 +205,8 @@ class NodeRelayerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
)
p.foreach(p => relayer.send(nodeRelayer, p))
p.foreach(p => commandBuffer.expectMsg(CommandBuffer.CommandSend(p.add.channelId, CMD_FAIL_HTLC(p.add.id, Right(TrampolineFeeInsufficient), commit = true))))
commandBuffer.expectNoMsg(100 millis)
p.foreach(p => register.expectMsg(Register.Forward(p.add.channelId, CMD_FAIL_HTLC(p.add.id, Right(TrampolineFeeInsufficient), commit = true))))
register.expectNoMsg(100 millis)
outgoingPayFSM.expectNoMsg(100 millis)
}
@ -219,8 +219,8 @@ class NodeRelayerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
outgoingPayFSM.expectMsgType[SendMultiPartPayment]
outgoingPayFSM.send(nodeRelayer, PaymentFailed(outgoingPaymentId, paymentHash, LocalFailure(Nil, BalanceTooLow) :: Nil))
incomingMultiPart.foreach(p => commandBuffer.expectMsg(CommandBuffer.CommandSend(p.add.channelId, CMD_FAIL_HTLC(p.add.id, Right(TemporaryNodeFailure), commit = true))))
commandBuffer.expectNoMsg(100 millis)
incomingMultiPart.foreach(p => register.expectMsg(Register.Forward(p.add.channelId, CMD_FAIL_HTLC(p.add.id, Right(TemporaryNodeFailure), commit = true))))
register.expectNoMsg(100 millis)
eventListener.expectNoMsg(100 millis)
}
@ -235,8 +235,8 @@ class NodeRelayerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
// If we're having a hard time finding routes, raising the fee/cltv will likely help.
val failures = LocalFailure(Nil, RouteNotFound) :: RemoteFailure(Nil, Sphinx.DecryptedFailurePacket(outgoingNodeId, PermanentNodeFailure)) :: LocalFailure(Nil, RouteNotFound) :: Nil
outgoingPayFSM.send(nodeRelayer, PaymentFailed(outgoingPaymentId, paymentHash, failures))
incomingMultiPart.foreach(p => commandBuffer.expectMsg(CommandBuffer.CommandSend(p.add.channelId, CMD_FAIL_HTLC(p.add.id, Right(TrampolineFeeInsufficient), commit = true))))
commandBuffer.expectNoMsg(100 millis)
incomingMultiPart.foreach(p => register.expectMsg(Register.Forward(p.add.channelId, CMD_FAIL_HTLC(p.add.id, Right(TrampolineFeeInsufficient), commit = true))))
register.expectNoMsg(100 millis)
eventListener.expectNoMsg(100 millis)
}
@ -250,8 +250,8 @@ class NodeRelayerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
val failures = RemoteFailure(Nil, Sphinx.DecryptedFailurePacket(outgoingNodeId, FinalIncorrectHtlcAmount(42 msat))) :: UnreadableRemoteFailure(Nil) :: Nil
outgoingPayFSM.send(nodeRelayer, PaymentFailed(outgoingPaymentId, paymentHash, failures))
incomingMultiPart.foreach(p => commandBuffer.expectMsg(CommandBuffer.CommandSend(p.add.channelId, CMD_FAIL_HTLC(p.add.id, Right(FinalIncorrectHtlcAmount(42 msat)), commit = true))))
commandBuffer.expectNoMsg(100 millis)
incomingMultiPart.foreach(p => register.expectMsg(Register.Forward(p.add.channelId, CMD_FAIL_HTLC(p.add.id, Right(FinalIncorrectHtlcAmount(42 msat)), commit = true))))
register.expectNoMsg(100 millis)
eventListener.expectNoMsg(100 millis)
}
@ -282,11 +282,11 @@ class NodeRelayerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
// A first downstream HTLC is fulfilled: we should immediately forward the fulfill upstream.
outgoingPayFSM.send(nodeRelayer, PreimageReceived(paymentHash, paymentPreimage))
incomingMultiPart.foreach(p => commandBuffer.expectMsg(CommandBuffer.CommandSend(p.add.channelId, CMD_FULFILL_HTLC(p.add.id, paymentPreimage, commit = true))))
incomingMultiPart.foreach(p => register.expectMsg(Register.Forward(p.add.channelId, CMD_FULFILL_HTLC(p.add.id, paymentPreimage, commit = true))))
// If the payment FSM sends us duplicate preimage events, we should not fulfill a second time upstream.
outgoingPayFSM.send(nodeRelayer, PreimageReceived(paymentHash, paymentPreimage))
commandBuffer.expectNoMsg(100 millis)
register.expectNoMsg(100 millis)
// Once all the downstream payments have settled, we should emit the relayed event.
outgoingPayFSM.send(nodeRelayer, createSuccessEvent(outgoingCfg.id))
@ -294,7 +294,7 @@ class NodeRelayerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
validateRelayEvent(relayEvent)
assert(relayEvent.incoming.toSet === incomingMultiPart.map(i => PaymentRelayed.Part(i.add.amountMsat, i.add.channelId)).toSet)
assert(relayEvent.outgoing.nonEmpty)
commandBuffer.expectNoMsg(100 millis)
register.expectNoMsg(100 millis)
}
test("relay incoming single-part payment") { f =>
@ -310,14 +310,14 @@ class NodeRelayerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
outgoingPayFSM.send(nodeRelayer, PreimageReceived(paymentHash, paymentPreimage))
val incomingAdd = incomingSinglePart.add
commandBuffer.expectMsg(CommandBuffer.CommandSend(incomingAdd.channelId, CMD_FULFILL_HTLC(incomingAdd.id, paymentPreimage, commit = true)))
register.expectMsg(Register.Forward(incomingAdd.channelId, CMD_FULFILL_HTLC(incomingAdd.id, paymentPreimage, commit = true)))
outgoingPayFSM.send(nodeRelayer, createSuccessEvent(outgoingCfg.id))
val relayEvent = eventListener.expectMsgType[TrampolinePaymentRelayed]
validateRelayEvent(relayEvent)
assert(relayEvent.incoming === Seq(PaymentRelayed.Part(incomingSinglePart.add.amountMsat, incomingSinglePart.add.channelId)))
assert(relayEvent.outgoing.nonEmpty)
commandBuffer.expectNoMsg(100 millis)
register.expectNoMsg(100 millis)
}
test("relay to non-trampoline recipient supporting multi-part") { f =>
@ -343,14 +343,14 @@ class NodeRelayerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
assert(outgoingPayment.assistedRoutes === hints)
outgoingPayFSM.send(nodeRelayer, PreimageReceived(paymentHash, paymentPreimage))
incomingMultiPart.foreach(p => commandBuffer.expectMsg(CommandBuffer.CommandSend(p.add.channelId, CMD_FULFILL_HTLC(p.add.id, paymentPreimage, commit = true))))
incomingMultiPart.foreach(p => register.expectMsg(Register.Forward(p.add.channelId, CMD_FULFILL_HTLC(p.add.id, paymentPreimage, commit = true))))
outgoingPayFSM.send(nodeRelayer, createSuccessEvent(outgoingCfg.id))
val relayEvent = eventListener.expectMsgType[TrampolinePaymentRelayed]
validateRelayEvent(relayEvent)
assert(relayEvent.incoming === incomingMultiPart.map(i => PaymentRelayed.Part(i.add.amountMsat, i.add.channelId)))
assert(relayEvent.outgoing.nonEmpty)
commandBuffer.expectNoMsg(100 millis)
register.expectNoMsg(100 millis)
}
test("relay to non-trampoline recipient without multi-part") { f =>
@ -373,14 +373,14 @@ class NodeRelayerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
assert(outgoingPayment.assistedRoutes === hints)
outgoingPayFSM.send(nodeRelayer, PreimageReceived(paymentHash, paymentPreimage))
incomingMultiPart.foreach(p => commandBuffer.expectMsg(CommandBuffer.CommandSend(p.add.channelId, CMD_FULFILL_HTLC(p.add.id, paymentPreimage, commit = true))))
incomingMultiPart.foreach(p => register.expectMsg(Register.Forward(p.add.channelId, CMD_FULFILL_HTLC(p.add.id, paymentPreimage, commit = true))))
outgoingPayFSM.send(nodeRelayer, createSuccessEvent(outgoingCfg.id))
val relayEvent = eventListener.expectMsgType[TrampolinePaymentRelayed]
validateRelayEvent(relayEvent)
assert(relayEvent.incoming === incomingMultiPart.map(i => PaymentRelayed.Part(i.add.amountMsat, i.add.channelId)))
assert(relayEvent.outgoing.length === 1)
commandBuffer.expectNoMsg(100 millis)
register.expectNoMsg(100 millis)
}
def validateOutgoingCfg(outgoingCfg: SendPaymentConfig, upstream: Upstream): Unit = {

View file

@ -28,7 +28,7 @@ import fr.acinq.eclair.channel.states.StateTestsHelperMethods
import fr.acinq.eclair.db.{OutgoingPayment, OutgoingPaymentStatus, PaymentType}
import fr.acinq.eclair.payment.OutgoingPacket.buildCommand
import fr.acinq.eclair.payment.PaymentPacketSpec._
import fr.acinq.eclair.payment.relay.{CommandBuffer, Origin, PostRestartHtlcCleaner, Relayer}
import fr.acinq.eclair.payment.relay.{Origin, PostRestartHtlcCleaner, Relayer}
import fr.acinq.eclair.router.Router.ChannelHop
import fr.acinq.eclair.transactions.{DirectedHtlc, IncomingHtlc, OutgoingHtlc}
import fr.acinq.eclair.wire.Onion.FinalLegacyPayload
@ -49,19 +49,19 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit
import PostRestartHtlcCleanerSpec._
case class FixtureParam(nodeParams: NodeParams, commandBuffer: TestProbe, sender: TestProbe, eventListener: TestProbe) {
case class FixtureParam(nodeParams: NodeParams, register: TestProbe, sender: TestProbe, eventListener: TestProbe) {
def createRelayer(): ActorRef = {
system.actorOf(Relayer.props(nodeParams, TestProbe().ref, TestProbe().ref, commandBuffer.ref, TestProbe().ref))
system.actorOf(Relayer.props(nodeParams, TestProbe().ref, register.ref, TestProbe().ref))
}
}
override def withFixture(test: OneArgTest): Outcome = {
within(30 seconds) {
val nodeParams = TestConstants.Bob.nodeParams
val commandBuffer = TestProbe()
val register = TestProbe()
val eventListener = TestProbe()
system.eventStream.subscribe(eventListener.ref, classOf[PaymentEvent])
withFixture(test.toNoArgTest(FixtureParam(nodeParams, commandBuffer, TestProbe(), eventListener)))
withFixture(test.toNoArgTest(FixtureParam(nodeParams, register, TestProbe(), eventListener)))
}
}
@ -110,7 +110,7 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit
val channel = TestProbe()
f.createRelayer()
commandBuffer.expectNoMsg(100 millis) // nothing should happen while channels are still offline.
register.expectNoMsg(100 millis) // nothing should happen while channels are still offline.
// channel 1 goes to NORMAL state:
system.eventStream.publish(ChannelStateChanged(channel.ref, system.deadLetters, a, OFFLINE, NORMAL, channels.head))
@ -172,7 +172,7 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit
val channel = TestProbe()
f.createRelayer()
commandBuffer.expectNoMsg(100 millis) // nothing should happen while channels are still offline.
register.expectNoMsg(100 millis) // nothing should happen while channels are still offline.
// channel 1 goes to NORMAL state:
system.eventStream.publish(ChannelStateChanged(channel.ref, system.deadLetters, a, OFFLINE, NORMAL, channels.head))
@ -219,7 +219,7 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit
val testCase = setupLocalPayments(nodeParams)
val relayer = createRelayer()
commandBuffer.expectNoMsg(100 millis)
register.expectNoMsg(100 millis)
sender.send(relayer, testCase.fails(1))
eventListener.expectNoMsg(100 millis)
@ -240,7 +240,7 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit
assert(e2.paymentHash === paymentHash1)
assert(nodeParams.db.payments.getOutgoingPayment(testCase.childIds.head).get.status.isInstanceOf[OutgoingPaymentStatus.Failed])
commandBuffer.expectNoMsg(100 millis)
register.expectNoMsg(100 millis)
}
test("handle a local payment htlc-fulfill") { f =>
@ -248,7 +248,7 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit
val testCase = setupLocalPayments(nodeParams)
val relayer = f.createRelayer()
commandBuffer.expectNoMsg(100 millis)
register.expectNoMsg(100 millis)
sender.send(relayer, testCase.fulfills(1))
eventListener.expectNoMsg(100 millis)
@ -276,7 +276,7 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit
assert(e2.recipientAmount === 561.msat)
assert(nodeParams.db.payments.getOutgoingPayment(testCase.childIds.head).get.status.isInstanceOf[OutgoingPaymentStatus.Succeeded])
commandBuffer.expectNoMsg(100 millis)
register.expectNoMsg(100 millis)
}
test("ignore htlcs in closing downstream channels that have already been settled upstream") { f =>
@ -284,9 +284,9 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit
val testCase = setupTrampolinePayments(nodeParams)
val initialized = Promise[Done]()
val postRestart = system.actorOf(PostRestartHtlcCleaner.props(nodeParams, commandBuffer.ref, Some(initialized)))
val postRestart = system.actorOf(PostRestartHtlcCleaner.props(nodeParams, register.ref, Some(initialized)))
awaitCond(initialized.isCompleted)
commandBuffer.expectNoMsg(100 millis)
register.expectNoMsg(100 millis)
val probe = TestProbe()
probe.send(postRestart, PostRestartHtlcCleaner.GetBrokenHtlcs)
@ -390,7 +390,7 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit
nodeParams.db.channels.addOrUpdateChannel(data_downstream)
val relayer = f.createRelayer()
commandBuffer.expectNoMsg(100 millis) // nothing should happen while channels are still offline.
register.expectNoMsg(100 millis) // nothing should happen while channels are still offline.
val (channel_upstream_1, channel_upstream_2, channel_upstream_3) = (TestProbe(), TestProbe(), TestProbe())
system.eventStream.publish(ChannelStateChanged(channel_upstream_1.ref, system.deadLetters, a, OFFLINE, NORMAL, data_upstream_1))
@ -406,9 +406,9 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit
// Payment 2 should fulfill once we receive the preimage.
val origin_2 = Origin.TrampolineRelayed(upstream_2.adds.map(add => (add.channelId, add.id)).toList, None)
sender.send(relayer, Relayer.ForwardOnChainFulfill(preimage2, origin_2, htlc_2_2))
commandBuffer.expectMsgAllOf(
CommandBuffer.CommandSend(channelId_ab_1, CMD_FULFILL_HTLC(5, preimage2, commit = true)),
CommandBuffer.CommandSend(channelId_ab_2, CMD_FULFILL_HTLC(9, preimage2, commit = true))
register.expectMsgAllOf(
Register.Forward(channelId_ab_1, CMD_FULFILL_HTLC(5, preimage2, commit = true)),
Register.Forward(channelId_ab_2, CMD_FULFILL_HTLC(9, preimage2, commit = true))
)
// Payment 3 should not be failed: we are still waiting for on-chain confirmation.
@ -420,28 +420,28 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit
val testCase = setupTrampolinePayments(nodeParams)
val relayer = f.createRelayer()
commandBuffer.expectNoMsg(100 millis)
register.expectNoMsg(100 millis)
// This downstream HTLC has two upstream HTLCs.
sender.send(relayer, buildForwardFail(testCase.downstream_1_1, testCase.upstream_1))
val fails = commandBuffer.expectMsgType[CommandBuffer.CommandSend[CMD_FAIL_HTLC]] :: commandBuffer.expectMsgType[CommandBuffer.CommandSend[CMD_FAIL_HTLC]] :: Nil
val fails = register.expectMsgType[Register.Forward[CMD_FAIL_HTLC]] :: register.expectMsgType[Register.Forward[CMD_FAIL_HTLC]] :: Nil
assert(fails.toSet === testCase.upstream_1.origins.map {
case (channelId, htlcId) => CommandBuffer.CommandSend(channelId, CMD_FAIL_HTLC(htlcId, Right(TemporaryNodeFailure), commit = true))
case (channelId, htlcId) => Register.Forward(channelId, CMD_FAIL_HTLC(htlcId, Right(TemporaryNodeFailure), commit = true))
}.toSet)
sender.send(relayer, buildForwardFail(testCase.downstream_1_1, testCase.upstream_1))
commandBuffer.expectNoMsg(100 millis) // a duplicate failure should be ignored
register.expectNoMsg(100 millis) // a duplicate failure should be ignored
sender.send(relayer, buildForwardOnChainFail(testCase.downstream_2_1, testCase.upstream_2))
sender.send(relayer, buildForwardFail(testCase.downstream_2_2, testCase.upstream_2))
commandBuffer.expectNoMsg(100 millis) // there is still a third downstream payment pending
register.expectNoMsg(100 millis) // there is still a third downstream payment pending
sender.send(relayer, buildForwardFail(testCase.downstream_2_3, testCase.upstream_2))
commandBuffer.expectMsg(testCase.upstream_2.origins.map {
case (channelId, htlcId) => CommandBuffer.CommandSend(channelId, CMD_FAIL_HTLC(htlcId, Right(TemporaryNodeFailure), commit = true))
register.expectMsg(testCase.upstream_2.origins.map {
case (channelId, htlcId) => Register.Forward(channelId, CMD_FAIL_HTLC(htlcId, Right(TemporaryNodeFailure), commit = true))
}.head)
commandBuffer.expectNoMsg(100 millis)
register.expectNoMsg(100 millis)
eventListener.expectNoMsg(100 millis)
}
@ -450,27 +450,27 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit
val testCase = setupTrampolinePayments(nodeParams)
val relayer = f.createRelayer()
commandBuffer.expectNoMsg(100 millis)
register.expectNoMsg(100 millis)
// This downstream HTLC has two upstream HTLCs.
sender.send(relayer, buildForwardFulfill(testCase.downstream_1_1, testCase.upstream_1, preimage1))
val fails = commandBuffer.expectMsgType[CommandBuffer.CommandSend[CMD_FULFILL_HTLC]] :: commandBuffer.expectMsgType[CommandBuffer.CommandSend[CMD_FULFILL_HTLC]] :: Nil
val fails = register.expectMsgType[Register.Forward[CMD_FULFILL_HTLC]] :: register.expectMsgType[Register.Forward[CMD_FULFILL_HTLC]] :: Nil
assert(fails.toSet === testCase.upstream_1.origins.map {
case (channelId, htlcId) => CommandBuffer.CommandSend(channelId, CMD_FULFILL_HTLC(htlcId, preimage1, commit = true))
case (channelId, htlcId) => Register.Forward(channelId, CMD_FULFILL_HTLC(htlcId, preimage1, commit = true))
}.toSet)
sender.send(relayer, buildForwardFulfill(testCase.downstream_1_1, testCase.upstream_1, preimage1))
commandBuffer.expectNoMsg(100 millis) // a duplicate fulfill should be ignored
register.expectNoMsg(100 millis) // a duplicate fulfill should be ignored
// This payment has 3 downstream HTLCs, but we should fulfill upstream as soon as we receive the preimage.
sender.send(relayer, buildForwardFulfill(testCase.downstream_2_1, testCase.upstream_2, preimage2))
commandBuffer.expectMsg(testCase.upstream_2.origins.map {
case (channelId, htlcId) => CommandBuffer.CommandSend(channelId, CMD_FULFILL_HTLC(htlcId, preimage2, commit = true))
register.expectMsg(testCase.upstream_2.origins.map {
case (channelId, htlcId) => Register.Forward(channelId, CMD_FULFILL_HTLC(htlcId, preimage2, commit = true))
}.head)
sender.send(relayer, buildForwardFulfill(testCase.downstream_2_2, testCase.upstream_2, preimage2))
sender.send(relayer, buildForwardFulfill(testCase.downstream_2_3, testCase.upstream_2, preimage2))
commandBuffer.expectNoMsg(100 millis) // the payment has already been fulfilled upstream
register.expectNoMsg(100 millis) // the payment has already been fulfilled upstream
eventListener.expectNoMsg(100 millis)
}
@ -479,17 +479,17 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit
val testCase = setupTrampolinePayments(nodeParams)
val relayer = f.createRelayer()
commandBuffer.expectNoMsg(100 millis)
register.expectNoMsg(100 millis)
sender.send(relayer, buildForwardFail(testCase.downstream_2_1, testCase.upstream_2))
sender.send(relayer, buildForwardFulfill(testCase.downstream_2_2, testCase.upstream_2, preimage2))
commandBuffer.expectMsg(testCase.upstream_2.origins.map {
case (channelId, htlcId) => CommandBuffer.CommandSend(channelId, CMD_FULFILL_HTLC(htlcId, preimage2, commit = true))
register.expectMsg(testCase.upstream_2.origins.map {
case (channelId, htlcId) => Register.Forward(channelId, CMD_FULFILL_HTLC(htlcId, preimage2, commit = true))
}.head)
sender.send(relayer, buildForwardFail(testCase.downstream_2_3, testCase.upstream_2))
commandBuffer.expectNoMsg(100 millis) // the payment has already been fulfilled upstream
register.expectNoMsg(100 millis) // the payment has already been fulfilled upstream
eventListener.expectNoMsg(100 millis)
}

View file

@ -27,7 +27,7 @@ import fr.acinq.eclair.payment.IncomingPacket.FinalPacket
import fr.acinq.eclair.payment.OutgoingPacket.{buildCommand, buildOnion, buildPacket}
import fr.acinq.eclair.payment.relay.Origin._
import fr.acinq.eclair.payment.relay.Relayer._
import fr.acinq.eclair.payment.relay.{CommandBuffer, Relayer}
import fr.acinq.eclair.payment.relay.Relayer
import fr.acinq.eclair.payment.send.MultiPartPaymentLifecycle.PreimageReceived
import fr.acinq.eclair.router.Router.{ChannelHop, Ignore, NodeHop}
import fr.acinq.eclair.router.{Announcements, _}
@ -54,10 +54,9 @@ class RelayerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
within(30 seconds) {
val nodeParams = TestConstants.Bob.nodeParams
val (router, register) = (TestProbe(), TestProbe())
val commandBuffer = system.actorOf(Props(new CommandBuffer(nodeParams, register.ref)))
val paymentHandler = TestProbe()
// we are node B in the route A -> B -> C -> ....
val relayer = system.actorOf(Relayer.props(nodeParams, router.ref, register.ref, commandBuffer, paymentHandler.ref))
val relayer = system.actorOf(Relayer.props(nodeParams, router.ref, register.ref, paymentHandler.ref))
withFixture(test.toNoArgTest(FixtureParam(nodeParams, relayer, router, register, paymentHandler, TestProbe())))
}
}
@ -379,8 +378,7 @@ class RelayerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
import f._
val nodeParams = TestConstants.Bob.nodeParams.copy(enableTrampolinePayment = false)
val commandBuffer = system.actorOf(Props(new CommandBuffer(nodeParams, register.ref)))
val relayer = system.actorOf(Relayer.props(nodeParams, router.ref, register.ref, commandBuffer, paymentHandler.ref))
val relayer = system.actorOf(Relayer.props(nodeParams, router.ref, register.ref, paymentHandler.ref))
// we use this to build a valid trampoline onion inside a normal onion
val trampolineHops = NodeHop(a, b, channelUpdate_ab.cltvExpiryDelta, 0 msat) :: NodeHop(b, c, channelUpdate_bc.cltvExpiryDelta, fee_b) :: Nil