1
0
Fork 0
mirror of https://github.com/ACINQ/eclair.git synced 2025-02-23 22:46:44 +01:00

Add a globalbalance api call (#1737)

It returns an overall balance, separating onchain, offchain, and
removing duplicates (e.g. mutual closes that haven't reached min depth
still have an associated channel, but they already appear in the
on-chain balance). We also take into account known preimages, even if
the htlc hasn't been formally resolved.

Metrics have also been added.

Co-authored-by: Bastien Teinturier <31281497+t-bast@users.noreply.github.com>
This commit is contained in:
Pierre-Marie Padiou 2021-07-08 13:57:49 +02:00 committed by GitHub
parent 3a573e267a
commit bd57d41ef3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
24 changed files with 941 additions and 104 deletions

View file

@ -252,6 +252,11 @@
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>com.softwaremill.quicklens</groupId>
<artifactId>quicklens_${scala.version.short}</artifactId>
<version>1.5.0</version>
</dependency>
<!-- MONITORING -->
<dependency>
<groupId>io.kamon</groupId>
@ -264,12 +269,6 @@
<version>${kamon.version}</version>
</dependency>
<!-- TESTS -->
<dependency>
<groupId>com.softwaremill.quicklens</groupId>
<artifactId>quicklens_${scala.version.short}</artifactId>
<version>1.5.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-testkit_${scala.version.short}</artifactId>

View file

@ -69,6 +69,8 @@ eclair {
reserve-to-funding-ratio = 0.01 // recommended by BOLT #2
max-reserve-to-funding-ratio = 0.05 // channel reserve can't be more than 5% of the funding amount (recommended: 1%)
balance-check-interval = 1 hour
to-remote-delay-blocks = 720 // number of blocks that the other node's to-self outputs must be delayed (720 ~ 5 days)
max-to-local-delay-blocks = 2016 // maximum number of blocks that we are ready to accept for our own delayed outputs (2016 ~ 2 weeks)
mindepth-blocks = 3

View file

@ -17,11 +17,15 @@
package fr.acinq.eclair
import akka.actor.ActorRef
import akka.actor.typed.scaladsl.AskPattern.Askable
import akka.actor.typed.scaladsl.adapter.ClassicSchedulerOps
import akka.pattern._
import akka.util.Timeout
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin.{ByteVector32, ByteVector64, Crypto, Satoshi}
import fr.acinq.eclair.TimestampQueryFilters._
import fr.acinq.eclair.balance.CheckBalance.GlobalBalance
import fr.acinq.eclair.balance.{BalanceActor, ChannelsListener}
import fr.acinq.eclair.blockchain.OnChainBalance
import fr.acinq.eclair.blockchain.bitcoind.BitcoinCoreWallet
import fr.acinq.eclair.blockchain.bitcoind.BitcoinCoreWallet.WalletTransaction
@ -39,12 +43,13 @@ import fr.acinq.eclair.payment.send.PaymentInitiator.{SendPayment, SendPaymentTo
import fr.acinq.eclair.router.Router._
import fr.acinq.eclair.router.{NetworkStats, RouteCalculation, Router}
import fr.acinq.eclair.wire.protocol._
import grizzled.slf4j.Logging
import scodec.bits.ByteVector
import java.nio.charset.StandardCharsets
import java.util.UUID
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.reflect.ClassTag
case class GetInfoResponse(version: String, nodeId: PublicKey, alias: String, color: String, features: Features, chainHash: ByteVector32, network: String, blockHeight: Int, publicAddresses: Seq[NodeAddress], instanceId: String)
@ -59,9 +64,9 @@ case class VerifiedMessage(valid: Boolean, publicKey: PublicKey)
object TimestampQueryFilters {
/** We use this in the context of timestamp filtering, when we don't need an upper bound. */
val MaxEpochMilliseconds = Duration.fromNanos(Long.MaxValue).toMillis
val MaxEpochMilliseconds: Long = Duration.fromNanos(Long.MaxValue).toMillis
def getDefaultTimestampFilters(from_opt: Option[Long], to_opt: Option[Long]) = {
def getDefaultTimestampFilters(from_opt: Option[Long], to_opt: Option[Long]): TimestampQueryFilters = {
// NB: we expect callers to use seconds, but internally we use milli-seconds everywhere.
val from = from_opt.getOrElse(0L).seconds.toMillis
val to = to_opt.map(_.seconds.toMillis).getOrElse(MaxEpochMilliseconds)
@ -149,12 +154,14 @@ trait Eclair {
def onChainTransactions(count: Int, skip: Int): Future[Iterable[WalletTransaction]]
def globalBalance()(implicit timeout: Timeout): Future[GlobalBalance]
def signMessage(message: ByteVector): SignedMessage
def verifyMessage(message: ByteVector, recoverableSignature: ByteVector): VerifiedMessage
}
class EclairImpl(appKit: Kit) extends Eclair {
class EclairImpl(appKit: Kit) extends Eclair with Logging {
implicit val ec: ExecutionContext = appKit.system.dispatcher
@ -428,6 +435,14 @@ class EclairImpl(appKit: Kit) extends Eclair {
override def usableBalances()(implicit timeout: Timeout): Future[Iterable[UsableBalance]] =
(appKit.relayer ? GetOutgoingChannels()).mapTo[OutgoingChannels].map(_.channels.map(_.toUsableBalance))
override def globalBalance()(implicit timeout: Timeout): Future[GlobalBalance] = {
for {
ChannelsListener.GetChannelsResponse(channels) <- appKit.channelsListener.ask(ref => ChannelsListener.GetChannels(ref))(timeout, appKit.system.scheduler.toTyped)
globalBalance_try <- appKit.balanceActor.ask(res => BalanceActor.GetGlobalBalance(res, channels))(timeout, appKit.system.scheduler.toTyped)
globalBalance <- Promise[GlobalBalance]().complete(globalBalance_try).future
} yield globalBalance
}
override def signMessage(message: ByteVector): SignedMessage = {
val bytesToSign = SignedMessage.signedBytes(message)
val (signature, recoveryId) = appKit.nodeParams.nodeKeyManager.signDigest(bytesToSign)

View file

@ -89,7 +89,8 @@ case class NodeParams(nodeKeyManager: NodeKeyManager,
routerConf: RouterConf,
socksProxy_opt: Option[Socks5ProxyParams],
maxPaymentAttempts: Int,
enableTrampolinePayment: Boolean) {
enableTrampolinePayment: Boolean,
balanceCheckInterval: FiniteDuration) {
val privateKey: Crypto.PrivateKey = nodeKeyManager.nodeKey.privateKey
val nodeId: PublicKey = nodeKeyManager.nodeId
@ -394,7 +395,8 @@ object NodeParams extends Logging {
),
socksProxy_opt = socksProxy_opt,
maxPaymentAttempts = config.getInt("max-payment-attempts"),
enableTrampolinePayment = config.getBoolean("trampoline-payments-enable")
enableTrampolinePayment = config.getBoolean("trampoline-payments-enable"),
balanceCheckInterval = FiniteDuration(config.getDuration("balance-check-interval").getSeconds, TimeUnit.SECONDS)
)
}
}

View file

@ -25,11 +25,12 @@ import akka.util.Timeout
import com.softwaremill.sttp.okhttp.OkHttpFutureBackend
import fr.acinq.bitcoin.{Block, ByteVector32, Satoshi}
import fr.acinq.eclair.Setup.Seeds
import fr.acinq.eclair.blockchain._
import fr.acinq.eclair.balance.{BalanceActor, ChannelsListener}
import fr.acinq.eclair.blockchain.bitcoind.rpc.{BasicBitcoinJsonRPCClient, BatchingBitcoinJsonRPCClient, ExtendedBitcoinClient}
import fr.acinq.eclair.blockchain.bitcoind.zmq.ZMQActor
import fr.acinq.eclair.blockchain.bitcoind.{BitcoinCoreWallet, ZmqWatcher}
import fr.acinq.eclair.blockchain.fee._
import fr.acinq.eclair.blockchain.fee.{ConstantFeeProvider, _}
import fr.acinq.eclair.blockchain.{EclairWallet, _}
import fr.acinq.eclair.channel.{Channel, Register}
import fr.acinq.eclair.crypto.WeakEntropyPool
import fr.acinq.eclair.crypto.keymanager.{LocalChannelKeyManager, LocalNodeKeyManager}
@ -194,6 +195,7 @@ class Setup(val datadir: File,
tcpBound = Promise[Done]()
routerInitialized = Promise[Done]()
postRestartCleanUpInitialized = Promise[Done]()
channelsListenerReady = Promise[Done]()
defaultFeerates = {
val confDefaultFeerates = FeeratesPerKB(
@ -249,6 +251,9 @@ class Setup(val datadir: File,
wallet = new BitcoinCoreWallet(bitcoin)
_ = wallet.getReceiveAddress().map(address => logger.info(s"initial wallet address=$address"))
channelsListener = system.spawn(ChannelsListener(channelsListenerReady), name = "channels-listener")
_ <- channelsListenerReady.future
_ = if (config.getBoolean("file-backup.enabled")) {
nodeParams.db match {
case fileBackup: FileBackup if config.getBoolean("file-backup.enabled") =>
@ -283,6 +288,8 @@ class Setup(val datadir: File,
paymentInitiator = system.actorOf(SimpleSupervisor.props(PaymentInitiator.props(nodeParams, PaymentInitiator.SimplePaymentFactory(nodeParams, router, register)), "payment-initiator", SupervisorStrategy.Restart))
_ = for (i <- 0 until config.getInt("autoprobe-count")) yield system.actorOf(SimpleSupervisor.props(Autoprobe.props(nodeParams, router, paymentInitiator), s"payment-autoprobe-$i", SupervisorStrategy.Restart))
balanceActor = system.spawn(BalanceActor(nodeParams.db, extendedBitcoinClient, channelsListener, nodeParams.balanceCheckInterval), name = "balance-actor")
kit = Kit(
nodeParams = nodeParams,
system = system,
@ -294,6 +301,8 @@ class Setup(val datadir: File,
switchboard = switchboard,
paymentInitiator = paymentInitiator,
server = server,
channelsListener = channelsListener,
balanceActor = balanceActor,
wallet = wallet)
zmqBlockTimeout = after(5 seconds, using = system.scheduler)(Future.failed(BitcoinZMQConnectionTimeoutException))
@ -360,6 +369,8 @@ case class Kit(nodeParams: NodeParams,
switchboard: ActorRef,
paymentInitiator: ActorRef,
server: ActorRef,
channelsListener: typed.ActorRef[ChannelsListener.Command],
balanceActor: typed.ActorRef[BalanceActor.Command],
wallet: EclairWallet)
object Kit {

View file

@ -0,0 +1,139 @@
package fr.acinq.eclair.balance
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
import akka.actor.typed.{ActorRef, Behavior}
import fr.acinq.bitcoin.ByteVector32
import fr.acinq.eclair.balance.BalanceActor._
import fr.acinq.eclair.balance.CheckBalance.GlobalBalance
import fr.acinq.eclair.balance.Monitoring.{Metrics, Tags}
import fr.acinq.eclair.blockchain.bitcoind.rpc.ExtendedBitcoinClient
import fr.acinq.eclair.blockchain.bitcoind.rpc.ExtendedBitcoinClient.Utxo
import fr.acinq.eclair.channel.HasCommitments
import fr.acinq.eclair.db.Databases
import grizzled.slf4j.Logger
import org.json4s.JsonAST.JInt
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
object BalanceActor {
// @formatter:off
sealed trait Command
private final case object TickBalance extends Command
final case class GetGlobalBalance(replyTo: ActorRef[Try[GlobalBalance]], channels: Map[ByteVector32, HasCommitments]) extends Command
private final case class WrappedChannels(wrapped: ChannelsListener.GetChannelsResponse) extends Command
private final case class WrappedGlobalBalance(wrapped: Try[GlobalBalance]) extends Command
private final case class WrappedUtxoInfo(wrapped: Try[UtxoInfo]) extends Command
// @formatter:on
def apply(db: Databases, extendedBitcoinClient: ExtendedBitcoinClient, channelsListener: ActorRef[ChannelsListener.GetChannels], interval: FiniteDuration)(implicit ec: ExecutionContext): Behavior[Command] = {
Behaviors.setup { context =>
Behaviors.withTimers { timers =>
timers.startTimerWithFixedDelay(TickBalance, interval)
new BalanceActor(context, db, extendedBitcoinClient, channelsListener).apply(refBalance_opt = None)
}
}
}
final case class UtxoInfo(utxos: Seq[Utxo], ancestorCount: Map[ByteVector32, Long])
def checkUtxos(extendedBitcoinClient: ExtendedBitcoinClient)(implicit ec: ExecutionContext): Future[UtxoInfo] = {
def getUnconfirmedAncestorCount(utxo: Utxo): Future[(ByteVector32, Long)] = extendedBitcoinClient.rpcClient.invoke("getmempoolentry", utxo.txid).map(json => {
val JInt(ancestorCount) = json \ "ancestorcount"
(utxo.txid, ancestorCount.toLong)
}).recover {
case ex: Throwable =>
// a bit hackish but we don't need the actor context for this simple log
val log = Logger(classOf[BalanceActor])
log.warn(s"could not retrieve unconfirmed ancestor count for txId=${utxo.txid} amount=${utxo.amount}:", ex)
(utxo.txid, 0)
}
def getUnconfirmedAncestorCountMap(utxos: Seq[Utxo]): Future[Map[ByteVector32, Long]] = Future.sequence(utxos.filter(_.confirmations == 0).map(getUnconfirmedAncestorCount)).map(_.toMap)
for {
utxos <- extendedBitcoinClient.listUnspent()
ancestorCount <- getUnconfirmedAncestorCountMap(utxos)
} yield UtxoInfo(utxos, ancestorCount)
}
}
private class BalanceActor(context: ActorContext[Command],
db: Databases,
extendedBitcoinClient: ExtendedBitcoinClient,
channelsListener: ActorRef[ChannelsListener.GetChannels])(implicit ec: ExecutionContext) {
private val log = context.log
def apply(refBalance_opt: Option[GlobalBalance]): Behavior[Command] = Behaviors.receiveMessage {
case TickBalance =>
log.debug("checking balance...")
channelsListener ! ChannelsListener.GetChannels(context.messageAdapter[ChannelsListener.GetChannelsResponse](WrappedChannels))
context.pipeToSelf(checkUtxos(extendedBitcoinClient))(WrappedUtxoInfo)
Behaviors.same
case WrappedChannels(res) =>
context.pipeToSelf(CheckBalance.computeGlobalBalance(res.channels, db, extendedBitcoinClient))(WrappedGlobalBalance)
Behaviors.same
case WrappedGlobalBalance(res) =>
res match {
case Success(result) =>
log.info("current balance: total={} onchain.confirmed={} onchain.unconfirmed={} offchain={}", result.total.toDouble, result.onChain.confirmed.toDouble, result.onChain.unconfirmed.toDouble, result.offChain.total.toDouble)
log.debug("current balance details : {}", result)
Metrics.GlobalBalance.withoutTags().update(result.total.toMilliBtc.toDouble)
Metrics.GlobalBalanceDetailed.withTag(Tags.BalanceType, Tags.BalanceTypes.OnchainConfirmed).update(result.onChain.confirmed.toMilliBtc.toLong)
Metrics.GlobalBalanceDetailed.withTag(Tags.BalanceType, Tags.BalanceTypes.OnchainUnconfirmed).update(result.onChain.unconfirmed.toMilliBtc.toLong)
Metrics.GlobalBalanceDetailed.withTag(Tags.BalanceType, Tags.BalanceTypes.Offchain).withTag(Tags.OffchainState, Tags.OffchainStates.waitForFundingConfirmed).update(result.offChain.waitForFundingConfirmed.toMilliBtc.toLong)
Metrics.GlobalBalanceDetailed.withTag(Tags.BalanceType, Tags.BalanceTypes.Offchain).withTag(Tags.OffchainState, Tags.OffchainStates.waitForFundingLocked).update(result.offChain.waitForFundingLocked.toMilliBtc.toLong)
Metrics.GlobalBalanceDetailed.withTag(Tags.BalanceType, Tags.BalanceTypes.Offchain).withTag(Tags.OffchainState, Tags.OffchainStates.normal).update(result.offChain.normal.total.toMilliBtc.toLong)
Metrics.GlobalBalanceDetailed.withTag(Tags.BalanceType, Tags.BalanceTypes.Offchain).withTag(Tags.OffchainState, Tags.OffchainStates.shutdown).update(result.offChain.shutdown.total.toMilliBtc.toLong)
Metrics.GlobalBalanceDetailed.withTag(Tags.BalanceType, Tags.BalanceTypes.Offchain).withTag(Tags.OffchainState, Tags.OffchainStates.closingLocal).update(result.offChain.closing.localCloseBalance.total.toMilliBtc.toLong)
Metrics.GlobalBalanceDetailed.withTag(Tags.BalanceType, Tags.BalanceTypes.Offchain).withTag(Tags.OffchainState, Tags.OffchainStates.closingRemote).update(result.offChain.closing.remoteCloseBalance.total.toMilliBtc.toLong)
Metrics.GlobalBalanceDetailed.withTag(Tags.BalanceType, Tags.BalanceTypes.Offchain).withTag(Tags.OffchainState, Tags.OffchainStates.closingUnknown).update(result.offChain.closing.unknownCloseBalance.total.toMilliBtc.toLong)
Metrics.GlobalBalanceDetailed.withTag(Tags.BalanceType, Tags.BalanceTypes.Offchain).withTag(Tags.OffchainState, Tags.OffchainStates.waitForPublishFutureCommitment).update(result.offChain.waitForPublishFutureCommitment.toMilliBtc.toLong)
refBalance_opt match {
case Some(refBalance) =>
val normalizedValue = 100 + (if (refBalance.total.toSatoshi.toLong > 0) (result.total.toSatoshi.toLong - refBalance.total.toSatoshi.toLong) * 1000D / refBalance.total.toSatoshi.toLong else 0)
val diffValue = result.total.toSatoshi.toLong - refBalance.total.toSatoshi.toLong
log.info("relative balance: current={} reference={} normalized={} diff={}", result.total.toDouble, refBalance.total.toDouble, normalizedValue, diffValue)
Metrics.GlobalBalanceNormalized.withoutTags().update(normalizedValue)
Metrics.GlobalBalanceDiff.withTag(Tags.DiffSign, Tags.DiffSigns.plus).update(diffValue.max(0))
Metrics.GlobalBalanceDiff.withTag(Tags.DiffSign, Tags.DiffSigns.minus).update((-diffValue).max(0))
Behaviors.same
case None =>
log.info("using balance={} as reference", result.total.toDouble)
apply(Some(result))
}
case Failure(t) =>
log.warn("could not compute balance: ", t)
Behaviors.same
}
case GetGlobalBalance(replyTo, channels) =>
CheckBalance.computeGlobalBalance(channels, db, extendedBitcoinClient) onComplete (replyTo ! _)
Behaviors.same
case WrappedUtxoInfo(res) =>
res match {
case Success(UtxoInfo(utxos: Seq[Utxo], ancestorCount: Map[ByteVector32, Long])) =>
val filteredByStatus: Map[String, Seq[Utxo]] = Map(
Monitoring.Tags.UtxoStatuses.Confirmed -> utxos.filter(utxo => utxo.confirmations > 0),
// We cannot create chains of unconfirmed transactions with more than 25 elements, so we ignore such utxos.
Monitoring.Tags.UtxoStatuses.Unconfirmed -> utxos.filter(utxo => utxo.confirmations == 0 && ancestorCount.getOrElse(utxo.txid, 1L) < 25),
Monitoring.Tags.UtxoStatuses.Safe -> utxos.filter(utxo => utxo.safe),
Monitoring.Tags.UtxoStatuses.Unsafe -> utxos.filter(utxo => !utxo.safe),
)
filteredByStatus.foreach {
case (status, filteredUtxos) =>
val amount = filteredUtxos.map(_.amount.toDouble).sum
log.info(s"we have ${filteredUtxos.length} $status utxos ($amount mBTC)")
Monitoring.Metrics.UtxoCount.withTag(Monitoring.Tags.UtxoStatus, status).update(filteredUtxos.length)
Monitoring.Metrics.BitcoinBalance.withTag(Monitoring.Tags.UtxoStatus, status).update(amount)
}
case Failure(t) =>
log.warn("could not check utxos: ", t)
}
Behaviors.same
}
}

View file

@ -0,0 +1,82 @@
package fr.acinq.eclair.balance
import akka.Done
import akka.actor.typed
import akka.actor.typed.Behavior
import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.scaladsl.adapter.ClassicActorRefOps
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
import fr.acinq.bitcoin.ByteVector32
import fr.acinq.eclair.balance.ChannelsListener._
import fr.acinq.eclair.channel.Helpers.Closing
import fr.acinq.eclair.channel.{ChannelPersisted, ChannelRestored, HasCommitments}
import scala.concurrent.Promise
import scala.concurrent.duration.DurationInt
object ChannelsListener {
// @formatter:off
sealed trait Command
private final case class ChannelData(channelId: ByteVector32, channel: akka.actor.ActorRef, data: HasCommitments) extends Command
private final case class ChannelDied(channelId: ByteVector32) extends Command
final case class GetChannels(replyTo: typed.ActorRef[GetChannelsResponse]) extends Command
final case object SendDummyEvent extends Command
final case object DummyEvent extends Command
// @formatter:on
case class GetChannelsResponse(channels: Map[ByteVector32, HasCommitments])
def apply(ready: Promise[Done]): Behavior[Command] =
Behaviors.setup { context =>
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[ChannelRestored](e => ChannelData(e.channelId, e.channel, e.data)))
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[ChannelPersisted](e => ChannelData(e.channelId, e.channel, e.data)))
context.system.eventStream ! EventStream.Subscribe(context.self.narrow[DummyEvent.type])
Behaviors.withTimers { timers =>
// since subscription is asynchronous, we send a fake event so we know when we are subscribed
val dummyTimer = "dummy-event"
timers.startTimerAtFixedRate(dummyTimer, SendDummyEvent, 100 milliseconds)
Behaviors.receiveMessagePartial {
case SendDummyEvent =>
context.system.eventStream ! EventStream.Publish(DummyEvent)
Behaviors.same
case DummyEvent =>
context.log.info("channels listener is ready")
timers.cancel(dummyTimer)
context.system.eventStream ! EventStream.Unsubscribe(context.self.narrow[DummyEvent.type])
ready.success(Done)
new ChannelsListener(context).running(Map.empty)
}
}
}
}
private class ChannelsListener(context: ActorContext[Command]) {
private val log = context.log
def running(channels: Map[ByteVector32, HasCommitments]): Behavior[Command] =
Behaviors.receiveMessage {
case ChannelData(channelId, channel, data) =>
Closing.isClosed(data, additionalConfirmedTx_opt = None) match {
case None =>
context.watchWith(channel.toTyped, ChannelDied(channelId))
running(channels + (channelId -> data))
case _ =>
// if channel is closed we remove early from the map because we want to minimize the window during which
// there is potential duplication between on-chain and off-chain amounts (the channel actors stays alive
// for a few seconds after the channel is closed)
log.debug("remove channel={} from list (closed)", channelId)
context.unwatch(channel.toTyped)
running(channels - channelId)
}
case ChannelDied(channelId) =>
log.debug("remove channel={} from list (died)", channelId)
running(channels - channelId)
case GetChannels(replyTo) =>
replyTo ! GetChannelsResponse(channels)
Behaviors.same
case SendDummyEvent => Behaviors.same
case DummyEvent => Behaviors.same
}
}

View file

@ -0,0 +1,286 @@
package fr.acinq.eclair.balance
import com.softwaremill.quicklens._
import fr.acinq.bitcoin.{Btc, ByteVector32, SatoshiLong}
import fr.acinq.eclair.blockchain.bitcoind.rpc.ExtendedBitcoinClient
import fr.acinq.eclair.channel.Helpers.Closing
import fr.acinq.eclair.channel.Helpers.Closing.{CurrentRemoteClose, LocalClose, NextRemoteClose, RemoteClose}
import fr.acinq.eclair.channel._
import fr.acinq.eclair.db.{Databases, PendingCommandsDb}
import fr.acinq.eclair.transactions.DirectedHtlc.{incoming, outgoing}
import fr.acinq.eclair.transactions.Transactions
import fr.acinq.eclair.transactions.Transactions.{ClaimHtlcSuccessTx, ClaimHtlcTimeoutTx, HtlcSuccessTx, HtlcTimeoutTx}
import fr.acinq.eclair.wire.protocol.UpdateFulfillHtlc
import scala.concurrent.{ExecutionContext, Future}
object CheckBalance {
/**
* For more fine-grained analysis, we count the in-flight amounts separately from the main amounts.
*
* The base assumption regarding htlcs is that they will all timeout. That means that we ignore incoming htlcs, and we
* count outgoing htlcs in our balance.
*/
case class MainAndHtlcBalance(toLocal: Btc = 0.sat, htlcOut: Btc = 0.sat) {
val total: Btc = toLocal + htlcOut
}
/**
* In the closing state some transactions may be published or even confirmed. They will be taken into account if we
* do a `bitcoin-cli getbalance` and we don't want to count them twice.
*
* That's why we keep track of the id of each transaction that pays us any amount. It allows us to double check from
* bitcoin core and remove any published transaction.
*/
case class PossiblyPublishedMainBalance(toLocal: Map[ByteVector32, Btc] = Map.empty) {
val total: Btc = toLocal.values.map(_.toSatoshi).sum
}
case class PossiblyPublishedMainAndHtlcBalance(toLocal: Map[ByteVector32, Btc] = Map.empty, htlcs: Map[ByteVector32, Btc] = Map.empty, htlcsUnpublished: Btc = 0.sat) {
val totalToLocal: Btc = toLocal.values.map(_.toSatoshi).sum
val totalHtlcs: Btc = htlcs.values.map(_.toSatoshi).sum
val total: Btc = totalToLocal + totalHtlcs + htlcsUnpublished
}
/**
* Unless they got evicted, mutual close transactions will also appear in the on-chain balance and will disappear
* from here after on pruning.
*/
case class ClosingBalance(localCloseBalance: PossiblyPublishedMainAndHtlcBalance = PossiblyPublishedMainAndHtlcBalance(),
remoteCloseBalance: PossiblyPublishedMainAndHtlcBalance = PossiblyPublishedMainAndHtlcBalance(),
mutualCloseBalance: PossiblyPublishedMainBalance = PossiblyPublishedMainBalance(),
unknownCloseBalance: MainAndHtlcBalance = MainAndHtlcBalance()) {
val total: Btc = localCloseBalance.total + remoteCloseBalance.total + mutualCloseBalance.total + unknownCloseBalance.total
}
/**
* The overall balance among all channels in all states.
*/
case class OffChainBalance(waitForFundingConfirmed: Btc = 0.sat,
waitForFundingLocked: Btc = 0.sat,
normal: MainAndHtlcBalance = MainAndHtlcBalance(),
shutdown: MainAndHtlcBalance = MainAndHtlcBalance(),
negotiating: Btc = 0.sat,
closing: ClosingBalance = ClosingBalance(),
waitForPublishFutureCommitment: Btc = 0.sat) {
val total: Btc = waitForFundingConfirmed + waitForFundingLocked + normal.total + shutdown.total + negotiating + closing.total + waitForPublishFutureCommitment
}
def updateMainBalance(localCommit: LocalCommit): Btc => Btc = { v: Btc =>
val toLocal = localCommit.spec.toLocal.truncateToSatoshi
v + toLocal
}
def updateMainAndHtlcBalance(localCommit: LocalCommit, knownPreimages: Set[(ByteVector32, Long)]): MainAndHtlcBalance => MainAndHtlcBalance = { b: MainAndHtlcBalance =>
val toLocal = localCommit.spec.toLocal.truncateToSatoshi
// we only count htlcs in if we know the preimage
val htlcIn = localCommit.spec.htlcs.collect(incoming).filter(add => knownPreimages.contains((add.channelId, add.id))).map(_.amountMsat.truncateToSatoshi).sum
val htlcOut = localCommit.spec.htlcs.collect(outgoing).map(_.amountMsat.truncateToSatoshi).sum
b.modify(_.toLocal).using(_ + toLocal)
.modify(_.htlcOut).using(_ + htlcIn + htlcOut)
}
def updatePossiblyPublishedBalance(b1: PossiblyPublishedMainAndHtlcBalance): PossiblyPublishedMainAndHtlcBalance => PossiblyPublishedMainAndHtlcBalance = { b: PossiblyPublishedMainAndHtlcBalance =>
b.modify(_.toLocal).using(_ ++ b1.toLocal)
.modify(_.htlcs).using(_ ++ b1.htlcs)
.modify(_.htlcsUnpublished).using(_ + b1.htlcsUnpublished)
}
/** if remote proved it had the preimage of an outgoing htlc, then we know it won't timeout */
def remoteHasPreimages(c: Commitments, htlcId: Long): Boolean = {
c.remoteChanges.all.collectFirst { case u: UpdateFulfillHtlc if u.id == htlcId => true }.isDefined
}
def computeLocalCloseBalance(c: Commitments, l: LocalClose, knownPreimages: Set[(ByteVector32, Long)]): PossiblyPublishedMainAndHtlcBalance = {
import l._
val toLocal = localCommitPublished.claimMainDelayedOutputTx.toSeq.map(c => c.tx.txid -> c.tx.txOut.head.amount.toBtc).toMap
// incoming htlcs for which we have a preimage and the to-local delay has expired: we have published a claim tx that pays directly to our wallet
val htlcsInOnChain = localCommitPublished.htlcTxs.values.flatten.collect { case htlcTx: HtlcSuccessTx => htlcTx }
.filter(htlcTx => localCommitPublished.claimHtlcDelayedTxs.exists(_.input.outPoint.txid == htlcTx.tx.txid))
.map(_.htlcId)
.toSet
// outgoing htlcs that have timed out and the to-local delay has expired: we have published a claim tx that pays directly to our wallet
val htlcsOutOnChain = localCommitPublished.htlcTxs.values.flatten.collect { case htlcTx: HtlcTimeoutTx => htlcTx }
.filter(htlcTx => localCommitPublished.claimHtlcDelayedTxs.exists(_.input.outPoint.txid == htlcTx.tx.txid))
.map(_.htlcId)
.toSet
// incoming htlcs for which we have a preimage but we are still waiting for the to-local delay
val htlcIn = localCommit.spec.htlcs.collect(incoming)
.filterNot(htlc => htlcsInOnChain.contains(htlc.id)) // we filter the htlc that already pay us on-chain
.filter(add => knownPreimages.contains((add.channelId, add.id))).map(_.amountMsat.truncateToSatoshi).sum
// all outgoing htlcs for which remote didn't prove it had the preimage are expected to time out
val htlcOut = localCommit.spec.htlcs.collect(outgoing)
.filterNot(htlc => htlcsOutOnChain.contains(htlc.id)) // we filter the htlc that already pay us on-chain
.filterNot(htlc => remoteHasPreimages(c, htlc.id))
.map(_.amountMsat.truncateToSatoshi).sum
// all claim txs have possibly been published
val htlcs = localCommitPublished.claimHtlcDelayedTxs
.map(c => c.tx.txid -> c.tx.txOut.head.amount.toBtc).toMap
PossiblyPublishedMainAndHtlcBalance(
toLocal = toLocal,
htlcs = htlcs,
htlcsUnpublished = htlcIn + htlcOut
)
}
def computeRemoteCloseBalance(c: Commitments, r: RemoteClose, knownPreimages: Set[(ByteVector32, Long)]): PossiblyPublishedMainAndHtlcBalance = {
import r._
val toLocal = if (c.channelVersion.paysDirectlyToWallet) {
// If static remote key is enabled, the commit tx directly pays to our wallet
// We use the pubkeyscript to retrieve our output
Transactions.findPubKeyScriptIndex(remoteCommitPublished.commitTx, c.localParams.defaultFinalScriptPubKey) match {
case Right(outputIndex) => Map(remoteCommitPublished.commitTx.txid -> remoteCommitPublished.commitTx.txOut(outputIndex).amount.toBtc)
case _ => Map.empty[ByteVector32, Btc] // either we don't have an output (below dust), or we have used a non-default pubkey script
}
} else {
remoteCommitPublished.claimMainOutputTx.toSeq.map(c => c.tx.txid -> c.tx.txOut.head.amount.toBtc).toMap
}
// incoming htlcs for which we have a preimage: we have published a claim tx that pays directly to our wallet
val htlcsInOnChain = remoteCommitPublished.claimHtlcTxs.values.flatten.collect { case htlcTx: ClaimHtlcSuccessTx => htlcTx }
.map(_.htlcId)
.toSet
// outgoing htlcs that have timed out: we have published a claim tx that pays directly to our wallet
val htlcsOutOnChain = remoteCommitPublished.claimHtlcTxs.values.flatten.collect { case htlcTx: ClaimHtlcTimeoutTx => htlcTx }
.map(_.htlcId)
.toSet
// incoming htlcs for which we have a preimage
val htlcIn = remoteCommit.spec.htlcs.collect(outgoing)
.filter(add => knownPreimages.contains((add.channelId, add.id)))
.filterNot(htlc => htlcsInOnChain.contains(htlc.id)) // we filter the htlc that already pay us on-chain
.map(_.amountMsat.truncateToSatoshi).sum
// all outgoing htlcs for which remote didn't prove it had the preimage are expected to time out
val htlcOut = remoteCommit.spec.htlcs.collect(incoming)
.filterNot(htlc => htlcsOutOnChain.contains(htlc.id)) // we filter the htlc that already pay us on-chain
.filterNot(htlc => remoteHasPreimages(c, htlc.id))
.map(_.amountMsat.truncateToSatoshi).sum
// all claim txs have possibly been published
val htlcs = remoteCommitPublished.claimHtlcTxs.values.flatten
.map(c => c.tx.txid -> c.tx.txOut.head.amount.toBtc).toMap
PossiblyPublishedMainAndHtlcBalance(
toLocal = toLocal,
htlcs = htlcs,
htlcsUnpublished = htlcIn + htlcOut
)
}
/**
* Compute the overall balance a list of channels.
*
* Assumptions:
* - If the commitment transaction hasn't been published, we simply take our local amount (and htlc amount in states
* where they may exist, namely [[NORMAL]] and [[SHUTDOWN]]).
* - In [[CLOSING]] state:
* - If we know for sure we are in a mutual close scenario, then we don't count the amount, because the tx will
* already have been published.
* - If we know for sure we are in a local, then we take the amounts based on the outputs of
* the transactions, whether delayed or not. This ensures that mining fees are taken into account.
* - If we have detected that a remote commit was published, then we assume the closing type will be remote, even
* it is not yet confirmed. Like for local commits, we take amounts based on outputs of transactions.
* - In the other cases, we simply take our local amount
* - TODO?: we disregard anchor outputs
*/
def computeOffChainBalance(channels: Iterable[HasCommitments], knownPreimages: Set[(ByteVector32, Long)]): OffChainBalance = {
channels
.foldLeft(OffChainBalance()) {
case (r, d: DATA_WAIT_FOR_FUNDING_CONFIRMED) => r.modify(_.waitForFundingConfirmed).using(updateMainBalance(d.commitments.localCommit))
case (r, d: DATA_WAIT_FOR_FUNDING_LOCKED) => r.modify(_.waitForFundingLocked).using(updateMainBalance(d.commitments.localCommit))
case (r, d: DATA_NORMAL) => r.modify(_.normal).using(updateMainAndHtlcBalance(d.commitments.localCommit, knownPreimages))
case (r, d: DATA_SHUTDOWN) => r.modify(_.shutdown).using(updateMainAndHtlcBalance(d.commitments.localCommit, knownPreimages))
case (r, d: DATA_NEGOTIATING) => r.modify(_.negotiating).using(updateMainBalance(d.commitments.localCommit))
case (r, d: DATA_CLOSING) =>
Closing.isClosingTypeAlreadyKnown(d) match {
case None if d.mutualClosePublished.nonEmpty && d.localCommitPublished.isEmpty && d.remoteCommitPublished.isEmpty && d.nextRemoteCommitPublished.isEmpty && d.revokedCommitPublished.isEmpty =>
// There can be multiple mutual close transactions for the same channel, but most of the time there will
// only be one. We use the last one in the list, which should be the one we have seen last in our local
// mempool. In the worst case scenario, there are several mutual closes and the one that made it to the
// mempool or the chain isn't the one we are keeping track of here. As a consequence the transaction won't
// be pruned and we will count twice the amount in the global (onChain + offChain) balance, until the
// mutual close tx gets deeply confirmed and the channel is removed.
val mutualClose = d.mutualClosePublished.last
val amount = mutualClose.toLocalOutput match {
case Some(outputInfo) => outputInfo.amount
case None =>
// Normally this would mean that we don't actually have an output, but due to a migration
// the data might not be accurate, see [[ChannelTypes0.migrateClosingTx]]
// As a (hackish) workaround, we use the pubkeyscript to retrieve our output
Transactions.findPubKeyScriptIndex(mutualClose.tx, d.commitments.localParams.defaultFinalScriptPubKey) match {
case Right(outputIndex) => mutualClose.tx.txOut(outputIndex).amount
case _ => 0.sat // either we don't have an output (below dust), or we have used a non-default pubkey script
}
}
r.modify(_.closing.mutualCloseBalance.toLocal).using(_ + (mutualClose.tx.txid -> amount))
case Some(localClose: LocalClose) => r.modify(_.closing.localCloseBalance).using(updatePossiblyPublishedBalance(computeLocalCloseBalance(d.commitments, localClose, knownPreimages)))
case _ if d.remoteCommitPublished.nonEmpty || d.nextRemoteCommitPublished.nonEmpty =>
// We have seen the remote commit, it may or may not have been confirmed. We may have published our own
// local commit too, which may take precedence. But if we are aware of the remote commit, it means that
// our bitcoin core has already seen it (since it's the one who told us about it) and we make
// the assumption that the remote commit won't be replaced by our local commit.
val remoteClose = if (d.remoteCommitPublished.isDefined) {
CurrentRemoteClose(d.commitments.remoteCommit, d.remoteCommitPublished.get)
} else {
NextRemoteClose(d.commitments.remoteNextCommitInfo.left.get.nextRemoteCommit, d.nextRemoteCommitPublished.get)
}
r.modify(_.closing.remoteCloseBalance).using(updatePossiblyPublishedBalance(computeRemoteCloseBalance(d.commitments, remoteClose, knownPreimages)))
case _ => r.modify(_.closing.unknownCloseBalance).using(updateMainAndHtlcBalance(d.commitments.localCommit, knownPreimages))
}
case (r, d: DATA_WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT) => r.modify(_.waitForPublishFutureCommitment).using(updateMainBalance(d.commitments.localCommit))
}
}
/**
* Query bitcoin core to prune all amounts related to transactions that have already been published
*/
def prunePublishedTransactions(br: OffChainBalance, bitcoinClient: ExtendedBitcoinClient)(implicit ec: ExecutionContext): Future[OffChainBalance] = {
for {
txs: Iterable[Option[(ByteVector32, Int)]] <- Future.sequence((br.closing.localCloseBalance.toLocal.keys ++
br.closing.localCloseBalance.htlcs.keys ++
br.closing.remoteCloseBalance.toLocal.keys ++
br.closing.remoteCloseBalance.htlcs.keys ++
br.closing.mutualCloseBalance.toLocal.keys)
.map(txid => bitcoinClient.getTxConfirmations(txid).map(_ map { confirmations => txid -> confirmations })))
txMap: Map[ByteVector32, Int] = txs.flatten.toMap
} yield {
br
.modifyAll(
_.closing.localCloseBalance.toLocal,
_.closing.localCloseBalance.htlcs,
_.closing.remoteCloseBalance.toLocal,
_.closing.remoteCloseBalance.htlcs,
_.closing.mutualCloseBalance.toLocal)
.using(map => map.filterNot { case (txid, _) => txMap.contains(txid) })
}
}
case class CorrectedOnChainBalance(confirmed: Btc, unconfirmed: Btc) {
val total: Btc = confirmed + unconfirmed
}
private case class DetailedBalance(confirmed: Btc = 0.sat, unconfirmed: Btc = 0.sat)
/**
* Returns the on-chain balance, but discards the unconfirmed incoming swap-in transactions, because they may be RBF-ed.
* Confirmed swap-in transactions are counted, because we can spend them, but we keep track of what we still owe to our
* users.
*/
def computeOnChainBalance(bitcoinClient: ExtendedBitcoinClient)(implicit ec: ExecutionContext): Future[CorrectedOnChainBalance] = for {
utxos <- bitcoinClient.listUnspent()
detailed = utxos.foldLeft(DetailedBalance()) {
case (total, utxo) if utxo.confirmations > 0 => total.modify(_.confirmed).using(_ + utxo.amount)
case (total, utxo) if utxo.confirmations == 0 => total.modify(_.unconfirmed).using(_ + utxo.amount)
}
} yield CorrectedOnChainBalance(detailed.confirmed, detailed.unconfirmed)
case class GlobalBalance (onChain: CorrectedOnChainBalance, offChain: OffChainBalance) {
val total: Btc = onChain.total + offChain.total
}
def computeGlobalBalance(channels: Map[ByteVector32, HasCommitments], db: Databases, bitcoinClient: ExtendedBitcoinClient)(implicit ec: ExecutionContext): Future[GlobalBalance] = for {
onChain <- CheckBalance.computeOnChainBalance(bitcoinClient)
knownPreimages = db.pendingCommands.listSettlementCommands().collect { case (channelId, cmd: CMD_FULFILL_HTLC) => (channelId, cmd.id) }.toSet
offChainRaw = CheckBalance.computeOffChainBalance(channels.values, knownPreimages)
offChainPruned <- CheckBalance.prunePublishedTransactions(offChainRaw, bitcoinClient)
} yield GlobalBalance(onChain, offChainPruned)
}

View file

@ -0,0 +1,72 @@
/*
* Copyright 2020 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package fr.acinq.eclair.balance
import kamon.Kamon
import kamon.metric.Metric
object Monitoring {
object Metrics {
val GlobalBalance: Metric.Gauge = Kamon.gauge("globalbalance", "Global Balance (BTC)")
val GlobalBalanceNormalized: Metric.Gauge = Kamon.gauge("globalbalance.normalized", "Global Balance Normalized")
val GlobalBalanceDiff: Metric.Gauge = Kamon.gauge("globalbalance.diff", "Global Balance Diff (Satoshi)")
val GlobalBalanceDetailed: Metric.Gauge = Kamon.gauge("globalbalance.detailed", "Global Balance Detailed (BTC)")
val BitcoinBalance: Metric.Gauge = Kamon.gauge("bitcoin.balance", "Bitcoin balance (mBTC)")
val UtxoCount: Metric.Gauge = Kamon.gauge("bitcoin.utxo.count", "Number of unspent outputs available")
}
object Tags {
val BalanceType = "type"
val OffchainState = "state"
val DiffSign = "sign"
val UtxoStatus = "status"
object BalanceTypes {
val OnchainConfirmed = "onchain.confirmed"
val OnchainUnconfirmed = "onchain.unconfirmed"
val Offchain = "offchain"
}
object OffchainStates {
val waitForFundingConfirmed = "waitForFundingConfirmed"
val waitForFundingLocked = "waitForFundingLocked"
val normal = "normal"
val shutdown = "shutdown"
val negotiating = "negotiating"
val closingLocal = "closing-local"
val closingRemote = "closing-remote"
val closingUnknown = "closing-unknown"
val waitForPublishFutureCommitment = "waitForPublishFutureCommitment"
}
/** we can't chart negative amounts in Kamon */
object DiffSigns {
val plus = "plus"
val minus = "minus"
}
object UtxoStatuses {
val Confirmed = "confirmed"
val Safe = "safe"
val Unsafe = "unsafe"
val Unconfirmed = "unconfirmed"
}
}
}

View file

@ -17,31 +17,21 @@
package fr.acinq.eclair.blockchain
import kamon.Kamon
import kamon.metric.Metric
object Monitoring {
object Metrics {
val NewBlockCheckConfirmedDuration = Kamon.timer("bitcoin.watcher.newblock.checkconfirmed")
val RpcBasicInvokeCount = Kamon.counter("bitcoin.rpc.basic.invoke.count")
val RpcBasicInvokeDuration = Kamon.timer("bitcoin.rpc.basic.invoke.duration")
val RpcBatchInvokeDuration = Kamon.timer("bitcoin.rpc.batch.invoke.duration")
val BitcoinBalance = Kamon.gauge("bitcoin.balance", "Bitcoin balance (mBTC)")
val MempoolMinFeeratePerKw = Kamon.gauge("bitcoin.mempool.min-feerate-per-kw", "Minimum feerate (sat/kw) for a tx to be accepted in our mempool")
val CannotRetrieveFeeratesCount = Kamon.counter("bitcoin.rpc.feerates.error", "Number of failures to retrieve on-chain feerates")
val UtxoCount = Kamon.gauge("bitcoin.utxo.count", "Number of unspent outputs available")
val NewBlockCheckConfirmedDuration: Metric.Timer = Kamon.timer("bitcoin.watcher.newblock.checkconfirmed")
val RpcBasicInvokeCount: Metric.Counter = Kamon.counter("bitcoin.rpc.basic.invoke.count")
val RpcBasicInvokeDuration: Metric.Timer = Kamon.timer("bitcoin.rpc.basic.invoke.duration")
val RpcBatchInvokeDuration: Metric.Timer = Kamon.timer("bitcoin.rpc.batch.invoke.duration")
val MempoolMinFeeratePerKw: Metric.Gauge = Kamon.gauge("bitcoin.mempool.min-feerate-per-kw", "Minimum feerate (sat/kw) for a tx to be accepted in our mempool")
val CannotRetrieveFeeratesCount: Metric.Counter = Kamon.counter("bitcoin.rpc.feerates.error", "Number of failures to retrieve on-chain feerates")
}
object Tags {
val Method = "method"
val UtxoStatus = "status"
object UtxoStatuses {
val Confirmed = "confirmed"
val Safe = "safe"
val Unsafe = "unsafe"
val Unconfirmed = "unconfirmed"
}
}
}

View file

@ -23,6 +23,7 @@ import fr.acinq.bitcoin._
import fr.acinq.eclair.blockchain.Monitoring.Metrics
import fr.acinq.eclair.blockchain._
import fr.acinq.eclair.blockchain.bitcoind.rpc.ExtendedBitcoinClient
import fr.acinq.eclair.blockchain.bitcoind.rpc.ExtendedBitcoinClient.Utxo
import fr.acinq.eclair.blockchain.watchdogs.BlockchainWatchdog
import fr.acinq.eclair.wire.protocol.ChannelAnnouncement
import fr.acinq.eclair.{KamonExt, ShortChannelId}
@ -244,7 +245,6 @@ private class ZmqWatcher(chainHash: ByteVector32, blockCount: AtomicLong, client
blockCount.set(count)
context.system.eventStream ! EventStream.Publish(CurrentBlockCount(count))
}
checkUtxos()
// TODO: beware of the herd effect
KamonExt.timeFuture(Metrics.NewBlockCheckConfirmedDuration.withoutTags()) {
Future.sequence(watches.collect { case w: WatchConfirmed[_] => checkConfirmed(w) })
@ -384,53 +384,4 @@ private class ZmqWatcher(chainHash: ByteVector32, blockCount: AtomicLong, client
}
}
def checkUtxos(): Future[Unit] = {
case class Utxo(txId: ByteVector32, amount: MilliBtc, confirmations: Long, safe: Boolean)
def listUnspent(): Future[Seq[Utxo]] = client.rpcClient.invoke("listunspent", /* minconf */ 0).collect {
case JArray(values) => values.map(utxo => {
val JInt(confirmations) = utxo \ "confirmations"
val JBool(safe) = utxo \ "safe"
val JDecimal(amount) = utxo \ "amount"
val JString(txid) = utxo \ "txid"
Utxo(ByteVector32.fromValidHex(txid), (amount.doubleValue * 1000).millibtc, confirmations.toLong, safe)
})
}
def getUnconfirmedAncestorCount(utxo: Utxo): Future[(ByteVector32, Long)] = client.rpcClient.invoke("getmempoolentry", utxo.txId).map(json => {
val JInt(ancestorCount) = json \ "ancestorcount"
(utxo.txId, ancestorCount.toLong)
}).recover {
case ex: Throwable =>
log.warn(s"could not retrieve unconfirmed ancestor count for txId=${utxo.txId} amount=${utxo.amount}:", ex)
(utxo.txId, 0)
}
def getUnconfirmedAncestorCountMap(utxos: Seq[Utxo]): Future[Map[ByteVector32, Long]] = Future.sequence(utxos.filter(_.confirmations == 0).map(getUnconfirmedAncestorCount)).map(_.toMap)
def recordUtxos(utxos: Seq[Utxo], ancestorCount: Map[ByteVector32, Long]): Unit = {
val filteredByStatus = Seq(
(Monitoring.Tags.UtxoStatuses.Confirmed, utxos.filter(utxo => utxo.confirmations > 0)),
// We cannot create chains of unconfirmed transactions with more than 25 elements, so we ignore such utxos.
(Monitoring.Tags.UtxoStatuses.Unconfirmed, utxos.filter(utxo => utxo.confirmations == 0 && ancestorCount.getOrElse(utxo.txId, 1L) < 25)),
(Monitoring.Tags.UtxoStatuses.Safe, utxos.filter(utxo => utxo.safe)),
(Monitoring.Tags.UtxoStatuses.Unsafe, utxos.filter(utxo => !utxo.safe)),
)
filteredByStatus.foreach {
case (status, filteredUtxos) =>
val amount = filteredUtxos.map(_.amount.toDouble).sum
log.info(s"we have ${filteredUtxos.length} $status utxos ($amount mBTC)")
Monitoring.Metrics.UtxoCount.withTag(Monitoring.Tags.UtxoStatus, status).update(filteredUtxos.length)
Monitoring.Metrics.BitcoinBalance.withTag(Monitoring.Tags.UtxoStatus, status).update(amount)
}
}
(for {
utxos <- listUnspent()
ancestorCount <- getUnconfirmedAncestorCountMap(utxos)
} yield recordUtxos(utxos, ancestorCount)).recover {
case ex => log.warn(s"could not check utxos: $ex")
}
}
}

View file

@ -159,7 +159,7 @@ class ExtendedBitcoinClient(val rpcClient: BitcoinJsonRPCClient) extends Logging
def unlockOutpoints(outPoints: Seq[OutPoint])(implicit ec: ExecutionContext): Future[Boolean] = {
// we unlock utxos one by one and not as a list as it would fail at the first utxo that is not actually locked and the rest would not be processed
val futures = outPoints
.map(outPoint => Utxo(outPoint.txid, outPoint.index))
.map(outPoint => UnlockOutpoint(outPoint.txid, outPoint.index))
.map(utxo => rpcClient
.invoke("lockunspent", true, List(utxo))
.mapTo[JBool]
@ -276,6 +276,20 @@ class ExtendedBitcoinClient(val rpcClient: BitcoinJsonRPCClient) extends Logging
case t: Throwable => ValidateResult(c, Left(t))
}
def listUnspent()(implicit ec: ExecutionContext): Future[Seq[Utxo]] = rpcClient.invoke("listunspent", /* minconf */ 0).collect {
case JArray(values) => values.map(utxo => {
val JInt(confirmations) = utxo \ "confirmations"
val JBool(safe) = utxo \ "safe"
val JDecimal(amount) = utxo \ "amount"
val JString(txid) = utxo \ "txid"
val label = utxo \ "label" match {
case JString(label) => Some(label)
case _ => None
}
Utxo(ByteVector32.fromValidHex(txid), (amount.doubleValue * 1000).millibtc, confirmations.toLong, safe, label)
})
}
}
object ExtendedBitcoinClient {
@ -322,7 +336,9 @@ object ExtendedBitcoinClient {
*/
case class MempoolTx(txid: ByteVector32, vsize: Long, weight: Long, replaceable: Boolean, fees: Satoshi, ancestorCount: Int, ancestorFees: Satoshi, descendantCount: Int, descendantFees: Satoshi)
case class Utxo(txid: ByteVector32, vout: Long)
case class UnlockOutpoint(txid: ByteVector32, vout: Long)
case class Utxo(txid: ByteVector32, amount: MilliBtc, confirmations: Long, safe: Boolean, label_opt: Option[String])
def toSatoshi(btcAmount: BigDecimal): Satoshi = Satoshi(btcAmount.bigDecimal.scaleByPowerOfTen(8).longValue)

View file

@ -231,7 +231,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
case Event(INPUT_RESTORED(data), _) =>
log.info("restoring channel")
context.system.eventStream.publish(ChannelRestored(self, data.channelId, peer, remoteNodeId, data.commitments.localParams.isFunder, data.commitments))
context.system.eventStream.publish(ChannelRestored(self, data.channelId, peer, remoteNodeId, data))
txPublisher ! SetChannelId(remoteNodeId, data.channelId)
data match {
// NB: order matters!

View file

@ -32,7 +32,7 @@ trait ChannelEvent
case class ChannelCreated(channel: ActorRef, peer: ActorRef, remoteNodeId: PublicKey, isFunder: Boolean, temporaryChannelId: ByteVector32, initialFeeratePerKw: FeeratePerKw, fundingTxFeeratePerKw: Option[FeeratePerKw]) extends ChannelEvent
case class ChannelRestored(channel: ActorRef, channelId: ByteVector32, peer: ActorRef, remoteNodeId: PublicKey, isFunder: Boolean, currentData: AbstractCommitments) extends ChannelEvent
case class ChannelRestored(channel: ActorRef, channelId: ByteVector32, peer: ActorRef, remoteNodeId: PublicKey, data: HasCommitments) extends ChannelEvent
case class ChannelIdAssigned(channel: ActorRef, remoteNodeId: PublicKey, temporaryChannelId: ByteVector32, channelId: ByteVector32) extends ChannelEvent
@ -55,7 +55,7 @@ case class NetworkFeePaid(channel: ActorRef, remoteNodeId: PublicKey, channelId:
// NB: this event is only sent when the channel is available
case class AvailableBalanceChanged(channel: ActorRef, channelId: ByteVector32, shortChannelId: ShortChannelId, commitments: AbstractCommitments) extends ChannelEvent
case class ChannelPersisted(channel: ActorRef, remoteNodeId: PublicKey, channelId: ByteVector32, data: Data) extends ChannelEvent
case class ChannelPersisted(channel: ActorRef, remoteNodeId: PublicKey, channelId: ByteVector32, data: HasCommitments) extends ChannelEvent
case class LocalCommitConfirmed(channel: ActorRef, remoteNodeId: PublicKey, channelId: ByteVector32, refundAtBlock: Long) extends ChannelEvent

View file

@ -40,7 +40,7 @@ class Register extends Actor with ActorLogging {
context.watch(channel)
context become main(channels + (temporaryChannelId -> channel), shortIds, channelsTo + (temporaryChannelId -> remoteNodeId))
case ChannelRestored(channel, channelId, _, remoteNodeId, _, _) =>
case ChannelRestored(channel, channelId, _, remoteNodeId, _) =>
context.watch(channel)
context become main(channels + (channelId -> channel), shortIds, channelsTo + (channelId -> remoteNodeId))

View file

@ -17,7 +17,7 @@
package fr.acinq.eclair
import akka.actor.ActorRef
import akka.actor.typed.scaladsl.adapter.actorRefAdapter
import akka.actor.typed.scaladsl.adapter.{ClassicActorRefOps, actorRefAdapter}
import akka.testkit.TestProbe
import akka.util.Timeout
import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey}
@ -62,6 +62,8 @@ class EclairImplSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with I
val switchboard = TestProbe()
val paymentInitiator = TestProbe()
val server = TestProbe()
val channelsListener = TestProbe()
val balanceActor = TestProbe()
val kit = Kit(
TestConstants.Alice.nodeParams,
system,
@ -73,6 +75,8 @@ class EclairImplSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with I
switchboard.ref,
paymentInitiator.ref,
server.ref,
channelsListener.ref.toTyped,
balanceActor.ref.toTyped,
new TestWallet()
)

View file

@ -164,7 +164,8 @@ object TestConstants {
socksProxy_opt = None,
maxPaymentAttempts = 5,
enableTrampolinePayment = true,
instanceId = UUID.fromString("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa")
instanceId = UUID.fromString("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"),
balanceCheckInterval = 1 hour
)
def channelParams: LocalParams = Peer.makeChannelParams(
@ -270,7 +271,8 @@ object TestConstants {
socksProxy_opt = None,
maxPaymentAttempts = 5,
enableTrampolinePayment = true,
instanceId = UUID.fromString("bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb")
instanceId = UUID.fromString("bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb"),
balanceCheckInterval = 1 hour
)
def channelParams: LocalParams = Peer.makeChannelParams(

View file

@ -0,0 +1,46 @@
package fr.acinq.eclair.balance
import akka.Done
import akka.actor.testkit.typed.scaladsl.{ScalaTestWithActorTestKit, TestProbe}
import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.scaladsl.adapter.TypedActorRefOps
import com.typesafe.config.ConfigFactory
import fr.acinq.eclair.balance.ChannelsListener.{GetChannels, GetChannelsResponse}
import fr.acinq.eclair.channel.ChannelRestored
import fr.acinq.eclair.wire.internal.channel.ChannelCodecsSpec
import fr.acinq.eclair.{randomBytes32, randomKey}
import org.scalatest.funsuite.AnyFunSuiteLike
import scala.concurrent.Promise
class ChannelsListenerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("application")) with AnyFunSuiteLike {
test("channels listener basic test") {
val ready = Promise[Done]()
val channelsListener = testKit.spawn(ChannelsListener.apply(ready))
eventually {
assert(ready.isCompleted)
}
val channel1 = TestProbe[Any]()
system.eventStream ! EventStream.Publish(ChannelRestored(channel1.ref.toClassic, randomBytes32(), null, randomKey().publicKey, ChannelCodecsSpec.normal))
val channel2 = TestProbe[Any]()
system.eventStream ! EventStream.Publish(ChannelRestored(channel2.ref.toClassic, randomBytes32(), null, randomKey().publicKey, ChannelCodecsSpec.normal))
val probe = TestProbe[GetChannelsResponse]()
eventually {
channelsListener ! GetChannels(probe.ref)
assert(probe.expectMessageType[GetChannelsResponse].channels.size == 2)
}
channel2.stop()
eventually {
channelsListener ! GetChannels(probe.ref)
assert(probe.expectMessageType[GetChannelsResponse].channels.size == 1)
}
}
}

View file

@ -0,0 +1,114 @@
package fr.acinq.eclair.balance
import fr.acinq.bitcoin.{ByteVector32, SatoshiLong}
import fr.acinq.eclair.balance.CheckBalance.{ClosingBalance, OffChainBalance, PossiblyPublishedMainAndHtlcBalance, PossiblyPublishedMainBalance}
import fr.acinq.eclair.blockchain.bitcoind.rpc.ExtendedBitcoinClient
import fr.acinq.eclair.db.jdbc.JdbcUtils.ExtendedResultSet._
import fr.acinq.eclair.db.pg.PgUtils.using
import fr.acinq.eclair.randomBytes32
import fr.acinq.eclair.wire.internal.channel.ChannelCodecs.stateDataCodec
import org.scalatest.funsuite.AnyFunSuite
import org.sqlite.SQLiteConfig
import java.io.File
import java.sql.DriverManager
import scala.collection.immutable.Queue
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, ExecutionContext, Future}
class CheckBalanceSpec extends AnyFunSuite {
ignore("compute from eclair.sqlite") {
val dbFile = new File("eclair.sqlite")
val sqliteConfig = new SQLiteConfig()
sqliteConfig.setReadOnly(true)
val sqlite = DriverManager.getConnection(s"jdbc:sqlite:$dbFile", sqliteConfig.toProperties)
val channels = using(sqlite.createStatement) { statement =>
statement.executeQuery("SELECT data FROM local_channels WHERE is_closed=0")
.mapCodec(stateDataCodec)
}
val knownPreimages: Set[(ByteVector32, Long)] = using(sqlite.prepareStatement("SELECT channel_id, htlc_id FROM pending_relay")) { statement =>
val rs = statement.executeQuery()
var q: Queue[(ByteVector32, Long)] = Queue()
while (rs.next()) {
q = q :+ (rs.getByteVector32("channel_id"), rs.getLong("htlc_id"))
}
q.toSet
}
val res = CheckBalance.computeOffChainBalance(channels, knownPreimages)
println(res)
println(res.total)
}
test("tx pruning") {
val txids = (for (_ <- 0 until 20) yield randomBytes32()).toList
val knownTxids = Set(txids(1), txids(3), txids(4), txids(6), txids(9), txids(12), txids(13))
val bitcoinClient = new ExtendedBitcoinClient(null) {
/** Get the number of confirmations of a given transaction. */
override def getTxConfirmations(txid: ByteVector32)(implicit ec: ExecutionContext): Future[Option[Int]] =
Future.successful(if (knownTxids.contains(txid)) Some(42) else None)
}
val bal1 = OffChainBalance(
closing = ClosingBalance(
localCloseBalance = PossiblyPublishedMainAndHtlcBalance(
toLocal = Map(
txids(0) -> 1000.sat,
txids(1) -> 1000.sat,
txids(2) -> 1000.sat),
htlcs = Map(
txids(3) -> 1000.sat,
txids(4) -> 1000.sat,
txids(5) -> 1000.sat)
),
remoteCloseBalance = PossiblyPublishedMainAndHtlcBalance(
toLocal = Map(
txids(6) -> 1000.sat,
txids(7) -> 1000.sat,
txids(8) -> 1000.sat,
txids(9) -> 1000.sat),
htlcs = Map(
txids(10) -> 1000.sat,
txids(11) -> 1000.sat,
txids(12) -> 1000.sat),
),
mutualCloseBalance = PossiblyPublishedMainBalance(
toLocal = Map(
txids(13) -> 1000.sat,
txids(14) -> 1000.sat
)
)
)
)
val bal2 = Await.result(CheckBalance.prunePublishedTransactions(bal1, bitcoinClient)(ExecutionContext.Implicits.global), 10 seconds)
assert(bal2 == OffChainBalance(
closing = ClosingBalance(
localCloseBalance = PossiblyPublishedMainAndHtlcBalance(
toLocal = Map(
txids(0) -> 1000.sat,
txids(2) -> 1000.sat),
htlcs = Map(
txids(5) -> 1000.sat)
),
remoteCloseBalance = PossiblyPublishedMainAndHtlcBalance(
toLocal = Map(
txids(7) -> 1000.sat,
txids(8) -> 1000.sat),
htlcs = Map(
txids(10) -> 1000.sat,
txids(11) -> 1000.sat),
),
mutualCloseBalance = PossiblyPublishedMainBalance(
toLocal = Map(
txids(14) -> 1000.sat
)
)))
)
}
}

View file

@ -20,14 +20,18 @@ import akka.actor.ActorRef
import akka.testkit.TestProbe
import fr.acinq.bitcoin.Crypto.PrivateKey
import fr.acinq.bitcoin.{ByteVector32, ByteVector64, Crypto, SatoshiLong, ScriptFlags, Transaction}
import fr.acinq.eclair.balance.CheckBalance.PossiblyPublishedMainAndHtlcBalance
import fr.acinq.eclair.Features.StaticRemoteKey
import fr.acinq.eclair.TestConstants.{Alice, Bob}
import fr.acinq.eclair.UInt64.Conversions._
import fr.acinq.eclair._
import fr.acinq.eclair.balance.CheckBalance
import fr.acinq.eclair.blockchain.{CurrentBlockCount, CurrentFeerates}
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher._
import fr.acinq.eclair.blockchain.fee.{FeeratePerByte, FeeratePerKw, FeeratesPerKw}
import fr.acinq.eclair.blockchain.{CurrentBlockCount, CurrentFeerates}
import fr.acinq.eclair.channel.Channel._
import fr.acinq.eclair.channel.Helpers.Closing.{CurrentRemoteClose, LocalClose}
import fr.acinq.eclair.channel._
import fr.acinq.eclair.channel.publish.TxPublisher.{PublishRawTx, PublishTx}
import fr.acinq.eclair.channel.states.{StateTestsBase, StateTestsTags}
@ -2370,6 +2374,24 @@ class NormalStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
// at best we have a little less than 450 000 + 250 000 + 100 000 + 50 000 = 850 000 (because fees)
assert(amountClaimed === 814880.sat)
val commitments = alice.stateData.asInstanceOf[DATA_CLOSING].commitments
val remoteCommitPublished = alice.stateData.asInstanceOf[DATA_CLOSING].remoteCommitPublished.get
val knownPreimages = Set((commitments.channelId, htlcb1.id))
assert(CheckBalance.computeRemoteCloseBalance(commitments, CurrentRemoteClose(commitments.remoteCommit, remoteCommitPublished), knownPreimages) ===
PossiblyPublishedMainAndHtlcBalance(
toLocal = Map(remoteCommitPublished.claimMainOutputTx.get.tx.txid -> remoteCommitPublished.claimMainOutputTx.get.tx.txOut.head.amount),
htlcs = claimTxs.drop(1).map(claimTx => claimTx.txid -> claimTx.txOut.head.amount.toBtc).toMap,
htlcsUnpublished = htlca3.amountMsat.truncateToSatoshi
))
// assuming alice gets the preimage for the 2nd htlc
val knownPreimages1 = Set((commitments.channelId, htlcb1.id), (commitments.channelId, htlcb2.id))
assert(CheckBalance.computeRemoteCloseBalance(commitments, CurrentRemoteClose(commitments.remoteCommit, remoteCommitPublished), knownPreimages1) ===
PossiblyPublishedMainAndHtlcBalance(
toLocal = Map(remoteCommitPublished.claimMainOutputTx.get.tx.txid -> remoteCommitPublished.claimMainOutputTx.get.tx.txOut.head.amount),
htlcs = claimTxs.drop(1).map(claimTx => claimTx.txid -> claimTx.txOut.head.amount.toBtc).toMap,
htlcsUnpublished = htlca3.amountMsat.truncateToSatoshi + htlcb2.amountMsat.truncateToSatoshi
))
assert(alice2blockchain.expectMsgType[WatchTxConfirmed].txId === bobCommitTx.txid)
assert(alice2blockchain.expectMsgType[WatchTxConfirmed].txId === claimMain.txid)
alice2blockchain.expectMsgType[WatchOutputSpent] // htlc 1
@ -2439,6 +2461,24 @@ class NormalStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
// at best we have a little less than 500 000 + 250 000 + 100 000 = 850 000 (because fees)
assert(amountClaimed === 822310.sat)
val commitments = alice.stateData.asInstanceOf[DATA_CLOSING].commitments
val remoteCommitPublished = alice.stateData.asInstanceOf[DATA_CLOSING].nextRemoteCommitPublished.get
val knownPreimages = Set((commitments.channelId, htlcb1.id))
assert(CheckBalance.computeRemoteCloseBalance(commitments, CurrentRemoteClose(commitments.remoteNextCommitInfo.left.get.nextRemoteCommit, remoteCommitPublished), knownPreimages) ===
PossiblyPublishedMainAndHtlcBalance(
toLocal = Map(remoteCommitPublished.claimMainOutputTx.get.tx.txid -> remoteCommitPublished.claimMainOutputTx.get.tx.txOut.head.amount),
htlcs = claimTxs.drop(1).map(claimTx => claimTx.txid -> claimTx.txOut.head.amount.toBtc).toMap,
htlcsUnpublished = htlca3.amountMsat.truncateToSatoshi
))
// assuming alice gets the preimage for the 2nd htlc
val knownPreimages1 = Set((commitments.channelId, htlcb1.id), (commitments.channelId, htlcb2.id))
assert(CheckBalance.computeRemoteCloseBalance(commitments, CurrentRemoteClose(commitments.remoteNextCommitInfo.left.get.nextRemoteCommit, remoteCommitPublished), knownPreimages1) ===
PossiblyPublishedMainAndHtlcBalance(
toLocal = Map(remoteCommitPublished.claimMainOutputTx.get.tx.txid -> remoteCommitPublished.claimMainOutputTx.get.tx.txOut.head.amount),
htlcs = claimTxs.drop(1).map(claimTx => claimTx.txid -> claimTx.txOut.head.amount.toBtc).toMap,
htlcsUnpublished = htlca3.amountMsat.truncateToSatoshi + htlcb2.amountMsat.truncateToSatoshi
))
assert(alice2blockchain.expectMsgType[WatchTxConfirmed].txId === bobCommitTx.txid)
assert(alice2blockchain.expectMsgType[WatchTxConfirmed].txId === claimTxs(0).txid) // claim-main
alice2blockchain.expectMsgType[WatchOutputSpent] // htlc 1
@ -2571,8 +2611,8 @@ class NormalStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
val (rb1, htlcb1) = addHtlc(50000000 msat, bob, alice, bob2alice, alice2bob)
val (rb2, htlcb2) = addHtlc(55000000 msat, bob, alice, bob2alice, alice2bob)
crossSign(alice, bob, alice2bob, bob2alice)
fulfillHtlc(1, ra2, bob, alice, bob2alice, alice2bob)
fulfillHtlc(0, rb1, alice, bob, alice2bob, bob2alice)
fulfillHtlc(htlca2.id, ra2, bob, alice, bob2alice, alice2bob)
fulfillHtlc(htlcb1.id, rb1, alice, bob, alice2bob, bob2alice)
// at this point here is the situation from alice pov and what she should do when she publishes his commit tx:
// balances :
@ -2592,6 +2632,7 @@ class NormalStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
assert(aliceCommitTx.txOut.size == 6) // two main outputs and 4 pending htlcs
awaitCond(alice.stateName == CLOSING)
assert(alice.stateData.asInstanceOf[DATA_CLOSING].localCommitPublished.isDefined)
val commitments = alice.stateData.asInstanceOf[DATA_CLOSING].commitments
val localCommitPublished = alice.stateData.asInstanceOf[DATA_CLOSING].localCommitPublished.get
assert(localCommitPublished.commitTx == aliceCommitTx)
assert(localCommitPublished.htlcTxs.size === 4)
@ -2599,6 +2640,14 @@ class NormalStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
assert(getHtlcTimeoutTxs(localCommitPublished).length === 2)
assert(localCommitPublished.claimHtlcDelayedTxs.isEmpty)
val knownPreimages = Set((commitments.channelId, htlcb1.id))
assert(CheckBalance.computeLocalCloseBalance(commitments, LocalClose(commitments.localCommit, localCommitPublished), knownPreimages) ===
PossiblyPublishedMainAndHtlcBalance(
toLocal = Map(localCommitPublished.claimMainDelayedOutputTx.get.tx.txid -> localCommitPublished.claimMainDelayedOutputTx.get.tx.txOut.head.amount),
htlcs = Map.empty,
htlcsUnpublished = htlca1.amountMsat.truncateToSatoshi + htlca3.amountMsat.truncateToSatoshi + htlcb1.amountMsat.truncateToSatoshi
))
// alice can only claim 3 out of 4 htlcs, she can't do anything regarding the htlc sent by bob for which she does not have the htlc
// so we expect 4 transactions:
// - 1 tx to claim the main delayed output
@ -2620,16 +2669,24 @@ class NormalStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
alice2blockchain.expectNoMsg(1 second)
// 3rd-stage txs are published when htlc txs confirm
Seq(htlcTx1, htlcTx2, htlcTx3).foreach(htlcTimeoutTx => {
val claimHtlcDelayedTxs = Seq(htlcTx1, htlcTx2, htlcTx3).map { htlcTimeoutTx =>
alice ! WatchOutputSpentTriggered(htlcTimeoutTx)
assert(alice2blockchain.expectMsgType[WatchTxConfirmed].txId === htlcTimeoutTx.txid)
alice ! WatchTxConfirmedTriggered(2701, 3, htlcTimeoutTx)
val claimHtlcDelayedTx = alice2blockchain.expectMsgType[PublishRawTx].tx
Transaction.correctlySpends(claimHtlcDelayedTx, htlcTimeoutTx :: Nil, ScriptFlags.STANDARD_SCRIPT_VERIFY_FLAGS)
assert(alice2blockchain.expectMsgType[WatchTxConfirmed].txId === claimHtlcDelayedTx.txid)
})
claimHtlcDelayedTx
}
awaitCond(alice.stateData.asInstanceOf[DATA_CLOSING].localCommitPublished.get.claimHtlcDelayedTxs.length == 3)
alice2blockchain.expectNoMsg(1 second)
alice2blockchain.expectNoMessage(1 second)
assert(CheckBalance.computeLocalCloseBalance(commitments, LocalClose(commitments.localCommit, alice.stateData.asInstanceOf[DATA_CLOSING].localCommitPublished.get), knownPreimages) ===
PossiblyPublishedMainAndHtlcBalance(
toLocal = Map(localCommitPublished.claimMainDelayedOutputTx.get.tx.txid -> localCommitPublished.claimMainDelayedOutputTx.get.tx.txOut.head.amount),
htlcs = claimHtlcDelayedTxs.map(claimTx => claimTx.txid -> claimTx.txOut.head.amount.toBtc).toMap,
htlcsUnpublished = htlca3.amountMsat.truncateToSatoshi
))
}
test("recv Error (nothing at stake)", Tag(StateTestsTags.NoPushMsat)) { f =>

View file

@ -78,16 +78,16 @@ class GUIUpdater(mainController: MainController) extends Actor with ActorLogging
runInGuiThread(() => mainController.channelBox.getChildren.addAll(root))
context.become(main(m + (channel -> channelPaneController)))
case ChannelRestored(channel, channelId, peer, remoteNodeId, isFunder, currentData: Commitments) => // We are specifically interested in normal Commitments with funding txid here
case ChannelRestored(channel, channelId, peer, remoteNodeId, currentData) => // We are specifically interested in normal Commitments with funding txid here
context.watch(channel)
val (channelPaneController, root) = createChannelPanel(channel, peer, remoteNodeId, isFunder, channelId)
channelPaneController.updateBalance(currentData)
val (channelPaneController, root) = createChannelPanel(channel, peer, remoteNodeId, currentData.commitments.localParams.isFunder, channelId)
channelPaneController.updateBalance(currentData.commitments)
val m1 = m + (channel -> channelPaneController)
val totalBalance = m1.values.map(_.getBalance).sum
runInGuiThread(() => {
channelPaneController.refreshBalance()
mainController.refreshTotalBalance(totalBalance)
channelPaneController.txId.setText(currentData.commitInput.outPoint.txid.toHex)
channelPaneController.txId.setText(currentData.commitments.commitInput.outPoint.txid.toHex)
mainController.channelBox.getChildren.addAll(root)
})
context.become(main(m1))

View file

@ -48,6 +48,10 @@ trait OnChain {
}
}
val onChainRoutes: Route = getNewAddress ~ sendOnChain ~ onChainBalance ~ onChainTransactions
val globalBalance: Route = postRequest("globalbalance") { implicit t =>
complete(eclairApi.globalBalance())
}
val onChainRoutes: Route = getNewAddress ~ sendOnChain ~ onChainBalance ~ onChainTransactions ~ globalBalance
}

View file

@ -19,8 +19,9 @@ package fr.acinq.eclair.api.serde
import com.google.common.net.HostAndPort
import de.heikoseeberger.akkahttpjson4s.Json4sSupport
import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey}
import fr.acinq.bitcoin.{ByteVector32, ByteVector64, OutPoint, Satoshi, Transaction}
import fr.acinq.bitcoin.{Btc, ByteVector32, ByteVector64, OutPoint, Satoshi, Transaction}
import fr.acinq.eclair.ApiTypes.ChannelIdentifier
import fr.acinq.eclair.balance.CheckBalance.GlobalBalance
import fr.acinq.eclair.blockchain.fee.FeeratePerKw
import fr.acinq.eclair.channel._
import fr.acinq.eclair.crypto.ShaChain
@ -29,11 +30,11 @@ import fr.acinq.eclair.db.{IncomingPaymentStatus, OutgoingPaymentStatus}
import fr.acinq.eclair.payment._
import fr.acinq.eclair.router.Router.RouteResponse
import fr.acinq.eclair.transactions.DirectedHtlc
import fr.acinq.eclair.transactions.Transactions.{ClaimHtlcTx, ClosingTx, HtlcSuccessTx, HtlcTimeoutTx, InputInfo, TransactionWithInputInfo}
import fr.acinq.eclair.transactions.Transactions._
import fr.acinq.eclair.wire.protocol._
import fr.acinq.eclair.{CltvExpiry, CltvExpiryDelta, Features, MilliSatoshi, ShortChannelId, UInt64}
import org.json4s.JsonAST._
import org.json4s.{CustomKeySerializer, CustomSerializer, DefaultFormats, Extraction, JsonAST, ShortTypeHints, TypeHints, jackson}
import org.json4s.{CustomKeySerializer, CustomSerializer, DefaultFormats, Extraction, ShortTypeHints, TypeHints, jackson}
import scodec.bits.ByteVector
import java.net.InetSocketAddress
@ -56,6 +57,12 @@ class ByteVector32Serializer extends CustomSerializer[ByteVector32](_ => ( {
case x: ByteVector32 => JString(x.toHex)
}))
class ByteVector32KeySerializer extends CustomKeySerializer[ByteVector32](_ => ( {
null
}, {
case x: ByteVector32 => x.toHex
}))
class ByteVector64Serializer extends CustomSerializer[ByteVector64](_ => ( {
null
}, {
@ -68,6 +75,12 @@ class UInt64Serializer extends CustomSerializer[UInt64](_ => ( {
case x: UInt64 => JInt(x.toBigInt)
}))
class BtcSerializer extends CustomSerializer[Btc](_ => ( {
null
}, {
case x: Btc => JDecimal(x.toDouble)
}))
class SatoshiSerializer extends CustomSerializer[Satoshi](_ => ( {
null
}, {
@ -370,6 +383,14 @@ class OriginSerializer extends CustomSerializer[Origin](_ => ( {
})
}))
class GlobalBalanceSerializer extends CustomSerializer[GlobalBalance](_ => ( {
null
}, {
case o: GlobalBalance =>
val formats = DefaultFormats + new ByteVector32KeySerializer + new BtcSerializer + new SatoshiSerializer
JObject(JField("total", JDecimal(o.total.toDouble))) merge Extraction.decompose(o)(formats)
}))
case class CustomTypeHints(custom: Map[Class[_], String]) extends TypeHints {
val reverse: Map[String, Class[_]] = custom.map(_.swap)
@ -432,6 +453,7 @@ object JsonSupport extends Json4sSupport {
new ByteVector64Serializer +
new ChannelEventSerializer +
new UInt64Serializer +
new BtcSerializer +
new SatoshiSerializer +
new MilliSatoshiSerializer +
new CltvExpirySerializer +
@ -463,6 +485,7 @@ object JsonSupport extends Json4sSupport {
new JavaUUIDSerializer +
new FeaturesSerializer +
new OriginSerializer +
new GlobalBalanceSerializer +
CustomTypeHints.incomingPaymentStatus +
CustomTypeHints.outgoingPaymentStatus +
CustomTypeHints.paymentEvent +

View file

@ -16,9 +16,11 @@
package fr.acinq.eclair.api
import fr.acinq.bitcoin.{ByteVector32, OutPoint, Satoshi, Transaction, TxOut}
import fr.acinq.bitcoin.{Btc, ByteVector32, OutPoint, Satoshi, Transaction, TxOut}
import fr.acinq.eclair._
import fr.acinq.eclair.api.serde._
import fr.acinq.eclair.balance.CheckBalance
import fr.acinq.eclair.balance.CheckBalance.{ClosingBalance, GlobalBalance, MainAndHtlcBalance, PossiblyPublishedMainAndHtlcBalance, PossiblyPublishedMainBalance}
import fr.acinq.eclair.channel.Origin
import fr.acinq.eclair.payment.{PaymentRequest, PaymentSettlingOnChain}
import fr.acinq.eclair.transactions.Transactions._
@ -143,6 +145,26 @@ class JsonSerializersSpec extends AnyFunSuite with Matchers {
JsonSupport.serialization.write(pr)(JsonSupport.formats) shouldBe """{"prefix":"lntb","timestamp":1622474982,"nodeId":"03e89e4c3d41dc5332c2fb6cc66d12bfb9257ba681945a242f27a08d5ad210d891","serialized":"lntb1pst2q8xpp5qysan6j5xeq97tytxf7pfr0n75na8rztqhh03glmlgsqsyuqzgnqdqqxqrrss9qy9qsqsp5qq67gcxrn2drj5p0lc6p8wgdpqwxnc2h4s9kra5489q0fqsvhumsrzjqfqnj4upt5z6hdludky9vgk4ehzmwu2dk9rcevzczw5ywstehq79c83xr5qqqkqqqqqqqqlgqqqqqeqqjqrzjqwfn3p9278ttzzpe0e00uhyxhned3j5d9acqak5emwfpflp8z2cng838tqqqqxgqqqqqqqlgqqqqqeqqjqkxs4223x2r6sat65asfp0k2pze2rswe9np9vq08waqvsp832ffgymzgx8hgzejasesfxwcw6jj93azwq9klwuzmef3llns3n95pztgqpawp7an","description":"","paymentHash":"0121d9ea5436405f2c8b327c148df3f527d38c4b05eef8a3fbfa200813801226","expiry":3600,"features":{"activated":{"var_onion_optin":"optional","payment_secret":"optional","basic_mpp":"optional"},"unknown":[]},"routingInfo":[[{"nodeId":"02413957815d05abb7fc6d885622d5cdc5b7714db1478cb05813a8474179b83c5c","shortChannelId":"1975837x88x0","feeBase":1000,"feeProportionalMillionths":100,"cltvExpiryDelta":144}],[{"nodeId":"03933884aaf1d6b108397e5efe5c86bcf2d8ca8d2f700eda99db9214fc2712b134","shortChannelId":"1976152x25x0","feeBase":1000,"feeProportionalMillionths":100,"cltvExpiryDelta":144}]]}"""
}
test("GlobalBalance serializer") {
val gb = GlobalBalance(
onChain = CheckBalance.CorrectedOnChainBalance(Btc(0.4), Btc(0.05)),
offChain = CheckBalance.OffChainBalance(normal = MainAndHtlcBalance(
toLocal = Btc(0.2),
htlcOut = Btc(0.05)
),
closing = ClosingBalance(
localCloseBalance = PossiblyPublishedMainAndHtlcBalance(
toLocal = Map(ByteVector32(hex"4d176ad844c363bed59edf81962b008faa6194c3b3757ffcd26ba60f95716db2") -> Btc(0.1)),
htlcs = Map(ByteVector32(hex"94b70cec5a98d67d17c6e3de5c7697f8a6cab4f698df91e633ce35efa3574d71") -> Btc(0.03), ByteVector32(hex"a844edd41ce8503864f3c95d89d850b177a09d7d35e950a7d27c14abb63adb13") -> Btc(0.06)),
htlcsUnpublished = Btc(0.01)),
mutualCloseBalance = PossiblyPublishedMainBalance(
toLocal = Map(ByteVector32(hex"7e3b012534afe0bb8d1f2f09c724b1a10a813ce704e5b9c217ccfdffffff0247") -> Btc(0.1)))
)
)
)
JsonSupport.serialization.write(gb)(JsonSupport.formats) shouldBe """{"total":1.0,"onChain":{"confirmed":0.4,"unconfirmed":0.05},"offChain":{"waitForFundingConfirmed":0.0,"waitForFundingLocked":0.0,"normal":{"toLocal":0.2,"htlcOut":0.05},"shutdown":{"toLocal":0.0,"htlcOut":0.0},"negotiating":0.0,"closing":{"localCloseBalance":{"toLocal":{"4d176ad844c363bed59edf81962b008faa6194c3b3757ffcd26ba60f95716db2":0.1},"htlcs":{"94b70cec5a98d67d17c6e3de5c7697f8a6cab4f698df91e633ce35efa3574d71":0.03,"a844edd41ce8503864f3c95d89d850b177a09d7d35e950a7d27c14abb63adb13":0.06},"htlcsUnpublished":0.01},"remoteCloseBalance":{"toLocal":{},"htlcs":{},"htlcsUnpublished":0.0},"mutualCloseBalance":{"toLocal":{"7e3b012534afe0bb8d1f2f09c724b1a10a813ce704e5b9c217ccfdffffff0247":0.1}},"unknownCloseBalance":{"toLocal":0.0,"htlcOut":0.0}},"waitForPublishFutureCommitment":0.0}}"""
}
test("type hints") {
val e1 = PaymentSettlingOnChain(UUID.randomUUID, 42 msat, randomBytes32())
assert(JsonSupport.serialization.writePretty(e1)(JsonSupport.formats).contains("\"type\" : \"payment-settling-onchain\""))