Mirror of https://github.com/ACINQ/eclair.git (synced 2025-01-19 05:33:59 +01:00)
Improved tracing of single/multi-part payments (#1218)
This includes a bit of refactoring in `MultiPartPaymentLifecycle`. Note that we can't use the `onTermination` handler to finish the spans, because it runs asynchronously and may only fire long after the actor has stopped. That's why we use a dedicated `myStop` function instead. In Kamon 2.0, spans are automatically generated for tracked actors by default, which we don't want because we define our own spans; that's why there is an additional configuration block in `application.conf`.
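To illustrate the pattern described above (this is a minimal, self-contained sketch, not code from the diff below — the state, data and span names are made up), an Akka classic FSM can finish its Kamon span in a dedicated stop helper instead of relying on `onTermination`:

```scala
import akka.actor.FSM
import kamon.Kamon
import kamon.trace.Span

// Hypothetical states/data, only to illustrate the span-lifecycle pattern.
sealed trait ExampleState
case object Working extends ExampleState
case object Data

class TracedFsm extends FSM[ExampleState, Data.type] {
  // One span per FSM instance, started when the actor is created.
  private val span: Span = Kamon.spanBuilder("example-operation").start()

  startWith(Working, Data)

  when(Working) {
    case Event("finish", _) => myStop()
  }

  // Finish the span synchronously, then stop the FSM. Doing this in onTermination
  // instead would be asynchronous and could delay the span.finish() call.
  def myStop(): State = {
    span.finish()
    stop(FSM.Normal)
  }

  initialize()
}
```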
This commit is contained in:
parent ff3aefa45e
commit 0937af3a0b
@@ -16,8 +16,10 @@
package fr.acinq.eclair

import fr.acinq.eclair.payment.{LocalFailure, PaymentFailure, RemoteFailure, UnreadableRemoteFailure}
import kamon.Kamon
import kamon.tag.TagSet
import kamon.trace.Span

import scala.concurrent.{ExecutionContext, Future}

@@ -39,4 +41,15 @@ object KamonExt {
    res
  }

  /**
   * A helper function that fails a span with proper messages when dealing with payments
   */
  def failSpan(span: Span, failure: PaymentFailure) = {
    failure match {
      case LocalFailure(t) => span.fail("local failure", t)
      case RemoteFailure(_, e) => span.fail(s"remote failure: origin=${e.originNode} error=${e.failureMessage}")
      case UnreadableRemoteFailure(_) => span.fail("unreadable remote failure")
    }
  }

}
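For context, here is a hedged usage sketch of the new `failSpan` helper; the span name and the failure value are made up for illustration:

```scala
import fr.acinq.eclair.KamonExt
import fr.acinq.eclair.payment.LocalFailure
import kamon.Kamon

// Start a span, mark it as failed with a payment failure, then finish it.
val span = Kamon.spanBuilder("example-payment").start() // hypothetical span name
KamonExt.failSpan(span, LocalFailure(new RuntimeException("route not found")))
span.finish()
```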
@@ -33,6 +33,8 @@ import fr.acinq.eclair.payment.send.PaymentLifecycle.SendPayment
import fr.acinq.eclair.router._
import fr.acinq.eclair.wire._
import fr.acinq.eclair.{CltvExpiry, FSMDiagnosticActorLogging, Logs, LongToBtcAmount, MilliSatoshi, NodeParams, ShortChannelId, ToMilliSatoshiConversion}
import kamon.Kamon
import kamon.context.Context
import scodec.bits.ByteVector

import scala.annotation.tailrec

@@ -54,6 +56,12 @@ class MultiPartPaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig,

  val id = cfg.id

  private val span = Kamon.spanBuilder("multi-part-payment")
    .tag("parentPaymentId", cfg.parentId.toString)
    .tag("paymentHash", cfg.paymentHash.toHex)
    .tag("targetNodeId", cfg.targetNodeId.toString())
    .start()

  startWith(WAIT_FOR_PAYMENT_REQUEST, WaitingForRequest)

  when(WAIT_FOR_PAYMENT_REQUEST) {

@@ -84,7 +92,11 @@ class MultiPartPaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig,
        goto(PAYMENT_ABORTED) using PaymentAborted(d.sender, d.request, LocalFailure(BalanceTooLow) :: Nil, Set.empty)
      } else {
        val pending = setFees(d.request.routeParams, payments, payments.size)
        pending.foreach { case (childId, payment) => spawnChildPaymentFsm(childId) ! payment }
        Kamon.runWithContextEntry(parentPaymentIdKey, cfg.parentId) {
          Kamon.runWithSpan(span, finishSpan = true) {
            pending.foreach { case (childId, payment) => spawnChildPaymentFsm(childId) ! payment }
          }
        }
        goto(PAYMENT_IN_PROGRESS) using PaymentProgress(d.sender, d.request, d.networkStats, channels.length, 0 msat, d.request.maxAttempts - 1, pending, Set.empty, Nil)
      }
  }

@@ -162,9 +174,7 @@ class MultiPartPaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig,
      val failures = d.failures ++ pf.failures
      val pending = d.pending - pf.id
      if (pending.isEmpty) {
        log.warning("multi-part payment failed")
        reply(d.sender, PaymentFailed(id, d.request.paymentHash, failures))
        stop(FSM.Normal)
        myStop(d.sender, Left(PaymentFailed(id, d.request.paymentHash, failures)))
      } else {
        stay using d.copy(failures = failures, pending = pending)
      }

@@ -183,9 +193,7 @@ class MultiPartPaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig,
      val parts = d.parts ++ ps.parts
      val pending = d.pending - ps.id
      if (pending.isEmpty) {
        log.info("multi-part payment succeeded")
        reply(d.sender, PaymentSent(id, d.request.paymentHash, d.preimage, parts))
        stop(FSM.Normal)
        myStop(d.sender, Right(PaymentSent(id, d.request.paymentHash, d.preimage, parts)))
      } else {
        stay using d.copy(parts = parts, pending = pending)
      }

@@ -196,9 +204,7 @@ class MultiPartPaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig,
      log.warning(s"payment succeeded but partial payment failed (id=${pf.id})")
      val pending = d.pending - pf.id
      if (pending.isEmpty) {
        log.info("multi-part payment succeeded")
        reply(d.sender, PaymentSent(id, d.request.paymentHash, d.preimage, d.parts))
        stop(FSM.Normal)
        myStop(d.sender, Right(PaymentSent(id, d.request.paymentHash, d.preimage, d.parts)))
      } else {
        stay using d.copy(pending = pending)
      }

@@ -207,17 +213,13 @@ class MultiPartPaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig,
  onTransition {
    case _ -> PAYMENT_ABORTED => nextStateData match {
      case d: PaymentAborted if d.pending.isEmpty =>
        log.warning("multi-part payment failed")
        reply(d.sender, PaymentFailed(id, d.request.paymentHash, d.failures))
        stop(FSM.Normal)
        myStop(d.sender, Left(PaymentFailed(id, d.request.paymentHash, d.failures)))
      case _ =>
    }

    case _ -> PAYMENT_SUCCEEDED => nextStateData match {
      case d: PaymentSucceeded if d.pending.isEmpty =>
        log.info("multi-part payment succeeded")
        reply(d.sender, PaymentSent(id, d.request.paymentHash, d.preimage, d.parts))
        stop(FSM.Normal)
        myStop(d.sender, Right(PaymentSent(id, d.request.paymentHash, d.preimage, d.parts)))
      case _ =>
    }
  }

@@ -227,6 +229,20 @@ class MultiPartPaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig,
    context.actorOf(PaymentLifecycle.props(nodeParams, childCfg, router, register))
  }

  def myStop(origin: ActorRef, event: Either[PaymentFailed, PaymentSent]): State = {
    event match {
      case Left(paymentFailed) =>
        log.warning("multi-part payment failed")
        reply(origin, paymentFailed)
        span.fail("payment failed")
      case Right(paymentSent) =>
        log.info("multi-part payment succeeded")
        reply(origin, paymentSent)
    }
    span.finish()
    stop(FSM.Normal)
  }

  def reply(to: ActorRef, e: PaymentEvent): Unit = {
    to ! e
    if (cfg.publishEvent) context.system.eventStream.publish(e)

@@ -257,6 +273,8 @@ class MultiPartPaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig,

object MultiPartPaymentLifecycle {

  val parentPaymentIdKey = Context.key[UUID]("parentPaymentId", UUID.fromString("00000000-0000-0000-0000-000000000000"))

  def props(nodeParams: NodeParams, cfg: SendPaymentConfig, relayer: ActorRef, router: ActorRef, register: ActorRef) = Props(new MultiPartPaymentLifecycle(nodeParams, cfg, relayer, router, register))

  case class SendMultiPartPayment(paymentHash: ByteVector32,
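The parent/child span wiring above relies on Kamon's Context propagation: the multi-part FSM stores its parent payment id in a Context key and dispatches the child payments while its own span is current, and each child checks whether a current span exists to decide between a standalone "single-payment" span and a "payment-part" child span. In eclair the child work happens in a separate actor, with Kamon's Akka instrumentation carrying the context along with the message; the sketch below inlines everything for brevity, and the names and values are illustrative rather than the actual eclair wiring:

```scala
import java.util.UUID
import kamon.Kamon
import kamon.context.Context

object TracingSketch {
  // A context key carrying the parent payment id, with a dummy default value.
  val parentPaymentIdKey: Context.Key[UUID] =
    Context.key[UUID]("parentPaymentId", UUID.fromString("00000000-0000-0000-0000-000000000000"))

  def main(args: Array[String]): Unit = {
    Kamon.init() // without init the spans are no-ops, but the code still runs

    val parentId = UUID.randomUUID()
    val parentSpan = Kamon.spanBuilder("multi-part-payment").start()

    // Parent side: make both the context entry and the parent span current while spawning child work.
    Kamon.runWithContextEntry(parentPaymentIdKey, parentId) {
      Kamon.runWithSpan(parentSpan, finishSpan = false) {
        // Child side: a current span exists, so attach a "payment-part" span to it;
        // otherwise this would be a standalone "single-payment" span.
        val childSpan =
          if (Kamon.currentSpan().isEmpty) Kamon.spanBuilder("single-payment").start()
          else Kamon.spanBuilder("payment-part").asChildOf(Kamon.currentSpan()).start()
        childSpan.finish()
      }
    }
    parentSpan.finish()
  }
}
```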
@@ -32,6 +32,8 @@ import fr.acinq.eclair.payment.send.PaymentLifecycle._
import fr.acinq.eclair.router._
import fr.acinq.eclair.wire.Onion._
import fr.acinq.eclair.wire._
import kamon.Kamon
import kamon.trace.Span

import scala.compat.Platform
import scala.util.{Failure, Success}

@@ -45,10 +47,26 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
  val id = cfg.id
  val paymentsDb = nodeParams.db.payments

  private val span = Kamon.runWithContextEntry(MultiPartPaymentLifecycle.parentPaymentIdKey, cfg.parentId) {
    val spanBuilder = if (Kamon.currentSpan().isEmpty) {
      Kamon.spanBuilder("single-payment")
    } else {
      Kamon.spanBuilder("payment-part").asChildOf(Kamon.currentSpan())
    }
    spanBuilder
      .tag("paymentId", cfg.id.toString)
      .tag("paymentHash", cfg.paymentHash.toHex)
      .tag("targetNodeId", cfg.targetNodeId.toString())
      .start()
  }

  startWith(WAITING_FOR_REQUEST, WaitingForRequest)

  when(WAITING_FOR_REQUEST) {
    case Event(c: SendPaymentToRoute, WaitingForRequest) =>
      span.tag("amount", c.finalPayload.amount.toLong)
      span.tag("totalAmount", c.finalPayload.totalAmount.toLong)
      span.tag("expiry", c.finalPayload.expiry.toLong)
      log.debug("sending {} to route {}", c.finalPayload.amount, c.hops.mkString("->"))
      val send = SendPayment(c.paymentHash, c.hops.last, c.finalPayload, maxAttempts = 1)
      router ! FinalizeRoute(c.hops)

@@ -58,6 +76,9 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
      goto(WAITING_FOR_ROUTE) using WaitingForRoute(sender, send, failures = Nil)

    case Event(c: SendPayment, WaitingForRequest) =>
      span.tag("amount", c.finalPayload.amount.toLong)
      span.tag("totalAmount", c.finalPayload.totalAmount.toLong)
      span.tag("expiry", c.finalPayload.expiry.toLong)
      log.debug("sending {} to {}{}", c.finalPayload.amount, c.targetNodeId, c.routePrefix.mkString(" with route prefix ", "->", ""))
      // We don't want the router to try cycling back to nodes that are at the beginning of the route.
      val ignoredNodes = c.routePrefix.map(_.nodeId).toSet

@@ -84,7 +105,7 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A

    case Event(Status.Failure(t), WaitingForRoute(s, c, failures)) =>
      onFailure(s, PaymentFailed(id, c.paymentHash, failures :+ LocalFailure(t)))
      stop(FSM.Normal)
      myStop()
  }

  when(WAITING_FOR_PAYMENT_COMPLETE) {

@@ -93,7 +114,7 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
    case Event(fulfill: UpdateFulfillHtlc, WaitingForComplete(s, c, cmd, _, _, _, _, route)) =>
      val p = PartialPayment(id, c.finalPayload.amount, cmd.amount - c.finalPayload.amount, fulfill.channelId, Some(route))
      onSuccess(s, PaymentSent(id, c.paymentHash, fulfill.paymentPreimage, p :: Nil))
      stop(FSM.Normal)
      myStop()

    case Event(fail: UpdateFailHtlc, WaitingForComplete(s, c, _, failures, sharedSecrets, ignoreNodes, ignoreChannels, hops)) =>
      Sphinx.FailurePacket.decrypt(fail.reason, sharedSecrets) match {

@@ -101,7 +122,7 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
          // if destination node returns an error, we fail the payment immediately
          log.warning(s"received an error message from target nodeId=$nodeId, failing the payment (failure=$failureMessage)")
          onFailure(s, PaymentFailed(id, c.paymentHash, failures :+ RemoteFailure(hops, e)))
          stop(FSM.Normal)
          myStop()
        case res if failures.size + 1 >= c.maxAttempts =>
          // otherwise we never try more than maxAttempts, no matter the kind of error returned
          val failure = res match {

@@ -114,7 +135,7 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
          }
          log.warning(s"too many failed attempts, failing the payment")
          onFailure(s, PaymentFailed(id, c.paymentHash, failures :+ failure))
          stop(FSM.Normal)
          myStop()
        case Failure(t) =>
          log.warning(s"cannot parse returned error: ${t.getMessage}")
          // in that case we don't know which node is sending garbage, let's try to blacklist all nodes except the one we are directly connected to and the destination node

@@ -190,20 +211,43 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
      // will end up retrying over that same faulty channel).
      if (failures.size + 1 >= c.maxAttempts || c.routePrefix.nonEmpty) {
        onFailure(s, PaymentFailed(id, c.paymentHash, failures :+ LocalFailure(t)))
        stop(FSM.Normal)
        myStop()
      } else {
        log.info(s"received an error message from local, trying to use a different channel (failure=${t.getMessage})")
        val faultyChannel = ChannelDesc(hops.head.lastUpdate.shortChannelId, hops.head.nodeId, hops.head.nextNodeId)
        router ! RouteRequest(c.getRouteRequestStart(nodeParams), c.targetNodeId, c.finalPayload.amount, c.assistedRoutes, ignoreNodes, ignoreChannels + faultyChannel, c.routeParams)
        goto(WAITING_FOR_ROUTE) using WaitingForRoute(s, c, failures :+ LocalFailure(t))
      }
  }

  private var stateSpan: Option[Span] = None

  onTransition {
    case _ -> state2 =>
      // whenever there is a transition we stop the current span and start a new one, this way we can track each state
      val stateSpanBuilder = Kamon.spanBuilder(state2.toString).asChildOf(span)
      nextStateData match {
        case d: WaitingForRoute =>
          // this means that previous state was WAITING_FOR_COMPLETE
          d.failures.lastOption.foreach(failure => stateSpan.foreach(span => KamonExt.failSpan(span, failure)))
        case d: WaitingForComplete =>
          stateSpanBuilder.tag("route", s"${d.hops.map(_.nextNodeId).mkString("->")}")
        case _ => ()
      }
      stateSpan.foreach(_.finish())
      stateSpan = Some(stateSpanBuilder.start())
  }

  whenUnhandled {
    case Event(_: TransportHandler.ReadAck, _) => stay // ignored, router replies with this when we forward a channel_update
  }

  def myStop(): State = {
    stateSpan.foreach(_.finish())
    span.finish()
    stop(FSM.Normal)
  }

  def onSuccess(sender: ActorRef, result: PaymentSent): Unit = {
    if (cfg.storeInDb) paymentsDb.updateOutgoingPayment(result)
    sender ! result

@@ -211,6 +255,7 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
  }

  def onFailure(sender: ActorRef, result: PaymentFailed): Unit = {
    span.fail("payment failed")
    if (cfg.storeInDb) paymentsDb.updateOutgoingPayment(result)
    sender ! result
    if (cfg.publishEvent) context.system.eventStream.publish(result)
@@ -35,10 +35,10 @@ import fr.acinq.eclair.crypto.Sphinx.DecryptedFailurePacket
import fr.acinq.eclair.db.{IncomingPayment, IncomingPaymentStatus, OutgoingPaymentStatus}
import fr.acinq.eclair.io.Peer
import fr.acinq.eclair.io.Peer.{Disconnect, PeerRoutingMessage}
import fr.acinq.eclair.payment.relay.Relayer.{GetOutgoingChannels, OutgoingChannels}
import fr.acinq.eclair.payment._
import fr.acinq.eclair.payment.receive.MultiPartHandler.ReceivePayment
import fr.acinq.eclair.payment.receive.{ForwardHandler, PaymentHandler}
import fr.acinq.eclair.payment.relay.Relayer.{GetOutgoingChannels, OutgoingChannels}
import fr.acinq.eclair.payment.send.PaymentInitiator.SendPaymentRequest
import fr.acinq.eclair.payment.send.PaymentLifecycle.{State => _}
import fr.acinq.eclair.router.Graph.WeightRatios
@@ -2,6 +2,20 @@ eclair {
  enable-kamon = false
}

kamon.instrumentation.akka {
  filters {
    actors {
      # Decides which actors generate Spans for the messages they process, given that there is already an ongoing trace
      # in the Context of the processed message (i.e. there is a Sampled Span in the Context).
      #
      trace {
        includes = [ ]
        excludes = [ "**" ] # we don't want automatically generated spans because they conflict with the ones we define
      }
    }
  }
}

akka {

  loggers = ["akka.event.slf4j.Slf4jLogger"]