mirror of
https://github.com/ACINQ/eclair.git
synced 2025-02-24 06:47:46 +01:00
Use long id instead of short id in ChannelRelay
(#2221)
This is a preparation for https://github.com/lightning/bolts/pull/910. The relaying logic is now based on the `channelId` instead of `shortChannelId`. We used to consider `shortChannelId` as a unique identifier for the channel, but it's not true anymore: there will be multiple aliases per channel.
This commit is contained in:
parent
1e8791c56e
commit
6fa02a0ec1
5 changed files with 138 additions and 139 deletions
|
@ -28,7 +28,7 @@ import fr.acinq.eclair.payment.Monitoring.{Metrics, Tags}
|
|||
import fr.acinq.eclair.payment.relay.Relayer.OutgoingChannel
|
||||
import fr.acinq.eclair.payment.{ChannelPaymentRelayed, IncomingPaymentPacket}
|
||||
import fr.acinq.eclair.wire.protocol._
|
||||
import fr.acinq.eclair.{Logs, NodeParams, ShortChannelId, TimestampSecond, channel, nodeFee}
|
||||
import fr.acinq.eclair.{Logs, NodeParams, TimestampSecond, channel, nodeFee}
|
||||
|
||||
import java.util.UUID
|
||||
|
||||
|
@ -37,17 +37,17 @@ object ChannelRelay {
|
|||
// @formatter:off
|
||||
sealed trait Command
|
||||
private case object DoRelay extends Command
|
||||
private case class WrappedForwardShortIdFailure(failure: Register.ForwardShortIdFailure[CMD_ADD_HTLC]) extends Command
|
||||
private case class WrappedForwardFailure(failure: Register.ForwardFailure[CMD_ADD_HTLC]) extends Command
|
||||
private case class WrappedAddResponse(res: CommandResponse[CMD_ADD_HTLC]) extends Command
|
||||
// @formatter:on
|
||||
|
||||
// @formatter:off
|
||||
sealed trait RelayResult
|
||||
case class RelayFailure(cmdFail: CMD_FAIL_HTLC) extends RelayResult
|
||||
case class RelaySuccess(shortChannelId: ShortChannelId, cmdAdd: CMD_ADD_HTLC) extends RelayResult
|
||||
case class RelaySuccess(selectedChannelId: ByteVector32, cmdAdd: CMD_ADD_HTLC) extends RelayResult
|
||||
// @formatter:on
|
||||
|
||||
def apply(nodeParams: NodeParams, register: ActorRef, channels: Map[ShortChannelId, Relayer.OutgoingChannel], relayId: UUID, r: IncomingPaymentPacket.ChannelRelayPacket): Behavior[Command] =
|
||||
def apply(nodeParams: NodeParams, register: ActorRef, channels: Map[ByteVector32, Relayer.OutgoingChannel], relayId: UUID, r: IncomingPaymentPacket.ChannelRelayPacket): Behavior[Command] =
|
||||
Behaviors.setup { context =>
|
||||
Behaviors.withMdc(Logs.mdc(
|
||||
category_opt = Some(Logs.LogCategory.PAYMENT),
|
||||
|
@ -92,16 +92,16 @@ object ChannelRelay {
|
|||
*/
|
||||
class ChannelRelay private(nodeParams: NodeParams,
|
||||
register: ActorRef,
|
||||
channels: Map[ShortChannelId, Relayer.OutgoingChannel],
|
||||
channels: Map[ByteVector32, Relayer.OutgoingChannel],
|
||||
r: IncomingPaymentPacket.ChannelRelayPacket,
|
||||
context: ActorContext[ChannelRelay.Command]) {
|
||||
|
||||
import ChannelRelay._
|
||||
|
||||
private val forwardShortIdAdapter = context.messageAdapter[Register.ForwardShortIdFailure[CMD_ADD_HTLC]](WrappedForwardShortIdFailure)
|
||||
private val forwardFailureAdapter = context.messageAdapter[Register.ForwardFailure[CMD_ADD_HTLC]](WrappedForwardFailure)
|
||||
private val addResponseAdapter = context.messageAdapter[CommandResponse[CMD_ADD_HTLC]](WrappedAddResponse)
|
||||
|
||||
private case class PreviouslyTried(shortChannelId: ShortChannelId, failure: RES_ADD_FAILED[ChannelException])
|
||||
private case class PreviouslyTried(channelId: ByteVector32, failure: RES_ADD_FAILED[ChannelException])
|
||||
|
||||
def relay(previousFailures: Seq[PreviouslyTried]): Behavior[Command] = {
|
||||
Behaviors.receiveMessagePartial {
|
||||
|
@ -115,18 +115,18 @@ class ChannelRelay private(nodeParams: NodeParams,
|
|||
Metrics.recordPaymentRelayFailed(Tags.FailureType(cmdFail), Tags.RelayType.Channel)
|
||||
context.log.info(s"rejecting htlc reason=${cmdFail.reason}")
|
||||
safeSendAndStop(r.add.channelId, cmdFail)
|
||||
case RelaySuccess(selectedShortChannelId, cmdAdd) =>
|
||||
context.log.info(s"forwarding htlc to shortChannelId=$selectedShortChannelId")
|
||||
register ! Register.ForwardShortId(forwardShortIdAdapter.toClassic, selectedShortChannelId, cmdAdd)
|
||||
waitForAddResponse(selectedShortChannelId, previousFailures)
|
||||
case RelaySuccess(selectedChannelId, cmdAdd) =>
|
||||
context.log.info(s"forwarding htlc to channelId=$selectedChannelId")
|
||||
register ! Register.Forward(forwardFailureAdapter.toClassic, selectedChannelId, cmdAdd)
|
||||
waitForAddResponse(selectedChannelId, previousFailures)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def waitForAddResponse(selectedShortChannelId: ShortChannelId, previousFailures: Seq[PreviouslyTried]): Behavior[Command] =
|
||||
def waitForAddResponse(selectedChannelId: ByteVector32, previousFailures: Seq[PreviouslyTried]): Behavior[Command] =
|
||||
Behaviors.receiveMessagePartial {
|
||||
case WrappedForwardShortIdFailure(Register.ForwardShortIdFailure(Register.ForwardShortId(_, shortChannelId, CMD_ADD_HTLC(_, _, _, _, _, o: Origin.ChannelRelayedHot, _)))) =>
|
||||
context.log.warn(s"couldn't resolve downstream channel $shortChannelId, failing htlc #${o.add.id}")
|
||||
case WrappedForwardFailure(Register.ForwardFailure(Register.Forward(_, channelId, CMD_ADD_HTLC(_, _, _, _, _, o: Origin.ChannelRelayedHot, _)))) =>
|
||||
context.log.warn(s"couldn't resolve downstream channel $channelId, failing htlc #${o.add.id}")
|
||||
val cmdFail = CMD_FAIL_HTLC(o.add.id, Right(UnknownNextPeer), commit = true)
|
||||
Metrics.recordPaymentRelayFailed(Tags.FailureType(cmdFail), Tags.RelayType.Channel)
|
||||
safeSendAndStop(o.add.channelId, cmdFail)
|
||||
|
@ -134,7 +134,7 @@ class ChannelRelay private(nodeParams: NodeParams,
|
|||
case WrappedAddResponse(addFailed@RES_ADD_FAILED(CMD_ADD_HTLC(_, _, _, _, _, _: Origin.ChannelRelayedHot, _), _, _)) =>
|
||||
context.log.info("attempt failed with reason={}", addFailed.t.getClass.getSimpleName)
|
||||
context.self ! DoRelay
|
||||
relay(previousFailures :+ PreviouslyTried(selectedShortChannelId, addFailed))
|
||||
relay(previousFailures :+ PreviouslyTried(selectedChannelId, addFailed))
|
||||
|
||||
case WrappedAddResponse(_: RES_SUCCESS[_]) =>
|
||||
context.log.debug("sent htlc to the downstream channel")
|
||||
|
@ -170,14 +170,13 @@ class ChannelRelay private(nodeParams: NodeParams,
|
|||
* - a CMD_ADD_HTLC to propagate downstream
|
||||
*/
|
||||
def handleRelay(previousFailures: Seq[PreviouslyTried]): RelayResult = {
|
||||
val alreadyTried = previousFailures.map(_.shortChannelId)
|
||||
selectPreferredChannel(alreadyTried)
|
||||
.flatMap(selectedShortChannelId => channels.get(selectedShortChannelId)) match {
|
||||
val alreadyTried = previousFailures.map(_.channelId)
|
||||
selectPreferredChannel(alreadyTried) match {
|
||||
case None if previousFailures.nonEmpty =>
|
||||
// no more channels to try
|
||||
val error = previousFailures
|
||||
// we return the error for the initially requested channel if it exists
|
||||
.find(_.shortChannelId == r.payload.outgoingChannelId)
|
||||
.find(failure => requestedChannelId_opt.contains(failure.channelId))
|
||||
// otherwise we return the error for the first channel tried
|
||||
.getOrElse(previousFailures.head)
|
||||
.failure
|
||||
|
@ -190,58 +189,64 @@ class ChannelRelay private(nodeParams: NodeParams,
|
|||
/** all the channels point to the same next node, we take the first one */
|
||||
private val nextNodeId_opt = channels.headOption.map(_._2.nextNodeId)
|
||||
|
||||
/** channel id explicitly requested in the onion payload */
|
||||
private val requestedChannelId_opt = channels.find(_._2.channelUpdate.shortChannelId == r.payload.outgoingChannelId).map(_._1)
|
||||
|
||||
/**
|
||||
* Select a channel to the same node to relay the payment to, that has the lowest capacity and balance and is
|
||||
* compatible in terms of fees, expiry_delta, etc.
|
||||
*
|
||||
* If no suitable channel is found we default to the originally requested channel.
|
||||
*/
|
||||
def selectPreferredChannel(alreadyTried: Seq[ShortChannelId]): Option[ShortChannelId] = {
|
||||
def selectPreferredChannel(alreadyTried: Seq[ByteVector32]): Option[OutgoingChannel] = {
|
||||
val requestedShortChannelId = r.payload.outgoingChannelId
|
||||
context.log.debug("selecting next channel with requestedShortChannelId={}", requestedShortChannelId)
|
||||
nextNodeId_opt match {
|
||||
case Some(_) =>
|
||||
// we then filter out channels that we have already tried
|
||||
val candidateChannels: Map[ShortChannelId, OutgoingChannel] = channels -- alreadyTried
|
||||
// and we filter again to keep the ones that are compatible with this payment (mainly fees, expiry delta)
|
||||
candidateChannels
|
||||
.map { case (shortChannelId, channelInfo) =>
|
||||
val relayResult = relayOrFail(Some(channelInfo))
|
||||
context.log.debug(s"candidate channel: shortChannelId=$shortChannelId availableForSend={} capacity={} channelUpdate={} result={}",
|
||||
channelInfo.commitments.availableBalanceForSend,
|
||||
channelInfo.commitments.capacity,
|
||||
channelInfo.channelUpdate,
|
||||
relayResult match {
|
||||
case _: RelaySuccess => "success"
|
||||
case RelayFailure(CMD_FAIL_HTLC(_, Right(failureReason), _, _)) => failureReason
|
||||
case other => other
|
||||
})
|
||||
(shortChannelId, channelInfo, relayResult)
|
||||
}
|
||||
.collect {
|
||||
// we only keep channels that have enough balance to handle this payment
|
||||
case (shortChannelId, channelInfo, _: RelaySuccess) if channelInfo.commitments.availableBalanceForSend > r.payload.amountToForward => (shortChannelId, channelInfo.commitments)
|
||||
}
|
||||
.toList // needed for ordering
|
||||
// we want to use the channel with:
|
||||
// - the lowest available capacity to ensure we keep high-capacity channels for big payments
|
||||
// - the lowest available balance to increase our incoming liquidity
|
||||
.sortBy { case (_, commitments) => (commitments.capacity, commitments.availableBalanceForSend) }
|
||||
.headOption match {
|
||||
case Some((preferredShortChannelId, commitments)) if preferredShortChannelId != requestedShortChannelId =>
|
||||
context.log.info("replacing requestedShortChannelId={} by preferredShortChannelId={} with availableBalanceMsat={}", requestedShortChannelId, preferredShortChannelId, commitments.availableBalanceForSend)
|
||||
Some(preferredShortChannelId)
|
||||
case Some(_) =>
|
||||
context.log.debug("requested short channel id is our preferred channel")
|
||||
Some(requestedShortChannelId)
|
||||
case None if !alreadyTried.contains(requestedShortChannelId) =>
|
||||
context.log.debug("no channel seems to work for this payment, we will try to use the requested short channel id")
|
||||
Some(requestedShortChannelId)
|
||||
case None =>
|
||||
// we filter out channels that we have already tried
|
||||
val candidateChannels: Map[ByteVector32, OutgoingChannel] = channels -- alreadyTried
|
||||
// and we filter again to keep the ones that are compatible with this payment (mainly fees, expiry delta)
|
||||
candidateChannels
|
||||
.values
|
||||
.map { channel =>
|
||||
val relayResult = relayOrFail(Some(channel))
|
||||
context.log.debug(s"candidate channel: channelId=${channel.channelId} availableForSend={} capacity={} channelUpdate={} result={}",
|
||||
channel.commitments.availableBalanceForSend,
|
||||
channel.commitments.capacity,
|
||||
channel.channelUpdate,
|
||||
relayResult match {
|
||||
case _: RelaySuccess => "success"
|
||||
case RelayFailure(CMD_FAIL_HTLC(_, Right(failureReason), _, _)) => failureReason
|
||||
case other => other
|
||||
})
|
||||
(channel, relayResult)
|
||||
}
|
||||
.collect {
|
||||
// we only keep channels that have enough balance to handle this payment
|
||||
case (channel, _: RelaySuccess) if channel.commitments.availableBalanceForSend > r.payload.amountToForward => channel
|
||||
}
|
||||
.toList // needed for ordering
|
||||
// we want to use the channel with:
|
||||
// - the lowest available capacity to ensure we keep high-capacity channels for big payments
|
||||
// - the lowest available balance to increase our incoming liquidity
|
||||
.sortBy { channel => (channel.commitments.capacity, channel.commitments.availableBalanceForSend) }
|
||||
.headOption match {
|
||||
case Some(channel) =>
|
||||
if (requestedChannelId_opt.contains(channel.channelId)) {
|
||||
context.log.debug("requested short channel id is our preferred channel")
|
||||
Some(channel)
|
||||
} else {
|
||||
context.log.info("replacing requestedShortChannelId={} by preferredShortChannelId={} with availableBalanceMsat={}", requestedShortChannelId, channel.channelUpdate.shortChannelId, channel.commitments.availableBalanceForSend)
|
||||
Some(channel)
|
||||
}
|
||||
case None =>
|
||||
val requestedChannel_opt = requestedChannelId_opt.flatMap(channels.get)
|
||||
requestedChannel_opt match {
|
||||
case Some(requestedChannel) if alreadyTried.contains(requestedChannel.channelId) =>
|
||||
context.log.debug("no channel seems to work for this payment and we have already tried the requested channel id: giving up")
|
||||
None
|
||||
case _ =>
|
||||
context.log.debug("no channel seems to work for this payment, we will try to use the one requested")
|
||||
requestedChannel_opt
|
||||
}
|
||||
case _ => Some(requestedShortChannelId) // we don't have a channel_update for this short_channel_id
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -252,23 +257,23 @@ class ChannelRelay private(nodeParams: NodeParams,
|
|||
*/
|
||||
def relayOrFail(outgoingChannel_opt: Option[OutgoingChannel]): RelayResult = {
|
||||
import r._
|
||||
outgoingChannel_opt.map(_.channelUpdate) match {
|
||||
outgoingChannel_opt match {
|
||||
case None =>
|
||||
RelayFailure(CMD_FAIL_HTLC(add.id, Right(UnknownNextPeer), commit = true))
|
||||
case Some(channelUpdate) if !channelUpdate.channelFlags.isEnabled =>
|
||||
RelayFailure(CMD_FAIL_HTLC(add.id, Right(ChannelDisabled(channelUpdate.messageFlags, channelUpdate.channelFlags, channelUpdate)), commit = true))
|
||||
case Some(channelUpdate) if payload.amountToForward < channelUpdate.htlcMinimumMsat =>
|
||||
RelayFailure(CMD_FAIL_HTLC(add.id, Right(AmountBelowMinimum(payload.amountToForward, channelUpdate)), commit = true))
|
||||
case Some(channelUpdate) if r.expiryDelta < channelUpdate.cltvExpiryDelta =>
|
||||
RelayFailure(CMD_FAIL_HTLC(add.id, Right(IncorrectCltvExpiry(payload.outgoingCltv, channelUpdate)), commit = true))
|
||||
case Some(channelUpdate) if r.relayFeeMsat < nodeFee(channelUpdate.relayFees, payload.amountToForward) &&
|
||||
case Some(c) if !c.channelUpdate.channelFlags.isEnabled =>
|
||||
RelayFailure(CMD_FAIL_HTLC(add.id, Right(ChannelDisabled(c.channelUpdate.messageFlags, c.channelUpdate.channelFlags, c.channelUpdate)), commit = true))
|
||||
case Some(c) if payload.amountToForward < c.channelUpdate.htlcMinimumMsat =>
|
||||
RelayFailure(CMD_FAIL_HTLC(add.id, Right(AmountBelowMinimum(payload.amountToForward, c.channelUpdate)), commit = true))
|
||||
case Some(c) if r.expiryDelta < c.channelUpdate.cltvExpiryDelta =>
|
||||
RelayFailure(CMD_FAIL_HTLC(add.id, Right(IncorrectCltvExpiry(payload.outgoingCltv, c.channelUpdate)), commit = true))
|
||||
case Some(c) if r.relayFeeMsat < nodeFee(c.channelUpdate.relayFees, payload.amountToForward) &&
|
||||
// fees also do not satisfy the previous channel update for `enforcementDelay` seconds after current update
|
||||
(TimestampSecond.now() - channelUpdate.timestamp > nodeParams.relayParams.enforcementDelay ||
|
||||
(TimestampSecond.now() - c.channelUpdate.timestamp > nodeParams.relayParams.enforcementDelay ||
|
||||
outgoingChannel_opt.flatMap(_.prevChannelUpdate).forall(c => r.relayFeeMsat < nodeFee(c.relayFees, payload.amountToForward))) =>
|
||||
RelayFailure(CMD_FAIL_HTLC(add.id, Right(FeeInsufficient(add.amountMsat, channelUpdate)), commit = true))
|
||||
case Some(channelUpdate) =>
|
||||
RelayFailure(CMD_FAIL_HTLC(add.id, Right(FeeInsufficient(add.amountMsat, c.channelUpdate)), commit = true))
|
||||
case Some(c) =>
|
||||
val origin = Origin.ChannelRelayedHot(addResponseAdapter.toClassic, add, payload.amountToForward)
|
||||
RelaySuccess(channelUpdate.shortChannelId, CMD_ADD_HTLC(addResponseAdapter.toClassic, payload.amountToForward, add.paymentHash, payload.outgoingCltv, nextPacket, origin, commit = true))
|
||||
RelaySuccess(c.channelId, CMD_ADD_HTLC(addResponseAdapter.toClassic, payload.amountToForward, add.paymentHash, payload.outgoingCltv, nextPacket, origin, commit = true))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@ import akka.actor.ActorRef
|
|||
import akka.actor.typed.Behavior
|
||||
import akka.actor.typed.eventstream.EventStream
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import fr.acinq.bitcoin.scalacompat.ByteVector32
|
||||
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
|
||||
import fr.acinq.eclair.channel._
|
||||
import fr.acinq.eclair.payment.IncomingPaymentPacket
|
||||
|
@ -53,13 +54,11 @@ object ChannelRelayer {
|
|||
case _ => Map.empty
|
||||
}
|
||||
|
||||
private type ChannelUpdates = Map[ShortChannelId, Relayer.OutgoingChannel]
|
||||
private type NodeChannels = mutable.MultiDict[PublicKey, ShortChannelId]
|
||||
|
||||
def apply(nodeParams: NodeParams,
|
||||
register: ActorRef,
|
||||
channelUpdates: ChannelUpdates = Map.empty,
|
||||
node2channels: NodeChannels = mutable.MultiDict.empty[PublicKey, ShortChannelId]): Behavior[Command] =
|
||||
channels: Map[ByteVector32, Relayer.OutgoingChannel] = Map.empty,
|
||||
scid2channels: Map[ShortChannelId, ByteVector32] = Map.empty,
|
||||
node2channels: mutable.MultiDict[PublicKey, ByteVector32] = mutable.MultiDict.empty): Behavior[Command] =
|
||||
Behaviors.setup { context =>
|
||||
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[LocalChannelUpdate](WrappedLocalChannelUpdate))
|
||||
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[LocalChannelDown](WrappedLocalChannelDown))
|
||||
|
@ -70,61 +69,56 @@ object ChannelRelayer {
|
|||
Behaviors.receiveMessage {
|
||||
case Relay(channelRelayPacket) =>
|
||||
val relayId = UUID.randomUUID()
|
||||
val nextNodeId_opt: Option[PublicKey] = channelUpdates.get(channelRelayPacket.payload.outgoingChannelId) match {
|
||||
case Some(channel) => Some(channel.nextNodeId)
|
||||
val nextNodeId_opt: Option[PublicKey] = scid2channels.get(channelRelayPacket.payload.outgoingChannelId) match {
|
||||
case Some(channelId) => channels.get(channelId).map(_.nextNodeId)
|
||||
case None => None
|
||||
}
|
||||
val channels: Map[ShortChannelId, Relayer.OutgoingChannel] = nextNodeId_opt match {
|
||||
case Some(nextNodeId) => node2channels.get(nextNodeId).map(channelUpdates).map(c => c.channelUpdate.shortChannelId -> c).toMap
|
||||
val nextChannels: Map[ByteVector32, Relayer.OutgoingChannel] = nextNodeId_opt match {
|
||||
case Some(nextNodeId) => node2channels.get(nextNodeId).flatMap(channels.get).map(c => c.channelId -> c).toMap
|
||||
case None => Map.empty
|
||||
}
|
||||
context.log.debug(s"spawning a new handler with relayId=$relayId to nextNodeId={} with channels={}", nextNodeId_opt.getOrElse(""), channels.keys.mkString(","))
|
||||
context.spawn(ChannelRelay.apply(nodeParams, register, channels, relayId, channelRelayPacket), name = relayId.toString)
|
||||
context.log.debug(s"spawning a new handler with relayId=$relayId to nextNodeId={} with channels={}", nextNodeId_opt.getOrElse(""), nextChannels.keys.mkString(","))
|
||||
context.spawn(ChannelRelay.apply(nodeParams, register, nextChannels, relayId, channelRelayPacket), name = relayId.toString)
|
||||
Behaviors.same
|
||||
|
||||
case GetOutgoingChannels(replyTo, Relayer.GetOutgoingChannels(enabledOnly)) =>
|
||||
val channels = if (enabledOnly) {
|
||||
channelUpdates.values.filter(o => o.channelUpdate.channelFlags.isEnabled)
|
||||
val selected = if (enabledOnly) {
|
||||
channels.values.filter(o => o.channelUpdate.channelFlags.isEnabled)
|
||||
} else {
|
||||
channelUpdates.values
|
||||
channels.values
|
||||
}
|
||||
replyTo ! Relayer.OutgoingChannels(channels.toSeq)
|
||||
replyTo ! Relayer.OutgoingChannels(selected.toSeq)
|
||||
Behaviors.same
|
||||
|
||||
case WrappedLocalChannelUpdate(LocalChannelUpdate(_, channelId, shortChannelId, remoteNodeId, _, channelUpdate, commitments)) =>
|
||||
context.log.debug(s"updating local channel info for channelId=$channelId shortChannelId=$shortChannelId remoteNodeId=$remoteNodeId channelUpdate={} commitments={}", channelUpdate, commitments)
|
||||
val prevChannelUpdate = channelUpdates.get(shortChannelId).map(_.channelUpdate)
|
||||
val channelUpdates1 = channelUpdates + (channelUpdate.shortChannelId -> Relayer.OutgoingChannel(remoteNodeId, channelUpdate, prevChannelUpdate, commitments))
|
||||
val node2channels1 = node2channels.addOne(remoteNodeId, channelUpdate.shortChannelId)
|
||||
apply(nodeParams, register, channelUpdates1, node2channels1)
|
||||
val prevChannelUpdate = channels.get(channelId).map(_.channelUpdate)
|
||||
val channel = Relayer.OutgoingChannel(remoteNodeId, channelUpdate, prevChannelUpdate, commitments)
|
||||
val channels1 = channels + (channelId -> channel)
|
||||
val scid2channels1 = scid2channels + (channelUpdate.shortChannelId -> channelId)
|
||||
val node2channels1 = node2channels.addOne(remoteNodeId, channelId)
|
||||
apply(nodeParams, register, channels1, scid2channels1, node2channels1)
|
||||
|
||||
case WrappedLocalChannelDown(LocalChannelDown(_, channelId, shortChannelId, remoteNodeId)) =>
|
||||
context.log.debug(s"removed local channel info for channelId=$channelId shortChannelId=$shortChannelId")
|
||||
val node2channels1 = node2channels.subtractOne(remoteNodeId, shortChannelId)
|
||||
apply(nodeParams, register, channelUpdates - shortChannelId, node2channels1)
|
||||
val channels1 = channels - channelId
|
||||
val scid2Channels1 = scid2channels - shortChannelId
|
||||
val node2channels1 = node2channels.subtractOne(remoteNodeId, channelId)
|
||||
apply(nodeParams, register, channels1, scid2Channels1, node2channels1)
|
||||
|
||||
case WrappedAvailableBalanceChanged(AvailableBalanceChanged(_, channelId, shortChannelId, commitments)) =>
|
||||
val channelUpdates1 = channelUpdates.get(shortChannelId) match {
|
||||
val channels1 = channels.get(channelId) match {
|
||||
case Some(c: Relayer.OutgoingChannel) =>
|
||||
context.log.debug(s"available balance changed for channelId=$channelId shortChannelId=$shortChannelId availableForSend={} availableForReceive={}", commitments.availableBalanceForSend, commitments.availableBalanceForReceive)
|
||||
channelUpdates + (shortChannelId -> c.copy(commitments = commitments))
|
||||
case None => channelUpdates // we only consider the balance if we have the channel_update
|
||||
channels + (channelId -> c.copy(commitments = commitments))
|
||||
case None => channels // we only consider the balance if we have the channel_update
|
||||
}
|
||||
apply(nodeParams, register, channelUpdates1, node2channels)
|
||||
apply(nodeParams, register, channels1, scid2channels, node2channels)
|
||||
|
||||
case WrappedShortChannelIdAssigned(ShortChannelIdAssigned(_, channelId, shortChannelId, previousShortChannelId_opt)) =>
|
||||
val (channelUpdates1, node2channels1) = previousShortChannelId_opt match {
|
||||
case Some(previousShortChannelId) if previousShortChannelId != shortChannelId =>
|
||||
context.log.debug(s"shortChannelId changed for channelId=$channelId ($previousShortChannelId->$shortChannelId, probably due to chain re-org)")
|
||||
// We simply remove the old entry: we should receive a LocalChannelUpdate with the new shortChannelId shortly.
|
||||
val node2channels1 = channelUpdates.get(previousShortChannelId).map(_.nextNodeId) match {
|
||||
case Some(remoteNodeId) => node2channels.subtractOne(remoteNodeId, previousShortChannelId)
|
||||
case None => node2channels
|
||||
}
|
||||
(channelUpdates - previousShortChannelId, node2channels1)
|
||||
case _ => (channelUpdates, node2channels)
|
||||
}
|
||||
apply(nodeParams, register, channelUpdates1, node2channels1)
|
||||
context.log.debug(s"added new mapping shortChannelId=$shortChannelId for channelId=$channelId")
|
||||
val scid2channels1 = scid2channels + (shortChannelId -> channelId)
|
||||
apply(nodeParams, register, channels, scid2channels1, node2channels)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import akka.actor.typed.scaladsl.adapter.ClassicActorContextOps
|
|||
import akka.actor.{Actor, ActorRef, DiagnosticActorLogging, Props, typed}
|
||||
import akka.event.Logging.MDC
|
||||
import akka.event.LoggingAdapter
|
||||
import fr.acinq.bitcoin.scalacompat.ByteVector32
|
||||
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
|
||||
import fr.acinq.eclair.channel._
|
||||
import fr.acinq.eclair.db.PendingCommandsDb
|
||||
|
@ -141,6 +142,7 @@ object Relayer extends Logging {
|
|||
*/
|
||||
case class GetOutgoingChannels(enabledOnly: Boolean = true)
|
||||
case class OutgoingChannel(nextNodeId: PublicKey, channelUpdate: ChannelUpdate, prevChannelUpdate: Option[ChannelUpdate], commitments: AbstractCommitments) {
|
||||
val channelId: ByteVector32 = commitments.channelId
|
||||
def toChannelBalance: ChannelBalance = ChannelBalance(
|
||||
remoteNodeId = nextNodeId,
|
||||
shortChannelId = channelUpdate.shortChannelId,
|
||||
|
|
|
@ -62,9 +62,9 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
|
|||
fwd
|
||||
}
|
||||
|
||||
def expectFwdAdd(register: TestProbe[Any], shortChannelId: ShortChannelId, outAmount: MilliSatoshi, outExpiry: CltvExpiry): Register.ForwardShortId[CMD_ADD_HTLC] = {
|
||||
val fwd = register.expectMessageType[Register.ForwardShortId[CMD_ADD_HTLC]]
|
||||
assert(fwd.shortChannelId === shortChannelId)
|
||||
def expectFwdAdd(register: TestProbe[Any], channelId: ByteVector32, outAmount: MilliSatoshi, outExpiry: CltvExpiry): Register.Forward[CMD_ADD_HTLC] = {
|
||||
val fwd = register.expectMessageType[Register.Forward[CMD_ADD_HTLC]]
|
||||
assert(fwd.channelId === channelId)
|
||||
assert(fwd.message.amount === outAmount)
|
||||
assert(fwd.message.cltvExpiry === outExpiry)
|
||||
assert(fwd.message.origin.isInstanceOf[Origin.ChannelRelayedHot])
|
||||
|
@ -83,7 +83,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
|
|||
channelRelayer ! WrappedLocalChannelUpdate(u)
|
||||
channelRelayer ! Relay(r)
|
||||
|
||||
expectFwdAdd(register, shortId1, outgoingAmount, outgoingExpiry)
|
||||
expectFwdAdd(register, channelIds(shortId1), outgoingAmount, outgoingExpiry)
|
||||
}
|
||||
|
||||
test("relay an htlc-add with onion tlv payload") { f =>
|
||||
|
@ -97,7 +97,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
|
|||
channelRelayer ! WrappedLocalChannelUpdate(u)
|
||||
channelRelayer ! Relay(r)
|
||||
|
||||
expectFwdAdd(register, shortId1, outgoingAmount, outgoingExpiry)
|
||||
expectFwdAdd(register, channelIds(shortId1), outgoingAmount, outgoingExpiry)
|
||||
}
|
||||
|
||||
test("relay an htlc-add with retries") { f =>
|
||||
|
@ -117,12 +117,12 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
|
|||
channelRelayer ! Relay(r)
|
||||
|
||||
// first try
|
||||
val fwd1 = expectFwdAdd(register, shortId2, outgoingAmount, outgoingExpiry)
|
||||
val fwd1 = expectFwdAdd(register, channelIds(shortId2), outgoingAmount, outgoingExpiry)
|
||||
// channel returns an error
|
||||
fwd1.message.replyTo ! RES_ADD_FAILED(fwd1.message, HtlcValueTooHighInFlight(channelIds(shortId2), UInt64(1000000000L), 1516977616L msat), Some(u2.channelUpdate))
|
||||
|
||||
// second try
|
||||
val fwd2 = expectFwdAdd(register, shortId1, outgoingAmount, outgoingExpiry)
|
||||
val fwd2 = expectFwdAdd(register, channelIds(shortId1), outgoingAmount, outgoingExpiry)
|
||||
// failure again
|
||||
fwd1.message.replyTo ! RES_ADD_FAILED(fwd2.message, HtlcValueTooHighInFlight(channelIds(shortId1), UInt64(1000000000L), 1516977616L msat), Some(u1.channelUpdate))
|
||||
|
||||
|
@ -151,8 +151,8 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
|
|||
channelRelayer ! WrappedLocalChannelUpdate(u)
|
||||
channelRelayer ! Relay(r)
|
||||
|
||||
val fwd = expectFwdAdd(register, shortId1, outgoingAmount, outgoingExpiry)
|
||||
fwd.replyTo ! Register.ForwardShortIdFailure(fwd)
|
||||
val fwd = expectFwdAdd(register, channelIds(shortId1), outgoingAmount, outgoingExpiry)
|
||||
fwd.replyTo ! Register.ForwardFailure(fwd)
|
||||
|
||||
expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(UnknownNextPeer), commit = true))
|
||||
}
|
||||
|
@ -208,7 +208,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
|
|||
channelRelayer ! WrappedLocalChannelUpdate(u)
|
||||
channelRelayer ! Relay(r)
|
||||
|
||||
expectFwdAdd(register, shortId1, payload.amountToForward, payload.outgoingCltv).message
|
||||
expectFwdAdd(register, channelIds(shortId1), payload.amountToForward, payload.outgoingCltv).message
|
||||
}
|
||||
|
||||
test("fail to relay an htlc-add (expiry too small)") { f =>
|
||||
|
@ -248,7 +248,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
|
|||
channelRelayer ! Relay(r)
|
||||
|
||||
// relay succeeds with current channel update (u1) with lower fees
|
||||
expectFwdAdd(register, shortId1, outgoingAmount, outgoingExpiry)
|
||||
expectFwdAdd(register, channelIds(shortId1), outgoingAmount, outgoingExpiry)
|
||||
|
||||
val u2 = createLocalUpdate(shortId1, timestamp = TimestampSecond.now() - 530)
|
||||
|
||||
|
@ -256,7 +256,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
|
|||
channelRelayer ! Relay(r)
|
||||
|
||||
// relay succeeds because the current update (u2) with higher fees occurred less than 10 minutes ago
|
||||
expectFwdAdd(register, shortId1, outgoingAmount, outgoingExpiry)
|
||||
expectFwdAdd(register, channelIds(shortId1), outgoingAmount, outgoingExpiry)
|
||||
|
||||
val u3 = createLocalUpdate(shortId1, timestamp = TimestampSecond.now() - 601)
|
||||
|
||||
|
@ -290,7 +290,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
|
|||
testCases.foreach { testCase =>
|
||||
channelRelayer ! WrappedLocalChannelUpdate(u)
|
||||
channelRelayer ! Relay(r)
|
||||
val fwd = expectFwdAdd(register, shortId1, outgoingAmount, outgoingExpiry)
|
||||
val fwd = expectFwdAdd(register, channelIds(shortId1), outgoingAmount, outgoingExpiry)
|
||||
fwd.message.replyTo ! RES_ADD_FAILED(fwd.message, testCase.exc, Some(testCase.update))
|
||||
expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(testCase.failure), commit = true))
|
||||
}
|
||||
|
@ -303,7 +303,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
|
|||
def dummyLocalUpdate(shortChannelId: ShortChannelId, remoteNodeId: PublicKey, availableBalanceForSend: MilliSatoshi, capacity: Satoshi) = {
|
||||
val channelId = randomBytes32()
|
||||
val update = Announcements.makeChannelUpdate(Block.RegtestGenesisBlock.hash, randomKey(), remoteNodeId, shortChannelId, CltvExpiryDelta(10), 100 msat, 1000 msat, 100, capacity.toMilliSatoshi)
|
||||
val commitments = PaymentPacketSpec.makeCommitments(ByteVector32.Zeroes, availableBalanceForSend, testCapacity = capacity)
|
||||
val commitments = PaymentPacketSpec.makeCommitments(channelId, availableBalanceForSend, testCapacity = capacity)
|
||||
LocalChannelUpdate(null, channelId, shortChannelId, remoteNodeId, None, update, commitments)
|
||||
}
|
||||
|
||||
|
@ -325,16 +325,16 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
|
|||
val r = createValidIncomingPacket(1000000 msat, CltvExpiry(70), payload)
|
||||
channelRelayer ! Relay(r)
|
||||
// select the channel to the same node, with the lowest capacity and balance but still high enough to handle the payment
|
||||
val cmd1 = expectFwdAdd(register, ShortChannelId(22223), payload.amountToForward, payload.outgoingCltv).message
|
||||
val cmd1 = expectFwdAdd(register, channelUpdates(ShortChannelId(22223)).channelId, payload.amountToForward, payload.outgoingCltv).message
|
||||
cmd1.replyTo ! RES_ADD_FAILED(cmd1, ChannelUnavailable(randomBytes32()), None)
|
||||
// select 2nd-to-best channel: higher capacity and balance
|
||||
val cmd2 = expectFwdAdd(register, ShortChannelId(22222), payload.amountToForward, payload.outgoingCltv).message
|
||||
val cmd2 = expectFwdAdd(register, channelUpdates(ShortChannelId(22222)).channelId, payload.amountToForward, payload.outgoingCltv).message
|
||||
cmd2.replyTo ! RES_ADD_FAILED(cmd2, TooManyAcceptedHtlcs(randomBytes32(), 42), Some(channelUpdates(ShortChannelId(22222)).channelUpdate))
|
||||
// select 3rd-to-best channel: same balance but higher capacity
|
||||
val cmd3 = expectFwdAdd(register, ShortChannelId(12345), payload.amountToForward, payload.outgoingCltv).message
|
||||
val cmd3 = expectFwdAdd(register, channelUpdates(ShortChannelId(12345)).channelId, payload.amountToForward, payload.outgoingCltv).message
|
||||
cmd3.replyTo ! RES_ADD_FAILED(cmd3, TooManyAcceptedHtlcs(randomBytes32(), 42), Some(channelUpdates(ShortChannelId(12345)).channelUpdate))
|
||||
// select 4th-to-best channel: same capacity but higher balance
|
||||
val cmd4 = expectFwdAdd(register, ShortChannelId(11111), payload.amountToForward, payload.outgoingCltv).message
|
||||
val cmd4 = expectFwdAdd(register, channelUpdates(ShortChannelId(11111)).channelId, payload.amountToForward, payload.outgoingCltv).message
|
||||
cmd4.replyTo ! RES_ADD_FAILED(cmd4, HtlcValueTooHighInFlight(randomBytes32(), UInt64(100000000), 100000000 msat), Some(channelUpdates(ShortChannelId(11111)).channelUpdate))
|
||||
// all the suitable channels have been tried
|
||||
expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(TemporaryChannelFailure(channelUpdates(ShortChannelId(12345)).channelUpdate)), commit = true))
|
||||
|
@ -344,28 +344,28 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
|
|||
val payload = RelayLegacyPayload(ShortChannelId(12345), 50000000 msat, CltvExpiry(60))
|
||||
val r = createValidIncomingPacket(60000000 msat, CltvExpiry(70), payload)
|
||||
channelRelayer ! Relay(r)
|
||||
expectFwdAdd(register, ShortChannelId(11111), payload.amountToForward, payload.outgoingCltv).message
|
||||
expectFwdAdd(register, channelUpdates(ShortChannelId(11111)).channelId, payload.amountToForward, payload.outgoingCltv).message
|
||||
}
|
||||
{
|
||||
// lower amount payment
|
||||
val payload = RelayLegacyPayload(ShortChannelId(12345), 1000 msat, CltvExpiry(60))
|
||||
val r = createValidIncomingPacket(60000000 msat, CltvExpiry(70), payload)
|
||||
channelRelayer ! Relay(r)
|
||||
expectFwdAdd(register, ShortChannelId(33333), payload.amountToForward, payload.outgoingCltv).message
|
||||
expectFwdAdd(register, channelUpdates(ShortChannelId(33333)).channelId, payload.amountToForward, payload.outgoingCltv).message
|
||||
}
|
||||
{
|
||||
// payment too high, no suitable channel found, we keep the requested one
|
||||
val payload = RelayLegacyPayload(ShortChannelId(12345), 1000000000 msat, CltvExpiry(60))
|
||||
val r = createValidIncomingPacket(1010000000 msat, CltvExpiry(70), payload)
|
||||
channelRelayer ! Relay(r)
|
||||
expectFwdAdd(register, ShortChannelId(12345), payload.amountToForward, payload.outgoingCltv).message
|
||||
expectFwdAdd(register, channelUpdates(ShortChannelId(12345)).channelId, payload.amountToForward, payload.outgoingCltv).message
|
||||
}
|
||||
{
|
||||
// cltv expiry larger than our requirements
|
||||
val payload = RelayLegacyPayload(ShortChannelId(12345), 998900 msat, CltvExpiry(50))
|
||||
val r = createValidIncomingPacket(1000000 msat, CltvExpiry(70), payload)
|
||||
channelRelayer ! Relay(r)
|
||||
expectFwdAdd(register, ShortChannelId(22223), payload.amountToForward, payload.outgoingCltv).message
|
||||
expectFwdAdd(register, channelUpdates(ShortChannelId(22223)).channelId, payload.amountToForward, payload.outgoingCltv).message
|
||||
}
|
||||
{
|
||||
// cltv expiry too small, no suitable channel found
|
||||
|
@ -399,7 +399,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
|
|||
testCases.foreach { testCase =>
|
||||
channelRelayer ! WrappedLocalChannelUpdate(u)
|
||||
channelRelayer ! Relay(r)
|
||||
val fwd = expectFwdAdd(register, shortId1, outgoingAmount, outgoingExpiry)
|
||||
val fwd = expectFwdAdd(register, channelIds(shortId1), outgoingAmount, outgoingExpiry)
|
||||
fwd.message.replyTo ! RES_SUCCESS(fwd.message, channelId1)
|
||||
fwd.message.origin.replyTo ! RES_ADD_SETTLED(fwd.message.origin, downstream_htlc, testCase.result)
|
||||
expectFwdFail(register, r.add.channelId, testCase.cmd)
|
||||
|
@ -428,7 +428,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
|
|||
channelRelayer ! WrappedLocalChannelUpdate(u)
|
||||
channelRelayer ! Relay(r)
|
||||
|
||||
val fwd1 = expectFwdAdd(register, shortId1, outgoingAmount, outgoingExpiry)
|
||||
val fwd1 = expectFwdAdd(register, channelIds(shortId1), outgoingAmount, outgoingExpiry)
|
||||
fwd1.message.replyTo ! RES_SUCCESS(fwd1.message, channelId1)
|
||||
fwd1.message.origin.replyTo ! RES_ADD_SETTLED(fwd1.message.origin, downstream_htlc, testCase.result)
|
||||
|
||||
|
@ -488,14 +488,12 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
|
|||
|
||||
// Simulate a chain re-org that changes the shortChannelId:
|
||||
channelRelayer ! WrappedShortChannelIdAssigned(ShortChannelIdAssigned(null, channelId_ab, ShortChannelId(42), Some(channelUpdate_ab.shortChannelId)))
|
||||
val channels7 = getOutgoingChannels(true)
|
||||
assert(channels7.isEmpty)
|
||||
|
||||
// We should receive the updated channel update containing the new shortChannelId:
|
||||
channelRelayer ! WrappedLocalChannelUpdate(LocalChannelUpdate(null, channelId_ab, ShortChannelId(42), a, None, channelUpdate_ab.copy(shortChannelId = ShortChannelId(42)), makeCommitments(channelId_ab, 100000 msat, 200000 msat)))
|
||||
val channels8 = getOutgoingChannels(true)
|
||||
assert(channels8.size === 1)
|
||||
assert(channels8.head.channelUpdate.shortChannelId === ShortChannelId(42))
|
||||
val channels7 = getOutgoingChannels(true)
|
||||
assert(channels7.size === 1)
|
||||
assert(channels7.head.channelUpdate.shortChannelId === ShortChannelId(42))
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -90,7 +90,7 @@ class RelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("applicat
|
|||
// and then manually build an htlc
|
||||
val add_ab = UpdateAddHtlc(channelId = randomBytes32(), id = 123456, cmd.amount, cmd.paymentHash, cmd.cltvExpiry, cmd.onion)
|
||||
relayer ! RelayForward(add_ab)
|
||||
register.expectMessageType[Register.ForwardShortId[CMD_ADD_HTLC]]
|
||||
register.expectMessageType[Register.Forward[CMD_ADD_HTLC]]
|
||||
}
|
||||
|
||||
test("relay an htlc-add at the final node to the payment handler") { f =>
|
||||
|
|
Loading…
Add table
Reference in a new issue