1
0
mirror of https://github.com/ACINQ/eclair.git synced 2024-11-19 01:43:22 +01:00

Reduce the number of RPC calls to bitcoind during force-close (#2902)

* Don't spawn anchor tx publisher if commit is confirmed

It is inefficient to spawn a tx publisher for anchor txs if we already
know that the commit tx is confirmed: we will make calls to our bitcoin
node that can easily be avoided. This can matter when force-closing a
large number of channels with frequent disconnections (e.g. wallets).

* Improve `TxTimeLocksMonitor` performance

When publishing a transaction that has CSV delays, we previously used
the watcher and set a `minDepth` on the parent transaction matching
the CSV delay of the child transaction. While this was very simple,
it was unnecessarily expensive for large CSV delays: the watcher would
check for tx confirmations at every block, even when the CSV delay is
very large. When we force-close a large number of channels, it results
in a very large number of RPC calls to our `bitcoind` node.

We don't use the watcher in the `TxTimeLocksMonitor` anymore: instead
we check the parent confirmations once, and then we check again after
the CSV delay.

* Add relative delay hints to `ZmqWatcher`

When we tell the `ZmqWatcher` to watch for confirmations on transactions
that have a relative delay, it is highly inefficient to call our bitcoin
node at every new block to check for confirmations (especially when the
parent transaction isn't even confirmed). We now tell the watcher about
the relative delay, which lets it check for confirmations only at block
heights where we expect the transaction to reach its minimum depth. This
is especially useful to improve performance for delayed transactions
that usually use a CSV of at least 720 blocks.
This commit is contained in:
Bastien Teinturier 2024-09-05 14:48:01 +02:00 committed by GitHub
parent fcd88b0a0a
commit 8370fa29c0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 314 additions and 194 deletions

View File

@ -367,7 +367,7 @@ class Setup(val datadir: File,
// we want to make sure the handler for post-restart broken HTLCs has finished initializing.
_ <- postRestartCleanUpInitialized.future
txPublisherFactory = Channel.SimpleTxPublisherFactory(nodeParams, watcher, bitcoinClient)
txPublisherFactory = Channel.SimpleTxPublisherFactory(nodeParams, bitcoinClient)
channelFactory = Peer.SimpleChannelFactory(nodeParams, watcher, relayer, bitcoinClient, txPublisherFactory)
pendingChannelsRateLimiter = system.spawn(Behaviors.supervise(PendingChannelsRateLimiter(nodeParams, router.toTyped, channels)).onFailure(typed.SupervisorStrategy.resume), name = "pending-channels-rate-limiter")
peerFactory = Switchboard.SimplePeerFactory(nodeParams, bitcoinClient, channelFactory, pendingChannelsRateLimiter, register, router.toTyped)

View File

@ -63,6 +63,7 @@ object ZmqWatcher {
private case class PublishBlockHeight(current: BlockHeight) extends Command
private case class ProcessNewBlock(blockId: BlockId) extends Command
private case class ProcessNewTransaction(tx: Transaction) extends Command
private case class SetWatchHint(w: GenericWatch, hint: WatchHint) extends Command
final case class ValidateRequest(replyTo: ActorRef[ValidateResult], ann: ChannelAnnouncement) extends Command
final case class ValidateResult(c: ChannelAnnouncement, fundingTx: Either[Throwable, (Transaction, UtxoStatus)])
@ -155,7 +156,8 @@ object ZmqWatcher {
case class WatchFundingDeeplyBuried(replyTo: ActorRef[WatchFundingDeeplyBuriedTriggered], txId: TxId, minDepth: Long) extends WatchConfirmed[WatchFundingDeeplyBuriedTriggered]
case class WatchFundingDeeplyBuriedTriggered(blockHeight: BlockHeight, txIndex: Int, tx: Transaction) extends WatchConfirmedTriggered
case class WatchTxConfirmed(replyTo: ActorRef[WatchTxConfirmedTriggered], txId: TxId, minDepth: Long) extends WatchConfirmed[WatchTxConfirmedTriggered]
case class RelativeDelay(parentTxId: TxId, delay: Long)
case class WatchTxConfirmed(replyTo: ActorRef[WatchTxConfirmedTriggered], txId: TxId, minDepth: Long, delay_opt: Option[RelativeDelay] = None) extends WatchConfirmed[WatchTxConfirmedTriggered]
case class WatchTxConfirmedTriggered(blockHeight: BlockHeight, txIndex: Int, tx: Transaction) extends WatchConfirmedTriggered
case class WatchParentTxConfirmed(replyTo: ActorRef[WatchParentTxConfirmedTriggered], txId: TxId, minDepth: Long) extends WatchConfirmed[WatchParentTxConfirmedTriggered]
@ -167,6 +169,13 @@ object ZmqWatcher {
private sealed trait AddWatchResult
private case object Keep extends AddWatchResult
private case object Ignore extends AddWatchResult
sealed trait WatchHint
/**
* In some cases we don't need to check watches every time a block is found and only need to check again after we
* reach a specific block height. This is for example the case for transactions with a CSV delay.
*/
private case class CheckAfterBlock(blockHeight: BlockHeight) extends WatchHint
// @formatter:on
def apply(nodeParams: NodeParams, blockCount: AtomicLong, client: BitcoinCoreClient): Behavior[Command] =
@ -178,7 +187,7 @@ object ZmqWatcher {
timers.startSingleTimer(TickNewBlock, 1 second)
// we start a timer in case we don't receive ZMQ block events
timers.startSingleTimer(TickBlockTimeout, blockTimeout)
new ZmqWatcher(nodeParams, blockCount, client, context, timers).watching(Set.empty[GenericWatch], Map.empty[OutPoint, Set[GenericWatch]])
new ZmqWatcher(nodeParams, blockCount, client, context, timers).watching(Map.empty[GenericWatch, Option[WatchHint]], Map.empty[OutPoint, Set[GenericWatch]])
}
}
@ -224,7 +233,7 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
private val watchdog = context.spawn(Behaviors.supervise(BlockchainWatchdog(nodeParams, 150 seconds)).onFailure(SupervisorStrategy.resume), "blockchain-watchdog")
private def watching(watches: Set[GenericWatch], watchedUtxos: Map[OutPoint, Set[GenericWatch]]): Behavior[Command] = {
private def watching(watches: Map[GenericWatch, Option[WatchHint]], watchedUtxos: Map[OutPoint, Set[GenericWatch]]): Behavior[Command] = {
Behaviors.receiveMessage {
case ProcessNewTransaction(tx) =>
log.debug("analyzing txid={} tx={}", tx.txid, tx)
@ -239,7 +248,7 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
case _: WatchPublished => // nothing to do
case _: WatchConfirmed[_] => // nothing to do
}
watches.collect {
watches.keySet.collect {
case w: WatchPublished if w.txId == tx.txid => context.self ! TriggerEvent(w.replyTo, w, WatchPublishedTriggered(tx))
}
Behaviors.same
@ -279,21 +288,32 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
case Failure(t) => GetBlockCountFailed(t)
case Success(currentHeight) => PublishBlockHeight(currentHeight)
}
// TODO: beware of the herd effect
KamonExt.timeFuture(Metrics.NewBlockCheckConfirmedDuration.withoutTags()) {
Future.sequence(watches.collect {
case w: WatchPublished => checkPublished(w)
case w: WatchConfirmed[_] => checkConfirmed(w)
})
}
Behaviors.same
case PublishBlockHeight(currentHeight) =>
log.debug("setting blockHeight={}", currentHeight)
blockHeight.set(currentHeight.toLong)
context.system.eventStream ! EventStream.Publish(CurrentBlockHeight(currentHeight))
// TODO: should we try to mitigate the herd effect and not check all watches immediately?
KamonExt.timeFuture(Metrics.NewBlockCheckConfirmedDuration.withoutTags()) {
Future.sequence(watches.collect {
case (w: WatchPublished, _) => checkPublished(w)
case (w: WatchConfirmed[_], hint) =>
hint match {
case Some(CheckAfterBlock(delayUntilBlock)) if currentHeight < delayUntilBlock => Future.successful(())
case _ => checkConfirmed(w, currentHeight)
}
})
}
Behaviors.same
case SetWatchHint(w, hint) =>
val watches1 = watches.get(w) match {
case Some(_) => watches + (w -> Some(hint))
case None => watches
}
watching(watches1, watchedUtxos)
case TriggerEvent(replyTo, watch, event) =>
if (watches.contains(watch)) {
log.debug("triggering {}", watch)
@ -323,7 +343,7 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
checkSpent(w)
Keep
case w: WatchConfirmed[_] =>
checkConfirmed(w)
checkConfirmed(w, BlockHeight(blockHeight.get()))
Keep
case w: WatchPublished =>
checkPublished(w)
@ -333,14 +353,14 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
case Keep =>
log.debug("adding watch {}", w)
context.watchWith(w.replyTo, StopWatching(w.replyTo))
watching(watches + w, addWatchedUtxos(watchedUtxos, w))
watching(watches + (w -> None), addWatchedUtxos(watchedUtxos, w))
case Ignore =>
Behaviors.same
}
case StopWatching(origin) =>
// we remove watches associated to dead actors
val deprecatedWatches = watches.filter(_.replyTo == origin)
// We remove watches associated to dead actors.
val deprecatedWatches = watches.keySet.filter(_.replyTo == origin)
val watchedUtxos1 = deprecatedWatches.foldLeft(watchedUtxos) { case (m, w) => removeWatchedUtxos(m, w) }
watching(watches -- deprecatedWatches, watchedUtxos1)
@ -353,7 +373,7 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
Behaviors.same
case r: ListWatches =>
r.replyTo ! watches
r.replyTo ! watches.keySet
Behaviors.same
}
@ -414,7 +434,7 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
client.getTransaction(w.txId).map(tx => context.self ! TriggerEvent(w.replyTo, w, WatchPublishedTriggered(tx)))
}
private def checkConfirmed(w: WatchConfirmed[_ <: WatchConfirmedTriggered]): Future[Unit] = {
private def checkConfirmed(w: WatchConfirmed[_ <: WatchConfirmedTriggered], currentHeight: BlockHeight): Future[Unit] = {
log.debug("checking confirmations of txid={}", w.txId)
// NB: this is very inefficient since internally we call `getrawtransaction` three times, but it doesn't really
// matter because this only happens once, when the watched transaction has reached min_depth
@ -431,7 +451,33 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
}
}
}
case _ => Future.successful((): Unit)
case Some(confirmations) =>
// Once the transaction is confirmed, we don't need to check again at every new block, we only need to check
// again once we should have reached the minimum depth to verify that there hasn't been a reorg.
context.self ! SetWatchHint(w, CheckAfterBlock(currentHeight + w.minDepth - confirmations))
Future.successful(())
case None =>
w match {
case WatchTxConfirmed(_, _, _, Some(relativeDelay)) =>
log.debug("txId={} has a relative delay of {} blocks, checking parentTxId={}", w.txId, relativeDelay.delay, relativeDelay.parentTxId)
// Note how we add one block to avoid an off-by-one:
// - if the parent is confirmed at block P
// - the CSV delay is D and the minimum depth is M
// - the first block that can include the child is P + D
// - the first block at which we can reach minimum depth is P + D + M
// - if we are currently at block P + N, the parent has C = N + 1 confirmations
// - we want to check at block P + N + D + M + 1 - C = P + N + D + M + 1 - (N + 1) = P + D + M
val delay = relativeDelay.delay + w.minDepth + 1
client.getTxConfirmations(relativeDelay.parentTxId).map(_.getOrElse(0)).collect {
case confirmations if confirmations < delay => context.self ! SetWatchHint(w, CheckAfterBlock(currentHeight + delay - confirmations))
}
case _ =>
// The transaction is unconfirmed: we don't need to check again at every new block: we can check only once
// every minDepth blocks, which is more efficient. If the transaction is included at the current height in
// a reorg, we will trigger the watch one block later than expected, but this is fine.
context.self ! SetWatchHint(w, CheckAfterBlock(currentHeight + w.minDepth))
Future.successful(())
}
}
}

View File

@ -115,9 +115,9 @@ object Channel {
def spawnTxPublisher(context: ActorContext, remoteNodeId: PublicKey): typed.ActorRef[TxPublisher.Command]
}
case class SimpleTxPublisherFactory(nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Command], bitcoinClient: BitcoinCoreClient) extends TxPublisherFactory {
case class SimpleTxPublisherFactory(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient) extends TxPublisherFactory {
override def spawnTxPublisher(context: ActorContext, remoteNodeId: PublicKey): typed.ActorRef[TxPublisher.Command] = {
context.spawn(Behaviors.supervise(TxPublisher(nodeParams, remoteNodeId, TxPublisher.SimpleChildFactory(nodeParams, bitcoinClient, watcher))).onFailure(typed.SupervisorStrategy.restart), "tx-publisher")
context.spawn(Behaviors.supervise(TxPublisher(nodeParams, remoteNodeId, TxPublisher.SimpleChildFactory(nodeParams, bitcoinClient))).onFailure(typed.SupervisorStrategy.restart), "tx-publisher")
}
}
@ -1714,7 +1714,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
val (localCommitPublished1, claimHtlcTx_opt) = Closing.LocalClose.claimHtlcDelayedOutput(localCommitPublished, keyManager, d.commitments.latest, tx, nodeParams.currentFeerates, nodeParams.onChainFeeConf, d.finalScriptPubKey)
claimHtlcTx_opt.foreach(claimHtlcTx => {
txPublisher ! PublishFinalTx(claimHtlcTx, claimHtlcTx.fee, None)
blockchain ! WatchTxConfirmed(self, claimHtlcTx.tx.txid, nodeParams.channelConf.minDepthBlocks)
blockchain ! WatchTxConfirmed(self, claimHtlcTx.tx.txid, nodeParams.channelConf.minDepthBlocks, Some(RelativeDelay(tx.txid, d.commitments.params.remoteParams.toSelfDelay.toInt.toLong)))
})
Closing.updateLocalCommitPublished(localCommitPublished1, tx)
}),

View File

@ -18,10 +18,10 @@ package fr.acinq.eclair.channel.fsm
import akka.actor.typed.scaladsl.adapter.actorRefAdapter
import akka.actor.{ActorRef, FSM}
import fr.acinq.bitcoin.scalacompat.{ByteVector32, OutPoint, SatoshiLong, Transaction}
import fr.acinq.bitcoin.scalacompat.{ByteVector32, OutPoint, SatoshiLong, Transaction, TxId}
import fr.acinq.eclair.NotificationsLogger
import fr.acinq.eclair.NotificationsLogger.NotifyNodeOperator
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{WatchOutputSpent, WatchTxConfirmed}
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{RelativeDelay, WatchOutputSpent, WatchTxConfirmed}
import fr.acinq.eclair.channel.Helpers.Closing
import fr.acinq.eclair.channel._
import fr.acinq.eclair.channel.fsm.Channel.UnhandledExceptionStrategy
@ -164,9 +164,9 @@ trait ErrorHandlers extends CommonHandlers {
/**
* This helper method will watch txs only if they haven't yet reached minDepth
*/
private def watchConfirmedIfNeeded(txs: Iterable[Transaction], irrevocablySpent: Map[OutPoint, Transaction]): Unit = {
private def watchConfirmedIfNeeded(txs: Iterable[Transaction], irrevocablySpent: Map[OutPoint, Transaction], relativeDelays: Map[TxId, RelativeDelay]): Unit = {
val (skip, process) = txs.partition(Closing.inputsAlreadySpent(_, irrevocablySpent))
process.foreach(tx => blockchain ! WatchTxConfirmed(self, tx.txid, nodeParams.channelConf.minDepthBlocks))
process.foreach(tx => blockchain ! WatchTxConfirmed(self, tx.txid, nodeParams.channelConf.minDepthBlocks, relativeDelays.get(tx.txid)))
skip.foreach(tx => log.debug(s"no need to watch txid=${tx.txid}, it has already been confirmed"))
}
@ -219,21 +219,23 @@ trait ErrorHandlers extends CommonHandlers {
List(PublishFinalTx(commitTx, commitment.commitInput.outPoint, "commit-tx", Closing.commitTxFee(commitment.commitInput, commitTx, localPaysCommitTxFees), None)) ++ (claimMainDelayedOutputTx.map(tx => PublishFinalTx(tx, tx.fee, None)) ++ redeemableHtlcTxs ++ claimHtlcDelayedTxs.map(tx => PublishFinalTx(tx, tx.fee, None)))
case _: Transactions.AnchorOutputsCommitmentFormat =>
val redeemableHtlcTxs = htlcTxs.values.flatten.map(tx => PublishReplaceableTx(tx, commitment))
val claimLocalAnchor = claimAnchorTxs.collect { case tx: Transactions.ClaimLocalAnchorOutputTx => PublishReplaceableTx(tx, commitment) }
val claimLocalAnchor = claimAnchorTxs.collect { case tx: Transactions.ClaimLocalAnchorOutputTx if !localCommitPublished.isConfirmed => PublishReplaceableTx(tx, commitment) }
List(PublishFinalTx(commitTx, commitment.commitInput.outPoint, "commit-tx", Closing.commitTxFee(commitment.commitInput, commitTx, localPaysCommitTxFees), None)) ++ claimLocalAnchor ++ claimMainDelayedOutputTx.map(tx => PublishFinalTx(tx, tx.fee, None)) ++ redeemableHtlcTxs ++ claimHtlcDelayedTxs.map(tx => PublishFinalTx(tx, tx.fee, None))
}
publishIfNeeded(publishQueue, irrevocablySpent)
// we watch:
// - the commitment tx itself, so that we can handle the case where we don't have any outputs
// - 'final txs' that send funds to our wallet and that spend outputs that only us control
// We watch:
// - the commitment tx itself, so that we can handle the case where we don't have any outputs
// - 'final txs' that send funds to our wallet and that spend outputs that only us control
// Our 'final txs" have a long relative delay: we provide that information to the watcher for efficiency.
val relativeDelays = (claimMainDelayedOutputTx ++ claimHtlcDelayedTxs).map(tx => tx.tx.txid -> RelativeDelay(tx.input.outPoint.txid, commitment.remoteParams.toSelfDelay.toInt.toLong)).toMap
val watchConfirmedQueue = List(commitTx) ++ claimMainDelayedOutputTx.map(_.tx) ++ claimHtlcDelayedTxs.map(_.tx)
watchConfirmedIfNeeded(watchConfirmedQueue, irrevocablySpent)
watchConfirmedIfNeeded(watchConfirmedQueue, irrevocablySpent, relativeDelays)
// we watch outputs of the commitment tx that both parties may spend
// we also watch our local anchor: this ensures that we will correctly detect when it's confirmed and count its fees
// in the audit DB, even if we restart before confirmation
val watchSpentQueue = htlcTxs.keys ++ claimAnchorTxs.collect { case tx: Transactions.ClaimLocalAnchorOutputTx => tx.input.outPoint }
// We watch outputs of the commitment tx that both parties may spend.
// We also watch our local anchor: this ensures that we will correctly detect when it's confirmed and count its fees
// in the audit DB, even if we restart before confirmation.
val watchSpentQueue = htlcTxs.keys ++ claimAnchorTxs.collect { case tx: Transactions.ClaimLocalAnchorOutputTx if !localCommitPublished.isConfirmed => tx.input.outPoint }
watchSpentIfNeeded(commitTx, watchSpentQueue, irrevocablySpent)
}
@ -274,18 +276,18 @@ trait ErrorHandlers extends CommonHandlers {
def doPublish(remoteCommitPublished: RemoteCommitPublished, commitment: FullCommitment): Unit = {
import remoteCommitPublished._
val claimLocalAnchor = claimAnchorTxs.collect { case tx: Transactions.ClaimLocalAnchorOutputTx => PublishReplaceableTx(tx, commitment) }
val claimLocalAnchor = claimAnchorTxs.collect { case tx: Transactions.ClaimLocalAnchorOutputTx if !remoteCommitPublished.isConfirmed => PublishReplaceableTx(tx, commitment) }
val redeemableHtlcTxs = claimHtlcTxs.values.flatten.map(tx => PublishReplaceableTx(tx, commitment))
val publishQueue = claimLocalAnchor ++ claimMainOutputTx.map(tx => PublishFinalTx(tx, tx.fee, None)).toSeq ++ redeemableHtlcTxs
publishIfNeeded(publishQueue, irrevocablySpent)
// we watch:
// We watch:
// - the commitment tx itself, so that we can handle the case where we don't have any outputs
// - 'final txs' that send funds to our wallet and that spend outputs that only us control
val watchConfirmedQueue = List(commitTx) ++ claimMainOutputTx.map(_.tx)
watchConfirmedIfNeeded(watchConfirmedQueue, irrevocablySpent)
watchConfirmedIfNeeded(watchConfirmedQueue, irrevocablySpent, relativeDelays = Map.empty)
// we watch outputs of the commitment tx that both parties may spend
// We watch outputs of the commitment tx that both parties may spend.
val watchSpentQueue = claimHtlcTxs.keys
watchSpentIfNeeded(commitTx, watchSpentQueue, irrevocablySpent)
}
@ -336,13 +338,13 @@ trait ErrorHandlers extends CommonHandlers {
val publishQueue = (claimMainOutputTx ++ mainPenaltyTx ++ htlcPenaltyTxs ++ claimHtlcDelayedPenaltyTxs).map(tx => PublishFinalTx(tx, tx.fee, None))
publishIfNeeded(publishQueue, irrevocablySpent)
// we watch:
// We watch:
// - the commitment tx itself, so that we can handle the case where we don't have any outputs
// - 'final txs' that send funds to our wallet and that spend outputs that only us control
val watchConfirmedQueue = List(commitTx) ++ claimMainOutputTx.map(_.tx)
watchConfirmedIfNeeded(watchConfirmedQueue, irrevocablySpent)
watchConfirmedIfNeeded(watchConfirmedQueue, irrevocablySpent, relativeDelays = Map.empty)
// we watch outputs of the commitment tx that both parties may spend
// We watch outputs of the commitment tx that both parties may spend.
val watchSpentQueue = (mainPenaltyTx ++ htlcPenaltyTxs).map(_.input.outPoint)
watchSpentIfNeeded(commitTx, watchSpentQueue, irrevocablySpent)
}

View File

@ -19,7 +19,6 @@ package fr.acinq.eclair.channel.publish
import akka.actor.typed.scaladsl.{ActorContext, Behaviors, TimerScheduler}
import akka.actor.typed.{ActorRef, Behavior}
import fr.acinq.eclair.NodeParams
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher
import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinCoreClient
import fr.acinq.eclair.channel.publish.TxPublisher.TxPublishContext
import fr.acinq.eclair.channel.publish.TxTimeLocksMonitor.CheckTx
@ -50,12 +49,12 @@ object FinalTxPublisher {
case object Stop extends Command
// @formatter:on
def apply(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, watcher: ActorRef[ZmqWatcher.Command], txPublishContext: TxPublishContext): Behavior[Command] = {
def apply(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, txPublishContext: TxPublishContext): Behavior[Command] = {
Behaviors.setup { context =>
Behaviors.withTimers { timers =>
Behaviors.withMdc(txPublishContext.mdc()) {
Behaviors.receiveMessagePartial {
case Publish(replyTo, cmd) => new FinalTxPublisher(nodeParams, replyTo, cmd, bitcoinClient, watcher, context, timers, txPublishContext).checkTimeLocks()
case Publish(replyTo, cmd) => new FinalTxPublisher(nodeParams, replyTo, cmd, bitcoinClient, context, timers, txPublishContext).checkTimeLocks()
case Stop => Behaviors.stopped
}
}
@ -69,7 +68,6 @@ private class FinalTxPublisher(nodeParams: NodeParams,
replyTo: ActorRef[TxPublisher.PublishTxResult],
cmd: TxPublisher.PublishFinalTx,
bitcoinClient: BitcoinCoreClient,
watcher: ActorRef[ZmqWatcher.Command],
context: ActorContext[FinalTxPublisher.Command],
timers: TimerScheduler[FinalTxPublisher.Command],
txPublishContext: TxPublishContext)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global) {
@ -79,7 +77,7 @@ private class FinalTxPublisher(nodeParams: NodeParams,
private val log = context.log
def checkTimeLocks(): Behavior[Command] = {
val timeLocksChecker = context.spawn(TxTimeLocksMonitor(nodeParams, watcher, txPublishContext), "time-locks-monitor")
val timeLocksChecker = context.spawn(TxTimeLocksMonitor(nodeParams, bitcoinClient, txPublishContext), "time-locks-monitor")
timeLocksChecker ! CheckTx(context.messageAdapter[TxTimeLocksMonitor.TimeLocksOk](_ => TimeLocksOk), cmd.tx, cmd.desc)
Behaviors.receiveMessagePartial {
case TimeLocksOk => checkParentPublished()

View File

@ -19,7 +19,6 @@ package fr.acinq.eclair.channel.publish
import akka.actor.typed.scaladsl.{ActorContext, Behaviors, TimerScheduler}
import akka.actor.typed.{ActorRef, Behavior}
import fr.acinq.bitcoin.scalacompat.{SatoshiLong, Transaction}
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher
import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinCoreClient
import fr.acinq.eclair.blockchain.fee.{ConfirmationTarget, FeeratePerKw, FeeratesPerKw}
import fr.acinq.eclair.channel.publish.ReplaceableTxFunder.FundedTx
@ -61,12 +60,12 @@ object ReplaceableTxPublisher {
// Timer key to ensure we don't have multiple concurrent timers running.
private case object BumpFeeKey
def apply(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, watcher: ActorRef[ZmqWatcher.Command], txPublishContext: TxPublishContext): Behavior[Command] = {
def apply(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, txPublishContext: TxPublishContext): Behavior[Command] = {
Behaviors.setup { context =>
Behaviors.withTimers { timers =>
Behaviors.withMdc(txPublishContext.mdc()) {
Behaviors.receiveMessagePartial {
case Publish(replyTo, cmd) => new ReplaceableTxPublisher(nodeParams, replyTo, cmd, bitcoinClient, watcher, context, timers, txPublishContext).checkPreconditions()
case Publish(replyTo, cmd) => new ReplaceableTxPublisher(nodeParams, replyTo, cmd, bitcoinClient, context, timers, txPublishContext).checkPreconditions()
case Stop => Behaviors.stopped
}
}
@ -108,7 +107,6 @@ private class ReplaceableTxPublisher(nodeParams: NodeParams,
replyTo: ActorRef[TxPublisher.PublishTxResult],
cmd: TxPublisher.PublishReplaceableTx,
bitcoinClient: BitcoinCoreClient,
watcher: ActorRef[ZmqWatcher.Command],
context: ActorContext[ReplaceableTxPublisher.Command],
timers: TimerScheduler[ReplaceableTxPublisher.Command],
txPublishContext: TxPublishContext)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global) {
@ -141,7 +139,7 @@ private class ReplaceableTxPublisher(nodeParams: NodeParams,
// There are no time locks on anchor transactions, we can claim them right away.
case _: ClaimLocalAnchorWithWitnessData => chooseFeerate(txWithWitnessData)
case _ =>
val timeLocksChecker = context.spawn(TxTimeLocksMonitor(nodeParams, watcher, txPublishContext), "time-locks-monitor")
val timeLocksChecker = context.spawn(TxTimeLocksMonitor(nodeParams, bitcoinClient, txPublishContext), "time-locks-monitor")
timeLocksChecker ! TxTimeLocksMonitor.CheckTx(context.messageAdapter[TxTimeLocksMonitor.TimeLocksOk](_ => TimeLocksOk), cmd.txInfo.tx, cmd.desc)
Behaviors.receiveMessagePartial {
case TimeLocksOk => chooseFeerate(txWithWitnessData)

View File

@ -22,7 +22,6 @@ import akka.actor.typed.{ActorRef, Behavior}
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.bitcoin.scalacompat.{ByteVector32, OutPoint, Satoshi, Transaction, TxId}
import fr.acinq.eclair.blockchain.CurrentBlockHeight
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher
import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinCoreClient
import fr.acinq.eclair.blockchain.fee.ConfirmationTarget
import fr.acinq.eclair.channel.FullCommitment
@ -140,13 +139,13 @@ object TxPublisher {
// @formatter:on
}
case class SimpleChildFactory(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, watcher: ActorRef[ZmqWatcher.Command]) extends ChildFactory {
case class SimpleChildFactory(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient) extends ChildFactory {
// @formatter:off
override def spawnFinalTxPublisher(context: ActorContext[TxPublisher.Command], txPublishContext: TxPublishContext): ActorRef[FinalTxPublisher.Command] = {
context.spawn(FinalTxPublisher(nodeParams, bitcoinClient, watcher, txPublishContext), s"final-tx-${txPublishContext.id}")
context.spawn(FinalTxPublisher(nodeParams, bitcoinClient, txPublishContext), s"final-tx-${txPublishContext.id}")
}
override def spawnReplaceableTxPublisher(context: ActorContext[Command], txPublishContext: TxPublishContext): ActorRef[ReplaceableTxPublisher.Command] = {
context.spawn(ReplaceableTxPublisher(nodeParams, bitcoinClient, watcher, txPublishContext), s"replaceable-tx-${txPublishContext.id}")
context.spawn(ReplaceableTxPublisher(nodeParams, bitcoinClient, txPublishContext), s"replaceable-tx-${txPublishContext.id}")
}
// @formatter:on
}

View File

@ -20,15 +20,14 @@ import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.scaladsl.{ActorContext, Behaviors, TimerScheduler}
import akka.actor.typed.{ActorRef, Behavior}
import fr.acinq.bitcoin.scalacompat.{Transaction, TxId}
import fr.acinq.eclair.blockchain.CurrentBlockHeight
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{WatchParentTxConfirmed, WatchParentTxConfirmedTriggered}
import fr.acinq.eclair.blockchain.{CurrentBlockHeight, OnChainChannelFunder}
import fr.acinq.eclair.channel.publish.TxPublisher.TxPublishContext
import fr.acinq.eclair.transactions.Scripts
import fr.acinq.eclair.{BlockHeight, NodeParams}
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.DurationLong
import scala.util.Random
import scala.util.{Failure, Random, Success}
/**
* Created by t-bast on 10/06/2021.
@ -47,15 +46,16 @@ object TxTimeLocksMonitor {
case class CheckTx(replyTo: ActorRef[TimeLocksOk], tx: Transaction, desc: String) extends Command
final case class WrappedCurrentBlockHeight(currentBlockHeight: BlockHeight) extends Command
private case object CheckRelativeTimeLock extends Command
private case class ParentTxConfirmed(parentTxId: TxId) extends Command
private case class ParentTxStatus(parentTxId: TxId, confirmations_opt: Option[Int]) extends Command
private case class GetTxConfirmationsFailed(parentTxId: TxId, reason: Throwable) extends Command
// @formatter:on
def apply(nodeParams: NodeParams, watcher: ActorRef[ZmqWatcher.Command], txPublishContext: TxPublishContext): Behavior[Command] = {
def apply(nodeParams: NodeParams, bitcoinClient: OnChainChannelFunder, txPublishContext: TxPublishContext): Behavior[Command] = {
Behaviors.setup { context =>
Behaviors.withTimers { timers =>
Behaviors.withMdc(txPublishContext.mdc()) {
Behaviors.receiveMessagePartial {
case cmd: CheckTx => new TxTimeLocksMonitor(nodeParams, cmd, watcher, context, timers).checkAbsoluteTimeLock()
case cmd: CheckTx => new TxTimeLocksMonitor(nodeParams, cmd, bitcoinClient, context, timers).checkAbsoluteTimeLock()
}
}
}
@ -66,68 +66,103 @@ object TxTimeLocksMonitor {
private class TxTimeLocksMonitor(nodeParams: NodeParams,
cmd: TxTimeLocksMonitor.CheckTx,
watcher: ActorRef[ZmqWatcher.Command],
bitcoinClient: OnChainChannelFunder,
context: ActorContext[TxTimeLocksMonitor.Command],
timers: TimerScheduler[TxTimeLocksMonitor.Command]) {
timers: TimerScheduler[TxTimeLocksMonitor.Command])(implicit ec: ExecutionContext = ExecutionContext.Implicits.global) {
import TxTimeLocksMonitor._
private val log = context.log
def checkAbsoluteTimeLock(): Behavior[Command] = {
val blockHeight = nodeParams.currentBlockHeight
private def checkAbsoluteTimeLock(): Behavior[Command] = {
val cltvTimeout = Scripts.cltvTimeout(cmd.tx)
if (blockHeight < cltvTimeout) {
log.info("delaying publication of {} until block={} (current block={})", cmd.desc, cltvTimeout, blockHeight)
val messageAdapter = context.messageAdapter[CurrentBlockHeight](cbc => WrappedCurrentBlockHeight(cbc.blockHeight))
context.system.eventStream ! EventStream.Subscribe(messageAdapter)
Behaviors.receiveMessagePartial {
case WrappedCurrentBlockHeight(currentBlockHeight) =>
if (cltvTimeout <= currentBlockHeight) {
context.system.eventStream ! EventStream.Unsubscribe(messageAdapter)
timers.startSingleTimer(CheckRelativeTimeLock, (1 + Random.nextLong(nodeParams.channelConf.maxTxPublishRetryDelay.toMillis)).millis)
Behaviors.same
} else {
Behaviors.same
}
case CheckRelativeTimeLock => checkRelativeTimeLocks()
}
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[CurrentBlockHeight](cbc => WrappedCurrentBlockHeight(cbc.blockHeight)))
if (nodeParams.currentBlockHeight < cltvTimeout) {
log.info("delaying publication of {} until block={} (current block={})", cmd.desc, cltvTimeout, nodeParams.currentBlockHeight)
waitForAbsoluteTimeLock(cltvTimeout)
} else {
checkRelativeTimeLocks()
}
}
def checkRelativeTimeLocks(): Behavior[Command] = {
private def waitForAbsoluteTimeLock(cltvTimeout: BlockHeight): Behavior[Command] = {
Behaviors.receiveMessagePartial {
case WrappedCurrentBlockHeight(currentBlockHeight) if cltvTimeout <= currentBlockHeight =>
timers.startSingleTimer(CheckRelativeTimeLock, (1 + Random.nextLong(nodeParams.channelConf.maxTxPublishRetryDelay.toMillis)).millis)
Behaviors.same
case WrappedCurrentBlockHeight(_) => Behaviors.same
case CheckRelativeTimeLock => checkRelativeTimeLocks()
}
}
private def checkRelativeTimeLocks(): Behavior[Command] = {
val csvTimeouts = Scripts.csvTimeouts(cmd.tx)
if (csvTimeouts.nonEmpty) {
val watchConfirmedResponseMapper: ActorRef[WatchParentTxConfirmedTriggered] = context.messageAdapter(w => ParentTxConfirmed(w.tx.txid))
csvTimeouts.foreach {
val parentTxs = csvTimeouts.map {
case (parentTxId, csvTimeout) =>
log.info("{} has a relative timeout of {} blocks, watching parentTxId={}", cmd.desc, csvTimeout, parentTxId)
watcher ! WatchParentTxConfirmed(watchConfirmedResponseMapper, parentTxId, minDepth = csvTimeout)
log.info("{} has a relative timeout of {} blocks, checking confirmations for parentTxId={}", cmd.desc, csvTimeout, parentTxId)
checkConfirmations(parentTxId)
parentTxId -> RelativeLockStatus(csvTimeout, nodeParams.currentBlockHeight + csvTimeout)
}
waitForParentsToConfirm(csvTimeouts.keySet)
waitForRelativeTimeLocks(parentTxs)
} else {
notifySender()
}
}
def waitForParentsToConfirm(parentTxIds: Set[TxId]): Behavior[Command] = {
private case class RelativeLockStatus(csvTimeout: Long, checkAfterBlock: BlockHeight)
private def waitForRelativeTimeLocks(parentTxs: Map[TxId, RelativeLockStatus]): Behavior[Command] = {
Behaviors.receiveMessagePartial {
case ParentTxConfirmed(parentTxId) =>
log.debug("parent tx of {} has been confirmed (parent txid={})", cmd.desc, parentTxId)
val remainingParentTxIds = parentTxIds - parentTxId
if (remainingParentTxIds.isEmpty) {
log.info("all parent txs of {} have been confirmed", cmd.desc)
notifySender()
} else {
log.debug("some parent txs of {} are still unconfirmed (parent txids={})", cmd.desc, remainingParentTxIds.mkString(","))
waitForParentsToConfirm(remainingParentTxIds)
case ParentTxStatus(parentTxId, confirmations_opt) =>
parentTxs.get(parentTxId) match {
case Some(status) => confirmations_opt match {
case Some(confirmations) if status.csvTimeout <= confirmations =>
log.debug("parentTxId={} of {} has reached enough confirmations", parentTxId, cmd.desc)
val remainingParentTxs = parentTxs - parentTxId
if (remainingParentTxs.isEmpty) {
log.info("all parent txs of {} have reached enough confirmations", cmd.desc)
notifySender()
} else {
log.debug("some parent txs of {} don't have enough confirmations yet (parentTxIds={})", cmd.desc, remainingParentTxs.keySet.mkString(","))
waitForRelativeTimeLocks(remainingParentTxs)
}
case Some(confirmations) =>
log.debug("parentTxId={} doesn't have enough confirmations, retrying in {} blocks", parentTxId, status.csvTimeout - confirmations)
val status1 = status.copy(checkAfterBlock = nodeParams.currentBlockHeight + status.csvTimeout - confirmations)
waitForRelativeTimeLocks(parentTxs + (parentTxId -> status1))
case None =>
log.debug("parentTxId={} is unconfirmed, retrying in {} blocks", parentTxId, status.csvTimeout)
val status1 = status.copy(checkAfterBlock = nodeParams.currentBlockHeight + status.csvTimeout)
waitForRelativeTimeLocks(parentTxs + (parentTxId -> status1))
}
case None =>
log.debug("ignoring duplicate parentTxId={}", parentTxId)
Behaviors.same
}
case GetTxConfirmationsFailed(parentTxId, reason) =>
log.warn("could not get tx confirmations for parentTxId={}, retrying ({})", parentTxId, reason.getMessage)
checkConfirmations(parentTxId)
Behaviors.same
case WrappedCurrentBlockHeight(currentBlockHeight) =>
log.debug("received new block (height={})", currentBlockHeight)
parentTxs.collect {
case (parentTxId, status) if status.checkAfterBlock <= currentBlockHeight =>
log.debug("checking confirmations for parentTxId={} ({} <= {})", parentTxId, status.checkAfterBlock, currentBlockHeight)
checkConfirmations(parentTxId)
}
Behaviors.same
}
}
def notifySender(): Behavior[Command] = {
private def checkConfirmations(parentTxId: TxId): Unit = {
context.pipeToSelf(bitcoinClient.getTxConfirmations(parentTxId)) {
case Success(confirmations_opt) => ParentTxStatus(parentTxId, confirmations_opt)
case Failure(reason) => GetTxConfirmationsFailed(parentTxId, reason)
}
}
private def notifySender(): Behavior[Command] = {
log.debug("time locks satisfied for {}", cmd.desc)
cmd.replyTo ! TimeLocksOk()
Behaviors.stopped

View File

@ -227,6 +227,39 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind
})
}
test("watch for confirmed transactions with relative delay") {
withWatcher(f => {
import f._
// We simulate a transaction with a 3-blocks CSV delay.
val (priv, address) = createExternalAddress()
val parentTx = sendToAddress(address, 50.millibtc, probe)
val tx = createSpendP2WPKH(parentTx, priv, priv.publicKey, 5_000 sat, 3, 0)
val delay = RelativeDelay(parentTx.txid, 3)
watcher ! WatchTxConfirmed(probe.ref, tx.txid, 6, Some(delay))
probe.expectNoMessage(100 millis)
// We make the parent tx confirm to satisfy the CSV delay and publish the delayed transaction.
generateBlocks(3)
bitcoinClient.publishTransaction(tx).pipeTo(probe.ref)
probe.expectMsg(tx.txid)
probe.expectNoMessage(100 millis)
// The delayed transaction confirms, but hasn't reached its minimum depth.
generateBlocks(3)
probe.expectNoMessage(100 millis)
// The delayed transaction reaches its minimum depth.
generateBlocks(3)
assert(probe.expectMsgType[WatchTxConfirmedTriggered].tx.txid == tx.txid)
// If we watch the transaction when it's already confirmed, we immediately receive the WatchEventConfirmed.
watcher ! WatchTxConfirmed(probe.ref, tx.txid, 3, Some(delay.copy(delay = 720)))
assert(probe.expectMsgType[WatchTxConfirmedTriggered].tx.txid == tx.txid)
})
}
test("watch for spent transactions") {
withWatcher(f => {
import f._

View File

@ -24,7 +24,6 @@ import fr.acinq.bitcoin.scalacompat.{SatoshiLong, Transaction, TxId}
import fr.acinq.eclair.blockchain.CurrentBlockHeight
import fr.acinq.eclair.blockchain.WatcherSpec.createSpendP2WPKH
import fr.acinq.eclair.blockchain.bitcoind.BitcoindService
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{WatchParentTxConfirmed, WatchParentTxConfirmedTriggered}
import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinCoreClient
import fr.acinq.eclair.channel.publish.FinalTxPublisher.{Publish, Stop}
import fr.acinq.eclair.channel.publish.TxPublisher.TxRejectedReason.ConflictingTxConfirmed
@ -48,14 +47,13 @@ class FinalTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bi
stopBitcoind()
}
case class Fixture(bitcoinClient: BitcoinCoreClient, publisher: ActorRef[FinalTxPublisher.Command], watcher: TestProbe, probe: TestProbe)
case class Fixture(bitcoinClient: BitcoinCoreClient, publisher: ActorRef[FinalTxPublisher.Command], probe: TestProbe)
def createFixture(): Fixture = {
val probe = TestProbe()
val watcher = TestProbe()
val bitcoinClient = new BitcoinCoreClient(bitcoinrpcclient)
val publisher = system.spawnAnonymous(FinalTxPublisher(TestConstants.Alice.nodeParams, bitcoinClient, watcher.ref, TxPublishContext(UUID.randomUUID(), randomKey().publicKey, None)))
Fixture(bitcoinClient, publisher, watcher, probe)
val publisher = system.spawnAnonymous(FinalTxPublisher(TestConstants.Alice.nodeParams, bitcoinClient, TxPublishContext(UUID.randomUUID(), randomKey().publicKey, None)))
Fixture(bitcoinClient, publisher, probe)
}
def getMempool(bitcoinClient: BitcoinCoreClient, probe: TestProbe): Seq[Transaction] = {
@ -78,17 +76,13 @@ class FinalTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bi
val (priv, address) = createExternalAddress()
val parentTx = sendToAddress(address, 125_000 sat, probe)
createBlocks(5, probe)
val tx = createSpendP2WPKH(parentTx, priv, priv.publicKey, 2_500 sat, sequence = 5, lockTime = 0)
val cmd = PublishFinalTx(tx, tx.txIn.head.outPoint, "tx-time-locks", 0 sat, None)
publisher ! Publish(probe.ref, cmd)
val w = watcher.expectMsgType[WatchParentTxConfirmed]
assert(w.txId == parentTx.txid)
assert(w.minDepth == 5)
createBlocks(5, probe)
w.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, parentTx)
// Once time locks are satisfied, the transaction should be published:
// Time locks are satisfied, the transaction should be published:
waitTxInMempool(bitcoinClient, tx.txid, probe)
createBlocks(1, probe)
probe.expectNoMessage(100 millis) // we don't notify the sender until min depth has been reached
@ -113,7 +107,6 @@ class FinalTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bi
publisher ! Publish(probe.ref, cmd)
// Since the parent is not published yet, we can't publish the child tx either:
watcher.expectNoMessage(100 millis)
assert(!getMempool(bitcoinClient, probe).map(_.txid).contains(tx.txid))
// Once the parent tx is published, it will unblock publication of the child tx:

View File

@ -75,7 +75,7 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w
def createPublisher(): ActorRef[ReplaceableTxPublisher.Command] = createPublisher(alice.underlyingActor.nodeParams)
def createPublisher(nodeParams: NodeParams): ActorRef[ReplaceableTxPublisher.Command] = {
system.spawnAnonymous(ReplaceableTxPublisher(nodeParams, wallet, alice2blockchain.ref, TxPublishContext(UUID.randomUUID(), randomKey().publicKey, None)))
system.spawnAnonymous(ReplaceableTxPublisher(nodeParams, wallet, TxPublishContext(UUID.randomUUID(), randomKey().publicKey, None)))
}
def aliceBlockHeight(): BlockHeight = alice.underlyingActor.nodeParams.currentBlockHeight
@ -170,7 +170,7 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w
generateBlocks(1)
// Execute our test.
val publisher = system.spawn(ReplaceableTxPublisher(aliceNodeParams, walletClient, alice2blockchain.ref, TxPublishContext(testId, TestConstants.Bob.nodeParams.nodeId, None)), testId.toString)
val publisher = system.spawn(ReplaceableTxPublisher(aliceNodeParams, walletClient, TxPublishContext(testId, TestConstants.Bob.nodeParams.nodeId, None)), testId.toString)
val f = Fixture(alice, bob, alice2bob, bob2alice, alice2blockchain, bob2blockchain, walletClient, walletRpcClient, publisher, probe)
// We set a high fastest feerate, to ensure that by default we're not limited by this.
f.setFeerate(FeeratePerKw(100_000 sat), blockTarget = 1)
@ -1035,12 +1035,10 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w
withFixture(Seq(10.5 millibtc), ChannelTypes.AnchorOutputsZeroFeeHtlcTx()) { f =>
import f._
val (commitTx, htlcSuccess, _) = closeChannelWithHtlcs(f, aliceBlockHeight())
val (_, htlcSuccess, _) = closeChannelWithHtlcs(f, aliceBlockHeight())
val htlcSuccessPublisher = createPublisher()
setFeerate(FeeratePerKw(75_000 sat), blockTarget = 1)
htlcSuccessPublisher ! Publish(probe.ref, htlcSuccess)
val w = alice2blockchain.expectMsgType[WatchParentTxConfirmed]
w.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, commitTx)
val result = probe.expectMsgType[TxRejected]
assert(result.cmd == htlcSuccess)
@ -1054,8 +1052,6 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w
val htlcSuccessPublisher = createPublisher()
htlcSuccessPublisher ! Publish(probe.ref, htlcSuccess)
val w = alice2blockchain.expectMsgType[WatchParentTxConfirmed]
w.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, commitTx)
val htlcSuccessTx = getMempoolTxs(1).head
val htlcSuccessTargetFee = Transactions.weight2fee(targetFeerate, htlcSuccessTx.weight.toInt)
assert(htlcSuccessTargetFee * 0.9 <= htlcSuccessTx.fees && htlcSuccessTx.fees <= htlcSuccessTargetFee * 1.2, s"actualFee=${htlcSuccessTx.fees} targetFee=$htlcSuccessTargetFee")
@ -1084,8 +1080,6 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w
generateBlocks(144)
system.eventStream.publish(CurrentBlockHeight(currentBlockHeight(probe)))
setFeerate(targetFeerate) // the feerate is higher than what it was when the channel force-closed
val w = alice2blockchain.expectMsgType[WatchParentTxConfirmed]
w.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, commitTx)
val htlcTimeoutTx = getMempoolTxs(1).head
val htlcTimeoutTargetFee = Transactions.weight2fee(targetFeerate, htlcTimeoutTx.weight.toInt)
assert(htlcTimeoutTargetFee * 0.9 <= htlcTimeoutTx.fees && htlcTimeoutTx.fees <= htlcTimeoutTargetFee * 1.2, s"actualFee=${htlcTimeoutTx.fees} targetFee=$htlcTimeoutTargetFee")
@ -1222,15 +1216,13 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w
val initialFeerate = FeeratePerKw(15_000 sat)
setFeerate(initialFeerate)
val (commitTx, htlcSuccess, _) = closeChannelWithHtlcs(f, aliceBlockHeight() + 30)
val (_, htlcSuccess, _) = closeChannelWithHtlcs(f, aliceBlockHeight() + 30)
val listener = TestProbe()
system.eventStream.subscribe(listener.ref, classOf[TransactionPublished])
val htlcSuccessPublisher = createPublisher()
htlcSuccessPublisher ! Publish(probe.ref, htlcSuccess)
val w = alice2blockchain.expectMsgType[WatchParentTxConfirmed]
w.replyTo ! WatchParentTxConfirmedTriggered(aliceBlockHeight(), 0, commitTx)
val htlcSuccessTxId1 = listener.expectMsgType[TransactionPublished].tx.txid
val htlcSuccessTx1 = getMempoolTxs(1).head
val htlcSuccessInputs1 = getMempool().head.txIn.map(_.outPoint).toSet
@ -1258,15 +1250,13 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w
val initialFeerate = FeeratePerKw(3_000 sat)
setFeerate(initialFeerate)
val (commitTx, htlcSuccess, _) = closeChannelWithHtlcs(f, aliceBlockHeight() + 15)
val (_, htlcSuccess, _) = closeChannelWithHtlcs(f, aliceBlockHeight() + 15)
val listener = TestProbe()
system.eventStream.subscribe(listener.ref, classOf[TransactionPublished])
val htlcSuccessPublisher = createPublisher()
htlcSuccessPublisher ! Publish(probe.ref, htlcSuccess)
val w = alice2blockchain.expectMsgType[WatchParentTxConfirmed]
w.replyTo ! WatchParentTxConfirmedTriggered(aliceBlockHeight(), 0, commitTx)
val htlcSuccessTxId1 = listener.expectMsgType[TransactionPublished].tx.txid
val htlcSuccessTx1 = getMempoolTxs(1).head
val htlcSuccessInputs1 = getMempool().head.txIn.map(_.outPoint).toSet
@ -1294,15 +1284,13 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w
val initialFeerate = FeeratePerKw(10_000 sat)
setFeerate(initialFeerate)
val (commitTx, htlcSuccess, _) = closeChannelWithHtlcs(f, aliceBlockHeight() + 6)
val (_, htlcSuccess, _) = closeChannelWithHtlcs(f, aliceBlockHeight() + 6)
val listener = TestProbe()
system.eventStream.subscribe(listener.ref, classOf[TransactionPublished])
val htlcSuccessPublisher = createPublisher()
htlcSuccessPublisher ! Publish(probe.ref, htlcSuccess)
val w = alice2blockchain.expectMsgType[WatchParentTxConfirmed]
w.replyTo ! WatchParentTxConfirmedTriggered(aliceBlockHeight(), 0, commitTx)
val htlcSuccessTxId = listener.expectMsgType[TransactionPublished].tx.txid
var htlcSuccessTx = getMempoolTxs(1).head
assert(htlcSuccessTx.txid == htlcSuccessTxId)
@ -1327,7 +1315,7 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w
val feerate = FeeratePerKw(15_000 sat)
setFeerate(feerate, fastest = feerate)
// The confirmation target for htlc-timeout corresponds to their CLTV: we should claim them asap once the htlc has timed out.
val (commitTx, _, htlcTimeout) = closeChannelWithHtlcs(f, aliceBlockHeight() + 144)
val (_, _, htlcTimeout) = closeChannelWithHtlcs(f, aliceBlockHeight() + 144)
val listener = TestProbe()
system.eventStream.subscribe(listener.ref, classOf[TransactionPublished])
@ -1336,8 +1324,6 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w
htlcTimeoutPublisher ! Publish(probe.ref, htlcTimeout)
generateBlocks(144)
system.eventStream.publish(CurrentBlockHeight(currentBlockHeight(probe)))
val w = alice2blockchain.expectMsgType[WatchParentTxConfirmed]
w.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, commitTx)
val htlcTimeoutTxId1 = listener.expectMsgType[TransactionPublished].tx.txid
val htlcTimeoutTx1 = getMempoolTxs(1).head
val htlcTimeoutInputs1 = getMempool().head.txIn.map(_.outPoint).toSet
@ -1363,7 +1349,7 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w
withFixture(Seq(15 millibtc, 10 millibtc, 5 millibtc), ChannelTypes.AnchorOutputsZeroFeeHtlcTx()) { f =>
import f._
val (commitTx, htlcSuccess, _) = closeChannelWithHtlcs(f, aliceBlockHeight() + 144)
val (_, htlcSuccess, _) = closeChannelWithHtlcs(f, aliceBlockHeight() + 144)
// The HTLC confirmation target is far away, but we have less safe utxos than the configured threshold.
// We will target a 1-block confirmation to get a safe utxo back as soon as possible.
val highSafeThresholdParams = alice.underlyingActor.nodeParams.modify(_.onChainFeeConf.safeUtxosThreshold).setTo(10)
@ -1373,8 +1359,6 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w
val htlcSuccessPublisher = createPublisher(highSafeThresholdParams)
htlcSuccessPublisher ! Publish(probe.ref, htlcSuccess)
val w = alice2blockchain.expectMsgType[WatchParentTxConfirmed]
w.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, commitTx)
val htlcSuccessTx = getMempoolTxs(1).head
val htlcSuccessTargetFee = Transactions.weight2fee(targetFeerate, htlcSuccessTx.weight.toInt)
assert(htlcSuccessTargetFee * 0.9 <= htlcSuccessTx.fees && htlcSuccessTx.fees <= htlcSuccessTargetFee * 1.1, s"actualFee=${htlcSuccessTx.fees} targetFee=$htlcSuccessTargetFee")
@ -1390,16 +1374,12 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w
val (commitTx, htlcSuccess, _) = closeChannelWithHtlcs(f, aliceBlockHeight() + 18)
val publisher1 = createPublisher()
publisher1 ! Publish(probe.ref, htlcSuccess)
val w1 = alice2blockchain.expectMsgType[WatchParentTxConfirmed]
w1.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, commitTx)
getMempoolTxs(1)
// we try to publish the htlc-success again (can be caused by a node restart): it will fail to replace the existing
// one in the mempool but we must ensure we don't leave some utxos locked.
val publisher2 = createPublisher()
publisher2 ! Publish(probe.ref, htlcSuccess)
val w2 = alice2blockchain.expectMsgType[WatchParentTxConfirmed]
w2.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, commitTx)
val result = probe.expectMsgType[TxRejected]
assert(result.reason == ConflictingTxUnconfirmed)
getMempoolTxs(1) // the previous htlc-success tx is still in the mempool
@ -1424,10 +1404,8 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w
import f._
setFeerate(FeeratePerKw(5_000 sat))
val (commitTx, htlcSuccess, _) = closeChannelWithHtlcs(f, aliceBlockHeight() + 48)
val (_, htlcSuccess, _) = closeChannelWithHtlcs(f, aliceBlockHeight() + 48)
publisher ! Publish(probe.ref, htlcSuccess)
val w = alice2blockchain.expectMsgType[WatchParentTxConfirmed]
w.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, commitTx)
getMempoolTxs(1)
// We unlock utxos before stopping.
@ -1545,8 +1523,6 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w
val claimHtlcSuccessPublisher = createPublisher()
claimHtlcSuccessPublisher ! Publish(probe.ref, claimHtlcSuccess)
val w = alice2blockchain.expectMsgType[WatchParentTxConfirmed]
w.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, remoteCommitTx)
val claimHtlcSuccessTx = getMempoolTxs(1).head
val claimHtlcSuccessTargetFee = Transactions.weight2fee(targetFeerate, claimHtlcSuccessTx.weight.toInt)
assert(claimHtlcSuccessTargetFee * 0.9 <= claimHtlcSuccessTx.fees && claimHtlcSuccessTx.fees <= claimHtlcSuccessTargetFee * 1.1, s"actualFee=${claimHtlcSuccessTx.fees} targetFee=$claimHtlcSuccessTargetFee")
@ -1574,8 +1550,6 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w
generateBlocks(144)
system.eventStream.publish(CurrentBlockHeight(currentBlockHeight(probe)))
setFeerate(targetFeerate) // the feerate is higher than what it was when the channel force-closed
val w = alice2blockchain.expectMsgType[WatchParentTxConfirmed]
w.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, remoteCommitTx)
val claimHtlcTimeoutTx = getMempoolTxs(1).head
val claimHtlcTimeoutTargetFee = Transactions.weight2fee(targetFeerate, claimHtlcTimeoutTx.weight.toInt)
assert(claimHtlcTimeoutTargetFee * 0.9 <= claimHtlcTimeoutTx.fees && claimHtlcTimeoutTx.fees <= claimHtlcTimeoutTargetFee * 1.1, s"actualFee=${claimHtlcTimeoutTx.fees} targetFee=$claimHtlcTimeoutTargetFee")
@ -1683,13 +1657,11 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w
withFixture(Seq(11 millibtc), ChannelTypes.AnchorOutputs()) { f =>
import f._
val (remoteCommitTx, claimHtlcSuccess, claimHtlcTimeout) = remoteCloseChannelWithHtlcs(f, aliceBlockHeight() + 300, nextCommit = false)
val (_, claimHtlcSuccess, claimHtlcTimeout) = remoteCloseChannelWithHtlcs(f, aliceBlockHeight() + 300, nextCommit = false)
setFeerate(FeeratePerKw(50_000 sat))
val claimHtlcSuccessPublisher = createPublisher()
claimHtlcSuccessPublisher ! Publish(probe.ref, claimHtlcSuccess)
val w1 = alice2blockchain.expectMsgType[WatchParentTxConfirmed]
w1.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, remoteCommitTx)
val result1 = probe.expectMsgType[TxRejected]
assert(result1.cmd == claimHtlcSuccess)
assert(result1.reason == TxSkipped(retryNextBlock = true))
@ -1699,8 +1671,6 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w
claimHtlcTimeoutPublisher ! Publish(probe.ref, claimHtlcTimeout)
generateBlocks(144)
system.eventStream.publish(CurrentBlockHeight(currentBlockHeight(probe)))
val w2 = alice2blockchain.expectMsgType[WatchParentTxConfirmed]
w2.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, remoteCommitTx)
val result2 = probe.expectMsgType[TxRejected]
assert(result2.cmd == claimHtlcTimeout)
assert(result2.reason == TxSkipped(retryNextBlock = true))
@ -1771,7 +1741,7 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w
withFixture(Seq(11 millibtc), ChannelTypes.AnchorOutputs()) { f =>
import f._
val (remoteCommitTx, claimHtlcSuccess, _) = remoteCloseChannelWithHtlcs(f, aliceBlockHeight() + 300, nextCommit = false)
val (_, claimHtlcSuccess, _) = remoteCloseChannelWithHtlcs(f, aliceBlockHeight() + 300, nextCommit = false)
val listener = TestProbe()
system.eventStream.subscribe(listener.ref, classOf[TransactionPublished])
@ -1779,8 +1749,6 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w
setFeerate(FeeratePerKw(5_000 sat))
val claimHtlcSuccessPublisher = createPublisher()
claimHtlcSuccessPublisher ! Publish(probe.ref, claimHtlcSuccess)
val w1 = alice2blockchain.expectMsgType[WatchParentTxConfirmed]
w1.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, remoteCommitTx)
val claimHtlcSuccessTx = getMempoolTxs(1).head
assert(listener.expectMsgType[TransactionPublished].tx.txid == claimHtlcSuccessTx.txid)

View File

@ -19,28 +19,47 @@ package fr.acinq.eclair.channel.publish
import akka.actor.typed.ActorRef
import akka.actor.typed.scaladsl.adapter.{ClassicActorSystemOps, actorRefAdapter}
import akka.testkit.TestProbe
import fr.acinq.bitcoin.scalacompat.{OutPoint, SatoshiLong, Script, Transaction, TxIn, TxOut}
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{WatchParentTxConfirmed, WatchParentTxConfirmedTriggered}
import fr.acinq.bitcoin.scalacompat.{OutPoint, SatoshiLong, Script, Transaction, TxId, TxIn, TxOut}
import fr.acinq.eclair.blockchain.NoOpOnChainWallet
import fr.acinq.eclair.channel.publish.TxPublisher.TxPublishContext
import fr.acinq.eclair.channel.publish.TxTimeLocksMonitor.{CheckTx, TimeLocksOk, WrappedCurrentBlockHeight}
import fr.acinq.eclair.{BlockHeight, NodeParams, TestConstants, TestKitBaseClass, randomKey}
import fr.acinq.eclair.{NodeParams, TestConstants, TestKitBaseClass, randomKey}
import org.scalatest.Outcome
import org.scalatest.funsuite.FixtureAnyFunSuiteLike
import java.util.UUID
import scala.concurrent.duration.DurationInt
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Success
class TxTimeLocksMonitorSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike {
case class FixtureParam(nodeParams: NodeParams, monitor: ActorRef[TxTimeLocksMonitor.Command], watcher: TestProbe, probe: TestProbe)
case class FixtureParam(nodeParams: NodeParams, monitor: ActorRef[TxTimeLocksMonitor.Command], bitcoinClient: BitcoinTestClient, probe: TestProbe)
case class BitcoinTestClient() extends NoOpOnChainWallet {
private val requests = collection.concurrent.TrieMap.empty[TxId, Promise[Option[Int]]]
override def getTxConfirmations(txId: TxId)(implicit ec: ExecutionContext): Future[Option[Int]] = {
val p = Promise[Option[Int]]()
requests += (txId -> p)
p.future
}
def hasRequest(txId: TxId): Boolean = requests.contains(txId)
def completeRequest(txId: TxId, confirmations_opt: Option[Int]): Unit = {
requests.get(txId).map(_.complete(Success(confirmations_opt)))
requests -= txId
}
}
override def withFixture(test: OneArgTest): Outcome = {
within(max = 30 seconds) {
val nodeParams = TestConstants.Alice.nodeParams
val probe = TestProbe()
val watcher = TestProbe()
val monitor = system.spawnAnonymous(TxTimeLocksMonitor(nodeParams, watcher.ref, TxPublishContext(UUID.randomUUID(), randomKey().publicKey, None)))
withFixture(test.toNoArgTest(FixtureParam(nodeParams, monitor, watcher, probe)))
val bitcoinClient = BitcoinTestClient()
val monitor = system.spawnAnonymous(TxTimeLocksMonitor(nodeParams, bitcoinClient, TxPublishContext(UUID.randomUUID(), randomKey().publicKey, None)))
withFixture(test.toNoArgTest(FixtureParam(nodeParams, monitor, bitcoinClient, probe)))
}
}
@ -65,12 +84,26 @@ class TxTimeLocksMonitorSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik
val tx = Transaction(2, TxIn(OutPoint(parentTx, 0), Nil, 3) :: Nil, TxOut(25_000 sat, Script.pay2wpkh(randomKey().publicKey)) :: Nil, 0)
monitor ! CheckTx(probe.ref, tx, "relative-delay")
val w = watcher.expectMsgType[WatchParentTxConfirmed]
assert(w.txId == parentTx.txid)
assert(w.minDepth == 3)
// The parent transaction is unconfirmed: we will check again 3 blocks later (the value of the CSV timeout).
awaitCond(bitcoinClient.hasRequest(parentTx.txid), interval = 100 millis)
bitcoinClient.completeRequest(parentTx.txid, None)
probe.expectNoMessage(100 millis)
w.replyTo ! WatchParentTxConfirmedTriggered(BlockHeight(651), 0, parentTx)
monitor ! WrappedCurrentBlockHeight(nodeParams.currentBlockHeight + 1)
assert(!bitcoinClient.hasRequest(parentTx.txid))
monitor ! WrappedCurrentBlockHeight(nodeParams.currentBlockHeight + 2)
assert(!bitcoinClient.hasRequest(parentTx.txid))
monitor ! WrappedCurrentBlockHeight(nodeParams.currentBlockHeight + 3)
awaitCond(bitcoinClient.hasRequest(parentTx.txid), interval = 100 millis)
// This time the parent transaction has 1 confirmation: we will check again in two more blocks.
bitcoinClient.completeRequest(parentTx.txid, Some(1))
monitor ! WrappedCurrentBlockHeight(nodeParams.currentBlockHeight + 1)
probe.expectNoMessage(100 millis)
assert(!bitcoinClient.hasRequest(parentTx.txid))
monitor ! WrappedCurrentBlockHeight(nodeParams.currentBlockHeight + 2)
awaitCond(bitcoinClient.hasRequest(parentTx.txid), interval = 100 millis)
bitcoinClient.completeRequest(parentTx.txid, Some(3))
probe.expectMsg(TimeLocksOk())
}
@ -81,23 +114,33 @@ class TxTimeLocksMonitorSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik
val parentTx2 = Transaction(2, Nil, TxOut(45_000 sat, Script.pay2wpkh(randomKey().publicKey)) :: Nil, 0)
val tx = Transaction(
2,
TxIn(OutPoint(parentTx1, 0), Nil, 3) :: TxIn(OutPoint(parentTx1, 1), Nil, 1) :: TxIn(OutPoint(parentTx2, 0), Nil, 1) :: Nil,
TxIn(OutPoint(parentTx1, 0), Nil, 3) :: TxIn(OutPoint(parentTx1, 1), Nil, 1) :: TxIn(OutPoint(parentTx2, 0), Nil, 2) :: Nil,
TxOut(50_000 sat, Script.pay2wpkh(randomKey().publicKey)) :: Nil,
0
)
monitor ! CheckTx(probe.ref, tx, "many-relative-delays")
// We send a single watch for parentTx1, with the max of the two delays.
val w1 = watcher.expectMsgType[WatchParentTxConfirmed]
val w2 = watcher.expectMsgType[WatchParentTxConfirmed]
watcher.expectNoMessage(100 millis)
assert(Seq(w1, w2).map(w => (w.txId, w.minDepth)).toSet == Set((parentTx1.txid, 3), (parentTx2.txid, 1)))
// The first parent transaction is unconfirmed.
awaitCond(bitcoinClient.hasRequest(parentTx1.txid), interval = 100 millis)
bitcoinClient.completeRequest(parentTx1.txid, None)
// The second parent transaction is confirmed, but is still missing one confirmation.
awaitCond(bitcoinClient.hasRequest(parentTx2.txid), interval = 100 millis)
bitcoinClient.completeRequest(parentTx2.txid, Some(1))
probe.expectNoMessage(100 millis)
w1.replyTo ! WatchParentTxConfirmedTriggered(BlockHeight(651), 0, parentTx1)
// A new block is found: the second parent transaction has enough confirmations.
monitor ! WrappedCurrentBlockHeight(nodeParams.currentBlockHeight + 1)
awaitCond(bitcoinClient.hasRequest(parentTx2.txid), interval = 100 millis)
assert(!bitcoinClient.hasRequest(parentTx1.txid))
bitcoinClient.completeRequest(parentTx2.txid, Some(2))
probe.expectNoMessage(100 millis)
w2.replyTo ! WatchParentTxConfirmedTriggered(BlockHeight(1105), 0, parentTx2)
// Two more blocks are found: the first parent transaction now has enough confirmations.
monitor ! WrappedCurrentBlockHeight(nodeParams.currentBlockHeight + 3)
awaitCond(bitcoinClient.hasRequest(parentTx1.txid), interval = 100 millis)
assert(!bitcoinClient.hasRequest(parentTx2.txid))
bitcoinClient.completeRequest(parentTx1.txid, Some(3))
probe.expectMsg(TimeLocksOk())
}
@ -114,19 +157,18 @@ class TxTimeLocksMonitorSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik
)
monitor ! CheckTx(probe.ref, tx, "absolute-and-relative-delays")
// We set watches on parent txs only once the absolute delay is over.
watcher.expectNoMessage(100 millis)
// We watch parent txs only once the absolute delay is over.
monitor ! WrappedCurrentBlockHeight(nodeParams.currentBlockHeight + 2)
assert(!bitcoinClient.hasRequest(parentTx1.txid))
assert(!bitcoinClient.hasRequest(parentTx2.txid))
// When the absolute delay is over, we check parent transactions.
monitor ! WrappedCurrentBlockHeight(nodeParams.currentBlockHeight + 3)
val w1 = watcher.expectMsgType[WatchParentTxConfirmed]
val w2 = watcher.expectMsgType[WatchParentTxConfirmed]
watcher.expectNoMessage(100 millis)
assert(Seq(w1, w2).map(w => (w.txId, w.minDepth)).toSet == Set((parentTx1.txid, 3), (parentTx2.txid, 6)))
awaitCond(bitcoinClient.hasRequest(parentTx1.txid), interval = 100 millis)
bitcoinClient.completeRequest(parentTx1.txid, Some(5))
probe.expectNoMessage(100 millis)
w1.replyTo ! WatchParentTxConfirmedTriggered(BlockHeight(651), 0, parentTx1)
probe.expectNoMessage(100 millis)
w2.replyTo ! WatchParentTxConfirmedTriggered(BlockHeight(1105), 0, parentTx2)
awaitCond(bitcoinClient.hasRequest(parentTx2.txid), interval = 100 millis)
bitcoinClient.completeRequest(parentTx2.txid, Some(10))
probe.expectMsg(TimeLocksOk())
}

View File

@ -564,7 +564,11 @@ trait ChannelStateTestsBase extends Assertions with Eventually {
// we watch the confirmation of the "final" transactions that send funds to our wallets (main delayed output and 2nd stage htlc transactions)
assert(s2blockchain.expectMsgType[WatchTxConfirmed].txId == commitTx.txid)
localCommitPublished.claimMainDelayedOutputTx.foreach(claimMain => assert(s2blockchain.expectMsgType[WatchTxConfirmed].txId == claimMain.tx.txid))
localCommitPublished.claimMainDelayedOutputTx.foreach(claimMain => {
val watchConfirmed = s2blockchain.expectMsgType[WatchTxConfirmed]
assert(watchConfirmed.txId == claimMain.tx.txid)
assert(watchConfirmed.delay_opt.map(_.parentTxId).contains(publishedLocalCommitTx.txid))
})
// we watch outputs of the commitment tx that both parties may spend and anchor outputs
val watchedOutputIndexes = localCommitPublished.htlcTxs.keySet.map(_.index) ++ localCommitPublished.claimAnchorTxs.collect { case tx: ClaimLocalAnchorOutputTx => tx.input.outPoint.index }

View File

@ -21,7 +21,6 @@ import akka.testkit.{TestFSMRef, TestProbe}
import fr.acinq.bitcoin.ScriptFlags
import fr.acinq.bitcoin.scalacompat.Crypto.PrivateKey
import fr.acinq.bitcoin.scalacompat.{ByteVector32, ByteVector64, Crypto, OutPoint, SatoshiLong, Script, Transaction, TxIn, TxOut}
import fr.acinq.eclair.Features.StaticRemoteKey
import fr.acinq.eclair.TestUtils.randomTxId
import fr.acinq.eclair.blockchain.DummyOnChainWallet
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher._
@ -37,6 +36,7 @@ import fr.acinq.eclair.transactions.Transactions._
import fr.acinq.eclair.transactions.{Scripts, Transactions}
import fr.acinq.eclair.wire.protocol._
import fr.acinq.eclair.{BlockHeight, CltvExpiry, CltvExpiryDelta, Features, MilliSatoshiLong, TestConstants, TestKitBaseClass, TimestampSecond, randomBytes32, randomKey}
import org.scalatest.Inside.inside
import org.scalatest.funsuite.FixtureAnyFunSuiteLike
import org.scalatest.{Outcome, Tag}
import scodec.bits.ByteVector
@ -710,14 +710,12 @@ class ClosingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
// Alice receives the preimage for the incoming HTLC.
alice ! CMD_FULFILL_HTLC(incomingHtlc.id, preimage, commit = true)
assert(alice2blockchain.expectMsgType[PublishReplaceableTx].txInfo.isInstanceOf[ClaimLocalAnchorOutputTx])
assert(alice2blockchain.expectMsgType[PublishFinalTx].tx.txid == claimMainTx.txid)
assert(alice2blockchain.expectMsgType[PublishReplaceableTx].txInfo.isInstanceOf[HtlcTimeoutTx])
assert(alice2blockchain.expectMsgType[PublishReplaceableTx].txInfo.isInstanceOf[HtlcSuccessTx])
assert(alice2blockchain.expectMsgType[WatchTxConfirmed].txId == claimMainTx.txid)
alice2blockchain.expectMsgType[WatchOutputSpent]
alice2blockchain.expectMsgType[WatchOutputSpent]
alice2blockchain.expectMsgType[WatchOutputSpent]
alice2blockchain.expectNoMessage(100 millis)
val closingState2 = alice.stateData.asInstanceOf[DATA_CLOSING].localCommitPublished.get
assert(getHtlcSuccessTxs(closingState2).length == 1)
@ -726,11 +724,17 @@ class ClosingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
// The HTLC txs confirms, so we publish 3rd-stage txs.
alice ! WatchTxConfirmedTriggered(BlockHeight(201), 0, htlcTimeoutTx)
val claimHtlcTimeoutDelayedTx = alice2blockchain.expectMsgType[PublishFinalTx].tx
assert(alice2blockchain.expectMsgType[WatchTxConfirmed].txId == claimHtlcTimeoutDelayedTx.txid)
inside(alice2blockchain.expectMsgType[WatchTxConfirmed]) { w =>
assert(w.txId == claimHtlcTimeoutDelayedTx.txid)
assert(w.delay_opt.map(_.parentTxId).contains(htlcTimeoutTx.txid))
}
Transaction.correctlySpends(claimHtlcTimeoutDelayedTx, Seq(htlcTimeoutTx), ScriptFlags.STANDARD_SCRIPT_VERIFY_FLAGS)
alice ! WatchTxConfirmedTriggered(BlockHeight(201), 0, htlcSuccessTx)
val claimHtlcSuccessDelayedTx = alice2blockchain.expectMsgType[PublishFinalTx].tx
assert(alice2blockchain.expectMsgType[WatchTxConfirmed].txId == claimHtlcSuccessDelayedTx.txid)
inside(alice2blockchain.expectMsgType[WatchTxConfirmed]) { w =>
assert(w.txId == claimHtlcSuccessDelayedTx.txid)
assert(w.delay_opt.map(_.parentTxId).contains(htlcSuccessTx.txid))
}
Transaction.correctlySpends(claimHtlcSuccessDelayedTx, Seq(htlcSuccessTx), ScriptFlags.STANDARD_SCRIPT_VERIFY_FLAGS)
// We simulate a node restart after a feerate increase.
@ -742,14 +746,12 @@ class ClosingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
awaitCond(alice.stateName == CLOSING)
// We re-publish closing transactions.
assert(alice2blockchain.expectMsgType[PublishReplaceableTx].txInfo.isInstanceOf[ClaimLocalAnchorOutputTx])
assert(alice2blockchain.expectMsgType[PublishFinalTx].tx.txid == claimMainTx.txid)
assert(alice2blockchain.expectMsgType[PublishFinalTx].tx.txid == claimHtlcTimeoutDelayedTx.txid)
assert(alice2blockchain.expectMsgType[PublishFinalTx].tx.txid == claimHtlcSuccessDelayedTx.txid)
assert(alice2blockchain.expectMsgType[WatchTxConfirmed].txId == claimMainTx.txid)
assert(alice2blockchain.expectMsgType[WatchTxConfirmed].txId == claimHtlcTimeoutDelayedTx.txid)
assert(alice2blockchain.expectMsgType[WatchTxConfirmed].txId == claimHtlcSuccessDelayedTx.txid)
alice2blockchain.expectMsgType[WatchOutputSpent]
// We replay the HTLC fulfillment: nothing happens since we already published a 3rd-stage transaction.
alice ! CMD_FULFILL_HTLC(incomingHtlc.id, preimage, commit = true)

View File

@ -96,7 +96,7 @@ object MinimalNodeFixture extends Assertions with Eventually with IntegrationPat
val offerManager = system.spawn(OfferManager(nodeParams, router, 1 minute), "offer-manager")
val paymentHandler = system.actorOf(PaymentHandler.props(nodeParams, register, offerManager), "payment-handler")
val relayer = system.actorOf(Relayer.props(nodeParams, router, register, paymentHandler), "relayer")
val txPublisherFactory = Channel.SimpleTxPublisherFactory(nodeParams, watcherTyped, bitcoinClient)
val txPublisherFactory = Channel.SimpleTxPublisherFactory(nodeParams, bitcoinClient)
val channelFactory = Peer.SimpleChannelFactory(nodeParams, watcherTyped, relayer, wallet, txPublisherFactory)
val pendingChannelsRateLimiter = system.spawnAnonymous(Behaviors.supervise(PendingChannelsRateLimiter(nodeParams, router.toTyped, Seq())).onFailure(typed.SupervisorStrategy.resume))
val peerFactory = Switchboard.SimplePeerFactory(nodeParams, wallet, channelFactory, pendingChannelsRateLimiter, register, router.toTyped)