diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/PimpKamon.scala b/eclair-core/src/main/scala/fr/acinq/eclair/PimpKamon.scala
index 6ae751c62..23297d789 100644
--- a/eclair-core/src/main/scala/fr/acinq/eclair/PimpKamon.scala
+++ b/eclair-core/src/main/scala/fr/acinq/eclair/PimpKamon.scala
@@ -16,9 +16,7 @@
 
 package fr.acinq.eclair
 
-import fr.acinq.eclair.payment.{LocalFailure, PaymentFailure, RemoteFailure, UnreadableRemoteFailure}
 import kamon.metric.Timer
-import kamon.trace.Span
 
 import scala.concurrent.{ExecutionContext, Future}
 
@@ -40,15 +38,4 @@ object KamonExt {
     res
   }
 
-  /**
-   * A helper function that fails a span with proper messages when dealing with payments
-   */
-  def failSpan(span: Span, failure: PaymentFailure) = {
-    failure match {
-      case LocalFailure(_, t) => span.fail("local failure", t)
-      case RemoteFailure(_, e) => span.fail(s"remote failure: origin=${e.originNode} error=${e.failureMessage}")
-      case UnreadableRemoteFailure(_) => span.fail("unreadable remote failure")
-    }
-  }
-
 }
diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/rpc/ExtendedBitcoinClient.scala b/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/rpc/ExtendedBitcoinClient.scala
index 216b33d19..88086998d 100644
--- a/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/rpc/ExtendedBitcoinClient.scala
+++ b/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/rpc/ExtendedBitcoinClient.scala
@@ -21,7 +21,6 @@ import fr.acinq.eclair.ShortChannelId.coordinates
 import fr.acinq.eclair.TxCoordinates
 import fr.acinq.eclair.blockchain.{GetTxWithMetaResponse, UtxoStatus, ValidateResult}
 import fr.acinq.eclair.wire.ChannelAnnouncement
-import kamon.Kamon
 import org.json4s.Formats
 import org.json4s.JsonAST._
 
@@ -170,31 +169,20 @@ class ExtendedBitcoinClient(val rpcClient: BitcoinJsonRPCClient) {
 
   def validate(c: ChannelAnnouncement)(implicit ec: ExecutionContext): Future[ValidateResult] = {
     val TxCoordinates(blockHeight, txIndex, outputIndex) = coordinates(c.shortChannelId)
-    val span = Kamon.spanBuilder("validate-bitcoin-client").start()
     for {
-      _ <- Future.successful(0)
-      span0 = Kamon.spanBuilder("getblockhash").start()
       blockHash <- rpcClient.invoke("getblockhash", blockHeight).map(_.extractOpt[String].map(ByteVector32.fromValidHex).getOrElse(ByteVector32.Zeroes))
-      _ = span0.finish()
-      span1 = Kamon.spanBuilder("getblock").start()
       txid: ByteVector32 <- rpcClient.invoke("getblock", blockHash).map(json => Try {
         val JArray(txs) = json \ "tx"
         ByteVector32.fromValidHex(txs(txIndex).extract[String])
       }.getOrElse(ByteVector32.Zeroes))
-      _ = span1.finish()
-      span2 = Kamon.spanBuilder("getrawtx").start()
       tx <- getRawTransaction(txid)
-      _ = span2.finish()
-      span3 = Kamon.spanBuilder("utxospendable-mempool").start()
       unspent <- isTransactionOutputSpendable(txid, outputIndex, includeMempool = true)
-      _ = span3.finish()
       fundingTxStatus <- if (unspent) {
         Future.successful(UtxoStatus.Unspent)
       } else {
         // if this returns true, it means that the spending tx is *not* in the blockchain
         isTransactionOutputSpendable(txid, outputIndex, includeMempool = false).map(res => UtxoStatus.Spent(spendingTxConfirmed = !res))
       }
-      _ = span.finish()
     } yield ValidateResult(c, Right((Transaction.read(tx), fundingTxStatus)))
   } recover {
     case t: Throwable => ValidateResult(c, Left(t))
diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/send/MultiPartPaymentLifecycle.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/send/MultiPartPaymentLifecycle.scala
index 3412e6952..660b5d0c7 100644
--- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/send/MultiPartPaymentLifecycle.scala
+++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/send/MultiPartPaymentLifecycle.scala
@@ -32,8 +32,6 @@ import fr.acinq.eclair.router.RouteCalculation
 import fr.acinq.eclair.router.Router._
 import fr.acinq.eclair.wire._
 import fr.acinq.eclair.{CltvExpiry, FSMDiagnosticActorLogging, Logs, MilliSatoshi, MilliSatoshiLong, NodeParams}
-import kamon.Kamon
-import kamon.context.Context
 
 import java.util.UUID
 import java.util.concurrent.TimeUnit
@@ -57,13 +55,6 @@ class MultiPartPaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig,
   val start = System.currentTimeMillis
   private var retriedFailedChannels = false
 
-  private val span = Kamon.spanBuilder("multi-part-payment")
-    .tag(Tags.ParentId, cfg.parentId.toString)
-    .tag(Tags.PaymentHash, paymentHash.toHex)
-    .tag(Tags.RecipientNodeId, cfg.recipientNodeId.toString())
-    .tag(Tags.RecipientAmount, cfg.recipientAmount.toLong)
-    .start()
-
   startWith(WAIT_FOR_PAYMENT_REQUEST, WaitingForRequest)
 
   when(WAIT_FOR_PAYMENT_REQUEST) {
@@ -83,11 +74,7 @@ class MultiPartPaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig,
       val (toSend, maxFee) = remainingToSend(nodeParams, d.request, d.pending.values)
       if (routes.map(_.amount).sum == toSend) {
         val childPayments = routes.map(route => (UUID.randomUUID(), route)).toMap
-        Kamon.runWithContextEntry(parentPaymentIdKey, cfg.parentId) {
-          Kamon.runWithSpan(span, finishSpan = true) {
-            childPayments.foreach { case (childId, route) => spawnChildPaymentFsm(childId) ! createChildPayment(self, route, d.request) }
-          }
-        }
+        childPayments.foreach { case (childId, route) => spawnChildPaymentFsm(childId) ! createChildPayment(self, route, d.request) }
         goto(PAYMENT_IN_PROGRESS) using d.copy(remainingAttempts = (d.remainingAttempts - 1).max(0), pending = d.pending ++ childPayments)
       } else {
         // If a child payment failed while we were waiting for routes, the routes we received don't cover the whole
@@ -242,7 +229,6 @@ class MultiPartPaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig,
       case Left(paymentFailed) =>
         log.warning("multi-part payment failed")
        reply(origin, paymentFailed)
-        span.fail("payment failed")
      case Right(paymentSent) =>
        log.info("multi-part payment succeeded")
        reply(origin, paymentSent)
@@ -254,7 +240,6 @@ class MultiPartPaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig,
     if (retriedFailedChannels) {
       Metrics.RetryFailedChannelsResult.withTag(Tags.Success, event.isRight).increment()
     }
-    span.finish()
     stop(FSM.Normal)
   }
 
@@ -278,8 +263,6 @@
 
 object MultiPartPaymentLifecycle {
 
-  val parentPaymentIdKey = Context.key[UUID]("parentPaymentId", UUID.fromString("00000000-0000-0000-0000-000000000000"))
-
   def props(nodeParams: NodeParams, cfg: SendPaymentConfig, router: ActorRef, register: ActorRef) = Props(new MultiPartPaymentLifecycle(nodeParams, cfg, router, register))
 
   /**
diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/send/PaymentLifecycle.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/send/PaymentLifecycle.scala
index ee00004e6..b4493deca 100644
--- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/send/PaymentLifecycle.scala
+++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/send/PaymentLifecycle.scala
@@ -16,8 +16,6 @@
 
 package fr.acinq.eclair.payment.send
 
-import java.util.concurrent.TimeUnit
-
 import akka.actor.{ActorRef, FSM, Props, Status}
 import akka.event.Logging.MDC
 import fr.acinq.bitcoin.ByteVector32
@@ -36,9 +34,8 @@ import fr.acinq.eclair.router.Router._
 import fr.acinq.eclair.router._
 import fr.acinq.eclair.wire.Onion._
 import fr.acinq.eclair.wire._
-import kamon.Kamon
-import kamon.trace.Span
 
+import java.util.concurrent.TimeUnit
 import scala.util.{Failure, Success}
 
 /**
@@ -52,28 +49,10 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
   private val paymentsDb = nodeParams.db.payments
   private val start = System.currentTimeMillis
 
-  private val span = Kamon.runWithContextEntry(MultiPartPaymentLifecycle.parentPaymentIdKey, cfg.parentId) {
-    val spanBuilder = if (Kamon.currentSpan().isEmpty) {
-      Kamon.spanBuilder("single-payment")
-    } else {
-      Kamon.spanBuilder("payment-part").asChildOf(Kamon.currentSpan())
-    }
-    spanBuilder
-      .tag(Tags.PaymentId, cfg.id.toString)
-      .tag(Tags.PaymentHash, paymentHash.toHex)
-      .tag(Tags.RecipientNodeId, cfg.recipientNodeId.toString())
-      .tag(Tags.RecipientAmount, cfg.recipientAmount.toLong)
-      .start()
-  }
-
   startWith(WAITING_FOR_REQUEST, WaitingForRequest)
 
   when(WAITING_FOR_REQUEST) {
     case Event(c: SendPaymentToRoute, WaitingForRequest) =>
-      span.tag(Tags.TargetNodeId, c.targetNodeId.toString())
-      span.tag(Tags.Amount, c.finalPayload.amount.toLong)
-      span.tag(Tags.TotalAmount, c.finalPayload.totalAmount.toLong)
-      span.tag(Tags.Expiry, c.finalPayload.expiry.toLong)
       log.debug("sending {} to route {}", c.finalPayload.amount, c.printRoute())
       val send = SendPayment(c.replyTo, c.targetNodeId, c.finalPayload, maxAttempts = 1, assistedRoutes = c.assistedRoutes)
       c.route.fold(
@@ -86,10 +65,6 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
       goto(WAITING_FOR_ROUTE) using WaitingForRoute(send, Nil, Ignore.empty)
 
     case Event(c: SendPayment, WaitingForRequest) =>
-      span.tag(Tags.TargetNodeId, c.targetNodeId.toString())
-      span.tag(Tags.Amount, c.finalPayload.amount.toLong)
-      span.tag(Tags.TotalAmount, c.finalPayload.totalAmount.toLong)
-      span.tag(Tags.Expiry, c.finalPayload.expiry.toLong)
       log.debug("sending {} to {}", c.finalPayload.amount, c.targetNodeId)
       router ! RouteRequest(nodeParams.nodeId, c.targetNodeId, c.finalPayload.amount, c.getMaxFee(nodeParams), c.assistedRoutes, routeParams = c.routeParams, paymentContext = Some(cfg.paymentContext))
       if (cfg.storeInDb) {
@@ -109,7 +84,7 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
       log.warning("router error: {}", t.getMessage)
       Metrics.PaymentError.withTag(Tags.Failure, Tags.FailureType(LocalFailure(Nil, t))).increment()
       onFailure(c.replyTo, PaymentFailed(id, paymentHash, failures :+ LocalFailure(Nil, t)))
-      myStop()
+      stop(FSM.Normal)
   }
 
   when(WAITING_FOR_PAYMENT_COMPLETE) {
@@ -122,7 +97,7 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
       Metrics.PaymentAttempt.withTag(Tags.MultiPart, value = false).record(d.failures.size + 1)
       val p = PartialPayment(id, d.c.finalPayload.amount, d.cmd.amount - d.c.finalPayload.amount, htlc.channelId, Some(cfg.fullRoute(d.route)))
       onSuccess(d.c.replyTo, cfg.createPaymentSent(fulfill.paymentPreimage, p :: Nil))
-      myStop()
+      stop(FSM.Normal)
 
     case Event(RES_ADD_SETTLED(_, _, fail: HtlcResult.Fail), d: WaitingForComplete) =>
       fail match {
@@ -143,24 +118,6 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
     }
   }
 
-  private var stateSpan: Option[Span] = None
-
-  onTransition {
-    case _ -> state2 =>
-      // whenever there is a transition we stop the current span and start a new one, this way we can track each state
-      val stateSpanBuilder = Kamon.spanBuilder(state2.toString).asChildOf(span)
-      nextStateData match {
-        case d: WaitingForRoute =>
-          // this means that previous state was WAITING_FOR_COMPLETE
-          d.failures.lastOption.foreach(failure => stateSpan.foreach(span => KamonExt.failSpan(span, failure)))
-        case d: WaitingForComplete =>
-          stateSpanBuilder.tag("route", s"${cfg.fullRoute(d.route).map(_.nextNodeId).mkString("->")}")
-        case _ => ()
-      }
-      stateSpan.foreach(_.finish())
-      stateSpan = Some(stateSpanBuilder.start())
-  }
-
   whenUnhandled {
     case Event(_: TransportHandler.ReadAck, _) => stay // ignored, router replies with this when we forward a channel_update
   }
@@ -187,7 +144,7 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
         retry(failure, d)
       } else {
         onFailure(d.c.replyTo, PaymentFailed(id, paymentHash, d.failures :+ LocalFailure(cfg.fullRoute(d.route), t)))
-        myStop()
+        stop(FSM.Normal)
       }
   }
 
@@ -205,7 +162,7 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
           // if destination node returns an error, we fail the payment immediately
           log.warning(s"received an error message from target nodeId=$nodeId, failing the payment (failure=$failureMessage)")
           onFailure(c.replyTo, PaymentFailed(id, paymentHash, failures :+ RemoteFailure(cfg.fullRoute(route), e)))
-          myStop()
+          stop(FSM.Normal)
 
         case res if failures.size + 1 >= c.maxAttempts =>
           // otherwise we never try more than maxAttempts, no matter the kind of error returned
@@ -222,7 +179,7 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
           }
           log.warning(s"too many failed attempts, failing the payment")
           onFailure(c.replyTo, PaymentFailed(id, paymentHash, failures :+ failure))
-          myStop()
+          stop(FSM.Normal)
         case Failure(t) =>
           log.warning(s"cannot parse returned error: ${t.getMessage}, route=${route.printNodes()}")
           val failure = UnreadableRemoteFailure(cfg.fullRoute(route))
@@ -300,12 +257,6 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
     }
   }
 
-  private def myStop(): State = {
-    stateSpan.foreach(_.finish())
-    span.finish()
-    stop(FSM.Normal)
-  }
-
   private def onSuccess(replyTo: ActorRef, result: PaymentSent): Unit = {
     if (cfg.storeInDb) paymentsDb.updateOutgoingPayment(result)
     replyTo ! result
@@ -317,7 +268,6 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
   }
 
   private def onFailure(replyTo: ActorRef, result: PaymentFailed): Unit = {
-    span.fail("payment failed")
     if (cfg.storeInDb) paymentsDb.updateOutgoingPayment(result)
     replyTo ! result
     if (cfg.publishEvent) context.system.eventStream.publish(result)
diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Sync.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Sync.scala
index c382218b0..1e7011159 100644
--- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Sync.scala
+++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Sync.scala
@@ -25,14 +25,12 @@ import fr.acinq.eclair.router.Monitoring.{Metrics, Tags}
 import fr.acinq.eclair.router.Router._
 import fr.acinq.eclair.wire._
 import fr.acinq.eclair.{ShortChannelId, serializationResult}
-import kamon.Kamon
 import scodec.bits.ByteVector
 import shapeless.HNil
 
 import scala.annotation.tailrec
 import scala.collection.SortedSet
 import scala.collection.immutable.SortedMap
-import scala.compat.Platform
 import scala.concurrent.duration._
 import scala.util.Random
 
@@ -70,25 +68,17 @@
     implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors
     ctx.sender ! TransportHandler.ReadAck(q)
     Metrics.QueryChannelRange.Blocks.withoutTags().record(q.numberOfBlocks)
-    Kamon.runWithContextEntry(remoteNodeIdKey, origin.nodeId.toString) {
-      Kamon.runWithSpan(Kamon.spanBuilder("query-channel-range").start(), finishSpan = true) {
-        log.info("received query_channel_range with firstBlockNum={} numberOfBlocks={} extendedQueryFlags_opt={}", q.firstBlockNum, q.numberOfBlocks, q.tlvStream)
-        // keep channel ids that are in [firstBlockNum, firstBlockNum + numberOfBlocks]
-        val shortChannelIds: SortedSet[ShortChannelId] = channels.keySet.filter(keep(q.firstBlockNum, q.numberOfBlocks, _))
-        log.info("replying with {} items for range=({}, {})", shortChannelIds.size, q.firstBlockNum, q.numberOfBlocks)
-        val chunks = Kamon.runWithSpan(Kamon.spanBuilder("split-channel-ids").start(), finishSpan = true) {
-          split(shortChannelIds, q.firstBlockNum, q.numberOfBlocks, routerConf.channelRangeChunkSize)
-        }
-        Metrics.QueryChannelRange.Replies.withoutTags().record(chunks.size)
-        Kamon.runWithSpan(Kamon.spanBuilder("compute-timestamps-checksums").start(), finishSpan = true) {
-          chunks.foreach { chunk =>
-            val reply = buildReplyChannelRange(chunk, q.chainHash, routerConf.encodingType, q.queryFlags_opt, channels)
-            origin.peerConnection ! reply
-            Metrics.ReplyChannelRange.Blocks.withTag(Tags.Direction, Tags.Directions.Outgoing).record(reply.numberOfBlocks)
-            Metrics.ReplyChannelRange.ShortChannelIds.withTag(Tags.Direction, Tags.Directions.Outgoing).record(reply.shortChannelIds.array.size)
-          }
-        }
-      }
-    }
+    log.info("received query_channel_range with firstBlockNum={} numberOfBlocks={} extendedQueryFlags_opt={}", q.firstBlockNum, q.numberOfBlocks, q.tlvStream)
+    // keep channel ids that are in [firstBlockNum, firstBlockNum + numberOfBlocks]
+    val shortChannelIds: SortedSet[ShortChannelId] = channels.keySet.filter(keep(q.firstBlockNum, q.numberOfBlocks, _))
+    log.info("replying with {} items for range=({}, {})", shortChannelIds.size, q.firstBlockNum, q.numberOfBlocks)
+    val chunks = split(shortChannelIds, q.firstBlockNum, q.numberOfBlocks, routerConf.channelRangeChunkSize)
+    Metrics.QueryChannelRange.Replies.withoutTags().record(chunks.size)
+    chunks.foreach { chunk =>
+      val reply = buildReplyChannelRange(chunk, q.chainHash, routerConf.encodingType, q.queryFlags_opt, channels)
+      origin.peerConnection ! reply
+      Metrics.ReplyChannelRange.Blocks.withTag(Tags.Direction, Tags.Directions.Outgoing).record(reply.numberOfBlocks)
+      Metrics.ReplyChannelRange.ShortChannelIds.withTag(Tags.Direction, Tags.Directions.Outgoing).record(reply.shortChannelIds.array.size)
+    }
   }
 
@@ -99,99 +89,88 @@
     Metrics.ReplyChannelRange.Blocks.withTag(Tags.Direction, Tags.Directions.Incoming).record(r.numberOfBlocks)
     Metrics.ReplyChannelRange.ShortChannelIds.withTag(Tags.Direction, Tags.Directions.Incoming).record(r.shortChannelIds.array.size)
 
-    Kamon.runWithContextEntry(remoteNodeIdKey, origin.nodeId.toString) {
-      Kamon.runWithSpan(Kamon.spanBuilder("reply-channel-range").start(), finishSpan = true) {
-        @tailrec
-        def loop(ids: List[ShortChannelId], timestamps: List[ReplyChannelRangeTlv.Timestamps], checksums: List[ReplyChannelRangeTlv.Checksums], acc: List[ShortChannelIdAndFlag] = List.empty[ShortChannelIdAndFlag]): List[ShortChannelIdAndFlag] = {
-          ids match {
-            case Nil => acc.reverse
-            case head :: tail =>
-              val flag = computeFlag(d.channels)(head, timestamps.headOption, checksums.headOption, routerConf.requestNodeAnnouncements)
-              // 0 means nothing to query, just don't include it
-              val acc1 = if (flag != 0) ShortChannelIdAndFlag(head, flag) :: acc else acc
-              loop(tail, timestamps.drop(1), checksums.drop(1), acc1)
-          }
-        }
-
-        val timestamps_opt = r.timestamps_opt.map(_.timestamps).getOrElse(List.empty[ReplyChannelRangeTlv.Timestamps])
-        val checksums_opt = r.checksums_opt.map(_.checksums).getOrElse(List.empty[ReplyChannelRangeTlv.Checksums])
-        val shortChannelIdAndFlags = Kamon.runWithSpan(Kamon.spanBuilder("compute-flags").start(), finishSpan = true) {
-          loop(r.shortChannelIds.array, timestamps_opt, checksums_opt)
-        }
-
-        val (channelCount, updatesCount) = shortChannelIdAndFlags.foldLeft((0, 0)) {
-          case ((c, u), ShortChannelIdAndFlag(_, flag)) =>
-            val c1 = c + (if (QueryShortChannelIdsTlv.QueryFlagType.includeChannelAnnouncement(flag)) 1 else 0)
-            val u1 = u + (if (QueryShortChannelIdsTlv.QueryFlagType.includeUpdate1(flag)) 1 else 0) + (if (QueryShortChannelIdsTlv.QueryFlagType.includeUpdate2(flag)) 1 else 0)
-            (c1, u1)
-        }
-        log.info(s"received reply_channel_range with {} channels, we're missing {} channel announcements and {} updates, format={}", r.shortChannelIds.array.size, channelCount, updatesCount, r.shortChannelIds.encoding)
-        Metrics.ReplyChannelRange.NewChannelAnnouncements.withoutTags().record(channelCount)
-        Metrics.ReplyChannelRange.NewChannelUpdates.withoutTags().record(updatesCount)
-
-        def buildQuery(chunk: List[ShortChannelIdAndFlag]): QueryShortChannelIds = {
-          // always encode empty lists as UNCOMPRESSED
-          val encoding = if (chunk.isEmpty) EncodingType.UNCOMPRESSED else r.shortChannelIds.encoding
-          QueryShortChannelIds(r.chainHash,
-            shortChannelIds = EncodedShortChannelIds(encoding, chunk.map(_.shortChannelId)),
-            if (r.timestamps_opt.isDefined || r.checksums_opt.isDefined)
-              TlvStream(QueryShortChannelIdsTlv.EncodedQueryFlags(encoding, chunk.map(_.flag)))
-            else
-              TlvStream.empty
-          )
-        }
-
-        // we update our sync data to this node (there may be multiple channel range responses and we can only query one set of ids at a time)
-        val replies = shortChannelIdAndFlags
-          .grouped(routerConf.channelQueryChunkSize)
-          .map(buildQuery)
-          .toList
-
-        val (sync1, replynow_opt) = addToSync(d.sync, origin.nodeId, replies)
-        // we only send a reply right away if there were no pending requests
-        replynow_opt.foreach(origin.peerConnection ! _)
-        val progress = syncProgress(sync1)
-        ctx.system.eventStream.publish(progress)
-        ctx.self ! progress
-        d.copy(sync = sync1)
-      }
-    }
+    @tailrec
+    def loop(ids: List[ShortChannelId], timestamps: List[ReplyChannelRangeTlv.Timestamps], checksums: List[ReplyChannelRangeTlv.Checksums], acc: List[ShortChannelIdAndFlag] = List.empty[ShortChannelIdAndFlag]): List[ShortChannelIdAndFlag] = {
+      ids match {
+        case Nil => acc.reverse
+        case head :: tail =>
+          val flag = computeFlag(d.channels)(head, timestamps.headOption, checksums.headOption, routerConf.requestNodeAnnouncements)
+          // 0 means nothing to query, just don't include it
+          val acc1 = if (flag != 0) ShortChannelIdAndFlag(head, flag) :: acc else acc
+          loop(tail, timestamps.drop(1), checksums.drop(1), acc1)
+      }
+    }
+
+    val timestamps_opt = r.timestamps_opt.map(_.timestamps).getOrElse(List.empty[ReplyChannelRangeTlv.Timestamps])
+    val checksums_opt = r.checksums_opt.map(_.checksums).getOrElse(List.empty[ReplyChannelRangeTlv.Checksums])
+    val shortChannelIdAndFlags = loop(r.shortChannelIds.array, timestamps_opt, checksums_opt)
+    val (channelCount, updatesCount) = shortChannelIdAndFlags.foldLeft((0, 0)) {
+      case ((c, u), ShortChannelIdAndFlag(_, flag)) =>
+        val c1 = c + (if (QueryShortChannelIdsTlv.QueryFlagType.includeChannelAnnouncement(flag)) 1 else 0)
+        val u1 = u + (if (QueryShortChannelIdsTlv.QueryFlagType.includeUpdate1(flag)) 1 else 0) + (if (QueryShortChannelIdsTlv.QueryFlagType.includeUpdate2(flag)) 1 else 0)
+        (c1, u1)
+    }
+    log.info(s"received reply_channel_range with {} channels, we're missing {} channel announcements and {} updates, format={}", r.shortChannelIds.array.size, channelCount, updatesCount, r.shortChannelIds.encoding)
+    Metrics.ReplyChannelRange.NewChannelAnnouncements.withoutTags().record(channelCount)
+    Metrics.ReplyChannelRange.NewChannelUpdates.withoutTags().record(updatesCount)
+
+    def buildQuery(chunk: List[ShortChannelIdAndFlag]): QueryShortChannelIds = {
+      // always encode empty lists as UNCOMPRESSED
+      val encoding = if (chunk.isEmpty) EncodingType.UNCOMPRESSED else r.shortChannelIds.encoding
+      QueryShortChannelIds(r.chainHash,
+        shortChannelIds = EncodedShortChannelIds(encoding, chunk.map(_.shortChannelId)),
+        if (r.timestamps_opt.isDefined || r.checksums_opt.isDefined)
+          TlvStream(QueryShortChannelIdsTlv.EncodedQueryFlags(encoding, chunk.map(_.flag)))
+        else
+          TlvStream.empty
+      )
+    }
+
+    // we update our sync data to this node (there may be multiple channel range responses and we can only query one set of ids at a time)
+    val replies = shortChannelIdAndFlags
+      .grouped(routerConf.channelQueryChunkSize)
+      .map(buildQuery)
+      .toList
+
+    val (sync1, replynow_opt) = addToSync(d.sync, origin.nodeId, replies)
+    // we only send a reply right away if there were no pending requests
+    replynow_opt.foreach(origin.peerConnection ! _)
+    val progress = syncProgress(sync1)
+    ctx.system.eventStream.publish(progress)
+    ctx.self ! progress
+    d.copy(sync = sync1)
   }
 
   def handleQueryShortChannelIds(nodes: Map[PublicKey, NodeAnnouncement], channels: SortedMap[ShortChannelId, PublicChannel], origin: RemoteGossip, q: QueryShortChannelIds)(implicit ctx: ActorContext, log: LoggingAdapter): Unit = {
     implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors
     ctx.sender ! TransportHandler.ReadAck(q)
 
-    Kamon.runWithContextEntry(remoteNodeIdKey, origin.nodeId.toString) {
-      Kamon.runWithSpan(Kamon.spanBuilder("query-short-channel-ids").start(), finishSpan = true) {
-        val flags = q.queryFlags_opt.map(_.array).getOrElse(List.empty[Long])
-        var channelCount = 0
-        var updateCount = 0
-        var nodeCount = 0
+    val flags = q.queryFlags_opt.map(_.array).getOrElse(List.empty[Long])
+    var channelCount = 0
+    var updateCount = 0
+    var nodeCount = 0
 
-        processChannelQuery(nodes, channels)(
-          q.shortChannelIds.array,
-          flags,
-          ca => {
-            channelCount = channelCount + 1
-            origin.peerConnection ! ca
-          },
-          cu => {
-            updateCount = updateCount + 1
-            origin.peerConnection ! cu
-          },
-          na => {
-            nodeCount = nodeCount + 1
-            origin.peerConnection ! na
-          }
-        )
-        Metrics.QueryShortChannelIds.Nodes.withoutTags().record(nodeCount)
-        Metrics.QueryShortChannelIds.ChannelAnnouncements.withoutTags().record(channelCount)
-        Metrics.QueryShortChannelIds.ChannelUpdates.withoutTags().record(updateCount)
-        log.info("received query_short_channel_ids with {} items, sent back {} channels and {} updates and {} nodes", q.shortChannelIds.array.size, channelCount, updateCount, nodeCount)
-        origin.peerConnection ! ReplyShortChannelIdsEnd(q.chainHash, 1)
-      }
-    }
+    processChannelQuery(nodes, channels)(
+      q.shortChannelIds.array,
+      flags,
+      ca => {
+        channelCount = channelCount + 1
+        origin.peerConnection ! ca
+      },
+      cu => {
+        updateCount = updateCount + 1
+        origin.peerConnection ! cu
+      },
+      na => {
+        nodeCount = nodeCount + 1
+        origin.peerConnection ! na
+      }
+    )
+    Metrics.QueryShortChannelIds.Nodes.withoutTags().record(nodeCount)
+    Metrics.QueryShortChannelIds.ChannelAnnouncements.withoutTags().record(channelCount)
+    Metrics.QueryShortChannelIds.ChannelUpdates.withoutTags().record(updateCount)
+    log.info("received query_short_channel_ids with {} items, sent back {} channels and {} updates and {} nodes", q.shortChannelIds.array.size, channelCount, updateCount, nodeCount)
+    origin.peerConnection ! ReplyShortChannelIdsEnd(q.chainHash, 1)
   }
 
   def handleReplyShortChannelIdsEnd(d: Data, origin: RemoteGossip, r: ReplyShortChannelIdsEnd)(implicit ctx: ActorContext, log: LoggingAdapter): Data = {
diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala
index 6a214952e..6884638d4 100644
--- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala
+++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala
@@ -29,7 +29,6 @@ import fr.acinq.eclair.router.Router._
 import fr.acinq.eclair.transactions.Scripts
 import fr.acinq.eclair.wire._
 import fr.acinq.eclair.{Logs, MilliSatoshiLong, NodeParams, ShortChannelId, TxCoordinates}
-import kamon.Kamon
 
 object Validation {
 
@@ -69,11 +68,7 @@ object Validation {
       d
     } else {
       log.info("validating shortChannelId={}", c.shortChannelId)
-      Kamon.runWithContextEntry(shortChannelIdKey, c.shortChannelId) {
-        Kamon.runWithSpan(Kamon.spanBuilder("validate-channel").tag("shortChannelId", c.shortChannelId.toString).start(), finishSpan = false) {
-          watcher ! ValidateRequest(c)
-        }
-      }
+      watcher ! ValidateRequest(c)
       // we don't acknowledge the message just yet
       d.copy(awaiting = d.awaiting + (c -> Seq(origin)))
     }
@@ -83,102 +78,90 @@
     implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors
     import nodeParams.db.{network => db}
     import r.c
-    Kamon.runWithContextEntry(shortChannelIdKey, c.shortChannelId) {
-      Kamon.runWithSpan(Kamon.currentSpan(), finishSpan = true) {
-        Kamon.runWithSpan(Kamon.spanBuilder("process-validate-result").start(), finishSpan = true) {
-          d0.awaiting.get(c) match {
-            case Some(origin +: _) => origin.peerConnection ! TransportHandler.ReadAck(c) // now we can acknowledge the message, we only need to do it for the first peer that sent us the announcement
-            case _ => ()
+    d0.awaiting.get(c) match {
+      case Some(origin +: _) => origin.peerConnection ! TransportHandler.ReadAck(c) // now we can acknowledge the message, we only need to do it for the first peer that sent us the announcement
+      case _ => ()
+    }
+    val remoteOrigins_opt = d0.awaiting.get(c)
+    Logs.withMdc(log)(Logs.mdc(remoteNodeId_opt = remoteOrigins_opt.flatMap(_.headOption).map(_.nodeId))) { // in the MDC we use the node id that sent us the announcement first
+      log.info("got validation result for shortChannelId={} (awaiting={} stash.nodes={} stash.updates={})", c.shortChannelId, d0.awaiting.size, d0.stash.nodes.size, d0.stash.updates.size)
+      val publicChannel_opt = r match {
+        case ValidateResult(c, Left(t)) =>
+          log.warning("validation failure for shortChannelId={} reason={}", c.shortChannelId, t.getMessage)
+          remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.ValidationFailure(c))))
+          None
+        case ValidateResult(c, Right((tx, UtxoStatus.Unspent))) =>
+          val TxCoordinates(_, _, outputIndex) = ShortChannelId.coordinates(c.shortChannelId)
+          val (fundingOutputScript, fundingOutputIsInvalid) = {
+            // let's check that the output is indeed a P2WSH multisig 2-of-2 of nodeid1 and nodeid2)
+            val fundingOutputScript = write(pay2wsh(Scripts.multiSig2of2(c.bitcoinKey1, c.bitcoinKey2)))
+            val fundingOutputIsInvalid = tx.txOut.size < outputIndex + 1 || fundingOutputScript != tx.txOut(outputIndex).publicKeyScript
+            (fundingOutputScript, fundingOutputIsInvalid)
           }
-          val remoteOrigins_opt = d0.awaiting.get(c)
-          Logs.withMdc(log)(Logs.mdc(remoteNodeId_opt = remoteOrigins_opt.flatMap(_.headOption).map(_.nodeId))) { // in the MDC we use the node id that sent us the announcement first
-            log.info("got validation result for shortChannelId={} (awaiting={} stash.nodes={} stash.updates={})", c.shortChannelId, d0.awaiting.size, d0.stash.nodes.size, d0.stash.updates.size)
-            val publicChannel_opt = r match {
-              case ValidateResult(c, Left(t)) =>
-                log.warning("validation failure for shortChannelId={} reason={}", c.shortChannelId, t.getMessage)
-                remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.ValidationFailure(c))))
-                None
-              case ValidateResult(c, Right((tx, UtxoStatus.Unspent))) =>
-                val TxCoordinates(_, _, outputIndex) = ShortChannelId.coordinates(c.shortChannelId)
-                val (fundingOutputScript, fundingOutputIsInvalid) = Kamon.runWithSpan(Kamon.spanBuilder("checked-pubkeyscript").start(), finishSpan = true) {
-                  // let's check that the output is indeed a P2WSH multisig 2-of-2 of nodeid1 and nodeid2)
-                  val fundingOutputScript = write(pay2wsh(Scripts.multiSig2of2(c.bitcoinKey1, c.bitcoinKey2)))
-                  val fundingOutputIsInvalid = tx.txOut.size < outputIndex + 1 || fundingOutputScript != tx.txOut(outputIndex).publicKeyScript
-                  (fundingOutputScript, fundingOutputIsInvalid)
-                }
-                if (fundingOutputIsInvalid) {
-                  log.error(s"invalid script for shortChannelId={}: txid={} does not have script=$fundingOutputScript at outputIndex=$outputIndex ann={}", c.shortChannelId, tx.txid, c)
-                  remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.InvalidAnnouncement(c))))
-                  None
-                } else {
-                  watcher ! WatchSpentBasic(ctx.self, tx, outputIndex, BITCOIN_FUNDING_EXTERNAL_CHANNEL_SPENT(c.shortChannelId))
-                  log.debug("added channel channelId={}", c.shortChannelId)
-                  remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.Accepted(c))))
-                  val capacity = tx.txOut(outputIndex).amount
-                  ctx.system.eventStream.publish(ChannelsDiscovered(SingleChannelDiscovered(c, capacity, None, None) :: Nil))
-                  Kamon.runWithSpan(Kamon.spanBuilder("add-to-db").start(), finishSpan = true) {
-                    db.addChannel(c, tx.txid, capacity)
-                  }
-                  // in case we just validated our first local channel, we announce the local node
-                  if (!d0.nodes.contains(nodeParams.nodeId) && isRelatedTo(c, nodeParams.nodeId)) {
-                    log.info("first local channel validated, announcing local node")
-                    val nodeAnn = Announcements.makeNodeAnnouncement(nodeParams.privateKey, nodeParams.alias, nodeParams.color, nodeParams.publicAddresses, nodeParams.features)
-                    ctx.self ! nodeAnn
-                  }
-                  // public channels that haven't yet been announced are considered as private channels
-                  val channelMeta_opt = d0.privateChannels.get(c.shortChannelId).map(_.meta)
-                  Some(PublicChannel(c, tx.txid, capacity, None, None, channelMeta_opt))
-                }
-              case ValidateResult(c, Right((tx, fundingTxStatus: UtxoStatus.Spent))) =>
-                if (fundingTxStatus.spendingTxConfirmed) {
-                  log.warning("ignoring shortChannelId={} tx={} (funding tx already spent and spending tx is confirmed)", c.shortChannelId, tx.txid)
-                  // the funding tx has been spent by a transaction that is now confirmed: peer shouldn't send us those
-                  remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.ChannelClosed(c))))
-                } else {
-                  log.debug("ignoring shortChannelId={} tx={} (funding tx already spent but spending tx isn't confirmed)", c.shortChannelId, tx.txid)
-                  remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.ChannelClosing(c))))
-                }
-                // there may be a record if we have just restarted
-                db.removeChannel(c.shortChannelId)
-                None
-            }
+          if (fundingOutputIsInvalid) {
+            log.error(s"invalid script for shortChannelId={}: txid={} does not have script=$fundingOutputScript at outputIndex=$outputIndex ann={}", c.shortChannelId, tx.txid, c)
+            remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.InvalidAnnouncement(c))))
+            None
+          } else {
+            watcher ! WatchSpentBasic(ctx.self, tx, outputIndex, BITCOIN_FUNDING_EXTERNAL_CHANNEL_SPENT(c.shortChannelId))
+            log.debug("added channel channelId={}", c.shortChannelId)
+            remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.Accepted(c))))
+            val capacity = tx.txOut(outputIndex).amount
+            ctx.system.eventStream.publish(ChannelsDiscovered(SingleChannelDiscovered(c, capacity, None, None) :: Nil))
+            db.addChannel(c, tx.txid, capacity)
+            // in case we just validated our first local channel, we announce the local node
+            if (!d0.nodes.contains(nodeParams.nodeId) && isRelatedTo(c, nodeParams.nodeId)) {
+              log.info("first local channel validated, announcing local node")
+              val nodeAnn = Announcements.makeNodeAnnouncement(nodeParams.privateKey, nodeParams.alias, nodeParams.color, nodeParams.publicAddresses, nodeParams.features)
+              ctx.self ! nodeAnn
+            }
+            // public channels that haven't yet been announced are considered as private channels
+            val channelMeta_opt = d0.privateChannels.get(c.shortChannelId).map(_.meta)
+            Some(PublicChannel(c, tx.txid, capacity, None, None, channelMeta_opt))
+          }
+        case ValidateResult(c, Right((tx, fundingTxStatus: UtxoStatus.Spent))) =>
+          if (fundingTxStatus.spendingTxConfirmed) {
+            log.warning("ignoring shortChannelId={} tx={} (funding tx already spent and spending tx is confirmed)", c.shortChannelId, tx.txid)
+            // the funding tx has been spent by a transaction that is now confirmed: peer shouldn't send us those
+            remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.ChannelClosed(c))))
+          } else {
+            log.debug("ignoring shortChannelId={} tx={} (funding tx already spent but spending tx isn't confirmed)", c.shortChannelId, tx.txid)
+            remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.ChannelClosing(c))))
+          }
+          // there may be a record if we have just restarted
+          db.removeChannel(c.shortChannelId)
+          None
+      }
-            val span1 = Kamon.spanBuilder("reprocess-stash").start
-            // we also reprocess node and channel_update announcements related to channels that were just analyzed
-            val reprocessUpdates = d0.stash.updates.filterKeys(u => u.shortChannelId == c.shortChannelId)
-            val reprocessNodes = d0.stash.nodes.filterKeys(n => isRelatedTo(c, n.nodeId))
-            // and we remove the reprocessed messages from the stash
-            val stash1 = d0.stash.copy(updates = d0.stash.updates -- reprocessUpdates.keys, nodes = d0.stash.nodes -- reprocessNodes.keys)
-            // we remove channel from awaiting map
-            val awaiting1 = d0.awaiting - c
-            span1.finish()
+      // we also reprocess node and channel_update announcements related to channels that were just analyzed
+      val reprocessUpdates = d0.stash.updates.filterKeys(u => u.shortChannelId == c.shortChannelId)
+      val reprocessNodes = d0.stash.nodes.filterKeys(n => isRelatedTo(c, n.nodeId))
+      // and we remove the reprocessed messages from the stash
+      val stash1 = d0.stash.copy(updates = d0.stash.updates -- reprocessUpdates.keys, nodes = d0.stash.nodes -- reprocessNodes.keys)
+      // we remove channel from awaiting map
+      val awaiting1 = d0.awaiting - c
 
-            publicChannel_opt match {
-              case Some(pc) =>
-                Kamon.runWithSpan(Kamon.spanBuilder("build-new-state").start, finishSpan = true) {
-                  // note: if the channel is graduating from private to public, the implementation (in the LocalChannelUpdate handler) guarantees that we will process a new channel_update
-                  // right after the channel_announcement, channel_updates will be moved from private to public at that time
-                  val d1 = d0.copy(
-                    channels = d0.channels + (c.shortChannelId -> pc),
-                    privateChannels = d0.privateChannels - c.shortChannelId, // we remove fake announcements that we may have made before
-                    rebroadcast = d0.rebroadcast.copy(channels = d0.rebroadcast.channels + (c -> d0.awaiting.getOrElse(c, Nil).toSet)), // we also add the newly validated channels to the rebroadcast queue
-                    stash = stash1,
-                    awaiting = awaiting1)
-                  // we only reprocess updates and nodes if validation succeeded
-                  val d2 = reprocessUpdates.foldLeft(d1) {
-                    case (d, (u, origins)) => Validation.handleChannelUpdate(d, nodeParams.db.network, nodeParams.routerConf, Right(RemoteChannelUpdate(u, origins)), wasStashed = true)
-                  }
-                  val d3 = reprocessNodes.foldLeft(d2) {
-                    case (d, (n, origins)) => Validation.handleNodeAnnouncement(d, nodeParams.db.network, origins, n, wasStashed = true)
-                  }
-                  d3
-                }
-              case None =>
-                reprocessUpdates.foreach { case (u, origins) => origins.collect { case o: RemoteGossip => sendDecision(o.peerConnection, GossipDecision.NoRelatedChannel(u)) } }
-                reprocessNodes.foreach { case (n, origins) => origins.collect { case o: RemoteGossip => sendDecision(o.peerConnection, GossipDecision.NoKnownChannel(n)) } }
-                d0.copy(stash = stash1, awaiting = awaiting1)
-            }
-          }
-        }
-      }
-    }
+      publicChannel_opt match {
+        case Some(pc) =>
+          // note: if the channel is graduating from private to public, the implementation (in the LocalChannelUpdate handler) guarantees that we will process a new channel_update
+          // right after the channel_announcement, channel_updates will be moved from private to public at that time
+          val d1 = d0.copy(
+            channels = d0.channels + (c.shortChannelId -> pc),
+            privateChannels = d0.privateChannels - c.shortChannelId, // we remove fake announcements that we may have made before
+            rebroadcast = d0.rebroadcast.copy(channels = d0.rebroadcast.channels + (c -> d0.awaiting.getOrElse(c, Nil).toSet)), // we also add the newly validated channels to the rebroadcast queue
+            stash = stash1,
+            awaiting = awaiting1)
+          // we only reprocess updates and nodes if validation succeeded
+          val d2 = reprocessUpdates.foldLeft(d1) {
+            case (d, (u, origins)) => Validation.handleChannelUpdate(d, nodeParams.db.network, nodeParams.routerConf, Right(RemoteChannelUpdate(u, origins)), wasStashed = true)
+          }
+          val d3 = reprocessNodes.foldLeft(d2) {
+            case (d, (n, origins)) => Validation.handleNodeAnnouncement(d, nodeParams.db.network, origins, n, wasStashed = true)
+          }
+          d3
+        case None =>
+          reprocessUpdates.foreach { case (u, origins) => origins.collect { case o: RemoteGossip => sendDecision(o.peerConnection, GossipDecision.NoRelatedChannel(u)) } }
+          reprocessNodes.foreach { case (n, origins) => origins.collect { case o: RemoteGossip => sendDecision(o.peerConnection, GossipDecision.NoKnownChannel(n)) } }
+          d0.copy(stash = stash1, awaiting = awaiting1)
+      }
+    }
   }
diff --git a/eclair-front/src/main/resources/application.conf b/eclair-front/src/main/resources/application.conf
index 1e5d3f95d..b33c16682 100644
--- a/eclair-front/src/main/resources/application.conf
+++ b/eclair-front/src/main/resources/application.conf
@@ -54,10 +54,9 @@ kamon {
     actors {
       # Decides which actors generate Spans for the messages they process, given that there is already an ongoing trace
      # in the Context of the processed message (i.e. there is a Sampled Span in the Context).
-      #
      trace {
        includes = []
-        excludes = ["**"] # we don't want automatically generated spans because they conflict with the ones we define
+        excludes = ["**"] # we don't want automatically generated spans; we're not using them
      }
    }
  }
diff --git a/eclair-node/src/main/resources/application.conf b/eclair-node/src/main/resources/application.conf
index 270c533c0..6736b8900 100644
--- a/eclair-node/src/main/resources/application.conf
+++ b/eclair-node/src/main/resources/application.conf
@@ -38,10 +38,9 @@ kamon.instrumentation.akka {
   actors {
     # Decides which actors generate Spans for the messages they process, given that there is already an ongoing trace
    # in the Context of the processed message (i.e. there is a Sampled Span in the Context).
-    #
    trace {
      includes = [ ]
-      excludes = [ "**" ] # we don't want automatically generated spans because they conflict with the ones we define
+      excludes = [ "**" ] # we don't want automatically generated spans; we're not using them
    }
  }
}