mirror of
https://github.com/ACINQ/eclair.git
synced 2025-03-04 09:58:02 +01:00
Refactor: move channel relaying to a ChannelRelayer actor. (#1215)
The relayer is the top-level orchestrator that forwards to a payment handler, channel relayer or (coming) node relayer.
This commit is contained in:
parent
859b405587
commit
11003097cf
27 changed files with 516 additions and 428 deletions
|
@ -31,7 +31,7 @@ import fr.acinq.eclair.db.{IncomingPayment, NetworkFee, OutgoingPayment, Stats}
|
|||
import fr.acinq.eclair.io.Peer.{GetPeerInfo, PeerInfo}
|
||||
import fr.acinq.eclair.io.{NodeURI, Peer}
|
||||
import fr.acinq.eclair.payment.send.PaymentInitiator.SendPaymentRequest
|
||||
import fr.acinq.eclair.payment.Relayer.{GetOutgoingChannels, OutgoingChannels, UsableBalance}
|
||||
import fr.acinq.eclair.payment.relay.Relayer.{GetOutgoingChannels, OutgoingChannels, UsableBalance}
|
||||
import fr.acinq.eclair.payment._
|
||||
import fr.acinq.eclair.payment.receive.MultiPartHandler.ReceivePayment
|
||||
import fr.acinq.eclair.router.{ChannelDesc, RouteRequest, RouteResponse, Router}
|
||||
|
|
|
@ -45,7 +45,8 @@ import fr.acinq.eclair.db.{BackupHandler, Databases}
|
|||
import fr.acinq.eclair.io.{Authenticator, Server, Switchboard}
|
||||
import fr.acinq.eclair.payment.receive.PaymentHandler
|
||||
import fr.acinq.eclair.payment.send.{Autoprobe, PaymentInitiator}
|
||||
import fr.acinq.eclair.payment.{Auditor, Relayer}
|
||||
import fr.acinq.eclair.payment.Auditor
|
||||
import fr.acinq.eclair.payment.relay.Relayer
|
||||
import fr.acinq.eclair.router._
|
||||
import fr.acinq.eclair.tor.TorProtocolHandler.OnionServiceVersion
|
||||
import fr.acinq.eclair.tor.{Controller, TorProtocolHandler}
|
||||
|
|
|
@ -27,6 +27,7 @@ import fr.acinq.eclair.channel.Helpers.{Closing, Funding}
|
|||
import fr.acinq.eclair.crypto.{ShaChain, Sphinx}
|
||||
import fr.acinq.eclair.io.Peer
|
||||
import fr.acinq.eclair.payment._
|
||||
import fr.acinq.eclair.payment.relay.{CommandBuffer, Origin, Relayer}
|
||||
import fr.acinq.eclair.router.Announcements
|
||||
import fr.acinq.eclair.transactions._
|
||||
import fr.acinq.eclair.wire.{ChannelReestablish, _}
|
||||
|
|
|
@ -18,7 +18,7 @@ package fr.acinq.eclair.channel
|
|||
|
||||
import fr.acinq.bitcoin.Crypto.PrivateKey
|
||||
import fr.acinq.bitcoin.{ByteVector32, Satoshi, Transaction}
|
||||
import fr.acinq.eclair.payment.Origin
|
||||
import fr.acinq.eclair.payment.relay.Origin
|
||||
import fr.acinq.eclair.wire.{ChannelUpdate, UpdateAddHtlc}
|
||||
import fr.acinq.eclair.{CltvExpiry, CltvExpiryDelta, MilliSatoshi, UInt64}
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@ import fr.acinq.bitcoin.{ByteVector32, ByteVector64, Crypto}
|
|||
import fr.acinq.eclair.blockchain.fee.{FeeEstimator, FeeTargets}
|
||||
import fr.acinq.eclair.crypto.{Generators, KeyManager, ShaChain, Sphinx}
|
||||
import fr.acinq.eclair.payment._
|
||||
import fr.acinq.eclair.payment.relay.{Origin, Relayer}
|
||||
import fr.acinq.eclair.transactions.Transactions._
|
||||
import fr.acinq.eclair.transactions._
|
||||
import fr.acinq.eclair.wire._
|
||||
|
|
|
@ -27,7 +27,8 @@ import fr.acinq.eclair.blockchain.EclairWallet
|
|||
import fr.acinq.eclair.channel.Helpers.Closing
|
||||
import fr.acinq.eclair.channel._
|
||||
import fr.acinq.eclair.db.PendingRelayDb
|
||||
import fr.acinq.eclair.payment.{IncomingPacket, Origin}
|
||||
import fr.acinq.eclair.payment.relay.{CommandBuffer, Origin}
|
||||
import fr.acinq.eclair.payment.IncomingPacket
|
||||
import fr.acinq.eclair.router.Rebroadcast
|
||||
import fr.acinq.eclair.transactions.{IN, OUT}
|
||||
import fr.acinq.eclair.wire.{TemporaryNodeFailure, UpdateAddHtlc}
|
||||
|
@ -187,7 +188,7 @@ object Switchboard {
|
|||
|
||||
/**
|
||||
* We store [[CMD_FULFILL_HTLC]]/[[CMD_FAIL_HTLC]]/[[CMD_FAIL_MALFORMED_HTLC]]
|
||||
* in a database (see [[fr.acinq.eclair.payment.CommandBuffer]]) because we
|
||||
* in a database (see [[fr.acinq.eclair.payment.relay.CommandBuffer]]) because we
|
||||
* don't want to lose preimages, or to forget to fail incoming htlcs, which
|
||||
* would lead to unwanted channel closings.
|
||||
*
|
||||
|
|
|
@ -1,378 +0,0 @@
|
|||
/*
|
||||
* Copyright 2019 ACINQ SAS
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package fr.acinq.eclair.payment
|
||||
|
||||
import java.util.UUID
|
||||
|
||||
import akka.actor.{Actor, ActorLogging, ActorRef, Props, Status}
|
||||
import akka.event.LoggingAdapter
|
||||
import fr.acinq.bitcoin.ByteVector32
|
||||
import fr.acinq.bitcoin.Crypto.PublicKey
|
||||
import fr.acinq.eclair.channel._
|
||||
import fr.acinq.eclair.db.{OutgoingPayment, OutgoingPaymentStatus}
|
||||
import fr.acinq.eclair.payment.Origin.{Local, Relayed}
|
||||
import fr.acinq.eclair.router.Announcements
|
||||
import fr.acinq.eclair.wire._
|
||||
import fr.acinq.eclair.{LongToBtcAmount, MilliSatoshi, NodeParams, ShortChannelId, nodeFee}
|
||||
import grizzled.slf4j.Logging
|
||||
|
||||
import scala.collection.mutable
|
||||
|
||||
// @formatter:off
|
||||
sealed trait Origin
|
||||
object Origin {
|
||||
/** Our node is the origin of the payment. */
|
||||
case class Local(id: UUID, sender: Option[ActorRef]) extends Origin // we don't persist reference to local actors
|
||||
/** Our node forwarded a single incoming HTLC to an outgoing channel. */
|
||||
case class Relayed(originChannelId: ByteVector32, originHtlcId: Long, amountIn: MilliSatoshi, amountOut: MilliSatoshi) extends Origin
|
||||
}
|
||||
// @formatter:on
|
||||
|
||||
/**
|
||||
* Created by PM on 01/02/2017.
|
||||
*/
|
||||
class Relayer(nodeParams: NodeParams, register: ActorRef, paymentHandler: ActorRef) extends Actor with ActorLogging {
|
||||
|
||||
import Relayer._
|
||||
|
||||
// we pass these to helpers classes so that they have the logging context
|
||||
implicit def implicitLog: LoggingAdapter = log
|
||||
|
||||
context.system.eventStream.subscribe(self, classOf[LocalChannelUpdate])
|
||||
context.system.eventStream.subscribe(self, classOf[LocalChannelDown])
|
||||
context.system.eventStream.subscribe(self, classOf[AvailableBalanceChanged])
|
||||
|
||||
private val commandBuffer = context.actorOf(Props(new CommandBuffer(nodeParams, register)))
|
||||
|
||||
override def receive: Receive = main(Map.empty, new mutable.HashMap[PublicKey, mutable.Set[ShortChannelId]] with mutable.MultiMap[PublicKey, ShortChannelId])
|
||||
|
||||
def main(channelUpdates: Map[ShortChannelId, OutgoingChannel], node2channels: mutable.HashMap[PublicKey, mutable.Set[ShortChannelId]] with mutable.MultiMap[PublicKey, ShortChannelId]): Receive = {
|
||||
case GetOutgoingChannels(enabledOnly) =>
|
||||
val channels = if (enabledOnly) {
|
||||
channelUpdates.values.filter(o => Announcements.isEnabled(o.channelUpdate.channelFlags))
|
||||
} else {
|
||||
channelUpdates.values
|
||||
}
|
||||
sender ! OutgoingChannels(channels.toSeq)
|
||||
|
||||
case LocalChannelUpdate(_, channelId, shortChannelId, remoteNodeId, _, channelUpdate, commitments) =>
|
||||
log.debug(s"updating local channel info for channelId=$channelId shortChannelId=$shortChannelId remoteNodeId=$remoteNodeId channelUpdate={} commitments={}", channelUpdate, commitments)
|
||||
val channelUpdates1 = channelUpdates + (channelUpdate.shortChannelId -> OutgoingChannel(remoteNodeId, channelUpdate, commitments))
|
||||
context become main(channelUpdates1, node2channels.addBinding(remoteNodeId, channelUpdate.shortChannelId))
|
||||
|
||||
case LocalChannelDown(_, channelId, shortChannelId, remoteNodeId) =>
|
||||
log.debug(s"removed local channel info for channelId=$channelId shortChannelId=$shortChannelId")
|
||||
context become main(channelUpdates - shortChannelId, node2channels.removeBinding(remoteNodeId, shortChannelId))
|
||||
|
||||
case AvailableBalanceChanged(_, _, shortChannelId, _, commitments) =>
|
||||
val channelUpdates1 = channelUpdates.get(shortChannelId) match {
|
||||
case Some(c: OutgoingChannel) => channelUpdates + (shortChannelId -> c.copy(commitments = commitments))
|
||||
case None => channelUpdates // we only consider the balance if we have the channel_update
|
||||
}
|
||||
context become main(channelUpdates1, node2channels)
|
||||
|
||||
case ForwardAdd(add, previousFailures) =>
|
||||
log.debug(s"received forwarding request for htlc #${add.id} paymentHash=${add.paymentHash} from channelId=${add.channelId}")
|
||||
IncomingPacket.decrypt(add, nodeParams.privateKey, nodeParams.globalFeatures) match {
|
||||
case Right(p: IncomingPacket.FinalPacket) =>
|
||||
log.debug(s"forwarding htlc #${add.id} paymentHash=${add.paymentHash} to payment-handler")
|
||||
paymentHandler forward p
|
||||
case Right(r: IncomingPacket.ChannelRelayPacket) =>
|
||||
handleRelay(r, channelUpdates, node2channels, previousFailures, nodeParams.chainHash) match {
|
||||
case RelayFailure(cmdFail) =>
|
||||
log.warning(s"rejecting htlc #${add.id} paymentHash=${add.paymentHash} from channelId=${add.channelId} to shortChannelId=${r.payload.outgoingChannelId} reason=${cmdFail.reason}")
|
||||
commandBuffer ! CommandBuffer.CommandSend(add.channelId, add.id, cmdFail)
|
||||
case RelaySuccess(selectedShortChannelId, cmdAdd) =>
|
||||
log.info(s"forwarding htlc #${add.id} paymentHash=${add.paymentHash} from channelId=${add.channelId} to shortChannelId=$selectedShortChannelId")
|
||||
register ! Register.ForwardShortId(selectedShortChannelId, cmdAdd)
|
||||
}
|
||||
case Right(r: IncomingPacket.NodeRelayPacket) =>
|
||||
if (!nodeParams.enableTrampolinePayment) {
|
||||
log.warning(s"rejecting htlc #${add.id} paymentHash=${add.paymentHash} from channelId=${add.channelId} to nodeId=${r.innerPayload.outgoingNodeId} reason=trampoline disabled")
|
||||
commandBuffer ! CommandBuffer.CommandSend(add.channelId, add.id, CMD_FAIL_HTLC(add.id, Right(RequiredNodeFeatureMissing), commit = true))
|
||||
} else {
|
||||
// TODO: @t-bast: relay trampoline payload instead of rejecting.
|
||||
log.warning(s"rejecting htlc #${add.id} paymentHash=${add.paymentHash} from channelId=${add.channelId} to nodeId=${r.innerPayload.outgoingNodeId} reason=trampoline not implemented yet")
|
||||
commandBuffer ! CommandBuffer.CommandSend(add.channelId, add.id, CMD_FAIL_HTLC(add.id, Right(RequiredNodeFeatureMissing), commit = true))
|
||||
}
|
||||
case Left(badOnion: BadOnion) =>
|
||||
log.warning(s"couldn't parse onion: reason=${badOnion.message}")
|
||||
val cmdFail = CMD_FAIL_MALFORMED_HTLC(add.id, badOnion.onionHash, badOnion.code, commit = true)
|
||||
log.warning(s"rejecting htlc #${add.id} paymentHash=${add.paymentHash} from channelId=${add.channelId} reason=malformed onionHash=${cmdFail.onionHash} failureCode=${cmdFail.failureCode}")
|
||||
commandBuffer ! CommandBuffer.CommandSend(add.channelId, add.id, cmdFail)
|
||||
case Left(failure) =>
|
||||
log.warning(s"rejecting htlc #${add.id} paymentHash=${add.paymentHash} from channelId=${add.channelId} reason=$failure")
|
||||
val cmdFail = CMD_FAIL_HTLC(add.id, Right(failure), commit = true)
|
||||
commandBuffer ! CommandBuffer.CommandSend(add.channelId, add.id, cmdFail)
|
||||
}
|
||||
|
||||
case Status.Failure(Register.ForwardShortIdFailure(Register.ForwardShortId(shortChannelId, CMD_ADD_HTLC(_, _, _, _, Upstream.Relayed(add), _, _)))) =>
|
||||
log.warning(s"couldn't resolve downstream channel $shortChannelId, failing htlc #${add.id}")
|
||||
val cmdFail = CMD_FAIL_HTLC(add.id, Right(UnknownNextPeer), commit = true)
|
||||
commandBuffer ! CommandBuffer.CommandSend(add.channelId, add.id, cmdFail)
|
||||
|
||||
case Status.Failure(addFailed: AddHtlcFailed) =>
|
||||
import addFailed.paymentHash
|
||||
addFailed.origin match {
|
||||
case Local(id, None) =>
|
||||
handleLocalPaymentAfterRestart(PaymentFailed(id, paymentHash, Nil))
|
||||
case Local(_, Some(sender)) =>
|
||||
sender ! Status.Failure(addFailed)
|
||||
case Relayed(originChannelId, originHtlcId, _, _) =>
|
||||
addFailed.originalCommand match {
|
||||
case Some(CMD_ADD_HTLC(_, _, _, _, Upstream.Relayed(add), _, previousFailures)) =>
|
||||
log.info(s"retrying htlc #$originHtlcId paymentHash=$paymentHash from channelId=$originChannelId")
|
||||
self ! ForwardAdd(add, previousFailures :+ addFailed)
|
||||
case _ =>
|
||||
val failure = translateError(addFailed)
|
||||
val cmdFail = CMD_FAIL_HTLC(originHtlcId, Right(failure), commit = true)
|
||||
log.info(s"rejecting htlc #$originHtlcId paymentHash=$paymentHash from channelId=$originChannelId reason=${cmdFail.reason}")
|
||||
commandBuffer ! CommandBuffer.CommandSend(originChannelId, originHtlcId, cmdFail)
|
||||
}
|
||||
}
|
||||
|
||||
case ForwardFulfill(fulfill, to, add) =>
|
||||
to match {
|
||||
case Local(id, None) =>
|
||||
val feesPaid = 0.msat // fees are unknown since we lost the reference to the payment
|
||||
handleLocalPaymentAfterRestart(PaymentSent(id, add.paymentHash, fulfill.paymentPreimage, Seq(PaymentSent.PartialPayment(id, add.amountMsat, feesPaid, add.channelId, None))))
|
||||
case Local(_, Some(sender)) =>
|
||||
sender ! fulfill
|
||||
case Relayed(originChannelId, originHtlcId, amountIn, amountOut) =>
|
||||
val cmd = CMD_FULFILL_HTLC(originHtlcId, fulfill.paymentPreimage, commit = true)
|
||||
commandBuffer ! CommandBuffer.CommandSend(originChannelId, originHtlcId, cmd)
|
||||
context.system.eventStream.publish(PaymentRelayed(amountIn, amountOut, add.paymentHash, fromChannelId = originChannelId, toChannelId = fulfill.channelId))
|
||||
}
|
||||
|
||||
case ForwardFail(fail, to, add) =>
|
||||
to match {
|
||||
case Local(id, None) =>
|
||||
handleLocalPaymentAfterRestart(PaymentFailed(id, add.paymentHash, Nil))
|
||||
case Local(_, Some(sender)) =>
|
||||
sender ! fail
|
||||
case Relayed(originChannelId, originHtlcId, _, _) =>
|
||||
val cmd = CMD_FAIL_HTLC(originHtlcId, Left(fail.reason), commit = true)
|
||||
commandBuffer ! CommandBuffer.CommandSend(originChannelId, originHtlcId, cmd)
|
||||
}
|
||||
|
||||
case ForwardFailMalformed(fail, to, add) =>
|
||||
to match {
|
||||
case Local(id, None) =>
|
||||
handleLocalPaymentAfterRestart(PaymentFailed(id, add.paymentHash, Nil))
|
||||
case Local(_, Some(sender)) =>
|
||||
sender ! fail
|
||||
case Relayed(originChannelId, originHtlcId, _, _) =>
|
||||
val cmd = CMD_FAIL_MALFORMED_HTLC(originHtlcId, fail.onionHash, fail.failureCode, commit = true)
|
||||
commandBuffer ! CommandBuffer.CommandSend(originChannelId, originHtlcId, cmd)
|
||||
}
|
||||
|
||||
case ack: CommandBuffer.CommandAck => commandBuffer forward ack
|
||||
|
||||
case "ok" => () // ignoring responses from channels
|
||||
}
|
||||
|
||||
/**
|
||||
* It may happen that we sent a payment and then re-started before the payment completed.
|
||||
* When we receive the HTLC fulfill/fail associated to that payment, the payment FSM that generated them doesn't exist
|
||||
* anymore so we need to reconcile the database.
|
||||
*/
|
||||
def handleLocalPaymentAfterRestart(paymentResult: PaymentEvent): Unit = paymentResult match {
|
||||
case e: PaymentFailed =>
|
||||
nodeParams.db.payments.updateOutgoingPayment(e)
|
||||
// Since payments can be multi-part, we only emit the payment failed event once all child payments have failed.
|
||||
nodeParams.db.payments.getOutgoingPayment(e.id).foreach(p => {
|
||||
val payments = nodeParams.db.payments.listOutgoingPayments(p.parentId)
|
||||
if (payments.forall(_.status.isInstanceOf[OutgoingPaymentStatus.Failed])) {
|
||||
context.system.eventStream.publish(PaymentFailed(p.parentId, e.paymentHash, Nil))
|
||||
}
|
||||
})
|
||||
case e: PaymentSent =>
|
||||
nodeParams.db.payments.updateOutgoingPayment(e)
|
||||
// Since payments can be multi-part, we only emit the payment sent event once all child payments have settled.
|
||||
nodeParams.db.payments.getOutgoingPayment(e.id).foreach(p => {
|
||||
val payments = nodeParams.db.payments.listOutgoingPayments(p.parentId)
|
||||
if (!payments.exists(p => p.status == OutgoingPaymentStatus.Pending)) {
|
||||
val succeeded = payments.collect {
|
||||
case OutgoingPayment(id, _, _, _, amount, _, _, _, OutgoingPaymentStatus.Succeeded(_, feesPaid, _, completedAt)) =>
|
||||
PaymentSent.PartialPayment(id, amount, feesPaid, ByteVector32.Zeroes, None, completedAt)
|
||||
}
|
||||
context.system.eventStream.publish(PaymentSent(p.parentId, e.paymentHash, e.paymentPreimage, succeeded))
|
||||
}
|
||||
})
|
||||
case _ =>
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
object Relayer extends Logging {
|
||||
def props(nodeParams: NodeParams, register: ActorRef, paymentHandler: ActorRef) = Props(classOf[Relayer], nodeParams, register, paymentHandler)
|
||||
|
||||
// @formatter:off
|
||||
sealed trait ForwardMessage
|
||||
case class ForwardAdd(add: UpdateAddHtlc, previousFailures: Seq[AddHtlcFailed] = Seq.empty) extends ForwardMessage
|
||||
case class ForwardFulfill(fulfill: UpdateFulfillHtlc, to: Origin, htlc: UpdateAddHtlc) extends ForwardMessage
|
||||
case class ForwardFail(fail: UpdateFailHtlc, to: Origin, htlc: UpdateAddHtlc) extends ForwardMessage
|
||||
case class ForwardFailMalformed(fail: UpdateFailMalformedHtlc, to: Origin, htlc: UpdateAddHtlc) extends ForwardMessage
|
||||
|
||||
case class UsableBalance(remoteNodeId: PublicKey, shortChannelId: ShortChannelId, canSend: MilliSatoshi, canReceive: MilliSatoshi, isPublic: Boolean)
|
||||
|
||||
/**
|
||||
* Get the list of local outgoing channels.
|
||||
*
|
||||
* @param enabledOnly if true, filter out disabled channels.
|
||||
*/
|
||||
case class GetOutgoingChannels(enabledOnly: Boolean = true)
|
||||
case class OutgoingChannel(nextNodeId: PublicKey, channelUpdate: ChannelUpdate, commitments: Commitments) {
|
||||
def toUsableBalance: UsableBalance = UsableBalance(
|
||||
remoteNodeId = nextNodeId,
|
||||
shortChannelId = channelUpdate.shortChannelId,
|
||||
canSend = commitments.availableBalanceForSend,
|
||||
canReceive = commitments.availableBalanceForReceive,
|
||||
isPublic = commitments.announceChannel)
|
||||
}
|
||||
case class OutgoingChannels(channels: Seq[OutgoingChannel])
|
||||
// @formatter:on
|
||||
|
||||
// @formatter:off
|
||||
sealed trait RelayResult
|
||||
case class RelayFailure(cmdFail: CMD_FAIL_HTLC) extends RelayResult
|
||||
case class RelaySuccess(shortChannelId: ShortChannelId, cmdAdd: CMD_ADD_HTLC) extends RelayResult
|
||||
// @formatter:on
|
||||
|
||||
/**
|
||||
* Handle an incoming htlc when we are a relaying node.
|
||||
*
|
||||
* @return either:
|
||||
* - a CMD_FAIL_HTLC to be sent back upstream
|
||||
* - a CMD_ADD_HTLC to propagate downstream
|
||||
*/
|
||||
def handleRelay(relayPacket: IncomingPacket.ChannelRelayPacket, channelUpdates: Map[ShortChannelId, OutgoingChannel], node2channels: mutable.Map[PublicKey, mutable.Set[ShortChannelId]] with mutable.MultiMap[PublicKey, ShortChannelId], previousFailures: Seq[AddHtlcFailed], chainHash: ByteVector32)(implicit log: LoggingAdapter): RelayResult = {
|
||||
import relayPacket._
|
||||
log.info(s"relaying htlc #${add.id} paymentHash={} from channelId={} to requestedShortChannelId={} previousAttempts={}", add.paymentHash, add.channelId, payload.outgoingChannelId, previousFailures.size)
|
||||
val alreadyTried = previousFailures.flatMap(_.channelUpdate).map(_.shortChannelId)
|
||||
selectPreferredChannel(relayPacket, channelUpdates, node2channels, alreadyTried)
|
||||
.flatMap(selectedShortChannelId => channelUpdates.get(selectedShortChannelId).map(_.channelUpdate)) match {
|
||||
case None if previousFailures.nonEmpty =>
|
||||
// no more channels to try
|
||||
val error = previousFailures
|
||||
// we return the error for the initially requested channel if it exists
|
||||
.find(_.channelUpdate.map(_.shortChannelId).contains(payload.outgoingChannelId))
|
||||
// otherwise we return the error for the first channel tried
|
||||
.getOrElse(previousFailures.head)
|
||||
RelayFailure(CMD_FAIL_HTLC(add.id, Right(translateError(error)), commit = true))
|
||||
case channelUpdate_opt =>
|
||||
relayOrFail(relayPacket, channelUpdate_opt, previousFailures)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Select a channel to the same node to relay the payment to, that has the lowest balance and is compatible in
|
||||
* terms of fees, expiry_delta, etc.
|
||||
*
|
||||
* If no suitable channel is found we default to the originally requested channel.
|
||||
*/
|
||||
def selectPreferredChannel(relayPacket: IncomingPacket.ChannelRelayPacket, channelUpdates: Map[ShortChannelId, OutgoingChannel], node2channels: mutable.Map[PublicKey, mutable.Set[ShortChannelId]] with mutable.MultiMap[PublicKey, ShortChannelId], alreadyTried: Seq[ShortChannelId])(implicit log: LoggingAdapter): Option[ShortChannelId] = {
|
||||
import relayPacket.add
|
||||
val requestedShortChannelId = relayPacket.payload.outgoingChannelId
|
||||
log.debug(s"selecting next channel for htlc #${add.id} paymentHash={} from channelId={} to requestedShortChannelId={} previousAttempts={}", add.paymentHash, add.channelId, requestedShortChannelId, alreadyTried.size)
|
||||
// first we find out what is the next node
|
||||
val nextNodeId_opt = channelUpdates.get(requestedShortChannelId) match {
|
||||
case Some(OutgoingChannel(nextNodeId, _, _)) =>
|
||||
Some(nextNodeId)
|
||||
case None => None
|
||||
}
|
||||
nextNodeId_opt match {
|
||||
case Some(nextNodeId) =>
|
||||
log.debug(s"next hop for htlc #{} paymentHash={} is nodeId={}", add.id, add.paymentHash, nextNodeId)
|
||||
// then we retrieve all known channels to this node
|
||||
val allChannels = node2channels.getOrElse(nextNodeId, Set.empty[ShortChannelId])
|
||||
// we then filter out channels that we have already tried
|
||||
val candidateChannels = allChannels -- alreadyTried
|
||||
// and we filter keep the ones that are compatible with this payment (mainly fees, expiry delta)
|
||||
candidateChannels
|
||||
.map { shortChannelId =>
|
||||
val channelInfo_opt = channelUpdates.get(shortChannelId)
|
||||
val channelUpdate_opt = channelInfo_opt.map(_.channelUpdate)
|
||||
val relayResult = relayOrFail(relayPacket, channelUpdate_opt)
|
||||
log.debug(s"candidate channel for htlc #${add.id} paymentHash=${add.paymentHash}: shortChannelId={} balanceMsat={} channelUpdate={} relayResult={}", shortChannelId, channelInfo_opt.map(_.commitments.availableBalanceForSend).getOrElse(""), channelUpdate_opt.getOrElse(""), relayResult)
|
||||
(shortChannelId, channelInfo_opt, relayResult)
|
||||
}
|
||||
.collect { case (shortChannelId, Some(channelInfo), _: RelaySuccess) => (shortChannelId, channelInfo.commitments.availableBalanceForSend) }
|
||||
.filter(_._2 > relayPacket.payload.amountToForward) // we only keep channels that have enough balance to handle this payment
|
||||
.toList // needed for ordering
|
||||
.sortBy(_._2) // we want to use the channel with the lowest available balance that can process the payment
|
||||
.headOption match {
|
||||
case Some((preferredShortChannelId, availableBalanceMsat)) if preferredShortChannelId != requestedShortChannelId =>
|
||||
log.info("replacing requestedShortChannelId={} by preferredShortChannelId={} with availableBalanceMsat={}", requestedShortChannelId, preferredShortChannelId, availableBalanceMsat)
|
||||
Some(preferredShortChannelId)
|
||||
case Some(_) =>
|
||||
// the requested short_channel_id is already our preferred channel
|
||||
Some(requestedShortChannelId)
|
||||
case None if !alreadyTried.contains(requestedShortChannelId) =>
|
||||
// no channel seem to work for this payment, we keep the requested channel id
|
||||
Some(requestedShortChannelId)
|
||||
case None =>
|
||||
// no channel seem to work for this payment and we have already tried the requested channel id: we give up
|
||||
None
|
||||
}
|
||||
case _ => Some(requestedShortChannelId) // we don't have a channel_update for this short_channel_id
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This helper method will tell us if it is not even worth attempting to relay the payment to our local outgoing
|
||||
* channel, because some parameters don't match with our settings for that channel. In that case we directly fail the
|
||||
* htlc.
|
||||
*/
|
||||
def relayOrFail(relayPacket: IncomingPacket.ChannelRelayPacket, channelUpdate_opt: Option[ChannelUpdate], previousFailures: Seq[AddHtlcFailed] = Seq.empty): RelayResult = {
|
||||
import relayPacket._
|
||||
channelUpdate_opt match {
|
||||
case None =>
|
||||
RelayFailure(CMD_FAIL_HTLC(add.id, Right(UnknownNextPeer), commit = true))
|
||||
case Some(channelUpdate) if !Announcements.isEnabled(channelUpdate.channelFlags) =>
|
||||
RelayFailure(CMD_FAIL_HTLC(add.id, Right(ChannelDisabled(channelUpdate.messageFlags, channelUpdate.channelFlags, channelUpdate)), commit = true))
|
||||
case Some(channelUpdate) if payload.amountToForward < channelUpdate.htlcMinimumMsat =>
|
||||
RelayFailure(CMD_FAIL_HTLC(add.id, Right(AmountBelowMinimum(payload.amountToForward, channelUpdate)), commit = true))
|
||||
case Some(channelUpdate) if relayPacket.expiryDelta != channelUpdate.cltvExpiryDelta =>
|
||||
RelayFailure(CMD_FAIL_HTLC(add.id, Right(IncorrectCltvExpiry(payload.outgoingCltv, channelUpdate)), commit = true))
|
||||
case Some(channelUpdate) if relayPacket.relayFeeMsat < nodeFee(channelUpdate.feeBaseMsat, channelUpdate.feeProportionalMillionths, payload.amountToForward) =>
|
||||
RelayFailure(CMD_FAIL_HTLC(add.id, Right(FeeInsufficient(add.amountMsat, channelUpdate)), commit = true))
|
||||
case Some(channelUpdate) =>
|
||||
RelaySuccess(channelUpdate.shortChannelId, CMD_ADD_HTLC(payload.amountToForward, add.paymentHash, payload.outgoingCltv, nextPacket, Upstream.Relayed(add), commit = true, previousFailures = previousFailures))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This helper method translates relaying errors (returned by the downstream outgoing channel) to BOLT 4 standard
|
||||
* errors that we should return upstream.
|
||||
*/
|
||||
private def translateError(failure: AddHtlcFailed): FailureMessage = {
|
||||
val error = failure.t
|
||||
val channelUpdate_opt = failure.channelUpdate
|
||||
(error, channelUpdate_opt) match {
|
||||
case (_: ExpiryTooSmall, Some(channelUpdate)) => ExpiryTooSoon(channelUpdate)
|
||||
case (_: ExpiryTooBig, _) => ExpiryTooFar
|
||||
case (_: InsufficientFunds, Some(channelUpdate)) => TemporaryChannelFailure(channelUpdate)
|
||||
case (_: TooManyAcceptedHtlcs, Some(channelUpdate)) => TemporaryChannelFailure(channelUpdate)
|
||||
case (_: ChannelUnavailable, Some(channelUpdate)) if !Announcements.isEnabled(channelUpdate.channelFlags) => ChannelDisabled(channelUpdate.messageFlags, channelUpdate.channelFlags, channelUpdate)
|
||||
case (_: ChannelUnavailable, None) => PermanentChannelFailure
|
||||
case (_: HtlcTimedout, _) => PermanentChannelFailure
|
||||
case _ => TemporaryNodeFailure
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,215 @@
|
|||
/*
|
||||
* Copyright 2019 ACINQ SAS
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package fr.acinq.eclair.payment.relay
|
||||
|
||||
import akka.actor.{Actor, ActorLogging, ActorRef, Props, Status}
|
||||
import akka.event.LoggingAdapter
|
||||
import fr.acinq.bitcoin.ByteVector32
|
||||
import fr.acinq.eclair.channel._
|
||||
import fr.acinq.eclair.payment.IncomingPacket
|
||||
import fr.acinq.eclair.payment.relay.Relayer.{ChannelUpdates, NodeChannels, OutgoingChannel}
|
||||
import fr.acinq.eclair.router.Announcements
|
||||
import fr.acinq.eclair.wire._
|
||||
import fr.acinq.eclair.{NodeParams, ShortChannelId, nodeFee}
|
||||
|
||||
/**
|
||||
* Created by t-bast on 09/10/2019.
|
||||
*/
|
||||
|
||||
/**
|
||||
* The Channel Relayer is used to relay a single upstream HTLC to a downstream channel.
|
||||
* It selects the best channel to use to relay and retries using other channels in case a local failure happens.
|
||||
*/
|
||||
class ChannelRelayer(nodeParams: NodeParams, relayer: ActorRef, register: ActorRef, commandBuffer: ActorRef) extends Actor with ActorLogging {
|
||||
|
||||
import ChannelRelayer._
|
||||
|
||||
// we pass these to helpers classes so that they have the logging context
|
||||
implicit def implicitLog: LoggingAdapter = log
|
||||
|
||||
override def receive: Receive = {
|
||||
case RelayHtlc(r, previousFailures, channelUpdates, node2channels) =>
|
||||
handleRelay(r, channelUpdates, node2channels, previousFailures, nodeParams.chainHash) match {
|
||||
case RelayFailure(cmdFail) =>
|
||||
log.info(s"rejecting htlc #${r.add.id} paymentHash=${r.add.paymentHash} from channelId=${r.add.channelId} to shortChannelId=${r.payload.outgoingChannelId} reason=${cmdFail.reason}")
|
||||
commandBuffer ! CommandBuffer.CommandSend(r.add.channelId, r.add.id, cmdFail)
|
||||
case RelaySuccess(selectedShortChannelId, cmdAdd) =>
|
||||
log.info(s"forwarding htlc #${r.add.id} paymentHash=${r.add.paymentHash} from channelId=${r.add.channelId} to shortChannelId=$selectedShortChannelId")
|
||||
register ! Register.ForwardShortId(selectedShortChannelId, cmdAdd)
|
||||
}
|
||||
|
||||
case Status.Failure(Register.ForwardShortIdFailure(Register.ForwardShortId(shortChannelId, CMD_ADD_HTLC(_, _, _, _, Upstream.Relayed(add), _, _)))) =>
|
||||
log.warning(s"couldn't resolve downstream channel $shortChannelId, failing htlc #${add.id}")
|
||||
val cmdFail = CMD_FAIL_HTLC(add.id, Right(UnknownNextPeer), commit = true)
|
||||
commandBuffer ! CommandBuffer.CommandSend(add.channelId, add.id, cmdFail)
|
||||
|
||||
case Status.Failure(addFailed: AddHtlcFailed) => addFailed.origin match {
|
||||
case Origin.Relayed(originChannelId, originHtlcId, _, _) => addFailed.originalCommand match {
|
||||
case Some(CMD_ADD_HTLC(_, _, _, _, Upstream.Relayed(add), _, previousFailures)) =>
|
||||
log.info(s"retrying htlc #$originHtlcId paymentHash=${addFailed.paymentHash} from channelId=$originChannelId")
|
||||
relayer ! Relayer.ForwardAdd(add, previousFailures :+ addFailed)
|
||||
case _ =>
|
||||
val failure = translateError(addFailed)
|
||||
val cmdFail = CMD_FAIL_HTLC(originHtlcId, Right(failure), commit = true)
|
||||
log.info(s"rejecting htlc #$originHtlcId paymentHash=${addFailed.paymentHash} from channelId=$originChannelId reason=${cmdFail.reason}")
|
||||
commandBuffer ! CommandBuffer.CommandSend(originChannelId, originHtlcId, cmdFail)
|
||||
}
|
||||
case _ => throw new IllegalArgumentException(s"channel relayer received unexpected failure: $addFailed")
|
||||
}
|
||||
|
||||
case ack: CommandBuffer.CommandAck => commandBuffer forward ack
|
||||
|
||||
case "ok" => // ignoring responses from channels
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
object ChannelRelayer {
|
||||
|
||||
def props(nodeParams: NodeParams, relayer: ActorRef, register: ActorRef, commandBuffer: ActorRef) = Props(classOf[ChannelRelayer], nodeParams, relayer, register, commandBuffer)
|
||||
|
||||
case class RelayHtlc(r: IncomingPacket.ChannelRelayPacket, previousFailures: Seq[AddHtlcFailed], channelUpdates: ChannelUpdates, node2channels: NodeChannels)
|
||||
|
||||
// @formatter:off
|
||||
sealed trait RelayResult
|
||||
case class RelayFailure(cmdFail: CMD_FAIL_HTLC) extends RelayResult
|
||||
case class RelaySuccess(shortChannelId: ShortChannelId, cmdAdd: CMD_ADD_HTLC) extends RelayResult
|
||||
// @formatter:on
|
||||
|
||||
/**
|
||||
* Handle an incoming htlc when we are a relaying node.
|
||||
*
|
||||
* @return either:
|
||||
* - a CMD_FAIL_HTLC to be sent back upstream
|
||||
* - a CMD_ADD_HTLC to propagate downstream
|
||||
*/
|
||||
def handleRelay(relayPacket: IncomingPacket.ChannelRelayPacket, channelUpdates: ChannelUpdates, node2channels: NodeChannels, previousFailures: Seq[AddHtlcFailed], chainHash: ByteVector32)(implicit log: LoggingAdapter): RelayResult = {
|
||||
import relayPacket._
|
||||
log.info(s"relaying htlc #${add.id} paymentHash={} from channelId={} to requestedShortChannelId={} previousAttempts={}", add.paymentHash, add.channelId, payload.outgoingChannelId, previousFailures.size)
|
||||
val alreadyTried = previousFailures.flatMap(_.channelUpdate).map(_.shortChannelId)
|
||||
selectPreferredChannel(relayPacket, channelUpdates, node2channels, alreadyTried)
|
||||
.flatMap(selectedShortChannelId => channelUpdates.get(selectedShortChannelId).map(_.channelUpdate)) match {
|
||||
case None if previousFailures.nonEmpty =>
|
||||
// no more channels to try
|
||||
val error = previousFailures
|
||||
// we return the error for the initially requested channel if it exists
|
||||
.find(_.channelUpdate.map(_.shortChannelId).contains(payload.outgoingChannelId))
|
||||
// otherwise we return the error for the first channel tried
|
||||
.getOrElse(previousFailures.head)
|
||||
RelayFailure(CMD_FAIL_HTLC(add.id, Right(translateError(error)), commit = true))
|
||||
case channelUpdate_opt =>
|
||||
relayOrFail(relayPacket, channelUpdate_opt, previousFailures)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Select a channel to the same node to relay the payment to, that has the lowest balance and is compatible in
|
||||
* terms of fees, expiry_delta, etc.
|
||||
*
|
||||
* If no suitable channel is found we default to the originally requested channel.
|
||||
*/
|
||||
def selectPreferredChannel(relayPacket: IncomingPacket.ChannelRelayPacket, channelUpdates: ChannelUpdates, node2channels: NodeChannels, alreadyTried: Seq[ShortChannelId])(implicit log: LoggingAdapter): Option[ShortChannelId] = {
|
||||
import relayPacket.add
|
||||
val requestedShortChannelId = relayPacket.payload.outgoingChannelId
|
||||
log.debug(s"selecting next channel for htlc #${add.id} paymentHash={} from channelId={} to requestedShortChannelId={} previousAttempts={}", add.paymentHash, add.channelId, requestedShortChannelId, alreadyTried.size)
|
||||
// first we find out what is the next node
|
||||
val nextNodeId_opt = channelUpdates.get(requestedShortChannelId) match {
|
||||
case Some(OutgoingChannel(nextNodeId, _, _)) =>
|
||||
Some(nextNodeId)
|
||||
case None => None
|
||||
}
|
||||
nextNodeId_opt match {
|
||||
case Some(nextNodeId) =>
|
||||
log.debug(s"next hop for htlc #{} paymentHash={} is nodeId={}", add.id, add.paymentHash, nextNodeId)
|
||||
// then we retrieve all known channels to this node
|
||||
val allChannels = node2channels.getOrElse(nextNodeId, Set.empty[ShortChannelId])
|
||||
// we then filter out channels that we have already tried
|
||||
val candidateChannels = allChannels -- alreadyTried
|
||||
// and we filter keep the ones that are compatible with this payment (mainly fees, expiry delta)
|
||||
candidateChannels
|
||||
.map { shortChannelId =>
|
||||
val channelInfo_opt = channelUpdates.get(shortChannelId)
|
||||
val channelUpdate_opt = channelInfo_opt.map(_.channelUpdate)
|
||||
val relayResult = relayOrFail(relayPacket, channelUpdate_opt)
|
||||
log.debug(s"candidate channel for htlc #${add.id} paymentHash=${add.paymentHash}: shortChannelId={} balanceMsat={} channelUpdate={} relayResult={}", shortChannelId, channelInfo_opt.map(_.commitments.availableBalanceForSend).getOrElse(""), channelUpdate_opt.getOrElse(""), relayResult)
|
||||
(shortChannelId, channelInfo_opt, relayResult)
|
||||
}
|
||||
.collect { case (shortChannelId, Some(channelInfo), _: RelaySuccess) => (shortChannelId, channelInfo.commitments.availableBalanceForSend) }
|
||||
.filter(_._2 > relayPacket.payload.amountToForward) // we only keep channels that have enough balance to handle this payment
|
||||
.toList // needed for ordering
|
||||
.sortBy(_._2) // we want to use the channel with the lowest available balance that can process the payment
|
||||
.headOption match {
|
||||
case Some((preferredShortChannelId, availableBalanceMsat)) if preferredShortChannelId != requestedShortChannelId =>
|
||||
log.info("replacing requestedShortChannelId={} by preferredShortChannelId={} with availableBalanceMsat={}", requestedShortChannelId, preferredShortChannelId, availableBalanceMsat)
|
||||
Some(preferredShortChannelId)
|
||||
case Some(_) =>
|
||||
// the requested short_channel_id is already our preferred channel
|
||||
Some(requestedShortChannelId)
|
||||
case None if !alreadyTried.contains(requestedShortChannelId) =>
|
||||
// no channel seem to work for this payment, we keep the requested channel id
|
||||
Some(requestedShortChannelId)
|
||||
case None =>
|
||||
// no channel seem to work for this payment and we have already tried the requested channel id: we give up
|
||||
None
|
||||
}
|
||||
case _ => Some(requestedShortChannelId) // we don't have a channel_update for this short_channel_id
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This helper method will tell us if it is not even worth attempting to relay the payment to our local outgoing
|
||||
* channel, because some parameters don't match with our settings for that channel. In that case we directly fail the
|
||||
* htlc.
|
||||
*/
|
||||
def relayOrFail(relayPacket: IncomingPacket.ChannelRelayPacket, channelUpdate_opt: Option[ChannelUpdate], previousFailures: Seq[AddHtlcFailed] = Seq.empty): RelayResult = {
|
||||
import relayPacket._
|
||||
channelUpdate_opt match {
|
||||
case None =>
|
||||
RelayFailure(CMD_FAIL_HTLC(add.id, Right(UnknownNextPeer), commit = true))
|
||||
case Some(channelUpdate) if !Announcements.isEnabled(channelUpdate.channelFlags) =>
|
||||
RelayFailure(CMD_FAIL_HTLC(add.id, Right(ChannelDisabled(channelUpdate.messageFlags, channelUpdate.channelFlags, channelUpdate)), commit = true))
|
||||
case Some(channelUpdate) if payload.amountToForward < channelUpdate.htlcMinimumMsat =>
|
||||
RelayFailure(CMD_FAIL_HTLC(add.id, Right(AmountBelowMinimum(payload.amountToForward, channelUpdate)), commit = true))
|
||||
case Some(channelUpdate) if relayPacket.expiryDelta != channelUpdate.cltvExpiryDelta =>
|
||||
RelayFailure(CMD_FAIL_HTLC(add.id, Right(IncorrectCltvExpiry(payload.outgoingCltv, channelUpdate)), commit = true))
|
||||
case Some(channelUpdate) if relayPacket.relayFeeMsat < nodeFee(channelUpdate.feeBaseMsat, channelUpdate.feeProportionalMillionths, payload.amountToForward) =>
|
||||
RelayFailure(CMD_FAIL_HTLC(add.id, Right(FeeInsufficient(add.amountMsat, channelUpdate)), commit = true))
|
||||
case Some(channelUpdate) =>
|
||||
RelaySuccess(channelUpdate.shortChannelId, CMD_ADD_HTLC(payload.amountToForward, add.paymentHash, payload.outgoingCltv, nextPacket, Upstream.Relayed(add), commit = true, previousFailures = previousFailures))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This helper method translates relaying errors (returned by the downstream outgoing channel) to BOLT 4 standard
|
||||
* errors that we should return upstream.
|
||||
*/
|
||||
def translateError(failure: AddHtlcFailed): FailureMessage = {
|
||||
val error = failure.t
|
||||
val channelUpdate_opt = failure.channelUpdate
|
||||
(error, channelUpdate_opt) match {
|
||||
case (_: ExpiryTooSmall, Some(channelUpdate)) => ExpiryTooSoon(channelUpdate)
|
||||
case (_: ExpiryTooBig, _) => ExpiryTooFar
|
||||
case (_: InsufficientFunds, Some(channelUpdate)) => TemporaryChannelFailure(channelUpdate)
|
||||
case (_: TooManyAcceptedHtlcs, Some(channelUpdate)) => TemporaryChannelFailure(channelUpdate)
|
||||
case (_: ChannelUnavailable, Some(channelUpdate)) if !Announcements.isEnabled(channelUpdate.channelFlags) => ChannelDisabled(channelUpdate.messageFlags, channelUpdate.channelFlags, channelUpdate)
|
||||
case (_: ChannelUnavailable, None) => PermanentChannelFailure
|
||||
case (_: HtlcTimedout, _) => PermanentChannelFailure
|
||||
case _ => TemporaryNodeFailure
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -14,7 +14,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package fr.acinq.eclair.payment
|
||||
package fr.acinq.eclair.payment.relay
|
||||
|
||||
import akka.actor.{Actor, ActorLogging, ActorRef}
|
||||
import fr.acinq.bitcoin.ByteVector32
|
|
@ -0,0 +1,245 @@
|
|||
/*
|
||||
* Copyright 2019 ACINQ SAS
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package fr.acinq.eclair.payment.relay
|
||||
|
||||
import java.util.UUID
|
||||
|
||||
import akka.actor.{Actor, ActorLogging, ActorRef, Props, Status}
|
||||
import akka.event.LoggingAdapter
|
||||
import fr.acinq.bitcoin.ByteVector32
|
||||
import fr.acinq.bitcoin.Crypto.PublicKey
|
||||
import fr.acinq.eclair.channel._
|
||||
import fr.acinq.eclair.db.{OutgoingPayment, OutgoingPaymentStatus}
|
||||
import fr.acinq.eclair.payment._
|
||||
import fr.acinq.eclair.router.Announcements
|
||||
import fr.acinq.eclair.wire._
|
||||
import fr.acinq.eclair.{LongToBtcAmount, MilliSatoshi, NodeParams, ShortChannelId}
|
||||
import grizzled.slf4j.Logging
|
||||
|
||||
import scala.collection.mutable
|
||||
|
||||
// @formatter:off
|
||||
sealed trait Origin
|
||||
object Origin {
|
||||
/** Our node is the origin of the payment. */
|
||||
case class Local(id: UUID, sender: Option[ActorRef]) extends Origin // we don't persist reference to local actors
|
||||
/** Our node forwarded a single incoming HTLC to an outgoing channel. */
|
||||
case class Relayed(originChannelId: ByteVector32, originHtlcId: Long, amountIn: MilliSatoshi, amountOut: MilliSatoshi) extends Origin
|
||||
// TODO: @t-bast: add TrampolineRelayed
|
||||
}
|
||||
// @formatter:on
|
||||
|
||||
/**
|
||||
* Created by PM on 01/02/2017.
|
||||
*/
|
||||
|
||||
/**
|
||||
* The Relayer decrypts incoming HTLCs and relays accordingly:
|
||||
* - to a payment handler if we are the final recipient
|
||||
* - to a channel relayer if we are relaying from an upstream channel to a downstream channel
|
||||
* - to a node relayer if we are relaying a trampoline payment
|
||||
*
|
||||
* It also receives channel HTLC events (fulfill / failed) and relays those to the appropriate handlers.
|
||||
* It also maintains an up-to-date view of local channel balances.
|
||||
*/
|
||||
class Relayer(nodeParams: NodeParams, register: ActorRef, paymentHandler: ActorRef) extends Actor with ActorLogging {
|
||||
|
||||
import Relayer._
|
||||
|
||||
// we pass these to helpers classes so that they have the logging context
|
||||
implicit def implicitLog: LoggingAdapter = log
|
||||
|
||||
context.system.eventStream.subscribe(self, classOf[LocalChannelUpdate])
|
||||
context.system.eventStream.subscribe(self, classOf[LocalChannelDown])
|
||||
context.system.eventStream.subscribe(self, classOf[AvailableBalanceChanged])
|
||||
|
||||
private val commandBuffer = context.actorOf(Props(new CommandBuffer(nodeParams, register)))
|
||||
private val channelRelayer = context.actorOf(ChannelRelayer.props(nodeParams, self, register, commandBuffer))
|
||||
|
||||
override def receive: Receive = main(Map.empty, new mutable.HashMap[PublicKey, mutable.Set[ShortChannelId]] with mutable.MultiMap[PublicKey, ShortChannelId])
|
||||
|
||||
def main(channelUpdates: ChannelUpdates, node2channels: NodeChannels): Receive = {
|
||||
case GetOutgoingChannels(enabledOnly) =>
|
||||
val channels = if (enabledOnly) {
|
||||
channelUpdates.values.filter(o => Announcements.isEnabled(o.channelUpdate.channelFlags))
|
||||
} else {
|
||||
channelUpdates.values
|
||||
}
|
||||
sender ! OutgoingChannels(channels.toSeq)
|
||||
|
||||
case LocalChannelUpdate(_, channelId, shortChannelId, remoteNodeId, _, channelUpdate, commitments) =>
|
||||
log.debug(s"updating local channel info for channelId=$channelId shortChannelId=$shortChannelId remoteNodeId=$remoteNodeId channelUpdate={} commitments={}", channelUpdate, commitments)
|
||||
val channelUpdates1 = channelUpdates + (channelUpdate.shortChannelId -> OutgoingChannel(remoteNodeId, channelUpdate, commitments))
|
||||
context become main(channelUpdates1, node2channels.addBinding(remoteNodeId, channelUpdate.shortChannelId))
|
||||
|
||||
case LocalChannelDown(_, channelId, shortChannelId, remoteNodeId) =>
|
||||
log.debug(s"removed local channel info for channelId=$channelId shortChannelId=$shortChannelId")
|
||||
context become main(channelUpdates - shortChannelId, node2channels.removeBinding(remoteNodeId, shortChannelId))
|
||||
|
||||
case AvailableBalanceChanged(_, _, shortChannelId, _, commitments) =>
|
||||
val channelUpdates1 = channelUpdates.get(shortChannelId) match {
|
||||
case Some(c: OutgoingChannel) => channelUpdates + (shortChannelId -> c.copy(commitments = commitments))
|
||||
case None => channelUpdates // we only consider the balance if we have the channel_update
|
||||
}
|
||||
context become main(channelUpdates1, node2channels)
|
||||
|
||||
case ForwardAdd(add, previousFailures) =>
|
||||
log.debug(s"received forwarding request for htlc #${add.id} paymentHash=${add.paymentHash} from channelId=${add.channelId}")
|
||||
IncomingPacket.decrypt(add, nodeParams.privateKey, nodeParams.globalFeatures) match {
|
||||
case Right(p: IncomingPacket.FinalPacket) =>
|
||||
log.debug(s"forwarding htlc #${add.id} paymentHash=${add.paymentHash} to payment-handler")
|
||||
paymentHandler forward p
|
||||
case Right(r: IncomingPacket.ChannelRelayPacket) =>
|
||||
channelRelayer forward ChannelRelayer.RelayHtlc(r, previousFailures, channelUpdates, node2channels)
|
||||
case Right(r: IncomingPacket.NodeRelayPacket) =>
|
||||
if (!nodeParams.enableTrampolinePayment) {
|
||||
log.warning(s"rejecting htlc #${add.id} paymentHash=${add.paymentHash} from channelId=${add.channelId} to nodeId=${r.innerPayload.outgoingNodeId} reason=trampoline disabled")
|
||||
commandBuffer ! CommandBuffer.CommandSend(add.channelId, add.id, CMD_FAIL_HTLC(add.id, Right(RequiredNodeFeatureMissing), commit = true))
|
||||
} else {
|
||||
// TODO: @t-bast: relay trampoline payload instead of rejecting.
|
||||
log.warning(s"rejecting htlc #${add.id} paymentHash=${add.paymentHash} from channelId=${add.channelId} to nodeId=${r.innerPayload.outgoingNodeId} reason=trampoline not implemented yet")
|
||||
commandBuffer ! CommandBuffer.CommandSend(add.channelId, add.id, CMD_FAIL_HTLC(add.id, Right(RequiredNodeFeatureMissing), commit = true))
|
||||
}
|
||||
case Left(badOnion: BadOnion) =>
|
||||
log.warning(s"couldn't parse onion: reason=${badOnion.message}")
|
||||
val cmdFail = CMD_FAIL_MALFORMED_HTLC(add.id, badOnion.onionHash, badOnion.code, commit = true)
|
||||
log.warning(s"rejecting htlc #${add.id} paymentHash=${add.paymentHash} from channelId=${add.channelId} reason=malformed onionHash=${cmdFail.onionHash} failureCode=${cmdFail.failureCode}")
|
||||
commandBuffer ! CommandBuffer.CommandSend(add.channelId, add.id, cmdFail)
|
||||
case Left(failure) =>
|
||||
log.warning(s"rejecting htlc #${add.id} paymentHash=${add.paymentHash} from channelId=${add.channelId} reason=$failure")
|
||||
val cmdFail = CMD_FAIL_HTLC(add.id, Right(failure), commit = true)
|
||||
commandBuffer ! CommandBuffer.CommandSend(add.channelId, add.id, cmdFail)
|
||||
}
|
||||
|
||||
case Status.Failure(addFailed: AddHtlcFailed) =>
|
||||
import addFailed.paymentHash
|
||||
addFailed.origin match {
|
||||
case Origin.Local(id, None) =>
|
||||
handleLocalPaymentAfterRestart(PaymentFailed(id, paymentHash, Nil))
|
||||
case Origin.Local(_, Some(sender)) =>
|
||||
sender ! Status.Failure(addFailed)
|
||||
case _: Origin.Relayed =>
|
||||
channelRelayer forward Status.Failure(addFailed)
|
||||
}
|
||||
|
||||
case ForwardFulfill(fulfill, to, add) =>
|
||||
to match {
|
||||
case Origin.Local(id, None) =>
|
||||
val feesPaid = 0.msat // fees are unknown since we lost the reference to the payment
|
||||
handleLocalPaymentAfterRestart(PaymentSent(id, add.paymentHash, fulfill.paymentPreimage, Seq(PaymentSent.PartialPayment(id, add.amountMsat, feesPaid, add.channelId, None))))
|
||||
case Origin.Local(_, Some(sender)) =>
|
||||
sender ! fulfill
|
||||
case Origin.Relayed(originChannelId, originHtlcId, amountIn, amountOut) =>
|
||||
val cmd = CMD_FULFILL_HTLC(originHtlcId, fulfill.paymentPreimage, commit = true)
|
||||
commandBuffer ! CommandBuffer.CommandSend(originChannelId, originHtlcId, cmd)
|
||||
context.system.eventStream.publish(PaymentRelayed(amountIn, amountOut, add.paymentHash, fromChannelId = originChannelId, toChannelId = fulfill.channelId))
|
||||
}
|
||||
|
||||
case ForwardFail(fail, to, add) =>
|
||||
to match {
|
||||
case Origin.Local(id, None) =>
|
||||
handleLocalPaymentAfterRestart(PaymentFailed(id, add.paymentHash, Nil))
|
||||
case Origin.Local(_, Some(sender)) =>
|
||||
sender ! fail
|
||||
case Origin.Relayed(originChannelId, originHtlcId, _, _) =>
|
||||
val cmd = CMD_FAIL_HTLC(originHtlcId, Left(fail.reason), commit = true)
|
||||
commandBuffer ! CommandBuffer.CommandSend(originChannelId, originHtlcId, cmd)
|
||||
}
|
||||
|
||||
case ForwardFailMalformed(fail, to, add) =>
|
||||
to match {
|
||||
case Origin.Local(id, None) =>
|
||||
handleLocalPaymentAfterRestart(PaymentFailed(id, add.paymentHash, Nil))
|
||||
case Origin.Local(_, Some(sender)) =>
|
||||
sender ! fail
|
||||
case Origin.Relayed(originChannelId, originHtlcId, _, _) =>
|
||||
val cmd = CMD_FAIL_MALFORMED_HTLC(originHtlcId, fail.onionHash, fail.failureCode, commit = true)
|
||||
commandBuffer ! CommandBuffer.CommandSend(originChannelId, originHtlcId, cmd)
|
||||
}
|
||||
|
||||
case ack: CommandBuffer.CommandAck => commandBuffer forward ack
|
||||
|
||||
case "ok" => () // ignoring responses from channels
|
||||
}
|
||||
|
||||
/**
|
||||
* It may happen that we sent a payment and then re-started before the payment completed.
|
||||
* When we receive the HTLC fulfill/fail associated to that payment, the payment FSM that generated them doesn't exist
|
||||
* anymore so we need to reconcile the database.
|
||||
*/
|
||||
def handleLocalPaymentAfterRestart(paymentResult: PaymentEvent): Unit = paymentResult match {
|
||||
case e: PaymentFailed =>
|
||||
nodeParams.db.payments.updateOutgoingPayment(e)
|
||||
// Since payments can be multi-part, we only emit the payment failed event once all child payments have failed.
|
||||
nodeParams.db.payments.getOutgoingPayment(e.id).foreach(p => {
|
||||
val payments = nodeParams.db.payments.listOutgoingPayments(p.parentId)
|
||||
if (payments.forall(_.status.isInstanceOf[OutgoingPaymentStatus.Failed])) {
|
||||
context.system.eventStream.publish(PaymentFailed(p.parentId, e.paymentHash, Nil))
|
||||
}
|
||||
})
|
||||
case e: PaymentSent =>
|
||||
nodeParams.db.payments.updateOutgoingPayment(e)
|
||||
// Since payments can be multi-part, we only emit the payment sent event once all child payments have settled.
|
||||
nodeParams.db.payments.getOutgoingPayment(e.id).foreach(p => {
|
||||
val payments = nodeParams.db.payments.listOutgoingPayments(p.parentId)
|
||||
if (!payments.exists(p => p.status == OutgoingPaymentStatus.Pending)) {
|
||||
val succeeded = payments.collect {
|
||||
case OutgoingPayment(id, _, _, _, amount, _, _, _, OutgoingPaymentStatus.Succeeded(_, feesPaid, _, completedAt)) =>
|
||||
PaymentSent.PartialPayment(id, amount, feesPaid, ByteVector32.Zeroes, None, completedAt)
|
||||
}
|
||||
context.system.eventStream.publish(PaymentSent(p.parentId, e.paymentHash, e.paymentPreimage, succeeded))
|
||||
}
|
||||
})
|
||||
case _ =>
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
object Relayer extends Logging {
|
||||
|
||||
def props(nodeParams: NodeParams, register: ActorRef, paymentHandler: ActorRef) = Props(classOf[Relayer], nodeParams, register, paymentHandler)
|
||||
|
||||
type ChannelUpdates = Map[ShortChannelId, OutgoingChannel]
|
||||
type NodeChannels = mutable.HashMap[PublicKey, mutable.Set[ShortChannelId]] with mutable.MultiMap[PublicKey, ShortChannelId]
|
||||
|
||||
// @formatter:off
|
||||
sealed trait ForwardMessage
|
||||
case class ForwardAdd(add: UpdateAddHtlc, previousFailures: Seq[AddHtlcFailed] = Seq.empty) extends ForwardMessage
|
||||
case class ForwardFulfill(fulfill: UpdateFulfillHtlc, to: Origin, htlc: UpdateAddHtlc) extends ForwardMessage
|
||||
case class ForwardFail(fail: UpdateFailHtlc, to: Origin, htlc: UpdateAddHtlc) extends ForwardMessage
|
||||
case class ForwardFailMalformed(fail: UpdateFailMalformedHtlc, to: Origin, htlc: UpdateAddHtlc) extends ForwardMessage
|
||||
|
||||
case class UsableBalance(remoteNodeId: PublicKey, shortChannelId: ShortChannelId, canSend: MilliSatoshi, canReceive: MilliSatoshi, isPublic: Boolean)
|
||||
|
||||
/**
|
||||
* Get the list of local outgoing channels.
|
||||
*
|
||||
* @param enabledOnly if true, filter out disabled channels.
|
||||
*/
|
||||
case class GetOutgoingChannels(enabledOnly: Boolean = true)
|
||||
case class OutgoingChannel(nextNodeId: PublicKey, channelUpdate: ChannelUpdate, commitments: Commitments) {
|
||||
def toUsableBalance: UsableBalance = UsableBalance(
|
||||
remoteNodeId = nextNodeId,
|
||||
shortChannelId = channelUpdate.shortChannelId,
|
||||
canSend = commitments.availableBalanceForSend,
|
||||
canReceive = commitments.availableBalanceForReceive,
|
||||
isPublic = commitments.announceChannel)
|
||||
}
|
||||
case class OutgoingChannels(channels: Seq[OutgoingChannel])
|
||||
// @formatter:on
|
||||
|
||||
}
|
|
@ -26,7 +26,7 @@ import fr.acinq.eclair.channel.Commitments
|
|||
import fr.acinq.eclair.crypto.Sphinx
|
||||
import fr.acinq.eclair.payment.PaymentRequest.ExtraHop
|
||||
import fr.acinq.eclair.payment.PaymentSent.PartialPayment
|
||||
import fr.acinq.eclair.payment.Relayer.{GetOutgoingChannels, OutgoingChannel, OutgoingChannels}
|
||||
import fr.acinq.eclair.payment.relay.Relayer.{GetOutgoingChannels, OutgoingChannel, OutgoingChannels}
|
||||
import fr.acinq.eclair.payment._
|
||||
import fr.acinq.eclair.payment.send.PaymentInitiator.SendPaymentConfig
|
||||
import fr.acinq.eclair.payment.send.PaymentLifecycle.SendPayment
|
||||
|
|
|
@ -23,8 +23,7 @@ import fr.acinq.bitcoin.DeterministicWallet.{ExtendedPrivateKey, KeyPath}
|
|||
import fr.acinq.bitcoin.{ByteVector32, ByteVector64, Crypto, OutPoint, Transaction, TxOut}
|
||||
import fr.acinq.eclair.channel._
|
||||
import fr.acinq.eclair.crypto.ShaChain
|
||||
import fr.acinq.eclair.payment.Origin
|
||||
import fr.acinq.eclair.payment.Origin.{Relayed, Local}
|
||||
import fr.acinq.eclair.payment.relay.Origin
|
||||
import fr.acinq.eclair.transactions.Transactions._
|
||||
import fr.acinq.eclair.transactions._
|
||||
import fr.acinq.eclair.wire.CommonCodecs._
|
||||
|
@ -180,23 +179,23 @@ object ChannelCodecs extends Logging {
|
|||
("sentAfterLocalCommitIndex" | uint64overflow) ::
|
||||
("reSignAsap" | bool)).as[WaitingForRevocation]
|
||||
|
||||
val localCodec: Codec[Local] = (
|
||||
val localCodec: Codec[Origin.Local] = (
|
||||
("id" | uuid) ::
|
||||
("sender" | provide(Option.empty[ActorRef]))
|
||||
).as[Local]
|
||||
).as[Origin.Local]
|
||||
|
||||
val relayedCodec: Codec[Relayed] = (
|
||||
val relayedCodec: Codec[Origin.Relayed] = (
|
||||
("originChannelId" | bytes32) ::
|
||||
("originHtlcId" | int64) ::
|
||||
("amountIn" | millisatoshi) ::
|
||||
("amountOut" | millisatoshi)).as[Relayed]
|
||||
("amountOut" | millisatoshi)).as[Origin.Relayed]
|
||||
|
||||
// this is for backward compatibility to handle legacy payments that didn't have identifiers
|
||||
val UNKNOWN_UUID = UUID.fromString("00000000-0000-0000-0000-000000000000")
|
||||
|
||||
val originCodec: Codec[Origin] = discriminated[Origin].by(uint16)
|
||||
.typecase(0x03, localCodec) // backward compatible
|
||||
.typecase(0x01, provide(Local(UNKNOWN_UUID, None)))
|
||||
.typecase(0x01, provide(Origin.Local(UNKNOWN_UUID, None)))
|
||||
.typecase(0x02, relayedCodec)
|
||||
|
||||
val originsListCodec: Codec[List[(Long, Origin)]] = listOfN(uint16, int64 ~ originCodec)
|
||||
|
|
|
@ -20,7 +20,7 @@ import java.util.UUID
|
|||
|
||||
import fr.acinq.eclair.channel.Commitments._
|
||||
import fr.acinq.eclair.channel.states.StateTestsHelperMethods
|
||||
import fr.acinq.eclair.payment.Origin.Local
|
||||
import fr.acinq.eclair.payment.relay.Origin.Local
|
||||
import fr.acinq.eclair.wire.IncorrectOrUnknownPaymentDetails
|
||||
import fr.acinq.eclair.{TestkitBaseClass, _}
|
||||
import org.scalatest.Outcome
|
||||
|
|
|
@ -30,6 +30,7 @@ import fr.acinq.eclair.channel.states.StateTestsHelperMethods
|
|||
import fr.acinq.eclair.payment._
|
||||
import fr.acinq.eclair.payment.receive.MultiPartHandler.ReceivePayment
|
||||
import fr.acinq.eclair.payment.receive.PaymentHandler
|
||||
import fr.acinq.eclair.payment.relay.Relayer
|
||||
import fr.acinq.eclair.payment.send.PaymentLifecycle
|
||||
import fr.acinq.eclair.router.ChannelHop
|
||||
import fr.acinq.eclair.wire.Onion.FinalLegacyPayload
|
||||
|
|
|
@ -27,7 +27,7 @@ import fr.acinq.eclair.TestConstants.{Alice, Bob}
|
|||
import fr.acinq.eclair._
|
||||
import fr.acinq.eclair.blockchain._
|
||||
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher
|
||||
import fr.acinq.eclair.payment.Relayer
|
||||
import fr.acinq.eclair.payment.relay.Relayer
|
||||
import fr.acinq.eclair.wire.{Init, UpdateAddHtlc}
|
||||
import org.scalatest.FunSuite
|
||||
|
||||
|
|
|
@ -32,8 +32,8 @@ import fr.acinq.eclair.channel.states.StateTestsHelperMethods
|
|||
import fr.acinq.eclair.channel.{ChannelErrorOccurred, _}
|
||||
import fr.acinq.eclair.crypto.Sphinx
|
||||
import fr.acinq.eclair.io.Peer
|
||||
import fr.acinq.eclair.payment.Relayer._
|
||||
import fr.acinq.eclair.payment._
|
||||
import fr.acinq.eclair.payment.relay.Relayer._
|
||||
import fr.acinq.eclair.payment.relay.{CommandBuffer, Origin}
|
||||
import fr.acinq.eclair.router.Announcements
|
||||
import fr.acinq.eclair.transactions.Transactions.{HtlcSuccessTx, htlcSuccessWeight, htlcTimeoutWeight, weight2fee}
|
||||
import fr.acinq.eclair.transactions.{IN, OUT, Transactions}
|
||||
|
|
|
@ -28,8 +28,7 @@ import fr.acinq.eclair.blockchain.fee.FeeratesPerKw
|
|||
import fr.acinq.eclair.channel.Channel.LocalError
|
||||
import fr.acinq.eclair.channel._
|
||||
import fr.acinq.eclair.channel.states.StateTestsHelperMethods
|
||||
import fr.acinq.eclair.payment.CommandBuffer
|
||||
import fr.acinq.eclair.payment.CommandBuffer.CommandSend
|
||||
import fr.acinq.eclair.payment.relay.CommandBuffer
|
||||
import fr.acinq.eclair.router.Announcements
|
||||
import fr.acinq.eclair.transactions.Transactions.HtlcSuccessTx
|
||||
import fr.acinq.eclair.wire._
|
||||
|
@ -431,7 +430,7 @@ class OfflineStateSpec extends TestkitBaseClass with StateTestsHelperMethods {
|
|||
|
||||
// We simulate a pending fulfill on that HTLC but not relayed.
|
||||
// When it is close to expiring upstream, we should close the channel.
|
||||
sender.send(commandBuffer, CommandSend(htlc.channelId, htlc.id, CMD_FULFILL_HTLC(htlc.id, r, commit = true)))
|
||||
sender.send(commandBuffer, CommandBuffer.CommandSend(htlc.channelId, htlc.id, CMD_FULFILL_HTLC(htlc.id, r, commit = true)))
|
||||
sender.send(bob, CurrentBlockCount((htlc.cltvExpiry - bob.underlyingActor.nodeParams.fulfillSafetyBeforeTimeoutBlocks).toLong))
|
||||
|
||||
val ChannelErrorOccurred(_, _, _, _, LocalError(err), isFatal) = listener.expectMsgType[ChannelErrorOccurred]
|
||||
|
@ -465,7 +464,7 @@ class OfflineStateSpec extends TestkitBaseClass with StateTestsHelperMethods {
|
|||
|
||||
// We simulate a pending failure on that HTLC.
|
||||
// Even if we get close to expiring upstream we shouldn't close the channel, because we have nothing to lose.
|
||||
sender.send(commandBuffer, CommandSend(htlc.channelId, htlc.id, CMD_FAIL_HTLC(htlc.id, Right(IncorrectOrUnknownPaymentDetails(0 msat, 0)))))
|
||||
sender.send(commandBuffer, CommandBuffer.CommandSend(htlc.channelId, htlc.id, CMD_FAIL_HTLC(htlc.id, Right(IncorrectOrUnknownPaymentDetails(0 msat, 0)))))
|
||||
sender.send(bob, CurrentBlockCount((htlc.cltvExpiry - bob.underlyingActor.nodeParams.fulfillSafetyBeforeTimeoutBlocks).toLong))
|
||||
|
||||
bob2blockchain.expectNoMsg(250 millis)
|
||||
|
|
|
@ -26,9 +26,9 @@ import fr.acinq.eclair.blockchain._
|
|||
import fr.acinq.eclair.blockchain.fee.FeeratesPerKw
|
||||
import fr.acinq.eclair.channel._
|
||||
import fr.acinq.eclair.channel.states.StateTestsHelperMethods
|
||||
import fr.acinq.eclair.payment.Relayer._
|
||||
import fr.acinq.eclair.payment._
|
||||
import fr.acinq.eclair.payment.send.PaymentLifecycle
|
||||
import fr.acinq.eclair.payment.relay.Relayer._
|
||||
import fr.acinq.eclair.payment.relay.{CommandBuffer, Origin}
|
||||
import fr.acinq.eclair.router.ChannelHop
|
||||
import fr.acinq.eclair.wire.Onion.FinalLegacyPayload
|
||||
import fr.acinq.eclair.wire.{CommitSig, Error, FailureMessageCodecs, PermanentChannelFailure, RevokeAndAck, Shutdown, UpdateAddHtlc, UpdateFailHtlc, UpdateFailMalformedHtlc, UpdateFee, UpdateFulfillHtlc}
|
||||
|
|
|
@ -28,7 +28,7 @@ import fr.acinq.eclair.blockchain.fee.FeeratesPerKw
|
|||
import fr.acinq.eclair.channel.Helpers.Closing
|
||||
import fr.acinq.eclair.channel._
|
||||
import fr.acinq.eclair.channel.states.StateTestsHelperMethods
|
||||
import fr.acinq.eclair.payment.Origin
|
||||
import fr.acinq.eclair.payment.relay.Origin
|
||||
import fr.acinq.eclair.wire.{ClosingSigned, Error, Shutdown}
|
||||
import fr.acinq.eclair.{CltvExpiry, LongToBtcAmount, TestConstants, TestkitBaseClass}
|
||||
import org.scalatest.{Outcome, Tag}
|
||||
|
|
|
@ -29,7 +29,8 @@ import fr.acinq.eclair.channel.Helpers.Closing
|
|||
import fr.acinq.eclair.channel.states.StateTestsHelperMethods
|
||||
import fr.acinq.eclair.channel.{Data, State, _}
|
||||
import fr.acinq.eclair.payment._
|
||||
import fr.acinq.eclair.payment.Relayer._
|
||||
import fr.acinq.eclair.payment.relay.Relayer._
|
||||
import fr.acinq.eclair.payment.relay.{CommandBuffer, Origin}
|
||||
import fr.acinq.eclair.transactions.{Scripts, Transactions}
|
||||
import fr.acinq.eclair.wire._
|
||||
import fr.acinq.eclair.{CltvExpiry, LongToBtcAmount, TestConstants, TestkitBaseClass, randomBytes32}
|
||||
|
|
|
@ -35,7 +35,7 @@ import fr.acinq.eclair.crypto.Sphinx.DecryptedFailurePacket
|
|||
import fr.acinq.eclair.db.{IncomingPayment, IncomingPaymentStatus, OutgoingPaymentStatus}
|
||||
import fr.acinq.eclair.io.Peer
|
||||
import fr.acinq.eclair.io.Peer.{Disconnect, PeerRoutingMessage}
|
||||
import fr.acinq.eclair.payment.Relayer.{GetOutgoingChannels, OutgoingChannels}
|
||||
import fr.acinq.eclair.payment.relay.Relayer.{GetOutgoingChannels, OutgoingChannels}
|
||||
import fr.acinq.eclair.payment._
|
||||
import fr.acinq.eclair.payment.receive.MultiPartHandler.ReceivePayment
|
||||
import fr.acinq.eclair.payment.receive.{ForwardHandler, PaymentHandler}
|
||||
|
|
|
@ -20,7 +20,8 @@ import fr.acinq.bitcoin.Crypto.PublicKey
|
|||
import fr.acinq.bitcoin.{Block, ByteVector32}
|
||||
import fr.acinq.eclair.channel.{CMD_ADD_HTLC, CMD_FAIL_HTLC, Upstream}
|
||||
import fr.acinq.eclair.payment.PaymentPacketSpec.makeCommitments
|
||||
import fr.acinq.eclair.payment.Relayer.{OutgoingChannel, RelayFailure, RelaySuccess}
|
||||
import fr.acinq.eclair.payment.relay.ChannelRelayer.{RelayFailure, RelaySuccess, relayOrFail, selectPreferredChannel}
|
||||
import fr.acinq.eclair.payment.relay.Relayer.OutgoingChannel
|
||||
import fr.acinq.eclair.router.Announcements
|
||||
import fr.acinq.eclair.wire.Onion.RelayLegacyPayload
|
||||
import fr.acinq.eclair.wire._
|
||||
|
@ -50,24 +51,24 @@ class ChannelSelectionSpec extends FunSuite {
|
|||
val channelUpdate = dummyUpdate(ShortChannelId(12345), CltvExpiryDelta(10), 100 msat, 1000 msat, 100, 10000000 msat, true)
|
||||
|
||||
// nominal case
|
||||
assert(Relayer.relayOrFail(relayPayload, Some(channelUpdate)) === RelaySuccess(ShortChannelId(12345), CMD_ADD_HTLC(relayPayload.payload.amountToForward, relayPayload.add.paymentHash, relayPayload.payload.outgoingCltv, relayPayload.nextPacket, Upstream.Relayed(relayPayload.add), commit = true)))
|
||||
assert(relayOrFail(relayPayload, Some(channelUpdate)) === RelaySuccess(ShortChannelId(12345), CMD_ADD_HTLC(relayPayload.payload.amountToForward, relayPayload.add.paymentHash, relayPayload.payload.outgoingCltv, relayPayload.nextPacket, Upstream.Relayed(relayPayload.add), commit = true)))
|
||||
// no channel_update
|
||||
assert(Relayer.relayOrFail(relayPayload, channelUpdate_opt = None) === RelayFailure(CMD_FAIL_HTLC(relayPayload.add.id, Right(UnknownNextPeer), commit = true)))
|
||||
assert(relayOrFail(relayPayload, channelUpdate_opt = None) === RelayFailure(CMD_FAIL_HTLC(relayPayload.add.id, Right(UnknownNextPeer), commit = true)))
|
||||
// channel disabled
|
||||
val channelUpdate_disabled = channelUpdate.copy(channelFlags = Announcements.makeChannelFlags(isNode1 = true, enable = false))
|
||||
assert(Relayer.relayOrFail(relayPayload, Some(channelUpdate_disabled)) === RelayFailure(CMD_FAIL_HTLC(relayPayload.add.id, Right(ChannelDisabled(channelUpdate_disabled.messageFlags, channelUpdate_disabled.channelFlags, channelUpdate_disabled)), commit = true)))
|
||||
assert(relayOrFail(relayPayload, Some(channelUpdate_disabled)) === RelayFailure(CMD_FAIL_HTLC(relayPayload.add.id, Right(ChannelDisabled(channelUpdate_disabled.messageFlags, channelUpdate_disabled.channelFlags, channelUpdate_disabled)), commit = true)))
|
||||
// amount too low
|
||||
val relayPayload_toolow = relayPayload.copy(payload = onionPayload.copy(amountToForward = 99 msat))
|
||||
assert(Relayer.relayOrFail(relayPayload_toolow, Some(channelUpdate)) === RelayFailure(CMD_FAIL_HTLC(relayPayload.add.id, Right(AmountBelowMinimum(relayPayload_toolow.payload.amountToForward, channelUpdate)), commit = true)))
|
||||
assert(relayOrFail(relayPayload_toolow, Some(channelUpdate)) === RelayFailure(CMD_FAIL_HTLC(relayPayload.add.id, Right(AmountBelowMinimum(relayPayload_toolow.payload.amountToForward, channelUpdate)), commit = true)))
|
||||
// incorrect cltv expiry
|
||||
val relayPayload_incorrectcltv = relayPayload.copy(payload = onionPayload.copy(outgoingCltv = CltvExpiry(42)))
|
||||
assert(Relayer.relayOrFail(relayPayload_incorrectcltv, Some(channelUpdate)) === RelayFailure(CMD_FAIL_HTLC(relayPayload.add.id, Right(IncorrectCltvExpiry(relayPayload_incorrectcltv.payload.outgoingCltv, channelUpdate)), commit = true)))
|
||||
assert(relayOrFail(relayPayload_incorrectcltv, Some(channelUpdate)) === RelayFailure(CMD_FAIL_HTLC(relayPayload.add.id, Right(IncorrectCltvExpiry(relayPayload_incorrectcltv.payload.outgoingCltv, channelUpdate)), commit = true)))
|
||||
// insufficient fee
|
||||
val relayPayload_insufficientfee = relayPayload.copy(payload = onionPayload.copy(amountToForward = 998910 msat))
|
||||
assert(Relayer.relayOrFail(relayPayload_insufficientfee, Some(channelUpdate)) === RelayFailure(CMD_FAIL_HTLC(relayPayload.add.id, Right(FeeInsufficient(relayPayload_insufficientfee.add.amountMsat, channelUpdate)), commit = true)))
|
||||
assert(relayOrFail(relayPayload_insufficientfee, Some(channelUpdate)) === RelayFailure(CMD_FAIL_HTLC(relayPayload.add.id, Right(FeeInsufficient(relayPayload_insufficientfee.add.amountMsat, channelUpdate)), commit = true)))
|
||||
// note that a generous fee is ok!
|
||||
val relayPayload_highfee = relayPayload.copy(payload = onionPayload.copy(amountToForward = 900000 msat))
|
||||
assert(Relayer.relayOrFail(relayPayload_highfee, Some(channelUpdate)) === RelaySuccess(ShortChannelId(12345), CMD_ADD_HTLC(relayPayload_highfee.payload.amountToForward, relayPayload_highfee.add.paymentHash, relayPayload_highfee.payload.outgoingCltv, relayPayload_highfee.nextPacket, Upstream.Relayed(relayPayload.add), commit = true)))
|
||||
assert(relayOrFail(relayPayload_highfee, Some(channelUpdate)) === RelaySuccess(ShortChannelId(12345), CMD_ADD_HTLC(relayPayload_highfee.payload.amountToForward, relayPayload_highfee.add.paymentHash, relayPayload_highfee.payload.outgoingCltv, relayPayload_highfee.nextPacket, Upstream.Relayed(relayPayload.add), commit = true)))
|
||||
}
|
||||
|
||||
test("channel selection") {
|
||||
|
@ -94,21 +95,21 @@ class ChannelSelectionSpec extends FunSuite {
|
|||
node2channels.put(b, mutable.Set(ShortChannelId(44444)))
|
||||
|
||||
// select the channel to the same node, with the lowest balance but still high enough to handle the payment
|
||||
assert(Relayer.selectPreferredChannel(relayPayload, channelUpdates, node2channels, Seq.empty) === Some(ShortChannelId(22222)))
|
||||
assert(selectPreferredChannel(relayPayload, channelUpdates, node2channels, Seq.empty) === Some(ShortChannelId(22222)))
|
||||
// select 2nd-to-best channel
|
||||
assert(Relayer.selectPreferredChannel(relayPayload, channelUpdates, node2channels, Seq(ShortChannelId(22222))) === Some(ShortChannelId(12345)))
|
||||
assert(selectPreferredChannel(relayPayload, channelUpdates, node2channels, Seq(ShortChannelId(22222))) === Some(ShortChannelId(12345)))
|
||||
// select 3rd-to-best channel
|
||||
assert(Relayer.selectPreferredChannel(relayPayload, channelUpdates, node2channels, Seq(ShortChannelId(22222), ShortChannelId(12345))) === Some(ShortChannelId(11111)))
|
||||
assert(selectPreferredChannel(relayPayload, channelUpdates, node2channels, Seq(ShortChannelId(22222), ShortChannelId(12345))) === Some(ShortChannelId(11111)))
|
||||
// all the suitable channels have been tried
|
||||
assert(Relayer.selectPreferredChannel(relayPayload, channelUpdates, node2channels, Seq(ShortChannelId(22222), ShortChannelId(12345), ShortChannelId(11111))) === None)
|
||||
assert(selectPreferredChannel(relayPayload, channelUpdates, node2channels, Seq(ShortChannelId(22222), ShortChannelId(12345), ShortChannelId(11111))) === None)
|
||||
// higher amount payment (have to increased incoming htlc amount for fees to be sufficient)
|
||||
assert(Relayer.selectPreferredChannel(relayPayload.copy(add = relayPayload.add.copy(amountMsat = 60000000 msat), payload = onionPayload.copy(amountToForward = 50000000 msat)), channelUpdates, node2channels, Seq.empty) === Some(ShortChannelId(11111)))
|
||||
assert(selectPreferredChannel(relayPayload.copy(add = relayPayload.add.copy(amountMsat = 60000000 msat), payload = onionPayload.copy(amountToForward = 50000000 msat)), channelUpdates, node2channels, Seq.empty) === Some(ShortChannelId(11111)))
|
||||
// lower amount payment
|
||||
assert(Relayer.selectPreferredChannel(relayPayload.copy(payload = onionPayload.copy(amountToForward = 1000 msat)), channelUpdates, node2channels, Seq.empty) === Some(ShortChannelId(33333)))
|
||||
assert(selectPreferredChannel(relayPayload.copy(payload = onionPayload.copy(amountToForward = 1000 msat)), channelUpdates, node2channels, Seq.empty) === Some(ShortChannelId(33333)))
|
||||
// payment too high, no suitable channel found
|
||||
assert(Relayer.selectPreferredChannel(relayPayload.copy(payload = onionPayload.copy(amountToForward = 1000000000 msat)), channelUpdates, node2channels, Seq.empty) === Some(ShortChannelId(12345)))
|
||||
assert(selectPreferredChannel(relayPayload.copy(payload = onionPayload.copy(amountToForward = 1000000000 msat)), channelUpdates, node2channels, Seq.empty) === Some(ShortChannelId(12345)))
|
||||
// invalid cltv expiry, no suitable channel, we keep the requested one
|
||||
assert(Relayer.selectPreferredChannel(relayPayload.copy(payload = onionPayload.copy(outgoingCltv = CltvExpiry(40))), channelUpdates, node2channels, Seq.empty) === Some(ShortChannelId(12345)))
|
||||
assert(selectPreferredChannel(relayPayload.copy(payload = onionPayload.copy(outgoingCltv = CltvExpiry(40))), channelUpdates, node2channels, Seq.empty) === Some(ShortChannelId(12345)))
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -29,7 +29,7 @@ import fr.acinq.eclair.channel.Commitments
|
|||
import fr.acinq.eclair.channel.Helpers.Funding
|
||||
import fr.acinq.eclair.crypto.Sphinx
|
||||
import fr.acinq.eclair.payment.PaymentSent.PartialPayment
|
||||
import fr.acinq.eclair.payment.Relayer.{GetOutgoingChannels, OutgoingChannel, OutgoingChannels}
|
||||
import fr.acinq.eclair.payment.relay.Relayer.{GetOutgoingChannels, OutgoingChannel, OutgoingChannels}
|
||||
import fr.acinq.eclair.payment.send.MultiPartPaymentLifecycle
|
||||
import fr.acinq.eclair.payment.send.MultiPartPaymentLifecycle._
|
||||
import fr.acinq.eclair.payment.send.PaymentInitiator.SendPaymentConfig
|
||||
|
|
|
@ -30,7 +30,7 @@ import fr.acinq.eclair.channel.{AddHtlcFailed, Channel, ChannelUnavailable}
|
|||
import fr.acinq.eclair.crypto.Sphinx
|
||||
import fr.acinq.eclair.db.{OutgoingPayment, OutgoingPaymentStatus}
|
||||
import fr.acinq.eclair.io.Peer.PeerRoutingMessage
|
||||
import fr.acinq.eclair.payment.Origin.Local
|
||||
import fr.acinq.eclair.payment.relay.Origin.Local
|
||||
import fr.acinq.eclair.payment.PaymentRequest.ExtraHop
|
||||
import fr.acinq.eclair.payment.PaymentSent.PartialPayment
|
||||
import fr.acinq.eclair.payment.send.PaymentInitiator.{SendPaymentConfig, SendPaymentRequest}
|
||||
|
|
|
@ -25,9 +25,10 @@ import fr.acinq.eclair.channel._
|
|||
import fr.acinq.eclair.crypto.Sphinx
|
||||
import fr.acinq.eclair.db.{OutgoingPayment, OutgoingPaymentStatus}
|
||||
import fr.acinq.eclair.payment.IncomingPacket.FinalPacket
|
||||
import fr.acinq.eclair.payment.Origin._
|
||||
import fr.acinq.eclair.payment.relay.Origin._
|
||||
import fr.acinq.eclair.payment.OutgoingPacket.{buildCommand, buildOnion, buildPacket}
|
||||
import fr.acinq.eclair.payment.Relayer._
|
||||
import fr.acinq.eclair.payment.relay.{Origin, Relayer}
|
||||
import fr.acinq.eclair.payment.relay.Relayer._
|
||||
import fr.acinq.eclair.router.{Announcements, ChannelHop, NodeHop}
|
||||
import fr.acinq.eclair.wire.Onion.{ChannelRelayTlvPayload, FinalLegacyPayload, FinalTlvPayload, PerHopPayload}
|
||||
import fr.acinq.eclair.wire._
|
||||
|
@ -231,7 +232,7 @@ class RelayerSpec extends TestkitBaseClass {
|
|||
assert(fwd1.shortChannelId === channelUpdate_bc.shortChannelId)
|
||||
assert(fwd1.message.upstream === Upstream.Relayed(add_ab))
|
||||
|
||||
sender.send(relayer, Status.Failure(Register.ForwardShortIdFailure(fwd1)))
|
||||
register.send(register.lastSender, Status.Failure(Register.ForwardShortIdFailure(fwd1)))
|
||||
|
||||
val fwd2 = register.expectMsgType[Register.Forward[CMD_FAIL_HTLC]]
|
||||
assert(fwd2.channelId === channelId_ab)
|
||||
|
|
|
@ -27,7 +27,7 @@ import fr.acinq.bitcoin.{Block, ByteVector32, ByteVector64, Crypto, Deterministi
|
|||
import fr.acinq.eclair.channel.Helpers.Funding
|
||||
import fr.acinq.eclair.channel._
|
||||
import fr.acinq.eclair.crypto.{LocalKeyManager, ShaChain}
|
||||
import fr.acinq.eclair.payment.Origin.{Relayed, Local}
|
||||
import fr.acinq.eclair.payment.relay.Origin.{Local, Relayed}
|
||||
import fr.acinq.eclair.router.Announcements
|
||||
import fr.acinq.eclair.transactions.Transactions.{CommitTx, InputInfo, TransactionWithInputInfo}
|
||||
import fr.acinq.eclair.transactions._
|
||||
|
|
|
@ -33,7 +33,7 @@ import fr.acinq.eclair._
|
|||
import fr.acinq.eclair.db.{IncomingPayment, IncomingPaymentStatus, OutgoingPayment, OutgoingPaymentStatus}
|
||||
import fr.acinq.eclair.io.NodeURI
|
||||
import fr.acinq.eclair.io.Peer.PeerInfo
|
||||
import fr.acinq.eclair.payment.Relayer.UsableBalance
|
||||
import fr.acinq.eclair.payment.relay.Relayer.UsableBalance
|
||||
import fr.acinq.eclair.payment.{PaymentFailed, _}
|
||||
import fr.acinq.eclair.wire.NodeAddress
|
||||
import org.mockito.scalatest.IdiomaticMockito
|
||||
|
|
Loading…
Add table
Reference in a new issue