1
0
Fork 0
mirror of https://github.com/ACINQ/eclair.git synced 2025-03-13 19:37:35 +01:00

Add local reputation

This commit is contained in:
Thomas HUET 2023-06-12 16:25:15 +02:00
parent 7aacd4b460
commit 83a56271f5
23 changed files with 664 additions and 84 deletions

View file

@ -24,6 +24,25 @@ Existing `static_remote_key` channels will continue to work. You can override th
Eclair will not allow remote peers to open new obsolete channels that do not support `option_static_remotekey`.
### Local reputation and HTLC endorsement
To protect against jamming attacks, eclair gives a reputation to its neighbors and uses to decide if a HTLC should be relayed given how congested is the outgoing channel.
The reputation is basically how much this node paid us in fees divided by how much they should have paid us for the liquidity and slots that they blocked.
The reputation is per incoming node and endorsement level.
The confidence that the HTLC will be fulfilled is transmitted to the next node using the endorsement TLV of the `update_add_htlc` message.
To configure, edit `eclair.conf`:
```eclair.conf
eclair.local-reputation {
# Reputation decays with the following half life to emphasize recent behavior.
half-life = 7 days
# HTLCs that stay pending for longer than this get penalized
good-htlc-duration = 12 seconds
# How much to penalize pending HLTCs. A pending HTLC is considered equivalent to this many fast-failing HTLCs.
pending-multiplier = 1000
}
```
### API changes
<insert changes>

View file

@ -547,6 +547,15 @@ eclair {
enabled = true // enable automatic purges of expired invoices from the database
interval = 24 hours // interval between expired invoice purges
}
local-reputation {
# Reputation decays with the following half life to emphasize recent behavior.
half-life = 7 days
# HTLCs that stay pending for longer than this get penalized
good-htlc-duration = 12 seconds # 95% of successful payments settle in less than 12 seconds, only the slowest 5% will be penalized.
# How much to penalize pending HLTCs. A pending HTLC is considered equivalent to this many fast-failing HTLCs.
pending-multiplier = 1000
}
}
akka {

View file

@ -31,6 +31,7 @@ import fr.acinq.eclair.io.MessageRelay.{RelayAll, RelayChannelsOnly, RelayPolicy
import fr.acinq.eclair.io.PeerConnection
import fr.acinq.eclair.message.OnionMessages.OnionMessageConfig
import fr.acinq.eclair.payment.relay.Relayer.{AsyncPaymentsParams, RelayFees, RelayParams}
import fr.acinq.eclair.reputation.Reputation.ReputationConfig
import fr.acinq.eclair.router.Announcements.AddressException
import fr.acinq.eclair.router.Graph.{HeuristicsConstants, WeightRatios}
import fr.acinq.eclair.router.Router._
@ -87,7 +88,8 @@ case class NodeParams(nodeKeyManager: NodeKeyManager,
blockchainWatchdogSources: Seq[String],
onionMessageConfig: OnionMessageConfig,
purgeInvoicesInterval: Option[FiniteDuration],
revokedHtlcInfoCleanerConfig: RevokedHtlcInfoCleaner.Config) {
revokedHtlcInfoCleanerConfig: RevokedHtlcInfoCleaner.Config,
localReputationConfig: ReputationConfig) {
val privateKey: Crypto.PrivateKey = nodeKeyManager.nodeKey.privateKey
val nodeId: PublicKey = nodeKeyManager.nodeId
@ -611,7 +613,12 @@ object NodeParams extends Logging {
revokedHtlcInfoCleanerConfig = RevokedHtlcInfoCleaner.Config(
batchSize = config.getInt("db.revoked-htlc-info-cleaner.batch-size"),
interval = FiniteDuration(config.getDuration("db.revoked-htlc-info-cleaner.interval").getSeconds, TimeUnit.SECONDS)
)
),
localReputationConfig = ReputationConfig(
FiniteDuration(config.getDuration("local-reputation.half-life").getSeconds, TimeUnit.SECONDS),
FiniteDuration(config.getDuration("local-reputation.good-htlc-duration").getSeconds, TimeUnit.SECONDS),
config.getDouble("local-reputation.pending-multiplier"),
),
)
}
}

View file

@ -44,6 +44,7 @@ import fr.acinq.eclair.payment.offer.OfferManager
import fr.acinq.eclair.payment.receive.PaymentHandler
import fr.acinq.eclair.payment.relay.{AsyncPaymentTriggerer, PostRestartHtlcCleaner, Relayer}
import fr.acinq.eclair.payment.send.{Autoprobe, PaymentInitiator}
import fr.acinq.eclair.reputation.ReputationRecorder
import fr.acinq.eclair.router._
import fr.acinq.eclair.tor.{Controller, TorProtocolHandler}
import fr.acinq.eclair.wire.protocol.NodeAddress
@ -360,7 +361,8 @@ class Setup(val datadir: File,
offerManager = system.spawn(Behaviors.supervise(OfferManager(nodeParams, router, paymentTimeout = 1 minute)).onFailure(typed.SupervisorStrategy.resume), name = "offer-manager")
paymentHandler = system.actorOf(SimpleSupervisor.props(PaymentHandler.props(nodeParams, register, offerManager), "payment-handler", SupervisorStrategy.Resume))
triggerer = system.spawn(Behaviors.supervise(AsyncPaymentTriggerer()).onFailure(typed.SupervisorStrategy.resume), name = "async-payment-triggerer")
relayer = system.actorOf(SimpleSupervisor.props(Relayer.props(nodeParams, router, register, paymentHandler, triggerer, Some(postRestartCleanUpInitialized)), "relayer", SupervisorStrategy.Resume))
reputationRecorder = system.spawn(Behaviors.supervise(ReputationRecorder(nodeParams.localReputationConfig, Map.empty)).onFailure(typed.SupervisorStrategy.resume), name = "reputation-recorder")
relayer = system.actorOf(SimpleSupervisor.props(Relayer.props(nodeParams, router, register, paymentHandler, triggerer, reputationRecorder, Some(postRestartCleanUpInitialized)), "relayer", SupervisorStrategy.Resume))
_ = relayer ! PostRestartHtlcCleaner.Init(channels)
// Before initializing the switchboard (which re-connects us to the network) and the user-facing parts of the system,
// we want to make sure the handler for post-restart broken HTLCs has finished initializing.

View file

@ -118,6 +118,8 @@ case class ExpiryTooBig (override val channelId: Byte
case class HtlcValueTooSmall (override val channelId: ByteVector32, minimum: MilliSatoshi, actual: MilliSatoshi) extends ChannelException(channelId, s"htlc value too small: minimum=$minimum actual=$actual")
case class HtlcValueTooHighInFlight (override val channelId: ByteVector32, maximum: MilliSatoshi, actual: MilliSatoshi) extends ChannelException(channelId, s"in-flight htlcs hold too much value: maximum=$maximum actual=$actual")
case class TooManyAcceptedHtlcs (override val channelId: ByteVector32, maximum: Long) extends ChannelException(channelId, s"too many accepted htlcs: maximum=$maximum")
case class TooManySmallHtlcs (override val channelId: ByteVector32, number: Long, below: MilliSatoshi) extends ChannelException(channelId, s"too many small htlcs: $number HTLCs below $below")
case class ConfidenceTooLow (override val channelId: ByteVector32, confidence: Double, occupancy: Double) extends ChannelException(channelId, s"confidence too low: confidence=$confidence occupancy=$occupancy")
case class LocalDustHtlcExposureTooHigh (override val channelId: ByteVector32, maximum: Satoshi, actual: MilliSatoshi) extends ChannelException(channelId, s"dust htlcs hold too much value: maximum=$maximum actual=$actual")
case class RemoteDustHtlcExposureTooHigh (override val channelId: ByteVector32, maximum: Satoshi, actual: MilliSatoshi) extends ChannelException(channelId, s"dust htlcs hold too much value: maximum=$maximum actual=$actual")
case class InsufficientFunds (override val channelId: ByteVector32, amount: MilliSatoshi, missing: Satoshi, reserve: Satoshi, fees: Satoshi) extends ChannelException(channelId, s"insufficient funds: missing=$missing reserve=$reserve fees=$fees")

View file

@ -429,7 +429,7 @@ case class Commitment(fundingTxIndex: Long,
localCommit.spec.htlcs.collect(DirectedHtlc.incoming).filter(nearlyExpired)
}
def canSendAdd(amount: MilliSatoshi, params: ChannelParams, changes: CommitmentChanges, feerates: FeeratesPerKw, feeConf: OnChainFeeConf): Either[ChannelException, Unit] = {
def canSendAdd(amount: MilliSatoshi, params: ChannelParams, changes: CommitmentChanges, feerates: FeeratesPerKw, feeConf: OnChainFeeConf, confidence: Double): Either[ChannelException, Unit] = {
// we allowed mismatches between our feerates and our remote's as long as commitments didn't contain any HTLC at risk
// we need to verify that we're not disagreeing on feerates anymore before offering new HTLCs
// NB: there may be a pending update_fee that hasn't been applied yet that needs to be taken into account
@ -488,7 +488,8 @@ case class Commitment(fundingTxIndex: Long,
if (allowedHtlcValueInFlight < htlcValueInFlight) {
return Left(HtlcValueTooHighInFlight(params.channelId, maximum = allowedHtlcValueInFlight, actual = htlcValueInFlight))
}
if (Seq(params.localParams.maxAcceptedHtlcs, params.remoteParams.maxAcceptedHtlcs).min < outgoingHtlcs.size) {
val maxAcceptedHtlcs = params.localParams.maxAcceptedHtlcs.min(params.remoteParams.maxAcceptedHtlcs)
if (maxAcceptedHtlcs < outgoingHtlcs.size) {
return Left(TooManyAcceptedHtlcs(params.channelId, maximum = Seq(params.localParams.maxAcceptedHtlcs, params.remoteParams.maxAcceptedHtlcs).min))
}
@ -505,6 +506,18 @@ case class Commitment(fundingTxIndex: Long,
return Left(RemoteDustHtlcExposureTooHigh(params.channelId, maxDustExposure, remoteDustExposureAfterAdd))
}
// Jamming protection
// Must be the last checks so that they can be ignored for shadow deployment.
for ((amountMsat, i) <- outgoingHtlcs.toSeq.map(_.amountMsat).sorted.zipWithIndex) {
if ((amountMsat.toLong < 1) || (math.log(amountMsat.toLong.toDouble) * maxAcceptedHtlcs / math.log(params.localParams.maxHtlcValueInFlightMsat.toLong.toDouble / maxAcceptedHtlcs) < i)) {
return Left(TooManySmallHtlcs(params.channelId, number = i + 1, below = amountMsat))
}
}
val occupancy = (outgoingHtlcs.size.toDouble / maxAcceptedHtlcs).max(htlcValueInFlight.toLong.toDouble / allowedHtlcValueInFlight.toLong.toDouble)
if (confidence + 0.05 < occupancy) {
return Left(ConfidenceTooLow(params.channelId, confidence, occupancy))
}
Right(())
}
@ -552,6 +565,14 @@ case class Commitment(fundingTxIndex: Long,
return Left(TooManyAcceptedHtlcs(params.channelId, maximum = params.localParams.maxAcceptedHtlcs))
}
// Jamming protection
// Must be the last checks so that they can be ignored for shadow deployment.
for ((amountMsat, i) <- incomingHtlcs.toSeq.map(_.amountMsat).sorted.zipWithIndex) {
if ((amountMsat.toLong < 1) || (math.log(amountMsat.toLong.toDouble) * params.localParams.maxAcceptedHtlcs / math.log(params.localParams.maxHtlcValueInFlightMsat.toLong.toDouble / params.localParams.maxAcceptedHtlcs) < i)) {
return Left(TooManySmallHtlcs(params.channelId, number = i + 1, below = amountMsat))
}
}
Right(())
}
@ -835,7 +856,7 @@ case class Commitments(params: ChannelParams,
* @param cmd add HTLC command
* @return either Left(failure, error message) where failure is a failure message (see BOLT #4 and the Failure Message class) or Right(new commitments, updateAddHtlc)
*/
def sendAdd(cmd: CMD_ADD_HTLC, currentHeight: BlockHeight, channelConf: ChannelConf, feerates: FeeratesPerKw, feeConf: OnChainFeeConf): Either[ChannelException, (Commitments, UpdateAddHtlc)] = {
def sendAdd(cmd: CMD_ADD_HTLC, currentHeight: BlockHeight, channelConf: ChannelConf, feerates: FeeratesPerKw, feeConf: OnChainFeeConf)(implicit log: LoggingAdapter): Either[ChannelException, (Commitments, UpdateAddHtlc)] = {
// we must ensure we're not relaying htlcs that are already expired, otherwise the downstream channel will instantly close
// NB: we add a 3 blocks safety to reduce the probability of running into this when our bitcoin node is slightly outdated
val minExpiry = CltvExpiry(currentHeight + 3)
@ -859,12 +880,28 @@ case class Commitments(params: ChannelParams,
val changes1 = changes.addLocalProposal(add).copy(localNextHtlcId = changes.localNextHtlcId + 1)
val originChannels1 = originChannels + (add.id -> cmd.origin)
// we verify that this htlc is allowed in every active commitment
active.map(_.canSendAdd(add.amountMsat, params, changes1, feerates, feeConf))
.collectFirst { case Left(f) => Left(f) }
val canSendAdds = active.map(_.canSendAdd(add.amountMsat, params, changes1, feerates, feeConf, cmd.confidence))
// Log only for jamming protection.
canSendAdds.collectFirst {
case Left(f: TooManySmallHtlcs) =>
log.info("TooManySmallHtlcs: {} outgoing HTLCs are below {}}", f.number, f.below)
Metrics.dropHtlc(f, Tags.Directions.Outgoing)
case Left(f: ConfidenceTooLow) =>
log.info("ConfidenceTooLow: confidence is {}% while channel is {}% full", (100 * f.confidence).toInt, (100 * f.occupancy).toInt)
Metrics.dropHtlc(f, Tags.Directions.Outgoing)
}
canSendAdds.flatMap { // TODO: We ignore jamming protection, delete this flatMap to activate jamming protection.
case Left(_: TooManySmallHtlcs) | Left(_: ConfidenceTooLow) => None
case x => Some(x)
}
.collectFirst { case Left(f) =>
Metrics.dropHtlc(f, Tags.Directions.Outgoing)
Left(f)
}
.getOrElse(Right(copy(changes = changes1, originChannels = originChannels1), add))
}
def receiveAdd(add: UpdateAddHtlc, feerates: FeeratesPerKw, feeConf: OnChainFeeConf): Either[ChannelException, Commitments] = {
def receiveAdd(add: UpdateAddHtlc, feerates: FeeratesPerKw, feeConf: OnChainFeeConf)(implicit log: LoggingAdapter): Either[ChannelException, Commitments] = {
if (add.id != changes.remoteNextHtlcId) {
return Left(UnexpectedHtlcId(channelId, expected = changes.remoteNextHtlcId, actual = add.id))
}
@ -877,8 +914,21 @@ case class Commitments(params: ChannelParams,
val changes1 = changes.addRemoteProposal(add).copy(remoteNextHtlcId = changes.remoteNextHtlcId + 1)
// we verify that this htlc is allowed in every active commitment
active.map(_.canReceiveAdd(add.amountMsat, params, changes1, feerates, feeConf))
.collectFirst { case Left(f) => Left(f) }
val canReceiveAdds = active.map(_.canReceiveAdd(add.amountMsat, params, changes1, feerates, feeConf))
// Log only for jamming protection.
canReceiveAdds.collectFirst {
case Left(f: TooManySmallHtlcs) =>
log.info("TooManySmallHtlcs: {} incoming HTLCs are below {}}", f.number, f.below)
Metrics.dropHtlc(f, Tags.Directions.Incoming)
}
canReceiveAdds.flatMap { // TODO: We ignore jamming protection, delete this flatMap to activate jamming protection.
case Left(_: TooManySmallHtlcs) | Left(_: ConfidenceTooLow) => None
case x => Some(x)
}
.collectFirst { case Left(f) =>
Metrics.dropHtlc(f, Tags.Directions.Incoming)
Left(f)
}
.getOrElse(Right(copy(changes = changes1)))
}

View file

@ -35,6 +35,7 @@ object Monitoring {
val RemoteFeeratePerByte = Kamon.histogram("channels.remote-feerate-per-byte")
val Splices = Kamon.histogram("channels.splices", "Splices")
val ProcessMessage = Kamon.timer("channels.messages-processed")
val HtlcDropped = Kamon.counter("channels.htlc-dropped")
def recordHtlcsInFlight(remoteSpec: CommitmentSpec, previousRemoteSpec: CommitmentSpec): Unit = {
for (direction <- Tags.Directions.Incoming :: Tags.Directions.Outgoing :: Nil) {
@ -75,6 +76,10 @@ object Monitoring {
Metrics.Splices.withTag(Tags.Origin, Tags.Origins.Remote).withTag(Tags.SpliceType, Tags.SpliceTypes.SpliceCpfp).record(Math.abs(fundingParams.remoteContribution.toLong))
}
}
def dropHtlc(reason: ChannelException, direction: String): Unit = {
HtlcDropped.withTag(Tags.Reason, reason.getClass.getSimpleName).withTag(Tags.Direction, direction).increment()
}
}
object Tags {
@ -85,6 +90,7 @@ object Monitoring {
val State = "state"
val CommitmentFormat = "commitment-format"
val SpliceType = "splice-type"
val Reason = "reason"
object Events {
val Created = "created"

View file

@ -29,6 +29,8 @@ import fr.acinq.eclair.db.PendingCommandsDb
import fr.acinq.eclair.payment.Monitoring.{Metrics, Tags}
import fr.acinq.eclair.payment.relay.Relayer.{OutgoingChannel, OutgoingChannelParams}
import fr.acinq.eclair.payment.{ChannelPaymentRelayed, IncomingPaymentPacket}
import fr.acinq.eclair.reputation.ReputationRecorder
import fr.acinq.eclair.reputation.ReputationRecorder.{CancelRelay, GetConfidence, RecordResult}
import fr.acinq.eclair.wire.protocol.FailureMessageCodecs.createBadOnionFailure
import fr.acinq.eclair.wire.protocol.PaymentOnion.IntermediatePayload
import fr.acinq.eclair.wire.protocol._
@ -43,7 +45,7 @@ object ChannelRelay {
// @formatter:off
sealed trait Command
private case object DoRelay extends Command
private case class WrappedConfidence(confidence: Double) extends Command
private case class WrappedForwardFailure(failure: Register.ForwardFailure[CMD_ADD_HTLC]) extends Command
private case class WrappedAddResponse(res: CommandResponse[CMD_ADD_HTLC]) extends Command
// @formatter:on
@ -56,6 +58,7 @@ object ChannelRelay {
def apply(nodeParams: NodeParams,
register: ActorRef,
reputationRecorder: typed.ActorRef[ReputationRecorder.StandardCommand],
channels: Map[ByteVector32, Relayer.OutgoingChannel],
originNode:PublicKey,
relayId: UUID,
@ -67,9 +70,11 @@ object ChannelRelay {
paymentHash_opt = Some(r.add.paymentHash),
nodeAlias_opt = Some(nodeParams.alias))) {
val upstream = Upstream.Hot.Channel(r.add.removeUnknownTlvs(), TimestampMilli.now(), originNode)
context.self ! DoRelay
val confidence = (r.add.endorsement + 0.5) / 8
new ChannelRelay(nodeParams, register, channels, r, upstream, confidence, context).relay(Seq.empty)
reputationRecorder ! GetConfidence(context.messageAdapter[ReputationRecorder.Confidence](confidence => WrappedConfidence(confidence.value)), originNode, r.add.endorsement, relayId, r.relayFeeMsat)
Behaviors.receiveMessagePartial {
case WrappedConfidence(confidence) =>
new ChannelRelay(nodeParams, register, reputationRecorder, channels, r, upstream, confidence, context, relayId).relay(Seq.empty)
}
}
}
@ -110,11 +115,13 @@ object ChannelRelay {
*/
class ChannelRelay private(nodeParams: NodeParams,
register: ActorRef,
reputationRecorder: typed.ActorRef[ReputationRecorder.StandardCommand],
channels: Map[ByteVector32, Relayer.OutgoingChannel],
r: IncomingPaymentPacket.ChannelRelayPacket,
upstream: Upstream.Hot.Channel,
confidence: Double,
context: ActorContext[ChannelRelay.Command]) {
context: ActorContext[ChannelRelay.Command],
relayId: UUID) {
import ChannelRelay._
@ -124,8 +131,6 @@ class ChannelRelay private(nodeParams: NodeParams,
private case class PreviouslyTried(channelId: ByteVector32, failure: RES_ADD_FAILED[ChannelException])
def relay(previousFailures: Seq[PreviouslyTried]): Behavior[Command] = {
Behaviors.receiveMessagePartial {
case DoRelay =>
if (previousFailures.isEmpty) {
context.log.info("relaying htlc #{} from channelId={} to requestedShortChannelId={} nextNode={}", r.add.id, r.add.channelId, r.payload.outgoingChannelId, nextNodeId_opt.getOrElse(""))
}
@ -134,13 +139,13 @@ class ChannelRelay private(nodeParams: NodeParams,
case RelayFailure(cmdFail) =>
Metrics.recordPaymentRelayFailed(Tags.FailureType(cmdFail), Tags.RelayType.Channel)
context.log.info("rejecting htlc reason={}", cmdFail.reason)
reputationRecorder ! CancelRelay(upstream.receivedFrom, r.add.endorsement, relayId)
safeSendAndStop(r.add.channelId, cmdFail)
case RelaySuccess(selectedChannelId, cmdAdd) =>
context.log.info("forwarding htlc #{} from channelId={} to channelId={}", r.add.id, r.add.channelId, selectedChannelId)
register ! Register.Forward(forwardFailureAdapter, selectedChannelId, cmdAdd)
waitForAddResponse(selectedChannelId, previousFailures)
}
}
}
def waitForAddResponse(selectedChannelId: ByteVector32, previousFailures: Seq[PreviouslyTried]): Behavior[Command] =
@ -149,11 +154,11 @@ class ChannelRelay private(nodeParams: NodeParams,
context.log.warn(s"couldn't resolve downstream channel $channelId, failing htlc #${upstream.add.id}")
val cmdFail = CMD_FAIL_HTLC(upstream.add.id, Right(UnknownNextPeer()), commit = true)
Metrics.recordPaymentRelayFailed(Tags.FailureType(cmdFail), Tags.RelayType.Channel)
reputationRecorder ! CancelRelay(upstream.receivedFrom, r.add.endorsement, relayId)
safeSendAndStop(upstream.add.channelId, cmdFail)
case WrappedAddResponse(addFailed: RES_ADD_FAILED[_]) =>
context.log.info("attempt failed with reason={}", addFailed.t.getClass.getSimpleName)
context.self ! DoRelay
relay(previousFailures :+ PreviouslyTried(selectedChannelId, addFailed))
case WrappedAddResponse(r: RES_SUCCESS[_]) =>
@ -325,9 +330,11 @@ class ChannelRelay private(nodeParams: NodeParams,
}
}
private def recordRelayDuration(isSuccess: Boolean): Unit =
private def recordRelayDuration(isSuccess: Boolean): Unit = {
reputationRecorder ! RecordResult(upstream.receivedFrom, r.add.endorsement, relayId, isSuccess)
Metrics.RelayedPaymentDuration
.withTag(Tags.Relay, Tags.RelayType.Channel)
.withTag(Tags.Success, isSuccess)
.record((TimestampMilli.now() - upstream.receivedAt).toMillis, TimeUnit.MILLISECONDS)
}
}

View file

@ -16,15 +16,16 @@
package fr.acinq.eclair.payment.relay
import akka.actor.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.{ActorRef, typed}
import fr.acinq.bitcoin.scalacompat.ByteVector32
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.eclair.channel._
import fr.acinq.eclair.payment.IncomingPaymentPacket
import fr.acinq.eclair.{SubscriptionsComplete, Logs, NodeParams, ShortChannelId}
import fr.acinq.eclair.reputation.ReputationRecorder
import fr.acinq.eclair.{Logs, NodeParams, ShortChannelId, SubscriptionsComplete}
import java.util.UUID
import scala.collection.mutable
@ -58,6 +59,7 @@ object ChannelRelayer {
def apply(nodeParams: NodeParams,
register: ActorRef,
reputationRecorder: typed.ActorRef[ReputationRecorder.StandardCommand],
channels: Map[ByteVector32, Relayer.OutgoingChannel] = Map.empty,
scid2channels: Map[ShortChannelId, ByteVector32] = Map.empty,
node2channels: mutable.MultiDict[PublicKey, ByteVector32] = mutable.MultiDict.empty): Behavior[Command] =
@ -79,7 +81,7 @@ object ChannelRelayer {
case None => Map.empty
}
context.log.debug(s"spawning a new handler with relayId=$relayId to nextNodeId={} with channels={}", nextNodeId_opt.getOrElse(""), nextChannels.keys.mkString(","))
context.spawn(ChannelRelay.apply(nodeParams, register, nextChannels, originNode, relayId, channelRelayPacket), name = relayId.toString)
context.spawn(ChannelRelay.apply(nodeParams, register, reputationRecorder, nextChannels, originNode, relayId, channelRelayPacket), name = relayId.toString)
Behaviors.same
case GetOutgoingChannels(replyTo, Relayer.GetOutgoingChannels(enabledOnly)) =>
@ -100,14 +102,14 @@ object ChannelRelayer {
context.log.debug("adding mappings={} to channelId={}", mappings.keys.mkString(","), channelId)
val scid2channels1 = scid2channels ++ mappings
val node2channels1 = node2channels.addOne(remoteNodeId, channelId)
apply(nodeParams, register, channels1, scid2channels1, node2channels1)
apply(nodeParams, register, reputationRecorder, channels1, scid2channels1, node2channels1)
case WrappedLocalChannelDown(LocalChannelDown(_, channelId, shortIds, remoteNodeId)) =>
context.log.debug(s"removed local channel info for channelId=$channelId localAlias=${shortIds.localAlias}")
val channels1 = channels - channelId
val scid2Channels1 = scid2channels - shortIds.localAlias -- shortIds.real.toOption
val node2channels1 = node2channels.subtractOne(remoteNodeId, channelId)
apply(nodeParams, register, channels1, scid2Channels1, node2channels1)
apply(nodeParams, register, reputationRecorder, channels1, scid2Channels1, node2channels1)
case WrappedAvailableBalanceChanged(AvailableBalanceChanged(_, channelId, shortIds, commitments)) =>
val channels1 = channels.get(channelId) match {
@ -116,7 +118,7 @@ object ChannelRelayer {
channels + (channelId -> c.copy(commitments = commitments))
case None => channels // we only consider the balance if we have the channel_update
}
apply(nodeParams, register, channels1, scid2channels, node2channels)
apply(nodeParams, register, reputationRecorder, channels1, scid2channels, node2channels)
}
}

View file

@ -36,6 +36,8 @@ import fr.acinq.eclair.payment.send.MultiPartPaymentLifecycle.{PreimageReceived,
import fr.acinq.eclair.payment.send.PaymentInitiator.SendPaymentConfig
import fr.acinq.eclair.payment.send.PaymentLifecycle.SendPaymentToNode
import fr.acinq.eclair.payment.send._
import fr.acinq.eclair.reputation.ReputationRecorder
import fr.acinq.eclair.reputation.ReputationRecorder.{GetTrampolineConfidence, RecordTrampolineFailure, RecordTrampolineSuccess}
import fr.acinq.eclair.router.Router.RouteParams
import fr.acinq.eclair.router.{BalanceTooLow, RouteNotFound}
import fr.acinq.eclair.wire.protocol.PaymentOnion.IntermediatePayload
@ -64,6 +66,7 @@ object NodeRelay {
private case class WrappedPaymentFailed(paymentFailed: PaymentFailed) extends Command
private[relay] case class WrappedPeerReadyResult(result: AsyncPaymentTriggerer.Result) extends Command
private case class WrappedResolvedPaths(resolved: Seq[ResolvedPath]) extends Command
private case class WrappedConfidence(confidence: Double) extends Command
// @formatter:on
trait OutgoingPaymentFactory {
@ -85,6 +88,7 @@ object NodeRelay {
def apply(nodeParams: NodeParams,
parent: typed.ActorRef[NodeRelayer.Command],
register: ActorRef,
reputationRecorder: typed.ActorRef[ReputationRecorder.TrampolineCommand],
relayId: UUID,
nodeRelayPacket: NodeRelayPacket,
outgoingPaymentFactory: OutgoingPaymentFactory,
@ -108,7 +112,7 @@ object NodeRelay {
case IncomingPaymentPacket.RelayToTrampolinePacket(_, _, _, nextPacket) => Some(nextPacket)
case _: IncomingPaymentPacket.RelayToBlindedPathsPacket => None
}
new NodeRelay(nodeParams, parent, register, relayId, paymentHash, nodeRelayPacket.outerPayload.paymentSecret, context, outgoingPaymentFactory, triggerer, router)
new NodeRelay(nodeParams, parent, register, reputationRecorder, relayId, paymentHash, nodeRelayPacket.outerPayload.paymentSecret, context, outgoingPaymentFactory, triggerer, router)
.receiving(Queue.empty, nodeRelayPacket.innerPayload, nextPacket_opt, incomingPaymentHandler)
}
}
@ -183,6 +187,7 @@ object NodeRelay {
class NodeRelay private(nodeParams: NodeParams,
parent: akka.actor.typed.ActorRef[NodeRelayer.Command],
register: ActorRef,
reputationRecorder: typed.ActorRef[ReputationRecorder.TrampolineCommand],
relayId: UUID,
paymentHash: ByteVector32,
paymentSecret: ByteVector32,
@ -259,8 +264,17 @@ class NodeRelay private(nodeParams: NodeParams,
private def doSend(upstream: Upstream.Hot.Trampoline, nextPayload: IntermediatePayload.NodeRelay, nextPacket_opt: Option[OnionRoutingPacket]): Behavior[Command] = {
context.log.debug(s"relaying trampoline payment (amountIn=${upstream.amountIn} expiryIn=${upstream.expiryIn} amountOut=${nextPayload.amountToForward} expiryOut=${nextPayload.outgoingCltv})")
val confidence = (upstream.received.map(_.add.endorsement).min + 0.5) / 8
relay(upstream, nextPayload, nextPacket_opt, confidence)
val totalFee = upstream.amountIn - nextPayload.amountToForward
val fees = upstream.received.foldLeft(Map.empty[(PublicKey, Int), MilliSatoshi])((fees, r) =>
fees.updatedWith((r.receivedFrom, r.add.endorsement))(fee =>
Some(fee.getOrElse(MilliSatoshi(0)) + r.add.amountMsat * totalFee.toLong / upstream.amountIn.toLong)))
reputationRecorder ! GetTrampolineConfidence(context.messageAdapter[ReputationRecorder.Confidence](confidence => WrappedConfidence(confidence.value)), fees, relayId)
Behaviors.receiveMessagePartial {
rejectExtraHtlcPartialFunction orElse {
case WrappedConfidence(confidence) =>
relay(upstream, nextPayload, nextPacket_opt, confidence)
}
}
}
/**
@ -287,6 +301,11 @@ class NodeRelay private(nodeParams: NodeParams,
case WrappedPaymentSent(paymentSent) =>
context.log.debug("trampoline payment fully resolved downstream")
success(upstream, fulfilledUpstream, paymentSent)
val totalFee = upstream.amountIn - paymentSent.amountWithFees
val fees = upstream.received.foldLeft(Map.empty[(PublicKey, Int), MilliSatoshi])((fees, r) =>
fees.updatedWith((r.receivedFrom, r.add.endorsement))(fee =>
Some(fee.getOrElse(MilliSatoshi(0)) + r.add.amountMsat * totalFee.toLong / upstream.amountIn.toLong)))
reputationRecorder ! RecordTrampolineSuccess(fees, relayId)
recordRelayDuration(startedAt, isSuccess = true)
stopping()
case WrappedPaymentFailed(PaymentFailed(_, _, failures, _)) =>
@ -294,6 +313,7 @@ class NodeRelay private(nodeParams: NodeParams,
if (!fulfilledUpstream) {
rejectPayment(upstream, translateError(nodeParams, failures, upstream, nextPayload))
}
reputationRecorder ! RecordTrampolineFailure(upstream.received.map(r => (r.receivedFrom, r.add.endorsement)).toSet, relayId)
recordRelayDuration(startedAt, isSuccess = fulfilledUpstream)
stopping()
}

View file

@ -22,6 +22,7 @@ import akka.actor.typed.{ActorRef, Behavior}
import fr.acinq.bitcoin.scalacompat.ByteVector32
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.eclair.payment._
import fr.acinq.eclair.reputation.ReputationRecorder
import fr.acinq.eclair.{Logs, NodeParams}
import java.util.UUID
@ -58,7 +59,13 @@ object NodeRelayer {
* NB: the payment secret used here is different from the invoice's payment secret and ensures we can
* group together HTLCs that the previous trampoline node sent in the same MPP.
*/
def apply(nodeParams: NodeParams, register: akka.actor.ActorRef, outgoingPaymentFactory: NodeRelay.OutgoingPaymentFactory, triggerer: typed.ActorRef[AsyncPaymentTriggerer.Command], router: akka.actor.ActorRef, children: Map[PaymentKey, ActorRef[NodeRelay.Command]] = Map.empty): Behavior[Command] =
def apply(nodeParams: NodeParams,
register: akka.actor.ActorRef,
reputationRecorder: typed.ActorRef[ReputationRecorder.TrampolineCommand],
outgoingPaymentFactory: NodeRelay.OutgoingPaymentFactory,
triggerer: typed.ActorRef[AsyncPaymentTriggerer.Command],
router: akka.actor.ActorRef,
children: Map[PaymentKey, ActorRef[NodeRelay.Command]] = Map.empty): Behavior[Command] =
Behaviors.setup { context =>
Behaviors.withMdc(Logs.mdc(category_opt = Some(Logs.LogCategory.PAYMENT)), mdc) {
Behaviors.receiveMessage {
@ -73,15 +80,15 @@ object NodeRelayer {
case None =>
val relayId = UUID.randomUUID()
context.log.debug(s"spawning a new handler with relayId=$relayId")
val handler = context.spawn(NodeRelay.apply(nodeParams, context.self, register, relayId, nodeRelayPacket, outgoingPaymentFactory, triggerer, router), relayId.toString)
val handler = context.spawn(NodeRelay.apply(nodeParams, context.self, register, reputationRecorder, relayId, nodeRelayPacket, outgoingPaymentFactory, triggerer, router), relayId.toString)
context.log.debug("forwarding incoming htlc #{} from channel {} to new handler", htlcIn.id, htlcIn.channelId)
handler ! NodeRelay.Relay(nodeRelayPacket, originNode)
apply(nodeParams, register, outgoingPaymentFactory, triggerer, router, children + (childKey -> handler))
apply(nodeParams, register, reputationRecorder, outgoingPaymentFactory, triggerer, router, children + (childKey -> handler))
}
case RelayComplete(childHandler, paymentHash, paymentSecret) =>
// we do a back-and-forth between parent and child before stopping the child to prevent a race condition
childHandler ! NodeRelay.Stop
apply(nodeParams, register, outgoingPaymentFactory, triggerer, router, children - PaymentKey(paymentHash, paymentSecret))
apply(nodeParams, register, reputationRecorder, outgoingPaymentFactory, triggerer, router, children - PaymentKey(paymentHash, paymentSecret))
case GetPendingPayments(replyTo) =>
replyTo ! children
Behaviors.same

View file

@ -28,6 +28,7 @@ import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.eclair.channel._
import fr.acinq.eclair.db.PendingCommandsDb
import fr.acinq.eclair.payment._
import fr.acinq.eclair.reputation.ReputationRecorder
import fr.acinq.eclair.wire.protocol._
import fr.acinq.eclair.{CltvExpiryDelta, Logs, MilliSatoshi, NodeParams}
import grizzled.slf4j.Logging
@ -49,7 +50,7 @@ import scala.util.Random
* It also receives channel HTLC events (fulfill / failed) and relays those to the appropriate handlers.
* It also maintains an up-to-date view of local channel balances.
*/
class Relayer(nodeParams: NodeParams, router: ActorRef, register: ActorRef, paymentHandler: ActorRef, triggerer: typed.ActorRef[AsyncPaymentTriggerer.Command], initialized: Option[Promise[Done]] = None) extends Actor with DiagnosticActorLogging {
class Relayer(nodeParams: NodeParams, router: ActorRef, register: ActorRef, paymentHandler: ActorRef, triggerer: typed.ActorRef[AsyncPaymentTriggerer.Command], reputationRecorder: typed.ActorRef[ReputationRecorder.Command], initialized: Option[Promise[Done]] = None) extends Actor with DiagnosticActorLogging {
import Relayer._
@ -57,8 +58,8 @@ class Relayer(nodeParams: NodeParams, router: ActorRef, register: ActorRef, paym
implicit def implicitLog: LoggingAdapter = log
private val postRestartCleaner = context.actorOf(PostRestartHtlcCleaner.props(nodeParams, register, initialized), "post-restart-htlc-cleaner")
private val channelRelayer = context.spawn(Behaviors.supervise(ChannelRelayer(nodeParams, register)).onFailure(SupervisorStrategy.resume), "channel-relayer")
private val nodeRelayer = context.spawn(Behaviors.supervise(NodeRelayer(nodeParams, register, NodeRelay.SimpleOutgoingPaymentFactory(nodeParams, router, register), triggerer, router)).onFailure(SupervisorStrategy.resume), name = "node-relayer")
private val channelRelayer = context.spawn(Behaviors.supervise(ChannelRelayer(nodeParams, register, reputationRecorder)).onFailure(SupervisorStrategy.resume), "channel-relayer")
private val nodeRelayer = context.spawn(Behaviors.supervise(NodeRelayer(nodeParams, register, reputationRecorder, NodeRelay.SimpleOutgoingPaymentFactory(nodeParams, router, register), triggerer, router)).onFailure(SupervisorStrategy.resume), name = "node-relayer")
def receive: Receive = {
case init: PostRestartHtlcCleaner.Init => postRestartCleaner forward init
@ -120,8 +121,8 @@ class Relayer(nodeParams: NodeParams, router: ActorRef, register: ActorRef, paym
object Relayer extends Logging {
def props(nodeParams: NodeParams, router: ActorRef, register: ActorRef, paymentHandler: ActorRef, triggerer: typed.ActorRef[AsyncPaymentTriggerer.Command], initialized: Option[Promise[Done]] = None): Props =
Props(new Relayer(nodeParams, router, register, paymentHandler, triggerer, initialized))
def props(nodeParams: NodeParams, router: ActorRef, register: ActorRef, paymentHandler: ActorRef, triggerer: typed.ActorRef[AsyncPaymentTriggerer.Command], reputationRecorder: typed.ActorRef[ReputationRecorder.Command], initialized: Option[Promise[Done]] = None): Props =
Props(new Relayer(nodeParams, router, register, paymentHandler, triggerer, reputationRecorder, initialized))
// @formatter:off
case class RelayFees(feeBase: MilliSatoshi, feeProportionalMillionths: Long) {

View file

@ -0,0 +1,82 @@
/*
* Copyright 2023 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package fr.acinq.eclair.reputation
import fr.acinq.eclair.reputation.Reputation.Pending
import fr.acinq.eclair.{MilliSatoshi, TimestampMilli}
import java.util.UUID
import scala.concurrent.duration.FiniteDuration
/** Local reputation per incoming node and endorsement level
*
* @param pastWeight How much fees we would have collected in the past if all HTLCs had succeeded (exponential moving average).
* @param pastScore How much fees we have collected in the past (exponential moving average).
* @param lastSettlementAt Timestamp of the last recorded HTLC settlement.
* @param pending Set of pending HTLCs.
* @param halfLife Half life for the exponential moving average.
* @param goodDuration Duration after which HTLCs are penalized for staying pending too long.
* @param pendingMultiplier How much to penalize pending HTLCs.
*/
case class Reputation(pastWeight: Double, pastScore: Double, lastSettlementAt: TimestampMilli, pending: Map[UUID, Pending], halfLife: FiniteDuration, goodDuration: FiniteDuration, pendingMultiplier: Double) {
private def decay(now: TimestampMilli): Double = scala.math.pow(0.5, (now - lastSettlementAt) / halfLife)
private def pendingWeight(now: TimestampMilli): Double = pending.values.map(_.weight(now, goodDuration, pendingMultiplier)).sum
/** Register a HTLC to relay and estimate the confidence that it will succeed.
* @return (updated reputation, confidence)
*/
def attempt(relayId: UUID, fee: MilliSatoshi, now: TimestampMilli = TimestampMilli.now()): (Reputation, Double) = {
val d = decay(now)
val newReputation = copy(pending = pending + (relayId -> Pending(fee, now)))
val confidence = d * pastScore / (d * pastWeight + newReputation.pendingWeight(now))
(newReputation, confidence)
}
/** Mark a previously registered HTLC as failed without trying to relay it (usually because its confidence was too low).
* @return updated reputation
*/
def cancel(relayId: UUID): Reputation = copy(pending = pending - relayId)
/** When a HTLC is settled, we record whether it succeeded and how long it took.
*
* @param feeOverride When relaying trampoline payments, the actual fee is only known when the payment succeeds. This
* is used instead of the fee upper bound that was known when first attempting the relay.
* @return updated reputation
*/
def record(relayId: UUID, isSuccess: Boolean, feeOverride: Option[MilliSatoshi] = None, now: TimestampMilli = TimestampMilli.now()): Reputation = {
val d = decay(now)
var p = pending.getOrElse(relayId, Pending(MilliSatoshi(0), now))
feeOverride.foreach(fee => p = p.copy(fee = fee))
val newWeight = d * pastWeight + p.weight(now, goodDuration, 1.0)
val newScore = d * pastScore + (if (isSuccess) p.fee.toLong.toDouble else 0)
Reputation(newWeight, newScore, now, pending - relayId, halfLife, goodDuration, pendingMultiplier)
}
}
object Reputation {
case class Pending(fee: MilliSatoshi, startedAt: TimestampMilli) {
def weight(now: TimestampMilli, minDuration: FiniteDuration, multiplier: Double): Double = {
val duration = now - startedAt
fee.toLong.toDouble * (duration / minDuration).max(multiplier)
}
}
case class ReputationConfig(halfLife: FiniteDuration, goodDuration: FiniteDuration, pendingMultiplier: Double)
def init(config: ReputationConfig): Reputation = Reputation(0.0, 0.0, TimestampMilli.min, Map.empty, config.halfLife, config.goodDuration, config.pendingMultiplier)
}

View file

@ -0,0 +1,75 @@
/*
* Copyright 2023 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package fr.acinq.eclair.reputation
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior}
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.eclair.reputation.Reputation.ReputationConfig
import fr.acinq.eclair.MilliSatoshi
import java.util.UUID
object ReputationRecorder {
sealed trait Command
sealed trait StandardCommand extends Command
case class GetConfidence(replyTo: ActorRef[Confidence], originNode: PublicKey, endorsement: Int, relayId: UUID, fee: MilliSatoshi) extends StandardCommand
case class CancelRelay(originNode: PublicKey, endorsement: Int, relayId: UUID) extends StandardCommand
case class RecordResult(originNode: PublicKey, endorsement: Int, relayId: UUID, isSuccess: Boolean) extends StandardCommand
sealed trait TrampolineCommand extends Command
case class GetTrampolineConfidence(replyTo: ActorRef[Confidence], fees: Map[(PublicKey, Int), MilliSatoshi], relayId: UUID) extends TrampolineCommand
case class RecordTrampolineFailure(keys: Set[(PublicKey, Int)], relayId: UUID) extends TrampolineCommand
case class RecordTrampolineSuccess(fees: Map[(PublicKey, Int), MilliSatoshi], relayId: UUID) extends TrampolineCommand
case class Confidence(value: Double)
def apply(reputationConfig: ReputationConfig, reputations: Map[(PublicKey, Int), Reputation]): Behavior[Command] = {
Behaviors.receiveMessage {
case GetConfidence(replyTo, originNode, endorsement, relayId, fee) =>
val (updatedReputation, confidence) = reputations.getOrElse((originNode, endorsement), Reputation.init(reputationConfig)).attempt(relayId, fee)
replyTo ! Confidence(confidence)
ReputationRecorder(reputationConfig, reputations.updated((originNode, endorsement), updatedReputation))
case CancelRelay(originNode, endorsement, relayId) =>
val updatedReputation = reputations.getOrElse((originNode, endorsement), Reputation.init(reputationConfig)).cancel(relayId)
ReputationRecorder(reputationConfig, reputations.updated((originNode, endorsement), updatedReputation))
case RecordResult(originNode, endorsement, relayId, isSuccess) =>
val updatedReputation = reputations.getOrElse((originNode, endorsement), Reputation.init(reputationConfig)).record(relayId, isSuccess)
ReputationRecorder(reputationConfig, reputations.updated((originNode, endorsement), updatedReputation))
case GetTrampolineConfidence(replyTo, fees, relayId) =>
val (confidence, updatedReputations) = fees.foldLeft((1.0, reputations)){case ((c, r), ((originNode, endorsement), fee)) =>
val (updatedReputation, confidence) = reputations.getOrElse((originNode, endorsement), Reputation.init(reputationConfig)).attempt(relayId, fee)
(c.min(confidence), r.updated((originNode, endorsement), updatedReputation))
}
replyTo ! Confidence(confidence)
ReputationRecorder(reputationConfig, updatedReputations)
case RecordTrampolineFailure(keys, relayId) =>
val updatedReputations = keys.foldLeft(reputations) { case (r, (originNode, endorsement)) =>
val updatedReputation = reputations.getOrElse((originNode, endorsement), Reputation.init(reputationConfig)).record(relayId, isSuccess = false)
r.updated((originNode, endorsement), updatedReputation)
}
ReputationRecorder(reputationConfig, updatedReputations)
case RecordTrampolineSuccess(fees, relayId) =>
val updatedReputations = fees.foldLeft(reputations) { case (r, ((originNode, endorsement), fee)) =>
val updatedReputation = reputations.getOrElse((originNode, endorsement), Reputation.init(reputationConfig)).record(relayId, isSuccess = true, Some(fee))
r.updated((originNode, endorsement), updatedReputation)
}
ReputationRecorder(reputationConfig, updatedReputations)
}
}
}

View file

@ -29,6 +29,7 @@ import fr.acinq.eclair.io.MessageRelay.RelayAll
import fr.acinq.eclair.io.{OpenChannelInterceptor, PeerConnection}
import fr.acinq.eclair.message.OnionMessages.OnionMessageConfig
import fr.acinq.eclair.payment.relay.Relayer.{AsyncPaymentsParams, RelayFees, RelayParams}
import fr.acinq.eclair.reputation.Reputation.ReputationConfig
import fr.acinq.eclair.router.Graph.{MessagePath, WeightRatios}
import fr.acinq.eclair.router.PathFindingExperimentConf
import fr.acinq.eclair.router.Router._
@ -231,7 +232,8 @@ object TestConstants {
maxAttempts = 2,
),
purgeInvoicesInterval = None,
revokedHtlcInfoCleanerConfig = RevokedHtlcInfoCleaner.Config(10, 100 millis)
revokedHtlcInfoCleanerConfig = RevokedHtlcInfoCleaner.Config(10, 100 millis),
localReputationConfig = ReputationConfig(1 day, 10 seconds, 100),
)
def channelParams: LocalParams = OpenChannelInterceptor.makeChannelParams(
@ -401,7 +403,8 @@ object TestConstants {
maxAttempts = 2,
),
purgeInvoicesInterval = None,
revokedHtlcInfoCleanerConfig = RevokedHtlcInfoCleaner.Config(10, 100 millis)
revokedHtlcInfoCleanerConfig = RevokedHtlcInfoCleaner.Config(10, 100 millis),
localReputationConfig = ReputationConfig(2 days, 20 seconds, 200),
)
def channelParams: LocalParams = OpenChannelInterceptor.makeChannelParams(

View file

@ -16,7 +16,7 @@
package fr.acinq.eclair.channel
import akka.actor.typed.scaladsl.adapter.actorRefAdapter
import akka.actor.typed.scaladsl.adapter.{ClassicActorSystemOps, actorRefAdapter}
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.testkit.{TestFSMRef, TestProbe}
import fr.acinq.bitcoin.scalacompat.ByteVector32
@ -33,6 +33,7 @@ import fr.acinq.eclair.payment.receive.MultiPartHandler.ReceiveStandardPayment
import fr.acinq.eclair.payment.receive.PaymentHandler
import fr.acinq.eclair.payment.relay.Relayer
import fr.acinq.eclair.payment.send.ClearRecipient
import fr.acinq.eclair.reputation.ReputationRecorder
import fr.acinq.eclair.wire.protocol._
import grizzled.slf4j.Logging
import org.scalatest.funsuite.FixtureAnyFunSuiteLike
@ -66,8 +67,10 @@ class FuzzySpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with Channe
val bobRegister = system.actorOf(Props(new TestRegister()))
val alicePaymentHandler = system.actorOf(Props(new PaymentHandler(aliceParams, aliceRegister, TestProbe().ref)))
val bobPaymentHandler = system.actorOf(Props(new PaymentHandler(bobParams, bobRegister, TestProbe().ref)))
val aliceRelayer = system.actorOf(Relayer.props(aliceParams, TestProbe().ref, aliceRegister, alicePaymentHandler, TestProbe().ref))
val bobRelayer = system.actorOf(Relayer.props(bobParams, TestProbe().ref, bobRegister, bobPaymentHandler, TestProbe().ref))
val aliceReputationRecorder = system.spawnAnonymous(ReputationRecorder(aliceParams.localReputationConfig, Map.empty))
val bobReputationRecorder = system.spawnAnonymous(ReputationRecorder(bobParams.localReputationConfig, Map.empty))
val aliceRelayer = system.actorOf(Relayer.props(aliceParams, TestProbe().ref, aliceRegister, alicePaymentHandler, TestProbe().ref, aliceReputationRecorder))
val bobRelayer = system.actorOf(Relayer.props(bobParams, TestProbe().ref, bobRegister, bobPaymentHandler, TestProbe().ref, bobReputationRecorder))
val wallet = new DummyOnChainWallet()
val alice: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(aliceParams, wallet, bobParams.nodeId, alice2blockchain.ref, aliceRelayer, FakeTxPublisherFactory(alice2blockchain)), alicePeer.ref)
val bob: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(bobParams, wallet, aliceParams.nodeId, bob2blockchain.ref, bobRelayer, FakeTxPublisherFactory(bob2blockchain)), bobPeer.ref)

View file

@ -28,6 +28,7 @@ import fr.acinq.eclair.payment.offer.OfferManager
import fr.acinq.eclair.payment.receive.{MultiPartHandler, PaymentHandler}
import fr.acinq.eclair.payment.relay.{ChannelRelayer, PostRestartHtlcCleaner, Relayer}
import fr.acinq.eclair.payment.send.PaymentInitiator
import fr.acinq.eclair.reputation.ReputationRecorder
import fr.acinq.eclair.router.Router
import fr.acinq.eclair.wire.protocol.IPAddress
import fr.acinq.eclair.{BlockHeight, MilliSatoshi, NodeParams, SubscriptionsComplete, TestBitcoinCoreClient, TestDatabases}
@ -96,7 +97,8 @@ object MinimalNodeFixture extends Assertions with Eventually with IntegrationPat
val router = system.actorOf(Router.props(nodeParams, watcherTyped), "router")
val offerManager = system.spawn(OfferManager(nodeParams, router, 1 minute), "offer-manager")
val paymentHandler = system.actorOf(PaymentHandler.props(nodeParams, register, offerManager), "payment-handler")
val relayer = system.actorOf(Relayer.props(nodeParams, router, register, paymentHandler, triggerer.ref.toTyped), "relayer")
val reputationRecorder = system.spawn(ReputationRecorder(nodeParams.localReputationConfig, Map.empty), "reputation-recorder")
val relayer = system.actorOf(Relayer.props(nodeParams, router, register, paymentHandler, triggerer.ref.toTyped, reputationRecorder), "relayer")
val txPublisherFactory = Channel.SimpleTxPublisherFactory(nodeParams, watcherTyped, bitcoinClient)
val channelFactory = Peer.SimpleChannelFactory(nodeParams, watcherTyped, relayer, wallet, txPublisherFactory)
val pendingChannelsRateLimiter = system.spawnAnonymous(Behaviors.supervise(PendingChannelsRateLimiter(nodeParams, router.toTyped, Seq())).onFailure(typed.SupervisorStrategy.resume))

View file

@ -57,7 +57,7 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit
case class FixtureParam(nodeParams: NodeParams, register: TestProbe, sender: TestProbe, eventListener: TestProbe) {
def createRelayer(nodeParams1: NodeParams): (ActorRef, ActorRef) = {
val relayer = system.actorOf(Relayer.props(nodeParams1, TestProbe().ref, register.ref, TestProbe().ref, TestProbe().ref.toTyped))
val relayer = system.actorOf(Relayer.props(nodeParams1, TestProbe().ref, register.ref, TestProbe().ref, TestProbe().ref.toTyped, TestProbe().ref.toTyped))
// we need ensure the post-htlc-restart child actor is initialized
sender.send(relayer, Relayer.GetChildActors(sender.ref))
(relayer, sender.expectMsgType[Relayer.ChildActors].postRestartCleaner)

View file

@ -32,6 +32,7 @@ import fr.acinq.eclair.crypto.Sphinx
import fr.acinq.eclair.payment.IncomingPaymentPacket.ChannelRelayPacket
import fr.acinq.eclair.payment.relay.ChannelRelayer._
import fr.acinq.eclair.payment.{ChannelPaymentRelayed, IncomingPaymentPacket, PaymentPacketSpec}
import fr.acinq.eclair.reputation.ReputationRecorder
import fr.acinq.eclair.router.Announcements
import fr.acinq.eclair.wire.protocol.BlindedRouteData.PaymentRelayData
import fr.acinq.eclair.wire.protocol.PaymentOnion.IntermediatePayload
@ -47,24 +48,26 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
import ChannelRelayerSpec._
case class FixtureParam(nodeParams: NodeParams, channelRelayer: typed.ActorRef[ChannelRelayer.Command], register: TestProbe[Any])
case class FixtureParam(nodeParams: NodeParams, channelRelayer: typed.ActorRef[ChannelRelayer.Command], register: TestProbe[Any], reputationRecorder: TestProbe[ReputationRecorder.Command])
override def withFixture(test: OneArgTest): Outcome = {
// we are node B in the route A -> B -> C -> ....
val nodeParams = TestConstants.Bob.nodeParams
val register = TestProbe[Any]("register")
val channelRelayer = testKit.spawn(ChannelRelayer.apply(nodeParams, register.ref.toClassic))
val reputationRecorder = TestProbe[ReputationRecorder.Command]("reputation-recorder")
val channelRelayer = testKit.spawn(ChannelRelayer.apply(nodeParams, register.ref.toClassic, reputationRecorder.ref))
try {
withFixture(test.toNoArgTest(FixtureParam(nodeParams, channelRelayer, register)))
withFixture(test.toNoArgTest(FixtureParam(nodeParams, channelRelayer, register, reputationRecorder)))
} finally {
testKit.stop(channelRelayer)
}
}
def expectFwdFail(register: TestProbe[Any], channelId: ByteVector32, cmd: channel.Command): Register.Forward[channel.Command] = {
def expectFwdFail(register: TestProbe[Any], channelId: ByteVector32, cmd: channel.Command, reputationRecorder: TestProbe[ReputationRecorder.Command]): Register.Forward[channel.Command] = {
val fwd = register.expectMessageType[Register.Forward[channel.Command]]
assert(fwd.message == cmd)
assert(fwd.channelId == channelId)
reputationRecorder.expectMessageType[ReputationRecorder.CancelRelay]
fwd
}
@ -79,6 +82,14 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
fwd
}
def setConfidence(f: FixtureParam)(value: Double): Unit = {
import f._
val getConfidence = reputationRecorder.expectMessageType[ReputationRecorder.GetConfidence]
assert(getConfidence.originNode == TestConstants.Alice.nodeParams.nodeId)
getConfidence.replyTo ! ReputationRecorder.Confidence(value)
}
def basicRelayTest(f: FixtureParam)(relayPayloadScid: ShortChannelId, lcu: LocalChannelUpdate, success: Boolean): Unit = {
import f._
@ -87,11 +98,12 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
channelRelayer ! WrappedLocalChannelUpdate(lcu)
channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId)
setConfidence(f)(1.0)
if (success) {
expectFwdAdd(register, lcu.channelId, outgoingAmount, outgoingExpiry, 7)
} else {
expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(UnknownNextPeer()), commit = true))
expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(UnknownNextPeer()), commit = true), reputationRecorder)
}
}
@ -136,6 +148,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
val r1 = createValidIncomingPacket(payload1)
channelRelayer ! WrappedLocalChannelUpdate(lcu1)
channelRelayer ! Relay(r1, TestConstants.Alice.nodeParams.nodeId)
setConfidence(f)(1.0)
expectFwdAdd(register, lcu1.channelId, outgoingAmount, outgoingExpiry, 7)
// reorg happens
@ -147,9 +160,11 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
// both old and new real scids work
channelRelayer ! Relay(r1, TestConstants.Alice.nodeParams.nodeId)
setConfidence(f)(1.0)
expectFwdAdd(register, lcu1.channelId, outgoingAmount, outgoingExpiry, 7)
// new real scid works
channelRelayer ! Relay(r2, TestConstants.Alice.nodeParams.nodeId)
setConfidence(f)(1.0)
expectFwdAdd(register, lcu2.channelId, outgoingAmount, outgoingExpiry, 7)
}
@ -162,6 +177,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
channelRelayer ! WrappedLocalChannelUpdate(u)
channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId)
setConfidence(f)(1.0)
expectFwdAdd(register, channelIds(realScid1), outgoingAmount, outgoingExpiry, 7)
}
@ -181,6 +197,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
channelRelayer ! WrappedLocalChannelUpdate(u2)
channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId)
setConfidence(f)(1.0)
// first try
val fwd1 = expectFwdAdd(register, channelIds(realScid2), outgoingAmount, outgoingExpiry, 7)
@ -193,7 +210,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
fwd1.message.replyTo ! RES_ADD_FAILED(fwd2.message, HtlcValueTooHighInFlight(channelIds(realScid1), 1000000000 msat, 1516977616 msat), Some(u1.channelUpdate))
// the relayer should give up
expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(TemporaryChannelFailure(Some(u1.channelUpdate))), commit = true))
expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(TemporaryChannelFailure(Some(u1.channelUpdate))), commit = true), reputationRecorder)
}
test("fail to relay when we have no channel_update for the next channel") { f =>
@ -203,8 +220,9 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
val r = createValidIncomingPacket(payload)
channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId)
setConfidence(f)(1.0)
expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(UnknownNextPeer()), commit = true))
expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(UnknownNextPeer()), commit = true), reputationRecorder)
}
test("fail to relay when register returns an error") { f =>
@ -216,11 +234,12 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
channelRelayer ! WrappedLocalChannelUpdate(u)
channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId)
setConfidence(f)(1.0)
val fwd = expectFwdAdd(register, channelIds(realScid1), outgoingAmount, outgoingExpiry, 7)
fwd.replyTo ! Register.ForwardFailure(fwd)
expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(UnknownNextPeer()), commit = true))
expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(UnknownNextPeer()), commit = true), reputationRecorder)
}
test("fail to relay when the channel is advertised as unusable (down)") { f =>
@ -234,8 +253,9 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
channelRelayer ! WrappedLocalChannelUpdate(u)
channelRelayer ! WrappedLocalChannelDown(d)
channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId)
setConfidence(f)(1.0)
expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(UnknownNextPeer()), commit = true))
expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(UnknownNextPeer()), commit = true), reputationRecorder)
}
test("fail to relay when channel is disabled") { f =>
@ -247,8 +267,9 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
channelRelayer ! WrappedLocalChannelUpdate(u)
channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId)
setConfidence(f)(1.0)
expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(ChannelDisabled(u.channelUpdate.messageFlags, u.channelUpdate.channelFlags, Some(u.channelUpdate))), commit = true))
expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(ChannelDisabled(u.channelUpdate.messageFlags, u.channelUpdate.channelFlags, Some(u.channelUpdate))), commit = true), reputationRecorder)
}
test("fail to relay when amount is below minimum") { f =>
@ -260,8 +281,9 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
channelRelayer ! WrappedLocalChannelUpdate(u)
channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId)
setConfidence(f)(1.0)
expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(AmountBelowMinimum(outgoingAmount, Some(u.channelUpdate))), commit = true))
expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(AmountBelowMinimum(outgoingAmount, Some(u.channelUpdate))), commit = true), reputationRecorder)
}
test("fail to relay blinded payment") { f =>
@ -274,6 +296,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
channelRelayer ! WrappedLocalChannelUpdate(u)
channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId)
setConfidence(f)(1.0)
val cmd = register.expectMessageType[Register.Forward[channel.Command]]
assert(cmd.channelId == r.add.channelId)
@ -290,6 +313,8 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
assert(fail.onionHash == Sphinx.hash(r.add.onionRoutingPacket))
assert(fail.failureCode == InvalidOnionBlinding(Sphinx.hash(r.add.onionRoutingPacket)).code)
}
reputationRecorder.expectMessageType[ReputationRecorder.CancelRelay]
}
}
@ -302,6 +327,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
channelRelayer ! WrappedLocalChannelUpdate(u)
channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId)
setConfidence(f)(1.0)
expectFwdAdd(register, channelIds(realScid1), r.amountToForward, r.outgoingCltv, 7).message
}
@ -315,8 +341,9 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
channelRelayer ! WrappedLocalChannelUpdate(u)
channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId)
setConfidence(f)(1.0)
expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(IncorrectCltvExpiry(r.outgoingCltv, Some(u.channelUpdate))), commit = true))
expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(IncorrectCltvExpiry(r.outgoingCltv, Some(u.channelUpdate))), commit = true), reputationRecorder)
}
test("fail to relay when fee is insufficient") { f =>
@ -328,8 +355,9 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
channelRelayer ! WrappedLocalChannelUpdate(u)
channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId)
setConfidence(f)(1.0)
expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(FeeInsufficient(r.add.amountMsat, Some(u.channelUpdate))), commit = true))
expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(FeeInsufficient(r.add.amountMsat, Some(u.channelUpdate))), commit = true), reputationRecorder)
}
test("relay that would fail (fee insufficient) with a recent channel update but succeed with the previous update") { f =>
@ -341,6 +369,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
channelRelayer ! WrappedLocalChannelUpdate(u1)
channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId)
setConfidence(f)(1.0)
// relay succeeds with current channel update (u1) with lower fees
expectFwdAdd(register, channelIds(realScid1), outgoingAmount, outgoingExpiry, 7)
@ -349,6 +378,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
channelRelayer ! WrappedLocalChannelUpdate(u2)
channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId)
setConfidence(f)(1.0)
// relay succeeds because the current update (u2) with higher fees occurred less than 10 minutes ago
expectFwdAdd(register, channelIds(realScid1), outgoingAmount, outgoingExpiry, 7)
@ -358,9 +388,10 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
channelRelayer ! WrappedLocalChannelUpdate(u1)
channelRelayer ! WrappedLocalChannelUpdate(u3)
channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId)
setConfidence(f)(1.0)
// relay fails because the current update (u3) with higher fees occurred more than 10 minutes ago
expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(FeeInsufficient(r.add.amountMsat, Some(u3.channelUpdate))), commit = true))
expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(FeeInsufficient(r.add.amountMsat, Some(u3.channelUpdate))), commit = true), reputationRecorder)
}
test("fail to relay when there is a local error") { f =>
@ -387,9 +418,10 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
testCases.foreach { testCase =>
channelRelayer ! WrappedLocalChannelUpdate(u)
channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId)
setConfidence(f)(1.0)
val fwd = expectFwdAdd(register, channelIds(realScid1), outgoingAmount, outgoingExpiry, 7)
fwd.message.replyTo ! RES_ADD_FAILED(fwd.message, testCase.exc, Some(testCase.update))
expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(testCase.failure), commit = true))
expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(testCase.failure), commit = true), reputationRecorder)
}
}
@ -422,6 +454,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
val payload = ChannelRelay.Standard(ShortChannelId(12345), 998900 msat, CltvExpiry(60))
val r = createValidIncomingPacket(payload, 1000000 msat, CltvExpiry(70), endorsementIn = 5)
channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId)
setConfidence(f)(5.5 / 8)
// select the channel to the same node, with the lowest capacity and balance but still high enough to handle the payment
val cmd1 = expectFwdAdd(register, channelUpdates(ShortChannelId(22223)).channelId, r.amountToForward, r.outgoingCltv, 5).message
cmd1.replyTo ! RES_ADD_FAILED(cmd1, ChannelUnavailable(randomBytes32()), None)
@ -435,13 +468,14 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
val cmd4 = expectFwdAdd(register, channelUpdates(ShortChannelId(11111)).channelId, r.amountToForward, r.outgoingCltv, 5).message
cmd4.replyTo ! RES_ADD_FAILED(cmd4, HtlcValueTooHighInFlight(randomBytes32(), 100000000 msat, 100000000 msat), Some(channelUpdates(ShortChannelId(11111)).channelUpdate))
// all the suitable channels have been tried
expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(TemporaryChannelFailure(Some(channelUpdates(ShortChannelId(12345)).channelUpdate))), commit = true))
expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(TemporaryChannelFailure(Some(channelUpdates(ShortChannelId(12345)).channelUpdate))), commit = true), reputationRecorder)
}
{
// higher amount payment (have to increased incoming htlc amount for fees to be sufficient)
val payload = ChannelRelay.Standard(ShortChannelId(12345), 50000000 msat, CltvExpiry(60))
val r = createValidIncomingPacket(payload, 60000000 msat, CltvExpiry(70), endorsementIn = 0)
channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId)
setConfidence(f)(0)
expectFwdAdd(register, channelUpdates(ShortChannelId(11111)).channelId, r.amountToForward, r.outgoingCltv, 0).message
}
{
@ -449,6 +483,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
val payload = ChannelRelay.Standard(ShortChannelId(12345), 1000 msat, CltvExpiry(60))
val r = createValidIncomingPacket(payload, 60000000 msat, CltvExpiry(70), endorsementIn = 6)
channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId)
setConfidence(f)(6.5 / 8)
expectFwdAdd(register, channelUpdates(ShortChannelId(33333)).channelId, r.amountToForward, r.outgoingCltv, 6).message
}
{
@ -456,6 +491,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
val payload = ChannelRelay.Standard(ShortChannelId(12345), 1000000000 msat, CltvExpiry(60))
val r = createValidIncomingPacket(payload, 1010000000 msat, CltvExpiry(70))
channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId)
setConfidence(f)(7.5 / 8)
expectFwdAdd(register, channelUpdates(ShortChannelId(12345)).channelId, r.amountToForward, r.outgoingCltv, 7).message
}
{
@ -463,6 +499,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
val payload = ChannelRelay.Standard(ShortChannelId(12345), 998900 msat, CltvExpiry(50))
val r = createValidIncomingPacket(payload, 1000000 msat, CltvExpiry(70))
channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId)
setConfidence(f)(7.5 / 8)
expectFwdAdd(register, channelUpdates(ShortChannelId(22223)).channelId, r.amountToForward, r.outgoingCltv, 7).message
}
{
@ -470,7 +507,8 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
val payload = ChannelRelay.Standard(ShortChannelId(12345), 998900 msat, CltvExpiry(61))
val r = createValidIncomingPacket(payload, 1000000 msat, CltvExpiry(70))
channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId)
expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(IncorrectCltvExpiry(CltvExpiry(61), Some(channelUpdates(ShortChannelId(12345)).channelUpdate))), commit = true))
setConfidence(f)(0.2)
expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(IncorrectCltvExpiry(CltvExpiry(61), Some(channelUpdates(ShortChannelId(12345)).channelUpdate))), commit = true), reputationRecorder)
}
}
@ -496,10 +534,14 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
testCases.foreach { testCase =>
channelRelayer ! WrappedLocalChannelUpdate(u)
channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId)
setConfidence(f)(1.0)
val fwd = expectFwdAdd(register, channelIds(realScid1), outgoingAmount, outgoingExpiry, 7)
fwd.message.replyTo ! RES_SUCCESS(fwd.message, channelId1)
fwd.message.origin.replyTo ! RES_ADD_SETTLED(fwd.message.origin, downstream_htlc, testCase.result)
expectFwdFail(register, r.add.channelId, testCase.cmd)
val fwdFail = register.expectMessageType[Register.Forward[channel.Command]]
assert(fwdFail.message == testCase.cmd)
assert(fwdFail.channelId == r.add.channelId)
assert(!reputationRecorder.expectMessageType[ReputationRecorder.RecordResult].isSuccess)
}
}
@ -522,6 +564,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
val r = createValidIncomingPacket(createBlindedPayload(u.channelUpdate, isIntroduction), outgoingAmount + u.channelUpdate.feeBaseMsat, outgoingExpiry + u.channelUpdate.cltvExpiryDelta, endorsementIn = 0)
channelRelayer ! WrappedLocalChannelUpdate(u)
channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId)
setConfidence(f)(0)
val fwd = expectFwdAdd(register, channelId1, outgoingAmount, outgoingExpiry, 0)
fwd.message.replyTo ! RES_SUCCESS(fwd.message, channelId1)
fwd.message.origin.replyTo ! RES_ADD_SETTLED(fwd.message.origin, downstream, htlcResult)
@ -540,6 +583,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
assert(fail.onionHash == Sphinx.hash(r.add.onionRoutingPacket))
assert(fail.failureCode == InvalidOnionBlinding(Sphinx.hash(r.add.onionRoutingPacket)).code)
}
assert(!reputationRecorder.expectMessageType[ReputationRecorder.RecordResult].isSuccess)
}
}
}
@ -565,6 +609,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
testCases.foreach { testCase =>
channelRelayer ! WrappedLocalChannelUpdate(u)
channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId)
setConfidence(f)(3.5 / 8)
val fwd1 = expectFwdAdd(register, channelIds(realScid1), outgoingAmount, outgoingExpiry, 3)
fwd1.message.replyTo ! RES_SUCCESS(fwd1.message, channelId1)
@ -577,6 +622,8 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
val paymentRelayed = eventListener.expectMessageType[ChannelPaymentRelayed]
assert(paymentRelayed.copy(startedAt = 0 unixms, settledAt = 0 unixms) == ChannelPaymentRelayed(r.add.amountMsat, r.amountToForward, r.add.paymentHash, r.add.channelId, channelId1, startedAt = 0 unixms, settledAt = 0 unixms))
assert(reputationRecorder.expectMessageType[ReputationRecorder.RecordResult].isSuccess)
}
}

View file

@ -41,6 +41,8 @@ import fr.acinq.eclair.payment.send.MultiPartPaymentLifecycle.{PreimageReceived,
import fr.acinq.eclair.payment.send.PaymentInitiator.SendPaymentConfig
import fr.acinq.eclair.payment.send.PaymentLifecycle.SendPaymentToNode
import fr.acinq.eclair.payment.send.{BlindedRecipient, ClearRecipient}
import fr.acinq.eclair.reputation.ReputationRecorder
import fr.acinq.eclair.reputation.ReputationRecorder.{Confidence, GetTrampolineConfidence, RecordTrampolineFailure, RecordTrampolineSuccess}
import fr.acinq.eclair.router.Router.RouteRequest
import fr.acinq.eclair.router.{BalanceTooLow, RouteNotFound, Router}
import fr.acinq.eclair.wire.protocol.OfferTypes._
@ -65,11 +67,11 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
import NodeRelayerSpec._
case class FixtureParam(nodeParams: NodeParams, router: TestProbe[Any], register: TestProbe[Any], mockPayFSM: TestProbe[Any], eventListener: TestProbe[PaymentEvent], triggerer: TestProbe[AsyncPaymentTriggerer.Command]) {
case class FixtureParam(nodeParams: NodeParams, router: TestProbe[Any], register: TestProbe[Any], reputationRecorder: TestProbe[ReputationRecorder.Command], mockPayFSM: TestProbe[Any], eventListener: TestProbe[PaymentEvent], triggerer: TestProbe[AsyncPaymentTriggerer.Command]) {
def createNodeRelay(packetIn: IncomingPaymentPacket.NodeRelayPacket, useRealPaymentFactory: Boolean = false): (ActorRef[NodeRelay.Command], TestProbe[NodeRelayer.Command]) = {
val parent = TestProbe[NodeRelayer.Command]("parent-relayer")
val outgoingPaymentFactory = if (useRealPaymentFactory) RealOutgoingPaymentFactory(this) else FakeOutgoingPaymentFactory(this)
val nodeRelay = testKit.spawn(NodeRelay(nodeParams, parent.ref, register.ref.toClassic, relayId, packetIn, outgoingPaymentFactory, triggerer.ref, router.ref.toClassic))
val nodeRelay = testKit.spawn(NodeRelay(nodeParams, parent.ref, register.ref.toClassic, reputationRecorder.ref, relayId, packetIn, outgoingPaymentFactory, triggerer.ref, router.ref.toClassic))
(nodeRelay, parent)
}
}
@ -96,17 +98,18 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
.modify(_.relayParams.asyncPaymentsParams.holdTimeoutBlocks).setToIf(test.tags.contains("long_hold_timeout"))(200000) // timeout after payment expires
val router = TestProbe[Any]("router")
val register = TestProbe[Any]("register")
val reputationRecorder = TestProbe[ReputationRecorder.Command]("reputation-recorder")
val eventListener = TestProbe[PaymentEvent]("event-listener")
system.eventStream ! EventStream.Subscribe(eventListener.ref)
val mockPayFSM = TestProbe[Any]("pay-fsm")
val triggerer = TestProbe[AsyncPaymentTriggerer.Command]("payment-triggerer")
withFixture(test.toNoArgTest(FixtureParam(nodeParams, router, register, mockPayFSM, eventListener, triggerer)))
withFixture(test.toNoArgTest(FixtureParam(nodeParams, router, register, reputationRecorder, mockPayFSM, eventListener, triggerer)))
}
test("create child handlers for new payments") { f =>
import f._
val probe = TestProbe[Any]()
val parentRelayer = testKit.spawn(NodeRelayer(nodeParams, register.ref.toClassic, FakeOutgoingPaymentFactory(f), triggerer.ref, router.ref.toClassic))
val parentRelayer = testKit.spawn(NodeRelayer(nodeParams, register.ref.toClassic, reputationRecorder.ref, FakeOutgoingPaymentFactory(f), triggerer.ref, router.ref.toClassic))
parentRelayer ! NodeRelayer.GetPendingPayments(probe.ref.toClassic)
probe.expectMessage(Map.empty)
@ -145,7 +148,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
val outgoingPaymentFactory = FakeOutgoingPaymentFactory(f)
{
val parentRelayer = testKit.spawn(NodeRelayer(nodeParams, register.ref.toClassic, outgoingPaymentFactory, triggerer.ref, router.ref.toClassic))
val parentRelayer = testKit.spawn(NodeRelayer(nodeParams, register.ref.toClassic, reputationRecorder.ref, outgoingPaymentFactory, triggerer.ref, router.ref.toClassic))
parentRelayer ! NodeRelayer.GetPendingPayments(probe.ref.toClassic)
probe.expectMessage(Map.empty)
}
@ -153,7 +156,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
val (paymentHash1, paymentSecret1, child1) = (randomBytes32(), randomBytes32(), TestProbe[NodeRelay.Command]())
val (paymentHash2, paymentSecret2, child2) = (randomBytes32(), randomBytes32(), TestProbe[NodeRelay.Command]())
val children = Map(PaymentKey(paymentHash1, paymentSecret1) -> child1.ref, PaymentKey(paymentHash2, paymentSecret2) -> child2.ref)
val parentRelayer = testKit.spawn(NodeRelayer(nodeParams, register.ref.toClassic, outgoingPaymentFactory, triggerer.ref, router.ref.toClassic, children))
val parentRelayer = testKit.spawn(NodeRelayer(nodeParams, register.ref.toClassic, reputationRecorder.ref, outgoingPaymentFactory, triggerer.ref, router.ref.toClassic, children))
parentRelayer ! NodeRelayer.GetPendingPayments(probe.ref.toClassic)
probe.expectMessage(children)
@ -169,7 +172,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
val (paymentSecret1, child1) = (randomBytes32(), TestProbe[NodeRelay.Command]())
val (paymentSecret2, child2) = (randomBytes32(), TestProbe[NodeRelay.Command]())
val children = Map(PaymentKey(paymentHash, paymentSecret1) -> child1.ref, PaymentKey(paymentHash, paymentSecret2) -> child2.ref)
val parentRelayer = testKit.spawn(NodeRelayer(nodeParams, register.ref.toClassic, outgoingPaymentFactory, triggerer.ref, router.ref.toClassic, children))
val parentRelayer = testKit.spawn(NodeRelayer(nodeParams, register.ref.toClassic, reputationRecorder.ref, outgoingPaymentFactory, triggerer.ref, router.ref.toClassic, children))
parentRelayer ! NodeRelayer.GetPendingPayments(probe.ref.toClassic)
probe.expectMessage(children)
@ -179,7 +182,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
probe.expectMessage(Map(PaymentKey(paymentHash, paymentSecret2) -> child2.ref))
}
{
val parentRelayer = testKit.spawn(NodeRelayer(nodeParams, register.ref.toClassic, outgoingPaymentFactory, triggerer.ref, router.ref.toClassic))
val parentRelayer = testKit.spawn(NodeRelayer(nodeParams, register.ref.toClassic, reputationRecorder.ref, outgoingPaymentFactory, triggerer.ref, router.ref.toClassic))
parentRelayer ! NodeRelayer.Relay(incomingMultiPart.head, randomKey().publicKey)
parentRelayer ! NodeRelayer.GetPendingPayments(probe.ref.toClassic)
val pending1 = probe.expectMessageType[Map[PaymentKey, ActorRef[NodeRelay.Command]]]
@ -231,6 +234,8 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
nextTrampolinePacket)
nodeRelayer ! NodeRelay.Relay(extra, randomKey().publicKey)
reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(1.0)
// the extra payment will be rejected
val fwd = register.expectMessageType[Register.Forward[CMD_FAIL_HTLC]]
assert(fwd.channelId == extra.add.channelId)
@ -247,8 +252,10 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
// Receive a complete upstream multi-part payment, which we relay out.
incomingMultiPart.foreach(incoming => nodeRelayer ! NodeRelay.Relay(incoming, randomKey().publicKey))
reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(3.5 / 8)
val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig]
validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingMultiPart.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey))), 5)
validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingMultiPart.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey))), 3)
val outgoingPayment = mockPayFSM.expectMessageType[SendMultiPartPayment]
validateOutgoingPayment(outgoingPayment)
@ -375,9 +382,11 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
assert(peerWatch.timeout == asyncSafetyHeight(incomingAsyncPayment, nodeParams))
peerWatch.replyTo ! AsyncPaymentTriggered
reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(4.5 / 8)
// upstream payment relayed
val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig]
validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingAsyncPayment.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey))), 5)
validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingAsyncPayment.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey))), 4)
val outgoingPayment = mockPayFSM.expectMessageType[SendMultiPartPayment]
validateOutgoingPayment(outgoingPayment)
// those are adapters for pay-fsm messages
@ -398,6 +407,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
assert(relayEvent.incoming.map(p => (p.amount, p.channelId)).toSet == incomingAsyncPayment.map(i => (i.add.amountMsat, i.add.channelId)).toSet)
assert(relayEvent.outgoing.nonEmpty)
parent.expectMessageType[NodeRelayer.RelayComplete]
reputationRecorder.expectMessageType[RecordTrampolineSuccess]
register.expectNoMessage(100 millis)
}
@ -452,9 +462,11 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
val (nodeRelayer, parent) = createNodeRelay(incomingAsyncPayment.head)
incomingAsyncPayment.foreach(p => nodeRelayer ! NodeRelay.Relay(p, randomKey().publicKey))
reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(6.5 / 8)
// upstream payment relayed
val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig]
validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingAsyncPayment.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey))), 5)
validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingAsyncPayment.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey))), 6)
val outgoingPayment = mockPayFSM.expectMessageType[SendMultiPartPayment]
validateOutgoingPayment(outgoingPayment)
// those are adapters for pay-fsm messages
@ -475,6 +487,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
assert(relayEvent.incoming.map(p => (p.amount, p.channelId)).toSet == incomingAsyncPayment.map(i => (i.add.amountMsat, i.add.channelId)).toSet)
assert(relayEvent.outgoing.nonEmpty)
parent.expectMessageType[NodeRelayer.RelayComplete]
reputationRecorder.expectMessageType[RecordTrampolineSuccess]
register.expectNoMessage(100 millis)
}
@ -551,6 +564,8 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
val (nodeRelayer, _) = f.createNodeRelay(incomingMultiPart.head)
incomingMultiPart.foreach(p => nodeRelayer ! NodeRelay.Relay(p, randomKey().publicKey))
reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(1.0)
mockPayFSM.expectMessageType[SendPaymentConfig]
// those are adapters for pay-fsm messages
val nodeRelayerAdapters = mockPayFSM.expectMessageType[SendMultiPartPayment].replyTo
@ -563,6 +578,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
assert(fwd.message == CMD_FAIL_HTLC(p.add.id, Right(TrampolineFeeInsufficient()), commit = true))
}
reputationRecorder.expectMessageType[RecordTrampolineFailure]
register.expectNoMessage(100 millis)
eventListener.expectNoMessage(100 millis)
}
@ -578,6 +594,8 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
val (nodeRelayer, _) = f.createNodeRelay(incoming.head, useRealPaymentFactory = true)
incoming.foreach(p => nodeRelayer ! NodeRelay.Relay(p, randomKey().publicKey))
reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(1.0)
val payFSM = mockPayFSM.expectMessageType[akka.actor.ActorRef]
router.expectMessageType[RouteRequest]
payFSM ! Status.Failure(BalanceTooLow)
@ -588,6 +606,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
assert(fwd.message == CMD_FAIL_HTLC(p.add.id, Right(TemporaryNodeFailure()), commit = true))
}
reputationRecorder.expectMessageType[RecordTrampolineFailure]
register.expectNoMessage(100 millis)
}
@ -598,6 +617,8 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
val (nodeRelayer, _) = f.createNodeRelay(incomingMultiPart.head, useRealPaymentFactory = true)
incomingMultiPart.foreach(p => nodeRelayer ! NodeRelay.Relay(p, randomKey().publicKey))
reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(1.0)
val payFSM = mockPayFSM.expectMessageType[akka.actor.ActorRef]
router.expectMessageType[RouteRequest]
@ -611,6 +632,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
assert(fwd.message == CMD_FAIL_HTLC(p.add.id, Right(TrampolineFeeInsufficient()), commit = true))
}
reputationRecorder.expectMessageType[RecordTrampolineFailure]
register.expectNoMessage(100 millis)
}
@ -621,6 +643,8 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
val (nodeRelayer, _) = f.createNodeRelay(incomingMultiPart.head, useRealPaymentFactory = true)
incomingMultiPart.foreach(p => nodeRelayer ! NodeRelay.Relay(p, randomKey().publicKey))
reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(1.0)
val payFSM = mockPayFSM.expectMessageType[akka.actor.ActorRef]
router.expectMessageType[RouteRequest]
@ -633,6 +657,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
assert(fwd.message == CMD_FAIL_HTLC(p.add.id, Right(FinalIncorrectHtlcAmount(42 msat)), commit = true))
}
reputationRecorder.expectMessageType[RecordTrampolineFailure]
register.expectNoMessage(100 millis)
}
@ -643,6 +668,8 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
val (nodeRelayer, _) = f.createNodeRelay(incomingSinglePart, useRealPaymentFactory = true)
nodeRelayer ! NodeRelay.Relay(incomingSinglePart, randomKey().publicKey)
reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(1.0)
val routeRequest = router.expectMessageType[RouteRequest]
val routeParams = routeRequest.routeParams
assert(routeParams.boundaries.maxFeeProportional == 0) // should be disabled
@ -661,8 +688,10 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
nodeRelayer ! NodeRelay.Relay(incomingMultiPart.last, randomKey().publicKey)
reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(3.5 / 8)
val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig]
validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingMultiPart.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey))), 5)
validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingMultiPart.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey))), 3)
val outgoingPayment = mockPayFSM.expectMessageType[SendMultiPartPayment]
validateOutgoingPayment(outgoingPayment)
// those are adapters for pay-fsm messages
@ -686,6 +715,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
validateRelayEvent(relayEvent)
assert(relayEvent.incoming.map(p => (p.amount, p.channelId)).toSet == incomingMultiPart.map(i => (i.add.amountMsat, i.add.channelId)).toSet)
assert(relayEvent.outgoing.nonEmpty)
reputationRecorder.expectMessageType[RecordTrampolineSuccess]
parent.expectMessageType[NodeRelayer.RelayComplete]
register.expectNoMessage(100 millis)
}
@ -697,6 +727,8 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
val (nodeRelayer, parent) = f.createNodeRelay(incomingSinglePart)
nodeRelayer ! NodeRelay.Relay(incomingSinglePart, randomKey().publicKey)
reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(1.0)
val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig]
validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(Upstream.Hot.Channel(incomingSinglePart.add, TimestampMilli.now(), randomKey().publicKey) :: Nil), 7)
val outgoingPayment = mockPayFSM.expectMessageType[SendMultiPartPayment]
@ -715,6 +747,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
validateRelayEvent(relayEvent)
assert(relayEvent.incoming.map(p => (p.amount, p.channelId)) == Seq((incomingSinglePart.add.amountMsat, incomingSinglePart.add.channelId)))
assert(relayEvent.outgoing.nonEmpty)
reputationRecorder.expectMessageType[RecordTrampolineSuccess]
parent.expectMessageType[NodeRelayer.RelayComplete]
register.expectNoMessage(100 millis)
}
@ -732,8 +765,10 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
val (nodeRelayer, parent) = f.createNodeRelay(incomingPayments.head)
incomingPayments.foreach(incoming => nodeRelayer ! NodeRelay.Relay(incoming, randomKey().publicKey))
reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(2.5 / 8)
val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig]
validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingMultiPart.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey))), 5)
validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingMultiPart.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey))), 2)
val outgoingPayment = mockPayFSM.expectMessageType[SendMultiPartPayment]
assert(outgoingPayment.recipient.nodeId == outgoingNodeId)
assert(outgoingPayment.recipient.totalAmount == outgoingAmount)
@ -759,6 +794,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
validateRelayEvent(relayEvent)
assert(relayEvent.incoming.map(p => (p.amount, p.channelId)) == incomingMultiPart.map(i => (i.add.amountMsat, i.add.channelId)))
assert(relayEvent.outgoing.nonEmpty)
reputationRecorder.expectMessageType[RecordTrampolineSuccess]
parent.expectMessageType[NodeRelayer.RelayComplete]
register.expectNoMessage(100 millis)
}
@ -776,8 +812,10 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
val (nodeRelayer, parent) = f.createNodeRelay(incomingPayments.head)
incomingPayments.foreach(incoming => nodeRelayer ! NodeRelay.Relay(incoming, randomKey().publicKey))
reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(0)
val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig]
validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingMultiPart.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey))), 5)
validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingMultiPart.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey))), 0)
val outgoingPayment = mockPayFSM.expectMessageType[SendPaymentToNode]
assert(outgoingPayment.recipient.nodeId == outgoingNodeId)
assert(outgoingPayment.amount == outgoingAmount)
@ -803,6 +841,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
validateRelayEvent(relayEvent)
assert(relayEvent.incoming.map(p => (p.amount, p.channelId)) == incomingMultiPart.map(i => (i.add.amountMsat, i.add.channelId)))
assert(relayEvent.outgoing.length == 1)
reputationRecorder.expectMessageType[RecordTrampolineSuccess]
parent.expectMessageType[NodeRelayer.RelayComplete]
register.expectNoMessage(100 millis)
}
@ -845,8 +884,10 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
val (nodeRelayer, parent) = f.createNodeRelay(incomingPayments.head)
incomingPayments.foreach(incoming => nodeRelayer ! NodeRelay.Relay(incoming, randomKey().publicKey))
reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(1.0)
val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig]
validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingMultiPart.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey))), 5, ignoreNodeId = true)
validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingMultiPart.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey))), 7, ignoreNodeId = true)
val outgoingPayment = mockPayFSM.expectMessageType[SendPaymentToNode]
assert(outgoingPayment.amount == outgoingAmount)
assert(outgoingPayment.recipient.expiry == outgoingExpiry)
@ -884,8 +925,10 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
val (nodeRelayer, parent) = f.createNodeRelay(incomingPayments.head)
incomingPayments.foreach(incoming => nodeRelayer ! NodeRelay.Relay(incoming, randomKey().publicKey))
reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(1.5 / 8)
val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig]
validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingMultiPart.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey))), 5, ignoreNodeId = true)
validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingMultiPart.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey))), 1, ignoreNodeId = true)
val outgoingPayment = mockPayFSM.expectMessageType[SendMultiPartPayment]
assert(outgoingPayment.recipient.totalAmount == outgoingAmount)
assert(outgoingPayment.recipient.expiry == outgoingExpiry)
@ -926,13 +969,15 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
val (nodeRelayer, parent) = f.createNodeRelay(incomingPayments.head)
incomingPayments.foreach(incoming => nodeRelayer ! NodeRelay.Relay(incoming, randomKey().publicKey))
reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(4.5 / 8)
val getNodeId = router.expectMessageType[Router.GetNodeId]
assert(getNodeId.isNode1 == scidDir.isNode1)
assert(getNodeId.shortChannelId == scidDir.scid)
getNodeId.replyTo ! Some(outgoingNodeId)
val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig]
validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingMultiPart.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey))), 5, ignoreNodeId = true)
validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingMultiPart.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey))), 4, ignoreNodeId = true)
val outgoingPayment = mockPayFSM.expectMessageType[SendPaymentToNode]
assert(outgoingPayment.amount == outgoingAmount)
assert(outgoingPayment.recipient.expiry == outgoingExpiry)
@ -973,6 +1018,8 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
val (nodeRelayer, _) = f.createNodeRelay(incomingPayments.head)
incomingPayments.foreach(incoming => nodeRelayer ! NodeRelay.Relay(incoming, randomKey().publicKey))
reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(1.0)
val getNodeId = router.expectMessageType[Router.GetNodeId]
assert(getNodeId.isNode1 == scidDir.isNode1)
assert(getNodeId.shortChannelId == scidDir.scid)

View file

@ -33,6 +33,7 @@ import fr.acinq.eclair.payment.OutgoingPaymentPacket.{NodePayload, buildOnion, b
import fr.acinq.eclair.payment.PaymentPacketSpec._
import fr.acinq.eclair.payment.relay.Relayer._
import fr.acinq.eclair.payment.send.{ClearRecipient, TrampolineRecipient}
import fr.acinq.eclair.reputation.ReputationRecorder
import fr.acinq.eclair.router.BaseRouterSpec.{blindedRouteFromHops, channelHopFromUpdate}
import fr.acinq.eclair.router.Router.{NodeHop, Route}
import fr.acinq.eclair.wire.protocol.PaymentOnion.FinalPayload
@ -46,7 +47,7 @@ import scala.concurrent.duration.DurationInt
class RelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("application")) with FixtureAnyFunSuiteLike {
case class FixtureParam(nodeParams: NodeParams, relayer: akka.actor.ActorRef, router: TestProbe[Any], register: TestProbe[Any], childActors: ChildActors, paymentHandler: TestProbe[Any])
case class FixtureParam(nodeParams: NodeParams, relayer: akka.actor.ActorRef, router: TestProbe[Any], register: TestProbe[Any], childActors: ChildActors, paymentHandler: TestProbe[Any], reputationRecorder: TestProbe[ReputationRecorder.Command])
override def withFixture(test: OneArgTest): Outcome = {
// we are node B in the route A -> B -> C -> ....
@ -56,17 +57,26 @@ class RelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("applicat
val register = TestProbe[Any]("register")
val paymentHandler = TestProbe[Any]("payment-handler")
val triggerer = TestProbe[AsyncPaymentTriggerer.Command]("payment-triggerer")
val reputationRecorder = TestProbe[ReputationRecorder.Command]("reputation-recorder")
val probe = TestProbe[Any]()
// we can't spawn top-level actors with akka typed
testKit.spawn(Behaviors.setup[Any] { context =>
val relayer = context.toClassic.actorOf(Relayer.props(nodeParams, router.ref.toClassic, register.ref.toClassic, paymentHandler.ref.toClassic, triggerer.ref))
val relayer = context.toClassic.actorOf(Relayer.props(nodeParams, router.ref.toClassic, register.ref.toClassic, paymentHandler.ref.toClassic, triggerer.ref, reputationRecorder.ref))
probe.ref ! relayer
Behaviors.empty[Any]
})
val relayer = probe.expectMessageType[akka.actor.ActorRef]
relayer ! GetChildActors(probe.ref.toClassic)
val childActors = probe.expectMessageType[ChildActors]
withFixture(test.toNoArgTest(FixtureParam(nodeParams, relayer, router, register, childActors, paymentHandler)))
withFixture(test.toNoArgTest(FixtureParam(nodeParams, relayer, router, register, childActors, paymentHandler, reputationRecorder)))
}
def setConfidence(f: FixtureParam)(value: Double): Unit = {
import f._
val getConfidence = reputationRecorder.expectMessageType[ReputationRecorder.GetConfidence]
assert(getConfidence.originNode == TestConstants.Alice.nodeParams.nodeId)
getConfidence.replyTo ! ReputationRecorder.Confidence(value)
}
val channelId_ab = randomBytes32()
@ -94,6 +104,7 @@ class RelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("applicat
// and then manually build an htlc
val add_ab = UpdateAddHtlc(randomBytes32(), 123456, payment.cmd.amount, payment.cmd.paymentHash, payment.cmd.cltvExpiry, payment.cmd.onion, None, 1.0)
relayer ! RelayForward(add_ab, priv_a.publicKey)
setConfidence(f)(1.0)
register.expectMessageType[Register.Forward[CMD_ADD_HTLC]]
}

View file

@ -0,0 +1,97 @@
/*
* Copyright 2023 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package fr.acinq.eclair.reputation
import akka.actor.testkit.typed.scaladsl.{ScalaTestWithActorTestKit, TestProbe}
import akka.actor.typed.ActorRef
import com.typesafe.config.ConfigFactory
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.eclair.reputation.Reputation.ReputationConfig
import fr.acinq.eclair.reputation.ReputationRecorder._
import fr.acinq.eclair.{MilliSatoshiLong, randomKey}
import org.scalatest.Outcome
import org.scalatest.funsuite.FixtureAnyFunSuiteLike
import java.util.UUID
import scala.concurrent.duration.DurationInt
class ReputationRecorderSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("application")) with FixtureAnyFunSuiteLike {
val (uuid1, uuid2, uuid3, uuid4, uuid5, uuid6, uuid7, uuid8) = (UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID())
val originNode: PublicKey = randomKey().publicKey
case class FixtureParam(config: ReputationConfig, reputationRecorder: ActorRef[Command], replyTo: TestProbe[Confidence])
override def withFixture(test: OneArgTest): Outcome = {
val config = ReputationConfig(1 day, 10 seconds, 2)
val replyTo = TestProbe[Confidence]("confidence")
val reputationRecorder = testKit.spawn(ReputationRecorder(config, Map.empty))
withFixture(test.toNoArgTest(FixtureParam(config, reputationRecorder.ref, replyTo)))
}
test("standard") { f =>
import f._
reputationRecorder ! GetConfidence(replyTo.ref, originNode, 7, uuid1, 2000 msat)
assert(replyTo.expectMessageType[Confidence].value == 0)
reputationRecorder ! RecordResult(originNode, 7, uuid1, isSuccess = true)
reputationRecorder ! GetConfidence(replyTo.ref, originNode, 7, uuid2, 1000 msat)
assert(replyTo.expectMessageType[Confidence].value === (2.0 / 4) +- 0.001)
reputationRecorder ! GetConfidence(replyTo.ref, originNode, 7, uuid3, 3000 msat)
assert(replyTo.expectMessageType[Confidence].value === (2.0 / 10) +- 0.001)
reputationRecorder ! CancelRelay(originNode, 7, uuid3)
reputationRecorder ! GetConfidence(replyTo.ref, originNode, 7, uuid4, 1000 msat)
assert(replyTo.expectMessageType[Confidence].value === (2.0 / 6) +- 0.001)
reputationRecorder ! RecordResult(originNode, 7, uuid4, isSuccess = true)
reputationRecorder ! RecordResult(originNode, 7, uuid2, isSuccess = false)
// Not endorsed
reputationRecorder ! GetConfidence(replyTo.ref, originNode, 0, uuid5, 1000 msat)
assert(replyTo.expectMessageType[Confidence].value == 0)
// Different origin node
reputationRecorder ! GetConfidence(replyTo.ref, randomKey().publicKey, 7, uuid6, 1000 msat)
assert(replyTo.expectMessageType[Confidence].value == 0)
// Very large HTLC
reputationRecorder ! GetConfidence(replyTo.ref, originNode, 7, uuid5, 100000000 msat)
assert(replyTo.expectMessageType[Confidence].value === 0.0 +- 0.001)
}
test("trampoline") { f =>
import f._
val (a, b, c) = (randomKey().publicKey, randomKey().publicKey, randomKey().publicKey)
reputationRecorder ! GetTrampolineConfidence(replyTo.ref, Map((a, 7) -> 2000.msat, (b, 7) -> 4000.msat, (c, 0) -> 6000.msat), uuid1)
assert(replyTo.expectMessageType[Confidence].value == 0)
reputationRecorder ! RecordTrampolineSuccess(Map((a, 7) -> 1000.msat, (b, 7) -> 2000.msat, (c, 0) -> 3000.msat), uuid1)
reputationRecorder ! GetTrampolineConfidence(replyTo.ref, Map((a, 7) -> 1000.msat, (c, 0) -> 1000.msat), uuid2)
assert(replyTo.expectMessageType[Confidence].value === (1.0 / 3) +- 0.001)
reputationRecorder ! GetTrampolineConfidence(replyTo.ref, Map((a, 0) -> 1000.msat, (b, 7) -> 2000.msat), uuid3)
assert(replyTo.expectMessageType[Confidence].value == 0)
reputationRecorder ! RecordTrampolineFailure(Set((a, 7), (c, 0)), uuid2)
reputationRecorder ! RecordTrampolineSuccess(Map((a, 0) -> 1000.msat, (b, 7) -> 2000.msat), uuid3)
reputationRecorder ! GetConfidence(replyTo.ref, a, 7, uuid4, 1000 msat)
assert(replyTo.expectMessageType[Confidence].value === (1.0 / 4) +- 0.001)
reputationRecorder ! GetConfidence(replyTo.ref, a, 0, uuid5, 1000 msat)
assert(replyTo.expectMessageType[Confidence].value === (1.0 / 3) +- 0.001)
reputationRecorder ! GetConfidence(replyTo.ref, b, 7, uuid6, 1000 msat)
assert(replyTo.expectMessageType[Confidence].value === (4.0 / 6) +- 0.001)
reputationRecorder ! GetConfidence(replyTo.ref, b, 0, uuid7, 1000 msat)
assert(replyTo.expectMessageType[Confidence].value == 0.0)
reputationRecorder ! GetConfidence(replyTo.ref, c, 0, uuid8, 1000 msat)
assert(replyTo.expectMessageType[Confidence].value === (3.0 / 6) +- 0.001)
}
}

View file

@ -0,0 +1,81 @@
/*
* Copyright 2023 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package fr.acinq.eclair.reputation
import fr.acinq.eclair.{MilliSatoshiLong, TimestampMilli}
import fr.acinq.eclair.reputation.Reputation.ReputationConfig
import org.scalatest.funsuite.AnyFunSuite
import org.scalactic.Tolerance.convertNumericToPlusOrMinusWrapper
import java.util.UUID
import scala.concurrent.duration.DurationInt
class ReputationSpec extends AnyFunSuite {
val (uuid1, uuid2, uuid3, uuid4, uuid5, uuid6, uuid7) = (UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID())
test("basic") {
val r0 = Reputation.init(ReputationConfig(1 day, 1 second, 2))
val (r1, c1) = r0.attempt(uuid1, 10000 msat)
assert(c1 == 0)
val r2 = r1.record(uuid1, isSuccess = true)
val (r3, c3) = r2.attempt(uuid2, 10000 msat)
assert(c3 === (1.0 / 3) +- 0.001)
val (r4, c4) = r3.attempt(uuid3, 10000 msat)
assert(c4 === (1.0 / 5) +- 0.001)
val r5 = r4.record(uuid2, isSuccess = true)
val r6 = r5.record(uuid3, isSuccess = true)
val (r7, c7) = r6.attempt(uuid4, 1 msat)
assert(c7 === 1.0 +- 0.001)
val (r8, c8) = r7.attempt(uuid5, 40000 msat)
assert(c8 === (3.0 / 11) +- 0.001)
val (r9, c9) = r8.attempt(uuid6, 10000 msat)
assert(c9 === (3.0 / 13) +- 0.001)
val r10 = r9.cancel(uuid5)
val r11 = r10.record(uuid6, isSuccess = false)
val (_, c12) = r11.attempt(uuid7, 10000 msat)
assert(c12 === (3.0 / 6) +- 0.001)
}
test("long HTLC") {
val r0 = Reputation.init(ReputationConfig(1000 day, 1 second, 10))
val (r1, c1) = r0.attempt(uuid1, 100000 msat, TimestampMilli(0))
assert(c1 == 0)
val r2 = r1.record(uuid1, isSuccess = true, now = TimestampMilli(0))
val (r3, c3) = r2.attempt(uuid2, 1000 msat, TimestampMilli(0))
assert(c3 === (10.0 / 11) +- 0.001)
val r4 = r3.record(uuid2, isSuccess = false, now = TimestampMilli(0) + 100.seconds)
val (_, c5) = r4.attempt(uuid3, 0 msat, now = TimestampMilli(0) + 100.seconds)
assert(c5 === 0.5 +- 0.001)
}
test("exponential decay") {
val r0 = Reputation.init(ReputationConfig(100 seconds, 1 second, 1))
val (r1, _) = r0.attempt(uuid1, 1000 msat, TimestampMilli(0))
val r2 = r1.record(uuid1, isSuccess = true, now = TimestampMilli(0))
val (r3, c3) = r2.attempt(uuid2, 1000 msat, TimestampMilli(0))
assert(c3 == 1.0 / 2)
val r4 = r3.record(uuid2, isSuccess = true, now = TimestampMilli(0))
val (r5, c5) = r4.attempt(uuid3, 1000 msat, TimestampMilli(0))
assert(c5 == 2.0 / 3)
val r6 = r5.record(uuid3, isSuccess = true, now = TimestampMilli(0))
val (r7, c7) = r6.attempt(uuid4, 1000 msat, TimestampMilli(0) + 100.seconds)
assert(c7 == 1.5 / 2.5)
val r8 = r7.cancel(uuid4)
val (_, c9) = r8.attempt(uuid5, 1000 msat, TimestampMilli(0) + 1.hour)
assert(c9 < 0.000001)
}
}