mirror of
https://github.com/ACINQ/eclair.git
synced 2024-11-20 02:27:32 +01:00
Smarter strategy for sending channel_update
s (#950)
The goal is to prevent sending a lot of updates for flappy channels. Instead of sending a disabled `channel_update` after each disconnection, we now wait for a payment to try to route through the channel and only then reply with a disabled `channel_update` and broadcast it on the network. The reason is that in case of a disconnection, if noone cares about that channel then there is no reason to tell everyone about its current (disconnected) state. In addition to that, when switching from `SYNCING`->`NORMAL`, instead of emitting a new `channel_update` with flag=enabled right away, we wait a little bit and send it later. We also don't send a new `channel_update` if it is identical to the previous one (except if the previous one is outdated). This way, if a connection to a peer is unstable and we keep getting disconnected/reconnected, we won't spam the network. The extra delay allows us to remove the change made in #888, which was a workaround in case we generated `channel_update` too quickly. Also, increased refresh interval from 7 days to 10 days. There was no need to be so conservative. Note that on startup we still need to re-send `channel_update` for all channels in order to properly initialize the `Router` and the `Relayer`. Otherwise they won't know about those channels, and e.g. the `Relayer` will return `UnknownNextPeer` errors. But we don't need to create new `channel_update`s in most cases, so this should have little or no impact to gossip because our peers will already know the updates and will filter them out. On the other hand, if some global parameters (like relaying fees) are changed, it will cause the creation a new `channel_update` for all channels.
This commit is contained in:
parent
4ba4ce8096
commit
fb84dfb855
@ -16,8 +16,6 @@
|
||||
|
||||
package fr.acinq.eclair.channel
|
||||
|
||||
import java.nio.charset.StandardCharsets
|
||||
|
||||
import akka.actor.{ActorRef, FSM, OneForOneStrategy, Props, Status, SupervisorStrategy}
|
||||
import akka.event.Logging.MDC
|
||||
import akka.pattern.pipe
|
||||
@ -32,6 +30,7 @@ import fr.acinq.eclair.payment._
|
||||
import fr.acinq.eclair.router.Announcements
|
||||
import fr.acinq.eclair.transactions._
|
||||
import fr.acinq.eclair.wire.{ChannelReestablish, _}
|
||||
import scodec.bits.ByteVector
|
||||
|
||||
import scala.compat.Platform
|
||||
import scala.concurrent.ExecutionContext
|
||||
@ -69,19 +68,28 @@ object Channel {
|
||||
// as a fundee, we will wait that much time for the funding tx to confirm (funder will rely on the funding tx being double-spent)
|
||||
val FUNDING_TIMEOUT_FUNDEE = 5 days
|
||||
|
||||
val REFRESH_CHANNEL_UPDATE_INTERVAL = 7 days
|
||||
// pruning occurs if no new update has been received in two weeks (BOLT 7)
|
||||
val REFRESH_CHANNEL_UPDATE_INTERVAL = 10 days
|
||||
|
||||
case object TickRefreshChannelUpdate
|
||||
case class BroadcastChannelUpdate(reason: BroadcastReason)
|
||||
|
||||
// @formatter:off
|
||||
sealed trait BroadcastReason
|
||||
case object PeriodicRefresh extends BroadcastReason
|
||||
case object Reconnected extends BroadcastReason
|
||||
case object AboveReserve extends BroadcastReason
|
||||
// @formatter:on
|
||||
|
||||
case object TickChannelOpenTimeout
|
||||
|
||||
case class RevocationTimeout(remoteCommitNumber: Long, peer: ActorRef) // we will receive this message when we waited too long for a revocation for that commit number (NB: we explicitely specify the peer to allow for testing)
|
||||
// we will receive this message when we waited too long for a revocation for that commit number (NB: we explicitely specify the peer to allow for testing)
|
||||
case class RevocationTimeout(remoteCommitNumber: Long, peer: ActorRef)
|
||||
|
||||
// @formatter:off
|
||||
sealed trait ChannelError
|
||||
|
||||
case class LocalError(t: Throwable) extends ChannelError
|
||||
|
||||
case class RemoteError(e: Error) extends ChannelError
|
||||
// @formatter:on
|
||||
|
||||
}
|
||||
|
||||
@ -138,7 +146,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
|
||||
startWith(WAIT_FOR_INIT_INTERNAL, Nothing)
|
||||
|
||||
when(WAIT_FOR_INIT_INTERNAL)(handleExceptions {
|
||||
case Event(initFunder@INPUT_INIT_FUNDER(temporaryChannelId, fundingSatoshis, pushMsat, initialFeeratePerKw, fundingTxFeeratePerKw, localParams, remote, remoteInit, channelFlags), Nothing) =>
|
||||
case Event(initFunder@INPUT_INIT_FUNDER(temporaryChannelId, fundingSatoshis, pushMsat, initialFeeratePerKw, _, localParams, remote, _, channelFlags), Nothing) =>
|
||||
context.system.eventStream.publish(ChannelCreated(self, context.parent, remoteNodeId, true, temporaryChannelId))
|
||||
forwarder ! remote
|
||||
val open = OpenChannel(nodeParams.chainHash,
|
||||
@ -201,11 +209,23 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
|
||||
blockchain ! WatchSpent(self, data.commitments.commitInput.outPoint.txid, data.commitments.commitInput.outPoint.index.toInt, data.commitments.commitInput.txOut.publicKeyScript, BITCOIN_FUNDING_SPENT)
|
||||
blockchain ! WatchLost(self, data.commitments.commitInput.outPoint.txid, nodeParams.minDepthBlocks, BITCOIN_FUNDING_LOST)
|
||||
context.system.eventStream.publish(ShortChannelIdAssigned(self, normal.channelId, normal.channelUpdate.shortChannelId))
|
||||
// we rebuild a new channel_update with values from the configuration because they may have changed while eclair was down
|
||||
val channelUpdate = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, normal.channelUpdate.shortChannelId, nodeParams.expiryDeltaBlocks,
|
||||
normal.commitments.remoteParams.htlcMinimumMsat, normal.channelUpdate.feeBaseMsat, normal.channelUpdate.feeProportionalMillionths, normal.commitments.localCommit.spec.totalFunds, enable = false)
|
||||
|
||||
goto(OFFLINE) using normal.copy(channelUpdate = channelUpdate)
|
||||
// we rebuild a new channel_update with values from the configuration because they may have changed while eclair was down
|
||||
val candidateChannelUpdate = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, normal.channelUpdate.shortChannelId, nodeParams.expiryDeltaBlocks,
|
||||
normal.commitments.remoteParams.htlcMinimumMsat, normal.channelUpdate.feeBaseMsat, normal.channelUpdate.feeProportionalMillionths, normal.commitments.localCommit.spec.totalFunds, enable = Announcements.isEnabled(normal.channelUpdate.channelFlags))
|
||||
val channelUpdate1 = if (candidateChannelUpdate.copy(signature = ByteVector.empty, timestamp = 0) == normal.channelUpdate.copy(signature = ByteVector.empty, timestamp = 0)) {
|
||||
// if there was no configuration change we keep the existing channel update
|
||||
normal.channelUpdate
|
||||
} else {
|
||||
log.info("refreshing channel_update due to configuration changes old={} new={}", normal.channelUpdate, candidateChannelUpdate)
|
||||
candidateChannelUpdate
|
||||
}
|
||||
// we need to periodically re-send channel updates, otherwise channel will be considered stale and get pruned by network
|
||||
// we take into account the date of the last update so that we don't send superfluous updates when we restart the app
|
||||
val periodicRefreshInitialDelay = Helpers.nextChannelUpdateRefresh(channelUpdate1.timestamp)
|
||||
context.system.scheduler.schedule(initialDelay = periodicRefreshInitialDelay, interval = REFRESH_CHANNEL_UPDATE_INTERVAL, receiver = self, message = BroadcastChannelUpdate(PeriodicRefresh))
|
||||
|
||||
goto(OFFLINE) using normal.copy(channelUpdate = channelUpdate1)
|
||||
|
||||
case funding: DATA_WAIT_FOR_FUNDING_CONFIRMED =>
|
||||
// TODO: should we wait for an acknowledgment from the watcher?
|
||||
@ -538,6 +558,8 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
|
||||
context.system.eventStream.publish(ShortChannelIdAssigned(self, commitments.channelId, shortChannelId))
|
||||
// we create a channel_update early so that we can use it to send payments through this channel, but it won't be propagated to other nodes since the channel is not yet announced
|
||||
val initialChannelUpdate = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, shortChannelId, nodeParams.expiryDeltaBlocks, d.commitments.remoteParams.htlcMinimumMsat, nodeParams.feeBaseMsat, nodeParams.feeProportionalMillionth, commitments.localCommit.spec.totalFunds, enable = Helpers.aboveReserve(d.commitments))
|
||||
// we need to periodically re-send channel updates, otherwise channel will be considered stale and get pruned by network
|
||||
context.system.scheduler.schedule(initialDelay = REFRESH_CHANNEL_UPDATE_INTERVAL, interval = REFRESH_CHANNEL_UPDATE_INTERVAL, receiver = self, message = BroadcastChannelUpdate(PeriodicRefresh))
|
||||
goto(NORMAL) using store(DATA_NORMAL(commitments.copy(remoteNextCommitInfo = Right(nextPerCommitmentPoint)), shortChannelId, buried = false, None, initialChannelUpdate, None, None))
|
||||
|
||||
case Event(remoteAnnSigs: AnnouncementSignatures, d: DATA_WAIT_FOR_FUNDING_LOCKED) if d.commitments.announceChannel =>
|
||||
@ -678,7 +700,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
|
||||
if (!Helpers.aboveReserve(d.commitments) && Helpers.aboveReserve(commitments1)) {
|
||||
// we just went above reserve (can't go below), let's refresh our channel_update to enable/disable it accordingly
|
||||
log.info(s"updating channel_update aboveReserve=${Helpers.aboveReserve(commitments1)}")
|
||||
self ! TickRefreshChannelUpdate
|
||||
self ! BroadcastChannelUpdate(AboveReserve)
|
||||
}
|
||||
context.system.eventStream.publish(ChannelSignatureSent(self, commitments1))
|
||||
if (nextRemoteCommit.spec.toRemoteMsat != d.commitments.remoteCommit.spec.toRemoteMsat) {
|
||||
@ -740,10 +762,10 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
|
||||
case Event(c@CMD_CLOSE(localScriptPubKey_opt), d: DATA_NORMAL) =>
|
||||
val localScriptPubKey = localScriptPubKey_opt.getOrElse(d.commitments.localParams.defaultFinalScriptPubKey)
|
||||
if (d.localShutdown.isDefined)
|
||||
handleCommandError(ClosingAlreadyInProgress((d.channelId)), c)
|
||||
handleCommandError(ClosingAlreadyInProgress(d.channelId), c)
|
||||
else if (Commitments.localHasUnsignedOutgoingHtlcs(d.commitments))
|
||||
// TODO: simplistic behavior, we could also sign-then-close
|
||||
handleCommandError(CannotCloseWithUnsignedOutgoingHtlcs((d.channelId)), c)
|
||||
handleCommandError(CannotCloseWithUnsignedOutgoingHtlcs(d.channelId), c)
|
||||
else if (!Closing.isValidFinalScriptPubkey(localScriptPubKey))
|
||||
handleCommandError(InvalidFinalScript(d.channelId), c)
|
||||
else {
|
||||
@ -877,19 +899,26 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
|
||||
stay
|
||||
}
|
||||
|
||||
case Event(TickRefreshChannelUpdate, d: DATA_NORMAL) =>
|
||||
// periodic refresh is used as a keep alive
|
||||
log.info(s"sending channel_update announcement (refresh)")
|
||||
val channelUpdate = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, d.shortChannelId, d.channelUpdate.cltvExpiryDelta, d.channelUpdate.htlcMinimumMsat, d.channelUpdate.feeBaseMsat, d.channelUpdate.feeProportionalMillionths, d.commitments.localCommit.spec.totalFunds, enable = Helpers.aboveReserve(d.commitments))
|
||||
// we use GOTO instead of stay because we want to fire transitions
|
||||
goto(NORMAL) using store(d.copy(channelUpdate = channelUpdate))
|
||||
|
||||
case Event(CMD_UPDATE_RELAY_FEE(feeBaseMsat, feeProportionalMillionths), d: DATA_NORMAL) =>
|
||||
log.info(s"updating relay fees: prevFeeBaseMsat={} nextFeeBaseMsat={} prevFeeProportionalMillionths={} nextFeeProportionalMillionths={}", d.channelUpdate.feeBaseMsat, feeBaseMsat, d.channelUpdate.feeProportionalMillionths, feeProportionalMillionths)
|
||||
val channelUpdate = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, d.shortChannelId, d.channelUpdate.cltvExpiryDelta, d.channelUpdate.htlcMinimumMsat, feeBaseMsat, feeProportionalMillionths, d.commitments.localCommit.spec.totalFunds, enable = Helpers.aboveReserve(d.commitments))
|
||||
// we use GOTO instead of stay because we want to fire transitions
|
||||
goto(NORMAL) using store(d.copy(channelUpdate = channelUpdate)) replying "ok"
|
||||
|
||||
case Event(BroadcastChannelUpdate(reason), d: DATA_NORMAL) =>
|
||||
val age = Platform.currentTime.milliseconds - d.channelUpdate.timestamp.seconds
|
||||
val channelUpdate1 = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, d.shortChannelId, d.channelUpdate.cltvExpiryDelta, d.channelUpdate.htlcMinimumMsat, d.channelUpdate.feeBaseMsat, d.channelUpdate.feeProportionalMillionths, d.commitments.localCommit.spec.totalFunds, enable = Helpers.aboveReserve(d.commitments))
|
||||
reason match {
|
||||
case Reconnected if channelUpdate1.copy(signature = ByteVector.empty, timestamp = 0) == d.channelUpdate.copy(signature = ByteVector.empty, timestamp = 0) && age < REFRESH_CHANNEL_UPDATE_INTERVAL =>
|
||||
// we already sent an identical channel_update not long ago (flapping protection in case we keep being disconnected/reconnected)
|
||||
log.info(s"not sending a new identical channel_update, current one was created {} days ago", age.toDays)
|
||||
stay
|
||||
case _ =>
|
||||
log.info(s"refreshing channel_update announcement (reason=$reason)")
|
||||
// we use GOTO instead of stay because we want to fire transitions
|
||||
goto(NORMAL) using store(d.copy(channelUpdate = channelUpdate1))
|
||||
}
|
||||
|
||||
case Event(WatchEventSpent(BITCOIN_FUNDING_SPENT, tx), d: DATA_NORMAL) if tx.txid == d.commitments.remoteCommit.txid => handleRemoteSpentCurrent(tx, d)
|
||||
|
||||
case Event(WatchEventSpent(BITCOIN_FUNDING_SPENT, tx), d: DATA_NORMAL) if Some(tx.txid) == d.commitments.remoteNextCommitInfo.left.toOption.map(_.nextRemoteCommit.txid) => handleRemoteSpentNext(tx, d)
|
||||
@ -897,15 +926,20 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
|
||||
case Event(WatchEventSpent(BITCOIN_FUNDING_SPENT, tx), d: DATA_NORMAL) => handleRemoteSpentOther(tx, d)
|
||||
|
||||
case Event(INPUT_DISCONNECTED, d: DATA_NORMAL) =>
|
||||
// we disable the channel
|
||||
log.debug(s"sending channel_update announcement (disable)")
|
||||
val channelUpdate = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, d.shortChannelId, d.channelUpdate.cltvExpiryDelta, d.channelUpdate.htlcMinimumMsat, d.channelUpdate.feeBaseMsat, d.channelUpdate.feeProportionalMillionths, d.commitments.localCommit.spec.totalFunds, enable = false)
|
||||
d.commitments.localChanges.proposed.collect {
|
||||
case add: UpdateAddHtlc => relayer ! Status.Failure(AddHtlcFailed(d.channelId, add.paymentHash, ChannelUnavailable(d.channelId), d.commitments.originChannels(add.id), Some(channelUpdate), None))
|
||||
// we cancel the timer that would have made us send the enabled update after reconnection (flappy channel protection)
|
||||
cancelTimer(Reconnected.toString)
|
||||
// if we have pending unsigned htlcs, then we cancel them and advertise the fact that the channel is now disabled
|
||||
val d1 = if (d.commitments.localChanges.proposed.collectFirst { case add: UpdateAddHtlc => add }.isDefined) {
|
||||
log.info(s"updating channel_update announcement (reason=disabled)")
|
||||
val channelUpdate = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, d.shortChannelId, d.channelUpdate.cltvExpiryDelta, d.channelUpdate.htlcMinimumMsat, d.channelUpdate.feeBaseMsat, d.channelUpdate.feeProportionalMillionths, d.commitments.localCommit.spec.totalFunds, enable = false)
|
||||
d.commitments.localChanges.proposed.collect {
|
||||
case add: UpdateAddHtlc => relayer ! Status.Failure(AddHtlcFailed(d.channelId, add.paymentHash, ChannelUnavailable(d.channelId), d.commitments.originChannels(add.id), Some(channelUpdate), None))
|
||||
}
|
||||
d.copy(channelUpdate = channelUpdate)
|
||||
} else {
|
||||
d
|
||||
}
|
||||
// disable the channel_update refresh timer
|
||||
cancelTimer(TickRefreshChannelUpdate.toString)
|
||||
goto(OFFLINE) using d.copy(channelUpdate = channelUpdate)
|
||||
goto(OFFLINE) using d1
|
||||
|
||||
case Event(e: Error, d: DATA_NORMAL) => handleRemoteError(e, d)
|
||||
|
||||
@ -1285,7 +1319,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
|
||||
// and we also send events related to fee
|
||||
Closing.networkFeePaid(tx, d1) map { case (fee, desc) => feePaid(fee, tx, desc, d.channelId) }
|
||||
// then let's see if any of the possible close scenarii can be considered done
|
||||
val closeType_opt = Closing.isClosed(d1, Some(tx))
|
||||
val closeType_opt = Closing.isClosed(d1, Some(tx))
|
||||
// finally, if one of the unilateral closes is done, we move to CLOSED state, otherwise we stay (note that we don't store the state)
|
||||
closeType_opt match {
|
||||
case Some(closeType) =>
|
||||
@ -1366,6 +1400,8 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
|
||||
// -> in CLOSING we either have mutual closed (so no more htlcs), or already have unilaterally closed (so no action required), and we can't be in OFFLINE state anyway
|
||||
handleLocalError(HtlcTimedout(d.channelId, d.commitments.timedoutOutgoingHtlcs(count)), d, Some(c))
|
||||
|
||||
case Event(c: CMD_ADD_HTLC, d: DATA_NORMAL) => handleAddDisconnected(c, d)
|
||||
|
||||
case Event(CMD_UPDATE_RELAY_FEE(feeBaseMsat, feeProportionalMillionths), d: DATA_NORMAL) =>
|
||||
log.info(s"updating relay fees: prevFeeBaseMsat={} nextFeeBaseMsat={} prevFeeProportionalMillionths={} nextFeeProportionalMillionths={}", d.channelUpdate.feeBaseMsat, feeBaseMsat, d.channelUpdate.feeProportionalMillionths, feeProportionalMillionths)
|
||||
val channelUpdate = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, d.shortChannelId, d.channelUpdate.cltvExpiryDelta, d.channelUpdate.htlcMinimumMsat, feeBaseMsat, feeProportionalMillionths, d.commitments.localCommit.spec.totalFunds, enable = false)
|
||||
@ -1466,18 +1502,14 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
|
||||
()
|
||||
}
|
||||
}
|
||||
// re-enable the channel
|
||||
val timestamp = Platform.currentTime.milliseconds.toSeconds match {
|
||||
case ts if ts == d.channelUpdate.timestamp => ts + 1 // corner case: in case of quick reconnection, we bump the timestamp of the new channel_update, otherwise it will get ignored by the network
|
||||
case ts => ts
|
||||
}
|
||||
val channelUpdate = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, d.shortChannelId, nodeParams.expiryDeltaBlocks, d.commitments.remoteParams.htlcMinimumMsat, d.channelUpdate.feeBaseMsat, d.channelUpdate.feeProportionalMillionths, d.commitments.localCommit.spec.totalFunds, enable = Helpers.aboveReserve(d.commitments), timestamp = timestamp)
|
||||
// we will refresh need to periodically re-send channel updates, otherwise channel will be considered stale and get pruned by network
|
||||
setTimer(TickRefreshChannelUpdate.toString, TickRefreshChannelUpdate, timeout = REFRESH_CHANNEL_UPDATE_INTERVAL, repeat = true)
|
||||
// we will re-enable the channel after some delay to prevent flappy updates in case the connection is unstable
|
||||
setTimer(Reconnected.toString, BroadcastChannelUpdate(Reconnected), 10 seconds, repeat = false)
|
||||
|
||||
goto(NORMAL) using d.copy(commitments = commitments1, channelUpdate = channelUpdate)
|
||||
goto(NORMAL) using d.copy(commitments = commitments1)
|
||||
}
|
||||
|
||||
case Event(c: CMD_ADD_HTLC, d: DATA_NORMAL) => handleAddDisconnected(c, d)
|
||||
|
||||
case Event(channelReestablish: ChannelReestablish, d: DATA_SHUTDOWN) =>
|
||||
val commitments1 = handleSync(channelReestablish, d)
|
||||
// BOLT 2: A node if it has sent a previous shutdown MUST retransmit shutdown.
|
||||
@ -1555,10 +1587,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
|
||||
case Event(c: CMD_ADD_HTLC, d: HasCommitments) =>
|
||||
log.info(s"rejecting htlc request in state=$stateName")
|
||||
val error = ChannelUnavailable(d.channelId)
|
||||
d match {
|
||||
case normal: DATA_NORMAL => handleCommandError(AddHtlcFailed(d.channelId, c.paymentHash, error, origin(c), Some(normal.channelUpdate), Some(c)), c) // can happen if we are in OFFLINE or SYNCING state (channelUpdate will have enable=false)
|
||||
case _ => handleCommandError(AddHtlcFailed(d.channelId, c.paymentHash, error, origin(c), None, Some(c)), c) // we don't provide a channel_update: this will be a permanent channel failure
|
||||
}
|
||||
handleCommandError(AddHtlcFailed(d.channelId, c.paymentHash, error, origin(c), None, Some(c)), c) // we don't provide a channel_update: this will be a permanent channel failure
|
||||
|
||||
case Event(c: CMD_CLOSE, d) => handleCommandError(CommandUnavailableInThisState(Helpers.getChannelId(d), "close", stateName), c)
|
||||
|
||||
@ -1579,8 +1608,8 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
|
||||
// we only care about this event in NORMAL and SHUTDOWN state, and we never unregister to the event stream
|
||||
case Event(CurrentFeerates(_), _) => stay
|
||||
|
||||
// we only care about this event in NORMAL state, and the scheduler is not cancelled in some cases (e.g. NORMAL->CLOSING)
|
||||
case Event(TickRefreshChannelUpdate, _) => stay
|
||||
// we only care about this event in NORMAL state
|
||||
case Event(_: BroadcastChannelUpdate, _) => stay
|
||||
|
||||
// we receive this when we send command to ourselves
|
||||
case Event("ok", _) => stay
|
||||
@ -1626,11 +1655,15 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
|
||||
|
||||
(state, nextState, stateData, nextStateData) match {
|
||||
// ORDER MATTERS!
|
||||
case (WAIT_FOR_INIT_INTERNAL, OFFLINE, _, normal: DATA_NORMAL) =>
|
||||
log.info(s"re-emitting channel_update={} enabled={} ", normal.channelUpdate, Announcements.isEnabled(normal.channelUpdate.channelFlags))
|
||||
context.system.eventStream.publish(LocalChannelUpdate(self, normal.commitments.channelId, normal.shortChannelId, normal.commitments.remoteParams.nodeId, normal.channelAnnouncement, normal.channelUpdate, normal.commitments))
|
||||
case (_, _, d1: DATA_NORMAL, d2: DATA_NORMAL) if d1.channelUpdate == d2.channelUpdate && d1.channelAnnouncement == d2.channelAnnouncement =>
|
||||
// don't do anything if neither the channel_update nor the channel_announcement didn't change
|
||||
()
|
||||
case (WAIT_FOR_FUNDING_LOCKED | NORMAL | SYNCING, NORMAL | OFFLINE, _, normal: DATA_NORMAL) =>
|
||||
case (WAIT_FOR_FUNDING_LOCKED | NORMAL | OFFLINE | SYNCING, NORMAL | OFFLINE, _, normal: DATA_NORMAL) =>
|
||||
// when we do WAIT_FOR_FUNDING_LOCKED->NORMAL or NORMAL->NORMAL or SYNCING->NORMAL or NORMAL->OFFLINE, we send out the new channel_update (most of the time it will just be to enable/disable the channel)
|
||||
log.info(s"emitting channel_update={} enabled={} ", normal.channelUpdate, Announcements.isEnabled(normal.channelUpdate.channelFlags))
|
||||
context.system.eventStream.publish(LocalChannelUpdate(self, normal.commitments.channelId, normal.shortChannelId, normal.commitments.remoteParams.nodeId, normal.channelAnnouncement, normal.channelUpdate, normal.commitments))
|
||||
case (_, _, _: DATA_NORMAL, _: DATA_NORMAL) =>
|
||||
// in any other case (e.g. WAIT_FOR_INIT_INTERNAL->OFFLINE) we do nothing
|
||||
@ -1708,6 +1741,24 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
|
||||
stay
|
||||
}
|
||||
|
||||
def handleAddDisconnected(c: CMD_ADD_HTLC, d: DATA_NORMAL) = {
|
||||
log.info(s"rejecting htlc request in state=$stateName")
|
||||
// in order to reduce gossip spam, we don't disable the channel right away when disconnected
|
||||
// we will only emit a new channel_update with the disable flag set if someone tries to use that channel
|
||||
if (Announcements.isEnabled(d.channelUpdate.channelFlags)) {
|
||||
// if the channel isn't disabled we generate a new channel_update
|
||||
log.info(s"updating channel_update announcement (reason=disabled)")
|
||||
val channelUpdate = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, d.shortChannelId, d.channelUpdate.cltvExpiryDelta, d.channelUpdate.htlcMinimumMsat, d.channelUpdate.feeBaseMsat, d.channelUpdate.feeProportionalMillionths, d.commitments.localCommit.spec.totalFunds, enable = false)
|
||||
// then we update the state and replay the request
|
||||
self forward c
|
||||
// we use goto to fire transitions
|
||||
goto(stateName) using d.copy(channelUpdate = channelUpdate)
|
||||
} else {
|
||||
// channel is already disabled, we reply to the request
|
||||
handleCommandError(AddHtlcFailed(d.channelId, c.paymentHash, ChannelUnavailable(d.channelId), origin(c), Some(d.channelUpdate), Some(c)), c) // can happen if we are in OFFLINE or SYNCING state (channelUpdate will have enable=false)
|
||||
}
|
||||
}
|
||||
|
||||
def handleLocalError(cause: Throwable, d: Data, msg: Option[Any]) = {
|
||||
cause match {
|
||||
case _: ForcedLocalCommit => log.warning(s"force-closing channel at user request")
|
||||
|
@ -21,6 +21,7 @@ import fr.acinq.bitcoin.Crypto.{Point, PrivateKey, PublicKey, Scalar, ripemd160,
|
||||
import fr.acinq.bitcoin.Script._
|
||||
import fr.acinq.bitcoin.{OutPoint, _}
|
||||
import fr.acinq.eclair.blockchain.EclairWallet
|
||||
import fr.acinq.eclair.channel.Channel.REFRESH_CHANNEL_UPDATE_INTERVAL
|
||||
import fr.acinq.eclair.crypto.{Generators, KeyManager}
|
||||
import fr.acinq.eclair.db.ChannelsDb
|
||||
import fr.acinq.eclair.payment.{Local, Origin}
|
||||
@ -31,8 +32,9 @@ import fr.acinq.eclair.wire._
|
||||
import fr.acinq.eclair.{Globals, NodeParams, ShortChannelId, addressToPublicKeyScript}
|
||||
import scodec.bits.ByteVector
|
||||
|
||||
import scala.compat.Platform
|
||||
import scala.concurrent.Await
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.{Failure, Success, Try}
|
||||
|
||||
/**
|
||||
@ -152,6 +154,22 @@ object Helpers {
|
||||
if (reserveToFundingRatio > nodeParams.maxReserveToFundingRatio) throw ChannelReserveTooHigh(open.temporaryChannelId, accept.channelReserveSatoshis, reserveToFundingRatio, nodeParams.maxReserveToFundingRatio)
|
||||
}
|
||||
|
||||
/**
|
||||
* Compute the delay until we need to refresh the channel_update for our channel not to be considered stale by
|
||||
* other nodes.
|
||||
*
|
||||
* If current update more than [[Channel.REFRESH_CHANNEL_UPDATE_INTERVAL]] old then the delay will be zero.
|
||||
*
|
||||
* @param currentUpdateTimestamp
|
||||
* @return the delay until the next update
|
||||
*/
|
||||
def nextChannelUpdateRefresh(currentUpdateTimestamp: Long)(implicit log: LoggingAdapter): FiniteDuration = {
|
||||
val age = Platform.currentTime.milliseconds - currentUpdateTimestamp.seconds
|
||||
val delay = 0.days.max(REFRESH_CHANNEL_UPDATE_INTERVAL - age)
|
||||
log.info("current channel_update was created {} days ago, will refresh it in {} days", age.toDays, delay.toDays)
|
||||
delay
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param remoteFeeratePerKw remote fee rate per kiloweight
|
||||
|
@ -17,7 +17,7 @@
|
||||
package fr.acinq.eclair.payment
|
||||
|
||||
import akka.actor.{Actor, ActorLogging, Props, Status}
|
||||
import fr.acinq.bitcoin.{ByteVector32, Crypto, MilliSatoshi}
|
||||
import fr.acinq.bitcoin.{Crypto, MilliSatoshi}
|
||||
import fr.acinq.eclair.channel.{CMD_FAIL_HTLC, CMD_FULFILL_HTLC, Channel}
|
||||
import fr.acinq.eclair.db.IncomingPayment
|
||||
import fr.acinq.eclair.payment.PaymentLifecycle.ReceivePayment
|
||||
@ -26,6 +26,7 @@ import fr.acinq.eclair.{Globals, NodeParams, randomBytes32}
|
||||
import concurrent.duration._
|
||||
import scala.compat.Platform
|
||||
import scala.concurrent.ExecutionContext
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.{Failure, Success, Try}
|
||||
|
||||
/**
|
||||
|
@ -24,6 +24,7 @@ import scodec.bits.{BitVector, ByteVector}
|
||||
import shapeless.HNil
|
||||
import scala.concurrent.duration._
|
||||
import scala.compat.Platform
|
||||
import scala.concurrent.duration._
|
||||
|
||||
/**
|
||||
* Created by PM on 03/02/2017.
|
||||
|
@ -454,7 +454,7 @@ class Router(nodeParams: NodeParams, watcher: ActorRef, initialized: Option[Prom
|
||||
stay
|
||||
|
||||
case Event(u: ChannelUpdate, d: Data) =>
|
||||
// it was sent by us, routing messages that are sent by our peers are now wrapped in a PeerRoutingMessage
|
||||
// it was sent by us (e.g. the payment lifecycle); routing messages that are sent by our peers are now wrapped in a PeerRoutingMessage
|
||||
log.debug("received channel update from {}", sender)
|
||||
stay using handle(u, sender, d)
|
||||
|
||||
@ -758,7 +758,7 @@ object Router {
|
||||
def hasChannels(nodeId: PublicKey, channels: Iterable[ChannelAnnouncement]): Boolean = channels.exists(c => isRelatedTo(c, nodeId))
|
||||
|
||||
def isStale(u: ChannelUpdate): Boolean = {
|
||||
// BOLT 7: "nodes MAY prune channels should the timestamp of the latest channel_update be older than 2 weeks (1209600 seconds)"
|
||||
// BOLT 7: "nodes MAY prune channels should the timestamp of the latest channel_update be older than 2 weeks"
|
||||
// but we don't want to prune brand new channels for which we didn't yet receive a channel update
|
||||
val staleThresholdSeconds = (Platform.currentTime.milliseconds - 14.days).toSeconds
|
||||
u.timestamp < staleThresholdSeconds
|
||||
@ -843,7 +843,6 @@ object Router {
|
||||
* @param extraEdges a set of extra edges we want to CONSIDER during the search
|
||||
* @param ignoredEdges a set of extra edges we want to IGNORE during the search
|
||||
* @param routeParams a set of parameters that can restrict the route search
|
||||
* @param wr an object containing the ratios used to 'weight' edges when searching for the shortest path
|
||||
* @return the computed route to the destination @targetNodeId
|
||||
*/
|
||||
def findRoute(g: DirectedGraph,
|
||||
|
@ -34,6 +34,9 @@ import scodec.{Attempt, Codec}
|
||||
import scala.concurrent.duration._
|
||||
import scala.compat.Platform
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
|
||||
/**
|
||||
* Created by PM on 02/06/2017.
|
||||
*/
|
||||
|
@ -19,6 +19,7 @@ package fr.acinq.eclair.channel
|
||||
import fr.acinq.eclair.channel.Helpers.Funding
|
||||
import org.scalatest.FunSuite
|
||||
|
||||
import scala.compat.Platform
|
||||
import scala.concurrent.duration._
|
||||
|
||||
class HelpersSpec extends FunSuite {
|
||||
@ -46,5 +47,15 @@ class HelpersSpec extends FunSuite {
|
||||
minDelay = 10 minutes) === (10 minutes))
|
||||
}
|
||||
|
||||
test("compute refresh delay") {
|
||||
import org.scalatest.Matchers._
|
||||
implicit val log = akka.event.NoLogging
|
||||
Helpers.nextChannelUpdateRefresh(1544400000).toSeconds should equal (0)
|
||||
Helpers.nextChannelUpdateRefresh((Platform.currentTime.milliseconds - 9.days).toSeconds).toSeconds should equal (24 * 3600L +- 100)
|
||||
Helpers.nextChannelUpdateRefresh((Platform.currentTime.milliseconds - 3.days).toSeconds).toSeconds should equal (7 * 24 * 3600L +- 100)
|
||||
Helpers.nextChannelUpdateRefresh(Platform.currentTime.milliseconds.toSeconds).toSeconds should equal (10 * 24 * 3600L +- 100)
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
@ -27,7 +27,7 @@ import fr.acinq.eclair.TestConstants.{Alice, Bob}
|
||||
import fr.acinq.eclair.UInt64.Conversions._
|
||||
import fr.acinq.eclair.blockchain._
|
||||
import fr.acinq.eclair.blockchain.fee.FeeratesPerKw
|
||||
import fr.acinq.eclair.channel.Channel.{RevocationTimeout, TickRefreshChannelUpdate}
|
||||
import fr.acinq.eclair.channel.Channel.{BroadcastChannelUpdate, PeriodicRefresh, Reconnected, RevocationTimeout}
|
||||
import fr.acinq.eclair.channel._
|
||||
import fr.acinq.eclair.channel.states.StateTestsHelperMethods
|
||||
import fr.acinq.eclair.io.Peer
|
||||
@ -2085,7 +2085,7 @@ class NormalStateSpec extends TestkitBaseClass with StateTestsHelperMethods {
|
||||
alice2bob.expectMsg(annSigsA)
|
||||
}
|
||||
|
||||
test("recv TickRefreshChannelUpdate", Tag("channels_public")) { f =>
|
||||
test("recv BroadcastChannelUpdate", Tag("channels_public")) { f =>
|
||||
import f._
|
||||
val sender = TestProbe()
|
||||
sender.send(alice, WatchEventConfirmed(BITCOIN_FUNDING_DEEPLYBURIED, 400000, 42))
|
||||
@ -2096,12 +2096,63 @@ class NormalStateSpec extends TestkitBaseClass with StateTestsHelperMethods {
|
||||
|
||||
// actual test starts here
|
||||
Thread.sleep(1100)
|
||||
sender.send(alice, TickRefreshChannelUpdate)
|
||||
sender.send(alice, BroadcastChannelUpdate(PeriodicRefresh))
|
||||
val update2 = channelUpdateListener.expectMsgType[LocalChannelUpdate]
|
||||
assert(update1.channelUpdate.timestamp < update2.channelUpdate.timestamp)
|
||||
}
|
||||
|
||||
test("recv INPUT_DISCONNECTED", Tag("channels_public")) { f =>
|
||||
test("recv BroadcastChannelUpdate (no changes)", Tag("channels_public")) { f =>
|
||||
import f._
|
||||
val sender = TestProbe()
|
||||
sender.send(alice, WatchEventConfirmed(BITCOIN_FUNDING_DEEPLYBURIED, 400000, 42))
|
||||
sender.send(bob, WatchEventConfirmed(BITCOIN_FUNDING_DEEPLYBURIED, 400000, 42))
|
||||
bob2alice.expectMsgType[AnnouncementSignatures]
|
||||
bob2alice.forward(alice)
|
||||
channelUpdateListener.expectMsgType[LocalChannelUpdate]
|
||||
|
||||
// actual test starts here
|
||||
Thread.sleep(1100)
|
||||
sender.send(alice, BroadcastChannelUpdate(Reconnected))
|
||||
channelUpdateListener.expectNoMsg(1 second)
|
||||
}
|
||||
|
||||
test("recv INPUT_DISCONNECTED") { f =>
|
||||
import f._
|
||||
val sender = TestProbe()
|
||||
sender.send(alice, WatchEventConfirmed(BITCOIN_FUNDING_DEEPLYBURIED, 400000, 42))
|
||||
val update1a = alice2bob.expectMsgType[ChannelUpdate]
|
||||
assert(Announcements.isEnabled(update1a.channelFlags) == true)
|
||||
|
||||
// actual test starts here
|
||||
sender.send(alice, INPUT_DISCONNECTED)
|
||||
awaitCond(alice.stateName == OFFLINE)
|
||||
alice2bob.expectNoMsg(1 second)
|
||||
channelUpdateListener.expectNoMsg(1 second)
|
||||
}
|
||||
|
||||
test("recv INPUT_DISCONNECTED (with pending unsigned htlcs)") { f =>
|
||||
import f._
|
||||
val sender = TestProbe()
|
||||
sender.send(alice, WatchEventConfirmed(BITCOIN_FUNDING_DEEPLYBURIED, 400000, 42))
|
||||
val update1a = alice2bob.expectMsgType[ChannelUpdate]
|
||||
assert(Announcements.isEnabled(update1a.channelFlags) == true)
|
||||
val (_, htlc1) = addHtlc(10000, alice, bob, alice2bob, bob2alice)
|
||||
val (_, htlc2) = addHtlc(10000, alice, bob, alice2bob, bob2alice)
|
||||
val aliceData = alice.stateData.asInstanceOf[DATA_NORMAL]
|
||||
assert(aliceData.commitments.localChanges.proposed.size == 2)
|
||||
|
||||
// actual test starts here
|
||||
Thread.sleep(1100)
|
||||
sender.send(alice, INPUT_DISCONNECTED)
|
||||
assert(relayerA.expectMsgType[Status.Failure].cause.asInstanceOf[AddHtlcFailed].paymentHash === htlc1.paymentHash)
|
||||
assert(relayerA.expectMsgType[Status.Failure].cause.asInstanceOf[AddHtlcFailed].paymentHash === htlc2.paymentHash)
|
||||
val update2a = alice2bob.expectMsgType[ChannelUpdate]
|
||||
assert(channelUpdateListener.expectMsgType[LocalChannelUpdate].channelUpdate === update2a)
|
||||
assert(Announcements.isEnabled(update2a.channelFlags) == false)
|
||||
awaitCond(alice.stateName == OFFLINE)
|
||||
}
|
||||
|
||||
test("recv INPUT_DISCONNECTED (public channel)", Tag("channels_public")) { f =>
|
||||
import f._
|
||||
val sender = TestProbe()
|
||||
sender.send(alice, WatchEventConfirmed(BITCOIN_FUNDING_DEEPLYBURIED, 400000, 42))
|
||||
@ -2112,26 +2163,36 @@ class NormalStateSpec extends TestkitBaseClass with StateTestsHelperMethods {
|
||||
assert(Announcements.isEnabled(update1.channelUpdate.channelFlags) == true)
|
||||
|
||||
// actual test starts here
|
||||
Thread.sleep(1100)
|
||||
sender.send(alice, INPUT_DISCONNECTED)
|
||||
val update2 = channelUpdateListener.expectMsgType[LocalChannelUpdate]
|
||||
assert(update1.channelUpdate.timestamp < update2.channelUpdate.timestamp)
|
||||
assert(Announcements.isEnabled(update2.channelUpdate.channelFlags) == false)
|
||||
awaitCond(alice.stateName == OFFLINE)
|
||||
channelUpdateListener.expectNoMsg(1 second)
|
||||
}
|
||||
|
||||
test("recv INPUT_DISCONNECTED (with pending unsigned htlcs)") { f =>
|
||||
test("recv INPUT_DISCONNECTED (public channel, with pending unsigned htlcs)", Tag("channels_public")) { f =>
|
||||
import f._
|
||||
val sender = TestProbe()
|
||||
sender.send(alice, WatchEventConfirmed(BITCOIN_FUNDING_DEEPLYBURIED, 400000, 42))
|
||||
sender.send(bob, WatchEventConfirmed(BITCOIN_FUNDING_DEEPLYBURIED, 400000, 42))
|
||||
bob2alice.expectMsgType[AnnouncementSignatures]
|
||||
bob2alice.forward(alice)
|
||||
alice2bob.expectMsgType[AnnouncementSignatures]
|
||||
alice2bob.forward(bob)
|
||||
val update1a = channelUpdateListener.expectMsgType[LocalChannelUpdate]
|
||||
val update1b = channelUpdateListener.expectMsgType[LocalChannelUpdate]
|
||||
assert(Announcements.isEnabled(update1a.channelUpdate.channelFlags) == true)
|
||||
val (_, htlc1) = addHtlc(10000, alice, bob, alice2bob, bob2alice)
|
||||
val (_, htlc2) = addHtlc(10000, alice, bob, alice2bob, bob2alice)
|
||||
val aliceData = alice.stateData.asInstanceOf[DATA_NORMAL]
|
||||
assert(aliceData.commitments.localChanges.proposed.size == 2)
|
||||
|
||||
// actual test starts here
|
||||
Thread.sleep(1100)
|
||||
sender.send(alice, INPUT_DISCONNECTED)
|
||||
assert(relayerA.expectMsgType[Status.Failure].cause.asInstanceOf[AddHtlcFailed].paymentHash === htlc1.paymentHash)
|
||||
assert(relayerA.expectMsgType[Status.Failure].cause.asInstanceOf[AddHtlcFailed].paymentHash === htlc2.paymentHash)
|
||||
val update2a = channelUpdateListener.expectMsgType[LocalChannelUpdate]
|
||||
assert(update1a.channelUpdate.timestamp < update2a.channelUpdate.timestamp)
|
||||
assert(Announcements.isEnabled(update2a.channelUpdate.channelFlags) == false)
|
||||
awaitCond(alice.stateName == OFFLINE)
|
||||
}
|
||||
|
||||
|
@ -16,6 +16,7 @@
|
||||
|
||||
package fr.acinq.eclair.channel.states.e
|
||||
|
||||
import akka.actor.Status
|
||||
import java.util.UUID
|
||||
|
||||
import akka.testkit.TestProbe
|
||||
@ -337,9 +338,8 @@ class OfflineStateSpec extends TestkitBaseClass with StateTestsHelperMethods {
|
||||
awaitCond(alice.stateName == OFFLINE)
|
||||
awaitCond(bob.stateName == OFFLINE)
|
||||
|
||||
// alice and bob announce that their channel is OFFLINE
|
||||
assert(Announcements.isEnabled(channelUpdateListener.expectMsgType[LocalChannelUpdate].channelUpdate.channelFlags) == false)
|
||||
assert(Announcements.isEnabled(channelUpdateListener.expectMsgType[LocalChannelUpdate].channelUpdate.channelFlags) == false)
|
||||
// alice and bob will not announce that their channel is OFFLINE
|
||||
channelUpdateListener.expectNoMsg(300 millis)
|
||||
|
||||
// we make alice update here relay fee
|
||||
sender.send(alice, CMD_UPDATE_RELAY_FEE(4200, 123456))
|
||||
@ -358,8 +358,8 @@ class OfflineStateSpec extends TestkitBaseClass with StateTestsHelperMethods {
|
||||
// note that we don't forward the channel_reestablish so that only alice reaches NORMAL state, it facilitates the test below
|
||||
bob2alice.forward(alice)
|
||||
|
||||
// then alice reaches NORMAL state, and during the transition she broadcasts the channel_update
|
||||
val channelUpdate = channelUpdateListener.expectMsgType[LocalChannelUpdate](10 seconds).channelUpdate
|
||||
// then alice reaches NORMAL state, and after a delay she broadcasts the channel_update
|
||||
val channelUpdate = channelUpdateListener.expectMsgType[LocalChannelUpdate](20 seconds).channelUpdate
|
||||
assert(channelUpdate.feeBaseMsat === 4200)
|
||||
assert(channelUpdate.feeProportionalMillionths === 123456)
|
||||
assert(Announcements.isEnabled(channelUpdate.channelFlags) == true)
|
||||
@ -368,7 +368,7 @@ class OfflineStateSpec extends TestkitBaseClass with StateTestsHelperMethods {
|
||||
channelUpdateListener.expectNoMsg(300 millis)
|
||||
}
|
||||
|
||||
test("bump timestamp in case of quick reconnection") { f =>
|
||||
test("broadcast disabled channel_update while offline") { f =>
|
||||
import f._
|
||||
val sender = TestProbe()
|
||||
|
||||
@ -378,32 +378,17 @@ class OfflineStateSpec extends TestkitBaseClass with StateTestsHelperMethods {
|
||||
awaitCond(alice.stateName == OFFLINE)
|
||||
awaitCond(bob.stateName == OFFLINE)
|
||||
|
||||
// alice and bob announce that their channel is OFFLINE
|
||||
val channelUpdate_alice_disabled = channelUpdateListener.expectMsgType[LocalChannelUpdate].channelUpdate
|
||||
val channelUpdate_bob_disabled = channelUpdateListener.expectMsgType[LocalChannelUpdate].channelUpdate
|
||||
assert(Announcements.isEnabled(channelUpdate_alice_disabled.channelFlags) == false)
|
||||
assert(Announcements.isEnabled(channelUpdate_bob_disabled.channelFlags) == false)
|
||||
// alice and bob will not announce that their channel is OFFLINE
|
||||
channelUpdateListener.expectNoMsg(300 millis)
|
||||
|
||||
// we immediately reconnect them
|
||||
sender.send(alice, INPUT_RECONNECTED(alice2bob.ref, aliceInit, bobInit))
|
||||
sender.send(bob, INPUT_RECONNECTED(bob2alice.ref, bobInit, aliceInit))
|
||||
|
||||
// peers exchange channel_reestablish messages
|
||||
alice2bob.expectMsgType[ChannelReestablish]
|
||||
bob2alice.expectMsgType[ChannelReestablish]
|
||||
alice2bob.forward(bob)
|
||||
bob2alice.forward(alice)
|
||||
|
||||
// both nodes reach NORMAL state, and broadcast a new channel_update
|
||||
val channelUpdate_alice_enabled = channelUpdateListener.expectMsgType[LocalChannelUpdate].channelUpdate
|
||||
val channelUpdate_bob_enabled = channelUpdateListener.expectMsgType[LocalChannelUpdate].channelUpdate
|
||||
assert(Announcements.isEnabled(channelUpdate_alice_enabled.channelFlags) == true)
|
||||
assert(Announcements.isEnabled(channelUpdate_bob_enabled.channelFlags) == true)
|
||||
|
||||
// let's check that the two successive channel_update have a different timestamp
|
||||
assert(channelUpdate_alice_enabled.timestamp > channelUpdate_alice_disabled.timestamp)
|
||||
assert(channelUpdate_bob_enabled.timestamp > channelUpdate_bob_disabled.timestamp)
|
||||
// we attempt to send a payment
|
||||
sender.send(alice, CMD_ADD_HTLC(4200, randomBytes32, 123456, upstream = Left(UUID.randomUUID())))
|
||||
val failure = sender.expectMsgType[Status.Failure]
|
||||
val AddHtlcFailed(_, _, ChannelUnavailable(_), _, _, _) = failure.cause
|
||||
|
||||
// alice will broadcast a new disabled channel_update
|
||||
val update = channelUpdateListener.expectMsgType[LocalChannelUpdate]
|
||||
assert(Announcements.isEnabled(update.channelUpdate.channelFlags) == false)
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -28,7 +28,7 @@ import fr.acinq.bitcoin.{Base58, Base58Check, Bech32, Block, ByteVector32, Crypt
|
||||
import fr.acinq.eclair.blockchain.bitcoind.BitcoindService
|
||||
import fr.acinq.eclair.blockchain.bitcoind.rpc.ExtendedBitcoinClient
|
||||
import fr.acinq.eclair.blockchain.{Watch, WatchConfirmed}
|
||||
import fr.acinq.eclair.channel.Channel.TickRefreshChannelUpdate
|
||||
import fr.acinq.eclair.channel.Channel.{BroadcastChannelUpdate, PeriodicRefresh}
|
||||
import fr.acinq.eclair.channel.Register.{Forward, ForwardShortId}
|
||||
import fr.acinq.eclair.channel._
|
||||
import fr.acinq.eclair.crypto.Sphinx.ErrorPacket
|
||||
@ -248,8 +248,8 @@ class IntegrationSpec extends TestKit(ActorSystem("test")) with BitcoindService
|
||||
sender.send(bitcoincli, BitcoinReq("generate", 4))
|
||||
sender.expectMsgType[JValue]
|
||||
// A requires private channels, as a consequence:
|
||||
// - only A and B know about channel A-B
|
||||
// - A is not announced
|
||||
// - only A and B know about channel A-B (and there is no channel_announcement)
|
||||
// - A is not announced (no node_announcement)
|
||||
awaitAnnouncements(nodes.filterKeys(key => List("A", "B").contains(key)), 10, 12, 26)
|
||||
awaitAnnouncements(nodes.filterKeys(key => !List("A", "B").contains(key)), 10, 12, 24)
|
||||
}
|
||||
@ -263,7 +263,9 @@ class IntegrationSpec extends TestKit(ActorSystem("test")) with BitcoindService
|
||||
// then we make the actual payment
|
||||
sender.send(nodes("A").paymentInitiator,
|
||||
SendPayment(amountMsat.amount, pr.paymentHash, nodes("D").nodeParams.nodeId, routeParams = integrationTestRouteParams, maxAttempts = 1))
|
||||
sender.expectMsgType[UUID]
|
||||
val paymentId = sender.expectMsgType[UUID](5 seconds)
|
||||
val ps = sender.expectMsgType[PaymentSucceeded](5 seconds)
|
||||
assert(ps.id == paymentId)
|
||||
}
|
||||
|
||||
test("send an HTLC A->D with an invalid expiry delta for B") {
|
||||
@ -301,7 +303,7 @@ class IntegrationSpec extends TestKit(ActorSystem("test")) with BitcoindService
|
||||
|
||||
// first let's wait 3 seconds to make sure the timestamp of the new channel_update will be strictly greater than the former
|
||||
sender.expectNoMsg(3 seconds)
|
||||
sender.send(nodes("B").register, ForwardShortId(shortIdBC, TickRefreshChannelUpdate))
|
||||
sender.send(nodes("B").register, ForwardShortId(shortIdBC, BroadcastChannelUpdate(PeriodicRefresh)))
|
||||
sender.send(nodes("B").register, ForwardShortId(shortIdBC, CMD_GETINFO))
|
||||
val channelUpdateBC_new = sender.expectMsgType[RES_GETINFO].data.asInstanceOf[DATA_NORMAL].channelUpdate
|
||||
logger.info(s"channelUpdateBC=$channelUpdateBC")
|
||||
@ -525,7 +527,7 @@ class IntegrationSpec extends TestKit(ActorSystem("test")) with BitcoindService
|
||||
sender.expectMsgType[JValue](10 seconds)
|
||||
// and we wait for C'channel to close
|
||||
awaitCond(stateListener.expectMsgType[ChannelStateChanged].currentState == CLOSED, max = 30 seconds)
|
||||
awaitAnnouncements(nodes.filter(_._1 == "A"), 9, 11, 24)
|
||||
awaitAnnouncements(nodes.filterKeys(_ == "A"), 9, 11, 24)
|
||||
}
|
||||
|
||||
test("propagate a fulfill upstream when a downstream htlc is redeemed on-chain (remote commit)") {
|
||||
@ -602,7 +604,7 @@ class IntegrationSpec extends TestKit(ActorSystem("test")) with BitcoindService
|
||||
sender.expectMsgType[JValue](10 seconds)
|
||||
// and we wait for C'channel to close
|
||||
awaitCond(stateListener.expectMsgType[ChannelStateChanged].currentState == CLOSED, max = 30 seconds)
|
||||
awaitAnnouncements(nodes.filter(_._1 == "A"), 8, 10, 22)
|
||||
awaitAnnouncements(nodes.filterKeys(_ == "A"), 8, 10, 22)
|
||||
}
|
||||
|
||||
test("propagate a failure upstream when a downstream htlc times out (local commit)") {
|
||||
@ -664,7 +666,7 @@ class IntegrationSpec extends TestKit(ActorSystem("test")) with BitcoindService
|
||||
sender.expectMsgType[JValue](10 seconds)
|
||||
// and we wait for C'channel to close
|
||||
awaitCond(stateListener.expectMsgType[ChannelStateChanged].currentState == CLOSED, max = 30 seconds)
|
||||
awaitAnnouncements(nodes.filter(_._1 == "A"), 7, 9, 20)
|
||||
awaitAnnouncements(nodes.filterKeys(_ == "A"), 7, 9, 20)
|
||||
}
|
||||
|
||||
test("propagate a failure upstream when a downstream htlc times out (remote commit)") {
|
||||
@ -730,7 +732,7 @@ class IntegrationSpec extends TestKit(ActorSystem("test")) with BitcoindService
|
||||
sender.expectMsgType[JValue](10 seconds)
|
||||
// and we wait for C'channel to close
|
||||
awaitCond(stateListener.expectMsgType[ChannelStateChanged].currentState == CLOSED, max = 30 seconds)
|
||||
awaitAnnouncements(nodes.filter(_._1 == "A"), 6, 8, 18)
|
||||
awaitAnnouncements(nodes.filterKeys(_ == "A"), 6, 8, 18)
|
||||
}
|
||||
|
||||
test("punish a node that has published a revoked commit tx") {
|
||||
@ -856,7 +858,7 @@ class IntegrationSpec extends TestKit(ActorSystem("test")) with BitcoindService
|
||||
// and we wait for C'channel to close
|
||||
awaitCond(stateListener.expectMsgType[ChannelStateChanged].currentState == CLOSED, max = 30 seconds)
|
||||
// this will remove the channel
|
||||
awaitAnnouncements(nodes.filter(_._1 == "A"), 5, 7, 16)
|
||||
awaitAnnouncements(nodes.filterKeys(_ == "A"), 5, 7, 16)
|
||||
}
|
||||
|
||||
test("generate and validate lots of channels") {
|
||||
|
@ -242,7 +242,7 @@ class RouterSpec extends BaseRouterSpec {
|
||||
val blockHeight = Globals.blockCount.get().toInt - 2020
|
||||
val channelId = ShortChannelId(blockHeight, 5, 0)
|
||||
val announcement = channelAnnouncement(channelId, priv_a, priv_c, priv_funding_a, priv_funding_c)
|
||||
val timestamp = Platform.currentTime.millisecond.toSeconds - 1209600 - 1
|
||||
val timestamp = (Platform.currentTime.milliseconds - 14.days - 1.day).toSeconds
|
||||
val update = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_a, c, channelId, cltvExpiryDelta = 7, htlcMinimumMsat = 0, feeBaseMsat = 766000, feeProportionalMillionths = 10, htlcMaximumMsat = 5, timestamp = timestamp)
|
||||
val probe = TestProbe()
|
||||
probe.ignoreMsg { case _: TransportHandler.ReadAck => true }
|
||||
|
@ -17,6 +17,7 @@
|
||||
package fr.acinq.eclair.wire
|
||||
|
||||
import java.util.UUID
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import fr.acinq.bitcoin.DeterministicWallet.KeyPath
|
||||
import fr.acinq.bitcoin.{DeterministicWallet, OutPoint}
|
||||
@ -28,7 +29,9 @@ import fr.acinq.eclair.wire.ChannelCodecs._
|
||||
import fr.acinq.eclair.{UInt64, randomBytes, randomBytes32, randomKey}
|
||||
import org.scalatest.FunSuite
|
||||
import scodec.bits._
|
||||
|
||||
import scala.compat.Platform
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.Random
|
||||
import scala.concurrent.duration._
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user