1
0
Fork 0
mirror of https://github.com/ACINQ/eclair.git synced 2025-02-23 22:46:44 +01:00

Smarter relay logic (#1011)

When relaying a payment, we look in the onion to find the next
`shortChannelId`. But we may choose a different channel (to the same
node), because the requested channel may not have enough balance, of for
some other reasons, as permitted by the spec.

Currently we limit ourselves to only two attempts: one with a
"preferred" channel, and one with the originally requested channel if is
different from the preferred one. This has drawbacks, because if we have
multiple channels to the same node, we may not be able to relay a
payment if the "preferred" channel is currently unavailable (e.g.
because of an htlc in-flight value that is too high).

We now retry as many times as there are available channels, in our order
of preference, and if all fail, then we return a failure message for the
originally requested channel.
This commit is contained in:
Pierre-Marie Padiou 2019-06-03 19:10:14 +02:00 committed by GitHub
parent cd78c9ecdd
commit b33c9ecaac
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 233 additions and 149 deletions

View file

@ -18,7 +18,7 @@ package fr.acinq.eclair.channel
import fr.acinq.bitcoin.Crypto.Scalar
import fr.acinq.bitcoin.{ByteVector32, Transaction}
import fr.acinq.eclair.UInt64
import fr.acinq.eclair.{ShortChannelId, UInt64}
import fr.acinq.eclair.payment.Origin
import fr.acinq.eclair.wire.{ChannelUpdate, UpdateAddHtlc}

View file

@ -108,7 +108,7 @@ case class BITCOIN_PARENT_TX_CONFIRMED(childTx: Transaction) extends BitcoinEven
*/
sealed trait Command
final case class CMD_ADD_HTLC(amountMsat: Long, paymentHash: ByteVector32, cltvExpiry: Long, onion: ByteVector = Sphinx.LAST_PACKET.serialize, upstream: Either[UUID, UpdateAddHtlc], commit: Boolean = false, redirected: Boolean = false) extends Command
final case class CMD_ADD_HTLC(amountMsat: Long, paymentHash: ByteVector32, cltvExpiry: Long, onion: ByteVector = Sphinx.LAST_PACKET.serialize, upstream: Either[UUID, UpdateAddHtlc], commit: Boolean = false, previousFailures: Seq[AddHtlcFailed] = Seq.empty) extends Command
final case class CMD_FULFILL_HTLC(id: Long, r: ByteVector32, commit: Boolean = false) extends Command
final case class CMD_FAIL_HTLC(id: Long, reason: Either[ByteVector, FailureMessage], commit: Boolean = false) extends Command
final case class CMD_FAIL_MALFORMED_HTLC(id: Long, onionHash: ByteVector32, failureCode: Int, commit: Boolean = false) extends Command

View file

@ -42,7 +42,7 @@ case class Local(id: UUID, sender: Option[ActorRef]) extends Origin // we don't
case class Relayed(originChannelId: ByteVector32, originHtlcId: Long, amountMsatIn: Long, amountMsatOut: Long) extends Origin
sealed trait ForwardMessage
case class ForwardAdd(add: UpdateAddHtlc, canRedirect: Boolean = true) extends ForwardMessage
case class ForwardAdd(add: UpdateAddHtlc, previousFailures: Seq[AddHtlcFailed] = Seq.empty) extends ForwardMessage
case class ForwardFulfill(fulfill: UpdateFulfillHtlc, to: Origin, htlc: UpdateAddHtlc) extends ForwardMessage
case class ForwardFail(fail: UpdateFailHtlc, to: Origin, htlc: UpdateAddHtlc) extends ForwardMessage
case class ForwardFailMalformed(fail: UpdateFailMalformedHtlc, to: Origin, htlc: UpdateAddHtlc) extends ForwardMessage
@ -58,13 +58,13 @@ class Relayer(nodeParams: NodeParams, register: ActorRef, paymentHandler: ActorR
import Relayer._
// we pass these to helpers classes so that they have the logging context
implicit def implicitLog = log
implicit def implicitLog: LoggingAdapter = log
context.system.eventStream.subscribe(self, classOf[LocalChannelUpdate])
context.system.eventStream.subscribe(self, classOf[LocalChannelDown])
context.system.eventStream.subscribe(self, classOf[AvailableBalanceChanged])
val commandBuffer = context.actorOf(Props(new CommandBuffer(nodeParams, register)))
private val commandBuffer = context.actorOf(Props(new CommandBuffer(nodeParams, register)))
override def receive: Receive = main(Map.empty, new mutable.HashMap[PublicKey, mutable.Set[ShortChannelId]] with mutable.MultiMap[PublicKey, ShortChannelId])
@ -85,7 +85,7 @@ class Relayer(nodeParams: NodeParams, register: ActorRef, paymentHandler: ActorR
}
context become main(channelUpdates1, node2channels)
case ForwardAdd(add, canRedirect) =>
case ForwardAdd(add, previousFailures) =>
log.debug(s"received forwarding request for htlc #${add.id} paymentHash=${add.paymentHash} from channelId=${add.channelId}")
tryParsePacket(add, nodeParams.privateKey) match {
case Success(p: FinalPayload) =>
@ -98,12 +98,11 @@ class Relayer(nodeParams: NodeParams, register: ActorRef, paymentHandler: ActorR
paymentHandler forward addHtlc
}
case Success(r: RelayPayload) =>
val selectedShortChannelId = if (canRedirect) selectPreferredChannel(r, channelUpdates, node2channels) else r.payload.shortChannelId
handleRelay(r, channelUpdates.get(selectedShortChannelId).map(_.channelUpdate)) match {
handleRelay(r, channelUpdates, node2channels, previousFailures) match {
case Left(cmdFail) =>
log.info(s"rejecting htlc #${add.id} paymentHash=${add.paymentHash} from channelId=${add.channelId} to shortChannelId=${r.payload.shortChannelId} reason=${cmdFail.reason}")
commandBuffer ! CommandBuffer.CommandSend(add.channelId, add.id, cmdFail)
case Right(cmdAdd) =>
case Right((selectedShortChannelId, cmdAdd)) =>
log.info(s"forwarding htlc #${add.id} paymentHash=${add.paymentHash} from channelId=${add.channelId} to shortChannelId=$selectedShortChannelId")
register ! Register.ForwardShortId(selectedShortChannelId, cmdAdd)
}
@ -119,79 +118,74 @@ class Relayer(nodeParams: NodeParams, register: ActorRef, paymentHandler: ActorR
val cmdFail = CMD_FAIL_HTLC(add.id, Right(UnknownNextPeer), commit = true)
commandBuffer ! CommandBuffer.CommandSend(add.channelId, add.id, cmdFail)
case Status.Failure(AddHtlcFailed(_, paymentHash, _, Local(id, None), _, _)) =>
// we sent the payment, but we probably restarted and the reference to the original sender was lost,
// we publish the failure on the event stream and update the status in paymentDb
nodeParams.db.payments.updateOutgoingPayment(id, OutgoingPaymentStatus.FAILED)
context.system.eventStream.publish(PaymentFailed(id, paymentHash, Nil))
case Status.Failure(AddHtlcFailed(_, _, error, Local(_, Some(sender)), _, _)) =>
sender ! Status.Failure(error)
case Status.Failure(AddHtlcFailed(_, paymentHash, error, Relayed(originChannelId, originHtlcId, _, _), channelUpdate_opt, originalCommand_opt)) =>
originalCommand_opt match {
case Some(cmd) if cmd.redirected && cmd.upstream.isRight => // cmd.upstream_opt.isDefined always true since origin = relayed
// if it was redirected, we give it one more try with the original requested channel (meaning that the error returned will always be for the requested channel)
log.info(s"retrying htlc #$originHtlcId paymentHash=$paymentHash from channelId=$originChannelId")
self ! ForwardAdd(cmd.upstream.right.get, canRedirect = false)
case _ =>
// otherwise we just return a failure
val failure = (error, channelUpdate_opt) match {
case (_: ExpiryTooSmall, Some(channelUpdate)) => ExpiryTooSoon(channelUpdate)
case (_: ExpiryTooBig, _) => ExpiryTooFar
case (_: InsufficientFunds, Some(channelUpdate)) => TemporaryChannelFailure(channelUpdate)
case (_: TooManyAcceptedHtlcs, Some(channelUpdate)) => TemporaryChannelFailure(channelUpdate)
case (_: ChannelUnavailable, Some(channelUpdate)) if !Announcements.isEnabled(channelUpdate.channelFlags) => ChannelDisabled(channelUpdate.messageFlags, channelUpdate.channelFlags, channelUpdate)
case (_: ChannelUnavailable, None) => PermanentChannelFailure
case (_: HtlcTimedout, _) => PermanentChannelFailure
case _ => TemporaryNodeFailure
case Status.Failure(addFailed: AddHtlcFailed) =>
import addFailed.paymentHash
addFailed.origin match {
case Local(id, None) =>
// we sent the payment, but we probably restarted and the reference to the original sender was lost,
// we publish the failure on the event stream and update the status in paymentDb
nodeParams.db.payments.updateOutgoingPayment(id, OutgoingPaymentStatus.FAILED)
context.system.eventStream.publish(PaymentFailed(id, paymentHash, Nil))
case Local(_, Some(sender)) =>
sender ! Status.Failure(addFailed)
case Relayed(originChannelId, originHtlcId, _, _) =>
addFailed.originalCommand match {
case Some(cmd) =>
log.info(s"retrying htlc #$originHtlcId paymentHash=$paymentHash from channelId=$originChannelId")
// NB: cmd.upstream.right is defined since this is a relayed payment
self ! ForwardAdd(cmd.upstream.right.get, cmd.previousFailures :+ addFailed)
case None =>
val failure = translateError(addFailed)
val cmdFail = CMD_FAIL_HTLC(originHtlcId, Right(failure), commit = true)
log.info(s"rejecting htlc #$originHtlcId paymentHash=$paymentHash from channelId=$originChannelId reason=${cmdFail.reason}")
commandBuffer ! CommandBuffer.CommandSend(originChannelId, originHtlcId, cmdFail)
}
val cmdFail = CMD_FAIL_HTLC(originHtlcId, Right(failure), commit = true)
log.info(s"rejecting htlc #$originHtlcId paymentHash=$paymentHash from channelId=$originChannelId reason=${cmdFail.reason}")
commandBuffer ! CommandBuffer.CommandSend(originChannelId, originHtlcId, cmdFail)
}
case ForwardFulfill(fulfill, Local(id, None), add) =>
val feesPaid = MilliSatoshi(0)
context.system.eventStream.publish(PaymentSent(id, MilliSatoshi(add.amountMsat), feesPaid, add.paymentHash, fulfill.paymentPreimage, fulfill.channelId))
// we sent the payment, but we probably restarted and the reference to the original sender was lost,
// we publish the failure on the event stream and update the status in paymentDb
nodeParams.db.payments.updateOutgoingPayment(id, OutgoingPaymentStatus.SUCCEEDED, Some(fulfill.paymentPreimage))
context.system.eventStream.publish(PaymentSucceeded(id, add.amountMsat, add.paymentHash, fulfill.paymentPreimage, Nil)) //
case ForwardFulfill(fulfill, to, add) =>
to match {
case Local(id, None) =>
val feesPaid = MilliSatoshi(0)
context.system.eventStream.publish(PaymentSent(id, MilliSatoshi(add.amountMsat), feesPaid, add.paymentHash, fulfill.paymentPreimage, fulfill.channelId))
// we sent the payment, but we probably restarted and the reference to the original sender was lost,
// we publish the failure on the event stream and update the status in paymentDb
nodeParams.db.payments.updateOutgoingPayment(id, OutgoingPaymentStatus.SUCCEEDED, Some(fulfill.paymentPreimage))
context.system.eventStream.publish(PaymentSucceeded(id, add.amountMsat, add.paymentHash, fulfill.paymentPreimage, Nil)) //
case Local(_, Some(sender)) =>
sender ! fulfill
case Relayed(originChannelId, originHtlcId, amountMsatIn, amountMsatOut) =>
val cmd = CMD_FULFILL_HTLC(originHtlcId, fulfill.paymentPreimage, commit = true)
commandBuffer ! CommandBuffer.CommandSend(originChannelId, originHtlcId, cmd)
context.system.eventStream.publish(PaymentRelayed(MilliSatoshi(amountMsatIn), MilliSatoshi(amountMsatOut), add.paymentHash, fromChannelId = originChannelId, toChannelId = fulfill.channelId))
}
case ForwardFulfill(fulfill, Local(_, Some(sender)), _) =>
sender ! fulfill
case ForwardFail(fail, to, add) =>
to match {
case Local(id, None) =>
// we sent the payment, but we probably restarted and the reference to the original sender was lost
// we publish the failure on the event stream and update the status in paymentDb
nodeParams.db.payments.updateOutgoingPayment(id, OutgoingPaymentStatus.FAILED)
context.system.eventStream.publish(PaymentFailed(id, add.paymentHash, Nil))
case Local(_, Some(sender)) =>
sender ! fail
case Relayed(originChannelId, originHtlcId, _, _) =>
val cmd = CMD_FAIL_HTLC(originHtlcId, Left(fail.reason), commit = true)
commandBuffer ! CommandBuffer.CommandSend(originChannelId, originHtlcId, cmd)
}
case ForwardFulfill(fulfill, Relayed(originChannelId, originHtlcId, amountMsatIn, amountMsatOut), add) =>
val cmd = CMD_FULFILL_HTLC(originHtlcId, fulfill.paymentPreimage, commit = true)
commandBuffer ! CommandBuffer.CommandSend(originChannelId, originHtlcId, cmd)
context.system.eventStream.publish(PaymentRelayed(MilliSatoshi(amountMsatIn), MilliSatoshi(amountMsatOut), add.paymentHash, fromChannelId = originChannelId, toChannelId = fulfill.channelId))
case ForwardFail(_, Local(id, None), add) =>
// we sent the payment, but we probably restarted and the reference to the original sender was lost
// we publish the failure on the event stream and update the status in paymentDb
nodeParams.db.payments.updateOutgoingPayment(id, OutgoingPaymentStatus.FAILED)
context.system.eventStream.publish(PaymentFailed(id, add.paymentHash, Nil))
case ForwardFail(fail, Local(_, Some(sender)), _) =>
sender ! fail
case ForwardFail(fail, Relayed(originChannelId, originHtlcId, _, _), _) =>
val cmd = CMD_FAIL_HTLC(originHtlcId, Left(fail.reason), commit = true)
commandBuffer ! CommandBuffer.CommandSend(originChannelId, originHtlcId, cmd)
case ForwardFailMalformed(_, Local(id, None), add) =>
// we sent the payment, but we probably restarted and the reference to the original sender was lost
// we publish the failure on the event stream and update the status in paymentDb
nodeParams.db.payments.updateOutgoingPayment(id, OutgoingPaymentStatus.FAILED)
context.system.eventStream.publish(PaymentFailed(id, add.paymentHash, Nil))
case ForwardFailMalformed(fail, Local(_, Some(sender)), _) =>
sender ! fail
case ForwardFailMalformed(fail, Relayed(originChannelId, originHtlcId, _, _), _) =>
val cmd = CMD_FAIL_MALFORMED_HTLC(originHtlcId, fail.onionHash, fail.failureCode, commit = true)
commandBuffer ! CommandBuffer.CommandSend(originChannelId, originHtlcId, cmd)
case ForwardFailMalformed(fail, to, add) =>
to match {
case Local(id, None) =>
// we sent the payment, but we probably restarted and the reference to the original sender was lost
// we publish the failure on the event stream and update the status in paymentDb
nodeParams.db.payments.updateOutgoingPayment(id, OutgoingPaymentStatus.FAILED)
context.system.eventStream.publish(PaymentFailed(id, add.paymentHash, Nil))
case Local(_, Some(sender)) =>
sender ! fail
case Relayed(originChannelId, originHtlcId, _, _) =>
val cmd = CMD_FAIL_MALFORMED_HTLC(originHtlcId, fail.onionHash, fail.failureCode, commit = true)
commandBuffer ! CommandBuffer.CommandSend(originChannelId, originHtlcId, cmd)
}
case ack: CommandBuffer.CommandAck => commandBuffer forward ack
@ -209,8 +203,8 @@ object Relayer {
sealed trait NextPayload
case class FinalPayload(add: UpdateAddHtlc, payload: PerHopPayload) extends NextPayload
case class RelayPayload(add: UpdateAddHtlc, payload: PerHopPayload, nextPacket: Sphinx.Packet) extends NextPayload {
val relayFeeMsat = add.amountMsat - payload.amtToForward
val expiryDelta = add.cltvExpiry - payload.outgoingCltvValue
val relayFeeMsat: Long = add.amountMsat - payload.amtToForward
val expiryDelta: Long = add.cltvExpiry - payload.outgoingCltvValue
}
// @formatter:on
@ -265,7 +259,80 @@ object Relayer {
* - a CMD_FAIL_HTLC to be sent back upstream
* - a CMD_ADD_HTLC to propagate downstream
*/
def handleRelay(relayPayload: RelayPayload, channelUpdate_opt: Option[ChannelUpdate])(implicit log: LoggingAdapter): Either[CMD_FAIL_HTLC, CMD_ADD_HTLC] = {
def handleRelay(relayPayload: RelayPayload, channelUpdates: Map[ShortChannelId, OutgoingChannel], node2channels: mutable.Map[PublicKey, mutable.Set[ShortChannelId]] with mutable.MultiMap[PublicKey, ShortChannelId], previousFailures: Seq[AddHtlcFailed])(implicit log: LoggingAdapter): Either[CMD_FAIL_HTLC, (ShortChannelId, CMD_ADD_HTLC)] = {
import relayPayload._
log.info(s"relaying htlc #${add.id} paymentHash={} from channelId={} to requestedShortChannelId={} previousAttempts={}", add.paymentHash, add.channelId, relayPayload.payload.shortChannelId, previousFailures.size)
val alreadyTried = previousFailures.flatMap(_.channelUpdate).map(_.shortChannelId)
selectPreferredChannel(relayPayload, channelUpdates, node2channels, alreadyTried)
.flatMap(selectedShortChannelId => channelUpdates.get(selectedShortChannelId).map(_.channelUpdate)) match {
case None if previousFailures.nonEmpty =>
// no more channels to try
val error = previousFailures
// we return the error for the initially requested channel if it exists
.find(_.channelUpdate.map(_.shortChannelId).contains(relayPayload.payload.shortChannelId))
// otherwise we return the error for the first channel tried
.getOrElse(previousFailures.head)
Left(CMD_FAIL_HTLC(add.id, Right(translateError(error)), commit = true))
case channelUpdate_opt =>
relayOrFail(relayPayload, channelUpdate_opt, previousFailures)
}
}
/**
* Select a channel to the same node to relay the payment to, that has the lowest balance and is compatible in
* terms of fees, expiry_delta, etc.
*
* If no suitable channel is found we default to the originally requested channel.
*/
def selectPreferredChannel(relayPayload: RelayPayload, channelUpdates: Map[ShortChannelId, OutgoingChannel], node2channels: mutable.Map[PublicKey, mutable.Set[ShortChannelId]] with mutable.MultiMap[PublicKey, ShortChannelId], alreadyTried: Seq[ShortChannelId])(implicit log: LoggingAdapter): Option[ShortChannelId] = {
import relayPayload.add
val requestedShortChannelId = relayPayload.payload.shortChannelId
log.debug(s"selecting next channel for htlc #${add.id} paymentHash={} from channelId={} to requestedShortChannelId={} previousAttempts={}", add.paymentHash, add.channelId, requestedShortChannelId, alreadyTried.size)
// first we find out what is the next node
channelUpdates.get(requestedShortChannelId) match {
case Some(OutgoingChannel(nextNodeId, _, _)) =>
log.debug(s"next hop for htlc #{} paymentHash={} is nodeId={}", add.id, add.paymentHash, nextNodeId)
// then we retrieve all known channels to this node
val allChannels = node2channels.getOrElse(nextNodeId, Set.empty[ShortChannelId])
// we then filter out channels that we have already tried
val candidateChannels = allChannels -- alreadyTried
// and we filter keep the ones that are compatible with this payment (mainly fees, expiry delta)
candidateChannels
.map { shortChannelId =>
val channelInfo_opt = channelUpdates.get(shortChannelId)
val channelUpdate_opt = channelInfo_opt.map(_.channelUpdate)
val relayResult = relayOrFail(relayPayload, channelUpdate_opt)
log.debug(s"candidate channel for htlc #${add.id} paymentHash=${add.paymentHash}: shortChannelId={} balanceMsat={} channelUpdate={} relayResult={}", shortChannelId, channelInfo_opt.map(_.availableBalanceMsat).getOrElse(""), channelUpdate_opt.getOrElse(""), relayResult)
(shortChannelId, channelInfo_opt, relayResult)
}
.collect { case (shortChannelId, Some(channelInfo), Right(_)) => (shortChannelId, channelInfo.availableBalanceMsat) }
.filter(_._2 > relayPayload.payload.amtToForward) // we only keep channels that have enough balance to handle this payment
.toList // needed for ordering
.sortBy(_._2) // we want to use the channel with the lowest available balance that can process the payment
.headOption match {
case Some((preferredShortChannelId, availableBalanceMsat)) if preferredShortChannelId != requestedShortChannelId =>
log.info("replacing requestedShortChannelId={} by preferredShortChannelId={} with availableBalanceMsat={}", requestedShortChannelId, preferredShortChannelId, availableBalanceMsat)
Some(preferredShortChannelId)
case Some(_) =>
// the requested short_channel_id is already our preferred channel
Some(requestedShortChannelId)
case None if !alreadyTried.contains(requestedShortChannelId) =>
// no channel seem to work for this payment, we keep the requested channel id
Some(requestedShortChannelId)
case None =>
// no channel seem to work for this payment and we have already tried the requested channel id: we give up
None
}
case _ => Some(requestedShortChannelId) // we don't have a channel_update for this short_channel_id
}
}
/**
* This helper method will tell us if it is not even worth attempting to relay the payment to our local outgoing
* channel, because some parameters don't match with our settings for that channel. In that case we directly fail the
* htlc.
*/
def relayOrFail(relayPayload: RelayPayload, channelUpdate_opt: Option[ChannelUpdate], previousFailures: Seq[AddHtlcFailed] = Seq.empty)(implicit log: LoggingAdapter): Either[CMD_FAIL_HTLC, (ShortChannelId, CMD_ADD_HTLC)] = {
import relayPayload._
channelUpdate_opt match {
case None =>
@ -279,58 +346,27 @@ object Relayer {
case Some(channelUpdate) if relayPayload.relayFeeMsat < nodeFee(channelUpdate.feeBaseMsat, channelUpdate.feeProportionalMillionths, payload.amtToForward) =>
Left(CMD_FAIL_HTLC(add.id, Right(FeeInsufficient(add.amountMsat, channelUpdate)), commit = true))
case Some(channelUpdate) =>
val isRedirected = (channelUpdate.shortChannelId != payload.shortChannelId) // we may decide to use another channel (to the same node) from the one requested
Right(CMD_ADD_HTLC(payload.amtToForward, add.paymentHash, payload.outgoingCltvValue, nextPacket.serialize, upstream = Right(add), commit = true, redirected = isRedirected))
Right((channelUpdate.shortChannelId, CMD_ADD_HTLC(payload.amtToForward, add.paymentHash, payload.outgoingCltvValue, nextPacket.serialize, upstream = Right(add), commit = true, previousFailures = previousFailures)))
}
}
/**
* Select a channel to the same node to the relay the payment to, that has the lowest balance and is compatible in
* terms of fees, expiry_delta, etc.
*
* If no suitable channel is found we default to the originally requested channel.
*
* @param relayPayload
* @param channelUpdates
* @param node2channels
* @param log
* @return
* This helper method translates relaying errors (returned by the downstream outgoing channel) to BOLT 4 standard
* errors that we should return upstream.
*/
def selectPreferredChannel(relayPayload: RelayPayload, channelUpdates: Map[ShortChannelId, OutgoingChannel], node2channels: mutable.Map[PublicKey, mutable.Set[ShortChannelId]] with mutable.MultiMap[PublicKey, ShortChannelId])(implicit log: LoggingAdapter): ShortChannelId = {
import relayPayload.add
val requestedShortChannelId = relayPayload.payload.shortChannelId
log.debug(s"selecting next channel for htlc #{} paymentHash={} from channelId={} to requestedShortChannelId={}", add.id, add.paymentHash, add.channelId, requestedShortChannelId)
// first we find out what is the next node
channelUpdates.get(requestedShortChannelId) match {
case Some(OutgoingChannel(nextNodeId, _, _)) =>
log.debug(s"next hop for htlc #{} paymentHash={} is nodeId={}", add.id, add.paymentHash, nextNodeId)
// then we retrieve all known channels to this node
val candidateChannels = node2channels.get(nextNodeId).getOrElse(Set.empty[ShortChannelId])
// and we filter keep the ones that are compatible with this payment (mainly fees, expiry delta)
candidateChannels
.map { shortChannelId =>
val channelInfo_opt = channelUpdates.get(shortChannelId)
val channelUpdate_opt = channelInfo_opt.map(_.channelUpdate)
val relayResult = handleRelay(relayPayload, channelUpdate_opt)
log.debug(s"candidate channel for htlc #${add.id} paymentHash=${add.paymentHash}: shortChannelId={} balanceMsat={} channelUpdate={} relayResult={}", shortChannelId, channelInfo_opt.map(_.availableBalanceMsat).getOrElse(""), channelUpdate_opt.getOrElse(""), relayResult)
(shortChannelId, channelInfo_opt, relayResult)
}
.collect { case (shortChannelId, Some(channelInfo), Right(_)) => (shortChannelId, channelInfo.availableBalanceMsat) }
.filter(_._2 > relayPayload.payload.amtToForward) // we only keep channels that have enough balance to handle this payment
.toList // needed for ordering
.sortBy(_._2) // we want to use the channel with the lowest available balance that can process the payment
.headOption match {
case Some((preferredShortChannelId, availableBalanceMsat)) if preferredShortChannelId != requestedShortChannelId =>
log.info("replacing requestedShortChannelId={} by preferredShortChannelId={} with availableBalanceMsat={}", requestedShortChannelId, preferredShortChannelId, availableBalanceMsat)
preferredShortChannelId
case Some(_) =>
// the requested short_channel_id is already our preferred channel
requestedShortChannelId
case None =>
// no channel seem to work for this payment, we keep the requested channel id
requestedShortChannelId
}
case _ => requestedShortChannelId // we don't have a channel_update for this short_channel_id
private def translateError(failure: AddHtlcFailed): FailureMessage = {
val error = failure.t
val channelUpdate_opt = failure.channelUpdate
(error, channelUpdate_opt) match {
case (_: ExpiryTooSmall, Some(channelUpdate)) => ExpiryTooSoon(channelUpdate)
case (_: ExpiryTooBig, _) => ExpiryTooFar
case (_: InsufficientFunds, Some(channelUpdate)) => TemporaryChannelFailure(channelUpdate)
case (_: TooManyAcceptedHtlcs, Some(channelUpdate)) => TemporaryChannelFailure(channelUpdate)
case (_: ChannelUnavailable, Some(channelUpdate)) if !Announcements.isEnabled(channelUpdate.channelFlags) => ChannelDisabled(channelUpdate.messageFlags, channelUpdate.channelFlags, channelUpdate)
case (_: ChannelUnavailable, None) => PermanentChannelFailure
case (_: HtlcTimedout, _) => PermanentChannelFailure
case _ => TemporaryNodeFailure
}
}
}

View file

@ -18,7 +18,7 @@ package fr.acinq.eclair.payment
import fr.acinq.bitcoin.Block
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.eclair.channel.{CMD_ADD_HTLC, CMD_FAIL_HTLC}
import fr.acinq.eclair.channel.{AddHtlcFailed, CMD_ADD_HTLC, CMD_FAIL_HTLC}
import fr.acinq.eclair.crypto.Sphinx
import fr.acinq.eclair.payment.Relayer.{OutgoingChannel, RelayPayload}
import fr.acinq.eclair.router.Announcements
@ -37,7 +37,7 @@ class ChannelSelectionSpec extends FunSuite {
def dummyUpdate(shortChannelId: ShortChannelId, cltvExpiryDelta: Int, htlcMinimumMsat: Long, feeBaseMsat: Long, feeProportionalMillionths: Long, htlcMaximumMsat: Long, enable: Boolean = true) =
Announcements.makeChannelUpdate(Block.RegtestGenesisBlock.hash, randomKey, randomKey.publicKey, shortChannelId, cltvExpiryDelta, htlcMinimumMsat, feeBaseMsat, feeProportionalMillionths, htlcMaximumMsat, enable)
test("handle relay") {
test("convert to CMD_FAIL_HTLC/CMD_ADD_HTLC") {
val relayPayload = RelayPayload(
add = UpdateAddHtlc(randomBytes32, 42, 1000000, randomBytes32, 70, ByteVector.empty),
payload = PerHopPayload(ShortChannelId(12345), amtToForward = 998900, outgoingCltvValue = 60),
@ -49,29 +49,27 @@ class ChannelSelectionSpec extends FunSuite {
implicit val log = akka.event.NoLogging
// nominal case
assert(Relayer.handleRelay(relayPayload, Some(channelUpdate)) === Right(CMD_ADD_HTLC(relayPayload.payload.amtToForward, relayPayload.add.paymentHash, relayPayload.payload.outgoingCltvValue, relayPayload.nextPacket.serialize, upstream = Right(relayPayload.add), commit = true, redirected = false)))
// redirected to preferred channel
assert(Relayer.handleRelay(relayPayload, Some(channelUpdate.copy(shortChannelId = ShortChannelId(1111)))) === Right(CMD_ADD_HTLC(relayPayload.payload.amtToForward, relayPayload.add.paymentHash, relayPayload.payload.outgoingCltvValue, relayPayload.nextPacket.serialize, upstream = Right(relayPayload.add), commit = true, redirected = true)))
assert(Relayer.relayOrFail(relayPayload, Some(channelUpdate)) === Right((ShortChannelId(12345), CMD_ADD_HTLC(relayPayload.payload.amtToForward, relayPayload.add.paymentHash, relayPayload.payload.outgoingCltvValue, relayPayload.nextPacket.serialize, upstream = Right(relayPayload.add), commit = true))))
// no channel_update
assert(Relayer.handleRelay(relayPayload, channelUpdate_opt = None) === Left(CMD_FAIL_HTLC(relayPayload.add.id, Right(UnknownNextPeer), commit = true)))
assert(Relayer.relayOrFail(relayPayload, channelUpdate_opt = None) === Left(CMD_FAIL_HTLC(relayPayload.add.id, Right(UnknownNextPeer), commit = true)))
// channel disabled
val channelUpdate_disabled = channelUpdate.copy(channelFlags = Announcements.makeChannelFlags(true, enable = false))
assert(Relayer.handleRelay(relayPayload, Some(channelUpdate_disabled)) === Left(CMD_FAIL_HTLC(relayPayload.add.id, Right(ChannelDisabled(channelUpdate_disabled.messageFlags, channelUpdate_disabled.channelFlags, channelUpdate_disabled)), commit = true)))
assert(Relayer.relayOrFail(relayPayload, Some(channelUpdate_disabled)) === Left(CMD_FAIL_HTLC(relayPayload.add.id, Right(ChannelDisabled(channelUpdate_disabled.messageFlags, channelUpdate_disabled.channelFlags, channelUpdate_disabled)), commit = true)))
// amount too low
val relayPayload_toolow = relayPayload.copy(payload = relayPayload.payload.copy(amtToForward = 99))
assert(Relayer.handleRelay(relayPayload_toolow, Some(channelUpdate)) === Left(CMD_FAIL_HTLC(relayPayload.add.id, Right(AmountBelowMinimum(relayPayload_toolow.payload.amtToForward, channelUpdate)), commit = true)))
assert(Relayer.relayOrFail(relayPayload_toolow, Some(channelUpdate)) === Left(CMD_FAIL_HTLC(relayPayload.add.id, Right(AmountBelowMinimum(relayPayload_toolow.payload.amtToForward, channelUpdate)), commit = true)))
// incorrect cltv expiry
val relayPayload_incorrectcltv = relayPayload.copy(payload = relayPayload.payload.copy(outgoingCltvValue = 42))
assert(Relayer.handleRelay(relayPayload_incorrectcltv, Some(channelUpdate)) === Left(CMD_FAIL_HTLC(relayPayload.add.id, Right(IncorrectCltvExpiry(relayPayload_incorrectcltv.payload.outgoingCltvValue, channelUpdate)), commit = true)))
assert(Relayer.relayOrFail(relayPayload_incorrectcltv, Some(channelUpdate)) === Left(CMD_FAIL_HTLC(relayPayload.add.id, Right(IncorrectCltvExpiry(relayPayload_incorrectcltv.payload.outgoingCltvValue, channelUpdate)), commit = true)))
// insufficient fee
val relayPayload_insufficientfee = relayPayload.copy(payload = relayPayload.payload.copy(amtToForward = 998910))
assert(Relayer.handleRelay(relayPayload_insufficientfee, Some(channelUpdate)) === Left(CMD_FAIL_HTLC(relayPayload.add.id, Right(FeeInsufficient(relayPayload_insufficientfee.add.amountMsat, channelUpdate)), commit = true)))
assert(Relayer.relayOrFail(relayPayload_insufficientfee, Some(channelUpdate)) === Left(CMD_FAIL_HTLC(relayPayload.add.id, Right(FeeInsufficient(relayPayload_insufficientfee.add.amountMsat, channelUpdate)), commit = true)))
// note that a generous fee is ok!
val relayPayload_highfee = relayPayload.copy(payload = relayPayload.payload.copy(amtToForward = 900000))
assert(Relayer.handleRelay(relayPayload_highfee, Some(channelUpdate)) === Right(CMD_ADD_HTLC(relayPayload_highfee.payload.amtToForward, relayPayload_highfee.add.paymentHash, relayPayload_highfee.payload.outgoingCltvValue, relayPayload_highfee.nextPacket.serialize, upstream = Right(relayPayload.add), commit = true, redirected = false)))
assert(Relayer.relayOrFail(relayPayload_highfee, Some(channelUpdate)) === Right((ShortChannelId(12345), CMD_ADD_HTLC(relayPayload_highfee.payload.amtToForward, relayPayload_highfee.add.paymentHash, relayPayload_highfee.payload.outgoingCltvValue, relayPayload_highfee.nextPacket.serialize, upstream = Right(relayPayload.add), commit = true))))
}
test("relay channel selection") {
test("channel selection") {
val relayPayload = RelayPayload(
add = UpdateAddHtlc(randomBytes32, 42, 1000000, randomBytes32, 70, ByteVector.empty),
@ -99,15 +97,21 @@ class ChannelSelectionSpec extends FunSuite {
import com.softwaremill.quicklens._
// select the channel to the same node, with the lowest balance but still high enough to handle the payment
assert(Relayer.selectPreferredChannel(relayPayload, channelUpdates, node2channels) === ShortChannelId(22222))
assert(Relayer.selectPreferredChannel(relayPayload, channelUpdates, node2channels, Seq.empty) === Some(ShortChannelId(22222)))
// select 2nd-to-best channel
assert(Relayer.selectPreferredChannel(relayPayload, channelUpdates, node2channels, Seq(ShortChannelId(22222))) === Some(ShortChannelId(12345)))
// select 3rd-to-best channel
assert(Relayer.selectPreferredChannel(relayPayload, channelUpdates, node2channels, Seq(ShortChannelId(22222), ShortChannelId(12345))) === Some(ShortChannelId(11111)))
// all the suitable channels have been tried
assert(Relayer.selectPreferredChannel(relayPayload, channelUpdates, node2channels, Seq(ShortChannelId(22222), ShortChannelId(12345), ShortChannelId(11111))) === None)
// higher amount payment (have to increased incoming htlc amount for fees to be sufficient)
assert(Relayer.selectPreferredChannel(relayPayload.modify(_.add.amountMsat).setTo(60000000).modify(_.payload.amtToForward).setTo(50000000), channelUpdates, node2channels) === ShortChannelId(11111))
assert(Relayer.selectPreferredChannel(relayPayload.modify(_.add.amountMsat).setTo(60000000).modify(_.payload.amtToForward).setTo(50000000), channelUpdates, node2channels, Seq.empty) === Some(ShortChannelId(11111)))
// lower amount payment
assert(Relayer.selectPreferredChannel(relayPayload.modify(_.payload.amtToForward).setTo(1000), channelUpdates, node2channels) === ShortChannelId(33333))
// payment too high, no suitable channel, we keep the requested one
assert(Relayer.selectPreferredChannel(relayPayload.modify(_.payload.amtToForward).setTo(1000000000), channelUpdates, node2channels) === ShortChannelId(12345))
assert(Relayer.selectPreferredChannel(relayPayload.modify(_.payload.amtToForward).setTo(1000), channelUpdates, node2channels, Seq.empty) === Some(ShortChannelId(33333)))
// payment too high, no suitable channel found
assert(Relayer.selectPreferredChannel(relayPayload.modify(_.payload.amtToForward).setTo(1000000000), channelUpdates, node2channels, Seq.empty) === Some(ShortChannelId(12345)))
// invalid cltv expiry, no suitable channel, we keep the requested one
assert(Relayer.selectPreferredChannel(relayPayload.modify(_.payload.outgoingCltvValue).setTo(40), channelUpdates, node2channels) === ShortChannelId(12345))
assert(Relayer.selectPreferredChannel(relayPayload.modify(_.payload.outgoingCltvValue).setTo(40), channelUpdates, node2channels, Seq.empty) === Some(ShortChannelId(12345)))
}

View file

@ -27,7 +27,7 @@ import fr.acinq.eclair.payment.PaymentLifecycle.buildCommand
import fr.acinq.eclair.router.Announcements
import fr.acinq.eclair.transactions.CommitmentSpec
import fr.acinq.eclair.wire._
import fr.acinq.eclair.{TestConstants, TestkitBaseClass, randomBytes32, randomKey}
import fr.acinq.eclair.{ShortChannelId, TestConstants, TestkitBaseClass, UInt64, randomBytes32, randomKey}
import org.scalatest.Outcome
import scodec.bits.ByteVector
@ -58,10 +58,9 @@ class RelayerSpec extends TestkitBaseClass {
val channelId_ab = randomBytes32
val channelId_bc = randomBytes32
def makeCommitments(channelId: ByteVector32) = new Commitments(null, null, 0.toByte, null,
RemoteCommit(42, CommitmentSpec(Set.empty, 20000, 5000000, 100000000), ByteVector32.Zeroes, randomKey.toPoint),
def makeCommitments(channelId: ByteVector32, availableBalanceMsat: Long = 50000000L) = new Commitments(null, null, 0.toByte, null, null,
null, null, 0, 0, Map.empty, null, null, null, channelId) {
override def availableBalanceForSendMsat: Long = remoteCommit.spec.toRemoteMsat // approximation
override def availableBalanceForSendMsat: Long = availableBalanceMsat
}
test("relay an htlc-add") { f =>
@ -84,6 +83,51 @@ class RelayerSpec extends TestkitBaseClass {
paymentHandler.expectNoMsg(100 millis)
}
test("relay an htlc-add with retries") { f =>
import f._
val sender = TestProbe()
// we use this to build a valid onion
val (cmd, _) = buildCommand(UUID.randomUUID(), finalAmountMsat, finalExpiry, paymentHash, hops)
// and then manually build an htlc
val add_ab = UpdateAddHtlc(channelId = channelId_ab, id = 123456, cmd.amountMsat, cmd.paymentHash, cmd.cltvExpiry, cmd.onion)
// we tell the relayer about channel B-C
relayer ! LocalChannelUpdate(null, channelId_bc, channelUpdate_bc.shortChannelId, c, None, channelUpdate_bc, makeCommitments(channelId_bc))
// this is another channel B-C, with less balance (it will be preferred)
val (channelId_bc_1, channelUpdate_bc_1) = (randomBytes32, channelUpdate_bc.copy(shortChannelId = ShortChannelId("500000x1x1")))
relayer ! LocalChannelUpdate(null, channelId_bc_1, channelUpdate_bc_1.shortChannelId, c, None, channelUpdate_bc_1, makeCommitments(channelId_bc_1, availableBalanceMsat = 49000000L))
sender.send(relayer, ForwardAdd(add_ab))
// first try
val fwd1 = register.expectMsgType[Register.ForwardShortId[CMD_ADD_HTLC]]
assert(fwd1.shortChannelId === channelUpdate_bc_1.shortChannelId)
assert(fwd1.message.upstream === Right(add_ab))
// channel returns an error
val origin = Relayed(channelId_ab, originHtlcId = 42, amountMsatIn = 1100000, amountMsatOut = 1000000)
sender.send(relayer, Status.Failure(AddHtlcFailed(channelId_bc_1, paymentHash, HtlcValueTooHighInFlight(channelId_bc_1, UInt64(1000000000L), UInt64(1516977616L)), origin, Some(channelUpdate_bc_1), originalCommand = Some(fwd1.message))))
// second try
val fwd2 = register.expectMsgType[Register.ForwardShortId[CMD_ADD_HTLC]]
assert(fwd2.shortChannelId === channelUpdate_bc.shortChannelId)
assert(fwd2.message.upstream === Right(add_ab))
// failure again
sender.send(relayer, Status.Failure(AddHtlcFailed(channelId_bc, paymentHash, HtlcValueTooHighInFlight(channelId_bc, UInt64(1000000000L), UInt64(1516977616L)), origin, Some(channelUpdate_bc), originalCommand = Some(fwd2.message))))
// the relayer should give up
val fwdFail = register.expectMsgType[Register.Forward[CMD_FAIL_HTLC]]
assert(fwdFail.channelId === add_ab.channelId)
assert(fwdFail.message.id === add_ab.id)
assert(fwdFail.message.reason === Right(TemporaryNodeFailure))
sender.expectNoMsg(100 millis)
paymentHandler.expectNoMsg(100 millis)
}
test("fail to relay an htlc-add when we have no channel_update for the next channel") { f =>
import f._
val sender = TestProbe()