1
0
Fork 0
mirror of https://github.com/ACINQ/eclair.git synced 2025-02-21 22:11:46 +01:00

Improve startup resource usage (#2733)

* Move some logs to debug level

This should reduce the pressure on the file system and RAM without impacting
our ability to troubleshoot common issues.

* Avoid herd effect watching external channels

When we restart, we put watches on every public channel in the network.
That creates a lot of RPC calls to bitcoind, which aren't time-sensitive.
It's ok if we don't see immediately that an external channel was closed,
the spec even recommends waiting for 12 blocks to distinguish a channel
close from a splice.

By default, we now smooth that over a 1 hour period. This means we should
also allow our peers to be late at discovering that a channel was closed.
We thus stop sending a `warning` in that case and increase our tolerance
to that kind of behavior.

* Avoid herd effect watching local channels

When we restart, we set watches on our funding transactions. But we don't
actually need to watch them immediately, we just need enough time to react
to our peer broadcasting their commitment. We use long `cltv_delta` delays
to guarantee funds safety, so we can spread out the watches across several
blocks to reduce the start-up load. It essentially is the same thing as
receiving mempool transactions or blocks after a delay, which is something
that our threat model already takes into account.
This commit is contained in:
Bastien Teinturier 2023-09-26 16:31:17 +02:00 committed by GitHub
parent d274fc1939
commit e3ba524306
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 93 additions and 64 deletions

View file

@ -135,6 +135,7 @@ eclair {
// expiry-delta-blocks.
fulfill-safety-before-timeout-blocks = 24
min-final-expiry-delta-blocks = 30 // Bolt 11 invoice's min_final_cltv_expiry; must be strictly greater than fulfill-safety-before-timeout-blocks
max-restart-watch-delay = 60 seconds // we add a random delay before watching funding transactions after restart
max-block-processing-delay = 30 seconds // we add a random delay before processing blocks, capped at this value, to prevent herd effect
max-tx-publish-retry-delay = 60 seconds // we add a random delay before retrying failed transaction publication
// When a channel has been spent while we were offline, we limit how many blocks in the past we scan, otherwise we
@ -292,7 +293,7 @@ eclair {
autoprobe-count = 0 // number of parallel tasks that send test payments to detect invalid channels
router {
watch-spent-window = 1 minute // at startup watches will be put back within that window to reduce herd effect; must be > 0s
watch-spent-window = 60 minutes // at startup watches on public channels will be put back within that window to reduce herd effect; must be > 0s
channel-exclude-duration = 60 seconds // when a temporary channel failure is returned, we exclude the channel from our payment routes for this duration
broadcast-interval = 60 seconds // see BOLT #7

View file

@ -508,6 +508,7 @@ object NodeParams extends Logging {
maxExpiryDelta = maxExpiryDelta,
fulfillSafetyBeforeTimeout = fulfillSafetyBeforeTimeout,
minFinalExpiryDelta = minFinalExpiryDelta,
maxRestartWatchDelay = FiniteDuration(config.getDuration("channel.max-restart-watch-delay").getSeconds, TimeUnit.SECONDS),
maxBlockProcessingDelay = FiniteDuration(config.getDuration("channel.max-block-processing-delay").getSeconds, TimeUnit.SECONDS),
maxTxPublishRetryDelay = FiniteDuration(config.getDuration("channel.max-tx-publish-retry-delay").getSeconds, TimeUnit.SECONDS),
maxChannelSpentRescanBlocks = config.getInt("channel.max-channel-spent-rescan-blocks"),

View file

@ -82,6 +82,7 @@ object Channel {
maxExpiryDelta: CltvExpiryDelta,
fulfillSafetyBeforeTimeout: CltvExpiryDelta,
minFinalExpiryDelta: CltvExpiryDelta,
maxRestartWatchDelay: FiniteDuration,
maxBlockProcessingDelay: FiniteDuration,
maxTxPublishRetryDelay: FiniteDuration,
maxChannelSpentRescanBlocks: Int,
@ -252,8 +253,13 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
context.system.eventStream.publish(ChannelRestored(self, data.channelId, peer, remoteNodeId, data))
txPublisher ! SetChannelId(remoteNodeId, data.channelId)
// we watch all unconfirmed funding txs, whatever our state is
// (there can be multiple funding txs due to rbf, and they can be unconfirmed in any state due to zero-conf)
// We watch all unconfirmed funding txs, whatever our state is.
// There can be multiple funding txs due to rbf, and they can be unconfirmed in any state due to zero-conf.
// To avoid a herd effect on restart, we add a delay before watching funding txs.
val herdDelay_opt = nodeParams.channelConf.maxRestartWatchDelay.toSeconds match {
case maxRestartWatchDelay if maxRestartWatchDelay > 0 => Some((1 + Random.nextLong(maxRestartWatchDelay)).seconds)
case _ => None
}
data match {
case _: ChannelDataWithoutCommitments => ()
case data: ChannelDataWithCommitments => data.commitments.all.foreach { commitment =>
@ -272,14 +278,14 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
// once, because at the next restore, the status of the funding tx will be "confirmed".
()
}
watchFundingConfirmed(commitment.fundingTxId, Some(singleFundingMinDepth(data)))
watchFundingConfirmed(commitment.fundingTxId, Some(singleFundingMinDepth(data)), herdDelay_opt)
case fundingTx: LocalFundingStatus.DualFundedUnconfirmedFundingTx =>
publishFundingTx(fundingTx)
val minDepth_opt = data.commitments.params.minDepthDualFunding(nodeParams.channelConf.minDepthBlocks, fundingTx.sharedTx.tx)
watchFundingConfirmed(fundingTx.sharedTx.txId, minDepth_opt)
watchFundingConfirmed(fundingTx.sharedTx.txId, minDepth_opt, herdDelay_opt)
case fundingTx: LocalFundingStatus.ZeroconfPublishedFundingTx =>
// those are zero-conf channels, the min-depth isn't critical, we use the default
watchFundingConfirmed(fundingTx.tx.txid, Some(nodeParams.channelConf.minDepthBlocks.toLong))
watchFundingConfirmed(fundingTx.tx.txid, Some(nodeParams.channelConf.minDepthBlocks.toLong), herdDelay_opt)
case _: LocalFundingStatus.ConfirmedFundingTx =>
data match {
case closing: DATA_CLOSING if Closing.nothingAtStake(closing) || Closing.isClosingTypeAlreadyKnown(closing).isDefined =>
@ -287,11 +293,11 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
()
case closing: DATA_CLOSING =>
// in all other cases we need to be ready for any type of closing
watchFundingSpent(commitment, closing.spendingTxs.map(_.txid).toSet)
watchFundingSpent(commitment, closing.spendingTxs.map(_.txid).toSet, herdDelay_opt)
case _ =>
// Children splice transactions may already spend that confirmed funding transaction.
val spliceSpendingTxs = data.commitments.all.collect { case c if c.fundingTxIndex == commitment.fundingTxIndex + 1 => c.fundingTxId }
watchFundingSpent(commitment, additionalKnownSpendingTxs = spliceSpendingTxs.toSet)
watchFundingSpent(commitment, additionalKnownSpendingTxs = spliceSpendingTxs.toSet, herdDelay_opt)
}
}
}
@ -342,7 +348,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
if (fees.feeBase != normal.channelUpdate.feeBaseMsat ||
fees.feeProportionalMillionths != normal.channelUpdate.feeProportionalMillionths ||
nodeParams.channelConf.expiryDelta != normal.channelUpdate.cltvExpiryDelta) {
log.info("refreshing channel_update due to configuration changes")
log.debug("refreshing channel_update due to configuration changes")
self ! CMD_UPDATE_RELAY_FEE(ActorRef.noSender, fees.feeBase, fees.feeProportionalMillionths, Some(nodeParams.channelConf.expiryDelta))
}
// we need to periodically re-send channel updates, otherwise channel will be considered stale and get pruned by network
@ -567,7 +573,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
// That's why we move on immediately to the next step, and will update our unsigned funding tx when we
// receive their tx_sigs.
val minDepth_opt = d.commitments.params.minDepthDualFunding(nodeParams.channelConf.minDepthBlocks, signingSession1.fundingTx.sharedTx.tx)
watchFundingConfirmed(signingSession.fundingTx.txId, minDepth_opt)
watchFundingConfirmed(signingSession.fundingTx.txId, minDepth_opt, delay_opt = None)
val commitments1 = d.commitments.add(signingSession1.commitment)
val d1 = d.copy(commitments = commitments1, spliceStatus = SpliceStatus.NoSplice)
stay() using d1 storing() sending signingSession1.localSigs calling endQuiescence(d1)
@ -807,7 +813,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
case Event(c: CMD_UPDATE_RELAY_FEE, d: DATA_NORMAL) =>
val channelUpdate1 = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, scidForChannelUpdate(d), c.cltvExpiryDelta_opt.getOrElse(d.channelUpdate.cltvExpiryDelta), d.channelUpdate.htlcMinimumMsat, c.feeBase, c.feeProportionalMillionths, d.commitments.params.maxHtlcAmount, isPrivate = !d.commitments.announceChannel, enable = Helpers.aboveReserve(d.commitments))
log.info(s"updating relay fees: prev={} next={}", d.channelUpdate.toStringShort, channelUpdate1.toStringShort)
log.debug(s"updating relay fees: prev={} next={}", d.channelUpdate.toStringShort, channelUpdate1.toStringShort)
val replyTo = if (c.replyTo == ActorRef.noSender) sender() else c.replyTo
replyTo ! RES_SUCCESS(c, d.channelId)
// we use goto() instead of stay() because we want to fire transitions
@ -1081,7 +1087,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
stay() using d.copy(spliceStatus = SpliceStatus.SpliceAborted) sending TxAbort(d.channelId, f.getMessage)
case Right(signingSession1) =>
val minDepth_opt = d.commitments.params.minDepthDualFunding(nodeParams.channelConf.minDepthBlocks, signingSession1.fundingTx.sharedTx.tx)
watchFundingConfirmed(signingSession.fundingTx.txId, minDepth_opt)
watchFundingConfirmed(signingSession.fundingTx.txId, minDepth_opt, delay_opt = None)
val commitments1 = d.commitments.add(signingSession1.commitment)
val d1 = d.copy(commitments = commitments1, spliceStatus = SpliceStatus.NoSplice)
log.info("publishing funding tx for channelId={} fundingTxId={}", d.channelId, signingSession1.fundingTx.sharedTx.txId)
@ -1098,7 +1104,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
val fundingStatus = LocalFundingStatus.ZeroconfPublishedFundingTx(w.tx, d.commitments.localFundingSigs(w.tx.txid))
d.commitments.updateLocalFundingStatus(w.tx.txid, fundingStatus) match {
case Right((commitments1, _)) =>
watchFundingConfirmed(w.tx.txid, Some(nodeParams.channelConf.minDepthBlocks))
watchFundingConfirmed(w.tx.txid, Some(nodeParams.channelConf.minDepthBlocks), delay_opt = None)
maybeEmitEventsPostSplice(d.shortIds, d.commitments, commitments1)
stay() using d.copy(commitments = commitments1) storing() sending SpliceLocked(d.channelId, w.tx.hash)
case Left(_) => stay()
@ -2192,7 +2198,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
d.commitments.updateLocalFundingStatus(w.tx.txid, fundingStatus) match {
case Right((commitments1, _)) =>
log.info(s"zero-conf funding txid=${w.tx.txid} has been published")
watchFundingConfirmed(w.tx.txid, Some(nodeParams.channelConf.minDepthBlocks))
watchFundingConfirmed(w.tx.txid, Some(nodeParams.channelConf.minDepthBlocks), delay_opt = None)
val d1 = d match {
// NB: we discard remote's stashed channel_ready, they will send it back at reconnection
case d: DATA_WAIT_FOR_FUNDING_CONFIRMED =>
@ -2525,7 +2531,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
// we will only emit a new channel_update with the disable flag set if someone tries to use that channel
if (d.channelUpdate.channelFlags.isEnabled) {
// if the channel isn't disabled we generate a new channel_update
log.info("updating channel_update announcement (reason=disabled)")
log.debug("updating channel_update announcement (reason=disabled)")
val channelUpdate1 = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, scidForChannelUpdate(d), d.channelUpdate.cltvExpiryDelta, d.channelUpdate.htlcMinimumMsat, d.channelUpdate.feeBaseMsat, d.channelUpdate.feeProportionalMillionths, d.commitments.params.maxHtlcAmount, isPrivate = !d.commitments.announceChannel, enable = false)
// then we update the state and replay the request
self forward c
@ -2540,7 +2546,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
private def handleUpdateRelayFeeDisconnected(c: CMD_UPDATE_RELAY_FEE, d: DATA_NORMAL) = {
val channelUpdate1 = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, scidForChannelUpdate(d), c.cltvExpiryDelta_opt.getOrElse(d.channelUpdate.cltvExpiryDelta), d.channelUpdate.htlcMinimumMsat, c.feeBase, c.feeProportionalMillionths, d.commitments.params.maxHtlcAmount, isPrivate = !d.commitments.announceChannel, enable = false)
log.info(s"updating relay fees: prev={} next={}", d.channelUpdate.toStringShort, channelUpdate1.toStringShort)
log.debug(s"updating relay fees: prev={} next={}", d.channelUpdate.toStringShort, channelUpdate1.toStringShort)
val replyTo = if (c.replyTo == ActorRef.noSender) sender() else c.replyTo
replyTo ! RES_SUCCESS(c, d.channelId)
// We're in OFFLINE state, by using stay() instead of goto() we skip the transition handler and won't broadcast the

View file

@ -380,7 +380,7 @@ trait ChannelOpenDualFunded extends DualFundingHandlers with ErrorHandlers {
// That's why we move on immediately to the next step, and will update our unsigned funding tx when we
// receive their tx_sigs.
val minDepth_opt = d.channelParams.minDepthDualFunding(nodeParams.channelConf.minDepthBlocks, signingSession1.fundingTx.sharedTx.tx)
watchFundingConfirmed(d.signingSession.fundingTx.txId, minDepth_opt)
watchFundingConfirmed(d.signingSession.fundingTx.txId, minDepth_opt, delay_opt = None)
val commitments = Commitments(
params = d.channelParams,
changes = CommitmentChanges.init(),
@ -403,7 +403,7 @@ trait ChannelOpenDualFunded extends DualFundingHandlers with ErrorHandlers {
goto(CLOSED) sending Error(d.channelId, f.getMessage)
case Right(signingSession) =>
val minDepth_opt = d.channelParams.minDepthDualFunding(nodeParams.channelConf.minDepthBlocks, signingSession.fundingTx.sharedTx.tx)
watchFundingConfirmed(d.signingSession.fundingTx.txId, minDepth_opt)
watchFundingConfirmed(d.signingSession.fundingTx.txId, minDepth_opt, delay_opt = None)
val commitments = Commitments(
params = d.channelParams,
changes = CommitmentChanges.init(),
@ -468,7 +468,7 @@ trait ChannelOpenDualFunded extends DualFundingHandlers with ErrorHandlers {
stay() using d.copy(rbfStatus = RbfStatus.RbfAborted) sending TxAbort(d.channelId, f.getMessage)
case Right(signingSession1) =>
val minDepth_opt = d.commitments.params.minDepthDualFunding(nodeParams.channelConf.minDepthBlocks, signingSession1.fundingTx.sharedTx.tx)
watchFundingConfirmed(signingSession.fundingTx.txId, minDepth_opt)
watchFundingConfirmed(signingSession.fundingTx.txId, minDepth_opt, delay_opt = None)
val commitments1 = d.commitments.add(signingSession1.commitment)
val d1 = DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED(commitments1, d.localPushAmount, d.remotePushAmount, d.waitingSince, d.lastChecked, RbfStatus.NoRbf, d.deferred)
stay() using d1 storing() sending signingSession1.localSigs calling publishFundingTx(signingSession1.fundingTx)
@ -615,7 +615,7 @@ trait ChannelOpenDualFunded extends DualFundingHandlers with ErrorHandlers {
stay() using d.copy(rbfStatus = RbfStatus.RbfWaitingForSigs(signingSession1))
case signingSession1: InteractiveTxSigningSession.SendingSigs =>
val minDepth_opt = d.commitments.params.minDepthDualFunding(nodeParams.channelConf.minDepthBlocks, signingSession1.fundingTx.sharedTx.tx)
watchFundingConfirmed(signingSession.fundingTx.txId, minDepth_opt)
watchFundingConfirmed(signingSession.fundingTx.txId, minDepth_opt, delay_opt = None)
val commitments1 = d.commitments.add(signingSession1.commitment)
val d1 = DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED(commitments1, d.localPushAmount, d.remotePushAmount, d.waitingSince, d.lastChecked, RbfStatus.NoRbf, d.deferred)
stay() using d1 storing() sending signingSession1.localSigs
@ -677,7 +677,7 @@ trait ChannelOpenDualFunded extends DualFundingHandlers with ErrorHandlers {
d.commitments.updateLocalFundingStatus(w.tx.txid, fundingStatus) match {
case Right((commitments1, _)) =>
// we still watch the funding tx for confirmation even if we can use the zero-conf channel right away
watchFundingConfirmed(w.tx.txid, Some(nodeParams.channelConf.minDepthBlocks))
watchFundingConfirmed(w.tx.txid, Some(nodeParams.channelConf.minDepthBlocks), delay_opt = None)
val realScidStatus = RealScidStatus.Unknown
val shortIds = createShortIds(d.channelId, realScidStatus)
val channelReady = createChannelReady(shortIds, d.commitments.params)

View file

@ -296,7 +296,7 @@ trait ChannelOpenSingleFunded extends SingleFundingHandlers with ErrorHandlers {
context.system.eventStream.publish(ChannelSignatureReceived(self, commitments))
// NB: we don't send a ChannelSignatureSent for the first commit
log.info(s"waiting for them to publish the funding tx for channelId=$channelId fundingTxid=${commitment.fundingTxId}")
watchFundingConfirmed(commitment.fundingTxId, params.minDepthFundee(nodeParams.channelConf.minDepthBlocks, fundingAmount))
watchFundingConfirmed(commitment.fundingTxId, params.minDepthFundee(nodeParams.channelConf.minDepthBlocks, fundingAmount), delay_opt = None)
goto(WAIT_FOR_FUNDING_CONFIRMED) using DATA_WAIT_FOR_FUNDING_CONFIRMED(commitments, nodeParams.currentBlockHeight, None, Right(fundingSigned)) storing() sending fundingSigned
}
}
@ -340,7 +340,7 @@ trait ChannelOpenSingleFunded extends SingleFundingHandlers with ErrorHandlers {
val blockHeight = nodeParams.currentBlockHeight
context.system.eventStream.publish(ChannelSignatureReceived(self, commitments))
log.info(s"publishing funding tx fundingTxid=${commitment.fundingTxId}")
watchFundingConfirmed(commitment.fundingTxId, params.minDepthFunder)
watchFundingConfirmed(commitment.fundingTxId, params.minDepthFunder, delay_opt = None)
// we will publish the funding tx only after the channel state has been written to disk because we want to
// make sure we first persist the commitment that returns back the funds to us in case of problem
goto(WAIT_FOR_FUNDING_CONFIRMED) using DATA_WAIT_FOR_FUNDING_CONFIRMED(commitments, blockHeight, None, Left(fundingCreated)) storing() calling publishFundingTx(d.channelId, fundingTx, fundingTxFee, d.replyTo)
@ -394,7 +394,7 @@ trait ChannelOpenSingleFunded extends SingleFundingHandlers with ErrorHandlers {
case Right((commitments1, _)) =>
log.info("funding txid={} was successfully published for zero-conf channelId={}", w.tx.txid, d.channelId)
// we still watch the funding tx for confirmation even if we can use the zero-conf channel right away
watchFundingConfirmed(w.tx.txid, Some(nodeParams.channelConf.minDepthBlocks))
watchFundingConfirmed(w.tx.txid, Some(nodeParams.channelConf.minDepthBlocks), delay_opt = None)
val realScidStatus = RealScidStatus.Unknown
val shortIds = createShortIds(d.channelId, realScidStatus)
val channelReady = createChannelReady(shortIds, d.commitments.params)

View file

@ -16,7 +16,7 @@
package fr.acinq.eclair.channel.fsm
import akka.actor.typed.scaladsl.adapter.actorRefAdapter
import akka.actor.typed.scaladsl.adapter.{TypedActorRefOps, actorRefAdapter}
import com.softwaremill.quicklens.ModifyPimp
import fr.acinq.bitcoin.ScriptFlags
import fr.acinq.bitcoin.scalacompat.{ByteVector32, Transaction}
@ -29,7 +29,7 @@ import fr.acinq.eclair.channel.fsm.Channel.{ANNOUNCEMENTS_MINCONF, BroadcastChan
import fr.acinq.eclair.router.Announcements
import fr.acinq.eclair.wire.protocol.{AnnouncementSignatures, ChannelReady, ChannelReadyTlv, TlvStream}
import scala.concurrent.duration.DurationInt
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.util.{Failure, Success, Try}
/**
@ -40,17 +40,31 @@ trait CommonFundingHandlers extends CommonHandlers {
this: Channel =>
def watchFundingSpent(commitment: Commitment, additionalKnownSpendingTxs: Set[ByteVector32] = Set.empty): Unit = {
/**
* @param delay_opt optional delay to reduce herd effect at startup.
*/
def watchFundingSpent(commitment: Commitment, additionalKnownSpendingTxs: Set[ByteVector32], delay_opt: Option[FiniteDuration]): Unit = {
val knownSpendingTxs = Set(commitment.localCommit.commitTxAndRemoteSig.commitTx.tx.txid, commitment.remoteCommit.txid) ++ commitment.nextRemoteCommit_opt.map(_.commit.txid).toSet ++ additionalKnownSpendingTxs
blockchain ! WatchFundingSpent(self, commitment.commitInput.outPoint.txid, commitment.commitInput.outPoint.index.toInt, knownSpendingTxs)
val watch = WatchFundingSpent(self, commitment.commitInput.outPoint.txid, commitment.commitInput.outPoint.index.toInt, knownSpendingTxs)
delay_opt match {
case Some(delay) => context.system.scheduler.scheduleOnce(delay, blockchain.toClassic, watch)
case None => blockchain ! watch
}
}
def watchFundingConfirmed(fundingTxId: ByteVector32, minDepth_opt: Option[Long]): Unit = {
minDepth_opt match {
case Some(fundingMinDepth) => blockchain ! WatchFundingConfirmed(self, fundingTxId, fundingMinDepth)
/**
* @param delay_opt optional delay to reduce herd effect at startup.
*/
def watchFundingConfirmed(fundingTxId: ByteVector32, minDepth_opt: Option[Long], delay_opt: Option[FiniteDuration]): Unit = {
val watch = minDepth_opt match {
case Some(fundingMinDepth) => WatchFundingConfirmed(self, fundingTxId, fundingMinDepth)
// When using 0-conf, we make sure that the transaction was successfully published, otherwise there is a risk
// of accidentally double-spending it later (e.g. restarting bitcoind would remove the utxo locks).
case None => blockchain ! WatchPublished(self, fundingTxId)
case None => WatchPublished(self, fundingTxId)
}
delay_opt match {
case Some(delay) => context.system.scheduler.scheduleOnce(delay, blockchain.toClassic, watch)
case None => blockchain ! watch
}
}
@ -75,7 +89,7 @@ trait CommonFundingHandlers extends CommonHandlers {
// First of all, we watch the funding tx that is now confirmed.
// Children splice transactions may already spend that confirmed funding transaction.
val spliceSpendingTxs = commitments1.all.collect { case c if c.fundingTxIndex == commitment.fundingTxIndex + 1 => c.fundingTxId }
watchFundingSpent(commitment, additionalKnownSpendingTxs = spliceSpendingTxs.toSet)
watchFundingSpent(commitment, additionalKnownSpendingTxs = spliceSpendingTxs.toSet, None)
// in the dual-funding case we can forget all other transactions, they have been double spent by the tx that just confirmed
rollbackDualFundingTxs(d.commitments.active // note how we use the unpruned original commitments
.filter(c => c.fundingTxIndex == commitment.fundingTxIndex && c.fundingTxId != commitment.fundingTxId)

View file

@ -156,7 +156,7 @@ trait ErrorHandlers extends CommonHandlers {
private def publishIfNeeded(txs: Iterable[PublishTx], irrevocablySpent: Map[OutPoint, Transaction]): Unit = {
val (skip, process) = txs.partition(publishTx => Closing.inputAlreadySpent(publishTx.input, irrevocablySpent))
process.foreach { publishTx => txPublisher ! publishTx }
skip.foreach(publishTx => log.info("no need to republish tx spending {}:{}, it has already been confirmed", publishTx.input.txid, publishTx.input.index))
skip.foreach(publishTx => log.debug("no need to republish tx spending {}:{}, it has already been confirmed", publishTx.input.txid, publishTx.input.index))
}
/**
@ -165,7 +165,7 @@ trait ErrorHandlers extends CommonHandlers {
private def watchConfirmedIfNeeded(txs: Iterable[Transaction], irrevocablySpent: Map[OutPoint, Transaction]): Unit = {
val (skip, process) = txs.partition(Closing.inputsAlreadySpent(_, irrevocablySpent))
process.foreach(tx => blockchain ! WatchTxConfirmed(self, tx.txid, nodeParams.channelConf.minDepthBlocks))
skip.foreach(tx => log.info(s"no need to watch txid=${tx.txid}, it has already been confirmed"))
skip.foreach(tx => log.debug(s"no need to watch txid=${tx.txid}, it has already been confirmed"))
}
/**
@ -180,7 +180,7 @@ trait ErrorHandlers extends CommonHandlers {
}
val (skip, process) = outputs.partition(irrevocablySpent.contains)
process.foreach(output => blockchain ! WatchOutputSpent(self, parentTx.txid, output.index.toInt, Set.empty))
skip.foreach(output => log.info(s"no need to watch output=${output.txid}:${output.index}, it has already been spent by txid=${irrevocablySpent.get(output).map(_.txid)}"))
skip.foreach(output => log.debug(s"no need to watch output=${output.txid}:${output.index}, it has already been spent by txid=${irrevocablySpent.get(output).map(_.txid)}"))
}
def spendLocalCurrent(d: ChannelDataWithCommitments) = {

View file

@ -78,7 +78,7 @@ trait SingleFundingHandlers extends CommonFundingHandlers {
case Some(fundingTx) =>
// if we are funder, we never give up
// we cannot correctly set the fee, but it was correctly set when we initially published the transaction
log.info(s"republishing the funding tx...")
log.debug(s"republishing the funding tx...")
txPublisher ! PublishFinalTx(fundingTx, fundingTx.txIn.head.outPoint, "funding", 0 sat, None)
// we also check if the funding tx has been double-spent
checkDoubleSpent(fundingTx)

View file

@ -153,7 +153,7 @@ private class MempoolTxMonitor(nodeParams: NodeParams,
}
Behaviors.same
} else if (confirmations < nodeParams.channelConf.minDepthBlocks) {
log.info("txid={} has {} confirmations, waiting to reach min depth", cmd.tx.txid, confirmations)
log.debug("txid={} has {} confirmations, waiting to reach min depth", cmd.tx.txid, confirmations)
cmd.replyTo ! TxRecentlyConfirmed(cmd.tx.txid, confirmations)
Behaviors.same
} else {

View file

@ -252,7 +252,7 @@ private class ReplaceableTxFunder(nodeParams: NodeParams,
case htlcTx: HtlcWithWitnessData =>
val htlcFeerate = cmd.commitment.localCommit.spec.htlcTxFeerate(cmd.commitment.params.commitmentFormat)
if (targetFeerate <= htlcFeerate) {
log.info("publishing {} without adding inputs: txid={}", cmd.desc, htlcTx.txInfo.tx.txid)
log.debug("publishing {} without adding inputs: txid={}", cmd.desc, htlcTx.txInfo.tx.txid)
sign(txWithWitnessData, htlcFeerate, htlcTx.txInfo.amountIn)
} else {
addWalletInputs(htlcTx, targetFeerate)

View file

@ -106,7 +106,7 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
case Event(InitializeConnection(peer, chainHash, localFeatures, doSync), d: BeforeInitData) =>
d.transport ! TransportHandler.Listener(self)
Metrics.PeerConnectionsConnecting.withTag(Tags.ConnectionState, Tags.ConnectionStates.Initializing).increment()
log.info(s"using features=$localFeatures")
log.debug(s"using features=$localFeatures")
val localInit = d.pendingAuth.address match {
case remoteAddress if !d.pendingAuth.outgoing && conf.sendRemoteAddressInit && NodeAddress.isPublicIPAddress(remoteAddress) => protocol.Init(localFeatures, TlvStream(InitTlv.Networks(chainHash :: Nil), InitTlv.RemoteAddress(remoteAddress)))
case _ => protocol.Init(localFeatures, TlvStream(InitTlv.Networks(chainHash :: Nil)))
@ -161,7 +161,7 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
// we will delay all rebroadcasts with this value in order to prevent herd effects (each peer has a different delay)
val rebroadcastDelay = Random.nextInt(conf.maxRebroadcastDelay.toSeconds.toInt).seconds
log.info(s"rebroadcast will be delayed by $rebroadcastDelay")
log.debug(s"rebroadcast will be delayed by $rebroadcastDelay")
context.system.eventStream.subscribe(self, classOf[Rebroadcast])
goto(CONNECTED) using ConnectedData(d.chainHash, d.remoteNodeId, d.transport, d.peer, d.localInit, remoteInit, rebroadcastDelay, isPersistent = d.isPersistent)
@ -370,6 +370,10 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
} else if (d.behavior.fundingTxAlreadySpentCount < MAX_FUNDING_TX_ALREADY_SPENT) {
d.behavior.copy(fundingTxAlreadySpentCount = d.behavior.fundingTxAlreadySpentCount + 1)
} else {
// Our peer isn't necessarily malicious: their bitcoind node may be late, or they restarted and have not
// yet received notifications for the recently closed channels. There may also be splicing attempts that
// are being confirmed and look like closed channels, but actually aren't.
// But we still need to protect ourselves against potentially malicious peers and ignore them.
log.warning(s"peer sent us too many channel announcements with funding tx already spent (count=${d.behavior.fundingTxAlreadySpentCount + 1}), ignoring network announcements for $IGNORE_NETWORK_ANNOUNCEMENTS_PERIOD")
d.transport ! Warning("too many channel announcements with funding tx already spent, please check your bitcoin node")
startSingleTimer(ResumeAnnouncements.toString, ResumeAnnouncements, IGNORE_NETWORK_ANNOUNCEMENTS_PERIOD)
@ -529,10 +533,7 @@ object PeerConnection {
// @formatter:on
val IGNORE_NETWORK_ANNOUNCEMENTS_PERIOD: FiniteDuration = 5 minutes
// @formatter:off
val MAX_FUNDING_TX_ALREADY_SPENT = 10
// @formatter:on
val MAX_FUNDING_TX_ALREADY_SPENT = 250
def props(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: ActorRef, router: ActorRef): Props =
Props(new PeerConnection(keyPair, conf, switchboard, router))

View file

@ -146,7 +146,7 @@ class ReconnectionTask(nodeParams: NodeParams, remoteNodeId: PublicKey) extends
lazy val mediator = DistributedPubSub(context.system).mediator
private def connect(address: NodeAddress, origin: ActorRef, isPersistent: Boolean): Unit = {
log.info(s"connecting to $address")
log.debug(s"connecting to $address")
val req = ClientSpawner.ConnectionRequest(remoteNodeId, address, origin, isPersistent)
if (context.system.hasExtension(Cluster)) {
mediator ! Send(path = "/user/client-spawner", msg = req, localAffinity = false)

View file

@ -118,16 +118,16 @@ class ChannelRelay private(nodeParams: NodeParams,
Behaviors.receiveMessagePartial {
case DoRelay =>
if (previousFailures.isEmpty) {
context.log.info(s"relaying htlc #${r.add.id} from channelId={} to requestedShortChannelId={} nextNode={}", r.add.channelId, r.payload.outgoingChannelId, nextNodeId_opt.getOrElse(""))
context.log.info("relaying htlc #{} from channelId={} to requestedShortChannelId={} nextNode={}", r.add.id, r.add.channelId, r.payload.outgoingChannelId, nextNodeId_opt.getOrElse(""))
}
context.log.debug("attempting relay previousAttempts={}", previousFailures.size)
handleRelay(previousFailures) match {
case RelayFailure(cmdFail) =>
Metrics.recordPaymentRelayFailed(Tags.FailureType(cmdFail), Tags.RelayType.Channel)
context.log.info(s"rejecting htlc reason=${cmdFail.reason}")
context.log.info("rejecting htlc reason={}", cmdFail.reason)
safeSendAndStop(r.add.channelId, cmdFail)
case RelaySuccess(selectedChannelId, cmdAdd) =>
context.log.info(s"forwarding htlc to channelId=$selectedChannelId")
context.log.info("forwarding htlc #{} from channelId={} to channelId={}", r.add.id, r.add.channelId, selectedChannelId)
register ! Register.Forward(forwardFailureAdapter, selectedChannelId, cmdAdd)
waitForAddResponse(selectedChannelId, previousFailures)
}

View file

@ -204,8 +204,8 @@ object RouteCalculation {
val routesToFind = if (r.routeParams.randomize) DEFAULT_ROUTES_COUNT else 1
log.info(s"finding routes ${r.source}->$targetNodeId with assistedChannels={} ignoreNodes={} ignoreChannels={} excludedChannels={}", extraEdges.map(_.desc.shortChannelId).mkString(","), r.ignore.nodes.map(_.value).mkString(","), r.ignore.channels.mkString(","), d.excludedChannels.mkString(","))
log.info("finding routes with params={}, multiPart={}", r.routeParams, r.allowMultiPart)
log.info("local channels to target node: {}", d.graphWithBalances.graph.getEdgesBetween(r.source, targetNodeId).map(e => s"${e.desc.shortChannelId} (${e.balance_opt}/${e.capacity})").mkString(", "))
log.debug("finding routes with params={}, multiPart={}", r.routeParams, r.allowMultiPart)
log.debug("local channels to target node: {}", d.graphWithBalances.graph.getEdgesBetween(r.source, targetNodeId).map(e => s"${e.desc.shortChannelId} (${e.balance_opt}/${e.capacity})").mkString(", "))
val tags = TagSet.Empty.withTag(Tags.MultiPart, r.allowMultiPart).withTag(Tags.Amount, Tags.amountBucket(amountToSend))
KamonExt.time(Metrics.FindRouteDuration.withTags(tags.withTag(Tags.NumberOfRoutes, routesToFind.toLong))) {
val result = if (r.allowMultiPart) {

View file

@ -102,7 +102,7 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
val nodeAnn = Announcements.makeNodeAnnouncement(nodeParams.privateKey, nodeParams.alias, nodeParams.color, nodeParams.publicAddresses, nodeParams.features.nodeAnnouncementFeatures())
self ! nodeAnn
log.info(s"initialization completed, ready to process messages")
log.info("initialization completed, ready to process messages")
Try(initialized.map(_.success(Done)))
val data = Data(
nodes.map(n => n.nodeId -> n).toMap, channels, pruned,

View file

@ -51,7 +51,7 @@ object Sync {
if (s.replacePrevious || !d.sync.contains(s.remoteNodeId)) {
// ask for everything
val query = QueryChannelRange(s.chainHash, firstBlock = BlockHeight(0), numberOfBlocks = Int.MaxValue.toLong, TlvStream(s.flags_opt.toSet))
log.info("sending query_channel_range={}", query)
log.debug("sending query_channel_range={}", query)
s.to ! query
// we also set a pass-all filter for now (we can update it later) for the future gossip messages, by setting
@ -94,7 +94,7 @@ object Sync {
d.sync.get(origin.nodeId) match {
case None =>
log.info("received unsolicited reply_channel_range with {} channels", r.shortChannelIds.array.size)
log.debug("received unsolicited reply_channel_range with {} channels", r.shortChannelIds.array.size)
d // we didn't request a sync from this node, ignore
case Some(currentSync) if currentSync.remainingQueries.isEmpty && r.shortChannelIds.array.isEmpty =>
// NB: this case deals with peers who don't return any sync data. We're currently not correctly detecting the end
@ -127,7 +127,7 @@ object Sync {
val u1 = u + (if (QueryShortChannelIdsTlv.QueryFlagType.includeUpdate1(flag)) 1 else 0) + (if (QueryShortChannelIdsTlv.QueryFlagType.includeUpdate2(flag)) 1 else 0)
(c1, u1)
}
log.info(s"received reply_channel_range with {} channels, we're missing {} channel announcements and {} updates, format={}", r.shortChannelIds.array.size, channelCount, updatesCount, r.shortChannelIds.encoding)
log.info("received reply_channel_range with {} channels, we're missing {} channel announcements and {} updates, format={}", r.shortChannelIds.array.size, channelCount, updatesCount, r.shortChannelIds.encoding)
Metrics.ReplyChannelRange.NewChannelAnnouncements.withoutTags().record(channelCount)
Metrics.ReplyChannelRange.NewChannelUpdates.withoutTags().record(updatesCount)
@ -186,7 +186,7 @@ object Sync {
Metrics.QueryShortChannelIds.Nodes.withoutTags().record(nodeCount)
Metrics.QueryShortChannelIds.ChannelAnnouncements.withoutTags().record(channelCount)
Metrics.QueryShortChannelIds.ChannelUpdates.withoutTags().record(updateCount)
log.info("received query_short_channel_ids with {} items, sent back {} channels and {} updates and {} nodes", q.shortChannelIds.array.size, channelCount, updateCount, nodeCount)
log.debug("received query_short_channel_ids with {} items, sent back {} channels and {} updates and {} nodes", q.shortChannelIds.array.size, channelCount, updateCount, nodeCount)
origin.peerConnection ! ReplyShortChannelIdsEnd(q.chainHash, 1)
}

View file

@ -356,10 +356,10 @@ object Validation {
// update the graph
val pc1 = pc.applyChannelUpdate(update)
val graphWithBalances1 = if (u.channelFlags.isEnabled) {
update.left.foreach(_ => log.info("added local shortChannelId={} public={} to the network graph", u.shortChannelId, publicChannel))
update.left.foreach(_ => log.debug("added local shortChannelId={} public={} to the network graph", u.shortChannelId, publicChannel))
d.graphWithBalances.addEdge(ActiveEdge(u, pc1))
} else {
update.left.foreach(_ => log.info("disabled local shortChannelId={} public={} in the network graph", u.shortChannelId, publicChannel))
update.left.foreach(_ => log.debug("disabled local shortChannelId={} public={} in the network graph", u.shortChannelId, publicChannel))
d.graphWithBalances.disableEdge(ChannelDesc(u, pc1.ann))
}
d.copy(channels = d.channels + (pc.shortChannelId -> pc1), rebroadcast = d.rebroadcast.copy(updates = d.rebroadcast.updates + (u -> origins)), graphWithBalances = graphWithBalances1)
@ -371,7 +371,7 @@ object Validation {
// we also need to update the graph
val pc1 = pc.applyChannelUpdate(update)
val graphWithBalances1 = d.graphWithBalances.addEdge(ActiveEdge(u, pc1))
update.left.foreach(_ => log.info("added local shortChannelId={} public={} to the network graph", u.shortChannelId, publicChannel))
update.left.foreach(_ => log.debug("added local shortChannelId={} public={} to the network graph", u.shortChannelId, publicChannel))
d.copy(channels = d.channels + (pc.shortChannelId -> pc1), rebroadcast = d.rebroadcast.copy(updates = d.rebroadcast.updates + (u -> origins)), graphWithBalances = graphWithBalances1)
}
case Some(pc: PrivateChannel) =>
@ -396,10 +396,10 @@ object Validation {
// we also need to update the graph
val pc1 = pc.applyChannelUpdate(update)
val graphWithBalances1 = if (u.channelFlags.isEnabled) {
update.left.foreach(_ => log.info("added local channelId={} public={} to the network graph", pc.channelId, publicChannel))
update.left.foreach(_ => log.debug("added local channelId={} public={} to the network graph", pc.channelId, publicChannel))
d.graphWithBalances.addEdge(ActiveEdge(u, pc1))
} else {
update.left.foreach(_ => log.info("disabled local channelId={} public={} in the network graph", pc.channelId, publicChannel))
update.left.foreach(_ => log.debug("disabled local channelId={} public={} in the network graph", pc.channelId, publicChannel))
d.graphWithBalances.disableEdge(ChannelDesc(u, pc1))
}
d.copy(privateChannels = d.privateChannels + (pc.channelId -> pc1), graphWithBalances = graphWithBalances1)
@ -410,7 +410,7 @@ object Validation {
// we also need to update the graph
val pc1 = pc.applyChannelUpdate(update)
val graphWithBalances1 = d.graphWithBalances.addEdge(ActiveEdge(u, pc1))
update.left.foreach(_ => log.info("added local channelId={} public={} to the network graph", pc.channelId, publicChannel))
update.left.foreach(_ => log.debug("added local channelId={} public={} to the network graph", pc.channelId, publicChannel))
d.copy(privateChannels = d.privateChannels + (pc.channelId -> pc1), graphWithBalances = graphWithBalances1)
}
case None =>
@ -551,7 +551,7 @@ object Validation {
} else if (d.privateChannels.contains(lcd.channelId)) {
// the channel was private or public-but-not-yet-announced, let's do the clean up
val localAlias = d.privateChannels(channelId).shortIds.localAlias
log.info("removing private local channel and channel_update for channelId={} localAlias={}", channelId, localAlias)
log.debug("removing private local channel and channel_update for channelId={} localAlias={}", channelId, localAlias)
// we remove the corresponding updates from the graph
val graphWithBalances1 = d.graphWithBalances
.removeChannel(ChannelDesc(localAlias, localNodeId, remoteNodeId))

View file

@ -115,6 +115,7 @@ object TestConstants {
maxExpiryDelta = CltvExpiryDelta(2016),
fulfillSafetyBeforeTimeout = CltvExpiryDelta(6),
minFinalExpiryDelta = CltvExpiryDelta(18),
maxRestartWatchDelay = 0 millis,
maxBlockProcessingDelay = 10 millis,
maxTxPublishRetryDelay = 10 millis,
maxChannelSpentRescanBlocks = 144,
@ -277,6 +278,7 @@ object TestConstants {
maxExpiryDelta = CltvExpiryDelta(2016),
fulfillSafetyBeforeTimeout = CltvExpiryDelta(6),
minFinalExpiryDelta = CltvExpiryDelta(18),
maxRestartWatchDelay = 5 millis,
maxBlockProcessingDelay = 10 millis,
maxTxPublishRetryDelay = 10 millis,
maxChannelSpentRescanBlocks = 144,

View file

@ -338,6 +338,10 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi
val probe = TestProbe()
connect(nodeParams, remoteNodeId, switchboard, router, connection, transport, peerConnection, peer)
val fakeRoutingInfo = RoutingSyncSpec.shortChannelIds.take(PeerConnection.MAX_FUNDING_TX_ALREADY_SPENT + 1).toSeq.map(RoutingSyncSpec.makeFakeRoutingInfo(pub2priv))
val channels = fakeRoutingInfo.map(_._1.ann)
val updates = fakeRoutingInfo.flatMap(_._1.update_1_opt) ++ fakeRoutingInfo.flatMap(_._1.update_2_opt)
val query = QueryShortChannelIds(
Alice.nodeParams.chainHash,
EncodedShortChannelIds(EncodingType.UNCOMPRESSED, List(RealShortChannelId(42000))),

View file

@ -59,7 +59,7 @@ class FrontRouter(routerConf: RouterConf, remoteRouter: ActorRef, initialized: O
when(NORMAL) {
case Event(GetRoutingState, d) =>
log.info(s"getting valid announcements for ${sender()}")
log.debug(s"getting valid announcements for ${sender()}")
sender() ! RoutingState(d.channels.values, d.nodes.values)
stay()