1
0
mirror of https://github.com/ACINQ/eclair.git synced 2024-11-19 01:43:22 +01:00

Record begin and end timestamps for relays (#2701)

For fighting jamming attempts, or even just to detect one, we need to know how fast relayed HTLCs are fulfilled. We now measure this and store it in the audit database. Previously the "IN" and "OUT" directions for the same HTLC were storing the same timestamp (corresponding to when the HTLC is fulfilled), we now use the timestamp at which we received the UpdateAddHtlc for the "IN" direction.
This commit is contained in:
Thomas HUET 2023-06-23 13:00:23 +02:00 committed by GitHub
parent 1519dd07a4
commit 9db0063079
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 141 additions and 109 deletions

View File

@ -176,7 +176,7 @@ object Origin {
object Hot {
def apply(replyTo: ActorRef, upstream: Upstream): Hot = upstream match {
case u: Upstream.Local => Origin.LocalHot(replyTo, u.id)
case u: Upstream.Trampoline => Origin.TrampolineRelayedHot(replyTo, u.adds)
case u: Upstream.Trampoline => Origin.TrampolineRelayedHot(replyTo, u.adds.map(_.add))
}
}
}

View File

@ -76,12 +76,12 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with DiagnosticActorL
.withTag(PaymentTags.Relay, PaymentTags.RelayType(e))
.record((e.amountIn - e.amountOut).truncateToSatoshi.toLong)
e match {
case TrampolinePaymentRelayed(_, incoming, outgoing, _, _, _) =>
case TrampolinePaymentRelayed(_, incoming, outgoing, _, _) =>
PaymentMetrics.PaymentParts.withTag(PaymentTags.Direction, PaymentTags.Directions.Received).record(incoming.length)
PaymentMetrics.PaymentParts.withTag(PaymentTags.Direction, PaymentTags.Directions.Sent).record(outgoing.length)
incoming.foreach(p => channelsDb.updateChannelMeta(p.channelId, ChannelEvent.EventType.PaymentReceived))
outgoing.foreach(p => channelsDb.updateChannelMeta(p.channelId, ChannelEvent.EventType.PaymentSent))
case ChannelPaymentRelayed(_, _, _, fromChannelId, toChannelId, _) =>
case ChannelPaymentRelayed(_, _, _, fromChannelId, toChannelId, _, _) =>
channelsDb.updateChannelMeta(fromChannelId, ChannelEvent.EventType.PaymentReceived)
channelsDb.updateChannelMeta(toChannelId, ChannelEvent.EventType.PaymentSent)
}

View File

@ -165,7 +165,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
migration910(statement)
}
if (v < 11) {
migration1011(statement)
migration1011(statement)
}
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
@ -228,10 +228,10 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
override def add(e: PaymentRelayed): Unit = withMetrics("audit/add-payment-relayed", DbBackends.Postgres) {
inTransaction { pg =>
val payments = e match {
case ChannelPaymentRelayed(amountIn, amountOut, _, fromChannelId, toChannelId, ts) =>
case ChannelPaymentRelayed(amountIn, amountOut, _, fromChannelId, toChannelId, startedAt, settledAt) =>
// non-trampoline relayed payments have one input and one output
Seq(RelayedPart(fromChannelId, amountIn, "IN", "channel", ts), RelayedPart(toChannelId, amountOut, "OUT", "channel", ts))
case TrampolinePaymentRelayed(_, incoming, outgoing, nextTrampolineNodeId, nextTrampolineAmount, ts) =>
Seq(RelayedPart(fromChannelId, amountIn, "IN", "channel", startedAt), RelayedPart(toChannelId, amountOut, "OUT", "channel", settledAt))
case TrampolinePaymentRelayed(_, incoming, outgoing, nextTrampolineNodeId, nextTrampolineAmount) =>
using(pg.prepareStatement("INSERT INTO audit.relayed_trampoline VALUES (?, ?, ?, ?)")) { statement =>
statement.setString(1, e.paymentHash.toHex)
statement.setLong(2, nextTrampolineAmount.toLong)
@ -240,7 +240,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
statement.executeUpdate()
}
// trampoline relayed payments do MPP aggregation and may have M inputs and N outputs
incoming.map(i => RelayedPart(i.channelId, i.amount, "IN", "trampoline", ts)) ++ outgoing.map(o => RelayedPart(o.channelId, o.amount, "OUT", "trampoline", ts))
incoming.map(i => RelayedPart(i.channelId, i.amount, "IN", "trampoline", i.receivedAt)) ++ outgoing.map(o => RelayedPart(o.channelId, o.amount, "OUT", "trampoline", o.settledAt))
}
for (p <- payments) {
using(pg.prepareStatement("INSERT INTO audit.relayed VALUES (?, ?, ?, ?, ?, ?)")) { statement =>
@ -249,7 +249,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
statement.setString(3, p.channelId.toHex)
statement.setString(4, p.direction)
statement.setString(5, p.relayType)
statement.setTimestamp(6, e.timestamp.toSqlTimestamp)
statement.setTimestamp(6, p.timestamp.toSqlTimestamp)
statement.executeUpdate()
}
}
@ -425,15 +425,15 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
case (paymentHash, parts) =>
// We may have been routing multiple payments for the same payment_hash (MPP) in both cases (trampoline and channel).
// NB: we may link the wrong in-out parts, but the overall sum will be correct: we sort by amounts to minimize the risk of mismatch.
val incoming = parts.filter(_.direction == "IN").map(p => PaymentRelayed.Part(p.amount, p.channelId)).sortBy(_.amount)
val outgoing = parts.filter(_.direction == "OUT").map(p => PaymentRelayed.Part(p.amount, p.channelId)).sortBy(_.amount)
val incoming = parts.filter(_.direction == "IN").map(p => PaymentRelayed.IncomingPart(p.amount, p.channelId, p.timestamp)).sortBy(_.amount)
val outgoing = parts.filter(_.direction == "OUT").map(p => PaymentRelayed.OutgoingPart(p.amount, p.channelId, p.timestamp)).sortBy(_.amount)
parts.headOption match {
case Some(RelayedPart(_, _, _, "channel", timestamp)) => incoming.zip(outgoing).map {
case (in, out) => ChannelPaymentRelayed(in.amount, out.amount, paymentHash, in.channelId, out.channelId, timestamp)
case Some(RelayedPart(_, _, _, "channel", _)) => incoming.zip(outgoing).map {
case (in, out) => ChannelPaymentRelayed(in.amount, out.amount, paymentHash, in.channelId, out.channelId, in.receivedAt, out.settledAt)
}
case Some(RelayedPart(_, _, _, "trampoline", timestamp)) =>
case Some(RelayedPart(_, _, _, "trampoline", _)) =>
val (nextTrampolineAmount, nextTrampolineNodeId) = trampolineByHash.getOrElse(paymentHash, (0 msat, PlaceHolderPubKey))
TrampolinePaymentRelayed(paymentHash, incoming, outgoing, nextTrampolineNodeId, nextTrampolineAmount, timestamp) :: Nil
TrampolinePaymentRelayed(paymentHash, incoming, outgoing, nextTrampolineNodeId, nextTrampolineAmount) :: Nil
case _ => Nil
}
}.toSeq.sortBy(_.timestamp)

View File

@ -216,10 +216,10 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging {
override def add(e: PaymentRelayed): Unit = withMetrics("audit/add-payment-relayed", DbBackends.Sqlite) {
val payments = e match {
case ChannelPaymentRelayed(amountIn, amountOut, _, fromChannelId, toChannelId, ts) =>
case ChannelPaymentRelayed(amountIn, amountOut, _, fromChannelId, toChannelId, startedAt, settledAt) =>
// non-trampoline relayed payments have one input and one output
Seq(RelayedPart(fromChannelId, amountIn, "IN", "channel", ts), RelayedPart(toChannelId, amountOut, "OUT", "channel", ts))
case TrampolinePaymentRelayed(_, incoming, outgoing, nextTrampolineNodeId, nextTrampolineAmount, ts) =>
Seq(RelayedPart(fromChannelId, amountIn, "IN", "channel", startedAt), RelayedPart(toChannelId, amountOut, "OUT", "channel", settledAt))
case TrampolinePaymentRelayed(_, incoming, outgoing, nextTrampolineNodeId, nextTrampolineAmount) =>
using(sqlite.prepareStatement("INSERT INTO relayed_trampoline VALUES (?, ?, ?, ?)")) { statement =>
statement.setBytes(1, e.paymentHash.toArray)
statement.setLong(2, nextTrampolineAmount.toLong)
@ -228,8 +228,8 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging {
statement.executeUpdate()
}
// trampoline relayed payments do MPP aggregation and may have M inputs and N outputs
incoming.map(i => RelayedPart(i.channelId, i.amount, "IN", "trampoline", ts)) ++
outgoing.map(o => RelayedPart(o.channelId, o.amount, "OUT", "trampoline", ts))
incoming.map(i => RelayedPart(i.channelId, i.amount, "IN", "trampoline", i.receivedAt)) ++
outgoing.map(o => RelayedPart(o.channelId, o.amount, "OUT", "trampoline", o.settledAt))
}
for (p <- payments) {
using(sqlite.prepareStatement("INSERT INTO relayed VALUES (?, ?, ?, ?, ?, ?)")) { statement =>
@ -238,7 +238,7 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging {
statement.setBytes(3, p.channelId.toArray)
statement.setString(4, p.direction)
statement.setString(5, p.relayType)
statement.setLong(6, e.timestamp.toLong)
statement.setLong(6, p.timestamp.toLong)
statement.executeUpdate()
}
}
@ -397,15 +397,15 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging {
case (paymentHash, parts) =>
// We may have been routing multiple payments for the same payment_hash (MPP) in both cases (trampoline and channel).
// NB: we may link the wrong in-out parts, but the overall sum will be correct: we sort by amounts to minimize the risk of mismatch.
val incoming = parts.filter(_.direction == "IN").map(p => PaymentRelayed.Part(p.amount, p.channelId)).sortBy(_.amount)
val outgoing = parts.filter(_.direction == "OUT").map(p => PaymentRelayed.Part(p.amount, p.channelId)).sortBy(_.amount)
val incoming = parts.filter(_.direction == "IN").map(p => PaymentRelayed.IncomingPart(p.amount, p.channelId, p.timestamp)).sortBy(_.amount)
val outgoing = parts.filter(_.direction == "OUT").map(p => PaymentRelayed.OutgoingPart(p.amount, p.channelId, p.timestamp)).sortBy(_.amount)
parts.headOption match {
case Some(RelayedPart(_, _, _, "channel", timestamp)) => incoming.zip(outgoing).map {
case (in, out) => ChannelPaymentRelayed(in.amount, out.amount, paymentHash, in.channelId, out.channelId, timestamp)
case Some(RelayedPart(_, _, _, "channel", _)) => incoming.zip(outgoing).map {
case (in, out) => ChannelPaymentRelayed(in.amount, out.amount, paymentHash, in.channelId, out.channelId, in.receivedAt, out.settledAt)
}
case Some(RelayedPart(_, _, _, "trampoline", timestamp)) =>
case Some(RelayedPart(_, _, _, "trampoline", _)) =>
val (nextTrampolineAmount, nextTrampolineNodeId) = trampolineByHash.getOrElse(paymentHash, (0 msat, PlaceHolderPubKey))
TrampolinePaymentRelayed(paymentHash, incoming, outgoing, nextTrampolineNodeId, nextTrampolineAmount, timestamp) :: Nil
TrampolinePaymentRelayed(paymentHash, incoming, outgoing, nextTrampolineNodeId, nextTrampolineAmount) :: Nil
case _ => Nil
}
}.toSeq.sortBy(_.timestamp)

View File

@ -33,6 +33,7 @@ object Monitoring {
val PaymentAttempt = Kamon.histogram("payment.attempt", "Number of attempts before a payment succeeds")
val SentPaymentDuration = Kamon.timer("payment.duration.sent", "Outgoing payment duration")
val ReceivedPaymentDuration = Kamon.timer("payment.duration.received", "Incoming payment duration")
val RelayedPaymentDuration = Kamon.timer("payment.duration.relayed", "Duration of pending downstream HTLCs during a relay")
// The goal of this metric is to measure whether retrying MPP payments on failing channels yields useful results.
// Once enough data has been collected, we will update the MultiPartPaymentLifecycle logic accordingly.

View File

@ -86,22 +86,29 @@ case class PaymentFailed(id: UUID, paymentHash: ByteVector32, failures: Seq[Paym
sealed trait PaymentRelayed extends PaymentEvent {
val amountIn: MilliSatoshi
val amountOut: MilliSatoshi
val timestamp: TimestampMilli
val startedAt: TimestampMilli
val settledAt: TimestampMilli
}
case class ChannelPaymentRelayed(amountIn: MilliSatoshi, amountOut: MilliSatoshi, paymentHash: ByteVector32, fromChannelId: ByteVector32, toChannelId: ByteVector32, timestamp: TimestampMilli = TimestampMilli.now()) extends PaymentRelayed
case class ChannelPaymentRelayed(amountIn: MilliSatoshi, amountOut: MilliSatoshi, paymentHash: ByteVector32, fromChannelId: ByteVector32, toChannelId: ByteVector32, startedAt: TimestampMilli, settledAt: TimestampMilli) extends PaymentRelayed {
override val timestamp: TimestampMilli = settledAt
}
case class TrampolinePaymentRelayed(paymentHash: ByteVector32, incoming: PaymentRelayed.Incoming, outgoing: PaymentRelayed.Outgoing, nextTrampolineNodeId: PublicKey, nextTrampolineAmount: MilliSatoshi, timestamp: TimestampMilli = TimestampMilli.now()) extends PaymentRelayed {
case class TrampolinePaymentRelayed(paymentHash: ByteVector32, incoming: PaymentRelayed.Incoming, outgoing: PaymentRelayed.Outgoing, nextTrampolineNodeId: PublicKey, nextTrampolineAmount: MilliSatoshi) extends PaymentRelayed {
override val amountIn: MilliSatoshi = incoming.map(_.amount).sum
override val amountOut: MilliSatoshi = outgoing.map(_.amount).sum
override val startedAt: TimestampMilli = incoming.map(_.receivedAt).minOption.getOrElse(TimestampMilli.now())
override val settledAt: TimestampMilli = outgoing.map(_.settledAt).maxOption.getOrElse(TimestampMilli.now())
override val timestamp: TimestampMilli = settledAt
}
object PaymentRelayed {
case class Part(amount: MilliSatoshi, channelId: ByteVector32)
case class IncomingPart(amount: MilliSatoshi, channelId: ByteVector32, receivedAt: TimestampMilli)
case class OutgoingPart(amount: MilliSatoshi, channelId: ByteVector32, settledAt: TimestampMilli)
type Incoming = Seq[Part]
type Outgoing = Seq[Part]
type Incoming = Seq[IncomingPart]
type Outgoing = Seq[OutgoingPart]
}

View File

@ -25,7 +25,7 @@ import fr.acinq.eclair.payment.send.Recipient
import fr.acinq.eclair.router.Router.{BlindedHop, ChannelHop, Route}
import fr.acinq.eclair.wire.protocol.PaymentOnion.{FinalPayload, IntermediatePayload, PerHopPayload}
import fr.acinq.eclair.wire.protocol._
import fr.acinq.eclair.{CltvExpiry, CltvExpiryDelta, Feature, Features, MilliSatoshi, ShortChannelId, UInt64, randomKey}
import fr.acinq.eclair.{CltvExpiry, CltvExpiryDelta, Feature, Features, MilliSatoshi, ShortChannelId, TimestampMilli, UInt64, randomKey}
import scodec.bits.ByteVector
import scodec.{Attempt, DecodeResult}
@ -241,10 +241,12 @@ object OutgoingPaymentPacket {
sealed trait Upstream
object Upstream {
case class Local(id: UUID) extends Upstream
case class Trampoline(adds: Seq[UpdateAddHtlc]) extends Upstream {
val amountIn: MilliSatoshi = adds.map(_.amountMsat).sum
val expiryIn: CltvExpiry = adds.map(_.cltvExpiry).min
case class Trampoline(adds: Seq[ReceivedHtlc]) extends Upstream {
val amountIn: MilliSatoshi = adds.map(_.add.amountMsat).sum
val expiryIn: CltvExpiry = adds.map(_.add.cltvExpiry).min
}
case class ReceivedHtlc(add: UpdateAddHtlc, receivedAt: TimestampMilli)
}
// @formatter:on

View File

@ -31,9 +31,10 @@ import fr.acinq.eclair.payment.{ChannelPaymentRelayed, IncomingPaymentPacket}
import fr.acinq.eclair.wire.protocol.FailureMessageCodecs.createBadOnionFailure
import fr.acinq.eclair.wire.protocol.PaymentOnion.IntermediatePayload
import fr.acinq.eclair.wire.protocol._
import fr.acinq.eclair.{Logs, NodeParams, TimestampSecond, channel, nodeFee}
import fr.acinq.eclair.{Logs, NodeParams, TimestampMilli, TimestampSecond, channel, nodeFee}
import java.util.UUID
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.DurationLong
import scala.util.Random
@ -103,7 +104,8 @@ class ChannelRelay private(nodeParams: NodeParams,
register: ActorRef,
channels: Map[ByteVector32, Relayer.OutgoingChannel],
r: IncomingPaymentPacket.ChannelRelayPacket,
context: ActorContext[ChannelRelay.Command]) {
context: ActorContext[ChannelRelay.Command],
startedAt: TimestampMilli = TimestampMilli.now()) {
import ChannelRelay._
@ -155,13 +157,15 @@ class ChannelRelay private(nodeParams: NodeParams,
case WrappedAddResponse(RES_ADD_SETTLED(o: Origin.ChannelRelayedHot, htlc, fulfill: HtlcResult.Fulfill)) =>
context.log.debug("relaying fulfill to upstream")
val cmd = CMD_FULFILL_HTLC(o.originHtlcId, fulfill.paymentPreimage, commit = true)
context.system.eventStream ! EventStream.Publish(ChannelPaymentRelayed(o.amountIn, o.amountOut, htlc.paymentHash, o.originChannelId, htlc.channelId))
context.system.eventStream ! EventStream.Publish(ChannelPaymentRelayed(o.amountIn, o.amountOut, htlc.paymentHash, o.originChannelId, htlc.channelId, startedAt, TimestampMilli.now()))
recordRelayDuration(isSuccess = true)
safeSendAndStop(o.originChannelId, cmd)
case WrappedAddResponse(RES_ADD_SETTLED(o: Origin.ChannelRelayedHot, _, fail: HtlcResult.Fail)) =>
context.log.debug("relaying fail to upstream")
Metrics.recordPaymentRelayFailed(Tags.FailureType.Remote, Tags.RelayType.Channel)
val cmd = translateRelayFailure(o.originHtlcId, fail)
recordRelayDuration(isSuccess = false)
safeSendAndStop(o.originChannelId, cmd)
}
@ -310,4 +314,9 @@ class ChannelRelay private(nodeParams: NodeParams,
}
}
private def recordRelayDuration(isSuccess: Boolean): Unit =
Metrics.RelayedPaymentDuration
.withTag(Tags.Relay, Tags.RelayType.Channel)
.withTag(Tags.Success, isSuccess)
.record((TimestampMilli.now() - startedAt).toMillis, TimeUnit.MILLISECONDS)
}

View File

@ -39,9 +39,10 @@ import fr.acinq.eclair.router.Router.RouteParams
import fr.acinq.eclair.router.{BalanceTooLow, RouteNotFound}
import fr.acinq.eclair.wire.protocol.PaymentOnion.IntermediatePayload
import fr.acinq.eclair.wire.protocol._
import fr.acinq.eclair.{CltvExpiry, Features, Logs, MilliSatoshi, NodeParams, UInt64, nodeFee, randomBytes32}
import fr.acinq.eclair.{CltvExpiry, Features, Logs, MilliSatoshi, NodeParams, TimestampMilli, UInt64, nodeFee, randomBytes32}
import java.util.UUID
import java.util.concurrent.TimeUnit
import scala.collection.immutable.Queue
/**
@ -182,13 +183,13 @@ class NodeRelay private(nodeParams: NodeParams,
* @param nextPacket trampoline onion to relay to the next trampoline node.
* @param handler actor handling the aggregation of the incoming HTLC set.
*/
private def receiving(htlcs: Queue[UpdateAddHtlc], nextPayload: IntermediatePayload.NodeRelay.Standard, nextPacket: OnionRoutingPacket, handler: ActorRef): Behavior[Command] =
private def receiving(htlcs: Queue[Upstream.ReceivedHtlc], nextPayload: IntermediatePayload.NodeRelay.Standard, nextPacket: OnionRoutingPacket, handler: ActorRef): Behavior[Command] =
Behaviors.receiveMessagePartial {
case Relay(IncomingPaymentPacket.NodeRelayPacket(add, outer, _, _)) =>
require(outer.paymentSecret == paymentSecret, "payment secret mismatch")
context.log.debug("forwarding incoming htlc #{} from channel {} to the payment FSM", add.id, add.channelId)
handler ! MultiPartPaymentFSM.HtlcPart(outer.totalAmount, add)
receiving(htlcs :+ add, nextPayload, nextPacket, handler)
receiving(htlcs :+ Upstream.ReceivedHtlc(add, TimestampMilli.now()), nextPayload, nextPacket, handler)
case WrappedMultiPartPaymentFailed(MultiPartPaymentFSM.MultiPartPaymentFailed(_, failure, parts)) =>
context.log.warn("could not complete incoming multi-part payment (parts={} paidAmount={} failure={})", parts.size, parts.map(_.amount).sum, failure)
Metrics.recordPaymentRelayFailed(failure.getClass.getSimpleName, Tags.RelayType.Trampoline)
@ -238,7 +239,7 @@ class NodeRelay private(nodeParams: NodeParams,
private def doSend(upstream: Upstream.Trampoline, nextPayload: IntermediatePayload.NodeRelay.Standard, nextPacket: OnionRoutingPacket): Behavior[Command] = {
context.log.debug(s"relaying trampoline payment (amountIn=${upstream.amountIn} expiryIn=${upstream.expiryIn} amountOut=${nextPayload.amountToForward} expiryOut=${nextPayload.outgoingCltv})")
relay(upstream, nextPayload, nextPacket)
sending(upstream, nextPayload, fulfilledUpstream = false)
sending(upstream, nextPayload, TimestampMilli.now(), fulfilledUpstream = false)
}
/**
@ -248,7 +249,7 @@ class NodeRelay private(nodeParams: NodeParams,
* @param nextPayload relay instructions.
* @param fulfilledUpstream true if we already fulfilled the payment upstream.
*/
private def sending(upstream: Upstream.Trampoline, nextPayload: IntermediatePayload.NodeRelay.Standard, fulfilledUpstream: Boolean): Behavior[Command] =
private def sending(upstream: Upstream.Trampoline, nextPayload: IntermediatePayload.NodeRelay.Standard, startedAt: TimestampMilli, fulfilledUpstream: Boolean): Behavior[Command] =
Behaviors.receiveMessagePartial {
rejectExtraHtlcPartialFunction orElse {
// this is the fulfill that arrives from downstream channels
@ -257,7 +258,7 @@ class NodeRelay private(nodeParams: NodeParams,
// We want to fulfill upstream as soon as we receive the preimage (even if not all HTLCs have fulfilled downstream).
context.log.debug("got preimage from downstream")
fulfillPayment(upstream, paymentPreimage)
sending(upstream, nextPayload, fulfilledUpstream = true)
sending(upstream, nextPayload, startedAt, fulfilledUpstream = true)
} else {
// we don't want to fulfill multiple times
Behaviors.same
@ -265,12 +266,14 @@ class NodeRelay private(nodeParams: NodeParams,
case WrappedPaymentSent(paymentSent) =>
context.log.debug("trampoline payment fully resolved downstream")
success(upstream, fulfilledUpstream, paymentSent)
recordRelayDuration(startedAt, isSuccess = true)
stopping()
case WrappedPaymentFailed(PaymentFailed(_, _, failures, _)) =>
context.log.debug(s"trampoline payment failed downstream")
if (!fulfilledUpstream) {
rejectPayment(upstream, translateError(nodeParams, failures, upstream, nextPayload))
}
recordRelayDuration(startedAt, isSuccess = fulfilledUpstream)
stopping()
}
}
@ -353,12 +356,12 @@ class NodeRelay private(nodeParams: NodeParams,
private def rejectPayment(upstream: Upstream.Trampoline, failure: Option[FailureMessage]): Unit = {
Metrics.recordPaymentRelayFailed(failure.map(_.getClass.getSimpleName).getOrElse("Unknown"), Tags.RelayType.Trampoline)
upstream.adds.foreach(add => rejectHtlc(add.id, add.channelId, upstream.amountIn, failure))
upstream.adds.foreach(r => rejectHtlc(r.add.id, r.add.channelId, upstream.amountIn, failure))
}
private def fulfillPayment(upstream: Upstream.Trampoline, paymentPreimage: ByteVector32): Unit = upstream.adds.foreach(add => {
val cmd = CMD_FULFILL_HTLC(add.id, paymentPreimage, commit = true)
PendingCommandsDb.safeSend(register, nodeParams.db.pendingCommands, add.channelId, cmd)
private def fulfillPayment(upstream: Upstream.Trampoline, paymentPreimage: ByteVector32): Unit = upstream.adds.foreach(r => {
val cmd = CMD_FULFILL_HTLC(r.add.id, paymentPreimage, commit = true)
PendingCommandsDb.safeSend(register, nodeParams.db.pendingCommands, r.add.channelId, cmd)
})
private def success(upstream: Upstream.Trampoline, fulfilledUpstream: Boolean, paymentSent: PaymentSent): Unit = {
@ -366,9 +369,14 @@ class NodeRelay private(nodeParams: NodeParams,
if (!fulfilledUpstream) {
fulfillPayment(upstream, paymentSent.paymentPreimage)
}
val incoming = upstream.adds.map(add => PaymentRelayed.Part(add.amountMsat, add.channelId))
val outgoing = paymentSent.parts.map(part => PaymentRelayed.Part(part.amountWithFees, part.toChannelId))
val incoming = upstream.adds.map(r => PaymentRelayed.IncomingPart(r.add.amountMsat, r.add.channelId, r.receivedAt))
val outgoing = paymentSent.parts.map(part => PaymentRelayed.OutgoingPart(part.amountWithFees, part.toChannelId, part.timestamp))
context.system.eventStream ! EventStream.Publish(TrampolinePaymentRelayed(paymentHash, incoming, outgoing, paymentSent.recipientNodeId, paymentSent.recipientAmount))
}
private def recordRelayDuration(startedAt: TimestampMilli, isSuccess: Boolean): Unit =
Metrics.RelayedPaymentDuration
.withTag(Tags.Relay, Tags.RelayType.Trampoline)
.withTag(Tags.Success, isSuccess)
.record((TimestampMilli.now() - startedAt).toMillis, TimeUnit.MILLISECONDS)
}

View File

@ -190,7 +190,8 @@ class PostRestartHtlcCleaner(nodeParams: NodeParams, register: ActorRef, initial
log.error(s"unexpected channel relay downstream HTLCs: expected (${fulfilledHtlc.channelId},${fulfilledHtlc.id}), found $relayedOut")
}
PendingCommandsDb.safeSend(register, nodeParams.db.pendingCommands, originChannelId, CMD_FULFILL_HTLC(originHtlcId, paymentPreimage, commit = true))
context.system.eventStream.publish(ChannelPaymentRelayed(amountIn, amountOut, fulfilledHtlc.paymentHash, originChannelId, fulfilledHtlc.channelId))
// We don't know when we received this HTLC so we just pretend that we received it just now.
context.system.eventStream.publish(ChannelPaymentRelayed(amountIn, amountOut, fulfilledHtlc.paymentHash, originChannelId, fulfilledHtlc.channelId, TimestampMilli.now(), TimestampMilli.now()))
Metrics.PendingRelayedOut.decrement()
context become main(brokenHtlcs.copy(relayedOut = brokenHtlcs.relayedOut - origin))

View File

@ -15,7 +15,7 @@ import fr.acinq.eclair.db.pg.PgUtils.using
import fr.acinq.eclair.payment.OutgoingPaymentPacket.Upstream
import fr.acinq.eclair.wire.internal.channel.ChannelCodecs.channelDataCodec
import fr.acinq.eclair.wire.protocol.{CommitSig, Error, RevokeAndAck, TlvStream, UpdateAddHtlc, UpdateAddHtlcTlv}
import fr.acinq.eclair.{BlockHeight, CltvExpiry, CltvExpiryDelta, MilliSatoshiLong, TestConstants, TestKitBaseClass, ToMilliSatoshiConversion, randomBytes32}
import fr.acinq.eclair.{BlockHeight, CltvExpiry, CltvExpiryDelta, MilliSatoshiLong, TestConstants, TestKitBaseClass, TimestampMilli, ToMilliSatoshiConversion, randomBytes32}
import org.scalatest.Outcome
import org.scalatest.funsuite.FixtureAnyFunSuiteLike
import org.sqlite.SQLiteConfig
@ -170,7 +170,7 @@ class CheckBalanceSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
val (ra2, htlca2) = addHtlc(100000000 msat, alice, bob, alice2bob, bob2alice)
val (_, htlca3) = addHtlc(10000 msat, alice, bob, alice2bob, bob2alice)
// for this one we set a non-local upstream to simulate a relayed payment
val (_, htlca4) = addHtlc(30000000 msat, CltvExpiryDelta(144), alice, bob, alice2bob, bob2alice, upstream = Upstream.Trampoline(UpdateAddHtlc(randomBytes32(), 42, 30003000 msat, randomBytes32(), CltvExpiry(144), TestConstants.emptyOnionPacket, TlvStream.empty[UpdateAddHtlcTlv]) :: Nil), replyTo = TestProbe().ref)
val (_, htlca4) = addHtlc(30000000 msat, CltvExpiryDelta(144), alice, bob, alice2bob, bob2alice, upstream = Upstream.Trampoline(Upstream.ReceivedHtlc(UpdateAddHtlc(randomBytes32(), 42, 30003000 msat, randomBytes32(), CltvExpiry(144), TestConstants.emptyOnionPacket, TlvStream.empty[UpdateAddHtlcTlv]), TimestampMilli(1687345927000L)) :: Nil), replyTo = TestProbe().ref)
val (rb1, htlcb1) = addHtlc(50000000 msat, bob, alice, bob2alice, alice2bob)
val (_, _) = addHtlc(55000000 msat, bob, alice, bob2alice, alice2bob)
crossSign(alice, bob, alice2bob, bob2alice)

View File

@ -68,7 +68,7 @@ class AuditDbSpec extends AnyFunSuite {
val pp2a = PaymentReceived.PartialPayment(42000 msat, randomBytes32())
val pp2b = PaymentReceived.PartialPayment(42100 msat, randomBytes32())
val e2 = PaymentReceived(randomBytes32(), pp2a :: pp2b :: Nil)
val e3 = ChannelPaymentRelayed(42000 msat, 1000 msat, randomBytes32(), randomBytes32(), randomBytes32())
val e3 = ChannelPaymentRelayed(42000 msat, 1000 msat, randomBytes32(), randomBytes32(), randomBytes32(), TimestampMilli.now() - 3.seconds, TimestampMilli.now())
val e4a = TransactionPublished(randomBytes32(), randomKey().publicKey, Transaction(0, Seq.empty, Seq.empty, 0), 42 sat, "mutual")
val e4b = TransactionConfirmed(e4a.channelId, e4a.remoteNodeId, e4a.tx)
val e4c = TransactionConfirmed(randomBytes32(), randomKey().publicKey, Transaction(2, Nil, TxOut(500 sat, hex"1234") :: Nil, 0))
@ -80,11 +80,11 @@ class AuditDbSpec extends AnyFunSuite {
val e7 = ChannelEvent(randomBytes32(), randomKey().publicKey, 456123000 sat, isInitiator = true, isPrivate = false, ChannelEvent.EventType.Closed(MutualClose(null)))
val e8 = ChannelErrorOccurred(null, randomBytes32(), randomKey().publicKey, LocalError(new RuntimeException("oops")), isFatal = true)
val e9 = ChannelErrorOccurred(null, randomBytes32(), randomKey().publicKey, RemoteError(Error(randomBytes32(), "remote oops")), isFatal = true)
val e10 = TrampolinePaymentRelayed(randomBytes32(), Seq(PaymentRelayed.Part(20000 msat, randomBytes32()), PaymentRelayed.Part(22000 msat, randomBytes32())), Seq(PaymentRelayed.Part(10000 msat, randomBytes32()), PaymentRelayed.Part(12000 msat, randomBytes32()), PaymentRelayed.Part(15000 msat, randomBytes32())), randomKey().publicKey, 30000 msat)
val e10 = TrampolinePaymentRelayed(randomBytes32(), Seq(PaymentRelayed.IncomingPart(20000 msat, randomBytes32(), TimestampMilli.now() - 7.seconds), PaymentRelayed.IncomingPart(22000 msat, randomBytes32(), TimestampMilli.now() - 5.seconds)), Seq(PaymentRelayed.OutgoingPart(10000 msat, randomBytes32(), TimestampMilli.now()), PaymentRelayed.OutgoingPart(12000 msat, randomBytes32(), TimestampMilli.now()), PaymentRelayed.OutgoingPart(15000 msat, randomBytes32(), TimestampMilli.now())), randomKey().publicKey, 30000 msat)
val multiPartPaymentHash = randomBytes32()
val now = TimestampMilli.now()
val e11 = ChannelPaymentRelayed(13000 msat, 11000 msat, multiPartPaymentHash, randomBytes32(), randomBytes32(), now)
val e12 = ChannelPaymentRelayed(15000 msat, 12500 msat, multiPartPaymentHash, randomBytes32(), randomBytes32(), now)
val e11 = ChannelPaymentRelayed(13000 msat, 11000 msat, multiPartPaymentHash, randomBytes32(), randomBytes32(), now - 5.seconds, now)
val e12 = ChannelPaymentRelayed(15000 msat, 12500 msat, multiPartPaymentHash, randomBytes32(), randomBytes32(), now - 4.seconds, now)
db.add(e1)
db.add(e2)
@ -137,13 +137,13 @@ class AuditDbSpec extends AnyFunSuite {
val c5 = randomBytes32()
val c6 = randomBytes32()
db.add(ChannelPaymentRelayed(46000 msat, 44000 msat, randomBytes32(), c6, c1))
db.add(ChannelPaymentRelayed(41000 msat, 40000 msat, randomBytes32(), c6, c1))
db.add(ChannelPaymentRelayed(43000 msat, 42000 msat, randomBytes32(), c5, c1))
db.add(ChannelPaymentRelayed(42000 msat, 40000 msat, randomBytes32(), c5, c2))
db.add(ChannelPaymentRelayed(45000 msat, 40000 msat, randomBytes32(), c5, c6))
db.add(TrampolinePaymentRelayed(randomBytes32(), Seq(PaymentRelayed.Part(25000 msat, c6)), Seq(PaymentRelayed.Part(20000 msat, c4)), randomKey().publicKey, 15000 msat))
db.add(TrampolinePaymentRelayed(randomBytes32(), Seq(PaymentRelayed.Part(46000 msat, c6)), Seq(PaymentRelayed.Part(16000 msat, c2), PaymentRelayed.Part(10000 msat, c4), PaymentRelayed.Part(14000 msat, c4)), randomKey().publicKey, 37000 msat))
db.add(ChannelPaymentRelayed(46000 msat, 44000 msat, randomBytes32(), c6, c1, 1000 unixms, 1001 unixms))
db.add(ChannelPaymentRelayed(41000 msat, 40000 msat, randomBytes32(), c6, c1, 1002 unixms, 1003 unixms))
db.add(ChannelPaymentRelayed(43000 msat, 42000 msat, randomBytes32(), c5, c1, 1004 unixms, 1005 unixms))
db.add(ChannelPaymentRelayed(42000 msat, 40000 msat, randomBytes32(), c5, c2, 1006 unixms, 1007 unixms))
db.add(ChannelPaymentRelayed(45000 msat, 40000 msat, randomBytes32(), c5, c6, 1008 unixms, 1009 unixms))
db.add(TrampolinePaymentRelayed(randomBytes32(), Seq(PaymentRelayed.IncomingPart(25000 msat, c6, 1010 unixms)), Seq(PaymentRelayed.OutgoingPart(20000 msat, c4, 1011 unixms)), randomKey().publicKey, 15000 msat))
db.add(TrampolinePaymentRelayed(randomBytes32(), Seq(PaymentRelayed.IncomingPart(46000 msat, c6, 1012 unixms)), Seq(PaymentRelayed.OutgoingPart(16000 msat, c2, 1013 unixms), PaymentRelayed.OutgoingPart(10000 msat, c4, 1014 unixms), PaymentRelayed.OutgoingPart(14000 msat, c4, 1015 unixms)), randomKey().publicKey, 37000 msat))
// The following confirmed txs will be taken into account.
db.add(TransactionPublished(c2, n2, Transaction(0, Seq.empty, Seq(TxOut(5000 sat, hex"12345")), 0), 200 sat, "funding"))
@ -197,12 +197,12 @@ class AuditDbSpec extends AnyFunSuite {
// 25% trampoline relays.
if (Random.nextInt(4) == 0) {
val outgoingCount = 1 + Random.nextInt(4)
val incoming = Seq(PaymentRelayed.Part(10000 msat, randomBytes32()))
val outgoing = (1 to outgoingCount).map(_ => PaymentRelayed.Part(Random.nextInt(2000).msat, channelIds(Random.nextInt(channelCount))))
val incoming = Seq(PaymentRelayed.IncomingPart(10000 msat, randomBytes32(), TimestampMilli.now() - 3.seconds))
val outgoing = (1 to outgoingCount).map(_ => PaymentRelayed.OutgoingPart(Random.nextInt(2000).msat, channelIds(Random.nextInt(channelCount)), TimestampMilli.now()))
db.add(TrampolinePaymentRelayed(randomBytes32(), incoming, outgoing, randomKey().publicKey, 5000 msat))
} else {
val toChannelId = channelIds(Random.nextInt(channelCount))
db.add(ChannelPaymentRelayed(10000 msat, Random.nextInt(10000).msat, randomBytes32(), randomBytes32(), toChannelId))
db.add(ChannelPaymentRelayed(10000 msat, Random.nextInt(10000).msat, randomBytes32(), randomBytes32(), toChannelId, TimestampMilli.now() - 2.seconds, TimestampMilli.now()))
}
})
// Test starts here.
@ -334,8 +334,8 @@ class AuditDbSpec extends AnyFunSuite {
val pp2 = PaymentSent.PartialPayment(UUID.randomUUID(), 600 msat, 5 msat, randomBytes32(), None, 110 unixms)
val ps1 = PaymentSent(UUID.randomUUID(), randomBytes32(), randomBytes32(), 1100 msat, PrivateKey(ByteVector32.One).publicKey, pp1 :: pp2 :: Nil)
val relayed1 = ChannelPaymentRelayed(600 msat, 500 msat, randomBytes32(), randomBytes32(), randomBytes32(), 105 unixms)
val relayed2 = ChannelPaymentRelayed(650 msat, 500 msat, randomBytes32(), randomBytes32(), randomBytes32(), 115 unixms)
val relayed1 = ChannelPaymentRelayed(600 msat, 500 msat, randomBytes32(), randomBytes32(), randomBytes32(), 105 unixms, 105 unixms)
val relayed2 = ChannelPaymentRelayed(650 msat, 500 msat, randomBytes32(), randomBytes32(), randomBytes32(), 115 unixms, 115 unixms)
migrationCheck(
dbs = dbs,
@ -407,7 +407,7 @@ class AuditDbSpec extends AnyFunSuite {
PaymentSent.PartialPayment(UUID.randomUUID(), 500 msat, 10 msat, randomBytes32(), None, 160 unixms),
PaymentSent.PartialPayment(UUID.randomUUID(), 600 msat, 5 msat, randomBytes32(), None, 165 unixms)
))
val relayed3 = TrampolinePaymentRelayed(randomBytes32(), Seq(PaymentRelayed.Part(450 msat, randomBytes32()), PaymentRelayed.Part(500 msat, randomBytes32())), Seq(PaymentRelayed.Part(800 msat, randomBytes32())), randomKey().publicKey, 700 msat, 150 unixms)
val relayed3 = TrampolinePaymentRelayed(randomBytes32(), Seq(PaymentRelayed.IncomingPart(450 msat, randomBytes32(), 150 unixms), PaymentRelayed.IncomingPart(500 msat, randomBytes32(), 150 unixms)), Seq(PaymentRelayed.OutgoingPart(800 msat, randomBytes32(), 150 unixms)), randomKey().publicKey, 700 msat)
postMigrationDb.add(ps2)
assert(postMigrationDb.listSent(155 unixms, 200 unixms) == Seq(ps2))
postMigrationDb.add(relayed3)
@ -418,8 +418,8 @@ class AuditDbSpec extends AnyFunSuite {
test("migrate audit database v4 -> current") {
val relayed1 = ChannelPaymentRelayed(600 msat, 500 msat, randomBytes32(), randomBytes32(), randomBytes32(), 105 unixms)
val relayed2 = TrampolinePaymentRelayed(randomBytes32(), Seq(PaymentRelayed.Part(300 msat, randomBytes32()), PaymentRelayed.Part(350 msat, randomBytes32())), Seq(PaymentRelayed.Part(600 msat, randomBytes32())), PlaceHolderPubKey, 0 msat, 110 unixms)
val relayed1 = ChannelPaymentRelayed(600 msat, 500 msat, randomBytes32(), randomBytes32(), randomBytes32(), 105 unixms, 105 unixms)
val relayed2 = TrampolinePaymentRelayed(randomBytes32(), Seq(PaymentRelayed.IncomingPart(300 msat, randomBytes32(), 110 unixms), PaymentRelayed.IncomingPart(350 msat, randomBytes32(), 110 unixms)), Seq(PaymentRelayed.OutgoingPart(600 msat, randomBytes32(), 110 unixms)), PlaceHolderPubKey, 0 msat)
forAllDbs {
case dbs: TestPgDatabases =>
@ -498,7 +498,7 @@ class AuditDbSpec extends AnyFunSuite {
using(connection.createStatement()) { statement =>
assert(getVersion(statement, "audit").contains(PgAuditDb.CURRENT_VERSION))
}
val relayed3 = TrampolinePaymentRelayed(randomBytes32(), Seq(PaymentRelayed.Part(450 msat, randomBytes32()), PaymentRelayed.Part(500 msat, randomBytes32())), Seq(PaymentRelayed.Part(800 msat, randomBytes32())), randomKey().publicKey, 700 msat, 150 unixms)
val relayed3 = TrampolinePaymentRelayed(randomBytes32(), Seq(PaymentRelayed.IncomingPart(450 msat, randomBytes32(), 150 unixms), PaymentRelayed.IncomingPart(500 msat, randomBytes32(), 150 unixms)), Seq(PaymentRelayed.OutgoingPart(800 msat, randomBytes32(), 150 unixms)), randomKey().publicKey, 700 msat)
postMigrationDb.add(relayed3)
assert(postMigrationDb.listRelayed(100 unixms, 160 unixms) == Seq(relayed1, relayed2, relayed3))
}
@ -581,7 +581,7 @@ class AuditDbSpec extends AnyFunSuite {
using(connection.createStatement()) { statement =>
assert(getVersion(statement, "audit").contains(SqliteAuditDb.CURRENT_VERSION))
}
val relayed3 = TrampolinePaymentRelayed(randomBytes32(), Seq(PaymentRelayed.Part(450 msat, randomBytes32()), PaymentRelayed.Part(500 msat, randomBytes32())), Seq(PaymentRelayed.Part(800 msat, randomBytes32())), randomKey().publicKey, 700 msat, 150 unixms)
val relayed3 = TrampolinePaymentRelayed(randomBytes32(), Seq(PaymentRelayed.IncomingPart(450 msat, randomBytes32(), 150 unixms), PaymentRelayed.IncomingPart(500 msat, randomBytes32(), 150 unixms)), Seq(PaymentRelayed.OutgoingPart(800 msat, randomBytes32(), 150 unixms)), randomKey().publicKey, 700 msat)
postMigrationDb.add(relayed3)
assert(postMigrationDb.listRelayed(100 unixms, 160 unixms) == Seq(relayed1, relayed2, relayed3))
}

View File

@ -173,7 +173,7 @@ class PgUtilsSpec extends TestKitBaseClass with AnyFunSuiteLike with Eventually
db.network.addNode(Announcements.makeNodeAnnouncement(randomKey(), "node-A", Color(50, 99, -80), Nil, Features.empty, TimestampSecond.now() - 45.days))
db.network.addNode(Announcements.makeNodeAnnouncement(randomKey(), "node-B", Color(50, 99, -80), Nil, Features.empty, TimestampSecond.now() - 3.days))
db.network.addNode(Announcements.makeNodeAnnouncement(randomKey(), "node-C", Color(50, 99, -80), Nil, Features.empty, TimestampSecond.now() - 7.minutes))
db.audit.add(ChannelPaymentRelayed(421 msat, 400 msat, randomBytes32(), randomBytes32(), randomBytes32(), TimestampMilli.now() - 3.seconds))
db.audit.add(ChannelPaymentRelayed(421 msat, 400 msat, randomBytes32(), randomBytes32(), randomBytes32(), TimestampMilli.now() - 5.seconds, TimestampMilli.now() - 3.seconds))
db.dataSource.close()
}

View File

@ -36,7 +36,7 @@ import fr.acinq.eclair.wire.protocol.OfferTypes.{InvoiceRequest, Offer, PaymentI
import fr.acinq.eclair.wire.protocol.OnionPaymentPayloadTlv.{AmountToForward, OutgoingCltv, PaymentData}
import fr.acinq.eclair.wire.protocol.PaymentOnion.{FinalPayload, IntermediatePayload}
import fr.acinq.eclair.wire.protocol._
import fr.acinq.eclair.{BlockHeight, Bolt11Feature, Bolt12Feature, CltvExpiry, CltvExpiryDelta, Features, MilliSatoshi, MilliSatoshiLong, ShortChannelId, TestConstants, TimestampSecondLong, UInt64, nodeFee, randomBytes32, randomKey}
import fr.acinq.eclair.{BlockHeight, Bolt11Feature, Bolt12Feature, CltvExpiry, CltvExpiryDelta, Features, MilliSatoshi, MilliSatoshiLong, ShortChannelId, TestConstants, TimestampMilli, TimestampSecondLong, UInt64, nodeFee, randomBytes32, randomKey}
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite
import scodec.bits.{ByteVector, HexStringSyntax}
@ -314,7 +314,7 @@ class PaymentPacketSpec extends AnyFunSuite with BeforeAndAfterAll {
// c forwards the trampoline payment to e through d.
val recipient_e = ClearRecipient(e, Features.empty, inner_c.amountToForward, inner_c.outgoingCltv, randomBytes32(), nextTrampolineOnion_opt = Some(trampolinePacket_e))
val Right(payment_e) = buildOutgoingPayment(ActorRef.noSender, priv_c.privateKey, Upstream.Trampoline(Seq(add_c)), paymentHash, Route(inner_c.amountToForward, afterTrampolineChannelHops, None), recipient_e)
val Right(payment_e) = buildOutgoingPayment(ActorRef.noSender, priv_c.privateKey, Upstream.Trampoline(Seq(Upstream.ReceivedHtlc(add_c, TimestampMilli(1687345927000L)))), paymentHash, Route(inner_c.amountToForward, afterTrampolineChannelHops, None), recipient_e)
assert(payment_e.outgoingChannel == channelUpdate_cd.shortChannelId)
assert(payment_e.cmd.amount == amount_cd)
assert(payment_e.cmd.cltvExpiry == expiry_cd)
@ -365,7 +365,7 @@ class PaymentPacketSpec extends AnyFunSuite with BeforeAndAfterAll {
// c forwards the trampoline payment to e through d.
val recipient_e = ClearRecipient(e, Features.empty, inner_c.amountToForward, inner_c.outgoingCltv, inner_c.paymentSecret.get, invoice.extraEdges, inner_c.paymentMetadata)
val Right(payment_e) = buildOutgoingPayment(ActorRef.noSender, priv_c.privateKey, Upstream.Trampoline(Seq(add_c)), paymentHash, Route(inner_c.amountToForward, afterTrampolineChannelHops, None), recipient_e)
val Right(payment_e) = buildOutgoingPayment(ActorRef.noSender, priv_c.privateKey, Upstream.Trampoline(Seq(Upstream.ReceivedHtlc(add_c, TimestampMilli(1687345927000L)))), paymentHash, Route(inner_c.amountToForward, afterTrampolineChannelHops, None), recipient_e)
assert(payment_e.outgoingChannel == channelUpdate_cd.shortChannelId)
assert(payment_e.cmd.amount == amount_cd)
assert(payment_e.cmd.cltvExpiry == expiry_cd)
@ -443,7 +443,7 @@ class PaymentPacketSpec extends AnyFunSuite with BeforeAndAfterAll {
// c forwards an invalid trampoline onion to e through d.
val recipient_e = ClearRecipient(e, Features.empty, inner_c.amountToForward, inner_c.outgoingCltv, randomBytes32(), nextTrampolineOnion_opt = Some(trampolinePacket_e.copy(payload = trampolinePacket_e.payload.reverse)))
val Right(payment_e) = buildOutgoingPayment(ActorRef.noSender, priv_c.privateKey, Upstream.Trampoline(Seq(add_c)), paymentHash, Route(inner_c.amountToForward, afterTrampolineChannelHops, None), recipient_e)
val Right(payment_e) = buildOutgoingPayment(ActorRef.noSender, priv_c.privateKey, Upstream.Trampoline(Seq(Upstream.ReceivedHtlc(add_c, TimestampMilli(1687345927000L)))), paymentHash, Route(inner_c.amountToForward, afterTrampolineChannelHops, None), recipient_e)
assert(payment_e.outgoingChannel == channelUpdate_cd.shortChannelId)
val add_d = UpdateAddHtlc(randomBytes32(), 3, payment_e.cmd.amount, paymentHash, payment_e.cmd.cltvExpiry, payment_e.cmd.onion, None)
val Right(ChannelRelayPacket(_, _, packet_e)) = decrypt(add_d, priv_d.privateKey, Features.empty)
@ -589,7 +589,7 @@ class PaymentPacketSpec extends AnyFunSuite with BeforeAndAfterAll {
// c forwards an invalid amount to e through (the outer total amount doesn't match the inner amount).
val invalidTotalAmount = inner_c.amountToForward - 1.msat
val recipient_e = ClearRecipient(e, Features.empty, invalidTotalAmount, inner_c.outgoingCltv, randomBytes32(), nextTrampolineOnion_opt = Some(trampolinePacket_e))
val Right(payment_e) = buildOutgoingPayment(ActorRef.noSender, priv_c.privateKey, Upstream.Trampoline(Seq(add_c)), paymentHash, Route(invalidTotalAmount, afterTrampolineChannelHops, None), recipient_e)
val Right(payment_e) = buildOutgoingPayment(ActorRef.noSender, priv_c.privateKey, Upstream.Trampoline(Seq(Upstream.ReceivedHtlc(add_c, TimestampMilli(1687345927000L)))), paymentHash, Route(invalidTotalAmount, afterTrampolineChannelHops, None), recipient_e)
val add_d = UpdateAddHtlc(randomBytes32(), 3, payment_e.cmd.amount, paymentHash, payment_e.cmd.cltvExpiry, payment_e.cmd.onion, None)
val Right(ChannelRelayPacket(_, payload_d, packet_e)) = decrypt(add_d, priv_d.privateKey, Features.empty)
@ -605,7 +605,7 @@ class PaymentPacketSpec extends AnyFunSuite with BeforeAndAfterAll {
// c forwards an invalid amount to e through (the outer expiry doesn't match the inner expiry).
val invalidExpiry = inner_c.outgoingCltv - CltvExpiryDelta(12)
val recipient_e = ClearRecipient(e, Features.empty, inner_c.amountToForward, invalidExpiry, randomBytes32(), nextTrampolineOnion_opt = Some(trampolinePacket_e))
val Right(payment_e) = buildOutgoingPayment(ActorRef.noSender, priv_c.privateKey, Upstream.Trampoline(Seq(add_c)), paymentHash, Route(inner_c.amountToForward, afterTrampolineChannelHops, None), recipient_e)
val Right(payment_e) = buildOutgoingPayment(ActorRef.noSender, priv_c.privateKey, Upstream.Trampoline(Seq(Upstream.ReceivedHtlc(add_c, TimestampMilli(1687345927000L)))), paymentHash, Route(inner_c.amountToForward, afterTrampolineChannelHops, None), recipient_e)
val add_d = UpdateAddHtlc(randomBytes32(), 3, payment_e.cmd.amount, paymentHash, payment_e.cmd.cltvExpiry, payment_e.cmd.onion, None)
val Right(ChannelRelayPacket(_, payload_d, packet_e)) = decrypt(add_d, priv_d.privateKey, Features.empty)

View File

@ -38,7 +38,7 @@ import fr.acinq.eclair.transactions.Transactions.{ClaimRemoteDelayedOutputTx, In
import fr.acinq.eclair.transactions.{DirectedHtlc, IncomingHtlc, OutgoingHtlc}
import fr.acinq.eclair.wire.internal.channel.{ChannelCodecs, ChannelCodecsSpec}
import fr.acinq.eclair.wire.protocol._
import fr.acinq.eclair.{BlockHeight, CltvExpiry, CltvExpiryDelta, CustomCommitmentsPlugin, MilliSatoshiLong, NodeParams, TestConstants, TestKitBaseClass, TimestampMilliLong, randomBytes32, randomKey}
import fr.acinq.eclair.{BlockHeight, CltvExpiry, CltvExpiryDelta, CustomCommitmentsPlugin, MilliSatoshiLong, NodeParams, TestConstants, TestKitBaseClass, TimestampMilli, TimestampMilliLong, randomBytes32, randomKey}
import org.scalatest.funsuite.FixtureAnyFunSuiteLike
import org.scalatest.{Outcome, ParallelTestExecution}
import scodec.bits.ByteVector
@ -331,9 +331,9 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit
val htlc_upstream_1 = Seq(buildHtlcIn(0, channelId_ab_1, paymentHash1), buildHtlcIn(5, channelId_ab_1, paymentHash2))
val htlc_upstream_2 = Seq(buildHtlcIn(7, channelId_ab_2, paymentHash1), buildHtlcIn(9, channelId_ab_2, paymentHash2))
val htlc_upstream_3 = Seq(buildHtlcIn(11, randomBytes32(), paymentHash3))
val upstream_1 = Upstream.Trampoline(htlc_upstream_1.head.add :: htlc_upstream_2.head.add :: Nil)
val upstream_2 = Upstream.Trampoline(htlc_upstream_1(1).add :: htlc_upstream_2(1).add :: Nil)
val upstream_3 = Upstream.Trampoline(htlc_upstream_3.head.add :: Nil)
val upstream_1 = Upstream.Trampoline(Upstream.ReceivedHtlc(htlc_upstream_1.head.add, TimestampMilli(1687345927000L)) :: Upstream.ReceivedHtlc(htlc_upstream_2.head.add, TimestampMilli(1687345967000L)) :: Nil)
val upstream_2 = Upstream.Trampoline(Upstream.ReceivedHtlc(htlc_upstream_1(1).add, TimestampMilli(1687345902000L)) :: Upstream.ReceivedHtlc(htlc_upstream_2(1).add, TimestampMilli(1687345999000L)) :: Nil)
val upstream_3 = Upstream.Trampoline(Upstream.ReceivedHtlc(htlc_upstream_3.head.add, TimestampMilli(1687345980000L)) :: Nil)
val data_upstream_1 = ChannelCodecsSpec.makeChannelDataNormal(htlc_upstream_1, Map.empty)
val data_upstream_2 = ChannelCodecsSpec.makeChannelDataNormal(htlc_upstream_2, Map.empty)
val data_upstream_3 = ChannelCodecsSpec.makeChannelDataNormal(htlc_upstream_3, Map.empty)
@ -410,7 +410,7 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit
channel_upstream_2.expectNoMessage(100 millis)
// Payment 2 should fulfill once we receive the preimage.
val origin_2 = Origin.TrampolineRelayedCold(upstream_2.adds.map(u => (u.channelId, u.id)).toList)
val origin_2 = Origin.TrampolineRelayedCold(upstream_2.adds.map(r => (r.add.channelId, r.add.id)).toList)
sender.send(relayer, RES_ADD_SETTLED(origin_2, htlc_2_2, HtlcResult.OnChainFulfill(preimage2)))
register.expectMsgAllOf(
Register.Forward(replyTo = null, channelId_ab_1, CMD_FULFILL_HTLC(5, preimage2, commit = true)),

View File

@ -577,7 +577,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
assert(fwd2.message.r == paymentPreimage)
val paymentRelayed = eventListener.expectMessageType[ChannelPaymentRelayed]
assert(paymentRelayed.copy(timestamp = 0 unixms) == ChannelPaymentRelayed(r.add.amountMsat, r.amountToForward, r.add.paymentHash, r.add.channelId, channelId1, timestamp = 0 unixms))
assert(paymentRelayed.copy(startedAt = 0 unixms, settledAt = 0 unixms) == ChannelPaymentRelayed(r.add.amountMsat, r.amountToForward, r.add.paymentHash, r.add.channelId, channelId1, startedAt = 0 unixms, settledAt = 0 unixms))
}
}

View File

@ -29,12 +29,12 @@ import fr.acinq.eclair.FeatureSupport.{Mandatory, Optional}
import fr.acinq.eclair.Features.{AsyncPaymentPrototype, BasicMultiPartPayment, PaymentSecret, VariableLengthOnion}
import fr.acinq.eclair.channel.{CMD_FAIL_HTLC, CMD_FULFILL_HTLC, Register}
import fr.acinq.eclair.crypto.Sphinx
import AsyncPaymentTriggerer.{AsyncPaymentCanceled, AsyncPaymentTimeout, AsyncPaymentTriggered, Watch}
import fr.acinq.eclair.payment.Bolt11Invoice.ExtraHop
import fr.acinq.eclair.payment.IncomingPaymentPacket.NodeRelayPacket
import fr.acinq.eclair.payment.Invoice.ExtraEdge
import fr.acinq.eclair.payment.OutgoingPaymentPacket.Upstream
import fr.acinq.eclair.payment._
import fr.acinq.eclair.payment.relay.AsyncPaymentTriggerer.{AsyncPaymentCanceled, AsyncPaymentTimeout, AsyncPaymentTriggered, Watch}
import fr.acinq.eclair.payment.relay.NodeRelayer.PaymentKey
import fr.acinq.eclair.payment.send.ClearRecipient
import fr.acinq.eclair.payment.send.MultiPartPaymentLifecycle.{PreimageReceived, SendMultiPartPayment}
@ -44,7 +44,7 @@ import fr.acinq.eclair.router.Router.RouteRequest
import fr.acinq.eclair.router.{BalanceTooLow, RouteNotFound}
import fr.acinq.eclair.wire.protocol.PaymentOnion.{FinalPayload, IntermediatePayload}
import fr.acinq.eclair.wire.protocol._
import fr.acinq.eclair.{BlockHeight, Bolt11Feature, CltvExpiry, CltvExpiryDelta, Features, InvoiceFeature, MilliSatoshi, MilliSatoshiLong, NodeParams, ShortChannelId, TestConstants, UInt64, randomBytes, randomBytes32, randomKey}
import fr.acinq.eclair.{BlockHeight, Bolt11Feature, CltvExpiry, CltvExpiryDelta, Features, MilliSatoshi, MilliSatoshiLong, NodeParams, ShortChannelId, TestConstants, TimestampMilli, UInt64, randomBytes, randomBytes32, randomKey}
import org.scalatest.funsuite.FixtureAnyFunSuiteLike
import org.scalatest.{Outcome, Tag}
import scodec.bits.HexStringSyntax
@ -243,7 +243,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
incomingMultiPart.foreach(incoming => nodeRelayer ! NodeRelay.Relay(incoming))
val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig]
validateOutgoingCfg(outgoingCfg, Upstream.Trampoline(incomingMultiPart.map(_.add)))
validateOutgoingCfg(outgoingCfg, Upstream.Trampoline(incomingMultiPart.map(p => Upstream.ReceivedHtlc(p.add, TimestampMilli.now()))))
val outgoingPayment = mockPayFSM.expectMessageType[SendMultiPartPayment]
validateOutgoingPayment(outgoingPayment)
@ -372,7 +372,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
// upstream payment relayed
val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig]
validateOutgoingCfg(outgoingCfg, Upstream.Trampoline(incomingAsyncPayment.map(_.add)))
validateOutgoingCfg(outgoingCfg, Upstream.Trampoline(incomingAsyncPayment.map(p => Upstream.ReceivedHtlc(p.add, TimestampMilli.now()))))
val outgoingPayment = mockPayFSM.expectMessageType[SendMultiPartPayment]
validateOutgoingPayment(outgoingPayment)
// those are adapters for pay-fsm messages
@ -390,7 +390,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
nodeRelayerAdapters ! createSuccessEvent()
val relayEvent = eventListener.expectMessageType[TrampolinePaymentRelayed]
validateRelayEvent(relayEvent)
assert(relayEvent.incoming.toSet == incomingAsyncPayment.map(i => PaymentRelayed.Part(i.add.amountMsat, i.add.channelId)).toSet)
assert(relayEvent.incoming.map(p => (p.amount, p.channelId)).toSet == incomingAsyncPayment.map(i => (i.add.amountMsat, i.add.channelId)).toSet)
assert(relayEvent.outgoing.nonEmpty)
parent.expectMessageType[NodeRelayer.RelayComplete]
register.expectNoMessage(100 millis)
@ -449,7 +449,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
// upstream payment relayed
val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig]
validateOutgoingCfg(outgoingCfg, Upstream.Trampoline(incomingAsyncPayment.map(_.add)))
validateOutgoingCfg(outgoingCfg, Upstream.Trampoline(incomingAsyncPayment.map(p => Upstream.ReceivedHtlc(p.add, TimestampMilli.now()))))
val outgoingPayment = mockPayFSM.expectMessageType[SendMultiPartPayment]
validateOutgoingPayment(outgoingPayment)
// those are adapters for pay-fsm messages
@ -467,7 +467,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
nodeRelayerAdapters ! createSuccessEvent()
val relayEvent = eventListener.expectMessageType[TrampolinePaymentRelayed]
validateRelayEvent(relayEvent)
assert(relayEvent.incoming.toSet == incomingAsyncPayment.map(i => PaymentRelayed.Part(i.add.amountMsat, i.add.channelId)).toSet)
assert(relayEvent.incoming.map(p => (p.amount, p.channelId)).toSet == incomingAsyncPayment.map(i => (i.add.amountMsat, i.add.channelId)).toSet)
assert(relayEvent.outgoing.nonEmpty)
parent.expectMessageType[NodeRelayer.RelayComplete]
register.expectNoMessage(100 millis)
@ -657,7 +657,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
nodeRelayer ! NodeRelay.Relay(incomingMultiPart.last)
val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig]
validateOutgoingCfg(outgoingCfg, Upstream.Trampoline(incomingMultiPart.map(_.add)))
validateOutgoingCfg(outgoingCfg, Upstream.Trampoline(incomingMultiPart.map(p => Upstream.ReceivedHtlc(p.add, TimestampMilli.now()))))
val outgoingPayment = mockPayFSM.expectMessageType[SendMultiPartPayment]
validateOutgoingPayment(outgoingPayment)
// those are adapters for pay-fsm messages
@ -679,7 +679,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
nodeRelayerAdapters ! createSuccessEvent()
val relayEvent = eventListener.expectMessageType[TrampolinePaymentRelayed]
validateRelayEvent(relayEvent)
assert(relayEvent.incoming.toSet == incomingMultiPart.map(i => PaymentRelayed.Part(i.add.amountMsat, i.add.channelId)).toSet)
assert(relayEvent.incoming.map(p => (p.amount, p.channelId)).toSet == incomingMultiPart.map(i => (i.add.amountMsat, i.add.channelId)).toSet)
assert(relayEvent.outgoing.nonEmpty)
parent.expectMessageType[NodeRelayer.RelayComplete]
register.expectNoMessage(100 millis)
@ -693,7 +693,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
nodeRelayer ! NodeRelay.Relay(incomingSinglePart)
val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig]
validateOutgoingCfg(outgoingCfg, Upstream.Trampoline(incomingSinglePart.add :: Nil))
validateOutgoingCfg(outgoingCfg, Upstream.Trampoline(Upstream.ReceivedHtlc(incomingSinglePart.add, TimestampMilli.now()) :: Nil))
val outgoingPayment = mockPayFSM.expectMessageType[SendMultiPartPayment]
validateOutgoingPayment(outgoingPayment)
// those are adapters for pay-fsm messages
@ -708,7 +708,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
nodeRelayerAdapters ! createSuccessEvent()
val relayEvent = eventListener.expectMessageType[TrampolinePaymentRelayed]
validateRelayEvent(relayEvent)
assert(relayEvent.incoming == Seq(PaymentRelayed.Part(incomingSinglePart.add.amountMsat, incomingSinglePart.add.channelId)))
assert(relayEvent.incoming.map(p => (p.amount, p.channelId)) == Seq((incomingSinglePart.add.amountMsat, incomingSinglePart.add.channelId)))
assert(relayEvent.outgoing.nonEmpty)
parent.expectMessageType[NodeRelayer.RelayComplete]
register.expectNoMessage(100 millis)
@ -728,7 +728,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
incomingPayments.foreach(incoming => nodeRelayer ! NodeRelay.Relay(incoming))
val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig]
validateOutgoingCfg(outgoingCfg, Upstream.Trampoline(incomingMultiPart.map(_.add)))
validateOutgoingCfg(outgoingCfg, Upstream.Trampoline(incomingMultiPart.map(p => Upstream.ReceivedHtlc(p.add, TimestampMilli.now()))))
val outgoingPayment = mockPayFSM.expectMessageType[SendMultiPartPayment]
assert(outgoingPayment.recipient.nodeId == outgoingNodeId)
assert(outgoingPayment.recipient.totalAmount == outgoingAmount)
@ -752,7 +752,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
nodeRelayerAdapters ! createSuccessEvent()
val relayEvent = eventListener.expectMessageType[TrampolinePaymentRelayed]
validateRelayEvent(relayEvent)
assert(relayEvent.incoming == incomingMultiPart.map(i => PaymentRelayed.Part(i.add.amountMsat, i.add.channelId)))
assert(relayEvent.incoming.map(p => (p.amount, p.channelId)) == incomingMultiPart.map(i => (i.add.amountMsat, i.add.channelId)))
assert(relayEvent.outgoing.nonEmpty)
parent.expectMessageType[NodeRelayer.RelayComplete]
register.expectNoMessage(100 millis)
@ -772,7 +772,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
incomingPayments.foreach(incoming => nodeRelayer ! NodeRelay.Relay(incoming))
val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig]
validateOutgoingCfg(outgoingCfg, Upstream.Trampoline(incomingMultiPart.map(_.add)))
validateOutgoingCfg(outgoingCfg, Upstream.Trampoline(incomingMultiPart.map(p => Upstream.ReceivedHtlc(p.add, TimestampMilli.now()))))
val outgoingPayment = mockPayFSM.expectMessageType[SendPaymentToNode]
assert(outgoingPayment.recipient.nodeId == outgoingNodeId)
assert(outgoingPayment.amount == outgoingAmount)
@ -796,7 +796,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
nodeRelayerAdapters ! createSuccessEvent()
val relayEvent = eventListener.expectMessageType[TrampolinePaymentRelayed]
validateRelayEvent(relayEvent)
assert(relayEvent.incoming == incomingMultiPart.map(i => PaymentRelayed.Part(i.add.amountMsat, i.add.channelId)))
assert(relayEvent.incoming.map(p => (p.amount, p.channelId)) == incomingMultiPart.map(i => (i.add.amountMsat, i.add.channelId)))
assert(relayEvent.outgoing.length == 1)
parent.expectMessageType[NodeRelayer.RelayComplete]
register.expectNoMessage(100 millis)
@ -828,7 +828,11 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl
assert(outgoingCfg.paymentHash == paymentHash)
assert(outgoingCfg.invoice.isEmpty)
assert(outgoingCfg.recipientNodeId == outgoingNodeId)
assert(outgoingCfg.upstream == upstream)
(outgoingCfg.upstream, upstream) match {
case (Upstream.Trampoline(adds1), Upstream.Trampoline(adds2)) =>
assert(adds1.map(_.add) == adds2.map(_.add))
case _ => assert(outgoingCfg.upstream == upstream)
}
}
def validateOutgoingPayment(outgoingPayment: SendMultiPartPayment): Unit = {

View File

@ -1184,14 +1184,14 @@ class ApiServiceSpec extends AnyFunSuite with ScalatestRouteTest with IdiomaticM
system.eventStream.publish(ps)
wsClient.expectMessage(expectedSerializedPs)
val prel = ChannelPaymentRelayed(21 msat, 20 msat, ByteVector32.Zeroes, ByteVector32.Zeroes, ByteVector32.One, TimestampMilli(1553784963659L))
val expectedSerializedPrel = """{"type":"payment-relayed","amountIn":21,"amountOut":20,"paymentHash":"0000000000000000000000000000000000000000000000000000000000000000","fromChannelId":"0000000000000000000000000000000000000000000000000000000000000000","toChannelId":"0100000000000000000000000000000000000000000000000000000000000000","timestamp":{"iso":"2019-03-28T14:56:03.659Z","unix":1553784963}}"""
val prel = ChannelPaymentRelayed(21 msat, 20 msat, ByteVector32.Zeroes, ByteVector32.Zeroes, ByteVector32.One, TimestampMilli(1553784961048L), TimestampMilli(1553784963659L))
val expectedSerializedPrel = """{"type":"payment-relayed","amountIn":21,"amountOut":20,"paymentHash":"0000000000000000000000000000000000000000000000000000000000000000","fromChannelId":"0000000000000000000000000000000000000000000000000000000000000000","toChannelId":"0100000000000000000000000000000000000000000000000000000000000000","startedAt":{"iso":"2019-03-28T14:56:01.048Z","unix":1553784961},"settledAt":{"iso":"2019-03-28T14:56:03.659Z","unix":1553784963}}"""
assert(serialization.write(prel) == expectedSerializedPrel)
system.eventStream.publish(prel)
wsClient.expectMessage(expectedSerializedPrel)
val ptrel = TrampolinePaymentRelayed(ByteVector32.Zeroes, Seq(PaymentRelayed.Part(21 msat, ByteVector32.Zeroes)), Seq(PaymentRelayed.Part(8 msat, ByteVector32.Zeroes), PaymentRelayed.Part(10 msat, ByteVector32.One)), bobNodeId, 17 msat, TimestampMilli(1553784963659L))
val expectedSerializedPtrel = """{"type":"trampoline-payment-relayed","paymentHash":"0000000000000000000000000000000000000000000000000000000000000000","incoming":[{"amount":21,"channelId":"0000000000000000000000000000000000000000000000000000000000000000"}],"outgoing":[{"amount":8,"channelId":"0000000000000000000000000000000000000000000000000000000000000000"},{"amount":10,"channelId":"0100000000000000000000000000000000000000000000000000000000000000"}],"nextTrampolineNodeId":"039dc0e0b1d25905e44fdf6f8e89755a5e219685840d0bc1d28d3308f9628a3585","nextTrampolineAmount":17,"timestamp":{"iso":"2019-03-28T14:56:03.659Z","unix":1553784963}}"""
val ptrel = TrampolinePaymentRelayed(ByteVector32.Zeroes, Seq(PaymentRelayed.IncomingPart(21 msat, ByteVector32.Zeroes, TimestampMilli(1553784963659L))), Seq(PaymentRelayed.OutgoingPart(8 msat, ByteVector32.Zeroes, TimestampMilli(1553784963659L)), PaymentRelayed.OutgoingPart(10 msat, ByteVector32.One, TimestampMilli(1553784963659L))), bobNodeId, 17 msat)
val expectedSerializedPtrel = """{"type":"trampoline-payment-relayed","paymentHash":"0000000000000000000000000000000000000000000000000000000000000000","incoming":[{"amount":21,"channelId":"0000000000000000000000000000000000000000000000000000000000000000","receivedAt":{"iso":"2019-03-28T14:56:03.659Z","unix":1553784963}}],"outgoing":[{"amount":8,"channelId":"0000000000000000000000000000000000000000000000000000000000000000","settledAt":{"iso":"2019-03-28T14:56:03.659Z","unix":1553784963}},{"amount":10,"channelId":"0100000000000000000000000000000000000000000000000000000000000000","settledAt":{"iso":"2019-03-28T14:56:03.659Z","unix":1553784963}}],"nextTrampolineNodeId":"039dc0e0b1d25905e44fdf6f8e89755a5e219685840d0bc1d28d3308f9628a3585","nextTrampolineAmount":17}"""
assert(serialization.write(ptrel) == expectedSerializedPtrel)
system.eventStream.publish(ptrel)
wsClient.expectMessage(expectedSerializedPtrel)