1
0
Fork 0
mirror of https://github.com/ACINQ/eclair.git synced 2025-02-24 14:50:46 +01:00

Dedicated event for channel_update modifications (#1935)

Use an event `ChannelUpdateParametersChanged` for the sole purpose of tracking changes to channel_update.

Also, conf change at restore is now treated like a regular fee update. We do handle `CMD_UPDATE_RELAY_FEES` in both `OFFLINE` and `SYNCING`, because there may be a race between `CMD_UPDATE_RELAY_FEES` and
`ChannelRestablish`. And there was no good reason to behave differently in those states anyway.

* fix updateRelayFee api call

The `Register` should be used to channel actors, not the `Router`.
The former tracks all channels, whereas the latter only contains
channels in certain states. We only query the `Router` when we need
reference to external (public) nodes and channels.
This commit is contained in:
Pierre-Marie Padiou 2021-09-03 18:45:05 +02:00 committed by GitHub
parent 9f9f10e911
commit daace535c4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 366 additions and 221 deletions

View file

@ -202,9 +202,7 @@ class EclairImpl(appKit: Kit) extends Eclair with Logging {
for (nodeId <- nodes) {
appKit.nodeParams.db.peers.addOrUpdateRelayFees(nodeId, RelayFees(feeBaseMsat, feeProportionalMillionths))
}
(appKit.router ? Router.GetLocalChannels).mapTo[Iterable[LocalChannel]]
.map(channels => channels.filter(c => nodes.contains(c.remoteNodeId)).map(c => Right(c.shortChannelId)))
.flatMap(channels => sendToChannels[CommandResponse[CMD_UPDATE_RELAY_FEE]](channels.toList, CMD_UPDATE_RELAY_FEE(ActorRef.noSender, feeBaseMsat, feeProportionalMillionths)))
sendToNodes(nodes, CMD_UPDATE_RELAY_FEE(ActorRef.noSender, feeBaseMsat, feeProportionalMillionths, cltvExpiryDelta_opt = None))
}
override def peers()(implicit timeout: Timeout): Future[Iterable[PeerInfo]] = for {
@ -424,6 +422,14 @@ class EclairImpl(appKit: Kit) extends Eclair with Logging {
Future.foldLeft(commands)(Map.empty[ApiTypes.ChannelIdentifier, Either[Throwable, T]])(_ + _)
}
/** Send a request to multiple channels using node ids */
private def sendToNodes[T: ClassTag](nodeids: List[PublicKey], request: Any)(implicit timeout: Timeout): Future[Map[ApiTypes.ChannelIdentifier, Either[Throwable, T]]] = {
for {
channelIds <- (appKit.register ? Symbol("channelsTo")).mapTo[Map[ByteVector32, PublicKey]].map(_.filter(kv => nodeids.contains(kv._2)).keys)
res <- sendToChannels[T](channelIds.map(Left(_)).toList, request)
} yield res
}
override def getInfo()(implicit timeout: Timeout): Future[GetInfoResponse] = Future.successful(
GetInfoResponse(
version = Kit.getVersionLong,

View file

@ -287,32 +287,20 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
watchFundingTx(data.commitments)
context.system.eventStream.publish(ShortChannelIdAssigned(self, normal.channelId, normal.channelUpdate.shortChannelId, None))
// we rebuild a new channel_update with values from the configuration because they may have changed while eclair was down
// we check the configuration because the values for channel_update may have changed while eclair was down
val fees = getRelayFees(nodeParams, remoteNodeId, data.commitments)
val candidateChannelUpdate = Announcements.makeChannelUpdate(
nodeParams.chainHash,
nodeParams.privateKey,
remoteNodeId,
normal.channelUpdate.shortChannelId,
nodeParams.expiryDelta,
normal.commitments.remoteParams.htlcMinimum,
fees.feeBase,
fees.feeProportionalMillionths,
normal.commitments.capacity.toMilliSatoshi,
enable = Announcements.isEnabled(normal.channelUpdate.channelFlags))
val channelUpdate1 = if (Announcements.areSame(candidateChannelUpdate, normal.channelUpdate)) {
// if there was no configuration change we keep the existing channel update
normal.channelUpdate
} else {
log.info("refreshing channel_update due to configuration changes old={} new={}", normal.channelUpdate, candidateChannelUpdate)
candidateChannelUpdate
if (fees.feeBase != normal.channelUpdate.feeBaseMsat ||
fees.feeProportionalMillionths != normal.channelUpdate.feeProportionalMillionths ||
nodeParams.expiryDelta != normal.channelUpdate.cltvExpiryDelta) {
log.info("refreshing channel_update due to configuration changes")
self ! CMD_UPDATE_RELAY_FEE(ActorRef.noSender, fees.feeBase, fees.feeProportionalMillionths, Some(nodeParams.expiryDelta))
}
// we need to periodically re-send channel updates, otherwise channel will be considered stale and get pruned by network
// we take into account the date of the last update so that we don't send superfluous updates when we restart the app
val periodicRefreshInitialDelay = Helpers.nextChannelUpdateRefresh(channelUpdate1.timestamp)
val periodicRefreshInitialDelay = Helpers.nextChannelUpdateRefresh(normal.channelUpdate.timestamp)
context.system.scheduler.scheduleWithFixedDelay(initialDelay = periodicRefreshInitialDelay, delay = REFRESH_CHANNEL_UPDATE_INTERVAL, receiver = self, message = BroadcastChannelUpdate(PeriodicRefresh))
goto(OFFLINE) using normal.copy(channelUpdate = channelUpdate1)
goto(OFFLINE) using normal
case funding: DATA_WAIT_FOR_FUNDING_CONFIRMED =>
watchFundingTx(funding.commitments)
@ -977,7 +965,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
// if channel is public we need to send our announcement_signatures in order to generate the channel_announcement
Some(Helpers.makeAnnouncementSignatures(nodeParams, d.commitments, shortChannelId))
} else None
// we use GOTO instead of stay() because we want to fire transitions
// we use goto() instead of stay() because we want to fire transitions
goto(NORMAL) using d.copy(shortChannelId = shortChannelId, buried = true, channelUpdate = channelUpdate) storing() sending localAnnSigs_opt.toSeq
case Event(remoteAnnSigs: AnnouncementSignatures, d: DATA_NORMAL) if d.commitments.announceChannel =>
@ -996,7 +984,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
if (!Announcements.checkSigs(channelAnn)) {
handleLocalError(InvalidAnnouncementSignatures(d.channelId, remoteAnnSigs), d, Some(remoteAnnSigs))
} else {
// we use GOTO instead of stay() because we want to fire transitions
// we use goto() instead of stay() because we want to fire transitions
goto(NORMAL) using d.copy(channelAnnouncement = Some(channelAnn)) storing()
}
case Some(_) =>
@ -1017,12 +1005,12 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
}
case Event(c: CMD_UPDATE_RELAY_FEE, d: DATA_NORMAL) =>
log.info("updating relay fees: prevFeeBaseMsat={} nextFeeBaseMsat={} prevFeeProportionalMillionths={} nextFeeProportionalMillionths={}", d.channelUpdate.feeBaseMsat, c.feeBase, d.channelUpdate.feeProportionalMillionths, c.feeProportionalMillionths)
val channelUpdate = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, d.shortChannelId, d.channelUpdate.cltvExpiryDelta, d.channelUpdate.htlcMinimumMsat, c.feeBase, c.feeProportionalMillionths, d.commitments.capacity.toMilliSatoshi, enable = Helpers.aboveReserve(d.commitments))
val channelUpdate1 = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, d.shortChannelId, c.cltvExpiryDelta_opt.getOrElse(d.channelUpdate.cltvExpiryDelta), d.channelUpdate.htlcMinimumMsat, c.feeBase, c.feeProportionalMillionths, d.commitments.capacity.toMilliSatoshi, enable = Helpers.aboveReserve(d.commitments))
log.info(s"updating relay fees: prev={} next={}", d.channelUpdate.toStringShort, channelUpdate1.toStringShort)
val replyTo = if (c.replyTo == ActorRef.noSender) sender() else c.replyTo
replyTo ! RES_SUCCESS(c, d.channelId)
// we use GOTO instead of stay() because we want to fire transitions
goto(NORMAL) using d.copy(channelUpdate = channelUpdate) storing()
// we use goto() instead of stay() because we want to fire transitions
goto(NORMAL) using d.copy(channelUpdate = channelUpdate1) storing()
case Event(BroadcastChannelUpdate(reason), d: DATA_NORMAL) =>
val age = System.currentTimeMillis.milliseconds - d.channelUpdate.timestamp.seconds
@ -1034,7 +1022,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
stay()
case _ =>
log.debug("refreshing channel_update announcement (reason={})", reason)
// we use GOTO instead of stay() because we want to fire transitions
// we use goto() instead of stay() because we want to fire transitions
goto(NORMAL) using d.copy(channelUpdate = channelUpdate1) storing()
}
@ -1050,12 +1038,12 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
// if we have pending unsigned htlcs, then we cancel them and generate an update with the disabled flag set, that will be returned to the sender in a temporary channel failure
val d1 = if (d.commitments.localChanges.proposed.collectFirst { case add: UpdateAddHtlc => add }.isDefined) {
log.debug("updating channel_update announcement (reason=disabled)")
val channelUpdate = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, d.shortChannelId, d.channelUpdate.cltvExpiryDelta, d.channelUpdate.htlcMinimumMsat, d.channelUpdate.feeBaseMsat, d.channelUpdate.feeProportionalMillionths, d.commitments.capacity.toMilliSatoshi, enable = false)
val channelUpdate1 = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, d.shortChannelId, d.channelUpdate.cltvExpiryDelta, d.channelUpdate.htlcMinimumMsat, d.channelUpdate.feeBaseMsat, d.channelUpdate.feeProportionalMillionths, d.commitments.capacity.toMilliSatoshi, enable = false)
// NB: the htlcs stay() in the commitments.localChange, they will be cleaned up after reconnection
d.commitments.localChanges.proposed.collect {
case add: UpdateAddHtlc => relayer ! RES_ADD_SETTLED(d.commitments.originChannels(add.id), add, HtlcResult.DisconnectedBeforeSigned(channelUpdate))
case add: UpdateAddHtlc => relayer ! RES_ADD_SETTLED(d.commitments.originChannels(add.id), add, HtlcResult.DisconnectedBeforeSigned(channelUpdate1))
}
d.copy(channelUpdate = channelUpdate)
d.copy(channelUpdate = channelUpdate1)
} else {
d
}
@ -1546,18 +1534,11 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
// -> in CLOSING we either have mutual closed (so no more htlcs), or already have unilaterally closed (so no action required), and we can't be in OFFLINE state anyway
case Event(ProcessCurrentBlockCount(c), d: HasCommitments) => handleNewBlock(c, d)
case Event(c: CurrentFeerates, d: HasCommitments) =>
handleOfflineFeerate(c, d)
case Event(c: CurrentFeerates, d: HasCommitments) => handleCurrentFeerateDisconnected(c, d)
case Event(c: CMD_ADD_HTLC, d: DATA_NORMAL) => handleAddDisconnected(c, d)
case Event(c: CMD_UPDATE_RELAY_FEE, d: DATA_NORMAL) =>
log.info(s"updating relay fees: prevFeeBaseMsat={} nextFeeBaseMsat={} prevFeeProportionalMillionths={} nextFeeProportionalMillionths={}", d.channelUpdate.feeBaseMsat, c.feeBase, d.channelUpdate.feeProportionalMillionths, c.feeProportionalMillionths)
val channelUpdate = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, d.shortChannelId, d.channelUpdate.cltvExpiryDelta, d.channelUpdate.htlcMinimumMsat, c.feeBase, c.feeProportionalMillionths, d.commitments.capacity.toMilliSatoshi, enable = false)
val replyTo = if (c.replyTo == ActorRef.noSender) sender() else c.replyTo
replyTo ! RES_SUCCESS(c, d.channelId)
// we're in OFFLINE state, we don't broadcast the new update right away, we will do that when next time we go to NORMAL state
stay() using d.copy(channelUpdate = channelUpdate) storing()
case Event(c: CMD_UPDATE_RELAY_FEE, d: DATA_NORMAL) => handleUpdateRelayFeeDisconnected(c, d)
case Event(getTxResponse: GetTxWithMetaResponse, d: DATA_WAIT_FOR_FUNDING_CONFIRMED) if getTxResponse.txid == d.commitments.commitInput.outPoint.txid => handleGetFundingTx(getTxResponse, d.waitingSinceBlock, d.fundingTx)
@ -1738,8 +1719,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
case Event(ProcessCurrentBlockCount(c), d: HasCommitments) => handleNewBlock(c, d)
case Event(c: CurrentFeerates, d: HasCommitments) =>
handleOfflineFeerate(c, d)
case Event(c: CurrentFeerates, d: HasCommitments) => handleCurrentFeerateDisconnected(c, d)
case Event(getTxResponse: GetTxWithMetaResponse, d: DATA_WAIT_FOR_FUNDING_CONFIRMED) if getTxResponse.txid == d.commitments.commitInput.outPoint.txid => handleGetFundingTx(getTxResponse, d.waitingSinceBlock, d.fundingTx)
@ -1816,6 +1796,9 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
case Event(c: CMD_UPDATE_RELAY_FEE, d) => handleCommandError(CommandUnavailableInThisState(d.channelId, "updaterelayfee", stateName), c)
// at restore, if the configuration has changed, the channel will send a command to itself to update the relay fees
case Event(RES_SUCCESS(_: CMD_UPDATE_RELAY_FEE, channelId), d: DATA_NORMAL) if channelId == d.channelId => stay()
// we only care about this event in NORMAL and SHUTDOWN state, and there may be cases where the task is not cancelled
case Event(_: RevocationTimeout, _) => stay()
@ -1883,25 +1866,20 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
case _ => ()
}
val previousChannelUpdate_opt = stateData match {
case data: DATA_NORMAL => Some(data.channelUpdate)
case _ => None
}
(state, nextState, stateData, nextStateData) match {
// ORDER MATTERS!
case (WAIT_FOR_INIT_INTERNAL, OFFLINE, _, normal: DATA_NORMAL) =>
Logs.withMdc(diagLog)(Logs.mdc(category_opt = Some(Logs.LogCategory.CONNECTION))) {
log.debug("re-emitting channel_update={} enabled={} ", normal.channelUpdate, Announcements.isEnabled(normal.channelUpdate.channelFlags))
}
context.system.eventStream.publish(LocalChannelUpdate(self, normal.commitments.channelId, normal.shortChannelId, normal.commitments.remoteParams.nodeId, normal.channelAnnouncement, normal.channelUpdate, previousChannelUpdate_opt, normal.commitments))
context.system.eventStream.publish(LocalChannelUpdate(self, normal.commitments.channelId, normal.shortChannelId, normal.commitments.remoteParams.nodeId, normal.channelAnnouncement, normal.channelUpdate, normal.commitments))
case (_, _, d1: DATA_NORMAL, d2: DATA_NORMAL) if d1.channelUpdate == d2.channelUpdate && d1.channelAnnouncement == d2.channelAnnouncement =>
// don't do anything if neither the channel_update nor the channel_announcement didn't change
()
case (WAIT_FOR_FUNDING_LOCKED | NORMAL | OFFLINE | SYNCING, NORMAL | OFFLINE, _, normal: DATA_NORMAL) =>
// when we do WAIT_FOR_FUNDING_LOCKED->NORMAL or NORMAL->NORMAL or SYNCING->NORMAL or NORMAL->OFFLINE, we send out the new channel_update (most of the time it will just be to enable/disable the channel)
log.info("emitting channel_update={} enabled={} ", normal.channelUpdate, Announcements.isEnabled(normal.channelUpdate.channelFlags))
context.system.eventStream.publish(LocalChannelUpdate(self, normal.commitments.channelId, normal.shortChannelId, normal.commitments.remoteParams.nodeId, normal.channelAnnouncement, normal.channelUpdate, previousChannelUpdate_opt, normal.commitments))
context.system.eventStream.publish(LocalChannelUpdate(self, normal.commitments.channelId, normal.shortChannelId, normal.commitments.remoteParams.nodeId, normal.channelAnnouncement, normal.channelUpdate, normal.commitments))
case (_, _, _: DATA_NORMAL, _: DATA_NORMAL) =>
// in any other case (e.g. OFFLINE->SYNCING) we do nothing
()
@ -1911,6 +1889,14 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
context.system.eventStream.publish(LocalChannelDown(self, normal.commitments.channelId, normal.shortChannelId, normal.commitments.remoteParams.nodeId))
case _ => ()
}
(stateData, nextStateData) match {
// NORMAL->NORMAL, NORMAL->OFFLINE, SYNCING->NORMAL
case (d1: DATA_NORMAL, d2: DATA_NORMAL) => maybeEmitChannelUpdateChangedEvent(newUpdate = d2.channelUpdate, oldUpdate_opt = Some(d1.channelUpdate), d2)
// WAIT_FOR_FUNDING_LOCKED->NORMAL
case (_: DATA_WAIT_FOR_FUNDING_LOCKED, d2: DATA_NORMAL) => maybeEmitChannelUpdateChangedEvent(newUpdate = d2.channelUpdate, oldUpdate_opt = None, d2)
case _ => ()
}
}
/** Metrics */
@ -1999,7 +1985,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
* @param d the channel commtiments
* @return
*/
private def handleOfflineFeerate(c: CurrentFeerates, d: HasCommitments) = {
private def handleCurrentFeerateDisconnected(c: CurrentFeerates, d: HasCommitments) = {
val networkFeeratePerKw = nodeParams.onChainFeeConf.getCommitmentFeerate(remoteNodeId, d.commitments.channelType, d.commitments.capacity, Some(c))
val currentFeeratePerKw = d.commitments.localCommit.spec.feeratePerKw
// if the network fees are too high we risk to not be able to confirm our current commitment
@ -2140,11 +2126,11 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
if (Announcements.isEnabled(d.channelUpdate.channelFlags)) {
// if the channel isn't disabled we generate a new channel_update
log.info("updating channel_update announcement (reason=disabled)")
val channelUpdate = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, d.shortChannelId, d.channelUpdate.cltvExpiryDelta, d.channelUpdate.htlcMinimumMsat, d.channelUpdate.feeBaseMsat, d.channelUpdate.feeProportionalMillionths, d.commitments.capacity.toMilliSatoshi, enable = false)
val channelUpdate1 = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, d.shortChannelId, d.channelUpdate.cltvExpiryDelta, d.channelUpdate.htlcMinimumMsat, d.channelUpdate.feeBaseMsat, d.channelUpdate.feeProportionalMillionths, d.commitments.capacity.toMilliSatoshi, enable = false)
// then we update the state and replay the request
self forward c
// we use goto to fire transitions
goto(stateName) using d.copy(channelUpdate = channelUpdate)
// we use goto() to fire transitions
goto(stateName) using d.copy(channelUpdate = channelUpdate1)
} else {
// channel is already disabled, we reply to the request
val error = ChannelUnavailable(d.channelId)
@ -2152,6 +2138,25 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
}
}
private def handleUpdateRelayFeeDisconnected(c: CMD_UPDATE_RELAY_FEE, d: DATA_NORMAL) = {
val channelUpdate1 = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, d.shortChannelId, c.cltvExpiryDelta_opt.getOrElse(d.channelUpdate.cltvExpiryDelta), d.channelUpdate.htlcMinimumMsat, c.feeBase, c.feeProportionalMillionths, d.commitments.capacity.toMilliSatoshi, enable = false)
log.info(s"updating relay fees: prev={} next={}", d.channelUpdate.toStringShort, channelUpdate1.toStringShort)
val replyTo = if (c.replyTo == ActorRef.noSender) sender() else c.replyTo
replyTo ! RES_SUCCESS(c, d.channelId)
// We're in OFFLINE state, by using stay() instead of goto() we skip the transition handler and won't broadcast the
// new update right away. The goal is to not emit superfluous updates when the channel is unusable. At reconnection
// there will be a state transition SYNCING->NORMAL which will cause the update to be broadcast.
// However, we still need to advertise that the channel_update parameters have changed, so we manually call the method
maybeEmitChannelUpdateChangedEvent(newUpdate = channelUpdate1, oldUpdate_opt = Some(d.channelUpdate), d)
stay() using d.copy(channelUpdate = channelUpdate1) storing()
}
private def maybeEmitChannelUpdateChangedEvent(newUpdate: ChannelUpdate, oldUpdate_opt: Option[ChannelUpdate], d: DATA_NORMAL): Unit = {
if (oldUpdate_opt.isEmpty || !Announcements.areSameIgnoreFlags(newUpdate, oldUpdate_opt.get)) {
context.system.eventStream.publish(ChannelUpdateParametersChanged(self, d.channelId, newUpdate.shortChannelId, d.commitments.remoteNodeId, newUpdate))
}
}
private def handleNewBlock(c: CurrentBlockCount, d: HasCommitments) = {
val timedOutOutgoing = d.commitments.timedOutOutgoingHtlcs(c.blockCount)
val almostTimedOutIncoming = d.commitments.almostTimedOutIncomingHtlcs(c.blockCount, nodeParams.fulfillSafetyBeforeTimeout)

View file

@ -173,7 +173,7 @@ final case class CMD_SIGN(replyTo_opt: Option[ActorRef] = None) extends HasOptio
sealed trait CloseCommand extends HasReplyToCommand
final case class CMD_CLOSE(replyTo: ActorRef, scriptPubKey: Option[ByteVector]) extends CloseCommand
final case class CMD_FORCECLOSE(replyTo: ActorRef) extends CloseCommand
final case class CMD_UPDATE_RELAY_FEE(replyTo: ActorRef, feeBase: MilliSatoshi, feeProportionalMillionths: Long) extends HasReplyToCommand
final case class CMD_UPDATE_RELAY_FEE(replyTo: ActorRef, feeBase: MilliSatoshi, feeProportionalMillionths: Long, cltvExpiryDelta_opt: Option[CltvExpiryDelta]) extends HasReplyToCommand
final case class CMD_GETSTATE(replyTo: ActorRef) extends HasReplyToCommand
final case class CMD_GETSTATEDATA(replyTo: ActorRef) extends HasReplyToCommand
final case class CMD_GETINFO(replyTo: ActorRef)extends HasReplyToCommand

View file

@ -38,7 +38,9 @@ case class ChannelIdAssigned(channel: ActorRef, remoteNodeId: PublicKey, tempora
case class ShortChannelIdAssigned(channel: ActorRef, channelId: ByteVector32, shortChannelId: ShortChannelId, previousShortChannelId: Option[ShortChannelId]) extends ChannelEvent
case class LocalChannelUpdate(channel: ActorRef, channelId: ByteVector32, shortChannelId: ShortChannelId, remoteNodeId: PublicKey, channelAnnouncement_opt: Option[ChannelAnnouncement], channelUpdate: ChannelUpdate, previousChannelUpdate_opt: Option[ChannelUpdate], commitments: AbstractCommitments) extends ChannelEvent
case class LocalChannelUpdate(channel: ActorRef, channelId: ByteVector32, shortChannelId: ShortChannelId, remoteNodeId: PublicKey, channelAnnouncement_opt: Option[ChannelAnnouncement], channelUpdate: ChannelUpdate, commitments: AbstractCommitments) extends ChannelEvent
case class ChannelUpdateParametersChanged(channel: ActorRef, channelId: ByteVector32, shortChannelId: ShortChannelId, remoteNodeId: PublicKey, channelUpdate: ChannelUpdate) extends ChannelEvent
case class LocalChannelDown(channel: ActorRef, channelId: ByteVector32, shortChannelId: ShortChannelId, remoteNodeId: PublicKey) extends ChannelEvent

View file

@ -39,7 +39,7 @@ trait AuditDb extends Closeable {
def add(channelErrorOccurred: ChannelErrorOccurred): Unit
def addChannelUpdate(localChannelUpdate: LocalChannelUpdate): Unit
def addChannelUpdate(channelUpdateParametersChanged: ChannelUpdateParametersChanged): Unit
def listSent(from: Long, to: Long): Seq[PaymentSent]

View file

@ -40,7 +40,7 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with ActorLogging {
context.system.eventStream.subscribe(self, classOf[ChannelErrorOccurred])
context.system.eventStream.subscribe(self, classOf[ChannelStateChanged])
context.system.eventStream.subscribe(self, classOf[ChannelClosed])
context.system.eventStream.subscribe(self, classOf[LocalChannelUpdate])
context.system.eventStream.subscribe(self, classOf[ChannelUpdateParametersChanged])
override def receive: Receive = {
@ -117,16 +117,8 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with ActorLogging {
auditDb.add(ChannelEvent(e.channelId, e.commitments.remoteParams.nodeId, e.commitments.commitInput.txOut.amount, e.commitments.localParams.isFunder, !e.commitments.announceChannel, event))
channelsDb.updateChannelMeta(e.channelId, event)
case u: LocalChannelUpdate =>
u.previousChannelUpdate_opt match {
case Some(previous) if
u.channelUpdate.feeBaseMsat == previous.feeBaseMsat &&
u.channelUpdate.feeProportionalMillionths == previous.feeProportionalMillionths &&
u.channelUpdate.cltvExpiryDelta == previous.cltvExpiryDelta &&
u.channelUpdate.htlcMinimumMsat == previous.htlcMinimumMsat &&
u.channelUpdate.htlcMaximumMsat == previous.htlcMaximumMsat => ()
case _ => auditDb.addChannelUpdate(u)
}
case u: ChannelUpdateParametersChanged =>
auditDb.addChannelUpdate(u)
}

View file

@ -161,9 +161,9 @@ case class DualAuditDb(sqlite: SqliteAuditDb, postgres: PgAuditDb) extends Audit
sqlite.add(channelErrorOccurred)
}
override def addChannelUpdate(localChannelUpdate: LocalChannelUpdate): Unit = {
runAsync(postgres.addChannelUpdate(localChannelUpdate))
sqlite.addChannelUpdate(localChannelUpdate)
override def addChannelUpdate(channelUpdateParametersChanged: ChannelUpdateParametersChanged): Unit = {
runAsync(postgres.addChannelUpdate(channelUpdateParametersChanged))
sqlite.addChannelUpdate(channelUpdateParametersChanged)
}
override def listSent(from: Long, to: Long): Seq[PaymentSent] = {

View file

@ -18,7 +18,7 @@ package fr.acinq.eclair.db.pg
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin.{ByteVector32, Satoshi, SatoshiLong}
import fr.acinq.eclair.channel.{ChannelErrorOccurred, LocalChannelUpdate, LocalError, NetworkFeePaid, RemoteError}
import fr.acinq.eclair.channel._
import fr.acinq.eclair.db.AuditDb.{NetworkFee, Stats}
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
@ -243,7 +243,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
}
}
override def addChannelUpdate(u: LocalChannelUpdate): Unit = withMetrics("audit/add-channel-update", DbBackends.Postgres) {
override def addChannelUpdate(u: ChannelUpdateParametersChanged): Unit = withMetrics("audit/add-channel-update", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("INSERT INTO audit.channel_updates VALUES (?, ?, ?, ?, ?, ?, ?, ?)")) { statement =>
statement.setString(1, u.channelId.toHex)

View file

@ -18,7 +18,7 @@ package fr.acinq.eclair.db.sqlite
import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey}
import fr.acinq.bitcoin.{ByteVector32, Satoshi, SatoshiLong}
import fr.acinq.eclair.channel.{ChannelErrorOccurred, LocalChannelUpdate, LocalError, NetworkFeePaid, RemoteError}
import fr.acinq.eclair.channel._
import fr.acinq.eclair.db.AuditDb.{NetworkFee, Stats}
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
@ -240,7 +240,7 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging {
}
}
override def addChannelUpdate(u: LocalChannelUpdate): Unit = withMetrics("audit/add-channel-update", DbBackends.Sqlite) {
override def addChannelUpdate(u: ChannelUpdateParametersChanged): Unit = withMetrics("audit/add-channel-update", DbBackends.Sqlite) {
using(sqlite.prepareStatement("INSERT INTO channel_updates VALUES (?, ?, ?, ?, ?, ?, ?, ?)")) { statement =>
statement.setBytes(1, u.channelId.toArray)
statement.setBytes(2, u.remoteNodeId.value.toArray)

View file

@ -93,7 +93,7 @@ object ChannelRelayer {
replyTo ! Relayer.OutgoingChannels(channels.toSeq)
Behaviors.same
case WrappedLocalChannelUpdate(LocalChannelUpdate(_, channelId, shortChannelId, remoteNodeId, _, channelUpdate, _, commitments)) =>
case WrappedLocalChannelUpdate(LocalChannelUpdate(_, channelId, shortChannelId, remoteNodeId, _, channelUpdate, commitments)) =>
context.log.debug(s"updating local channel info for channelId=$channelId shortChannelId=$shortChannelId remoteNodeId=$remoteNodeId channelUpdate={} commitments={}", channelUpdate, commitments)
val channelUpdates1 = channelUpdates + (channelUpdate.shortChannelId -> Relayer.OutgoingChannel(remoteNodeId, channelUpdate, commitments))
val node2channels1 = node2channels.addOne(remoteNodeId, channelUpdate.shortChannelId)

View file

@ -126,6 +126,13 @@ object Announcements {
def areSame(u1: ChannelUpdate, u2: ChannelUpdate): Boolean =
u1.copy(signature = ByteVector64.Zeroes, timestamp = 0) == u2.copy(signature = ByteVector64.Zeroes, timestamp = 0)
def areSameIgnoreFlags(u1: ChannelUpdate, u2: ChannelUpdate): Boolean =
u1.feeBaseMsat == u2.feeBaseMsat &&
u1.feeProportionalMillionths == u2.feeProportionalMillionths &&
u1.cltvExpiryDelta == u2.cltvExpiryDelta &&
u1.htlcMinimumMsat == u2.htlcMinimumMsat &&
u1.htlcMaximumMsat == u2.htlcMaximumMsat
def makeMessageFlags(hasOptionChannelHtlcMax: Boolean): Byte = BitVector.bits(hasOptionChannelHtlcMax :: Nil).padLeft(8).toByte()
def makeChannelFlags(isNode1: Boolean, enable: Boolean): Byte = BitVector.bits(!enable :: !isNode1 :: Nil).padLeft(8).toByte()

View file

@ -264,6 +264,8 @@ case class ChannelUpdate(signature: ByteVector64,
require(((messageFlags & 1) != 0) == htlcMaximumMsat.isDefined, "htlcMaximumMsat is not consistent with messageFlags")
def isNode1 = Announcements.isNode1(channelFlags)
def toStringShort: String = s"cltvExpiryDelta=$cltvExpiryDelta,feeBase=$feeBaseMsat,feeProportionalMillionths=$feeProportionalMillionths"
}
// @formatter:off

View file

@ -1,125 +0,0 @@
package fr.acinq.eclair.channel
import akka.testkit.TestProbe
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin._
import fr.acinq.eclair.TestConstants.Alice
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.WatchFundingSpentTriggered
import fr.acinq.eclair.channel.states.ChannelStateTestsBase
import fr.acinq.eclair.crypto.Generators
import fr.acinq.eclair.crypto.keymanager.ChannelKeyManager
import fr.acinq.eclair.transactions.Scripts
import fr.acinq.eclair.transactions.Transactions.{ClaimP2WPKHOutputTx, DefaultCommitmentFormat, InputInfo, TxOwner}
import fr.acinq.eclair.wire.protocol.{ChannelReestablish, CommitSig, Error, Init, RevokeAndAck}
import fr.acinq.eclair.{TestConstants, TestKitBaseClass, _}
import org.scalatest.Outcome
import org.scalatest.funsuite.FixtureAnyFunSuiteLike
import scala.concurrent.duration._
class RecoverySpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with ChannelStateTestsBase {
type FixtureParam = SetupFixture
override def withFixture(test: OneArgTest): Outcome = {
val setup = test.tags.contains("disable-offline-mismatch") match {
case false => init()
case true => init(nodeParamsA = Alice.nodeParams.copy(onChainFeeConf = Alice.nodeParams.onChainFeeConf.copy(closeOnOfflineMismatch = false)))
}
import setup._
within(30 seconds) {
reachNormal(setup)
awaitCond(alice.stateName == NORMAL)
awaitCond(bob.stateName == NORMAL)
withFixture(test.toNoArgTest(setup))
}
}
def aliceInit = Init(TestConstants.Alice.nodeParams.features)
def bobInit = Init(TestConstants.Bob.nodeParams.features)
test("use funding pubkeys from publish commitment to spend our output") { f =>
import f._
val sender = TestProbe()
// we start by storing the current state
val oldStateData = alice.stateData
// then we add an htlc and sign it
addHtlc(250000000 msat, alice, bob, alice2bob, bob2alice)
sender.send(alice, CMD_SIGN())
alice2bob.expectMsgType[CommitSig]
alice2bob.forward(bob)
// alice will receive neither the revocation nor the commit sig
bob2alice.expectMsgType[RevokeAndAck]
bob2alice.expectMsgType[CommitSig]
// we simulate a disconnection
sender.send(alice, INPUT_DISCONNECTED)
sender.send(bob, INPUT_DISCONNECTED)
awaitCond(alice.stateName == OFFLINE)
awaitCond(bob.stateName == OFFLINE)
// then we manually replace alice's state with an older one
alice.setState(OFFLINE, oldStateData)
// then we reconnect them
sender.send(alice, INPUT_RECONNECTED(alice2bob.ref, aliceInit, bobInit))
sender.send(bob, INPUT_RECONNECTED(bob2alice.ref, bobInit, aliceInit))
// peers exchange channel_reestablish messages
alice2bob.expectMsgType[ChannelReestablish]
val ce = bob2alice.expectMsgType[ChannelReestablish]
// alice then realizes it has an old state...
bob2alice.forward(alice)
// ... and ask bob to publish its current commitment
val error = alice2bob.expectMsgType[Error]
assert(new String(error.data.toArray) === PleasePublishYourCommitment(channelId(alice)).getMessage)
// alice now waits for bob to publish its commitment
awaitCond(alice.stateName == WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT)
// bob is nice and publishes its commitment
val bobCommitTx = bob.stateData.asInstanceOf[DATA_NORMAL].commitments.fullySignedLocalCommitTx(bob.underlyingActor.nodeParams.channelKeyManager).tx
// actual tests starts here: let's see what we can do with Bob's commit tx
sender.send(alice, WatchFundingSpentTriggered(bobCommitTx))
// from Bob's commit tx we can extract both funding public keys
val OP_2 :: OP_PUSHDATA(pub1, _) :: OP_PUSHDATA(pub2, _) :: OP_2 :: OP_CHECKMULTISIG :: Nil = Script.parse(bobCommitTx.txIn(0).witness.stack.last)
// from Bob's commit tx we can also extract our p2wpkh output
val ourOutput = bobCommitTx.txOut.find(_.publicKeyScript.length == 22).get
val OP_0 :: OP_PUSHDATA(pubKeyHash, _) :: Nil = Script.parse(ourOutput.publicKeyScript)
val keyManager = TestConstants.Alice.nodeParams.channelKeyManager
// find our funding pub key
val fundingPubKey = Seq(PublicKey(pub1), PublicKey(pub2)).find {
pub =>
val channelKeyPath = ChannelKeyManager.keyPath(pub)
val localPubkey = Generators.derivePubKey(keyManager.paymentPoint(channelKeyPath).publicKey, ce.myCurrentPerCommitmentPoint)
localPubkey.hash160 == pubKeyHash
} get
// compute our to-remote pubkey
val channelKeyPath = ChannelKeyManager.keyPath(fundingPubKey)
val ourToRemotePubKey = Generators.derivePubKey(keyManager.paymentPoint(channelKeyPath).publicKey, ce.myCurrentPerCommitmentPoint)
// spend our output
val tx = Transaction(version = 2,
txIn = TxIn(OutPoint(bobCommitTx, bobCommitTx.txOut.indexOf(ourOutput)), sequence = TxIn.SEQUENCE_FINAL, signatureScript = Nil) :: Nil,
txOut = TxOut(Satoshi(1000), Script.pay2pkh(fr.acinq.eclair.randomKey().publicKey)) :: Nil,
lockTime = 0)
val sig = keyManager.sign(
ClaimP2WPKHOutputTx(InputInfo(OutPoint(bobCommitTx, bobCommitTx.txOut.indexOf(ourOutput)), ourOutput, Script.pay2pkh(ourToRemotePubKey)), tx),
keyManager.paymentPoint(channelKeyPath),
ce.myCurrentPerCommitmentPoint,
TxOwner.Local,
DefaultCommitmentFormat)
val tx1 = tx.updateWitness(0, ScriptWitness(Scripts.der(sig) :: ourToRemotePubKey.value :: Nil))
Transaction.correctlySpends(tx1, bobCommitTx :: Nil, ScriptFlags.STANDARD_SCRIPT_VERIFY_FLAGS)
}
}

View file

@ -0,0 +1,254 @@
package fr.acinq.eclair.channel
import akka.actor.ActorRef
import akka.actor.typed.scaladsl.adapter.actorRefAdapter
import akka.testkit
import akka.testkit.{TestActor, TestFSMRef, TestProbe}
import com.softwaremill.quicklens.ModifyPimp
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin._
import fr.acinq.eclair.TestConstants.{Alice, Bob}
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.WatchFundingSpentTriggered
import fr.acinq.eclair.channel.states.ChannelStateTestsBase
import fr.acinq.eclair.channel.states.ChannelStateTestsHelperMethods.FakeTxPublisherFactory
import fr.acinq.eclair.crypto.Generators
import fr.acinq.eclair.crypto.keymanager.ChannelKeyManager
import fr.acinq.eclair.router.Announcements
import fr.acinq.eclair.transactions.Scripts
import fr.acinq.eclair.transactions.Transactions.{ClaimP2WPKHOutputTx, DefaultCommitmentFormat, InputInfo, TxOwner}
import fr.acinq.eclair.wire.protocol.{ChannelReestablish, CommitSig, Error, FundingLocked, Init, RevokeAndAck}
import fr.acinq.eclair.{TestKitBaseClass, _}
import org.scalatest.Outcome
import org.scalatest.funsuite.FixtureAnyFunSuiteLike
import scala.concurrent.duration._
class RestoreSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with ChannelStateTestsBase {
type FixtureParam = SetupFixture
override def withFixture(test: OneArgTest): Outcome = {
val setup = test.tags.contains("disable-offline-mismatch") match {
case false => init()
case true => init(nodeParamsA = Alice.nodeParams.copy(onChainFeeConf = Alice.nodeParams.onChainFeeConf.copy(closeOnOfflineMismatch = false)))
}
within(30 seconds) {
reachNormal(setup)
withFixture(test.toNoArgTest(setup))
}
}
def aliceInit = Init(Alice.nodeParams.features)
def bobInit = Init(Bob.nodeParams.features)
test("use funding pubkeys from publish commitment to spend our output") { f =>
import f._
val sender = TestProbe()
// we start by storing the current state
val oldStateData = alice.stateData.asInstanceOf[HasCommitments]
// then we add an htlc and sign it
addHtlc(250000000 msat, alice, bob, alice2bob, bob2alice)
sender.send(alice, CMD_SIGN())
alice2bob.expectMsgType[CommitSig]
alice2bob.forward(bob)
// alice will receive neither the revocation nor the commit sig
bob2alice.expectMsgType[RevokeAndAck]
bob2alice.expectMsgType[CommitSig]
// we simulate a disconnection
sender.send(alice, INPUT_DISCONNECTED)
sender.send(bob, INPUT_DISCONNECTED)
awaitCond(alice.stateName == OFFLINE)
awaitCond(bob.stateName == OFFLINE)
// and we terminate Alice
alice.stop()
// we restart Alice
val newAlice: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(Alice.nodeParams, wallet, Bob.nodeParams.nodeId, alice2blockchain.ref, relayerA.ref, FakeTxPublisherFactory(alice2blockchain)), alicePeer.ref)
newAlice ! INPUT_RESTORED(oldStateData)
// then we reconnect them
sender.send(newAlice, INPUT_RECONNECTED(alice2bob.ref, aliceInit, bobInit))
sender.send(bob, INPUT_RECONNECTED(bob2alice.ref, bobInit, aliceInit))
// peers exchange channel_reestablish messages
alice2bob.expectMsgType[ChannelReestablish]
val ce = bob2alice.expectMsgType[ChannelReestablish]
// alice then realizes it has an old state...
bob2alice.forward(newAlice)
// ... and ask bob to publish its current commitment
val error = alice2bob.expectMsgType[Error]
assert(new String(error.data.toArray) === PleasePublishYourCommitment(channelId(newAlice)).getMessage)
// alice now waits for bob to publish its commitment
awaitCond(newAlice.stateName == WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT)
// bob is nice and publishes its commitment
val bobCommitTx = bob.stateData.asInstanceOf[DATA_NORMAL].commitments.fullySignedLocalCommitTx(bob.underlyingActor.nodeParams.channelKeyManager).tx
// actual tests starts here: let's see what we can do with Bob's commit tx
sender.send(newAlice, WatchFundingSpentTriggered(bobCommitTx))
// from Bob's commit tx we can extract both funding public keys
val OP_2 :: OP_PUSHDATA(pub1, _) :: OP_PUSHDATA(pub2, _) :: OP_2 :: OP_CHECKMULTISIG :: Nil = Script.parse(bobCommitTx.txIn(0).witness.stack.last)
// from Bob's commit tx we can also extract our p2wpkh output
val ourOutput = bobCommitTx.txOut.find(_.publicKeyScript.length == 22).get
val OP_0 :: OP_PUSHDATA(pubKeyHash, _) :: Nil = Script.parse(ourOutput.publicKeyScript)
val keyManager = Alice.nodeParams.channelKeyManager
// find our funding pub key
val fundingPubKey = Seq(PublicKey(pub1), PublicKey(pub2)).find {
pub =>
val channelKeyPath = ChannelKeyManager.keyPath(pub)
val localPubkey = Generators.derivePubKey(keyManager.paymentPoint(channelKeyPath).publicKey, ce.myCurrentPerCommitmentPoint)
localPubkey.hash160 == pubKeyHash
} get
// compute our to-remote pubkey
val channelKeyPath = ChannelKeyManager.keyPath(fundingPubKey)
val ourToRemotePubKey = Generators.derivePubKey(keyManager.paymentPoint(channelKeyPath).publicKey, ce.myCurrentPerCommitmentPoint)
// spend our output
val tx = Transaction(version = 2,
txIn = TxIn(OutPoint(bobCommitTx, bobCommitTx.txOut.indexOf(ourOutput)), sequence = TxIn.SEQUENCE_FINAL, signatureScript = Nil) :: Nil,
txOut = TxOut(Satoshi(1000), Script.pay2pkh(fr.acinq.eclair.randomKey().publicKey)) :: Nil,
lockTime = 0)
val sig = keyManager.sign(
ClaimP2WPKHOutputTx(InputInfo(OutPoint(bobCommitTx, bobCommitTx.txOut.indexOf(ourOutput)), ourOutput, Script.pay2pkh(ourToRemotePubKey)), tx),
keyManager.paymentPoint(channelKeyPath),
ce.myCurrentPerCommitmentPoint,
TxOwner.Local,
DefaultCommitmentFormat)
val tx1 = tx.updateWitness(0, ScriptWitness(Scripts.der(sig) :: ourToRemotePubKey.value :: Nil))
Transaction.correctlySpends(tx1, bobCommitTx :: Nil, ScriptFlags.STANDARD_SCRIPT_VERIFY_FLAGS)
}
/** We are only interested in channel updates from Alice, we use the channel flag to discriminate */
def aliceChannelUpdateListener(channelUpdateListener: TestProbe): TestProbe = {
val aliceListener = TestProbe()
channelUpdateListener.setAutoPilot(new testkit.TestActor.AutoPilot {
override def run(sender: ActorRef, msg: Any): TestActor.AutoPilot = msg match {
case u: ChannelUpdateParametersChanged if Announcements.isNode1(u.channelUpdate.channelFlags) == Announcements.isNode1(Alice.nodeParams.nodeId, Bob.nodeParams.nodeId) =>
aliceListener.ref.tell(msg, sender)
TestActor.KeepRunning
case _ => TestActor.KeepRunning
}
})
aliceListener
}
test("restore channel without configuration change") { f =>
import f._
val sender = TestProbe()
val channelUpdateListener = {
val listener = TestProbe()
system.eventStream.subscribe(listener.ref, classOf[ChannelUpdateParametersChanged])
aliceChannelUpdateListener(listener)
}
// we start by storing the current state
assert(alice.stateData.isInstanceOf[DATA_NORMAL])
val oldStateData = alice.stateData.asInstanceOf[DATA_NORMAL]
// we simulate a disconnection
sender.send(alice, INPUT_DISCONNECTED)
sender.send(bob, INPUT_DISCONNECTED)
awaitCond(alice.stateName == OFFLINE)
awaitCond(bob.stateName == OFFLINE)
// and we terminate Alice
alice.stop()
// we restart Alice
val newAlice: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(Alice.nodeParams, wallet, Bob.nodeParams.nodeId, alice2blockchain.ref, relayerA.ref, FakeTxPublisherFactory(alice2blockchain)), alicePeer.ref)
newAlice ! INPUT_RESTORED(oldStateData)
newAlice ! INPUT_RECONNECTED(alice2bob.ref, aliceInit, bobInit)
bob ! INPUT_RECONNECTED(bob2alice.ref, bobInit, aliceInit)
alice2bob.expectMsgType[ChannelReestablish]
bob2alice.expectMsgType[ChannelReestablish]
alice2bob.forward(bob)
bob2alice.forward(newAlice)
awaitCond(newAlice.stateName == NORMAL)
channelUpdateListener.expectNoMessage()
}
test("restore channel with configuration change") { f =>
import f._
val sender = TestProbe()
val channelUpdateListener = {
val listener = TestProbe()
system.eventStream.subscribe(listener.ref, classOf[ChannelUpdateParametersChanged])
aliceChannelUpdateListener(listener)
}
// we start by storing the current state
assert(alice.stateData.isInstanceOf[DATA_NORMAL])
val oldStateData = alice.stateData.asInstanceOf[DATA_NORMAL]
// we simulate a disconnection
sender.send(alice, INPUT_DISCONNECTED)
sender.send(bob, INPUT_DISCONNECTED)
awaitCond(alice.stateName == OFFLINE)
awaitCond(bob.stateName == OFFLINE)
// and we terminate Alice
alice.stop()
// we restart Alice with different configurations
Seq(
Alice.nodeParams
.modify(_.relayParams.privateChannelFees.feeBase).setTo(765 msat),
Alice.nodeParams
.modify(_.relayParams.privateChannelFees.feeProportionalMillionths).setTo(2345),
Alice.nodeParams
.modify(_.expiryDelta).setTo(CltvExpiryDelta(147)),
Alice.nodeParams
.modify(_.relayParams.privateChannelFees.feeProportionalMillionths).setTo(2345)
.modify(_.expiryDelta).setTo(CltvExpiryDelta(147)),
) foreach { newConfig =>
val newAlice: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(newConfig, wallet, Bob.nodeParams.nodeId, alice2blockchain.ref, relayerA.ref, FakeTxPublisherFactory(alice2blockchain)), alicePeer.ref)
newAlice ! INPUT_RESTORED(oldStateData)
val u1 = channelUpdateListener.expectMsgType[ChannelUpdateParametersChanged]
assert(!Announcements.areSameIgnoreFlags(u1.channelUpdate, oldStateData.channelUpdate))
assert(u1.channelUpdate.feeBaseMsat === newConfig.relayParams.privateChannelFees.feeBase)
assert(u1.channelUpdate.feeProportionalMillionths === newConfig.relayParams.privateChannelFees.feeProportionalMillionths)
assert(u1.channelUpdate.cltvExpiryDelta === newConfig.expiryDelta)
newAlice ! INPUT_RECONNECTED(alice2bob.ref, aliceInit, bobInit)
bob ! INPUT_RECONNECTED(bob2alice.ref, bobInit, aliceInit)
alice2bob.expectMsgType[ChannelReestablish]
bob2alice.expectMsgType[ChannelReestablish]
alice2bob.forward(bob)
bob2alice.forward(newAlice)
alice2bob.expectMsgType[FundingLocked]
bob2alice.expectMsgType[FundingLocked]
alice2bob.forward(bob)
bob2alice.forward(newAlice)
awaitCond(newAlice.stateName == NORMAL)
awaitCond(bob.stateName == NORMAL)
channelUpdateListener.expectNoMessage()
// we simulate a disconnection
sender.send(newAlice, INPUT_DISCONNECTED)
sender.send(bob, INPUT_DISCONNECTED)
awaitCond(newAlice.stateName == OFFLINE)
awaitCond(bob.stateName == OFFLINE)
// and we terminate Alice
newAlice.stop()
}
}
}

View file

@ -1782,12 +1782,14 @@ class NormalStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
val sender = TestProbe()
val newFeeBaseMsat = TestConstants.Alice.nodeParams.relayParams.publicChannelFees.feeBase * 2
val newFeeProportionalMillionth = TestConstants.Alice.nodeParams.relayParams.publicChannelFees.feeProportionalMillionths * 2
sender.send(alice, CMD_UPDATE_RELAY_FEE(ActorRef.noSender, newFeeBaseMsat, newFeeProportionalMillionth))
val newCltvExpiryDelta = CltvExpiryDelta(145)
sender.send(alice, CMD_UPDATE_RELAY_FEE(ActorRef.noSender, newFeeBaseMsat, newFeeProportionalMillionth, cltvExpiryDelta_opt = Some(newCltvExpiryDelta)))
sender.expectMsgType[RES_SUCCESS[CMD_UPDATE_RELAY_FEE]]
val localUpdate = channelUpdateListener.expectMsgType[LocalChannelUpdate]
assert(localUpdate.channelUpdate.feeBaseMsat == newFeeBaseMsat)
assert(localUpdate.channelUpdate.feeProportionalMillionths == newFeeProportionalMillionth)
assert(localUpdate.channelUpdate.cltvExpiryDelta == newCltvExpiryDelta)
relayerA.expectNoMessage(1 seconds)
}

View file

@ -405,7 +405,7 @@ class OfflineStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
channelUpdateListener.expectNoMessage(300 millis)
// we make alice update here relay fee
alice ! CMD_UPDATE_RELAY_FEE(sender.ref, 4200 msat, 123456)
alice ! CMD_UPDATE_RELAY_FEE(sender.ref, 4200 msat, 123456, cltvExpiryDelta_opt = None)
sender.expectMsgType[RES_SUCCESS[CMD_UPDATE_RELAY_FEE]]
// alice doesn't broadcast the new channel_update yet

View file

@ -598,7 +598,7 @@ class ShutdownStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wit
val sender = TestProbe()
val newFeeBaseMsat = TestConstants.Alice.nodeParams.relayParams.publicChannelFees.feeBase * 2
val newFeeProportionalMillionth = TestConstants.Alice.nodeParams.relayParams.publicChannelFees.feeProportionalMillionths * 2
alice ! CMD_UPDATE_RELAY_FEE(sender.ref, newFeeBaseMsat, newFeeProportionalMillionth)
alice ! CMD_UPDATE_RELAY_FEE(sender.ref, newFeeBaseMsat, newFeeProportionalMillionth, cltvExpiryDelta_opt = None)
sender.expectMsgType[RES_FAILURE[CMD_UPDATE_RELAY_FEE, _]]
relayerA.expectNoMessage(1 seconds)
}

View file

@ -21,7 +21,7 @@ import fr.acinq.bitcoin.{ByteVector32, SatoshiLong, Transaction}
import fr.acinq.eclair.TestDatabases.{TestPgDatabases, TestSqliteDatabases, migrationCheck}
import fr.acinq.eclair._
import fr.acinq.eclair.channel.Helpers.Closing.MutualClose
import fr.acinq.eclair.channel.{ChannelErrorOccurred, LocalChannelUpdate, LocalError, NetworkFeePaid, RemoteError}
import fr.acinq.eclair.channel._
import fr.acinq.eclair.db.AuditDb.Stats
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
import fr.acinq.eclair.db.jdbc.JdbcUtils.using
@ -610,7 +610,7 @@ class AuditDbSpec extends AnyFunSuite {
val scid = ShortChannelId(123)
val remoteNodeId = randomKey().publicKey
val u = Announcements.makeChannelUpdate(randomBytes32(), randomKey(), remoteNodeId, scid, CltvExpiryDelta(56), 2000 msat, 1000 msat, 999, 1000000000 msat)
dbs.audit.addChannelUpdate(LocalChannelUpdate(null, channelId, scid, remoteNodeId, None, u, None, null))
dbs.audit.addChannelUpdate(ChannelUpdateParametersChanged(null, channelId, scid, remoteNodeId, u))
}
}

View file

@ -179,7 +179,7 @@ class PaymentIntegrationSpec extends IntegrationSpec {
// we then forge a new channel_update for B-C...
val channelUpdateBC = Announcements.makeChannelUpdate(Block.RegtestGenesisBlock.hash, nodes("B").nodeParams.privateKey, nodes("C").nodeParams.nodeId, shortIdBC, nodes("B").nodeParams.expiryDelta + 1, nodes("C").nodeParams.htlcMinimum, nodes("B").nodeParams.relayParams.publicChannelFees.feeBase, nodes("B").nodeParams.relayParams.publicChannelFees.feeProportionalMillionths, 500000000 msat)
// ...and notify B's relayer
nodes("B").system.eventStream.publish(LocalChannelUpdate(system.deadLetters, commitmentBC.channelId, shortIdBC, commitmentBC.remoteParams.nodeId, None, channelUpdateBC, None, commitmentBC))
nodes("B").system.eventStream.publish(LocalChannelUpdate(system.deadLetters, commitmentBC.channelId, shortIdBC, commitmentBC.remoteParams.nodeId, None, channelUpdateBC, commitmentBC))
// we retrieve a payment hash from D
val amountMsat = 4200000.msat
sender.send(nodes("D").paymentHandler, ReceivePayment(Some(amountMsat), Left("1 coffee")))

View file

@ -273,7 +273,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
val channelId = randomBytes32()
val update = Announcements.makeChannelUpdate(Block.RegtestGenesisBlock.hash, randomKey(), remoteNodeId, shortChannelId, CltvExpiryDelta(10), 100 msat, 1000 msat, 100, capacity.toMilliSatoshi)
val commitments = PaymentPacketSpec.makeCommitments(ByteVector32.Zeroes, availableBalanceForSend, testCapacity = capacity)
LocalChannelUpdate(null, channelId, shortChannelId, remoteNodeId, None, update, None, commitments)
LocalChannelUpdate(null, channelId, shortChannelId, remoteNodeId, None, update, commitments)
}
val (a, b) = (randomKey().publicKey, randomKey().publicKey)
@ -426,8 +426,8 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
channels
}
channelRelayer ! WrappedLocalChannelUpdate(LocalChannelUpdate(null, channelId_ab, channelUpdate_ab.shortChannelId, a, None, channelUpdate_ab, None, makeCommitments(channelId_ab, -2000 msat, 300000 msat)))
channelRelayer ! WrappedLocalChannelUpdate(LocalChannelUpdate(null, channelId_bc, channelUpdate_bc.shortChannelId, c, None, channelUpdate_bc, None, makeCommitments(channelId_bc, 400000 msat, -5000 msat)))
channelRelayer ! WrappedLocalChannelUpdate(LocalChannelUpdate(null, channelId_ab, channelUpdate_ab.shortChannelId, a, None, channelUpdate_ab, makeCommitments(channelId_ab, -2000 msat, 300000 msat)))
channelRelayer ! WrappedLocalChannelUpdate(LocalChannelUpdate(null, channelId_bc, channelUpdate_bc.shortChannelId, c, None, channelUpdate_bc, makeCommitments(channelId_bc, 400000 msat, -5000 msat)))
val channels1 = getOutgoingChannels(true)
assert(channels1.size === 2)
@ -445,13 +445,13 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
val channels3 = getOutgoingChannels(true)
assert(channels3.size === 1 && channels3.head.commitments.availableBalanceForSend === 100000.msat)
channelRelayer ! WrappedLocalChannelUpdate(LocalChannelUpdate(null, channelId_ab, channelUpdate_ab.shortChannelId, a, None, channelUpdate_ab.copy(channelFlags = 2), None, makeCommitments(channelId_ab, 100000 msat, 200000 msat)))
channelRelayer ! WrappedLocalChannelUpdate(LocalChannelUpdate(null, channelId_ab, channelUpdate_ab.shortChannelId, a, None, channelUpdate_ab.copy(channelFlags = 2), makeCommitments(channelId_ab, 100000 msat, 200000 msat)))
val channels4 = getOutgoingChannels(true)
assert(channels4.isEmpty)
val channels5 = getOutgoingChannels(false)
assert(channels5.size === 1)
channelRelayer ! WrappedLocalChannelUpdate(LocalChannelUpdate(null, channelId_ab, channelUpdate_ab.shortChannelId, a, None, channelUpdate_ab, None, makeCommitments(channelId_ab, 100000 msat, 200000 msat)))
channelRelayer ! WrappedLocalChannelUpdate(LocalChannelUpdate(null, channelId_ab, channelUpdate_ab.shortChannelId, a, None, channelUpdate_ab, makeCommitments(channelId_ab, 100000 msat, 200000 msat)))
val channels6 = getOutgoingChannels(true)
assert(channels6.size === 1)
@ -461,7 +461,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
assert(channels7.isEmpty)
// We should receive the updated channel update containing the new shortChannelId:
channelRelayer ! WrappedLocalChannelUpdate(LocalChannelUpdate(null, channelId_ab, ShortChannelId(42), a, None, channelUpdate_ab.copy(shortChannelId = ShortChannelId(42)), None, makeCommitments(channelId_ab, 100000 msat, 200000 msat)))
channelRelayer ! WrappedLocalChannelUpdate(LocalChannelUpdate(null, channelId_ab, ShortChannelId(42), a, None, channelUpdate_ab.copy(shortChannelId = ShortChannelId(42)), makeCommitments(channelId_ab, 100000 msat, 200000 msat)))
val channels8 = getOutgoingChannels(true)
assert(channels8.size === 1)
assert(channels8.head.channelUpdate.shortChannelId === ShortChannelId(42))
@ -494,6 +494,6 @@ object ChannelRelayerSpec {
val channelId = channelIds(shortChannelId)
val update = ChannelUpdate(ByteVector64(randomBytes(64)), Block.RegtestGenesisBlock.hash, shortChannelId, 0, 1, Announcements.makeChannelFlags(isNode1 = true, enabled), CltvExpiryDelta(100), htlcMinimum, 1000 msat, 100, Some(capacity.toMilliSatoshi))
val commitments = PaymentPacketSpec.makeCommitments(channelId, testAvailableBalanceForSend = balance, testCapacity = capacity)
LocalChannelUpdate(null, channelId, shortChannelId, outgoingNodeId, None, update, None, commitments)
LocalChannelUpdate(null, channelId, shortChannelId, outgoingNodeId, None, update, commitments)
}
}

View file

@ -76,7 +76,7 @@ class RelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("applicat
assert(sender.expectMessageType[Relayer.OutgoingChannels].channels.isEmpty)
// We publish a channel update, that should be picked up by the channel relayer
system.eventStream ! EventStream.Publish(LocalChannelUpdate(null, channelId_bc, channelUpdate_bc.shortChannelId, c, None, channelUpdate_bc, None, makeCommitments(channelId_bc)))
system.eventStream ! EventStream.Publish(LocalChannelUpdate(null, channelId_bc, channelUpdate_bc.shortChannelId, c, None, channelUpdate_bc, makeCommitments(channelId_bc)))
eventually(PatienceConfiguration.Timeout(30 seconds), PatienceConfiguration.Interval(1 second)) {
childActors.channelRelayer ! ChannelRelayer.GetOutgoingChannels(sender.ref.toClassic, GetOutgoingChannels())
val channels = sender.expectMessageType[Relayer.OutgoingChannels].channels

View file

@ -150,7 +150,7 @@ abstract class BaseRouterSpec extends TestKitBaseClass with FixtureAnyFunSuiteLi
peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, update_gh))
peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, update_hg))
// then private channels
sender.send(router, LocalChannelUpdate(sender.ref, randomBytes32(), channelId_ag, g, None, update_ag, None, CommitmentsSpec.makeCommitments(30000000 msat, 8000000 msat, a, g, announceChannel = false)))
sender.send(router, LocalChannelUpdate(sender.ref, randomBytes32(), channelId_ag, g, None, update_ag, CommitmentsSpec.makeCommitments(30000000 msat, 8000000 msat, a, g, announceChannel = false)))
// watcher receives the get tx requests
assert(watcher.expectMsgType[ValidateRequest].ann === chan_ab)
assert(watcher.expectMsgType[ValidateRequest].ann === chan_bc)

View file

@ -388,7 +388,7 @@ class RouterSpec extends BaseRouterSpec {
assert(res.routes.head.hops.last.nextNodeId === h)
val channelUpdate_ag1 = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_a, g, channelId_ag, CltvExpiryDelta(7), 0 msat, 10 msat, 10, htlcMaximum, enable = false)
sender.send(router, LocalChannelUpdate(sender.ref, null, channelId_ag, g, None, channelUpdate_ag1, None, CommitmentsSpec.makeCommitments(10000 msat, 15000 msat, a, g, announceChannel = false)))
sender.send(router, LocalChannelUpdate(sender.ref, null, channelId_ag, g, None, channelUpdate_ag1, CommitmentsSpec.makeCommitments(10000 msat, 15000 msat, a, g, announceChannel = false)))
sender.send(router, RouteRequest(a, h, DEFAULT_AMOUNT_MSAT, DEFAULT_MAX_FEE, routeParams = DEFAULT_ROUTE_PARAMS))
sender.expectMsg(Failure(RouteNotFound))
}
@ -409,7 +409,7 @@ class RouterSpec extends BaseRouterSpec {
sender.send(router, RouteRequest(a, b, DEFAULT_AMOUNT_MSAT, DEFAULT_MAX_FEE, routeParams = DEFAULT_ROUTE_PARAMS))
sender.expectMsgType[RouteResponse]
val commitments1 = CommitmentsSpec.makeCommitments(10000000 msat, 20000000 msat, a, b, announceChannel = true)
sender.send(router, LocalChannelUpdate(sender.ref, null, channelId_ab, b, Some(chan_ab), update_ab, None, commitments1))
sender.send(router, LocalChannelUpdate(sender.ref, null, channelId_ab, b, Some(chan_ab), update_ab, commitments1))
sender.send(router, RouteRequest(a, b, 12000000 msat, Long.MaxValue.msat, routeParams = DEFAULT_ROUTE_PARAMS))
sender.expectMsg(Failure(BalanceTooLow))
sender.send(router, RouteRequest(a, b, 12000000 msat, Long.MaxValue.msat, allowMultiPart = true, routeParams = DEFAULT_ROUTE_PARAMS))
@ -588,7 +588,7 @@ class RouterSpec extends BaseRouterSpec {
// When the local channel comes back online, it will send a LocalChannelUpdate to the router.
val balances = Set[Option[MilliSatoshi]](Some(10000 msat), Some(15000 msat))
val commitments = CommitmentsSpec.makeCommitments(10000 msat, 15000 msat, a, b, announceChannel = true)
sender.send(router, LocalChannelUpdate(sender.ref, null, channelId_ab, b, Some(chan_ab), update_ab, None, commitments))
sender.send(router, LocalChannelUpdate(sender.ref, null, channelId_ab, b, Some(chan_ab), update_ab, commitments))
sender.send(router, GetRoutingState)
val channel_ab = sender.expectMsgType[RoutingState].channels.find(_.ann == chan_ab).get
assert(Set(channel_ab.meta_opt.map(_.balance1), channel_ab.meta_opt.map(_.balance2)) === balances)
@ -611,7 +611,7 @@ class RouterSpec extends BaseRouterSpec {
// Then we update the balance without changing the contents of the channel update; the graph should still be updated.
val balances = Set[Option[MilliSatoshi]](Some(11000 msat), Some(14000 msat))
val commitments = CommitmentsSpec.makeCommitments(11000 msat, 14000 msat, a, b, announceChannel = true)
sender.send(router, LocalChannelUpdate(sender.ref, null, channelId_ab, b, Some(chan_ab), update_ab, None, commitments))
sender.send(router, LocalChannelUpdate(sender.ref, null, channelId_ab, b, Some(chan_ab), update_ab, commitments))
sender.send(router, GetRoutingState)
val channel_ab = sender.expectMsgType[RoutingState].channels.find(_.ann == chan_ab).get
assert(Set(channel_ab.meta_opt.map(_.balance1), channel_ab.meta_opt.map(_.balance2)) === balances)