1
0
Fork 0
mirror of https://github.com/ACINQ/eclair.git synced 2025-02-24 14:50:46 +01:00

More metrics (#1196)

* added metrics on bitcoin rpc

* added metrics on lightning message codec
This commit is contained in:
Pierre-Marie Padiou 2019-11-04 16:19:15 +01:00 committed by GitHub
parent e831c2a4ba
commit 5a6b791203
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 57 additions and 24 deletions

View file

@ -17,13 +17,14 @@
package fr.acinq.eclair
import kamon.Kamon
import kamon.tag.TagSet
import scala.concurrent.{ExecutionContext, Future}
object KamonExt {
def time[T](name: String)(f: => T) = {
val timer = Kamon.timer(name).withoutTags().start()
def time[T](name: String, tags: TagSet = TagSet.Empty)(f: => T) = {
val timer = Kamon.timer(name).withTags(tags).start()
try {
f
} finally {
@ -31,8 +32,8 @@ object KamonExt {
}
}
def timeFuture[T](name: String)(f: => Future[T])(implicit ec: ExecutionContext): Future[T] = {
val timer = Kamon.timer(name).withoutTags().start()
def timeFuture[T](name: String, tags: TagSet = TagSet.Empty)(f: => Future[T])(implicit ec: ExecutionContext): Future[T] = {
val timer = Kamon.timer(name).withTags(tags).start()
val res = f
res onComplete { case _ => timer.stop }
res

View file

@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicLong
import akka.actor.{Actor, ActorLogging, Cancellable, Props, Terminated}
import akka.pattern.pipe
import fr.acinq.bitcoin._
import fr.acinq.eclair.KamonExt
import fr.acinq.eclair.blockchain._
import fr.acinq.eclair.blockchain.bitcoind.rpc.ExtendedBitcoinClient
import fr.acinq.eclair.channel.BITCOIN_PARENT_TX_CONFIRMED
@ -55,16 +56,18 @@ class ZmqWatcher(blockCount: AtomicLong, client: ExtendedBitcoinClient)(implicit
def watching(watches: Set[Watch], watchedUtxos: Map[OutPoint, Set[Watch]], block2tx: SortedMap[Long, Seq[Transaction]], nextTick: Option[Cancellable]): Receive = {
case NewTransaction(tx) =>
log.debug(s"analyzing txid={} tx={}", tx.txid, tx)
tx.txIn
.map(_.outPoint)
.flatMap(watchedUtxos.get)
.flatten // List[Watch] -> Watch
.collect {
case w: WatchSpentBasic =>
self ! TriggerEvent(w, WatchEventSpentBasic(w.event))
case w: WatchSpent =>
self ! TriggerEvent(w, WatchEventSpent(w.event, tx))
KamonExt.time("watcher.newtx.checkwatch.time") {
log.debug(s"analyzing txid={} tx={}", tx.txid, tx)
tx.txIn
.map(_.outPoint)
.flatMap(watchedUtxos.get)
.flatten // List[Watch] -> Watch
.collect {
case w: WatchSpentBasic =>
self ! TriggerEvent(w, WatchEventSpentBasic(w.event))
case w: WatchSpent =>
self ! TriggerEvent(w, WatchEventSpent(w.event, tx))
}
}
case NewBlock(block) =>
@ -84,7 +87,9 @@ class ZmqWatcher(blockCount: AtomicLong, client: ExtendedBitcoinClient)(implicit
context.system.eventStream.publish(CurrentBlockCount(count))
}
// TODO: beware of the herd effect
watches.collect { case w: WatchConfirmed => checkConfirmed(w) }
KamonExt.timeFuture("watcher.newblock.checkwatch.time") {
Future.sequence(watches.collect { case w: WatchConfirmed => checkConfirmed(w) })
}
context become watching(watches, watchedUtxos, block2tx, None)
case TriggerEvent(w, e) if watches.contains(w) =>

View file

@ -18,6 +18,7 @@ package fr.acinq.eclair.blockchain.bitcoind.rpc
import com.softwaremill.sttp._
import com.softwaremill.sttp.json4s._
import fr.acinq.eclair.KamonExt
import org.json4s.DefaultFormats
import org.json4s.JsonAST.JValue
import org.json4s.jackson.Serialization
@ -38,7 +39,8 @@ class BasicBitcoinJsonRPCClient(user: String, password: String, host: String = "
case o => o
}
def invoke(requests: Seq[JsonRPCRequest])(implicit ec: ExecutionContext): Future[Seq[JsonRPCResponse]] =
def invoke(requests: Seq[JsonRPCRequest])(implicit ec: ExecutionContext): Future[Seq[JsonRPCResponse]] = {
KamonExt.timeFuture("bitcoin.rpc.basic.invoke.time") {
for {
res <- sttp
.post(uri"$scheme://$host:$port")
@ -47,5 +49,7 @@ class BasicBitcoinJsonRPCClient(user: String, password: String, host: String = "
.response(asJson[Seq[JsonRPCResponse]])
.send()
} yield res.unsafeBody
}
}
}

View file

@ -19,6 +19,7 @@ package fr.acinq.eclair.blockchain.bitcoind.rpc
import akka.actor.{ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import fr.acinq.eclair.KamonExt
import org.json4s.JsonAST
import scala.concurrent.duration._
@ -30,6 +31,9 @@ class BatchingBitcoinJsonRPCClient(rpcClient: BasicBitcoinJsonRPCClient)(implici
val batchingClient = system.actorOf(Props(new BatchingClient(rpcClient)), name = "batching-client")
override def invoke(method: String, params: Any*)(implicit ec: ExecutionContext): Future[JsonAST.JValue] =
(batchingClient ? JsonRPCRequest(method = method, params = params)).mapTo[JsonAST.JValue]
override def invoke(method: String, params: Any*)(implicit ec: ExecutionContext): Future[JsonAST.JValue] = {
KamonExt.timeFuture("bitcoin.rpc.batch.invoke.time") {
(batchingClient ? JsonRPCRequest(method = method, params = params)).mapTo[JsonAST.JValue]
}
}
}

View file

@ -49,7 +49,7 @@ class Authenticator(nodeParams: NodeParams) extends Actor with DiagnosticActorLo
KeyPair(nodeParams.nodeId.value, nodeParams.privateKey.value),
remoteNodeId_opt.map(_.value),
connection = connection,
codec = LightningMessageCodecs.lightningMessageCodec))
codec = LightningMessageCodecs.meteredLightningMessageCodec))
context watch transport
context become ready(switchboard, authenticating + (transport -> pending))

View file

@ -411,7 +411,7 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, authenticator: A
case Event(badMessage: BadMessage, d: ConnectedData) =>
val behavior1 = badMessage match {
case InvalidSignature(r) =>
val bin: String = LightningMessageCodecs.lightningMessageCodec.encode(r) match {
val bin: String = LightningMessageCodecs.meteredLightningMessageCodec.encode(r) match {
case Attempt.Successful(b) => b.toHex
case _ => "unknown"
}

View file

@ -20,7 +20,7 @@ import fr.acinq.bitcoin.ByteVector32
import fr.acinq.eclair.crypto.Mac32
import fr.acinq.eclair.wire.CommonCodecs._
import fr.acinq.eclair.wire.FailureMessageCodecs.failureMessageCodec
import fr.acinq.eclair.wire.LightningMessageCodecs.{channelUpdateCodec, lightningMessageCodec}
import fr.acinq.eclair.wire.LightningMessageCodecs.{channelUpdateCodec, meteredLightningMessageCodec}
import fr.acinq.eclair.{CltvExpiry, LongToBtcAmount, MilliSatoshi, UInt64}
import scodec.codecs._
import scodec.{Attempt, Codec}
@ -85,7 +85,7 @@ object FailureMessageCodecs {
val NODE = 0x2000
val UPDATE = 0x1000
val channelUpdateCodecWithType = lightningMessageCodec.narrow[ChannelUpdate](f => Attempt.successful(f.asInstanceOf[ChannelUpdate]), g => g)
val channelUpdateCodecWithType = meteredLightningMessageCodec.narrow[ChannelUpdate](f => Attempt.successful(f.asInstanceOf[ChannelUpdate]), g => g)
// NB: for historical reasons some implementations were including/omitting the message type (258 for ChannelUpdate)
// this codec supports both versions for decoding, and will encode with the message type

View file

@ -16,9 +16,12 @@
package fr.acinq.eclair.wire
import fr.acinq.eclair.wire
import fr.acinq.eclair.{KamonExt, wire}
import fr.acinq.eclair.wire.CommonCodecs._
import scodec.Codec
import kamon.Kamon
import kamon.tag.TagSet
import scodec.bits.BitVector
import scodec.{Attempt, Codec}
import scodec.codecs._
/**
@ -286,4 +289,20 @@ object LightningMessageCodecs {
.typecase(264, replyChannelRangeCodec)
.typecase(265, gossipTimestampFilterCodec)
val meteredLightningMessageCodec = Codec[LightningMessage](
(msg: LightningMessage) => KamonExt.time("scodec.encode.time", tags = TagSet.of("type", msg.getClass.getSimpleName))(lightningMessageCodec.encode(msg)),
(bits: BitVector) => {
// this is a bit more involved, because we don't know beforehand what the type of the message will be
val timer = Kamon.timer("scodec.decode.time")
val begin = System.nanoTime()
val res = lightningMessageCodec.decode(bits)
val end = System.nanoTime()
res match {
case Attempt.Successful(decoded) => timer.withTag("type", decoded.value.getClass.getSimpleName).record(end - begin)
case Attempt.Failure(_) => timer.withTag("type", "unknown").record(end - begin)
}
res
}
)
}