From 8370fa29c01b38b025e3ae9a272430fdb9efbdaa Mon Sep 17 00:00:00 2001 From: Bastien Teinturier <31281497+t-bast@users.noreply.github.com> Date: Thu, 5 Sep 2024 14:48:01 +0200 Subject: [PATCH] Reduce the number of RPC calls to bitcoind during force-close (#2902) * Don't spawn anchor tx publisher if commit is confirmed It is inefficient to spawn a tx publisher for anchor txs if we already know that the commit tx is confirmed: we will make calls to our bitcoin node that can easily be avoided. This can matter when force-closing a large number of channels with frequent disconnections (e.g. wallets). * Improve `TxTimeLocksMonitor` performance When publishing a transaction that has CSV delays, we previously used the watcher and set a `minDepth` on the parent transaction matching the CSV delay of the child transaction. While this was very simple, it was unnecessarily expensive for large CSV delays: the watcher would check for tx confirmations at every block, even when the CSV delay is very large. When we force-close a large number of channels, it results in a very large number of RPC calls to our `bitcoind` node. We don't use the watcher in the `TxTimeLocksMonitor` anymore: instead we check the parent confirmations once, and then we check again after the CSV delay. * Add relative delay hints to `ZmqWatcher` When we tell the `ZmqWatcher` to watch for confirmations on transactions that have a relative delay, it is highly inefficient to call our bitcoin node at every new block to check for confirmations (especially when the parent transaction isn't even confirmed). We now tell the watcher about the relative delay, which lets it check for confirmations only at block heights where we expect the transaction to reach its minimum depth. This is especially useful to improve performance for delayed transactions that usually use a CSV of at least 720 blocks. --- .../main/scala/fr/acinq/eclair/Setup.scala | 2 +- .../blockchain/bitcoind/ZmqWatcher.scala | 82 +++++++++--- .../fr/acinq/eclair/channel/fsm/Channel.scala | 6 +- .../eclair/channel/fsm/ErrorHandlers.scala | 42 +++--- .../channel/publish/FinalTxPublisher.scala | 8 +- .../publish/ReplaceableTxPublisher.scala | 8 +- .../eclair/channel/publish/TxPublisher.scala | 7 +- .../channel/publish/TxTimeLocksMonitor.scala | 121 +++++++++++------- .../blockchain/bitcoind/ZmqWatcherSpec.scala | 33 +++++ .../publish/FinalTxPublisherSpec.scala | 19 +-- .../publish/ReplaceableTxPublisherSpec.scala | 54 ++------ .../publish/TxTimeLocksMonitorSpec.scala | 102 ++++++++++----- .../ChannelStateTestsHelperMethods.scala | 6 +- .../channel/states/h/ClosingStateSpec.scala | 16 ++- .../basic/fixtures/MinimalNodeFixture.scala | 2 +- 15 files changed, 314 insertions(+), 194 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala b/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala index 2b28bb041..3eddf8de6 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala @@ -367,7 +367,7 @@ class Setup(val datadir: File, // we want to make sure the handler for post-restart broken HTLCs has finished initializing. _ <- postRestartCleanUpInitialized.future - txPublisherFactory = Channel.SimpleTxPublisherFactory(nodeParams, watcher, bitcoinClient) + txPublisherFactory = Channel.SimpleTxPublisherFactory(nodeParams, bitcoinClient) channelFactory = Peer.SimpleChannelFactory(nodeParams, watcher, relayer, bitcoinClient, txPublisherFactory) pendingChannelsRateLimiter = system.spawn(Behaviors.supervise(PendingChannelsRateLimiter(nodeParams, router.toTyped, channels)).onFailure(typed.SupervisorStrategy.resume), name = "pending-channels-rate-limiter") peerFactory = Switchboard.SimplePeerFactory(nodeParams, bitcoinClient, channelFactory, pendingChannelsRateLimiter, register, router.toTyped) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcher.scala b/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcher.scala index 2ec66eaf1..d5126f4a8 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcher.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcher.scala @@ -63,6 +63,7 @@ object ZmqWatcher { private case class PublishBlockHeight(current: BlockHeight) extends Command private case class ProcessNewBlock(blockId: BlockId) extends Command private case class ProcessNewTransaction(tx: Transaction) extends Command + private case class SetWatchHint(w: GenericWatch, hint: WatchHint) extends Command final case class ValidateRequest(replyTo: ActorRef[ValidateResult], ann: ChannelAnnouncement) extends Command final case class ValidateResult(c: ChannelAnnouncement, fundingTx: Either[Throwable, (Transaction, UtxoStatus)]) @@ -155,7 +156,8 @@ object ZmqWatcher { case class WatchFundingDeeplyBuried(replyTo: ActorRef[WatchFundingDeeplyBuriedTriggered], txId: TxId, minDepth: Long) extends WatchConfirmed[WatchFundingDeeplyBuriedTriggered] case class WatchFundingDeeplyBuriedTriggered(blockHeight: BlockHeight, txIndex: Int, tx: Transaction) extends WatchConfirmedTriggered - case class WatchTxConfirmed(replyTo: ActorRef[WatchTxConfirmedTriggered], txId: TxId, minDepth: Long) extends WatchConfirmed[WatchTxConfirmedTriggered] + case class RelativeDelay(parentTxId: TxId, delay: Long) + case class WatchTxConfirmed(replyTo: ActorRef[WatchTxConfirmedTriggered], txId: TxId, minDepth: Long, delay_opt: Option[RelativeDelay] = None) extends WatchConfirmed[WatchTxConfirmedTriggered] case class WatchTxConfirmedTriggered(blockHeight: BlockHeight, txIndex: Int, tx: Transaction) extends WatchConfirmedTriggered case class WatchParentTxConfirmed(replyTo: ActorRef[WatchParentTxConfirmedTriggered], txId: TxId, minDepth: Long) extends WatchConfirmed[WatchParentTxConfirmedTriggered] @@ -167,6 +169,13 @@ object ZmqWatcher { private sealed trait AddWatchResult private case object Keep extends AddWatchResult private case object Ignore extends AddWatchResult + + sealed trait WatchHint + /** + * In some cases we don't need to check watches every time a block is found and only need to check again after we + * reach a specific block height. This is for example the case for transactions with a CSV delay. + */ + private case class CheckAfterBlock(blockHeight: BlockHeight) extends WatchHint // @formatter:on def apply(nodeParams: NodeParams, blockCount: AtomicLong, client: BitcoinCoreClient): Behavior[Command] = @@ -178,7 +187,7 @@ object ZmqWatcher { timers.startSingleTimer(TickNewBlock, 1 second) // we start a timer in case we don't receive ZMQ block events timers.startSingleTimer(TickBlockTimeout, blockTimeout) - new ZmqWatcher(nodeParams, blockCount, client, context, timers).watching(Set.empty[GenericWatch], Map.empty[OutPoint, Set[GenericWatch]]) + new ZmqWatcher(nodeParams, blockCount, client, context, timers).watching(Map.empty[GenericWatch, Option[WatchHint]], Map.empty[OutPoint, Set[GenericWatch]]) } } @@ -224,7 +233,7 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client private val watchdog = context.spawn(Behaviors.supervise(BlockchainWatchdog(nodeParams, 150 seconds)).onFailure(SupervisorStrategy.resume), "blockchain-watchdog") - private def watching(watches: Set[GenericWatch], watchedUtxos: Map[OutPoint, Set[GenericWatch]]): Behavior[Command] = { + private def watching(watches: Map[GenericWatch, Option[WatchHint]], watchedUtxos: Map[OutPoint, Set[GenericWatch]]): Behavior[Command] = { Behaviors.receiveMessage { case ProcessNewTransaction(tx) => log.debug("analyzing txid={} tx={}", tx.txid, tx) @@ -239,7 +248,7 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client case _: WatchPublished => // nothing to do case _: WatchConfirmed[_] => // nothing to do } - watches.collect { + watches.keySet.collect { case w: WatchPublished if w.txId == tx.txid => context.self ! TriggerEvent(w.replyTo, w, WatchPublishedTriggered(tx)) } Behaviors.same @@ -279,21 +288,32 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client case Failure(t) => GetBlockCountFailed(t) case Success(currentHeight) => PublishBlockHeight(currentHeight) } - // TODO: beware of the herd effect - KamonExt.timeFuture(Metrics.NewBlockCheckConfirmedDuration.withoutTags()) { - Future.sequence(watches.collect { - case w: WatchPublished => checkPublished(w) - case w: WatchConfirmed[_] => checkConfirmed(w) - }) - } Behaviors.same case PublishBlockHeight(currentHeight) => log.debug("setting blockHeight={}", currentHeight) blockHeight.set(currentHeight.toLong) context.system.eventStream ! EventStream.Publish(CurrentBlockHeight(currentHeight)) + // TODO: should we try to mitigate the herd effect and not check all watches immediately? + KamonExt.timeFuture(Metrics.NewBlockCheckConfirmedDuration.withoutTags()) { + Future.sequence(watches.collect { + case (w: WatchPublished, _) => checkPublished(w) + case (w: WatchConfirmed[_], hint) => + hint match { + case Some(CheckAfterBlock(delayUntilBlock)) if currentHeight < delayUntilBlock => Future.successful(()) + case _ => checkConfirmed(w, currentHeight) + } + }) + } Behaviors.same + case SetWatchHint(w, hint) => + val watches1 = watches.get(w) match { + case Some(_) => watches + (w -> Some(hint)) + case None => watches + } + watching(watches1, watchedUtxos) + case TriggerEvent(replyTo, watch, event) => if (watches.contains(watch)) { log.debug("triggering {}", watch) @@ -323,7 +343,7 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client checkSpent(w) Keep case w: WatchConfirmed[_] => - checkConfirmed(w) + checkConfirmed(w, BlockHeight(blockHeight.get())) Keep case w: WatchPublished => checkPublished(w) @@ -333,14 +353,14 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client case Keep => log.debug("adding watch {}", w) context.watchWith(w.replyTo, StopWatching(w.replyTo)) - watching(watches + w, addWatchedUtxos(watchedUtxos, w)) + watching(watches + (w -> None), addWatchedUtxos(watchedUtxos, w)) case Ignore => Behaviors.same } case StopWatching(origin) => - // we remove watches associated to dead actors - val deprecatedWatches = watches.filter(_.replyTo == origin) + // We remove watches associated to dead actors. + val deprecatedWatches = watches.keySet.filter(_.replyTo == origin) val watchedUtxos1 = deprecatedWatches.foldLeft(watchedUtxos) { case (m, w) => removeWatchedUtxos(m, w) } watching(watches -- deprecatedWatches, watchedUtxos1) @@ -353,7 +373,7 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client Behaviors.same case r: ListWatches => - r.replyTo ! watches + r.replyTo ! watches.keySet Behaviors.same } @@ -414,7 +434,7 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client client.getTransaction(w.txId).map(tx => context.self ! TriggerEvent(w.replyTo, w, WatchPublishedTriggered(tx))) } - private def checkConfirmed(w: WatchConfirmed[_ <: WatchConfirmedTriggered]): Future[Unit] = { + private def checkConfirmed(w: WatchConfirmed[_ <: WatchConfirmedTriggered], currentHeight: BlockHeight): Future[Unit] = { log.debug("checking confirmations of txid={}", w.txId) // NB: this is very inefficient since internally we call `getrawtransaction` three times, but it doesn't really // matter because this only happens once, when the watched transaction has reached min_depth @@ -431,7 +451,33 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client } } } - case _ => Future.successful((): Unit) + case Some(confirmations) => + // Once the transaction is confirmed, we don't need to check again at every new block, we only need to check + // again once we should have reached the minimum depth to verify that there hasn't been a reorg. + context.self ! SetWatchHint(w, CheckAfterBlock(currentHeight + w.minDepth - confirmations)) + Future.successful(()) + case None => + w match { + case WatchTxConfirmed(_, _, _, Some(relativeDelay)) => + log.debug("txId={} has a relative delay of {} blocks, checking parentTxId={}", w.txId, relativeDelay.delay, relativeDelay.parentTxId) + // Note how we add one block to avoid an off-by-one: + // - if the parent is confirmed at block P + // - the CSV delay is D and the minimum depth is M + // - the first block that can include the child is P + D + // - the first block at which we can reach minimum depth is P + D + M + // - if we are currently at block P + N, the parent has C = N + 1 confirmations + // - we want to check at block P + N + D + M + 1 - C = P + N + D + M + 1 - (N + 1) = P + D + M + val delay = relativeDelay.delay + w.minDepth + 1 + client.getTxConfirmations(relativeDelay.parentTxId).map(_.getOrElse(0)).collect { + case confirmations if confirmations < delay => context.self ! SetWatchHint(w, CheckAfterBlock(currentHeight + delay - confirmations)) + } + case _ => + // The transaction is unconfirmed: we don't need to check again at every new block: we can check only once + // every minDepth blocks, which is more efficient. If the transaction is included at the current height in + // a reorg, we will trigger the watch one block later than expected, but this is fine. + context.self ! SetWatchHint(w, CheckAfterBlock(currentHeight + w.minDepth)) + Future.successful(()) + } } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala index e55d86d61..1146511f4 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala @@ -115,9 +115,9 @@ object Channel { def spawnTxPublisher(context: ActorContext, remoteNodeId: PublicKey): typed.ActorRef[TxPublisher.Command] } - case class SimpleTxPublisherFactory(nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Command], bitcoinClient: BitcoinCoreClient) extends TxPublisherFactory { + case class SimpleTxPublisherFactory(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient) extends TxPublisherFactory { override def spawnTxPublisher(context: ActorContext, remoteNodeId: PublicKey): typed.ActorRef[TxPublisher.Command] = { - context.spawn(Behaviors.supervise(TxPublisher(nodeParams, remoteNodeId, TxPublisher.SimpleChildFactory(nodeParams, bitcoinClient, watcher))).onFailure(typed.SupervisorStrategy.restart), "tx-publisher") + context.spawn(Behaviors.supervise(TxPublisher(nodeParams, remoteNodeId, TxPublisher.SimpleChildFactory(nodeParams, bitcoinClient))).onFailure(typed.SupervisorStrategy.restart), "tx-publisher") } } @@ -1714,7 +1714,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with val (localCommitPublished1, claimHtlcTx_opt) = Closing.LocalClose.claimHtlcDelayedOutput(localCommitPublished, keyManager, d.commitments.latest, tx, nodeParams.currentFeerates, nodeParams.onChainFeeConf, d.finalScriptPubKey) claimHtlcTx_opt.foreach(claimHtlcTx => { txPublisher ! PublishFinalTx(claimHtlcTx, claimHtlcTx.fee, None) - blockchain ! WatchTxConfirmed(self, claimHtlcTx.tx.txid, nodeParams.channelConf.minDepthBlocks) + blockchain ! WatchTxConfirmed(self, claimHtlcTx.tx.txid, nodeParams.channelConf.minDepthBlocks, Some(RelativeDelay(tx.txid, d.commitments.params.remoteParams.toSelfDelay.toInt.toLong))) }) Closing.updateLocalCommitPublished(localCommitPublished1, tx) }), diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ErrorHandlers.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ErrorHandlers.scala index 728d3cfa4..31103a737 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ErrorHandlers.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ErrorHandlers.scala @@ -18,10 +18,10 @@ package fr.acinq.eclair.channel.fsm import akka.actor.typed.scaladsl.adapter.actorRefAdapter import akka.actor.{ActorRef, FSM} -import fr.acinq.bitcoin.scalacompat.{ByteVector32, OutPoint, SatoshiLong, Transaction} +import fr.acinq.bitcoin.scalacompat.{ByteVector32, OutPoint, SatoshiLong, Transaction, TxId} import fr.acinq.eclair.NotificationsLogger import fr.acinq.eclair.NotificationsLogger.NotifyNodeOperator -import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{WatchOutputSpent, WatchTxConfirmed} +import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{RelativeDelay, WatchOutputSpent, WatchTxConfirmed} import fr.acinq.eclair.channel.Helpers.Closing import fr.acinq.eclair.channel._ import fr.acinq.eclair.channel.fsm.Channel.UnhandledExceptionStrategy @@ -164,9 +164,9 @@ trait ErrorHandlers extends CommonHandlers { /** * This helper method will watch txs only if they haven't yet reached minDepth */ - private def watchConfirmedIfNeeded(txs: Iterable[Transaction], irrevocablySpent: Map[OutPoint, Transaction]): Unit = { + private def watchConfirmedIfNeeded(txs: Iterable[Transaction], irrevocablySpent: Map[OutPoint, Transaction], relativeDelays: Map[TxId, RelativeDelay]): Unit = { val (skip, process) = txs.partition(Closing.inputsAlreadySpent(_, irrevocablySpent)) - process.foreach(tx => blockchain ! WatchTxConfirmed(self, tx.txid, nodeParams.channelConf.minDepthBlocks)) + process.foreach(tx => blockchain ! WatchTxConfirmed(self, tx.txid, nodeParams.channelConf.minDepthBlocks, relativeDelays.get(tx.txid))) skip.foreach(tx => log.debug(s"no need to watch txid=${tx.txid}, it has already been confirmed")) } @@ -219,21 +219,23 @@ trait ErrorHandlers extends CommonHandlers { List(PublishFinalTx(commitTx, commitment.commitInput.outPoint, "commit-tx", Closing.commitTxFee(commitment.commitInput, commitTx, localPaysCommitTxFees), None)) ++ (claimMainDelayedOutputTx.map(tx => PublishFinalTx(tx, tx.fee, None)) ++ redeemableHtlcTxs ++ claimHtlcDelayedTxs.map(tx => PublishFinalTx(tx, tx.fee, None))) case _: Transactions.AnchorOutputsCommitmentFormat => val redeemableHtlcTxs = htlcTxs.values.flatten.map(tx => PublishReplaceableTx(tx, commitment)) - val claimLocalAnchor = claimAnchorTxs.collect { case tx: Transactions.ClaimLocalAnchorOutputTx => PublishReplaceableTx(tx, commitment) } + val claimLocalAnchor = claimAnchorTxs.collect { case tx: Transactions.ClaimLocalAnchorOutputTx if !localCommitPublished.isConfirmed => PublishReplaceableTx(tx, commitment) } List(PublishFinalTx(commitTx, commitment.commitInput.outPoint, "commit-tx", Closing.commitTxFee(commitment.commitInput, commitTx, localPaysCommitTxFees), None)) ++ claimLocalAnchor ++ claimMainDelayedOutputTx.map(tx => PublishFinalTx(tx, tx.fee, None)) ++ redeemableHtlcTxs ++ claimHtlcDelayedTxs.map(tx => PublishFinalTx(tx, tx.fee, None)) } publishIfNeeded(publishQueue, irrevocablySpent) - // we watch: - // - the commitment tx itself, so that we can handle the case where we don't have any outputs - // - 'final txs' that send funds to our wallet and that spend outputs that only us control + // We watch: + // - the commitment tx itself, so that we can handle the case where we don't have any outputs + // - 'final txs' that send funds to our wallet and that spend outputs that only us control + // Our 'final txs" have a long relative delay: we provide that information to the watcher for efficiency. + val relativeDelays = (claimMainDelayedOutputTx ++ claimHtlcDelayedTxs).map(tx => tx.tx.txid -> RelativeDelay(tx.input.outPoint.txid, commitment.remoteParams.toSelfDelay.toInt.toLong)).toMap val watchConfirmedQueue = List(commitTx) ++ claimMainDelayedOutputTx.map(_.tx) ++ claimHtlcDelayedTxs.map(_.tx) - watchConfirmedIfNeeded(watchConfirmedQueue, irrevocablySpent) + watchConfirmedIfNeeded(watchConfirmedQueue, irrevocablySpent, relativeDelays) - // we watch outputs of the commitment tx that both parties may spend - // we also watch our local anchor: this ensures that we will correctly detect when it's confirmed and count its fees - // in the audit DB, even if we restart before confirmation - val watchSpentQueue = htlcTxs.keys ++ claimAnchorTxs.collect { case tx: Transactions.ClaimLocalAnchorOutputTx => tx.input.outPoint } + // We watch outputs of the commitment tx that both parties may spend. + // We also watch our local anchor: this ensures that we will correctly detect when it's confirmed and count its fees + // in the audit DB, even if we restart before confirmation. + val watchSpentQueue = htlcTxs.keys ++ claimAnchorTxs.collect { case tx: Transactions.ClaimLocalAnchorOutputTx if !localCommitPublished.isConfirmed => tx.input.outPoint } watchSpentIfNeeded(commitTx, watchSpentQueue, irrevocablySpent) } @@ -274,18 +276,18 @@ trait ErrorHandlers extends CommonHandlers { def doPublish(remoteCommitPublished: RemoteCommitPublished, commitment: FullCommitment): Unit = { import remoteCommitPublished._ - val claimLocalAnchor = claimAnchorTxs.collect { case tx: Transactions.ClaimLocalAnchorOutputTx => PublishReplaceableTx(tx, commitment) } + val claimLocalAnchor = claimAnchorTxs.collect { case tx: Transactions.ClaimLocalAnchorOutputTx if !remoteCommitPublished.isConfirmed => PublishReplaceableTx(tx, commitment) } val redeemableHtlcTxs = claimHtlcTxs.values.flatten.map(tx => PublishReplaceableTx(tx, commitment)) val publishQueue = claimLocalAnchor ++ claimMainOutputTx.map(tx => PublishFinalTx(tx, tx.fee, None)).toSeq ++ redeemableHtlcTxs publishIfNeeded(publishQueue, irrevocablySpent) - // we watch: + // We watch: // - the commitment tx itself, so that we can handle the case where we don't have any outputs // - 'final txs' that send funds to our wallet and that spend outputs that only us control val watchConfirmedQueue = List(commitTx) ++ claimMainOutputTx.map(_.tx) - watchConfirmedIfNeeded(watchConfirmedQueue, irrevocablySpent) + watchConfirmedIfNeeded(watchConfirmedQueue, irrevocablySpent, relativeDelays = Map.empty) - // we watch outputs of the commitment tx that both parties may spend + // We watch outputs of the commitment tx that both parties may spend. val watchSpentQueue = claimHtlcTxs.keys watchSpentIfNeeded(commitTx, watchSpentQueue, irrevocablySpent) } @@ -336,13 +338,13 @@ trait ErrorHandlers extends CommonHandlers { val publishQueue = (claimMainOutputTx ++ mainPenaltyTx ++ htlcPenaltyTxs ++ claimHtlcDelayedPenaltyTxs).map(tx => PublishFinalTx(tx, tx.fee, None)) publishIfNeeded(publishQueue, irrevocablySpent) - // we watch: + // We watch: // - the commitment tx itself, so that we can handle the case where we don't have any outputs // - 'final txs' that send funds to our wallet and that spend outputs that only us control val watchConfirmedQueue = List(commitTx) ++ claimMainOutputTx.map(_.tx) - watchConfirmedIfNeeded(watchConfirmedQueue, irrevocablySpent) + watchConfirmedIfNeeded(watchConfirmedQueue, irrevocablySpent, relativeDelays = Map.empty) - // we watch outputs of the commitment tx that both parties may spend + // We watch outputs of the commitment tx that both parties may spend. val watchSpentQueue = (mainPenaltyTx ++ htlcPenaltyTxs).map(_.input.outPoint) watchSpentIfNeeded(commitTx, watchSpentQueue, irrevocablySpent) } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/FinalTxPublisher.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/FinalTxPublisher.scala index e712e49f3..6dba5078d 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/FinalTxPublisher.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/FinalTxPublisher.scala @@ -19,7 +19,6 @@ package fr.acinq.eclair.channel.publish import akka.actor.typed.scaladsl.{ActorContext, Behaviors, TimerScheduler} import akka.actor.typed.{ActorRef, Behavior} import fr.acinq.eclair.NodeParams -import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinCoreClient import fr.acinq.eclair.channel.publish.TxPublisher.TxPublishContext import fr.acinq.eclair.channel.publish.TxTimeLocksMonitor.CheckTx @@ -50,12 +49,12 @@ object FinalTxPublisher { case object Stop extends Command // @formatter:on - def apply(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, watcher: ActorRef[ZmqWatcher.Command], txPublishContext: TxPublishContext): Behavior[Command] = { + def apply(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, txPublishContext: TxPublishContext): Behavior[Command] = { Behaviors.setup { context => Behaviors.withTimers { timers => Behaviors.withMdc(txPublishContext.mdc()) { Behaviors.receiveMessagePartial { - case Publish(replyTo, cmd) => new FinalTxPublisher(nodeParams, replyTo, cmd, bitcoinClient, watcher, context, timers, txPublishContext).checkTimeLocks() + case Publish(replyTo, cmd) => new FinalTxPublisher(nodeParams, replyTo, cmd, bitcoinClient, context, timers, txPublishContext).checkTimeLocks() case Stop => Behaviors.stopped } } @@ -69,7 +68,6 @@ private class FinalTxPublisher(nodeParams: NodeParams, replyTo: ActorRef[TxPublisher.PublishTxResult], cmd: TxPublisher.PublishFinalTx, bitcoinClient: BitcoinCoreClient, - watcher: ActorRef[ZmqWatcher.Command], context: ActorContext[FinalTxPublisher.Command], timers: TimerScheduler[FinalTxPublisher.Command], txPublishContext: TxPublishContext)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global) { @@ -79,7 +77,7 @@ private class FinalTxPublisher(nodeParams: NodeParams, private val log = context.log def checkTimeLocks(): Behavior[Command] = { - val timeLocksChecker = context.spawn(TxTimeLocksMonitor(nodeParams, watcher, txPublishContext), "time-locks-monitor") + val timeLocksChecker = context.spawn(TxTimeLocksMonitor(nodeParams, bitcoinClient, txPublishContext), "time-locks-monitor") timeLocksChecker ! CheckTx(context.messageAdapter[TxTimeLocksMonitor.TimeLocksOk](_ => TimeLocksOk), cmd.tx, cmd.desc) Behaviors.receiveMessagePartial { case TimeLocksOk => checkParentPublished() diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/ReplaceableTxPublisher.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/ReplaceableTxPublisher.scala index b913c9f3f..38385f31c 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/ReplaceableTxPublisher.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/ReplaceableTxPublisher.scala @@ -19,7 +19,6 @@ package fr.acinq.eclair.channel.publish import akka.actor.typed.scaladsl.{ActorContext, Behaviors, TimerScheduler} import akka.actor.typed.{ActorRef, Behavior} import fr.acinq.bitcoin.scalacompat.{SatoshiLong, Transaction} -import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinCoreClient import fr.acinq.eclair.blockchain.fee.{ConfirmationTarget, FeeratePerKw, FeeratesPerKw} import fr.acinq.eclair.channel.publish.ReplaceableTxFunder.FundedTx @@ -61,12 +60,12 @@ object ReplaceableTxPublisher { // Timer key to ensure we don't have multiple concurrent timers running. private case object BumpFeeKey - def apply(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, watcher: ActorRef[ZmqWatcher.Command], txPublishContext: TxPublishContext): Behavior[Command] = { + def apply(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, txPublishContext: TxPublishContext): Behavior[Command] = { Behaviors.setup { context => Behaviors.withTimers { timers => Behaviors.withMdc(txPublishContext.mdc()) { Behaviors.receiveMessagePartial { - case Publish(replyTo, cmd) => new ReplaceableTxPublisher(nodeParams, replyTo, cmd, bitcoinClient, watcher, context, timers, txPublishContext).checkPreconditions() + case Publish(replyTo, cmd) => new ReplaceableTxPublisher(nodeParams, replyTo, cmd, bitcoinClient, context, timers, txPublishContext).checkPreconditions() case Stop => Behaviors.stopped } } @@ -108,7 +107,6 @@ private class ReplaceableTxPublisher(nodeParams: NodeParams, replyTo: ActorRef[TxPublisher.PublishTxResult], cmd: TxPublisher.PublishReplaceableTx, bitcoinClient: BitcoinCoreClient, - watcher: ActorRef[ZmqWatcher.Command], context: ActorContext[ReplaceableTxPublisher.Command], timers: TimerScheduler[ReplaceableTxPublisher.Command], txPublishContext: TxPublishContext)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global) { @@ -141,7 +139,7 @@ private class ReplaceableTxPublisher(nodeParams: NodeParams, // There are no time locks on anchor transactions, we can claim them right away. case _: ClaimLocalAnchorWithWitnessData => chooseFeerate(txWithWitnessData) case _ => - val timeLocksChecker = context.spawn(TxTimeLocksMonitor(nodeParams, watcher, txPublishContext), "time-locks-monitor") + val timeLocksChecker = context.spawn(TxTimeLocksMonitor(nodeParams, bitcoinClient, txPublishContext), "time-locks-monitor") timeLocksChecker ! TxTimeLocksMonitor.CheckTx(context.messageAdapter[TxTimeLocksMonitor.TimeLocksOk](_ => TimeLocksOk), cmd.txInfo.tx, cmd.desc) Behaviors.receiveMessagePartial { case TimeLocksOk => chooseFeerate(txWithWitnessData) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/TxPublisher.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/TxPublisher.scala index 690351982..0255dd9eb 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/TxPublisher.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/TxPublisher.scala @@ -22,7 +22,6 @@ import akka.actor.typed.{ActorRef, Behavior} import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey import fr.acinq.bitcoin.scalacompat.{ByteVector32, OutPoint, Satoshi, Transaction, TxId} import fr.acinq.eclair.blockchain.CurrentBlockHeight -import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinCoreClient import fr.acinq.eclair.blockchain.fee.ConfirmationTarget import fr.acinq.eclair.channel.FullCommitment @@ -140,13 +139,13 @@ object TxPublisher { // @formatter:on } - case class SimpleChildFactory(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, watcher: ActorRef[ZmqWatcher.Command]) extends ChildFactory { + case class SimpleChildFactory(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient) extends ChildFactory { // @formatter:off override def spawnFinalTxPublisher(context: ActorContext[TxPublisher.Command], txPublishContext: TxPublishContext): ActorRef[FinalTxPublisher.Command] = { - context.spawn(FinalTxPublisher(nodeParams, bitcoinClient, watcher, txPublishContext), s"final-tx-${txPublishContext.id}") + context.spawn(FinalTxPublisher(nodeParams, bitcoinClient, txPublishContext), s"final-tx-${txPublishContext.id}") } override def spawnReplaceableTxPublisher(context: ActorContext[Command], txPublishContext: TxPublishContext): ActorRef[ReplaceableTxPublisher.Command] = { - context.spawn(ReplaceableTxPublisher(nodeParams, bitcoinClient, watcher, txPublishContext), s"replaceable-tx-${txPublishContext.id}") + context.spawn(ReplaceableTxPublisher(nodeParams, bitcoinClient, txPublishContext), s"replaceable-tx-${txPublishContext.id}") } // @formatter:on } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/TxTimeLocksMonitor.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/TxTimeLocksMonitor.scala index 96976260b..2b4762de5 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/TxTimeLocksMonitor.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/TxTimeLocksMonitor.scala @@ -20,15 +20,14 @@ import akka.actor.typed.eventstream.EventStream import akka.actor.typed.scaladsl.{ActorContext, Behaviors, TimerScheduler} import akka.actor.typed.{ActorRef, Behavior} import fr.acinq.bitcoin.scalacompat.{Transaction, TxId} -import fr.acinq.eclair.blockchain.CurrentBlockHeight -import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher -import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{WatchParentTxConfirmed, WatchParentTxConfirmedTriggered} +import fr.acinq.eclair.blockchain.{CurrentBlockHeight, OnChainChannelFunder} import fr.acinq.eclair.channel.publish.TxPublisher.TxPublishContext import fr.acinq.eclair.transactions.Scripts import fr.acinq.eclair.{BlockHeight, NodeParams} +import scala.concurrent.ExecutionContext import scala.concurrent.duration.DurationLong -import scala.util.Random +import scala.util.{Failure, Random, Success} /** * Created by t-bast on 10/06/2021. @@ -47,15 +46,16 @@ object TxTimeLocksMonitor { case class CheckTx(replyTo: ActorRef[TimeLocksOk], tx: Transaction, desc: String) extends Command final case class WrappedCurrentBlockHeight(currentBlockHeight: BlockHeight) extends Command private case object CheckRelativeTimeLock extends Command - private case class ParentTxConfirmed(parentTxId: TxId) extends Command + private case class ParentTxStatus(parentTxId: TxId, confirmations_opt: Option[Int]) extends Command + private case class GetTxConfirmationsFailed(parentTxId: TxId, reason: Throwable) extends Command // @formatter:on - def apply(nodeParams: NodeParams, watcher: ActorRef[ZmqWatcher.Command], txPublishContext: TxPublishContext): Behavior[Command] = { + def apply(nodeParams: NodeParams, bitcoinClient: OnChainChannelFunder, txPublishContext: TxPublishContext): Behavior[Command] = { Behaviors.setup { context => Behaviors.withTimers { timers => Behaviors.withMdc(txPublishContext.mdc()) { Behaviors.receiveMessagePartial { - case cmd: CheckTx => new TxTimeLocksMonitor(nodeParams, cmd, watcher, context, timers).checkAbsoluteTimeLock() + case cmd: CheckTx => new TxTimeLocksMonitor(nodeParams, cmd, bitcoinClient, context, timers).checkAbsoluteTimeLock() } } } @@ -66,68 +66,103 @@ object TxTimeLocksMonitor { private class TxTimeLocksMonitor(nodeParams: NodeParams, cmd: TxTimeLocksMonitor.CheckTx, - watcher: ActorRef[ZmqWatcher.Command], + bitcoinClient: OnChainChannelFunder, context: ActorContext[TxTimeLocksMonitor.Command], - timers: TimerScheduler[TxTimeLocksMonitor.Command]) { + timers: TimerScheduler[TxTimeLocksMonitor.Command])(implicit ec: ExecutionContext = ExecutionContext.Implicits.global) { import TxTimeLocksMonitor._ private val log = context.log - def checkAbsoluteTimeLock(): Behavior[Command] = { - val blockHeight = nodeParams.currentBlockHeight + private def checkAbsoluteTimeLock(): Behavior[Command] = { val cltvTimeout = Scripts.cltvTimeout(cmd.tx) - if (blockHeight < cltvTimeout) { - log.info("delaying publication of {} until block={} (current block={})", cmd.desc, cltvTimeout, blockHeight) - val messageAdapter = context.messageAdapter[CurrentBlockHeight](cbc => WrappedCurrentBlockHeight(cbc.blockHeight)) - context.system.eventStream ! EventStream.Subscribe(messageAdapter) - Behaviors.receiveMessagePartial { - case WrappedCurrentBlockHeight(currentBlockHeight) => - if (cltvTimeout <= currentBlockHeight) { - context.system.eventStream ! EventStream.Unsubscribe(messageAdapter) - timers.startSingleTimer(CheckRelativeTimeLock, (1 + Random.nextLong(nodeParams.channelConf.maxTxPublishRetryDelay.toMillis)).millis) - Behaviors.same - } else { - Behaviors.same - } - case CheckRelativeTimeLock => checkRelativeTimeLocks() - } + context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[CurrentBlockHeight](cbc => WrappedCurrentBlockHeight(cbc.blockHeight))) + if (nodeParams.currentBlockHeight < cltvTimeout) { + log.info("delaying publication of {} until block={} (current block={})", cmd.desc, cltvTimeout, nodeParams.currentBlockHeight) + waitForAbsoluteTimeLock(cltvTimeout) } else { checkRelativeTimeLocks() } } - def checkRelativeTimeLocks(): Behavior[Command] = { + private def waitForAbsoluteTimeLock(cltvTimeout: BlockHeight): Behavior[Command] = { + Behaviors.receiveMessagePartial { + case WrappedCurrentBlockHeight(currentBlockHeight) if cltvTimeout <= currentBlockHeight => + timers.startSingleTimer(CheckRelativeTimeLock, (1 + Random.nextLong(nodeParams.channelConf.maxTxPublishRetryDelay.toMillis)).millis) + Behaviors.same + case WrappedCurrentBlockHeight(_) => Behaviors.same + case CheckRelativeTimeLock => checkRelativeTimeLocks() + } + } + + private def checkRelativeTimeLocks(): Behavior[Command] = { val csvTimeouts = Scripts.csvTimeouts(cmd.tx) if (csvTimeouts.nonEmpty) { - val watchConfirmedResponseMapper: ActorRef[WatchParentTxConfirmedTriggered] = context.messageAdapter(w => ParentTxConfirmed(w.tx.txid)) - csvTimeouts.foreach { + val parentTxs = csvTimeouts.map { case (parentTxId, csvTimeout) => - log.info("{} has a relative timeout of {} blocks, watching parentTxId={}", cmd.desc, csvTimeout, parentTxId) - watcher ! WatchParentTxConfirmed(watchConfirmedResponseMapper, parentTxId, minDepth = csvTimeout) + log.info("{} has a relative timeout of {} blocks, checking confirmations for parentTxId={}", cmd.desc, csvTimeout, parentTxId) + checkConfirmations(parentTxId) + parentTxId -> RelativeLockStatus(csvTimeout, nodeParams.currentBlockHeight + csvTimeout) } - waitForParentsToConfirm(csvTimeouts.keySet) + waitForRelativeTimeLocks(parentTxs) } else { notifySender() } } - def waitForParentsToConfirm(parentTxIds: Set[TxId]): Behavior[Command] = { + private case class RelativeLockStatus(csvTimeout: Long, checkAfterBlock: BlockHeight) + + private def waitForRelativeTimeLocks(parentTxs: Map[TxId, RelativeLockStatus]): Behavior[Command] = { Behaviors.receiveMessagePartial { - case ParentTxConfirmed(parentTxId) => - log.debug("parent tx of {} has been confirmed (parent txid={})", cmd.desc, parentTxId) - val remainingParentTxIds = parentTxIds - parentTxId - if (remainingParentTxIds.isEmpty) { - log.info("all parent txs of {} have been confirmed", cmd.desc) - notifySender() - } else { - log.debug("some parent txs of {} are still unconfirmed (parent txids={})", cmd.desc, remainingParentTxIds.mkString(",")) - waitForParentsToConfirm(remainingParentTxIds) + case ParentTxStatus(parentTxId, confirmations_opt) => + parentTxs.get(parentTxId) match { + case Some(status) => confirmations_opt match { + case Some(confirmations) if status.csvTimeout <= confirmations => + log.debug("parentTxId={} of {} has reached enough confirmations", parentTxId, cmd.desc) + val remainingParentTxs = parentTxs - parentTxId + if (remainingParentTxs.isEmpty) { + log.info("all parent txs of {} have reached enough confirmations", cmd.desc) + notifySender() + } else { + log.debug("some parent txs of {} don't have enough confirmations yet (parentTxIds={})", cmd.desc, remainingParentTxs.keySet.mkString(",")) + waitForRelativeTimeLocks(remainingParentTxs) + } + case Some(confirmations) => + log.debug("parentTxId={} doesn't have enough confirmations, retrying in {} blocks", parentTxId, status.csvTimeout - confirmations) + val status1 = status.copy(checkAfterBlock = nodeParams.currentBlockHeight + status.csvTimeout - confirmations) + waitForRelativeTimeLocks(parentTxs + (parentTxId -> status1)) + case None => + log.debug("parentTxId={} is unconfirmed, retrying in {} blocks", parentTxId, status.csvTimeout) + val status1 = status.copy(checkAfterBlock = nodeParams.currentBlockHeight + status.csvTimeout) + waitForRelativeTimeLocks(parentTxs + (parentTxId -> status1)) + } + case None => + log.debug("ignoring duplicate parentTxId={}", parentTxId) + Behaviors.same } + case GetTxConfirmationsFailed(parentTxId, reason) => + log.warn("could not get tx confirmations for parentTxId={}, retrying ({})", parentTxId, reason.getMessage) + checkConfirmations(parentTxId) + Behaviors.same + case WrappedCurrentBlockHeight(currentBlockHeight) => + log.debug("received new block (height={})", currentBlockHeight) + parentTxs.collect { + case (parentTxId, status) if status.checkAfterBlock <= currentBlockHeight => + log.debug("checking confirmations for parentTxId={} ({} <= {})", parentTxId, status.checkAfterBlock, currentBlockHeight) + checkConfirmations(parentTxId) + } + Behaviors.same } } - def notifySender(): Behavior[Command] = { + private def checkConfirmations(parentTxId: TxId): Unit = { + context.pipeToSelf(bitcoinClient.getTxConfirmations(parentTxId)) { + case Success(confirmations_opt) => ParentTxStatus(parentTxId, confirmations_opt) + case Failure(reason) => GetTxConfirmationsFailed(parentTxId, reason) + } + } + + private def notifySender(): Behavior[Command] = { log.debug("time locks satisfied for {}", cmd.desc) cmd.replyTo ! TimeLocksOk() Behaviors.stopped diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcherSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcherSpec.scala index 3cc68fb47..d0aaad772 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcherSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcherSpec.scala @@ -227,6 +227,39 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind }) } + test("watch for confirmed transactions with relative delay") { + withWatcher(f => { + import f._ + + // We simulate a transaction with a 3-blocks CSV delay. + val (priv, address) = createExternalAddress() + val parentTx = sendToAddress(address, 50.millibtc, probe) + val tx = createSpendP2WPKH(parentTx, priv, priv.publicKey, 5_000 sat, 3, 0) + val delay = RelativeDelay(parentTx.txid, 3) + + watcher ! WatchTxConfirmed(probe.ref, tx.txid, 6, Some(delay)) + probe.expectNoMessage(100 millis) + + // We make the parent tx confirm to satisfy the CSV delay and publish the delayed transaction. + generateBlocks(3) + bitcoinClient.publishTransaction(tx).pipeTo(probe.ref) + probe.expectMsg(tx.txid) + probe.expectNoMessage(100 millis) + + // The delayed transaction confirms, but hasn't reached its minimum depth. + generateBlocks(3) + probe.expectNoMessage(100 millis) + + // The delayed transaction reaches its minimum depth. + generateBlocks(3) + assert(probe.expectMsgType[WatchTxConfirmedTriggered].tx.txid == tx.txid) + + // If we watch the transaction when it's already confirmed, we immediately receive the WatchEventConfirmed. + watcher ! WatchTxConfirmed(probe.ref, tx.txid, 3, Some(delay.copy(delay = 720))) + assert(probe.expectMsgType[WatchTxConfirmedTriggered].tx.txid == tx.txid) + }) + } + test("watch for spent transactions") { withWatcher(f => { import f._ diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/FinalTxPublisherSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/FinalTxPublisherSpec.scala index 8d8d59f9b..d25e6d1ff 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/FinalTxPublisherSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/FinalTxPublisherSpec.scala @@ -24,7 +24,6 @@ import fr.acinq.bitcoin.scalacompat.{SatoshiLong, Transaction, TxId} import fr.acinq.eclair.blockchain.CurrentBlockHeight import fr.acinq.eclair.blockchain.WatcherSpec.createSpendP2WPKH import fr.acinq.eclair.blockchain.bitcoind.BitcoindService -import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{WatchParentTxConfirmed, WatchParentTxConfirmedTriggered} import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinCoreClient import fr.acinq.eclair.channel.publish.FinalTxPublisher.{Publish, Stop} import fr.acinq.eclair.channel.publish.TxPublisher.TxRejectedReason.ConflictingTxConfirmed @@ -48,14 +47,13 @@ class FinalTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bi stopBitcoind() } - case class Fixture(bitcoinClient: BitcoinCoreClient, publisher: ActorRef[FinalTxPublisher.Command], watcher: TestProbe, probe: TestProbe) + case class Fixture(bitcoinClient: BitcoinCoreClient, publisher: ActorRef[FinalTxPublisher.Command], probe: TestProbe) def createFixture(): Fixture = { val probe = TestProbe() - val watcher = TestProbe() val bitcoinClient = new BitcoinCoreClient(bitcoinrpcclient) - val publisher = system.spawnAnonymous(FinalTxPublisher(TestConstants.Alice.nodeParams, bitcoinClient, watcher.ref, TxPublishContext(UUID.randomUUID(), randomKey().publicKey, None))) - Fixture(bitcoinClient, publisher, watcher, probe) + val publisher = system.spawnAnonymous(FinalTxPublisher(TestConstants.Alice.nodeParams, bitcoinClient, TxPublishContext(UUID.randomUUID(), randomKey().publicKey, None))) + Fixture(bitcoinClient, publisher, probe) } def getMempool(bitcoinClient: BitcoinCoreClient, probe: TestProbe): Seq[Transaction] = { @@ -78,17 +76,13 @@ class FinalTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bi val (priv, address) = createExternalAddress() val parentTx = sendToAddress(address, 125_000 sat, probe) + createBlocks(5, probe) + val tx = createSpendP2WPKH(parentTx, priv, priv.publicKey, 2_500 sat, sequence = 5, lockTime = 0) val cmd = PublishFinalTx(tx, tx.txIn.head.outPoint, "tx-time-locks", 0 sat, None) publisher ! Publish(probe.ref, cmd) - val w = watcher.expectMsgType[WatchParentTxConfirmed] - assert(w.txId == parentTx.txid) - assert(w.minDepth == 5) - createBlocks(5, probe) - w.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, parentTx) - - // Once time locks are satisfied, the transaction should be published: + // Time locks are satisfied, the transaction should be published: waitTxInMempool(bitcoinClient, tx.txid, probe) createBlocks(1, probe) probe.expectNoMessage(100 millis) // we don't notify the sender until min depth has been reached @@ -113,7 +107,6 @@ class FinalTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bi publisher ! Publish(probe.ref, cmd) // Since the parent is not published yet, we can't publish the child tx either: - watcher.expectNoMessage(100 millis) assert(!getMempool(bitcoinClient, probe).map(_.txid).contains(tx.txid)) // Once the parent tx is published, it will unblock publication of the child tx: diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/ReplaceableTxPublisherSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/ReplaceableTxPublisherSpec.scala index 580efbd02..dc44a122d 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/ReplaceableTxPublisherSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/ReplaceableTxPublisherSpec.scala @@ -75,7 +75,7 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w def createPublisher(): ActorRef[ReplaceableTxPublisher.Command] = createPublisher(alice.underlyingActor.nodeParams) def createPublisher(nodeParams: NodeParams): ActorRef[ReplaceableTxPublisher.Command] = { - system.spawnAnonymous(ReplaceableTxPublisher(nodeParams, wallet, alice2blockchain.ref, TxPublishContext(UUID.randomUUID(), randomKey().publicKey, None))) + system.spawnAnonymous(ReplaceableTxPublisher(nodeParams, wallet, TxPublishContext(UUID.randomUUID(), randomKey().publicKey, None))) } def aliceBlockHeight(): BlockHeight = alice.underlyingActor.nodeParams.currentBlockHeight @@ -170,7 +170,7 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w generateBlocks(1) // Execute our test. - val publisher = system.spawn(ReplaceableTxPublisher(aliceNodeParams, walletClient, alice2blockchain.ref, TxPublishContext(testId, TestConstants.Bob.nodeParams.nodeId, None)), testId.toString) + val publisher = system.spawn(ReplaceableTxPublisher(aliceNodeParams, walletClient, TxPublishContext(testId, TestConstants.Bob.nodeParams.nodeId, None)), testId.toString) val f = Fixture(alice, bob, alice2bob, bob2alice, alice2blockchain, bob2blockchain, walletClient, walletRpcClient, publisher, probe) // We set a high fastest feerate, to ensure that by default we're not limited by this. f.setFeerate(FeeratePerKw(100_000 sat), blockTarget = 1) @@ -1035,12 +1035,10 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w withFixture(Seq(10.5 millibtc), ChannelTypes.AnchorOutputsZeroFeeHtlcTx()) { f => import f._ - val (commitTx, htlcSuccess, _) = closeChannelWithHtlcs(f, aliceBlockHeight()) + val (_, htlcSuccess, _) = closeChannelWithHtlcs(f, aliceBlockHeight()) val htlcSuccessPublisher = createPublisher() setFeerate(FeeratePerKw(75_000 sat), blockTarget = 1) htlcSuccessPublisher ! Publish(probe.ref, htlcSuccess) - val w = alice2blockchain.expectMsgType[WatchParentTxConfirmed] - w.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, commitTx) val result = probe.expectMsgType[TxRejected] assert(result.cmd == htlcSuccess) @@ -1054,8 +1052,6 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w val htlcSuccessPublisher = createPublisher() htlcSuccessPublisher ! Publish(probe.ref, htlcSuccess) - val w = alice2blockchain.expectMsgType[WatchParentTxConfirmed] - w.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, commitTx) val htlcSuccessTx = getMempoolTxs(1).head val htlcSuccessTargetFee = Transactions.weight2fee(targetFeerate, htlcSuccessTx.weight.toInt) assert(htlcSuccessTargetFee * 0.9 <= htlcSuccessTx.fees && htlcSuccessTx.fees <= htlcSuccessTargetFee * 1.2, s"actualFee=${htlcSuccessTx.fees} targetFee=$htlcSuccessTargetFee") @@ -1084,8 +1080,6 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w generateBlocks(144) system.eventStream.publish(CurrentBlockHeight(currentBlockHeight(probe))) setFeerate(targetFeerate) // the feerate is higher than what it was when the channel force-closed - val w = alice2blockchain.expectMsgType[WatchParentTxConfirmed] - w.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, commitTx) val htlcTimeoutTx = getMempoolTxs(1).head val htlcTimeoutTargetFee = Transactions.weight2fee(targetFeerate, htlcTimeoutTx.weight.toInt) assert(htlcTimeoutTargetFee * 0.9 <= htlcTimeoutTx.fees && htlcTimeoutTx.fees <= htlcTimeoutTargetFee * 1.2, s"actualFee=${htlcTimeoutTx.fees} targetFee=$htlcTimeoutTargetFee") @@ -1222,15 +1216,13 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w val initialFeerate = FeeratePerKw(15_000 sat) setFeerate(initialFeerate) - val (commitTx, htlcSuccess, _) = closeChannelWithHtlcs(f, aliceBlockHeight() + 30) + val (_, htlcSuccess, _) = closeChannelWithHtlcs(f, aliceBlockHeight() + 30) val listener = TestProbe() system.eventStream.subscribe(listener.ref, classOf[TransactionPublished]) val htlcSuccessPublisher = createPublisher() htlcSuccessPublisher ! Publish(probe.ref, htlcSuccess) - val w = alice2blockchain.expectMsgType[WatchParentTxConfirmed] - w.replyTo ! WatchParentTxConfirmedTriggered(aliceBlockHeight(), 0, commitTx) val htlcSuccessTxId1 = listener.expectMsgType[TransactionPublished].tx.txid val htlcSuccessTx1 = getMempoolTxs(1).head val htlcSuccessInputs1 = getMempool().head.txIn.map(_.outPoint).toSet @@ -1258,15 +1250,13 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w val initialFeerate = FeeratePerKw(3_000 sat) setFeerate(initialFeerate) - val (commitTx, htlcSuccess, _) = closeChannelWithHtlcs(f, aliceBlockHeight() + 15) + val (_, htlcSuccess, _) = closeChannelWithHtlcs(f, aliceBlockHeight() + 15) val listener = TestProbe() system.eventStream.subscribe(listener.ref, classOf[TransactionPublished]) val htlcSuccessPublisher = createPublisher() htlcSuccessPublisher ! Publish(probe.ref, htlcSuccess) - val w = alice2blockchain.expectMsgType[WatchParentTxConfirmed] - w.replyTo ! WatchParentTxConfirmedTriggered(aliceBlockHeight(), 0, commitTx) val htlcSuccessTxId1 = listener.expectMsgType[TransactionPublished].tx.txid val htlcSuccessTx1 = getMempoolTxs(1).head val htlcSuccessInputs1 = getMempool().head.txIn.map(_.outPoint).toSet @@ -1294,15 +1284,13 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w val initialFeerate = FeeratePerKw(10_000 sat) setFeerate(initialFeerate) - val (commitTx, htlcSuccess, _) = closeChannelWithHtlcs(f, aliceBlockHeight() + 6) + val (_, htlcSuccess, _) = closeChannelWithHtlcs(f, aliceBlockHeight() + 6) val listener = TestProbe() system.eventStream.subscribe(listener.ref, classOf[TransactionPublished]) val htlcSuccessPublisher = createPublisher() htlcSuccessPublisher ! Publish(probe.ref, htlcSuccess) - val w = alice2blockchain.expectMsgType[WatchParentTxConfirmed] - w.replyTo ! WatchParentTxConfirmedTriggered(aliceBlockHeight(), 0, commitTx) val htlcSuccessTxId = listener.expectMsgType[TransactionPublished].tx.txid var htlcSuccessTx = getMempoolTxs(1).head assert(htlcSuccessTx.txid == htlcSuccessTxId) @@ -1327,7 +1315,7 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w val feerate = FeeratePerKw(15_000 sat) setFeerate(feerate, fastest = feerate) // The confirmation target for htlc-timeout corresponds to their CLTV: we should claim them asap once the htlc has timed out. - val (commitTx, _, htlcTimeout) = closeChannelWithHtlcs(f, aliceBlockHeight() + 144) + val (_, _, htlcTimeout) = closeChannelWithHtlcs(f, aliceBlockHeight() + 144) val listener = TestProbe() system.eventStream.subscribe(listener.ref, classOf[TransactionPublished]) @@ -1336,8 +1324,6 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w htlcTimeoutPublisher ! Publish(probe.ref, htlcTimeout) generateBlocks(144) system.eventStream.publish(CurrentBlockHeight(currentBlockHeight(probe))) - val w = alice2blockchain.expectMsgType[WatchParentTxConfirmed] - w.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, commitTx) val htlcTimeoutTxId1 = listener.expectMsgType[TransactionPublished].tx.txid val htlcTimeoutTx1 = getMempoolTxs(1).head val htlcTimeoutInputs1 = getMempool().head.txIn.map(_.outPoint).toSet @@ -1363,7 +1349,7 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w withFixture(Seq(15 millibtc, 10 millibtc, 5 millibtc), ChannelTypes.AnchorOutputsZeroFeeHtlcTx()) { f => import f._ - val (commitTx, htlcSuccess, _) = closeChannelWithHtlcs(f, aliceBlockHeight() + 144) + val (_, htlcSuccess, _) = closeChannelWithHtlcs(f, aliceBlockHeight() + 144) // The HTLC confirmation target is far away, but we have less safe utxos than the configured threshold. // We will target a 1-block confirmation to get a safe utxo back as soon as possible. val highSafeThresholdParams = alice.underlyingActor.nodeParams.modify(_.onChainFeeConf.safeUtxosThreshold).setTo(10) @@ -1373,8 +1359,6 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w val htlcSuccessPublisher = createPublisher(highSafeThresholdParams) htlcSuccessPublisher ! Publish(probe.ref, htlcSuccess) - val w = alice2blockchain.expectMsgType[WatchParentTxConfirmed] - w.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, commitTx) val htlcSuccessTx = getMempoolTxs(1).head val htlcSuccessTargetFee = Transactions.weight2fee(targetFeerate, htlcSuccessTx.weight.toInt) assert(htlcSuccessTargetFee * 0.9 <= htlcSuccessTx.fees && htlcSuccessTx.fees <= htlcSuccessTargetFee * 1.1, s"actualFee=${htlcSuccessTx.fees} targetFee=$htlcSuccessTargetFee") @@ -1390,16 +1374,12 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w val (commitTx, htlcSuccess, _) = closeChannelWithHtlcs(f, aliceBlockHeight() + 18) val publisher1 = createPublisher() publisher1 ! Publish(probe.ref, htlcSuccess) - val w1 = alice2blockchain.expectMsgType[WatchParentTxConfirmed] - w1.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, commitTx) getMempoolTxs(1) // we try to publish the htlc-success again (can be caused by a node restart): it will fail to replace the existing // one in the mempool but we must ensure we don't leave some utxos locked. val publisher2 = createPublisher() publisher2 ! Publish(probe.ref, htlcSuccess) - val w2 = alice2blockchain.expectMsgType[WatchParentTxConfirmed] - w2.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, commitTx) val result = probe.expectMsgType[TxRejected] assert(result.reason == ConflictingTxUnconfirmed) getMempoolTxs(1) // the previous htlc-success tx is still in the mempool @@ -1424,10 +1404,8 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w import f._ setFeerate(FeeratePerKw(5_000 sat)) - val (commitTx, htlcSuccess, _) = closeChannelWithHtlcs(f, aliceBlockHeight() + 48) + val (_, htlcSuccess, _) = closeChannelWithHtlcs(f, aliceBlockHeight() + 48) publisher ! Publish(probe.ref, htlcSuccess) - val w = alice2blockchain.expectMsgType[WatchParentTxConfirmed] - w.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, commitTx) getMempoolTxs(1) // We unlock utxos before stopping. @@ -1545,8 +1523,6 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w val claimHtlcSuccessPublisher = createPublisher() claimHtlcSuccessPublisher ! Publish(probe.ref, claimHtlcSuccess) - val w = alice2blockchain.expectMsgType[WatchParentTxConfirmed] - w.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, remoteCommitTx) val claimHtlcSuccessTx = getMempoolTxs(1).head val claimHtlcSuccessTargetFee = Transactions.weight2fee(targetFeerate, claimHtlcSuccessTx.weight.toInt) assert(claimHtlcSuccessTargetFee * 0.9 <= claimHtlcSuccessTx.fees && claimHtlcSuccessTx.fees <= claimHtlcSuccessTargetFee * 1.1, s"actualFee=${claimHtlcSuccessTx.fees} targetFee=$claimHtlcSuccessTargetFee") @@ -1574,8 +1550,6 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w generateBlocks(144) system.eventStream.publish(CurrentBlockHeight(currentBlockHeight(probe))) setFeerate(targetFeerate) // the feerate is higher than what it was when the channel force-closed - val w = alice2blockchain.expectMsgType[WatchParentTxConfirmed] - w.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, remoteCommitTx) val claimHtlcTimeoutTx = getMempoolTxs(1).head val claimHtlcTimeoutTargetFee = Transactions.weight2fee(targetFeerate, claimHtlcTimeoutTx.weight.toInt) assert(claimHtlcTimeoutTargetFee * 0.9 <= claimHtlcTimeoutTx.fees && claimHtlcTimeoutTx.fees <= claimHtlcTimeoutTargetFee * 1.1, s"actualFee=${claimHtlcTimeoutTx.fees} targetFee=$claimHtlcTimeoutTargetFee") @@ -1683,13 +1657,11 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w withFixture(Seq(11 millibtc), ChannelTypes.AnchorOutputs()) { f => import f._ - val (remoteCommitTx, claimHtlcSuccess, claimHtlcTimeout) = remoteCloseChannelWithHtlcs(f, aliceBlockHeight() + 300, nextCommit = false) + val (_, claimHtlcSuccess, claimHtlcTimeout) = remoteCloseChannelWithHtlcs(f, aliceBlockHeight() + 300, nextCommit = false) setFeerate(FeeratePerKw(50_000 sat)) val claimHtlcSuccessPublisher = createPublisher() claimHtlcSuccessPublisher ! Publish(probe.ref, claimHtlcSuccess) - val w1 = alice2blockchain.expectMsgType[WatchParentTxConfirmed] - w1.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, remoteCommitTx) val result1 = probe.expectMsgType[TxRejected] assert(result1.cmd == claimHtlcSuccess) assert(result1.reason == TxSkipped(retryNextBlock = true)) @@ -1699,8 +1671,6 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w claimHtlcTimeoutPublisher ! Publish(probe.ref, claimHtlcTimeout) generateBlocks(144) system.eventStream.publish(CurrentBlockHeight(currentBlockHeight(probe))) - val w2 = alice2blockchain.expectMsgType[WatchParentTxConfirmed] - w2.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, remoteCommitTx) val result2 = probe.expectMsgType[TxRejected] assert(result2.cmd == claimHtlcTimeout) assert(result2.reason == TxSkipped(retryNextBlock = true)) @@ -1771,7 +1741,7 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w withFixture(Seq(11 millibtc), ChannelTypes.AnchorOutputs()) { f => import f._ - val (remoteCommitTx, claimHtlcSuccess, _) = remoteCloseChannelWithHtlcs(f, aliceBlockHeight() + 300, nextCommit = false) + val (_, claimHtlcSuccess, _) = remoteCloseChannelWithHtlcs(f, aliceBlockHeight() + 300, nextCommit = false) val listener = TestProbe() system.eventStream.subscribe(listener.ref, classOf[TransactionPublished]) @@ -1779,8 +1749,6 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w setFeerate(FeeratePerKw(5_000 sat)) val claimHtlcSuccessPublisher = createPublisher() claimHtlcSuccessPublisher ! Publish(probe.ref, claimHtlcSuccess) - val w1 = alice2blockchain.expectMsgType[WatchParentTxConfirmed] - w1.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, remoteCommitTx) val claimHtlcSuccessTx = getMempoolTxs(1).head assert(listener.expectMsgType[TransactionPublished].tx.txid == claimHtlcSuccessTx.txid) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/TxTimeLocksMonitorSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/TxTimeLocksMonitorSpec.scala index c71f57d49..bebd58cad 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/TxTimeLocksMonitorSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/TxTimeLocksMonitorSpec.scala @@ -19,28 +19,47 @@ package fr.acinq.eclair.channel.publish import akka.actor.typed.ActorRef import akka.actor.typed.scaladsl.adapter.{ClassicActorSystemOps, actorRefAdapter} import akka.testkit.TestProbe -import fr.acinq.bitcoin.scalacompat.{OutPoint, SatoshiLong, Script, Transaction, TxIn, TxOut} -import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{WatchParentTxConfirmed, WatchParentTxConfirmedTriggered} +import fr.acinq.bitcoin.scalacompat.{OutPoint, SatoshiLong, Script, Transaction, TxId, TxIn, TxOut} +import fr.acinq.eclair.blockchain.NoOpOnChainWallet import fr.acinq.eclair.channel.publish.TxPublisher.TxPublishContext import fr.acinq.eclair.channel.publish.TxTimeLocksMonitor.{CheckTx, TimeLocksOk, WrappedCurrentBlockHeight} -import fr.acinq.eclair.{BlockHeight, NodeParams, TestConstants, TestKitBaseClass, randomKey} +import fr.acinq.eclair.{NodeParams, TestConstants, TestKitBaseClass, randomKey} import org.scalatest.Outcome import org.scalatest.funsuite.FixtureAnyFunSuiteLike import java.util.UUID import scala.concurrent.duration.DurationInt +import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.util.Success class TxTimeLocksMonitorSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike { - case class FixtureParam(nodeParams: NodeParams, monitor: ActorRef[TxTimeLocksMonitor.Command], watcher: TestProbe, probe: TestProbe) + case class FixtureParam(nodeParams: NodeParams, monitor: ActorRef[TxTimeLocksMonitor.Command], bitcoinClient: BitcoinTestClient, probe: TestProbe) + + case class BitcoinTestClient() extends NoOpOnChainWallet { + private val requests = collection.concurrent.TrieMap.empty[TxId, Promise[Option[Int]]] + + override def getTxConfirmations(txId: TxId)(implicit ec: ExecutionContext): Future[Option[Int]] = { + val p = Promise[Option[Int]]() + requests += (txId -> p) + p.future + } + + def hasRequest(txId: TxId): Boolean = requests.contains(txId) + + def completeRequest(txId: TxId, confirmations_opt: Option[Int]): Unit = { + requests.get(txId).map(_.complete(Success(confirmations_opt))) + requests -= txId + } + } override def withFixture(test: OneArgTest): Outcome = { within(max = 30 seconds) { val nodeParams = TestConstants.Alice.nodeParams val probe = TestProbe() - val watcher = TestProbe() - val monitor = system.spawnAnonymous(TxTimeLocksMonitor(nodeParams, watcher.ref, TxPublishContext(UUID.randomUUID(), randomKey().publicKey, None))) - withFixture(test.toNoArgTest(FixtureParam(nodeParams, monitor, watcher, probe))) + val bitcoinClient = BitcoinTestClient() + val monitor = system.spawnAnonymous(TxTimeLocksMonitor(nodeParams, bitcoinClient, TxPublishContext(UUID.randomUUID(), randomKey().publicKey, None))) + withFixture(test.toNoArgTest(FixtureParam(nodeParams, monitor, bitcoinClient, probe))) } } @@ -65,12 +84,26 @@ class TxTimeLocksMonitorSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik val tx = Transaction(2, TxIn(OutPoint(parentTx, 0), Nil, 3) :: Nil, TxOut(25_000 sat, Script.pay2wpkh(randomKey().publicKey)) :: Nil, 0) monitor ! CheckTx(probe.ref, tx, "relative-delay") - val w = watcher.expectMsgType[WatchParentTxConfirmed] - assert(w.txId == parentTx.txid) - assert(w.minDepth == 3) + // The parent transaction is unconfirmed: we will check again 3 blocks later (the value of the CSV timeout). + awaitCond(bitcoinClient.hasRequest(parentTx.txid), interval = 100 millis) + bitcoinClient.completeRequest(parentTx.txid, None) probe.expectNoMessage(100 millis) - w.replyTo ! WatchParentTxConfirmedTriggered(BlockHeight(651), 0, parentTx) + monitor ! WrappedCurrentBlockHeight(nodeParams.currentBlockHeight + 1) + assert(!bitcoinClient.hasRequest(parentTx.txid)) + monitor ! WrappedCurrentBlockHeight(nodeParams.currentBlockHeight + 2) + assert(!bitcoinClient.hasRequest(parentTx.txid)) + monitor ! WrappedCurrentBlockHeight(nodeParams.currentBlockHeight + 3) + awaitCond(bitcoinClient.hasRequest(parentTx.txid), interval = 100 millis) + // This time the parent transaction has 1 confirmation: we will check again in two more blocks. + bitcoinClient.completeRequest(parentTx.txid, Some(1)) + monitor ! WrappedCurrentBlockHeight(nodeParams.currentBlockHeight + 1) + probe.expectNoMessage(100 millis) + assert(!bitcoinClient.hasRequest(parentTx.txid)) + monitor ! WrappedCurrentBlockHeight(nodeParams.currentBlockHeight + 2) + awaitCond(bitcoinClient.hasRequest(parentTx.txid), interval = 100 millis) + bitcoinClient.completeRequest(parentTx.txid, Some(3)) + probe.expectMsg(TimeLocksOk()) } @@ -81,23 +114,33 @@ class TxTimeLocksMonitorSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik val parentTx2 = Transaction(2, Nil, TxOut(45_000 sat, Script.pay2wpkh(randomKey().publicKey)) :: Nil, 0) val tx = Transaction( 2, - TxIn(OutPoint(parentTx1, 0), Nil, 3) :: TxIn(OutPoint(parentTx1, 1), Nil, 1) :: TxIn(OutPoint(parentTx2, 0), Nil, 1) :: Nil, + TxIn(OutPoint(parentTx1, 0), Nil, 3) :: TxIn(OutPoint(parentTx1, 1), Nil, 1) :: TxIn(OutPoint(parentTx2, 0), Nil, 2) :: Nil, TxOut(50_000 sat, Script.pay2wpkh(randomKey().publicKey)) :: Nil, 0 ) monitor ! CheckTx(probe.ref, tx, "many-relative-delays") - // We send a single watch for parentTx1, with the max of the two delays. - val w1 = watcher.expectMsgType[WatchParentTxConfirmed] - val w2 = watcher.expectMsgType[WatchParentTxConfirmed] - watcher.expectNoMessage(100 millis) - assert(Seq(w1, w2).map(w => (w.txId, w.minDepth)).toSet == Set((parentTx1.txid, 3), (parentTx2.txid, 1))) + // The first parent transaction is unconfirmed. + awaitCond(bitcoinClient.hasRequest(parentTx1.txid), interval = 100 millis) + bitcoinClient.completeRequest(parentTx1.txid, None) + // The second parent transaction is confirmed, but is still missing one confirmation. + awaitCond(bitcoinClient.hasRequest(parentTx2.txid), interval = 100 millis) + bitcoinClient.completeRequest(parentTx2.txid, Some(1)) probe.expectNoMessage(100 millis) - w1.replyTo ! WatchParentTxConfirmedTriggered(BlockHeight(651), 0, parentTx1) + // A new block is found: the second parent transaction has enough confirmations. + monitor ! WrappedCurrentBlockHeight(nodeParams.currentBlockHeight + 1) + awaitCond(bitcoinClient.hasRequest(parentTx2.txid), interval = 100 millis) + assert(!bitcoinClient.hasRequest(parentTx1.txid)) + bitcoinClient.completeRequest(parentTx2.txid, Some(2)) probe.expectNoMessage(100 millis) - w2.replyTo ! WatchParentTxConfirmedTriggered(BlockHeight(1105), 0, parentTx2) + // Two more blocks are found: the first parent transaction now has enough confirmations. + monitor ! WrappedCurrentBlockHeight(nodeParams.currentBlockHeight + 3) + awaitCond(bitcoinClient.hasRequest(parentTx1.txid), interval = 100 millis) + assert(!bitcoinClient.hasRequest(parentTx2.txid)) + bitcoinClient.completeRequest(parentTx1.txid, Some(3)) + probe.expectMsg(TimeLocksOk()) } @@ -114,19 +157,18 @@ class TxTimeLocksMonitorSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik ) monitor ! CheckTx(probe.ref, tx, "absolute-and-relative-delays") - // We set watches on parent txs only once the absolute delay is over. - watcher.expectNoMessage(100 millis) + // We watch parent txs only once the absolute delay is over. + monitor ! WrappedCurrentBlockHeight(nodeParams.currentBlockHeight + 2) + assert(!bitcoinClient.hasRequest(parentTx1.txid)) + assert(!bitcoinClient.hasRequest(parentTx2.txid)) + + // When the absolute delay is over, we check parent transactions. monitor ! WrappedCurrentBlockHeight(nodeParams.currentBlockHeight + 3) - val w1 = watcher.expectMsgType[WatchParentTxConfirmed] - val w2 = watcher.expectMsgType[WatchParentTxConfirmed] - watcher.expectNoMessage(100 millis) - assert(Seq(w1, w2).map(w => (w.txId, w.minDepth)).toSet == Set((parentTx1.txid, 3), (parentTx2.txid, 6))) + awaitCond(bitcoinClient.hasRequest(parentTx1.txid), interval = 100 millis) + bitcoinClient.completeRequest(parentTx1.txid, Some(5)) probe.expectNoMessage(100 millis) - - w1.replyTo ! WatchParentTxConfirmedTriggered(BlockHeight(651), 0, parentTx1) - probe.expectNoMessage(100 millis) - - w2.replyTo ! WatchParentTxConfirmedTriggered(BlockHeight(1105), 0, parentTx2) + awaitCond(bitcoinClient.hasRequest(parentTx2.txid), interval = 100 millis) + bitcoinClient.completeRequest(parentTx2.txid, Some(10)) probe.expectMsg(TimeLocksOk()) } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/ChannelStateTestsHelperMethods.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/ChannelStateTestsHelperMethods.scala index ebc97c182..da2032020 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/ChannelStateTestsHelperMethods.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/ChannelStateTestsHelperMethods.scala @@ -564,7 +564,11 @@ trait ChannelStateTestsBase extends Assertions with Eventually { // we watch the confirmation of the "final" transactions that send funds to our wallets (main delayed output and 2nd stage htlc transactions) assert(s2blockchain.expectMsgType[WatchTxConfirmed].txId == commitTx.txid) - localCommitPublished.claimMainDelayedOutputTx.foreach(claimMain => assert(s2blockchain.expectMsgType[WatchTxConfirmed].txId == claimMain.tx.txid)) + localCommitPublished.claimMainDelayedOutputTx.foreach(claimMain => { + val watchConfirmed = s2blockchain.expectMsgType[WatchTxConfirmed] + assert(watchConfirmed.txId == claimMain.tx.txid) + assert(watchConfirmed.delay_opt.map(_.parentTxId).contains(publishedLocalCommitTx.txid)) + }) // we watch outputs of the commitment tx that both parties may spend and anchor outputs val watchedOutputIndexes = localCommitPublished.htlcTxs.keySet.map(_.index) ++ localCommitPublished.claimAnchorTxs.collect { case tx: ClaimLocalAnchorOutputTx => tx.input.outPoint.index } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/h/ClosingStateSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/h/ClosingStateSpec.scala index 4b2e0a4ad..8703d33c4 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/h/ClosingStateSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/h/ClosingStateSpec.scala @@ -21,7 +21,6 @@ import akka.testkit.{TestFSMRef, TestProbe} import fr.acinq.bitcoin.ScriptFlags import fr.acinq.bitcoin.scalacompat.Crypto.PrivateKey import fr.acinq.bitcoin.scalacompat.{ByteVector32, ByteVector64, Crypto, OutPoint, SatoshiLong, Script, Transaction, TxIn, TxOut} -import fr.acinq.eclair.Features.StaticRemoteKey import fr.acinq.eclair.TestUtils.randomTxId import fr.acinq.eclair.blockchain.DummyOnChainWallet import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher._ @@ -37,6 +36,7 @@ import fr.acinq.eclair.transactions.Transactions._ import fr.acinq.eclair.transactions.{Scripts, Transactions} import fr.acinq.eclair.wire.protocol._ import fr.acinq.eclair.{BlockHeight, CltvExpiry, CltvExpiryDelta, Features, MilliSatoshiLong, TestConstants, TestKitBaseClass, TimestampSecond, randomBytes32, randomKey} +import org.scalatest.Inside.inside import org.scalatest.funsuite.FixtureAnyFunSuiteLike import org.scalatest.{Outcome, Tag} import scodec.bits.ByteVector @@ -710,14 +710,12 @@ class ClosingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with // Alice receives the preimage for the incoming HTLC. alice ! CMD_FULFILL_HTLC(incomingHtlc.id, preimage, commit = true) - assert(alice2blockchain.expectMsgType[PublishReplaceableTx].txInfo.isInstanceOf[ClaimLocalAnchorOutputTx]) assert(alice2blockchain.expectMsgType[PublishFinalTx].tx.txid == claimMainTx.txid) assert(alice2blockchain.expectMsgType[PublishReplaceableTx].txInfo.isInstanceOf[HtlcTimeoutTx]) assert(alice2blockchain.expectMsgType[PublishReplaceableTx].txInfo.isInstanceOf[HtlcSuccessTx]) assert(alice2blockchain.expectMsgType[WatchTxConfirmed].txId == claimMainTx.txid) alice2blockchain.expectMsgType[WatchOutputSpent] alice2blockchain.expectMsgType[WatchOutputSpent] - alice2blockchain.expectMsgType[WatchOutputSpent] alice2blockchain.expectNoMessage(100 millis) val closingState2 = alice.stateData.asInstanceOf[DATA_CLOSING].localCommitPublished.get assert(getHtlcSuccessTxs(closingState2).length == 1) @@ -726,11 +724,17 @@ class ClosingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with // The HTLC txs confirms, so we publish 3rd-stage txs. alice ! WatchTxConfirmedTriggered(BlockHeight(201), 0, htlcTimeoutTx) val claimHtlcTimeoutDelayedTx = alice2blockchain.expectMsgType[PublishFinalTx].tx - assert(alice2blockchain.expectMsgType[WatchTxConfirmed].txId == claimHtlcTimeoutDelayedTx.txid) + inside(alice2blockchain.expectMsgType[WatchTxConfirmed]) { w => + assert(w.txId == claimHtlcTimeoutDelayedTx.txid) + assert(w.delay_opt.map(_.parentTxId).contains(htlcTimeoutTx.txid)) + } Transaction.correctlySpends(claimHtlcTimeoutDelayedTx, Seq(htlcTimeoutTx), ScriptFlags.STANDARD_SCRIPT_VERIFY_FLAGS) alice ! WatchTxConfirmedTriggered(BlockHeight(201), 0, htlcSuccessTx) val claimHtlcSuccessDelayedTx = alice2blockchain.expectMsgType[PublishFinalTx].tx - assert(alice2blockchain.expectMsgType[WatchTxConfirmed].txId == claimHtlcSuccessDelayedTx.txid) + inside(alice2blockchain.expectMsgType[WatchTxConfirmed]) { w => + assert(w.txId == claimHtlcSuccessDelayedTx.txid) + assert(w.delay_opt.map(_.parentTxId).contains(htlcSuccessTx.txid)) + } Transaction.correctlySpends(claimHtlcSuccessDelayedTx, Seq(htlcSuccessTx), ScriptFlags.STANDARD_SCRIPT_VERIFY_FLAGS) // We simulate a node restart after a feerate increase. @@ -742,14 +746,12 @@ class ClosingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with awaitCond(alice.stateName == CLOSING) // We re-publish closing transactions. - assert(alice2blockchain.expectMsgType[PublishReplaceableTx].txInfo.isInstanceOf[ClaimLocalAnchorOutputTx]) assert(alice2blockchain.expectMsgType[PublishFinalTx].tx.txid == claimMainTx.txid) assert(alice2blockchain.expectMsgType[PublishFinalTx].tx.txid == claimHtlcTimeoutDelayedTx.txid) assert(alice2blockchain.expectMsgType[PublishFinalTx].tx.txid == claimHtlcSuccessDelayedTx.txid) assert(alice2blockchain.expectMsgType[WatchTxConfirmed].txId == claimMainTx.txid) assert(alice2blockchain.expectMsgType[WatchTxConfirmed].txId == claimHtlcTimeoutDelayedTx.txid) assert(alice2blockchain.expectMsgType[WatchTxConfirmed].txId == claimHtlcSuccessDelayedTx.txid) - alice2blockchain.expectMsgType[WatchOutputSpent] // We replay the HTLC fulfillment: nothing happens since we already published a 3rd-stage transaction. alice ! CMD_FULFILL_HTLC(incomingHtlc.id, preimage, commit = true) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/fixtures/MinimalNodeFixture.scala b/eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/fixtures/MinimalNodeFixture.scala index bbb153a27..9d8bd4854 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/fixtures/MinimalNodeFixture.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/fixtures/MinimalNodeFixture.scala @@ -96,7 +96,7 @@ object MinimalNodeFixture extends Assertions with Eventually with IntegrationPat val offerManager = system.spawn(OfferManager(nodeParams, router, 1 minute), "offer-manager") val paymentHandler = system.actorOf(PaymentHandler.props(nodeParams, register, offerManager), "payment-handler") val relayer = system.actorOf(Relayer.props(nodeParams, router, register, paymentHandler), "relayer") - val txPublisherFactory = Channel.SimpleTxPublisherFactory(nodeParams, watcherTyped, bitcoinClient) + val txPublisherFactory = Channel.SimpleTxPublisherFactory(nodeParams, bitcoinClient) val channelFactory = Peer.SimpleChannelFactory(nodeParams, watcherTyped, relayer, wallet, txPublisherFactory) val pendingChannelsRateLimiter = system.spawnAnonymous(Behaviors.supervise(PendingChannelsRateLimiter(nodeParams, router.toTyped, Seq())).onFailure(typed.SupervisorStrategy.resume)) val peerFactory = Switchboard.SimplePeerFactory(nodeParams, wallet, channelFactory, pendingChannelsRateLimiter, register, router.toTyped)