mirror of
https://github.com/ACINQ/eclair.git
synced 2024-11-20 02:27:32 +01:00
Improve relayer logs (#1529)
We can be less repetitive given that we now have a `relayId` that we can use to factor all info.
This commit is contained in:
parent
2fc118c291
commit
f264235637
@ -27,7 +27,6 @@ import fr.acinq.bitcoin.ByteVector32
|
||||
import fr.acinq.eclair.channel._
|
||||
import fr.acinq.eclair.db.PendingRelayDb
|
||||
import fr.acinq.eclair.payment.Monitoring.{Metrics, Tags}
|
||||
import fr.acinq.eclair.payment.relay.ChannelRelay.{WrappedAddResponse, WrappedForwardShortIdFailure}
|
||||
import fr.acinq.eclair.payment.relay.Relayer.OutgoingChannel
|
||||
import fr.acinq.eclair.payment.{ChannelPaymentRelayed, IncomingPacket}
|
||||
import fr.acinq.eclair.router.Announcements
|
||||
@ -56,7 +55,7 @@ object ChannelRelay {
|
||||
parentPaymentId_opt = Some(relayId), // for a channel relay, parent payment id = relay id
|
||||
paymentHash_opt = Some(r.add.paymentHash))) {
|
||||
context.self ! DoRelay
|
||||
new ChannelRelay(nodeParams, register, channels, r, context)(Seq.empty)
|
||||
new ChannelRelay(nodeParams, register, channels, r, context).relay(Seq.empty)
|
||||
}
|
||||
}
|
||||
|
||||
@ -97,24 +96,27 @@ class ChannelRelay private(nodeParams: NodeParams,
|
||||
r: IncomingPacket.ChannelRelayPacket,
|
||||
context: ActorContext[ChannelRelay.Command]) {
|
||||
|
||||
import ChannelRelay._
|
||||
|
||||
private val forwardShortIdAdapter = context.messageAdapter[Register.ForwardShortIdFailure[CMD_ADD_HTLC]](WrappedForwardShortIdFailure)
|
||||
private val addResponseAdapter = context.messageAdapter[CommandResponse[CMD_ADD_HTLC]](WrappedAddResponse)
|
||||
|
||||
import ChannelRelay._
|
||||
|
||||
private case class PreviouslyTried(shortChannelId: ShortChannelId, failure: RES_ADD_FAILED[ChannelException])
|
||||
|
||||
def apply(previousFailures: Seq[PreviouslyTried]): Behavior[Command] = {
|
||||
def relay(previousFailures: Seq[PreviouslyTried]): Behavior[Command] = {
|
||||
Behaviors.receiveMessagePartial {
|
||||
case DoRelay =>
|
||||
context.log.info(s"relaying htlc #${r.add.id} from channelId={} to requestedShortChannelId={} previousAttempts={}", r.add.channelId, r.payload.outgoingChannelId, previousFailures.size)
|
||||
if (previousFailures.isEmpty) {
|
||||
context.log.info(s"relaying htlc #${r.add.id} from channelId={} to requestedShortChannelId={} nextNode={}", r.add.channelId, r.payload.outgoingChannelId, nextNodeId_opt.getOrElse(""))
|
||||
}
|
||||
context.log.info("attempting relay previousAttempts={}", previousFailures.size)
|
||||
handleRelay(previousFailures) match {
|
||||
case RelayFailure(cmdFail) =>
|
||||
Metrics.recordPaymentRelayFailed(Tags.FailureType(cmdFail), Tags.RelayType.Channel)
|
||||
context.log.info(s"rejecting htlc #${r.add.id} from channelId=${r.add.channelId} to shortChannelId=${r.payload.outgoingChannelId} reason=${cmdFail.reason}")
|
||||
context.log.info(s"rejecting htlc reason=${cmdFail.reason}")
|
||||
safeSendAndStop(r.add.channelId, cmdFail)
|
||||
case RelaySuccess(selectedShortChannelId, cmdAdd) =>
|
||||
context.log.info(s"forwarding htlc #${r.add.id} from channelId=${r.add.channelId} to shortChannelId=$selectedShortChannelId")
|
||||
context.log.info(s"forwarding htlc to shortChannelId=$selectedShortChannelId")
|
||||
register ! Register.ForwardShortId(forwardShortIdAdapter.toClassic, selectedShortChannelId, cmdAdd)
|
||||
waitForAddResponse(selectedShortChannelId, previousFailures)
|
||||
}
|
||||
@ -130,9 +132,9 @@ class ChannelRelay private(nodeParams: NodeParams,
|
||||
safeSendAndStop(o.add.channelId, cmdFail)
|
||||
|
||||
case WrappedAddResponse(addFailed@RES_ADD_FAILED(CMD_ADD_HTLC(_, _, _, _, _, Origin.ChannelRelayedHot(_, add, _), _), _, _)) =>
|
||||
context.log.info(s"retrying htlc #${add.id} from channelId=${add.channelId}")
|
||||
context.log.info("attempt failed with reason={}", addFailed.t.getClass.getSimpleName)
|
||||
context.self ! DoRelay
|
||||
apply(previousFailures :+ PreviouslyTried(selectedShortChannelId, addFailed))
|
||||
relay(previousFailures :+ PreviouslyTried(selectedShortChannelId, addFailed))
|
||||
|
||||
case WrappedAddResponse(_: RES_SUCCESS[_]) =>
|
||||
context.log.debug("sent htlc to the downstream channel")
|
||||
@ -142,13 +144,13 @@ class ChannelRelay private(nodeParams: NodeParams,
|
||||
def waitForAddSettled(): Behavior[Command] =
|
||||
Behaviors.receiveMessagePartial {
|
||||
case WrappedAddResponse(RES_ADD_SETTLED(o: Origin.ChannelRelayedHot, htlc, fulfill: HtlcResult.Fulfill)) =>
|
||||
context.log.info("relaying fulfill upstream")
|
||||
context.log.info("relaying fulfill to upstream")
|
||||
val cmd = CMD_FULFILL_HTLC(o.originHtlcId, fulfill.paymentPreimage, commit = true)
|
||||
context.system.eventStream ! EventStream.Publish(ChannelPaymentRelayed(o.amountIn, o.amountOut, htlc.paymentHash, o.originChannelId, htlc.channelId))
|
||||
safeSendAndStop(o.originChannelId, cmd)
|
||||
|
||||
case WrappedAddResponse(RES_ADD_SETTLED(o: Origin.ChannelRelayedHot, _, fail: HtlcResult.Fail)) =>
|
||||
context.log.info("relaying fail upstream")
|
||||
context.log.info("relaying fail to upstream")
|
||||
Metrics.recordPaymentRelayFailed(Tags.FailureType.Remote, Tags.RelayType.Channel)
|
||||
val cmd = translateRelayFailure(o.originHtlcId, fail)
|
||||
safeSendAndStop(o.originChannelId, cmd)
|
||||
@ -185,6 +187,9 @@ class ChannelRelay private(nodeParams: NodeParams,
|
||||
}
|
||||
}
|
||||
|
||||
/** all the channels point to the same next node, we take the first one */
|
||||
private val nextNodeId_opt = channels.headOption.map(_._2.nextNodeId)
|
||||
|
||||
/**
|
||||
* Select a channel to the same node to relay the payment to, that has the lowest balance and is compatible in
|
||||
* terms of fees, expiry_delta, etc.
|
||||
@ -192,24 +197,17 @@ class ChannelRelay private(nodeParams: NodeParams,
|
||||
* If no suitable channel is found we default to the originally requested channel.
|
||||
*/
|
||||
def selectPreferredChannel(alreadyTried: Seq[ShortChannelId]): Option[ShortChannelId] = {
|
||||
import r.add
|
||||
val requestedShortChannelId = r.payload.outgoingChannelId
|
||||
context.log.debug(s"selecting next channel for htlc #${add.id} from channelId={} to requestedShortChannelId={} previousAttempts={}", add.channelId, requestedShortChannelId, alreadyTried.size)
|
||||
// first we find out what is the next node
|
||||
val nextNodeId_opt = channels.get(requestedShortChannelId) match {
|
||||
case Some(c) => Some(c.nextNodeId)
|
||||
case None => None
|
||||
}
|
||||
context.log.debug("selecting next channel")
|
||||
nextNodeId_opt match {
|
||||
case Some(nextNodeId) =>
|
||||
context.log.debug(s"next hop for htlc #{} is nodeId={}", add.id, nextNodeId)
|
||||
// we then filter out channels that we have already tried
|
||||
val candidateChannels: Map[ShortChannelId, OutgoingChannel] = channels -- alreadyTried
|
||||
// and we filter again to keep the ones that are compatible with this payment (mainly fees, expiry delta)
|
||||
candidateChannels
|
||||
.map { case (shortChannelId, channelInfo) =>
|
||||
val relayResult = relayOrFail(Some(channelInfo.channelUpdate))
|
||||
context.log.debug(s"candidate channel for htlc #${add.id}: shortChannelId={} balanceMsat={} channelUpdate={} relayResult={}", shortChannelId, channelInfo.commitments.availableBalanceForSend, channelInfo.channelUpdate, relayResult)
|
||||
context.log.debug(s"candidate channel: shortChannelId={} balanceMsat={} channelUpdate={} relayResult={}", shortChannelId, channelInfo.commitments.availableBalanceForSend, channelInfo.channelUpdate, relayResult)
|
||||
(shortChannelId, channelInfo, relayResult)
|
||||
}
|
||||
.collect { case (shortChannelId, channelInfo, _: RelaySuccess) => (shortChannelId, channelInfo.commitments.availableBalanceForSend) }
|
||||
|
@ -74,11 +74,15 @@ object ChannelRelayer {
|
||||
Behaviors.receiveMessage {
|
||||
case Relay(channelRelayPacket) =>
|
||||
val relayId = UUID.randomUUID()
|
||||
val channels: Map[ShortChannelId, Relayer.OutgoingChannel] = channelUpdates.get(channelRelayPacket.payload.outgoingChannelId) match {
|
||||
case Some(channel) => node2channels.get(channel.nextNodeId).map(channelUpdates).map(c => c.channelUpdate.shortChannelId -> c).toMap
|
||||
val nextNodeId_opt: Option[PublicKey] = channelUpdates.get(channelRelayPacket.payload.outgoingChannelId) match {
|
||||
case Some(channel) => Some(channel.nextNodeId)
|
||||
case None => None
|
||||
}
|
||||
val channels: Map[ShortChannelId, Relayer.OutgoingChannel] = nextNodeId_opt match {
|
||||
case Some(nextNodeId) => node2channels.get(nextNodeId).map(channelUpdates).map(c => c.channelUpdate.shortChannelId -> c).toMap
|
||||
case None => Map.empty
|
||||
}
|
||||
context.log.debug(s"spawning a new handler with relayId=$relayId channels={}", channels.keys.mkString(","))
|
||||
context.log.debug(s"spawning a new handler with relayId=$relayId to nextNodeId={} with channels={}", nextNodeId_opt.getOrElse(""), channels.keys.mkString(","))
|
||||
context.spawn(ChannelRelay.apply(nodeParams, register, channels, relayId, channelRelayPacket), name = relayId.toString)
|
||||
Behaviors.same
|
||||
|
||||
@ -94,11 +98,13 @@ object ChannelRelayer {
|
||||
case WrappedLocalChannelUpdate(LocalChannelUpdate(_, channelId, shortChannelId, remoteNodeId, _, channelUpdate, commitments)) =>
|
||||
context.log.debug(s"updating local channel info for channelId=$channelId shortChannelId=$shortChannelId remoteNodeId=$remoteNodeId channelUpdate={} commitments={}", channelUpdate, commitments)
|
||||
val channelUpdates1 = channelUpdates + (channelUpdate.shortChannelId -> Relayer.OutgoingChannel(remoteNodeId, channelUpdate, commitments))
|
||||
apply(nodeParams, register, channelUpdates1, node2channels.addOne(remoteNodeId, channelUpdate.shortChannelId))
|
||||
val node2channels1 = node2channels.addOne(remoteNodeId, channelUpdate.shortChannelId)
|
||||
apply(nodeParams, register, channelUpdates1, node2channels1)
|
||||
|
||||
case WrappedLocalChannelDown(LocalChannelDown(_, channelId, shortChannelId, remoteNodeId)) =>
|
||||
context.log.debug(s"removed local channel info for channelId=$channelId shortChannelId=$shortChannelId")
|
||||
apply(nodeParams, register, channelUpdates - shortChannelId, node2channels.subtractOne(remoteNodeId, shortChannelId))
|
||||
val node2channels1 = node2channels.subtractOne(remoteNodeId, shortChannelId)
|
||||
apply(nodeParams, register, channelUpdates - shortChannelId, node2channels1)
|
||||
|
||||
case WrappedAvailableBalanceChanged(AvailableBalanceChanged(_, channelId, shortChannelId, commitments)) =>
|
||||
val channelUpdates1 = channelUpdates.get(shortChannelId) match {
|
||||
|
@ -167,7 +167,7 @@ class NodeRelay private(nodeParams: NodeParams,
|
||||
Behaviors.stopped
|
||||
case Some(secret) =>
|
||||
import akka.actor.typed.scaladsl.adapter._
|
||||
context.log.debug("creating a payment FSM")
|
||||
context.log.info("relaying payment relayId={}", relayId)
|
||||
val mppFsm = context.actorOf(MultiPartPaymentFSM.props(nodeParams, add.paymentHash, outer.totalAmount, mppFsmAdapters))
|
||||
context.log.debug("forwarding incoming htlc to the payment FSM")
|
||||
mppFsm ! MultiPartPaymentFSM.HtlcPart(outer.totalAmount, add)
|
||||
@ -202,24 +202,29 @@ class NodeRelay private(nodeParams: NodeParams,
|
||||
receiving(htlcs :+ add, secret, nextPayload, nextPacket, handler)
|
||||
}
|
||||
case WrappedMultiPartPaymentFailed(MultiPartPaymentFSM.MultiPartPaymentFailed(_, failure, parts)) =>
|
||||
context.log.warn("could not relay payment (paidAmount={} failure={})", parts.map(_.amount).sum, failure)
|
||||
context.log.warn("could not complete incoming multi-part payment (parts={} paidAmount={} failure={})", parts.size, parts.map(_.amount).sum, failure)
|
||||
Metrics.recordPaymentRelayFailed(failure.getClass.getSimpleName, Tags.RelayType.Trampoline)
|
||||
parts.collect { case p: MultiPartPaymentFSM.HtlcPart => rejectHtlc(p.htlc.id, p.htlc.channelId, p.amount, Some(failure)) }
|
||||
Behaviors.stopped
|
||||
case WrappedMultiPartPaymentSucceeded(MultiPartPaymentFSM.MultiPartPaymentSucceeded(_, parts)) =>
|
||||
context.log.info("completed incoming multi-part payment with parts={} paidAmount={}", parts.size, parts.map(_.amount).sum)
|
||||
val upstream = Upstream.Trampoline(htlcs)
|
||||
validateRelay(nodeParams, upstream, nextPayload) match {
|
||||
case Some(failure) =>
|
||||
context.log.warn(s"rejecting trampoline payment (amountIn=${upstream.amountIn} expiryIn=${upstream.expiryIn} amountOut=${nextPayload.amountToForward} expiryOut=${nextPayload.outgoingCltv} htlcCount=${parts.length} reason=$failure)")
|
||||
context.log.warn(s"rejecting trampoline payment reason=$failure")
|
||||
rejectPayment(upstream, Some(failure))
|
||||
Behaviors.stopped
|
||||
case None =>
|
||||
context.log.info(s"relaying trampoline payment (amountIn=${upstream.amountIn} expiryIn=${upstream.expiryIn} amountOut=${nextPayload.amountToForward} expiryOut=${nextPayload.outgoingCltv} htlcCount=${upstream.adds.length})")
|
||||
relay(upstream, nextPayload, nextPacket)
|
||||
sending(upstream, nextPayload, fulfilledUpstream = false)
|
||||
doSend(upstream, nextPayload, nextPacket)
|
||||
}
|
||||
}
|
||||
|
||||
private def doSend(upstream: Upstream.Trampoline, nextPayload: Onion.NodeRelayPayload, nextPacket: OnionRoutingPacket): Behavior[Command] = {
|
||||
context.log.info(s"relaying trampoline payment (amountIn=${upstream.amountIn} expiryIn=${upstream.expiryIn} amountOut=${nextPayload.amountToForward} expiryOut=${nextPayload.outgoingCltv})")
|
||||
relay(upstream, nextPayload, nextPacket)
|
||||
sending(upstream, nextPayload, fulfilledUpstream = false)
|
||||
}
|
||||
|
||||
/**
|
||||
* Once the payment is forwarded, we're waiting for fail/fulfill responses from downstream nodes.
|
||||
*
|
||||
@ -234,7 +239,7 @@ class NodeRelay private(nodeParams: NodeParams,
|
||||
case WrappedPreimageReceived(PreimageReceived(_, paymentPreimage)) =>
|
||||
if (!fulfilledUpstream) {
|
||||
// We want to fulfill upstream as soon as we receive the preimage (even if not all HTLCs have fulfilled downstream).
|
||||
context.log.debug("trampoline payment successfully relayed")
|
||||
context.log.debug("got preimage from downstream")
|
||||
fulfillPayment(upstream, paymentPreimage)
|
||||
sending(upstream, nextPayload, fulfilledUpstream = true)
|
||||
} else {
|
||||
@ -242,11 +247,11 @@ class NodeRelay private(nodeParams: NodeParams,
|
||||
Behaviors.same
|
||||
}
|
||||
case WrappedPaymentSent(paymentSent) =>
|
||||
context.log.debug("trampoline payment fully resolved downstream (id={})", paymentSent.id)
|
||||
context.log.debug("trampoline payment fully resolved downstream")
|
||||
success(upstream, fulfilledUpstream, paymentSent)
|
||||
Behaviors.stopped
|
||||
case WrappedPaymentFailed(PaymentFailed(id, _, failures, _)) =>
|
||||
context.log.debug(s"trampoline payment failed downstream (id={})", id)
|
||||
context.log.debug(s"trampoline payment failed downstream")
|
||||
if (!fulfilledUpstream) {
|
||||
rejectPayment(upstream, translateError(nodeParams, failures, upstream, nextPayload))
|
||||
}
|
||||
@ -264,13 +269,13 @@ class NodeRelay private(nodeParams: NodeParams,
|
||||
val routingHints = payloadOut.invoiceRoutingInfo.map(_.map(_.toSeq).toSeq).getOrElse(Nil)
|
||||
payloadOut.paymentSecret match {
|
||||
case Some(paymentSecret) if Features(features).hasFeature(Features.BasicMultiPartPayment) =>
|
||||
context.log.debug("relaying trampoline payment to non-trampoline recipient using MPP")
|
||||
context.log.debug("sending the payment to non-trampoline recipient using MPP")
|
||||
val payment = SendMultiPartPayment(payFsmAdapters, paymentSecret, payloadOut.outgoingNodeId, payloadOut.amountToForward, payloadOut.outgoingCltv, nodeParams.maxPaymentAttempts, routingHints, Some(routeParams))
|
||||
val payFSM = fsmFactory.spawnOutgoingPayFSM(context, nodeParams, router, register, paymentCfg, multiPart = true)
|
||||
payFSM ! payment
|
||||
payFSM
|
||||
case _ =>
|
||||
context.log.debug("relaying trampoline payment to non-trampoline recipient without MPP")
|
||||
context.log.debug("sending the payment to non-trampoline recipient without MPP")
|
||||
val finalPayload = Onion.createSinglePartPayload(payloadOut.amountToForward, payloadOut.outgoingCltv, payloadOut.paymentSecret)
|
||||
val payment = SendPayment(payFsmAdapters, payloadOut.outgoingNodeId, finalPayload, nodeParams.maxPaymentAttempts, routingHints, Some(routeParams))
|
||||
val payFSM = fsmFactory.spawnOutgoingPayFSM(context, nodeParams, router, register, paymentCfg, multiPart = false)
|
||||
@ -278,7 +283,7 @@ class NodeRelay private(nodeParams: NodeParams,
|
||||
payFSM
|
||||
}
|
||||
case None =>
|
||||
context.log.debug("relaying trampoline payment to next trampoline node")
|
||||
context.log.debug("sending the payment to the next trampoline node")
|
||||
val payFSM = fsmFactory.spawnOutgoingPayFSM(context, nodeParams, router, register, paymentCfg, multiPart = true)
|
||||
val paymentSecret = randomBytes32 // we generate a new secret to protect against probing attacks
|
||||
val payment = SendMultiPartPayment(payFsmAdapters, paymentSecret, payloadOut.outgoingNodeId, payloadOut.amountToForward, payloadOut.outgoingCltv, nodeParams.maxPaymentAttempts, routeParams = Some(routeParams), additionalTlvs = Seq(OnionTlv.TrampolineOnion(packetOut)))
|
||||
|
Loading…
Reference in New Issue
Block a user