
Remove kamon tracing (#1662)

It's costly, we're not using it, and it's too invasive.
There's no reason to keep it at the moment.
Bastien Teinturier 2021-01-20 18:35:16 +01:00 committed by GitHub
parent 81f15aabd9
commit 54ca292209
8 changed files with 173 additions and 305 deletions
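The pattern removed throughout these files is Kamon's manual span lifecycle: build a span, run the traced work, record failures on it, and finish it. A minimal sketch of that lifecycle, assuming kamon-core 2.x on the classpath (the operation name and the traced helper are illustrative, not eclair code):

    import kamon.Kamon
    import scala.util.{Failure, Success, Try}

    // run `work` inside a span named `operation`, recording errors on the span
    def traced[T](operation: String)(work: => T): T = {
      val span = Kamon.spanBuilder(operation).start()
      Try(work) match {
        case Success(result) =>
          span.finish()
          result
        case Failure(t) =>
          span.fail("operation failed", t) // same fail(message, cause) overload used in the diff below
          span.finish()
          throw t
      }
    }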


@@ -16,9 +16,7 @@
package fr.acinq.eclair
import fr.acinq.eclair.payment.{LocalFailure, PaymentFailure, RemoteFailure, UnreadableRemoteFailure}
import kamon.metric.Timer
import kamon.trace.Span
import scala.concurrent.{ExecutionContext, Future}
@@ -40,15 +38,4 @@ object KamonExt {
res
}
/**
* A helper function that fails a span with proper messages when dealing with payments
*/
def failSpan(span: Span, failure: PaymentFailure) = {
failure match {
case LocalFailure(_, t) => span.fail("local failure", t)
case RemoteFailure(_, e) => span.fail(s"remote failure: origin=${e.originNode} error=${e.failureMessage}")
case UnreadableRemoteFailure(_) => span.fail("unreadable remote failure")
}
}
}
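The failSpan helper deleted above mapped eclair's three payment failure types onto span error annotations. A hypothetical call site, shown only to illustrate how it was meant to be used (recordFailure is an illustrative name; failSpan and PaymentFailure are from the diff):

    import kamon.Kamon
    import fr.acinq.eclair.KamonExt
    import fr.acinq.eclair.payment.PaymentFailure

    def recordFailure(failure: PaymentFailure): Unit = {
      val span = Kamon.currentSpan()   // whatever span is active in the calling context
      KamonExt.failSpan(span, failure) // tags it as a local/remote/unreadable failure
      span.finish()
    }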


@@ -21,7 +21,6 @@ import fr.acinq.eclair.ShortChannelId.coordinates
import fr.acinq.eclair.TxCoordinates
import fr.acinq.eclair.blockchain.{GetTxWithMetaResponse, UtxoStatus, ValidateResult}
import fr.acinq.eclair.wire.ChannelAnnouncement
import kamon.Kamon
import org.json4s.Formats
import org.json4s.JsonAST._
@@ -170,31 +169,20 @@ class ExtendedBitcoinClient(val rpcClient: BitcoinJsonRPCClient) {
def validate(c: ChannelAnnouncement)(implicit ec: ExecutionContext): Future[ValidateResult] = {
val TxCoordinates(blockHeight, txIndex, outputIndex) = coordinates(c.shortChannelId)
val span = Kamon.spanBuilder("validate-bitcoin-client").start()
for {
_ <- Future.successful(0)
span0 = Kamon.spanBuilder("getblockhash").start()
blockHash <- rpcClient.invoke("getblockhash", blockHeight).map(_.extractOpt[String].map(ByteVector32.fromValidHex).getOrElse(ByteVector32.Zeroes))
_ = span0.finish()
span1 = Kamon.spanBuilder("getblock").start()
txid: ByteVector32 <- rpcClient.invoke("getblock", blockHash).map(json => Try {
val JArray(txs) = json \ "tx"
ByteVector32.fromValidHex(txs(txIndex).extract[String])
}.getOrElse(ByteVector32.Zeroes))
_ = span1.finish()
span2 = Kamon.spanBuilder("getrawtx").start()
tx <- getRawTransaction(txid)
_ = span2.finish()
span3 = Kamon.spanBuilder("utxospendable-mempool").start()
unspent <- isTransactionOutputSpendable(txid, outputIndex, includeMempool = true)
_ = span3.finish()
fundingTxStatus <- if (unspent) {
Future.successful(UtxoStatus.Unspent)
} else {
// if this returns true, it means that the spending tx is *not* in the blockchain
isTransactionOutputSpendable(txid, outputIndex, includeMempool = false).map(res => UtxoStatus.Spent(spendingTxConfirmed = !res))
}
_ = span.finish()
} yield ValidateResult(c, Right((Transaction.read(tx), fundingTxStatus)))
} recover {
case t: Throwable => ValidateResult(c, Left(t))
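Before this change, validate opened a child span around each bitcoind RPC step, threading `_ = spanN.finish()` bindings through the for-comprehension. If per-step timing were ever wanted again, the same effect can be had with one combinator instead of four inline spans; a sketch assuming Future-returning steps (timedStep is a hypothetical helper, not part of this codebase):

    import kamon.Kamon
    import scala.concurrent.{ExecutionContext, Future}

    // wrap a Future-returning step in a span that is finished on completion,
    // whether the step succeeds or fails
    def timedStep[T](name: String)(step: => Future[T])(implicit ec: ExecutionContext): Future[T] = {
      val span = Kamon.spanBuilder(name).start()
      step.andThen { case _ => span.finish() }
    }

    // usage inside the for-comprehension would look like:
    //   blockHash <- timedStep("getblockhash")(rpcClient.invoke("getblockhash", blockHeight).map(...))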


@@ -32,8 +32,6 @@ import fr.acinq.eclair.router.RouteCalculation
import fr.acinq.eclair.router.Router._
import fr.acinq.eclair.wire._
import fr.acinq.eclair.{CltvExpiry, FSMDiagnosticActorLogging, Logs, MilliSatoshi, MilliSatoshiLong, NodeParams}
import kamon.Kamon
import kamon.context.Context
import java.util.UUID
import java.util.concurrent.TimeUnit
@@ -57,13 +55,6 @@ class MultiPartPaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig,
val start = System.currentTimeMillis
private var retriedFailedChannels = false
private val span = Kamon.spanBuilder("multi-part-payment")
.tag(Tags.ParentId, cfg.parentId.toString)
.tag(Tags.PaymentHash, paymentHash.toHex)
.tag(Tags.RecipientNodeId, cfg.recipientNodeId.toString())
.tag(Tags.RecipientAmount, cfg.recipientAmount.toLong)
.start()
startWith(WAIT_FOR_PAYMENT_REQUEST, WaitingForRequest)
when(WAIT_FOR_PAYMENT_REQUEST) {
@@ -83,11 +74,7 @@ class MultiPartPaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig,
val (toSend, maxFee) = remainingToSend(nodeParams, d.request, d.pending.values)
if (routes.map(_.amount).sum == toSend) {
val childPayments = routes.map(route => (UUID.randomUUID(), route)).toMap
Kamon.runWithContextEntry(parentPaymentIdKey, cfg.parentId) {
Kamon.runWithSpan(span, finishSpan = true) {
childPayments.foreach { case (childId, route) => spawnChildPaymentFsm(childId) ! createChildPayment(self, route, d.request) }
}
}
childPayments.foreach { case (childId, route) => spawnChildPaymentFsm(childId) ! createChildPayment(self, route, d.request) }
goto(PAYMENT_IN_PROGRESS) using d.copy(remainingAttempts = (d.remainingAttempts - 1).max(0), pending = d.pending ++ childPayments)
} else {
// If a child payment failed while we were waiting for routes, the routes we received don't cover the whole
@@ -242,7 +229,6 @@ class MultiPartPaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig,
case Left(paymentFailed) =>
log.warning("multi-part payment failed")
reply(origin, paymentFailed)
span.fail("payment failed")
case Right(paymentSent) =>
log.info("multi-part payment succeeded")
reply(origin, paymentSent)
@@ -254,7 +240,6 @@ class MultiPartPaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig,
if (retriedFailedChannels) {
Metrics.RetryFailedChannelsResult.withTag(Tags.Success, event.isRight).increment()
}
span.finish()
stop(FSM.Normal)
}
@@ -278,8 +263,6 @@ class MultiPartPaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig,
object MultiPartPaymentLifecycle {
val parentPaymentIdKey = Context.key[UUID]("parentPaymentId", UUID.fromString("00000000-0000-0000-0000-000000000000"))
def props(nodeParams: NodeParams, cfg: SendPaymentConfig, router: ActorRef, register: ActorRef) = Props(new MultiPartPaymentLifecycle(nodeParams, cfg, router, register))
/**
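The parentPaymentIdKey removed here is Kamon's typed context-key mechanism: a key with a sentinel default, bound around the child payment sends and readable downstream on the same context. A minimal sketch of that mechanism (withParent and currentParent are illustrative names):

    import java.util.UUID
    import kamon.Kamon
    import kamon.context.Context

    // a typed key whose default is the all-zero UUID, as in the removed code
    val parentPaymentIdKey: Context.Key[UUID] =
      Context.key[UUID]("parentPaymentId", UUID.fromString("00000000-0000-0000-0000-000000000000"))

    // bind the key for the duration of `block`
    def withParent[T](parentId: UUID)(block: => T): T =
      Kamon.runWithContextEntry(parentPaymentIdKey, parentId)(block)

    // read it back; returns the zero-UUID default if nothing was bound
    def currentParent(): UUID =
      Kamon.currentContext().get(parentPaymentIdKey)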


@@ -16,8 +16,6 @@
package fr.acinq.eclair.payment.send
import java.util.concurrent.TimeUnit
import akka.actor.{ActorRef, FSM, Props, Status}
import akka.event.Logging.MDC
import fr.acinq.bitcoin.ByteVector32
@@ -36,9 +34,8 @@ import fr.acinq.eclair.router.Router._
import fr.acinq.eclair.router._
import fr.acinq.eclair.wire.Onion._
import fr.acinq.eclair.wire._
import kamon.Kamon
import kamon.trace.Span
import java.util.concurrent.TimeUnit
import scala.util.{Failure, Success}
/**
@@ -52,28 +49,10 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
private val paymentsDb = nodeParams.db.payments
private val start = System.currentTimeMillis
private val span = Kamon.runWithContextEntry(MultiPartPaymentLifecycle.parentPaymentIdKey, cfg.parentId) {
val spanBuilder = if (Kamon.currentSpan().isEmpty) {
Kamon.spanBuilder("single-payment")
} else {
Kamon.spanBuilder("payment-part").asChildOf(Kamon.currentSpan())
}
spanBuilder
.tag(Tags.PaymentId, cfg.id.toString)
.tag(Tags.PaymentHash, paymentHash.toHex)
.tag(Tags.RecipientNodeId, cfg.recipientNodeId.toString())
.tag(Tags.RecipientAmount, cfg.recipientAmount.toLong)
.start()
}
startWith(WAITING_FOR_REQUEST, WaitingForRequest)
when(WAITING_FOR_REQUEST) {
case Event(c: SendPaymentToRoute, WaitingForRequest) =>
span.tag(Tags.TargetNodeId, c.targetNodeId.toString())
span.tag(Tags.Amount, c.finalPayload.amount.toLong)
span.tag(Tags.TotalAmount, c.finalPayload.totalAmount.toLong)
span.tag(Tags.Expiry, c.finalPayload.expiry.toLong)
log.debug("sending {} to route {}", c.finalPayload.amount, c.printRoute())
val send = SendPayment(c.replyTo, c.targetNodeId, c.finalPayload, maxAttempts = 1, assistedRoutes = c.assistedRoutes)
c.route.fold(
@@ -86,10 +65,6 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
goto(WAITING_FOR_ROUTE) using WaitingForRoute(send, Nil, Ignore.empty)
case Event(c: SendPayment, WaitingForRequest) =>
span.tag(Tags.TargetNodeId, c.targetNodeId.toString())
span.tag(Tags.Amount, c.finalPayload.amount.toLong)
span.tag(Tags.TotalAmount, c.finalPayload.totalAmount.toLong)
span.tag(Tags.Expiry, c.finalPayload.expiry.toLong)
log.debug("sending {} to {}", c.finalPayload.amount, c.targetNodeId)
router ! RouteRequest(nodeParams.nodeId, c.targetNodeId, c.finalPayload.amount, c.getMaxFee(nodeParams), c.assistedRoutes, routeParams = c.routeParams, paymentContext = Some(cfg.paymentContext))
if (cfg.storeInDb) {
@@ -109,7 +84,7 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
log.warning("router error: {}", t.getMessage)
Metrics.PaymentError.withTag(Tags.Failure, Tags.FailureType(LocalFailure(Nil, t))).increment()
onFailure(c.replyTo, PaymentFailed(id, paymentHash, failures :+ LocalFailure(Nil, t)))
myStop()
stop(FSM.Normal)
}
when(WAITING_FOR_PAYMENT_COMPLETE) {
@@ -122,7 +97,7 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
Metrics.PaymentAttempt.withTag(Tags.MultiPart, value = false).record(d.failures.size + 1)
val p = PartialPayment(id, d.c.finalPayload.amount, d.cmd.amount - d.c.finalPayload.amount, htlc.channelId, Some(cfg.fullRoute(d.route)))
onSuccess(d.c.replyTo, cfg.createPaymentSent(fulfill.paymentPreimage, p :: Nil))
myStop()
stop(FSM.Normal)
case Event(RES_ADD_SETTLED(_, _, fail: HtlcResult.Fail), d: WaitingForComplete) =>
fail match {
@@ -143,24 +118,6 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
}
}
private var stateSpan: Option[Span] = None
onTransition {
case _ -> state2 =>
// whenever there is a transition we stop the current span and start a new one, this way we can track each state
val stateSpanBuilder = Kamon.spanBuilder(state2.toString).asChildOf(span)
nextStateData match {
case d: WaitingForRoute =>
// this means that previous state was WAITING_FOR_COMPLETE
d.failures.lastOption.foreach(failure => stateSpan.foreach(span => KamonExt.failSpan(span, failure)))
case d: WaitingForComplete =>
stateSpanBuilder.tag("route", s"${cfg.fullRoute(d.route).map(_.nextNodeId).mkString("->")}")
case _ => ()
}
stateSpan.foreach(_.finish())
stateSpan = Some(stateSpanBuilder.start())
}
whenUnhandled {
case Event(_: TransportHandler.ReadAck, _) => stay // ignored, router replies with this when we forward a channel_update
}
@@ -187,7 +144,7 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
retry(failure, d)
} else {
onFailure(d.c.replyTo, PaymentFailed(id, paymentHash, d.failures :+ LocalFailure(cfg.fullRoute(d.route), t)))
myStop()
stop(FSM.Normal)
}
}
@@ -205,7 +162,7 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
// if destination node returns an error, we fail the payment immediately
log.warning(s"received an error message from target nodeId=$nodeId, failing the payment (failure=$failureMessage)")
onFailure(c.replyTo, PaymentFailed(id, paymentHash, failures :+ RemoteFailure(cfg.fullRoute(route), e)))
myStop()
stop(FSM.Normal)
case res if failures.size + 1 >= c.maxAttempts =>
// otherwise we never try more than maxAttempts, no matter the kind of error returned
val failure = res match {
@@ -222,7 +179,7 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
}
log.warning(s"too many failed attempts, failing the payment")
onFailure(c.replyTo, PaymentFailed(id, paymentHash, failures :+ failure))
myStop()
stop(FSM.Normal)
case Failure(t) =>
log.warning(s"cannot parse returned error: ${t.getMessage}, route=${route.printNodes()}")
val failure = UnreadableRemoteFailure(cfg.fullRoute(route))
@@ -300,12 +257,6 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
}
}
private def myStop(): State = {
stateSpan.foreach(_.finish())
span.finish()
stop(FSM.Normal)
}
private def onSuccess(replyTo: ActorRef, result: PaymentSent): Unit = {
if (cfg.storeInDb) paymentsDb.updateOutgoingPayment(result)
replyTo ! result
@@ -317,7 +268,6 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
}
private def onFailure(replyTo: ActorRef, result: PaymentFailed): Unit = {
span.fail("payment failed")
if (cfg.storeInDb) paymentsDb.updateOutgoingPayment(result)
replyTo ! result
if (cfg.publishEvent) context.system.eventStream.publish(result)
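The stateSpan machinery deleted from PaymentLifecycle opened a fresh span on every FSM transition so each state's duration appeared in the trace. A toy classic-FSM actor showing just that pattern (S, A, B and TracedFsm are hypothetical; only the onTransition block mirrors the removed code):

    import akka.actor.FSM
    import kamon.Kamon
    import kamon.trace.Span

    sealed trait S; case object A extends S; case object B extends S

    class TracedFsm extends FSM[S, Unit] {
      private var stateSpan: Option[Span] = None

      startWith(A, ())
      when(A) { case Event("go", _) => goto(B) }
      when(B) { case Event(_, _) => stay() }

      onTransition {
        case _ -> next =>
          stateSpan.foreach(_.finish())                              // close the span of the state we leave
          stateSpan = Some(Kamon.spanBuilder(next.toString).start()) // open one for the state we enter
      }

      initialize()
    }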


@@ -25,14 +25,12 @@ import fr.acinq.eclair.router.Monitoring.{Metrics, Tags}
import fr.acinq.eclair.router.Router._
import fr.acinq.eclair.wire._
import fr.acinq.eclair.{ShortChannelId, serializationResult}
import kamon.Kamon
import scodec.bits.ByteVector
import shapeless.HNil
import scala.annotation.tailrec
import scala.collection.SortedSet
import scala.collection.immutable.SortedMap
import scala.compat.Platform
import scala.concurrent.duration._
import scala.util.Random
@@ -70,25 +68,17 @@ object Sync {
implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors
ctx.sender ! TransportHandler.ReadAck(q)
Metrics.QueryChannelRange.Blocks.withoutTags().record(q.numberOfBlocks)
Kamon.runWithContextEntry(remoteNodeIdKey, origin.nodeId.toString) {
Kamon.runWithSpan(Kamon.spanBuilder("query-channel-range").start(), finishSpan = true) {
log.info("received query_channel_range with firstBlockNum={} numberOfBlocks={} extendedQueryFlags_opt={}", q.firstBlockNum, q.numberOfBlocks, q.tlvStream)
// keep channel ids that are in [firstBlockNum, firstBlockNum + numberOfBlocks]
val shortChannelIds: SortedSet[ShortChannelId] = channels.keySet.filter(keep(q.firstBlockNum, q.numberOfBlocks, _))
log.info("replying with {} items for range=({}, {})", shortChannelIds.size, q.firstBlockNum, q.numberOfBlocks)
val chunks = Kamon.runWithSpan(Kamon.spanBuilder("split-channel-ids").start(), finishSpan = true) {
split(shortChannelIds, q.firstBlockNum, q.numberOfBlocks, routerConf.channelRangeChunkSize)
}
Metrics.QueryChannelRange.Replies.withoutTags().record(chunks.size)
Kamon.runWithSpan(Kamon.spanBuilder("compute-timestamps-checksums").start(), finishSpan = true) {
chunks.foreach { chunk =>
val reply = buildReplyChannelRange(chunk, q.chainHash, routerConf.encodingType, q.queryFlags_opt, channels)
origin.peerConnection ! reply
Metrics.ReplyChannelRange.Blocks.withTag(Tags.Direction, Tags.Directions.Outgoing).record(reply.numberOfBlocks)
Metrics.ReplyChannelRange.ShortChannelIds.withTag(Tags.Direction, Tags.Directions.Outgoing).record(reply.shortChannelIds.array.size)
}
}
}
log.info("received query_channel_range with firstBlockNum={} numberOfBlocks={} extendedQueryFlags_opt={}", q.firstBlockNum, q.numberOfBlocks, q.tlvStream)
// keep channel ids that are in [firstBlockNum, firstBlockNum + numberOfBlocks]
val shortChannelIds: SortedSet[ShortChannelId] = channels.keySet.filter(keep(q.firstBlockNum, q.numberOfBlocks, _))
log.info("replying with {} items for range=({}, {})", shortChannelIds.size, q.firstBlockNum, q.numberOfBlocks)
val chunks = split(shortChannelIds, q.firstBlockNum, q.numberOfBlocks, routerConf.channelRangeChunkSize)
Metrics.QueryChannelRange.Replies.withoutTags().record(chunks.size)
chunks.foreach { chunk =>
val reply = buildReplyChannelRange(chunk, q.chainHash, routerConf.encodingType, q.queryFlags_opt, channels)
origin.peerConnection ! reply
Metrics.ReplyChannelRange.Blocks.withTag(Tags.Direction, Tags.Directions.Outgoing).record(reply.numberOfBlocks)
Metrics.ReplyChannelRange.ShortChannelIds.withTag(Tags.Direction, Tags.Directions.Outgoing).record(reply.shortChannelIds.array.size)
}
}
@@ -99,99 +89,88 @@
Metrics.ReplyChannelRange.Blocks.withTag(Tags.Direction, Tags.Directions.Incoming).record(r.numberOfBlocks)
Metrics.ReplyChannelRange.ShortChannelIds.withTag(Tags.Direction, Tags.Directions.Incoming).record(r.shortChannelIds.array.size)
Kamon.runWithContextEntry(remoteNodeIdKey, origin.nodeId.toString) {
Kamon.runWithSpan(Kamon.spanBuilder("reply-channel-range").start(), finishSpan = true) {
@tailrec
def loop(ids: List[ShortChannelId], timestamps: List[ReplyChannelRangeTlv.Timestamps], checksums: List[ReplyChannelRangeTlv.Checksums], acc: List[ShortChannelIdAndFlag] = List.empty[ShortChannelIdAndFlag]): List[ShortChannelIdAndFlag] = {
ids match {
case Nil => acc.reverse
case head :: tail =>
val flag = computeFlag(d.channels)(head, timestamps.headOption, checksums.headOption, routerConf.requestNodeAnnouncements)
// 0 means nothing to query, just don't include it
val acc1 = if (flag != 0) ShortChannelIdAndFlag(head, flag) :: acc else acc
loop(tail, timestamps.drop(1), checksums.drop(1), acc1)
}
}
val timestamps_opt = r.timestamps_opt.map(_.timestamps).getOrElse(List.empty[ReplyChannelRangeTlv.Timestamps])
val checksums_opt = r.checksums_opt.map(_.checksums).getOrElse(List.empty[ReplyChannelRangeTlv.Checksums])
val shortChannelIdAndFlags = Kamon.runWithSpan(Kamon.spanBuilder("compute-flags").start(), finishSpan = true) {
loop(r.shortChannelIds.array, timestamps_opt, checksums_opt)
}
val (channelCount, updatesCount) = shortChannelIdAndFlags.foldLeft((0, 0)) {
case ((c, u), ShortChannelIdAndFlag(_, flag)) =>
val c1 = c + (if (QueryShortChannelIdsTlv.QueryFlagType.includeChannelAnnouncement(flag)) 1 else 0)
val u1 = u + (if (QueryShortChannelIdsTlv.QueryFlagType.includeUpdate1(flag)) 1 else 0) + (if (QueryShortChannelIdsTlv.QueryFlagType.includeUpdate2(flag)) 1 else 0)
(c1, u1)
}
log.info(s"received reply_channel_range with {} channels, we're missing {} channel announcements and {} updates, format={}", r.shortChannelIds.array.size, channelCount, updatesCount, r.shortChannelIds.encoding)
Metrics.ReplyChannelRange.NewChannelAnnouncements.withoutTags().record(channelCount)
Metrics.ReplyChannelRange.NewChannelUpdates.withoutTags().record(updatesCount)
def buildQuery(chunk: List[ShortChannelIdAndFlag]): QueryShortChannelIds = {
// always encode empty lists as UNCOMPRESSED
val encoding = if (chunk.isEmpty) EncodingType.UNCOMPRESSED else r.shortChannelIds.encoding
QueryShortChannelIds(r.chainHash,
shortChannelIds = EncodedShortChannelIds(encoding, chunk.map(_.shortChannelId)),
if (r.timestamps_opt.isDefined || r.checksums_opt.isDefined)
TlvStream(QueryShortChannelIdsTlv.EncodedQueryFlags(encoding, chunk.map(_.flag)))
else
TlvStream.empty
)
}
// we update our sync data to this node (there may be multiple channel range responses and we can only query one set of ids at a time)
val replies = shortChannelIdAndFlags
.grouped(routerConf.channelQueryChunkSize)
.map(buildQuery)
.toList
val (sync1, replynow_opt) = addToSync(d.sync, origin.nodeId, replies)
// we only send a reply right away if there were no pending requests
replynow_opt.foreach(origin.peerConnection ! _)
val progress = syncProgress(sync1)
ctx.system.eventStream.publish(progress)
ctx.self ! progress
d.copy(sync = sync1)
@tailrec
def loop(ids: List[ShortChannelId], timestamps: List[ReplyChannelRangeTlv.Timestamps], checksums: List[ReplyChannelRangeTlv.Checksums], acc: List[ShortChannelIdAndFlag] = List.empty[ShortChannelIdAndFlag]): List[ShortChannelIdAndFlag] = {
ids match {
case Nil => acc.reverse
case head :: tail =>
val flag = computeFlag(d.channels)(head, timestamps.headOption, checksums.headOption, routerConf.requestNodeAnnouncements)
// 0 means nothing to query, just don't include it
val acc1 = if (flag != 0) ShortChannelIdAndFlag(head, flag) :: acc else acc
loop(tail, timestamps.drop(1), checksums.drop(1), acc1)
}
}
val timestamps_opt = r.timestamps_opt.map(_.timestamps).getOrElse(List.empty[ReplyChannelRangeTlv.Timestamps])
val checksums_opt = r.checksums_opt.map(_.checksums).getOrElse(List.empty[ReplyChannelRangeTlv.Checksums])
val shortChannelIdAndFlags = loop(r.shortChannelIds.array, timestamps_opt, checksums_opt)
val (channelCount, updatesCount) = shortChannelIdAndFlags.foldLeft((0, 0)) {
case ((c, u), ShortChannelIdAndFlag(_, flag)) =>
val c1 = c + (if (QueryShortChannelIdsTlv.QueryFlagType.includeChannelAnnouncement(flag)) 1 else 0)
val u1 = u + (if (QueryShortChannelIdsTlv.QueryFlagType.includeUpdate1(flag)) 1 else 0) + (if (QueryShortChannelIdsTlv.QueryFlagType.includeUpdate2(flag)) 1 else 0)
(c1, u1)
}
log.info(s"received reply_channel_range with {} channels, we're missing {} channel announcements and {} updates, format={}", r.shortChannelIds.array.size, channelCount, updatesCount, r.shortChannelIds.encoding)
Metrics.ReplyChannelRange.NewChannelAnnouncements.withoutTags().record(channelCount)
Metrics.ReplyChannelRange.NewChannelUpdates.withoutTags().record(updatesCount)
def buildQuery(chunk: List[ShortChannelIdAndFlag]): QueryShortChannelIds = {
// always encode empty lists as UNCOMPRESSED
val encoding = if (chunk.isEmpty) EncodingType.UNCOMPRESSED else r.shortChannelIds.encoding
QueryShortChannelIds(r.chainHash,
shortChannelIds = EncodedShortChannelIds(encoding, chunk.map(_.shortChannelId)),
if (r.timestamps_opt.isDefined || r.checksums_opt.isDefined)
TlvStream(QueryShortChannelIdsTlv.EncodedQueryFlags(encoding, chunk.map(_.flag)))
else
TlvStream.empty
)
}
// we update our sync data to this node (there may be multiple channel range responses and we can only query one set of ids at a time)
val replies = shortChannelIdAndFlags
.grouped(routerConf.channelQueryChunkSize)
.map(buildQuery)
.toList
val (sync1, replynow_opt) = addToSync(d.sync, origin.nodeId, replies)
// we only send a reply right away if there were no pending requests
replynow_opt.foreach(origin.peerConnection ! _)
val progress = syncProgress(sync1)
ctx.system.eventStream.publish(progress)
ctx.self ! progress
d.copy(sync = sync1)
}
def handleQueryShortChannelIds(nodes: Map[PublicKey, NodeAnnouncement], channels: SortedMap[ShortChannelId, PublicChannel], origin: RemoteGossip, q: QueryShortChannelIds)(implicit ctx: ActorContext, log: LoggingAdapter): Unit = {
implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors
ctx.sender ! TransportHandler.ReadAck(q)
Kamon.runWithContextEntry(remoteNodeIdKey, origin.nodeId.toString) {
Kamon.runWithSpan(Kamon.spanBuilder("query-short-channel-ids").start(), finishSpan = true) {
val flags = q.queryFlags_opt.map(_.array).getOrElse(List.empty[Long])
var channelCount = 0
var updateCount = 0
var nodeCount = 0
val flags = q.queryFlags_opt.map(_.array).getOrElse(List.empty[Long])
var channelCount = 0
var updateCount = 0
var nodeCount = 0
processChannelQuery(nodes, channels)(
q.shortChannelIds.array,
flags,
ca => {
channelCount = channelCount + 1
origin.peerConnection ! ca
},
cu => {
updateCount = updateCount + 1
origin.peerConnection ! cu
},
na => {
nodeCount = nodeCount + 1
origin.peerConnection ! na
}
)
Metrics.QueryShortChannelIds.Nodes.withoutTags().record(nodeCount)
Metrics.QueryShortChannelIds.ChannelAnnouncements.withoutTags().record(channelCount)
Metrics.QueryShortChannelIds.ChannelUpdates.withoutTags().record(updateCount)
log.info("received query_short_channel_ids with {} items, sent back {} channels and {} updates and {} nodes", q.shortChannelIds.array.size, channelCount, updateCount, nodeCount)
origin.peerConnection ! ReplyShortChannelIdsEnd(q.chainHash, 1)
processChannelQuery(nodes, channels)(
q.shortChannelIds.array,
flags,
ca => {
channelCount = channelCount + 1
origin.peerConnection ! ca
},
cu => {
updateCount = updateCount + 1
origin.peerConnection ! cu
},
na => {
nodeCount = nodeCount + 1
origin.peerConnection ! na
}
}
)
Metrics.QueryShortChannelIds.Nodes.withoutTags().record(nodeCount)
Metrics.QueryShortChannelIds.ChannelAnnouncements.withoutTags().record(channelCount)
Metrics.QueryShortChannelIds.ChannelUpdates.withoutTags().record(updateCount)
log.info("received query_short_channel_ids with {} items, sent back {} channels and {} updates and {} nodes", q.shortChannelIds.array.size, channelCount, updateCount, nodeCount)
origin.peerConnection ! ReplyShortChannelIdsEnd(q.chainHash, 1)
}
def handleReplyShortChannelIdsEnd(d: Data, origin: RemoteGossip, r: ReplyShortChannelIdsEnd)(implicit ctx: ActorContext, log: LoggingAdapter): Data = {
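Worth noting: the Sync handlers keep all of their Kamon metrics (the Metrics.QueryChannelRange.*, Metrics.ReplyChannelRange.* and Metrics.QueryShortChannelIds.* recordings above) and only lose the spans. A sketch of the metric style that survives, assuming kamon-core 2.x (the instrument names here are illustrative, not eclair's actual ones):

    import kamon.Kamon

    val blocks = Kamon.histogram("router.query-channel-range.blocks")
    blocks.withoutTags().record(1008L)                   // untagged recording

    val replies = Kamon.counter("router.reply-channel-range.replies")
    replies.withTag("direction", "outgoing").increment() // tagged counter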


@@ -29,7 +29,6 @@ import fr.acinq.eclair.router.Router._
import fr.acinq.eclair.transactions.Scripts
import fr.acinq.eclair.wire._
import fr.acinq.eclair.{Logs, MilliSatoshiLong, NodeParams, ShortChannelId, TxCoordinates}
import kamon.Kamon
object Validation {
@@ -69,11 +68,7 @@ object Validation {
d
} else {
log.info("validating shortChannelId={}", c.shortChannelId)
Kamon.runWithContextEntry(shortChannelIdKey, c.shortChannelId) {
Kamon.runWithSpan(Kamon.spanBuilder("validate-channel").tag("shortChannelId", c.shortChannelId.toString).start(), finishSpan = false) {
watcher ! ValidateRequest(c)
}
}
watcher ! ValidateRequest(c)
// we don't acknowledge the message just yet
d.copy(awaiting = d.awaiting + (c -> Seq(origin)))
}
@@ -83,102 +78,90 @@
implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors
import nodeParams.db.{network => db}
import r.c
Kamon.runWithContextEntry(shortChannelIdKey, c.shortChannelId) {
Kamon.runWithSpan(Kamon.currentSpan(), finishSpan = true) {
Kamon.runWithSpan(Kamon.spanBuilder("process-validate-result").start(), finishSpan = true) {
d0.awaiting.get(c) match {
case Some(origin +: _) => origin.peerConnection ! TransportHandler.ReadAck(c) // now we can acknowledge the message, we only need to do it for the first peer that sent us the announcement
case _ => ()
d0.awaiting.get(c) match {
case Some(origin +: _) => origin.peerConnection ! TransportHandler.ReadAck(c) // now we can acknowledge the message, we only need to do it for the first peer that sent us the announcement
case _ => ()
}
val remoteOrigins_opt = d0.awaiting.get(c)
Logs.withMdc(log)(Logs.mdc(remoteNodeId_opt = remoteOrigins_opt.flatMap(_.headOption).map(_.nodeId))) { // in the MDC we use the node id that sent us the announcement first
log.info("got validation result for shortChannelId={} (awaiting={} stash.nodes={} stash.updates={})", c.shortChannelId, d0.awaiting.size, d0.stash.nodes.size, d0.stash.updates.size)
val publicChannel_opt = r match {
case ValidateResult(c, Left(t)) =>
log.warning("validation failure for shortChannelId={} reason={}", c.shortChannelId, t.getMessage)
remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.ValidationFailure(c))))
None
case ValidateResult(c, Right((tx, UtxoStatus.Unspent))) =>
val TxCoordinates(_, _, outputIndex) = ShortChannelId.coordinates(c.shortChannelId)
val (fundingOutputScript, fundingOutputIsInvalid) = {
// let's check that the output is indeed a P2WSH multisig 2-of-2 of nodeid1 and nodeid2
val fundingOutputScript = write(pay2wsh(Scripts.multiSig2of2(c.bitcoinKey1, c.bitcoinKey2)))
val fundingOutputIsInvalid = tx.txOut.size < outputIndex + 1 || fundingOutputScript != tx.txOut(outputIndex).publicKeyScript
(fundingOutputScript, fundingOutputIsInvalid)
}
val remoteOrigins_opt = d0.awaiting.get(c)
Logs.withMdc(log)(Logs.mdc(remoteNodeId_opt = remoteOrigins_opt.flatMap(_.headOption).map(_.nodeId))) { // in the MDC we use the node id that sent us the announcement first
log.info("got validation result for shortChannelId={} (awaiting={} stash.nodes={} stash.updates={})", c.shortChannelId, d0.awaiting.size, d0.stash.nodes.size, d0.stash.updates.size)
val publicChannel_opt = r match {
case ValidateResult(c, Left(t)) =>
log.warning("validation failure for shortChannelId={} reason={}", c.shortChannelId, t.getMessage)
remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.ValidationFailure(c))))
None
case ValidateResult(c, Right((tx, UtxoStatus.Unspent))) =>
val TxCoordinates(_, _, outputIndex) = ShortChannelId.coordinates(c.shortChannelId)
val (fundingOutputScript, fundingOutputIsInvalid) = Kamon.runWithSpan(Kamon.spanBuilder("checked-pubkeyscript").start(), finishSpan = true) {
// let's check that the output is indeed a P2WSH multisig 2-of-2 of nodeid1 and nodeid2
val fundingOutputScript = write(pay2wsh(Scripts.multiSig2of2(c.bitcoinKey1, c.bitcoinKey2)))
val fundingOutputIsInvalid = tx.txOut.size < outputIndex + 1 || fundingOutputScript != tx.txOut(outputIndex).publicKeyScript
(fundingOutputScript, fundingOutputIsInvalid)
}
if (fundingOutputIsInvalid) {
log.error(s"invalid script for shortChannelId={}: txid={} does not have script=$fundingOutputScript at outputIndex=$outputIndex ann={}", c.shortChannelId, tx.txid, c)
remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.InvalidAnnouncement(c))))
None
} else {
watcher ! WatchSpentBasic(ctx.self, tx, outputIndex, BITCOIN_FUNDING_EXTERNAL_CHANNEL_SPENT(c.shortChannelId))
log.debug("added channel channelId={}", c.shortChannelId)
remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.Accepted(c))))
val capacity = tx.txOut(outputIndex).amount
ctx.system.eventStream.publish(ChannelsDiscovered(SingleChannelDiscovered(c, capacity, None, None) :: Nil))
Kamon.runWithSpan(Kamon.spanBuilder("add-to-db").start(), finishSpan = true) {
db.addChannel(c, tx.txid, capacity)
}
// in case we just validated our first local channel, we announce the local node
if (!d0.nodes.contains(nodeParams.nodeId) && isRelatedTo(c, nodeParams.nodeId)) {
log.info("first local channel validated, announcing local node")
val nodeAnn = Announcements.makeNodeAnnouncement(nodeParams.privateKey, nodeParams.alias, nodeParams.color, nodeParams.publicAddresses, nodeParams.features)
ctx.self ! nodeAnn
}
// public channels that haven't yet been announced are considered as private channels
val channelMeta_opt = d0.privateChannels.get(c.shortChannelId).map(_.meta)
Some(PublicChannel(c, tx.txid, capacity, None, None, channelMeta_opt))
}
case ValidateResult(c, Right((tx, fundingTxStatus: UtxoStatus.Spent))) =>
if (fundingTxStatus.spendingTxConfirmed) {
log.warning("ignoring shortChannelId={} tx={} (funding tx already spent and spending tx is confirmed)", c.shortChannelId, tx.txid)
// the funding tx has been spent by a transaction that is now confirmed: peer shouldn't send us those
remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.ChannelClosed(c))))
} else {
log.debug("ignoring shortChannelId={} tx={} (funding tx already spent but spending tx isn't confirmed)", c.shortChannelId, tx.txid)
remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.ChannelClosing(c))))
}
// there may be a record if we have just restarted
db.removeChannel(c.shortChannelId)
None
if (fundingOutputIsInvalid) {
log.error(s"invalid script for shortChannelId={}: txid={} does not have script=$fundingOutputScript at outputIndex=$outputIndex ann={}", c.shortChannelId, tx.txid, c)
remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.InvalidAnnouncement(c))))
None
} else {
watcher ! WatchSpentBasic(ctx.self, tx, outputIndex, BITCOIN_FUNDING_EXTERNAL_CHANNEL_SPENT(c.shortChannelId))
log.debug("added channel channelId={}", c.shortChannelId)
remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.Accepted(c))))
val capacity = tx.txOut(outputIndex).amount
ctx.system.eventStream.publish(ChannelsDiscovered(SingleChannelDiscovered(c, capacity, None, None) :: Nil))
db.addChannel(c, tx.txid, capacity)
// in case we just validated our first local channel, we announce the local node
if (!d0.nodes.contains(nodeParams.nodeId) && isRelatedTo(c, nodeParams.nodeId)) {
log.info("first local channel validated, announcing local node")
val nodeAnn = Announcements.makeNodeAnnouncement(nodeParams.privateKey, nodeParams.alias, nodeParams.color, nodeParams.publicAddresses, nodeParams.features)
ctx.self ! nodeAnn
}
val span1 = Kamon.spanBuilder("reprocess-stash").start
// we also reprocess node and channel_update announcements related to channels that were just analyzed
val reprocessUpdates = d0.stash.updates.filterKeys(u => u.shortChannelId == c.shortChannelId)
val reprocessNodes = d0.stash.nodes.filterKeys(n => isRelatedTo(c, n.nodeId))
// and we remove the reprocessed messages from the stash
val stash1 = d0.stash.copy(updates = d0.stash.updates -- reprocessUpdates.keys, nodes = d0.stash.nodes -- reprocessNodes.keys)
// we remove channel from awaiting map
val awaiting1 = d0.awaiting - c
span1.finish()
// public channels that haven't yet been announced are considered as private channels
val channelMeta_opt = d0.privateChannels.get(c.shortChannelId).map(_.meta)
Some(PublicChannel(c, tx.txid, capacity, None, None, channelMeta_opt))
}
case ValidateResult(c, Right((tx, fundingTxStatus: UtxoStatus.Spent))) =>
if (fundingTxStatus.spendingTxConfirmed) {
log.warning("ignoring shortChannelId={} tx={} (funding tx already spent and spending tx is confirmed)", c.shortChannelId, tx.txid)
// the funding tx has been spent by a transaction that is now confirmed: peer shouldn't send us those
remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.ChannelClosed(c))))
} else {
log.debug("ignoring shortChannelId={} tx={} (funding tx already spent but spending tx isn't confirmed)", c.shortChannelId, tx.txid)
remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.ChannelClosing(c))))
}
// there may be a record if we have just restarted
db.removeChannel(c.shortChannelId)
None
}
// we also reprocess node and channel_update announcements related to channels that were just analyzed
val reprocessUpdates = d0.stash.updates.filterKeys(u => u.shortChannelId == c.shortChannelId)
val reprocessNodes = d0.stash.nodes.filterKeys(n => isRelatedTo(c, n.nodeId))
// and we remove the reprocessed messages from the stash
val stash1 = d0.stash.copy(updates = d0.stash.updates -- reprocessUpdates.keys, nodes = d0.stash.nodes -- reprocessNodes.keys)
// we remove channel from awaiting map
val awaiting1 = d0.awaiting - c
publicChannel_opt match {
case Some(pc) =>
Kamon.runWithSpan(Kamon.spanBuilder("build-new-state").start, finishSpan = true) {
// note: if the channel is graduating from private to public, the implementation (in the LocalChannelUpdate handler) guarantees that we will process a new channel_update
// right after the channel_announcement, channel_updates will be moved from private to public at that time
val d1 = d0.copy(
channels = d0.channels + (c.shortChannelId -> pc),
privateChannels = d0.privateChannels - c.shortChannelId, // we remove fake announcements that we may have made before
rebroadcast = d0.rebroadcast.copy(channels = d0.rebroadcast.channels + (c -> d0.awaiting.getOrElse(c, Nil).toSet)), // we also add the newly validated channels to the rebroadcast queue
stash = stash1,
awaiting = awaiting1)
// we only reprocess updates and nodes if validation succeeded
val d2 = reprocessUpdates.foldLeft(d1) {
case (d, (u, origins)) => Validation.handleChannelUpdate(d, nodeParams.db.network, nodeParams.routerConf, Right(RemoteChannelUpdate(u, origins)), wasStashed = true)
}
val d3 = reprocessNodes.foldLeft(d2) {
case (d, (n, origins)) => Validation.handleNodeAnnouncement(d, nodeParams.db.network, origins, n, wasStashed = true)
}
d3
}
case None =>
reprocessUpdates.foreach { case (u, origins) => origins.collect { case o: RemoteGossip => sendDecision(o.peerConnection, GossipDecision.NoRelatedChannel(u)) } }
reprocessNodes.foreach { case (n, origins) => origins.collect { case o: RemoteGossip => sendDecision(o.peerConnection, GossipDecision.NoKnownChannel(n)) } }
d0.copy(stash = stash1, awaiting = awaiting1)
}
publicChannel_opt match {
case Some(pc) =>
// note: if the channel is graduating from private to public, the implementation (in the LocalChannelUpdate handler) guarantees that we will process a new channel_update
// right after the channel_announcement, channel_updates will be moved from private to public at that time
val d1 = d0.copy(
channels = d0.channels + (c.shortChannelId -> pc),
privateChannels = d0.privateChannels - c.shortChannelId, // we remove fake announcements that we may have made before
rebroadcast = d0.rebroadcast.copy(channels = d0.rebroadcast.channels + (c -> d0.awaiting.getOrElse(c, Nil).toSet)), // we also add the newly validated channels to the rebroadcast queue
stash = stash1,
awaiting = awaiting1)
// we only reprocess updates and nodes if validation succeeded
val d2 = reprocessUpdates.foldLeft(d1) {
case (d, (u, origins)) => Validation.handleChannelUpdate(d, nodeParams.db.network, nodeParams.routerConf, Right(RemoteChannelUpdate(u, origins)), wasStashed = true)
}
}
val d3 = reprocessNodes.foldLeft(d2) {
case (d, (n, origins)) => Validation.handleNodeAnnouncement(d, nodeParams.db.network, origins, n, wasStashed = true)
}
d3
case None =>
reprocessUpdates.foreach { case (u, origins) => origins.collect { case o: RemoteGossip => sendDecision(o.peerConnection, GossipDecision.NoRelatedChannel(u)) } }
reprocessNodes.foreach { case (n, origins) => origins.collect { case o: RemoteGossip => sendDecision(o.peerConnection, GossipDecision.NoKnownChannel(n)) } }
d0.copy(stash = stash1, awaiting = awaiting1)
}
}
}
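The Validation change removes the one cross-message trace in this set: validate-channel was started with finishSpan = false before messaging the watcher, then retrieved via Kamon.currentSpan() and finished when the ValidateResult came back, which relies on Kamon propagating the context across the actor hop. A condensed sketch of that handoff (both functions are illustrative):

    import kamon.Kamon

    def beforeValidation(shortChannelId: String): Unit = {
      val span = Kamon.spanBuilder("validate-channel").tag("shortChannelId", shortChannelId).start()
      Kamon.runWithSpan(span, finishSpan = false) {
        // watcher ! ValidateRequest(c) -- the span stays open across the round-trip
      }
    }

    def onValidationResult(): Unit =
      Kamon.runWithSpan(Kamon.currentSpan(), finishSpan = true) {
        // process the ValidateResult; the span opened above is finished here
      }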


@@ -54,10 +54,9 @@ kamon {
actors {
# Decides which actors generate Spans for the messages they process, given that there is already an ongoing trace
# in the Context of the processed message (i.e. there is a Sampled Span in the Context).
#
trace {
includes = []
excludes = ["**"] # we don't want automatically generated spans because they conflict with the ones we define
excludes = ["**"] # we don't want automatically generated spans; we're not using them
}
}
}


@@ -38,10 +38,9 @@ kamon.instrumentation.akka {
actors {
# Decides which actors generate Spans for the messages they process, given that there is already an ongoing trace
# in the Context of the processed message (i.e. there is a Sampled Span in the Context).
#
trace {
includes = [ ]
excludes = [ "**" ] # we don't want automatically generated spans because they conflict with the ones we define
excludes = [ "**" ] # we don't want automatically generated spans; we're not using them
}
}
}