1
0
Fork 0
mirror of https://github.com/ACINQ/eclair.git synced 2025-02-22 06:21:42 +01:00

Add support for plugins that intercept open channel messages (#2552)

This commit is contained in:
Richard Myers 2023-02-17 08:56:51 +01:00 committed by GitHub
parent d4c32f99dd
commit ddcb978ead
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 1131 additions and 216 deletions

View file

@ -39,6 +39,28 @@ Eclair offers three strategies to handle that scenario, that node operators can
- `unlock`: eclair will automatically unlock the corresponding utxos
- `ignore`: eclair will leave these utxos locked and start
#### Add plugin support for channel open interception (#2552)
Eclair now supports plugins that intercept channel open requests and decide whether to accept or reject them. This is useful for example to enforce custom policies on who can open channels with you.
An example plugin that demonstrates this functionality can be found in the [eclair-plugins](https://github.com/ACINQ/eclair-plugins) repository.
#### Configurable channel open rate limits (#2552)
We have added parameters to `eclair.conf` to allow nodes to manage the number of channel open requests from peers that are pending on-chain confirmation. A limit exists for each public peer node individually and for all private peer nodes in aggregate.
The new configuration options and defaults are as follows:
```conf
// a list of public keys; we will ignore limits on pending channels from these peers
eclair.channel.channel-open-limits.channel-opener-whitelist = []
// maximum number of pending channels we will accept from a given peer
eclair.channel.channel-open-limits.max-pending-channels-per-peer = 3
// maximum number of pending channels we will accept from all private nodes
eclair.channel.channel-open-limits.max-total-pending-channels-private-nodes = 99
```
## Verifying signatures
You will need `gpg` and our release signing key 7A73FE77DE2C4027. Note that you can get it:

View file

@ -138,7 +138,13 @@ eclair {
unhandled-exception-strategy = "local-close" // local-close or stop
revocation-timeout = 20 seconds // after sending a commit_sig, we will wait for at most that duration before disconnecting
}
channel-open-limits {
max-pending-channels-per-peer = 3 // maximum number of pending channels we will accept from a given peer
max-total-pending-channels-private-nodes = 99 // maximum number of pending channels we will accept from all private nodes
channel-opener-whitelist = [] // a list of public keys; we will ignore rate limits on pending channels from these peers
}
}
balance-check-interval = 1 hour

View file

@ -93,6 +93,8 @@ case class NodeParams(nodeKeyManager: NodeKeyManager,
val pluginMessageTags: Set[Int] = pluginParams.collect { case p: CustomFeaturePlugin => p.messageTags }.toSet.flatten
val pluginOpenChannelInterceptor: Option[InterceptOpenChannelPlugin] = pluginParams.collectFirst { case p: InterceptOpenChannelPlugin => p }
def currentBlockHeight: BlockHeight = BlockHeight(blockHeight.get)
/** Returns the features that should be used in our init message with the given peer. */
@ -343,6 +345,9 @@ object NodeParams extends Logging {
require(Features.knownFeatures.map(_.mandatory).intersect(pluginFeatureSet).isEmpty, "Plugin feature bit overlaps with known feature bit")
require(pluginFeatureSet.size == pluginMessageParams.size, "Duplicate plugin feature bits found")
val interceptOpenChannelPlugins = pluginParams.collect { case p: InterceptOpenChannelPlugin => p }
require(interceptOpenChannelPlugins.size <= 1, s"At most one plugin is allowed to intercept channel open messages, but multiple such plugins were registered: ${interceptOpenChannelPlugins.map(_.getClass.getSimpleName).mkString(", ")}. Disable conflicting plugins and restart eclair.")
val coreAndPluginFeatures: Features[Feature] = features.copy(unknown = features.unknown ++ pluginMessageParams.map(_.pluginFeature))
val overrideInitFeatures: Map[PublicKey, Features[InitFeature]] = config.getConfigList("override-init-features").asScala.map { e =>
@ -440,6 +445,10 @@ object NodeParams extends Logging {
val asyncPaymentHoldTimeoutBlocks = config.getInt("relay.async-payments.hold-timeout-blocks")
require(asyncPaymentHoldTimeoutBlocks >= (asyncPaymentCancelSafetyBeforeTimeoutBlocks + expiryDelta).toInt, "relay.async-payments.hold-timeout-blocks must not be less than relay.async-payments.cancel-safety-before-timeout-blocks + channel.expiry-delta-blocks; otherwise it will have no effect")
val channelOpenerWhitelist: Set[PublicKey] = config.getStringList("channel.channel-open-limits.channel-opener-whitelist").asScala.map(s => PublicKey(ByteVector.fromValidHex(s))).toSet
val maxPendingChannelsPerPeer = config.getInt("channel.channel-open-limits.max-pending-channels-per-peer")
val maxTotalPendingChannelsPrivateNodes = config.getInt("channel.channel-open-limits.max-total-pending-channels-private-nodes")
NodeParams(
nodeKeyManager = nodeKeyManager,
channelKeyManager = channelKeyManager,
@ -476,7 +485,10 @@ object NodeParams extends Logging {
maxTxPublishRetryDelay = FiniteDuration(config.getDuration("channel.max-tx-publish-retry-delay").getSeconds, TimeUnit.SECONDS),
unhandledExceptionStrategy = unhandledExceptionStrategy,
revocationTimeout = FiniteDuration(config.getDuration("channel.revocation-timeout").getSeconds, TimeUnit.SECONDS),
requireConfirmedInputsForDualFunding = config.getBoolean("channel.require-confirmed-inputs-for-dual-funding")
requireConfirmedInputsForDualFunding = config.getBoolean("channel.require-confirmed-inputs-for-dual-funding"),
channelOpenerWhitelist = channelOpenerWhitelist,
maxPendingChannelsPerPeer = maxPendingChannelsPerPeer,
maxTotalPendingChannelsPrivateNodes = maxTotalPendingChannelsPrivateNodes
),
onChainFeeConf = OnChainFeeConf(
feeTargets = feeTargets,

View file

@ -16,11 +16,13 @@
package fr.acinq.eclair
import akka.actor.typed.ActorRef
import akka.event.LoggingAdapter
import fr.acinq.bitcoin.scalacompat.ByteVector32
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi}
import fr.acinq.eclair.channel.Origin
import fr.acinq.eclair.io.OpenChannelInterceptor.{DefaultParams, OpenChannelNonInitiator}
import fr.acinq.eclair.payment.relay.PostRestartHtlcCleaner.IncomingHtlc
import fr.acinq.eclair.wire.protocol.Error
/** Custom plugin parameters. */
trait PluginParams {
@ -55,4 +57,20 @@ trait CustomCommitmentsPlugin extends PluginParams {
* returned by this method.
*/
def getHtlcsRelayedOut(htlcsIn: Seq[IncomingHtlc], nodeParams: NodeParams, log: LoggingAdapter): Map[Origin, Set[(ByteVector32, Long)]]
}
// @formatter:off
trait InterceptOpenChannelCommand
case class InterceptOpenChannelReceived(replyTo: ActorRef[InterceptOpenChannelResponse], openChannelNonInitiator: OpenChannelNonInitiator, defaultParams: DefaultParams) extends InterceptOpenChannelCommand {
val remoteFundingAmount: Satoshi = openChannelNonInitiator.open.fold(_.fundingSatoshis, _.fundingAmount)
val temporaryChannelId: ByteVector32 = openChannelNonInitiator.open.fold(_.temporaryChannelId, _.temporaryChannelId)
}
sealed trait InterceptOpenChannelResponse
case class AcceptOpenChannel(temporaryChannelId: ByteVector32, defaultParams: DefaultParams) extends InterceptOpenChannelResponse
case class RejectOpenChannel(temporaryChannelId: ByteVector32, error: Error) extends InterceptOpenChannelResponse
// @formatter:on
trait InterceptOpenChannelPlugin extends PluginParams {
def openChannelInterceptor: ActorRef[InterceptOpenChannelCommand]
}

View file

@ -38,7 +38,7 @@ import fr.acinq.eclair.crypto.keymanager.{LocalChannelKeyManager, LocalNodeKeyMa
import fr.acinq.eclair.db.Databases.FileBackup
import fr.acinq.eclair.db.FileBackupHandler.FileBackupParams
import fr.acinq.eclair.db.{Databases, DbEventHandler, FileBackupHandler}
import fr.acinq.eclair.io.{ClientSpawner, Peer, Server, Switchboard}
import fr.acinq.eclair.io.{ClientSpawner, Peer, PendingChannelsRateLimiter, Server, Switchboard}
import fr.acinq.eclair.message.Postman
import fr.acinq.eclair.payment.receive.PaymentHandler
import fr.acinq.eclair.payment.relay.{AsyncPaymentTriggerer, PostRestartHtlcCleaner, Relayer}
@ -359,7 +359,8 @@ class Setup(val datadir: File,
txPublisherFactory = Channel.SimpleTxPublisherFactory(nodeParams, watcher, bitcoinClient)
channelFactory = Peer.SimpleChannelFactory(nodeParams, watcher, relayer, bitcoinClient, txPublisherFactory)
peerFactory = Switchboard.SimplePeerFactory(nodeParams, bitcoinClient, channelFactory)
pendingChannelsRateLimiter = system.spawn(Behaviors.supervise(PendingChannelsRateLimiter(nodeParams, router.toTyped, channels)).onFailure(typed.SupervisorStrategy.resume), name = "pending-channels-rate-limiter")
peerFactory = Switchboard.SimplePeerFactory(nodeParams, bitcoinClient, channelFactory, pendingChannelsRateLimiter)
switchboard = system.actorOf(SimpleSupervisor.props(Switchboard.props(nodeParams, peerFactory), "switchboard", SupervisorStrategy.Resume))
_ = switchboard ! Switchboard.Init(channels)

View file

@ -92,4 +92,4 @@ case class ChannelPersisted(channel: ActorRef, remoteNodeId: PublicKey, channelI
case class LocalCommitConfirmed(channel: ActorRef, remoteNodeId: PublicKey, channelId: ByteVector32, refundAtBlock: BlockHeight) extends ChannelEvent
case class ChannelClosed(channel: ActorRef, channelId: ByteVector32, closingType: ClosingType, commitments: MetaCommitments)
case class ChannelClosed(channel: ActorRef, channelId: ByteVector32, closingType: ClosingType, commitments: MetaCommitments) extends ChannelEvent

View file

@ -81,7 +81,10 @@ object Channel {
maxTxPublishRetryDelay: FiniteDuration,
unhandledExceptionStrategy: UnhandledExceptionStrategy,
revocationTimeout: FiniteDuration,
requireConfirmedInputsForDualFunding: Boolean) {
requireConfirmedInputsForDualFunding: Boolean,
channelOpenerWhitelist: Set[PublicKey],
maxPendingChannelsPerPeer: Int,
maxTotalPendingChannelsPrivateNodes: Int) {
require(0 <= maxHtlcValueInFlightPercent && maxHtlcValueInFlightPercent <= 100, "max-htlc-value-in-flight-percent must be between 0 and 100")
def minFundingSatoshis(announceChannel: Boolean): Satoshi = if (announceChannel) minFundingPublicSatoshis else minFundingPrivateSatoshis

View file

@ -31,6 +31,8 @@ object Monitoring {
val OnionMessagesReceived = Kamon.counter("onionmessages.received")
val OnionMessagesSent = Kamon.counter("onionmessages.sent")
val OnionMessagesThrottled = Kamon.counter("onionmessages.throttled")
val OpenChannelRequestsPending = Kamon.gauge("openchannelrequests.pending")
}
object Tags {
@ -44,6 +46,8 @@ object Monitoring {
val Initialized = "initialized"
}
val PublicPeers = "public"
}
}

View file

@ -0,0 +1,265 @@
/*
* Copyright 2023 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package fr.acinq.eclair.io
import akka.actor
import akka.actor.Status
import akka.actor.typed.eventstream.EventStream.Publish
import akka.actor.typed.scaladsl.adapter.TypedActorRefOps
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
import akka.actor.typed.{ActorRef, Behavior}
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.bitcoin.scalacompat.{BtcDouble, ByteVector32, Satoshi, Script}
import fr.acinq.eclair.Features.Wumbo
import fr.acinq.eclair.blockchain.OnchainPubkeyCache
import fr.acinq.eclair.channel._
import fr.acinq.eclair.channel.fsm.Channel
import fr.acinq.eclair.io.Peer.SpawnChannelNonInitiator
import fr.acinq.eclair.io.PendingChannelsRateLimiter.AddOrRejectChannel
import fr.acinq.eclair.wire.protocol
import fr.acinq.eclair.wire.protocol.Error
import fr.acinq.eclair.{AcceptOpenChannel, CltvExpiryDelta, Features, InitFeature, InterceptOpenChannelPlugin, InterceptOpenChannelReceived, InterceptOpenChannelResponse, Logs, MilliSatoshi, NodeParams, RejectOpenChannel, ToMilliSatoshiConversion}
import scodec.bits.ByteVector
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.reflect.ClassTag
/**
* Child actor of a Peer that handles accepting or rejecting a channel open request initiated by a remote peer and
* configuring local parameters for all open channel requests. It only handles one channel request at a time.
* If a concurrent request comes while still evaluating a previous one, the later request is immediately rejected.
*
* Note: If the remote peer disconnects before the interceptor fails or continues the non-initiator flow, according to the
* Lightning spec the flow should be canceled. Therefore any response sent by this actor with a different `peerConnection`
* should be ignored and not forwarded to the remote peer.
*/
object OpenChannelInterceptor {
// @formatter:off
sealed trait Command
sealed trait WaitForRequestCommands extends Command
case class OpenChannelNonInitiator(remoteNodeId: PublicKey, open: Either[protocol.OpenChannel, protocol.OpenDualFundedChannel], localFeatures: Features[InitFeature], remoteFeatures: Features[InitFeature], peerConnection: ActorRef[Any]) extends WaitForRequestCommands {
val temporaryChannelId: ByteVector32 = open.fold(_.temporaryChannelId, _.temporaryChannelId)
val fundingAmount: Satoshi = open.fold(_.fundingSatoshis, _.fundingAmount)
val channelFlags: ChannelFlags = open.fold(_.channelFlags, _.channelFlags)
val channelType_opt: Option[ChannelType] = open.fold(_.channelType_opt, _.channelType_opt)
}
case class OpenChannelInitiator(replyTo: ActorRef[Any], remoteNodeId: PublicKey, open: Peer.OpenChannel, localFeatures: Features[InitFeature], remoteFeatures: Features[InitFeature]) extends WaitForRequestCommands
private sealed trait CheckRateLimitsCommands extends Command
private case class PendingChannelsRateLimiterResponse(response: PendingChannelsRateLimiter.Response) extends CheckRateLimitsCommands
private sealed trait QueryPluginCommands extends Command
private case class PluginOpenChannelResponse(pluginResponse: InterceptOpenChannelResponse) extends QueryPluginCommands
private case object PluginTimeout extends QueryPluginCommands
// @formatter:on
/** DefaultParams are a subset of ChannelData.LocalParams that can be modified by an InterceptOpenChannelPlugin */
case class DefaultParams(dustLimit: Satoshi,
maxHtlcValueInFlightMsat: MilliSatoshi,
htlcMinimum: MilliSatoshi,
toSelfDelay: CltvExpiryDelta,
maxAcceptedHtlcs: Int)
def apply(peer: ActorRef[Any], nodeParams: NodeParams, remoteNodeId: PublicKey, wallet: OnchainPubkeyCache, pendingChannelsRateLimiter: ActorRef[PendingChannelsRateLimiter.Command], pluginTimeout: FiniteDuration = 1 minute): Behavior[Command] =
Behaviors.setup { context =>
Behaviors.withMdc(Logs.mdc(remoteNodeId_opt = Some(remoteNodeId))) {
new OpenChannelInterceptor(peer, pendingChannelsRateLimiter, pluginTimeout, nodeParams, wallet, context).waitForRequest()
}
}
def makeChannelParams(nodeParams: NodeParams, initFeatures: Features[InitFeature], upfrontShutdownScript_opt: Option[ByteVector], walletStaticPaymentBasepoint_opt: Option[PublicKey], isInitiator: Boolean, dualFunded: Boolean, fundingAmount: Satoshi, unlimitedMaxHtlcValueInFlight: Boolean): LocalParams = {
val maxHtlcValueInFlightMsat = if (unlimitedMaxHtlcValueInFlight) {
// We don't want to impose limits on the amount in flight, typically to allow fully emptying the channel.
21e6.btc.toMilliSatoshi
} else {
// NB: when we're the initiator, we don't know yet if the remote peer will contribute to the funding amount, so
// the percentage-based value may be underestimated. That's ok, this is a security parameter so it makes sense to
// base it on the amount that we're contributing instead of the total funding amount.
nodeParams.channelConf.maxHtlcValueInFlightMsat.min(fundingAmount * nodeParams.channelConf.maxHtlcValueInFlightPercent / 100)
}
LocalParams(
nodeParams.nodeId,
nodeParams.channelKeyManager.newFundingKeyPath(isInitiator), // we make sure that initiator and non-initiator key paths end differently
dustLimit = nodeParams.channelConf.dustLimit,
maxHtlcValueInFlightMsat = maxHtlcValueInFlightMsat,
requestedChannelReserve_opt = if (dualFunded) None else Some((fundingAmount * nodeParams.channelConf.reserveToFundingRatio).max(nodeParams.channelConf.dustLimit)), // BOLT #2: make sure that our reserve is above our dust limit
htlcMinimum = nodeParams.channelConf.htlcMinimum,
toSelfDelay = nodeParams.channelConf.toRemoteDelay, // we choose their delay
maxAcceptedHtlcs = nodeParams.channelConf.maxAcceptedHtlcs,
isInitiator = isInitiator,
upfrontShutdownScript_opt = upfrontShutdownScript_opt,
walletStaticPaymentBasepoint = walletStaticPaymentBasepoint_opt,
initFeatures = initFeatures
)
}
}
private class OpenChannelInterceptor(peer: ActorRef[Any],
pendingChannelsRateLimiter: ActorRef[PendingChannelsRateLimiter.Command],
pluginTimeout: FiniteDuration,
nodeParams: NodeParams,
wallet: OnchainPubkeyCache,
context: ActorContext[OpenChannelInterceptor.Command]) {
import OpenChannelInterceptor._
private def waitForRequest(): Behavior[Command] = {
receiveCommandMessage[WaitForRequestCommands](context, "waitForRequest") {
case request: OpenChannelInitiator => sanityCheckInitiator(request)
case request: OpenChannelNonInitiator => sanityCheckNonInitiator(request)
}
}
private def sanityCheckInitiator(request: OpenChannelInitiator): Behavior[Command] = {
if (request.open.fundingAmount >= Channel.MAX_FUNDING && !request.localFeatures.hasFeature(Wumbo)) {
request.replyTo ! Status.Failure(new RuntimeException(s"fundingAmount=${request.open.fundingAmount} is too big, you must enable large channels support in 'eclair.features' to use funding above ${Channel.MAX_FUNDING} (see eclair.conf)"))
waitForRequest()
} else if (request.open.fundingAmount >= Channel.MAX_FUNDING && !request.remoteFeatures.hasFeature(Wumbo)) {
request.replyTo ! Status.Failure(new RuntimeException(s"fundingAmount=${request.open.fundingAmount} is too big, the remote peer doesn't support wumbo"))
waitForRequest()
} else if (request.open.fundingAmount > nodeParams.channelConf.maxFundingSatoshis) {
request.replyTo ! Status.Failure(new RuntimeException(s"fundingAmount=${request.open.fundingAmount} is too big for the current settings, increase 'eclair.max-funding-satoshis' (see eclair.conf)"))
waitForRequest()
} else {
// If a channel type was provided, we directly use it instead of computing it based on local and remote features.
val channelFlags = request.open.channelFlags_opt.getOrElse(nodeParams.channelConf.channelFlags)
val channelType = request.open.channelType_opt.getOrElse(ChannelTypes.defaultFromFeatures(request.localFeatures, request.remoteFeatures, channelFlags.announceChannel))
val dualFunded = Features.canUseFeature(request.localFeatures, request.remoteFeatures, Features.DualFunding)
val upfrontShutdownScript = Features.canUseFeature(request.localFeatures, request.remoteFeatures, Features.UpfrontShutdownScript)
val localParams = createLocalParams(nodeParams, request.localFeatures, upfrontShutdownScript, channelType, isInitiator = true, dualFunded = dualFunded, request.open.fundingAmount, request.open.disableMaxHtlcValueInFlight)
peer ! Peer.SpawnChannelInitiator(request.open, ChannelConfig.standard, channelType, localParams, request.replyTo.toClassic)
waitForRequest()
}
}
private def sanityCheckNonInitiator(request: OpenChannelNonInitiator): Behavior[Command] = {
validateRemoteChannelType(request.temporaryChannelId, request.channelFlags, request.channelType_opt, request.localFeatures, request.remoteFeatures) match {
case Right(channelType) =>
val dualFunded = Features.canUseFeature(request.localFeatures, request.remoteFeatures, Features.DualFunding)
val upfrontShutdownScript = Features.canUseFeature(request.localFeatures, request.remoteFeatures, Features.UpfrontShutdownScript)
val localParams = createLocalParams(nodeParams, request.localFeatures, upfrontShutdownScript, channelType, isInitiator = false, dualFunded = dualFunded, request.fundingAmount, disableMaxHtlcValueInFlight = false)
checkRateLimits(request, channelType, localParams)
case Left(ex) =>
context.log.warn(s"ignoring remote channel open: ${ex.getMessage}")
sendFailure(ex.getMessage, request)
waitForRequest()
}
}
private def checkRateLimits(request: OpenChannelNonInitiator, channelType: SupportedChannelType, localParams: LocalParams): Behavior[Command] = {
val adapter = context.messageAdapter[PendingChannelsRateLimiter.Response](PendingChannelsRateLimiterResponse)
pendingChannelsRateLimiter ! AddOrRejectChannel(adapter, request.remoteNodeId, request.temporaryChannelId)
receiveCommandMessage[CheckRateLimitsCommands](context, "checkRateLimits") {
case PendingChannelsRateLimiterResponse(PendingChannelsRateLimiter.AcceptOpenChannel) =>
nodeParams.pluginOpenChannelInterceptor match {
case Some(plugin) => queryPlugin(plugin, request, localParams, ChannelConfig.standard, channelType)
case None =>
peer ! SpawnChannelNonInitiator(request.open, ChannelConfig.standard, channelType, localParams, request.peerConnection.toClassic)
waitForRequest()
}
case PendingChannelsRateLimiterResponse(PendingChannelsRateLimiter.ChannelRateLimited) =>
context.log.warn(s"ignoring remote channel open: rate limited")
sendFailure("rate limit reached", request)
waitForRequest()
}
}
private def queryPlugin(plugin: InterceptOpenChannelPlugin, request: OpenChannelInterceptor.OpenChannelNonInitiator, localParams: LocalParams, channelConfig: ChannelConfig, channelType: SupportedChannelType): Behavior[Command] =
Behaviors.withTimers { timers =>
timers.startSingleTimer(PluginTimeout, pluginTimeout)
val pluginResponseAdapter = context.messageAdapter[InterceptOpenChannelResponse](PluginOpenChannelResponse)
val defaultParams = DefaultParams(localParams.dustLimit, localParams.maxHtlcValueInFlightMsat, localParams.htlcMinimum, localParams.toSelfDelay, localParams.maxAcceptedHtlcs)
plugin.openChannelInterceptor ! InterceptOpenChannelReceived(pluginResponseAdapter, request, defaultParams)
receiveCommandMessage[QueryPluginCommands](context, "queryPlugin") {
case PluginOpenChannelResponse(pluginResponse: AcceptOpenChannel) =>
val localParams1 = updateLocalParams(localParams, pluginResponse.defaultParams)
peer ! SpawnChannelNonInitiator(request.open, channelConfig, channelType, localParams1, request.peerConnection.toClassic)
timers.cancel(PluginTimeout)
waitForRequest()
case PluginOpenChannelResponse(pluginResponse: RejectOpenChannel) =>
sendFailure(pluginResponse.error.toAscii, request)
timers.cancel(PluginTimeout)
waitForRequest()
case PluginTimeout =>
context.log.error(s"timed out while waiting for plugin: ${plugin.name}")
sendFailure("plugin timeout", request)
waitForRequest()
}
}
private def sendFailure(error: String, request: OpenChannelNonInitiator): Unit = {
peer ! Peer.OutgoingMessage(Error(request.temporaryChannelId, error), request.peerConnection.toClassic)
context.system.eventStream ! Publish(ChannelAborted(actor.ActorRef.noSender, request.remoteNodeId, request.temporaryChannelId))
}
private def receiveCommandMessage[B <: Command : ClassTag](context: ActorContext[Command], stateName: String)(f: B => Behavior[Command]): Behavior[Command] = {
Behaviors.receiveMessage {
case m: B => f(m)
case o: OpenChannelInitiator =>
o.replyTo ! Status.Failure(new RuntimeException("concurrent request rejected"))
Behaviors.same
case o: OpenChannelNonInitiator =>
context.log.warn(s"ignoring remote channel open: concurrent request rejected")
sendFailure("concurrent request rejected", o)
Behaviors.same
case m =>
context.log.error(s"$stateName: received unhandled message $m")
Behaviors.same
}
}
private def validateRemoteChannelType(temporaryChannelId: ByteVector32, channelFlags: ChannelFlags, remoteChannelType_opt: Option[ChannelType], localFeatures: Features[InitFeature], remoteFeatures: Features[InitFeature]): Either[ChannelException, SupportedChannelType] = {
remoteChannelType_opt match {
// remote explicitly specifies a channel type: we check whether we want to allow it
case Some(remoteChannelType) => ChannelTypes.areCompatible(localFeatures, remoteChannelType) match {
case Some(acceptedChannelType) => Right(acceptedChannelType)
case None => Left(InvalidChannelType(temporaryChannelId, ChannelTypes.defaultFromFeatures(localFeatures, remoteFeatures, channelFlags.announceChannel), remoteChannelType))
}
// Bolt 2: if `option_channel_type` is negotiated: MUST set `channel_type`
case None if Features.canUseFeature(localFeatures, remoteFeatures, Features.ChannelType) => Left(MissingChannelType(temporaryChannelId))
// remote doesn't specify a channel type: we use spec-defined defaults
case None => Right(ChannelTypes.defaultFromFeatures(localFeatures, remoteFeatures, channelFlags.announceChannel))
}
}
private def createLocalParams(nodeParams: NodeParams, initFeatures: Features[InitFeature], upfrontShutdownScript: Boolean, channelType: SupportedChannelType, isInitiator: Boolean, dualFunded: Boolean, fundingAmount: Satoshi, disableMaxHtlcValueInFlight: Boolean): LocalParams = {
val pubkey_opt = if (upfrontShutdownScript || channelType.paysDirectlyToWallet) Some(wallet.getP2wpkhPubkey()) else None
makeChannelParams(
nodeParams, initFeatures,
if (upfrontShutdownScript) Some(Script.write(Script.pay2wpkh(pubkey_opt.get))) else None,
if (channelType.paysDirectlyToWallet) Some(pubkey_opt.get) else None,
isInitiator = isInitiator,
dualFunded = dualFunded,
fundingAmount,
disableMaxHtlcValueInFlight
)
}
private def updateLocalParams(localParams: LocalParams, defaultParams: DefaultParams): LocalParams = {
localParams.copy(
dustLimit = defaultParams.dustLimit,
maxHtlcValueInFlightMsat = defaultParams.maxHtlcValueInFlightMsat,
htlcMinimum = defaultParams.htlcMinimum,
toSelfDelay = defaultParams.toSelfDelay,
maxAcceptedHtlcs = defaultParams.maxAcceptedHtlcs
)
}
}

View file

@ -16,14 +16,14 @@
package fr.acinq.eclair.io
import akka.actor.typed.scaladsl.adapter.ClassicActorRefOps
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter.{ClassicActorContextOps, ClassicActorRefOps}
import akka.actor.{Actor, ActorContext, ActorRef, ExtendedActorSystem, FSM, OneForOneStrategy, PossiblyHarmful, Props, Status, SupervisorStrategy, Terminated, typed}
import akka.event.Logging.MDC
import akka.event.{BusLogging, DiagnosticLoggingAdapter}
import akka.util.Timeout
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.bitcoin.scalacompat.{BtcDouble, ByteVector32, Satoshi, SatoshiLong, Script}
import fr.acinq.eclair.Features.Wumbo
import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi, SatoshiLong}
import fr.acinq.eclair.Logs.LogCategory
import fr.acinq.eclair.NotificationsLogger.NotifyNodeOperator
import fr.acinq.eclair._
@ -34,15 +34,13 @@ import fr.acinq.eclair.channel._
import fr.acinq.eclair.channel.fsm.Channel
import fr.acinq.eclair.io.MessageRelay.Status
import fr.acinq.eclair.io.Monitoring.Metrics
import fr.acinq.eclair.io.OpenChannelInterceptor.{OpenChannelInitiator, OpenChannelNonInitiator}
import fr.acinq.eclair.io.PeerConnection.KillReason
import fr.acinq.eclair.io.Switchboard.RelayMessage
import fr.acinq.eclair.message.OnionMessages
import fr.acinq.eclair.remote.EclairInternalsSerializer.RemoteTypes
import fr.acinq.eclair.wire.protocol
import fr.acinq.eclair.wire.protocol.{Error, HasChannelId, HasTemporaryChannelId, LightningMessage, NodeAddress, OnionMessage, RoutingMessage, UnknownMessage, Warning}
import scodec.bits.ByteVector
import scala.concurrent.ExecutionContext
/**
* This actor represents a logical peer. There is one [[Peer]] per unique remote node id at all time.
@ -54,7 +52,7 @@ import scala.concurrent.ExecutionContext
*
* Created by PM on 26/08/2016.
*/
class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, wallet: OnchainPubkeyCache, channelFactory: Peer.ChannelFactory, switchboard: ActorRef) extends FSMDiagnosticActorLogging[Peer.State, Peer.Data] {
class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, wallet: OnchainPubkeyCache, channelFactory: Peer.ChannelFactory, switchboard: ActorRef, pendingChannelsRateLimiter: typed.ActorRef[PendingChannelsRateLimiter.Command]) extends FSMDiagnosticActorLogging[Peer.State, Peer.Data] {
import Peer._
@ -140,29 +138,8 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, wallet: OnchainP
stay()
case Event(c: Peer.OpenChannel, d: ConnectedData) =>
if (c.fundingAmount >= Channel.MAX_FUNDING && !d.localFeatures.hasFeature(Wumbo)) {
sender() ! Status.Failure(new RuntimeException(s"fundingAmount=${c.fundingAmount} is too big, you must enable large channels support in 'eclair.features' to use funding above ${Channel.MAX_FUNDING} (see eclair.conf)"))
stay()
} else if (c.fundingAmount >= Channel.MAX_FUNDING && !d.remoteFeatures.hasFeature(Wumbo)) {
sender() ! Status.Failure(new RuntimeException(s"fundingAmount=${c.fundingAmount} is too big, the remote peer doesn't support wumbo"))
stay()
} else if (c.fundingAmount > nodeParams.channelConf.maxFundingSatoshis) {
sender() ! Status.Failure(new RuntimeException(s"fundingAmount=${c.fundingAmount} is too big for the current settings, increase 'eclair.max-funding-satoshis' (see eclair.conf)"))
stay()
} else {
// If a channel type was provided, we directly use it instead of computing it based on local and remote features.
val channelFlags = c.channelFlags_opt.getOrElse(nodeParams.channelConf.channelFlags)
val channelType = c.channelType_opt.getOrElse(ChannelTypes.defaultFromFeatures(d.localFeatures, d.remoteFeatures, channelFlags.announceChannel))
// NB: we need to capture parameters in a val to use them in andThen
val selfRef = self
val origin = sender()
implicit val ec: ExecutionContext = ExecutionContext.Implicits.global
val dualFunded = Features.canUseFeature(d.localFeatures, d.remoteFeatures, Features.DualFunding)
val upfrontShutdownScript = Features.canUseFeature(d.localFeatures, d.remoteFeatures, Features.UpfrontShutdownScript)
val localParams = createLocalParams(nodeParams, d.localFeatures, upfrontShutdownScript, channelType, isInitiator = true, dualFunded = dualFunded, c.fundingAmount, c.disableMaxHtlcValueInFlight)
selfRef ! SpawnChannelInitiator(c, ChannelConfig.standard, channelType, localParams, origin)
stay()
}
openChannelInterceptor ! OpenChannelInitiator(sender().toTyped, remoteNodeId, c, d.localFeatures, d.remoteFeatures)
stay()
case Event(SpawnChannelInitiator(c, channelConfig, channelType, localParams, origin), d: ConnectedData) =>
val channel = spawnChannel(Some(origin))
@ -183,7 +160,7 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, wallet: OnchainP
case Event(open: protocol.OpenChannel, d: ConnectedData) =>
d.channels.get(TemporaryChannelId(open.temporaryChannelId)) match {
case None =>
handleOpenChannel(Left(open), open.temporaryChannelId, open.fundingSatoshis, open.channelFlags, open.channelType_opt, d)
openChannelInterceptor ! OpenChannelNonInitiator(remoteNodeId, Left(open), d.localFeatures, d.remoteFeatures, d.peerConnection.toTyped)
stay()
case Some(_) =>
log.warning("ignoring open_channel with duplicate temporaryChannelId={}", open.temporaryChannelId)
@ -193,7 +170,7 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, wallet: OnchainP
case Event(open: protocol.OpenDualFundedChannel, d: ConnectedData) =>
d.channels.get(TemporaryChannelId(open.temporaryChannelId)) match {
case None if Features.canUseFeature(d.localFeatures, d.remoteFeatures, Features.DualFunding) =>
handleOpenChannel(Right(open), open.temporaryChannelId, open.fundingAmount, open.channelFlags, open.channelType_opt, d)
openChannelInterceptor ! OpenChannelNonInitiator(remoteNodeId, Right(open), d.localFeatures, d.remoteFeatures, d.peerConnection.toTyped)
stay()
case None =>
log.info("rejecting open_channel2: dual funding is not supported")
@ -204,20 +181,26 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, wallet: OnchainP
stay()
}
case Event(SpawnChannelNonInitiator(open, channelConfig, channelType, localParams), d: ConnectedData) =>
val channel = spawnChannel(None)
case Event(SpawnChannelNonInitiator(open, channelConfig, channelType, localParams, peerConnection), d: ConnectedData) =>
val temporaryChannelId = open.fold(_.temporaryChannelId, _.temporaryChannelId)
log.info(s"accepting a new channel with type=$channelType temporaryChannelId=$temporaryChannelId localParams=$localParams")
open match {
case Left(open) =>
channel ! INPUT_INIT_CHANNEL_NON_INITIATOR(open.temporaryChannelId, None, dualFunded = false, None, localParams, d.peerConnection, d.remoteInit, channelConfig, channelType)
channel ! open
case Right(open) =>
// NB: we don't add a contribution to the funding amount.
channel ! INPUT_INIT_CHANNEL_NON_INITIATOR(open.temporaryChannelId, None, dualFunded = true, None, localParams, d.peerConnection, d.remoteInit, channelConfig, channelType)
channel ! open
if (peerConnection == d.peerConnection) {
val channel = spawnChannel(None)
log.info(s"accepting a new channel with type=$channelType temporaryChannelId=$temporaryChannelId localParams=$localParams")
open match {
case Left(open) =>
channel ! INPUT_INIT_CHANNEL_NON_INITIATOR(open.temporaryChannelId, None, dualFunded = false, None, localParams, d.peerConnection, d.remoteInit, channelConfig, channelType)
channel ! open
case Right(open) =>
// NB: we don't add a contribution to the funding amount.
channel ! INPUT_INIT_CHANNEL_NON_INITIATOR(open.temporaryChannelId, None, dualFunded = true, None, localParams, d.peerConnection, d.remoteInit, channelConfig, channelType)
channel ! open
}
stay() using d.copy(channels = d.channels + (TemporaryChannelId(temporaryChannelId) -> channel))
} else {
log.warning("ignoring open_channel request that reconnected during channel intercept, temporaryChannelId={}", temporaryChannelId)
context.system.eventStream.publish(ChannelAborted(ActorRef.noSender, remoteNodeId, temporaryChannelId))
stay()
}
stay() using d.copy(channels = d.channels + (TemporaryChannelId(temporaryChannelId) -> channel))
case Event(msg: HasChannelId, d: ConnectedData) =>
d.channels.get(FinalChannelId(msg.channelId)) match {
@ -389,15 +372,6 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, wallet: OnchainP
s(e)
}
def createLocalParams(nodeParams: NodeParams, initFeatures: Features[InitFeature], upfrontShutdownScript: Boolean, channelType: SupportedChannelType, isInitiator: Boolean, dualFunded: Boolean, fundingAmount: Satoshi, disableMaxHtlcValueInFlight: Boolean): LocalParams = {
val pubkey_opt = if (upfrontShutdownScript || channelType.paysDirectlyToWallet) Some(wallet.getP2wpkhPubkey()) else None
makeChannelParams(
nodeParams, initFeatures,
if (upfrontShutdownScript) Some(Script.write(Script.pay2wpkh(pubkey_opt.get))) else None,
if (channelType.paysDirectlyToWallet) Some(pubkey_opt.get) else None,
isInitiator = isInitiator, dualFunded = dualFunded, fundingAmount, disableMaxHtlcValueInFlight)
}
def spawnChannel(origin_opt: Option[ActorRef]): ActorRef = {
val channel = channelFactory.spawn(context, remoteNodeId, origin_opt)
context watch channel
@ -409,35 +383,8 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, wallet: OnchainP
self ! Peer.OutgoingMessage(msg, peerConnection)
}
def handleOpenChannel(open: Either[protocol.OpenChannel, protocol.OpenDualFundedChannel], temporaryChannelId: ByteVector32, fundingAmount: Satoshi, channelFlags: ChannelFlags, channelType_opt: Option[ChannelType], d: ConnectedData): Unit = {
validateRemoteChannelType(temporaryChannelId, channelFlags, channelType_opt, d.localFeatures, d.remoteFeatures) match {
case Right(channelType) =>
// NB: we need to capture parameters in a val to use them in andThen
val selfRef = self
val dualFunded = Features.canUseFeature(d.localFeatures, d.remoteFeatures, Features.DualFunding)
val upfrontShutdownScript = Features.canUseFeature(d.localFeatures, d.remoteFeatures, Features.UpfrontShutdownScript)
val localParams = createLocalParams(nodeParams, d.localFeatures, upfrontShutdownScript, channelType, isInitiator = false, dualFunded = dualFunded, fundingAmount, disableMaxHtlcValueInFlight = false)
selfRef ! SpawnChannelNonInitiator(open, ChannelConfig.standard, channelType, localParams)
case Left(ex) =>
log.warning("ignoring remote channel open: {}", ex.getMessage)
val err = Error(temporaryChannelId, ex.getMessage)
self ! Peer.OutgoingMessage(err, d.peerConnection)
}
}
def validateRemoteChannelType(temporaryChannelId: ByteVector32, channelFlags: ChannelFlags, remoteChannelType_opt: Option[ChannelType], localFeatures: Features[InitFeature], remoteFeatures: Features[InitFeature]): Either[ChannelException, SupportedChannelType] = {
remoteChannelType_opt match {
// remote explicitly specifies a channel type: we check whether we want to allow it
case Some(remoteChannelType) => ChannelTypes.areCompatible(localFeatures, remoteChannelType) match {
case Some(acceptedChannelType) => Right(acceptedChannelType)
case None => Left(InvalidChannelType(temporaryChannelId, ChannelTypes.defaultFromFeatures(localFeatures, remoteFeatures, channelFlags.announceChannel), remoteChannelType))
}
// Bolt 2: if `option_channel_type` is negotiated: MUST set `channel_type`
case None if Features.canUseFeature(localFeatures, remoteFeatures, Features.ChannelType) => Left(MissingChannelType(temporaryChannelId))
// remote doesn't specify a channel type: we use spec-defined defaults
case None => Right(ChannelTypes.defaultFromFeatures(localFeatures, remoteFeatures, channelFlags.announceChannel))
}
}
// resume the openChannelInterceptor in case of failure, we always want the open channel request to succeed or fail
private val openChannelInterceptor = context.spawnAnonymous(Behaviors.supervise(OpenChannelInterceptor(context.self.toTyped, nodeParams, remoteNodeId, wallet, pendingChannelsRateLimiter)).onFailure(typed.SupervisorStrategy.resume))
def stopPeer(): State = {
log.info("removing peer from db")
@ -488,7 +435,7 @@ object Peer {
context.actorOf(Channel.props(nodeParams, wallet, remoteNodeId, watcher, relayer, txPublisherFactory, origin_opt))
}
def props(nodeParams: NodeParams, remoteNodeId: PublicKey, wallet: OnchainPubkeyCache, channelFactory: ChannelFactory, switchboard: ActorRef): Props = Props(new Peer(nodeParams, remoteNodeId, wallet, channelFactory, switchboard))
def props(nodeParams: NodeParams, remoteNodeId: PublicKey, wallet: OnchainPubkeyCache, channelFactory: ChannelFactory, switchboard: ActorRef, pendingChannelsRateLimiter: typed.ActorRef[PendingChannelsRateLimiter.Command]): Props = Props(new Peer(nodeParams, remoteNodeId, wallet, channelFactory, switchboard, pendingChannelsRateLimiter))
// @formatter:off
@ -544,8 +491,8 @@ object Peer {
fundingTxFeerate_opt.foreach(feerate => require(feerate >= FeeratePerKw.MinimumFeeratePerKw, s"fee rate $feerate is below minimum ${FeeratePerKw.MinimumFeeratePerKw}"))
}
private case class SpawnChannelInitiator(cmd: Peer.OpenChannel, channelConfig: ChannelConfig, channelType: SupportedChannelType, localParams: LocalParams, origin: ActorRef)
private case class SpawnChannelNonInitiator(open: Either[protocol.OpenChannel, protocol.OpenDualFundedChannel], channelConfig: ChannelConfig, channelType: SupportedChannelType, localParams: LocalParams)
case class SpawnChannelInitiator(cmd: Peer.OpenChannel, channelConfig: ChannelConfig, channelType: SupportedChannelType, localParams: LocalParams, origin: ActorRef)
case class SpawnChannelNonInitiator(open: Either[protocol.OpenChannel, protocol.OpenDualFundedChannel], channelConfig: ChannelConfig, channelType: SupportedChannelType, localParams: LocalParams, peerConnection: ActorRef)
case class GetPeerInfo(replyTo: Option[typed.ActorRef[PeerInfoResponse]])
sealed trait PeerInfoResponse {
@ -577,29 +524,4 @@ object Peer {
case class RelayUnknownMessage(unknownMessage: UnknownMessage)
// @formatter:on
def makeChannelParams(nodeParams: NodeParams, initFeatures: Features[InitFeature], upfrontShutdownScript_opt: Option[ByteVector], walletStaticPaymentBasepoint_opt: Option[PublicKey], isInitiator: Boolean, dualFunded: Boolean, fundingAmount: Satoshi, unlimitedMaxHtlcValueInFlight: Boolean): LocalParams = {
val maxHtlcValueInFlightMsat = if (unlimitedMaxHtlcValueInFlight) {
// We don't want to impose limits on the amount in flight, typically to allow fully emptying the channel.
21e6.btc.toMilliSatoshi
} else {
// NB: when we're the initiator, we don't know yet if the remote peer will contribute to the funding amount, so
// the percentage-based value may be underestimated. That's ok, this is a security parameter so it makes sense to
// base it on the amount that we're contributing instead of the total funding amount.
nodeParams.channelConf.maxHtlcValueInFlightMsat.min(fundingAmount * nodeParams.channelConf.maxHtlcValueInFlightPercent / 100)
}
LocalParams(
nodeParams.nodeId,
nodeParams.channelKeyManager.newFundingKeyPath(isInitiator), // we make sure that initiator and non-initiator key paths end differently
dustLimit = nodeParams.channelConf.dustLimit,
maxHtlcValueInFlightMsat = maxHtlcValueInFlightMsat,
requestedChannelReserve_opt = if (dualFunded) None else Some((fundingAmount * nodeParams.channelConf.reserveToFundingRatio).max(nodeParams.channelConf.dustLimit)), // BOLT #2: make sure that our reserve is above our dust limit
htlcMinimum = nodeParams.channelConf.htlcMinimum,
toSelfDelay = nodeParams.channelConf.toRemoteDelay, // we choose their delay
maxAcceptedHtlcs = nodeParams.channelConf.maxAcceptedHtlcs,
isInitiator = isInitiator,
upfrontShutdownScript_opt = upfrontShutdownScript_opt,
walletStaticPaymentBasepoint = walletStaticPaymentBasepoint_opt,
initFeatures = initFeatures)
}
}

View file

@ -0,0 +1,154 @@
package fr.acinq.eclair.io
import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
import akka.actor.typed.{ActorRef, Behavior}
import fr.acinq.bitcoin.scalacompat.ByteVector32
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.eclair.NodeParams
import fr.acinq.eclair.channel._
import fr.acinq.eclair.io.Monitoring.{Metrics, Tags}
import fr.acinq.eclair.io.PendingChannelsRateLimiter.Command
import fr.acinq.eclair.router.Router
import fr.acinq.eclair.router.Router.{GetNode, PublicNode, UnknownNode}
/**
* A singleton actor that tracks pending channels and rate limits their creation.
*
* This actor should be initialized with the list of current persistent channels. It will track the pending channels
* in real-time and apply the configured rate limits for new requests.
*
* It accepts the command AddOrRejectChannel and will respond with AcceptChannel or ChannelRateLimited. It also tracks
* when channels are assigned a channel id, confirmed on-chain, closed or aborted and will update its internal state
* accordingly.
*
*/
object PendingChannelsRateLimiter {
// @formatter:off
sealed trait Command
case class AddOrRejectChannel(replyTo: ActorRef[Response], remoteNodeId: PublicKey, temporaryChannelId: ByteVector32) extends Command
private case class WrappedGetNodeResponse(temporaryChannelId: ByteVector32, response: Router.GetNodeResponse, replyTo: Option[ActorRef[Response]]) extends Command
private case class ReplaceChannelId(remoteNodeId: PublicKey, temporaryChannelId: ByteVector32, channelId: ByteVector32) extends Command
private case class RemoveChannelId(remoteNodeId: PublicKey, channelId: ByteVector32) extends Command
sealed trait Response
case object AcceptOpenChannel extends Response
case object ChannelRateLimited extends Response
// @formatter:on
def apply(nodeParams: NodeParams, router: ActorRef[Router.GetNode], channels: Seq[PersistentChannelData]): Behavior[Command] = {
Behaviors.setup { context =>
new PendingChannelsRateLimiter(nodeParams, router, context).restoring(filterPendingChannels(channels), Map(), Map())
}
}
private[io] def filterPendingChannels(channels: Seq[PersistentChannelData]): Map[PublicKey, Seq[PersistentChannelData]] = {
channels.filter {
case _: DATA_WAIT_FOR_FUNDING_CONFIRMED => true
case _: DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED => true
case _: DATA_WAIT_FOR_CHANNEL_READY => true
case _: DATA_WAIT_FOR_DUAL_FUNDING_READY => true
case _ => false
}.groupBy(_.metaCommitments.remoteNodeId)
}
}
private class PendingChannelsRateLimiter(nodeParams: NodeParams, router: ActorRef[Router.GetNode], context: ActorContext[Command]) {
import PendingChannelsRateLimiter._
private def restoring(channels: Map[PublicKey, Seq[PersistentChannelData]], pendingPublicNodeChannels: Map[PublicKey, Seq[ByteVector32]], pendingPrivateNodeChannels: Map[PublicKey, Seq[ByteVector32]]): Behavior[Command] = {
channels.headOption match {
case Some((remoteNodeId, pendingChannels)) =>
val adapter = context.messageAdapter[Router.GetNodeResponse](r => WrappedGetNodeResponse(pendingChannels.head.channelId, r, None))
router ! GetNode(adapter, remoteNodeId)
Behaviors.receiveMessagePartial[Command] {
case AddOrRejectChannel(replyTo, _, _) =>
replyTo ! ChannelRateLimited
Behaviors.same
case WrappedGetNodeResponse(_, PublicNode(announcement, _, _), _) =>
restoring(channels.tail, pendingPublicNodeChannels + (announcement.nodeId -> pendingChannels.map(_.channelId)), pendingPrivateNodeChannels)
case WrappedGetNodeResponse(_, UnknownNode(nodeId), _) =>
restoring(channels.tail, pendingPublicNodeChannels, pendingPrivateNodeChannels + (nodeId -> pendingChannels.map(_.channelId)))
}
case None =>
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[ChannelIdAssigned](c => ReplaceChannelId(c.remoteNodeId, c.temporaryChannelId, c.channelId)))
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[ChannelOpened](c => RemoveChannelId(c.remoteNodeId, c.channelId)))
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[ChannelClosed](c => RemoveChannelId(c.commitments.remoteNodeId, c.channelId)))
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[ChannelAborted](c => RemoveChannelId(c.remoteNodeId, c.channelId)))
registering(pendingPublicNodeChannels, pendingPrivateNodeChannels)
}
}
private def registering(pendingPublicNodeChannels: Map[PublicKey, Seq[ByteVector32]], pendingPrivateNodeChannels: Map[PublicKey, Seq[ByteVector32]]): Behavior[Command] = {
Metrics.OpenChannelRequestsPending.withTag(Tags.PublicPeers, value = true).update(pendingPublicNodeChannels.map(_._2.length).sum)
Metrics.OpenChannelRequestsPending.withTag(Tags.PublicPeers, value = false).update(pendingPrivateNodeChannels.map(_._2.length).sum)
Behaviors.receiveMessagePartial {
case AddOrRejectChannel(replyTo, remoteNodeId, _) if nodeParams.channelConf.channelOpenerWhitelist.contains(remoteNodeId) =>
replyTo ! AcceptOpenChannel
Behaviors.same
case AddOrRejectChannel(replyTo, remoteNodeId, temporaryChannelId) =>
val adapter = context.messageAdapter[Router.GetNodeResponse](r => WrappedGetNodeResponse(temporaryChannelId, r, Some(replyTo)))
router ! GetNode(adapter, remoteNodeId)
Behaviors.same
case WrappedGetNodeResponse(temporaryChannelId, PublicNode(announcement, _, _), Some(replyTo)) =>
pendingPublicNodeChannels.get(announcement.nodeId) match {
case Some(pendingChannels) if pendingChannels.size >= nodeParams.channelConf.maxPendingChannelsPerPeer =>
replyTo ! ChannelRateLimited
Behaviors.same
case Some(peerChannels) =>
replyTo ! AcceptOpenChannel
registering(pendingPublicNodeChannels + (announcement.nodeId -> (temporaryChannelId +: peerChannels)), pendingPrivateNodeChannels)
case None =>
replyTo ! AcceptOpenChannel
registering(pendingPublicNodeChannels + (announcement.nodeId -> Seq(temporaryChannelId)), pendingPrivateNodeChannels)
}
case WrappedGetNodeResponse(temporaryChannelId, UnknownNode(nodeId), Some(replyTo)) =>
if (pendingPrivateNodeChannels.map(_._2.size).sum >= nodeParams.channelConf.maxTotalPendingChannelsPrivateNodes) {
replyTo ! ChannelRateLimited
Behaviors.same
} else {
replyTo ! AcceptOpenChannel
pendingPrivateNodeChannels.get(nodeId) match {
case Some(peerChannels) =>
registering(pendingPublicNodeChannels, pendingPrivateNodeChannels + (nodeId -> (temporaryChannelId +: peerChannels)))
case None =>
registering(pendingPublicNodeChannels, pendingPrivateNodeChannels + (nodeId -> Seq(temporaryChannelId)))
}
}
case ReplaceChannelId(remoteNodeId, temporaryChannelId, channelId) =>
pendingPublicNodeChannels.get(remoteNodeId) match {
case Some(channels) if channels.contains(temporaryChannelId) =>
registering(pendingPublicNodeChannels + (remoteNodeId -> (channels.filterNot(_ == temporaryChannelId) :+ channelId)), pendingPrivateNodeChannels)
case Some(_) => Behaviors.same
case None =>
pendingPrivateNodeChannels.get(remoteNodeId) match {
case Some(channels) if channels.contains(temporaryChannelId) =>
registering(pendingPublicNodeChannels, pendingPrivateNodeChannels + (remoteNodeId -> (channels.filterNot(_ == temporaryChannelId) :+ channelId)))
case Some(_) => Behaviors.same
case None => Behaviors.same
}
}
case RemoveChannelId(remoteNodeId, channelId) =>
pendingPublicNodeChannels.get(remoteNodeId) match {
case Some(pendingChannels) =>
val pendingChannels1 = pendingChannels.filterNot(_ == channelId)
if (pendingChannels1.isEmpty) {
registering(pendingPublicNodeChannels - remoteNodeId, pendingPrivateNodeChannels)
} else {
registering(pendingPublicNodeChannels + (remoteNodeId -> pendingChannels1), pendingPrivateNodeChannels)
}
case None =>
pendingPrivateNodeChannels.get(remoteNodeId) match {
case Some(pendingChannels) =>
val pendingChannels1 = pendingChannels.filterNot(_ == channelId)
if (pendingChannels1.isEmpty) {
registering(pendingPublicNodeChannels, pendingPrivateNodeChannels - remoteNodeId)
} else {
registering(pendingPublicNodeChannels, pendingPrivateNodeChannels + (remoteNodeId -> pendingChannels1))
}
case None => Behaviors.same
}
}
}
}
}

View file

@ -153,9 +153,9 @@ object Switchboard {
def spawn(context: ActorContext, remoteNodeId: PublicKey): ActorRef
}
case class SimplePeerFactory(nodeParams: NodeParams, wallet: OnchainPubkeyCache, channelFactory: Peer.ChannelFactory) extends PeerFactory {
case class SimplePeerFactory(nodeParams: NodeParams, wallet: OnchainPubkeyCache, channelFactory: Peer.ChannelFactory, pendingChannelsRateLimiter: typed.ActorRef[PendingChannelsRateLimiter.Command]) extends PeerFactory {
override def spawn(context: ActorContext, remoteNodeId: PublicKey): ActorRef =
context.actorOf(Peer.props(nodeParams, remoteNodeId, wallet, channelFactory, context.self), name = peerActorName(remoteNodeId))
context.actorOf(Peer.props(nodeParams, remoteNodeId, wallet, channelFactory, context.self, pendingChannelsRateLimiter), name = peerActorName(remoteNodeId))
}
def props(nodeParams: NodeParams, peerFactory: PeerFactory) = Props(new Switchboard(nodeParams, peerFactory))

View file

@ -24,7 +24,7 @@ import fr.acinq.eclair.channel.fsm.Channel.{ChannelConf, UnhandledExceptionStrat
import fr.acinq.eclair.channel.{ChannelFlags, LocalParams}
import fr.acinq.eclair.crypto.keymanager.{LocalChannelKeyManager, LocalNodeKeyManager}
import fr.acinq.eclair.io.MessageRelay.RelayAll
import fr.acinq.eclair.io.{Peer, PeerConnection}
import fr.acinq.eclair.io.{OpenChannelInterceptor, PeerConnection}
import fr.acinq.eclair.message.OnionMessages.OnionMessageConfig
import fr.acinq.eclair.payment.relay.Relayer.{AsyncPaymentsParams, RelayFees, RelayParams}
import fr.acinq.eclair.router.Graph.WeightRatios
@ -126,6 +126,9 @@ object TestConstants {
minFundingPrivateSatoshis = 900 sat,
maxFundingSatoshis = 16777215 sat,
requireConfirmedInputsForDualFunding = false,
channelOpenerWhitelist = Set.empty,
maxPendingChannelsPerPeer = 3,
maxTotalPendingChannelsPrivateNodes = 99
),
onChainFeeConf = OnChainFeeConf(
feeTargets = FeeTargets(6, 2, 36, 12, 18, 0),
@ -212,7 +215,7 @@ object TestConstants {
purgeInvoicesInterval = None
)
def channelParams: LocalParams = Peer.makeChannelParams(
def channelParams: LocalParams = OpenChannelInterceptor.makeChannelParams(
nodeParams,
nodeParams.features.initFeatures(),
None,
@ -276,6 +279,9 @@ object TestConstants {
minFundingPrivateSatoshis = 900 sat,
maxFundingSatoshis = 16777215 sat,
requireConfirmedInputsForDualFunding = false,
channelOpenerWhitelist = Set.empty,
maxPendingChannelsPerPeer = 3,
maxTotalPendingChannelsPrivateNodes = 99
),
onChainFeeConf = OnChainFeeConf(
feeTargets = FeeTargets(6, 2, 36, 12, 18, 0),
@ -362,7 +368,7 @@ object TestConstants {
purgeInvoicesInterval = None
)
def channelParams: LocalParams = Peer.makeChannelParams(
def channelParams: LocalParams = OpenChannelInterceptor.makeChannelParams(
nodeParams,
nodeParams.features.initFeatures(),
None,

View file

@ -31,7 +31,7 @@ import fr.acinq.eclair.blockchain.fee.FeeratePerKw
import fr.acinq.eclair.blockchain.{OnChainWallet, SingleKeyOnChainWallet}
import fr.acinq.eclair.channel.fund.InteractiveTxBuilder
import fr.acinq.eclair.channel.fund.InteractiveTxBuilder._
import fr.acinq.eclair.io.Peer
import fr.acinq.eclair.io.OpenChannelInterceptor.makeChannelParams
import fr.acinq.eclair.transactions.Scripts
import fr.acinq.eclair.wire.protocol._
import fr.acinq.eclair.{Feature, FeatureSupport, Features, InitFeature, MilliSatoshiLong, NodeParams, TestConstants, TestKitBaseClass, UInt64, randomBytes32, randomKey}
@ -113,8 +113,8 @@ class InteractiveTxBuilderSpec extends TestKitBaseClass with AnyFunSuiteLike wit
private def createFixtureParams(fundingAmountA: Satoshi, fundingAmountB: Satoshi, targetFeerate: FeeratePerKw, dustLimit: Satoshi, lockTime: Long, requireConfirmedInputs: RequireConfirmedInputs = RequireConfirmedInputs(forLocal = false, forRemote = false)): FixtureParams = {
val channelFeatures = ChannelFeatures(ChannelTypes.AnchorOutputsZeroFeeHtlcTx(), Features[InitFeature](Features.DualFunding -> FeatureSupport.Optional), Features[InitFeature](Features.DualFunding -> FeatureSupport.Optional), announceChannel = true)
val Seq(nodeParamsA, nodeParamsB) = Seq(TestConstants.Alice.nodeParams, TestConstants.Bob.nodeParams).map(_.copy(features = Features(channelFeatures.features.map(f => f -> FeatureSupport.Optional).toMap[Feature, FeatureSupport])))
val localParamsA = Peer.makeChannelParams(nodeParamsA, nodeParamsA.features.initFeatures(), None, None, isInitiator = true, dualFunded = true, fundingAmountA, unlimitedMaxHtlcValueInFlight = false)
val localParamsB = Peer.makeChannelParams(nodeParamsB, nodeParamsB.features.initFeatures(), None, None, isInitiator = false, dualFunded = true, fundingAmountB, unlimitedMaxHtlcValueInFlight = false)
val localParamsA = makeChannelParams(nodeParamsA, nodeParamsA.features.initFeatures(), None, None, isInitiator = true, dualFunded = true, fundingAmountA, unlimitedMaxHtlcValueInFlight = false)
val localParamsB = makeChannelParams(nodeParamsB, nodeParamsB.features.initFeatures(), None, None, isInitiator = false, dualFunded = true, fundingAmountB, unlimitedMaxHtlcValueInFlight = false)
val Seq(remoteParamsA, remoteParamsB) = Seq((nodeParamsA, localParamsA), (nodeParamsB, localParamsB)).map {
case (nodeParams, localParams) =>

View file

@ -3,7 +3,7 @@ package fr.acinq.eclair.integration.basic.fixtures
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter.{ClassicActorRefOps, ClassicActorSystemOps}
import akka.actor.{ActorRef, ActorSystem}
import akka.actor.{ActorRef, ActorSystem, typed}
import akka.testkit.{TestActor, TestProbe}
import com.softwaremill.quicklens.ModifyPimp
import com.typesafe.config.ConfigFactory
@ -19,7 +19,7 @@ import fr.acinq.eclair.channel.fsm.Channel
import fr.acinq.eclair.crypto.TransportHandler
import fr.acinq.eclair.crypto.keymanager.{LocalChannelKeyManager, LocalNodeKeyManager}
import fr.acinq.eclair.io.PeerConnection.ConnectionResult
import fr.acinq.eclair.io.{Peer, PeerConnection, Switchboard}
import fr.acinq.eclair.io.{Peer, PeerConnection, PendingChannelsRateLimiter, Switchboard}
import fr.acinq.eclair.payment.Bolt11Invoice.ExtraHop
import fr.acinq.eclair.payment._
import fr.acinq.eclair.payment.receive.{MultiPartHandler, PaymentHandler}
@ -91,7 +91,8 @@ object MinimalNodeFixture extends Assertions with Eventually with IntegrationPat
val relayer = system.actorOf(Relayer.props(nodeParams, router, register, paymentHandler, triggerer.ref.toTyped), "relayer")
val txPublisherFactory = Channel.SimpleTxPublisherFactory(nodeParams, watcherTyped, bitcoinClient)
val channelFactory = Peer.SimpleChannelFactory(nodeParams, watcherTyped, relayer, wallet, txPublisherFactory)
val peerFactory = Switchboard.SimplePeerFactory(nodeParams, wallet, channelFactory)
val pendingChannelsRateLimiter = system.spawnAnonymous(Behaviors.supervise(PendingChannelsRateLimiter(nodeParams, router.toTyped, Seq())).onFailure(typed.SupervisorStrategy.resume))
val peerFactory = Switchboard.SimplePeerFactory(nodeParams, wallet, channelFactory, pendingChannelsRateLimiter)
val switchboard = system.actorOf(Switchboard.props(nodeParams, peerFactory), "switchboard")
val paymentFactory = PaymentInitiator.SimplePaymentFactory(nodeParams, router, register)
val paymentInitiator = system.actorOf(PaymentInitiator.props(nodeParams, paymentFactory), "payment-initiator")

View file

@ -0,0 +1,227 @@
/*
* Copyright 2023 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package fr.acinq.eclair.io
import akka.actor.Status
import akka.actor.testkit.typed.scaladsl.{ScalaTestWithActorTestKit, TestProbe}
import akka.actor.typed.ActorRef
import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.scaladsl.adapter.TypedActorRefOps
import com.typesafe.config.ConfigFactory
import fr.acinq.bitcoin.scalacompat.{ByteVector32, Crypto, SatoshiLong}
import fr.acinq.eclair.FeatureSupport.{Mandatory, Optional}
import fr.acinq.eclair.Features.{AnchorOutputs, ChannelType, StaticRemoteKey, Wumbo}
import fr.acinq.eclair.blockchain.DummyOnChainWallet
import fr.acinq.eclair.channel.ChannelTypes.UnsupportedChannelType
import fr.acinq.eclair.channel.fsm.Channel
import fr.acinq.eclair.channel.states.ChannelStateTestsTags
import fr.acinq.eclair.channel.{ChannelAborted, ChannelTypes}
import fr.acinq.eclair.io.OpenChannelInterceptor.{DefaultParams, OpenChannelInitiator, OpenChannelNonInitiator}
import fr.acinq.eclair.io.Peer.{OutgoingMessage, SpawnChannelNonInitiator}
import fr.acinq.eclair.io.PeerSpec.createOpenChannelMessage
import fr.acinq.eclair.io.PendingChannelsRateLimiter.AddOrRejectChannel
import fr.acinq.eclair.payment.Bolt11Invoice.defaultFeatures.initFeatures
import fr.acinq.eclair.wire.protocol.{ChannelTlv, Error, OpenChannel, OpenChannelTlv, TlvStream}
import fr.acinq.eclair.{AcceptOpenChannel, CltvExpiryDelta, Features, InterceptOpenChannelCommand, InterceptOpenChannelPlugin, InterceptOpenChannelReceived, MilliSatoshiLong, RejectOpenChannel, TestConstants, UnknownFeature, randomBytes32, randomKey}
import org.scalatest.funsuite.FixtureAnyFunSuiteLike
import org.scalatest.{Outcome, Tag}
import scala.concurrent.duration.DurationInt
class OpenChannelInterceptorSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("application")) with FixtureAnyFunSuiteLike {
val remoteNodeId: Crypto.PublicKey = randomKey().publicKey
val defaultParams: DefaultParams = DefaultParams(100 sat, 100000 msat, 100 msat, CltvExpiryDelta(288), 10)
val openChannel: OpenChannel = createOpenChannelMessage()
override def withFixture(test: OneArgTest): Outcome = {
val peer = TestProbe[Any]()
val peerConnection = TestProbe[Any]()
val pluginInterceptor = TestProbe[InterceptOpenChannelCommand]()
val wallet = new DummyOnChainWallet()
val pendingChannelsRateLimiter = TestProbe[PendingChannelsRateLimiter.Command]()
val plugin = new InterceptOpenChannelPlugin {
override def name: String = "OpenChannelInterceptorPlugin"
override def openChannelInterceptor: ActorRef[InterceptOpenChannelCommand] = pluginInterceptor.ref
}
val pluginParams = TestConstants.Alice.nodeParams.pluginParams :+ plugin
val nodeParams = TestConstants.Alice.nodeParams.copy(pluginParams = pluginParams)
val eventListener = TestProbe[ChannelAborted]()
system.eventStream ! EventStream.Subscribe(eventListener.ref)
val openChannelInterceptor = testKit.spawn(OpenChannelInterceptor(peer.ref, nodeParams, remoteNodeId, wallet, pendingChannelsRateLimiter.ref, 10 millis))
withFixture(test.toNoArgTest(FixtureParam(openChannelInterceptor, peer, pluginInterceptor, pendingChannelsRateLimiter, peerConnection, eventListener, wallet)))
}
case class FixtureParam(openChannelInterceptor: ActorRef[OpenChannelInterceptor.Command], peer: TestProbe[Any], pluginInterceptor: TestProbe[InterceptOpenChannelCommand], pendingChannelsRateLimiter: TestProbe[PendingChannelsRateLimiter.Command], peerConnection: TestProbe[Any], eventListener: TestProbe[ChannelAborted], wallet: DummyOnChainWallet)
test("reject channel open if timeout waiting for plugin to respond") { f =>
import f._
val openChannelNonInitiator = OpenChannelNonInitiator(remoteNodeId, Left(openChannel), Features.empty, Features.empty, peerConnection.ref)
openChannelInterceptor ! openChannelNonInitiator
pendingChannelsRateLimiter.expectMessageType[AddOrRejectChannel].replyTo ! PendingChannelsRateLimiter.AcceptOpenChannel
pluginInterceptor.expectMessageType[InterceptOpenChannelReceived]
assert(peer.expectMessageType[OutgoingMessage].msg.asInstanceOf[Error].toAscii.contains("plugin timeout"))
eventListener.expectMessageType[ChannelAborted]
}
test("continue channel open if pending channels rate limiter and interceptor plugin accept it") { f =>
import f._
val openChannelNonInitiator = OpenChannelNonInitiator(remoteNodeId, Left(openChannel), Features.empty, Features.empty, peerConnection.ref)
openChannelInterceptor ! openChannelNonInitiator
pendingChannelsRateLimiter.expectMessageType[AddOrRejectChannel].replyTo ! PendingChannelsRateLimiter.AcceptOpenChannel
pluginInterceptor.expectMessageType[InterceptOpenChannelReceived].replyTo ! AcceptOpenChannel(randomBytes32(), defaultParams)
val updatedLocalParams = peer.expectMessageType[SpawnChannelNonInitiator].localParams
assert(updatedLocalParams.dustLimit == defaultParams.dustLimit)
assert(updatedLocalParams.htlcMinimum == defaultParams.htlcMinimum)
assert(updatedLocalParams.maxAcceptedHtlcs == defaultParams.maxAcceptedHtlcs)
assert(updatedLocalParams.maxHtlcValueInFlightMsat == defaultParams.maxHtlcValueInFlightMsat)
assert(updatedLocalParams.toSelfDelay == defaultParams.toSelfDelay)
}
test("continue channel open if no interceptor plugin registered and pending channels rate limiter accepts it") { f =>
import f._
// no open channel interceptor plugin registered
val wallet = new DummyOnChainWallet()
val openChannelInterceptor = testKit.spawn(OpenChannelInterceptor(peer.ref, TestConstants.Alice.nodeParams, remoteNodeId, wallet, pendingChannelsRateLimiter.ref, 10 millis))
val openChannelNonInitiator = OpenChannelNonInitiator(remoteNodeId, Left(openChannel), Features.empty, Features.empty, peerConnection.ref)
openChannelInterceptor ! openChannelNonInitiator
pendingChannelsRateLimiter.expectMessageType[AddOrRejectChannel].replyTo ! PendingChannelsRateLimiter.AcceptOpenChannel
pluginInterceptor.expectNoMessage(10 millis)
peer.expectMessageType[SpawnChannelNonInitiator]
}
test("reject open channel request if rejected by the plugin") { f =>
import f._
val openChannelNonInitiator = OpenChannelNonInitiator(remoteNodeId, Left(openChannel), Features.empty, Features.empty, peerConnection.ref)
openChannelInterceptor ! openChannelNonInitiator
pendingChannelsRateLimiter.expectMessageType[AddOrRejectChannel].replyTo ! PendingChannelsRateLimiter.AcceptOpenChannel
pluginInterceptor.expectMessageType[InterceptOpenChannelReceived].replyTo ! RejectOpenChannel(randomBytes32(), Error(randomBytes32(), "rejected"))
assert(peer.expectMessageType[OutgoingMessage].msg.asInstanceOf[Error].toAscii.contains("rejected"))
eventListener.expectMessageType[ChannelAborted]
}
test("reject open channel request if pending channels rate limit reached") { f =>
import f._
val openChannelNonInitiator = OpenChannelNonInitiator(remoteNodeId, Left(openChannel), Features.empty, Features.empty, peerConnection.ref)
openChannelInterceptor ! openChannelNonInitiator
pendingChannelsRateLimiter.expectMessageType[AddOrRejectChannel].replyTo ! PendingChannelsRateLimiter.ChannelRateLimited
assert(peer.expectMessageType[OutgoingMessage].msg.asInstanceOf[Error].toAscii.contains("rate limit reached"))
eventListener.expectMessageType[ChannelAborted]
}
test("reject open channel request if concurrent request in progress") { f =>
import f._
val openChannelNonInitiator = OpenChannelNonInitiator(remoteNodeId, Left(openChannel), Features.empty, Features.empty, peerConnection.ref)
openChannelInterceptor ! openChannelNonInitiator
// waiting for rate limiter to respond to the first request, do not accept any other requests
openChannelInterceptor ! openChannelNonInitiator.copy(open = Left(openChannel.copy(temporaryChannelId = ByteVector32.One)))
assert(peer.expectMessageType[OutgoingMessage].msg.asInstanceOf[Error].channelId == ByteVector32.One)
// waiting for plugin to respond to the first request, do not accept any other requests
pendingChannelsRateLimiter.expectMessageType[AddOrRejectChannel].replyTo ! PendingChannelsRateLimiter.AcceptOpenChannel
openChannelInterceptor ! openChannelNonInitiator.copy(open = Left(openChannel.copy(temporaryChannelId = ByteVector32.One)))
assert(peer.expectMessageType[OutgoingMessage].msg.asInstanceOf[Error].channelId == ByteVector32.One)
// original request accepted after plugin accepts it
pluginInterceptor.expectMessageType[InterceptOpenChannelReceived].replyTo ! AcceptOpenChannel(randomBytes32(), defaultParams)
assert(peer.expectMessageType[SpawnChannelNonInitiator].open == Left(openChannel))
eventListener.expectMessageType[ChannelAborted]
}
test("don't spawn a wumbo channel if wumbo feature isn't enabled") { f =>
import f._
val probe = TestProbe[Any]()
val fundingAmountBig = Channel.MAX_FUNDING + 10000.sat
openChannelInterceptor ! OpenChannelInitiator(probe.ref, remoteNodeId, Peer.OpenChannel(remoteNodeId, fundingAmountBig, None, None, None, None, None), Features.empty, Features.empty)
assert(probe.expectMessageType[Status.Failure].cause.getMessage == s"fundingAmount=$fundingAmountBig is too big, you must enable large channels support in 'eclair.features' to use funding above ${Channel.MAX_FUNDING} (see eclair.conf)")
}
test("don't spawn a wumbo channel if remote doesn't support wumbo", Tag(ChannelStateTestsTags.Wumbo)) { f =>
import f._
val probe = TestProbe[Any]()
val fundingAmountBig = Channel.MAX_FUNDING + 10000.sat
openChannelInterceptor ! OpenChannelInitiator(probe.ref, remoteNodeId, Peer.OpenChannel(remoteNodeId, fundingAmountBig, None, None, None, None, None), initFeatures().add(Wumbo, Optional), Features.empty)
assert(probe.expectMessageType[Status.Failure].cause.getMessage == s"fundingAmount=$fundingAmountBig is too big, the remote peer doesn't support wumbo")
}
test("don't spawn a channel if fundingSatoshis is greater than maxFundingSatoshis", Tag(ChannelStateTestsTags.Wumbo)) { f =>
import f._
val probe = TestProbe[Any]()
val fundingAmountBig = Channel.MAX_FUNDING + 10000.sat
openChannelInterceptor ! OpenChannelInitiator(probe.ref, remoteNodeId, Peer.OpenChannel(remoteNodeId, fundingAmountBig, None, None, None, None, None), initFeatures().add(Wumbo, Optional), initFeatures().add(Wumbo, Optional))
assert(probe.expectMessageType[Status.Failure].cause.getMessage == s"fundingAmount=$fundingAmountBig is too big for the current settings, increase 'eclair.max-funding-satoshis' (see eclair.conf)")
}
test("don't spawn a channel if we don't support their channel type") { f =>
import f._
// They only support anchor outputs and we don't.
{
val open = createOpenChannelMessage(TlvStream[OpenChannelTlv](ChannelTlv.ChannelTypeTlv(ChannelTypes.AnchorOutputs())))
openChannelInterceptor ! OpenChannelNonInitiator(remoteNodeId, Left(open), Features.empty, Features.empty, peerConnection.ref)
peer.expectMessage(OutgoingMessage(Error(open.temporaryChannelId, "invalid channel_type=anchor_outputs, expected channel_type=standard"), peerConnection.ref.toClassic))
eventListener.expectMessageType[ChannelAborted]
}
// They only support anchor outputs with zero fee htlc txs and we don't.
{
val open = createOpenChannelMessage(TlvStream[OpenChannelTlv](ChannelTlv.ChannelTypeTlv(ChannelTypes.AnchorOutputsZeroFeeHtlcTx())))
openChannelInterceptor ! OpenChannelNonInitiator(remoteNodeId, Left(open), Features.empty, Features.empty, peerConnection.ref)
peer.expectMessage(OutgoingMessage(Error(open.temporaryChannelId, "invalid channel_type=anchor_outputs_zero_fee_htlc_tx, expected channel_type=standard"), peerConnection.ref.toClassic))
eventListener.expectMessageType[ChannelAborted]
}
// They want to use a channel type that doesn't exist in the spec.
{
val open = createOpenChannelMessage(TlvStream[OpenChannelTlv](ChannelTlv.ChannelTypeTlv(UnsupportedChannelType(Features(AnchorOutputs -> Optional)))))
openChannelInterceptor ! OpenChannelNonInitiator(remoteNodeId, Left(open), Features.empty, Features.empty, peerConnection.ref)
peer.expectMessage(OutgoingMessage(Error(open.temporaryChannelId, "invalid channel_type=0x200000, expected channel_type=standard"), peerConnection.ref.toClassic))
eventListener.expectMessageType[ChannelAborted]
}
// They want to use a channel type we don't support yet.
{
val open = createOpenChannelMessage(TlvStream[OpenChannelTlv](ChannelTlv.ChannelTypeTlv(UnsupportedChannelType(Features(Map(StaticRemoteKey -> Mandatory), unknown = Set(UnknownFeature(22)))))))
openChannelInterceptor ! OpenChannelNonInitiator(remoteNodeId, Left(open), Features.empty, Features.empty, peerConnection.ref)
peer.expectMessage(OutgoingMessage(Error(open.temporaryChannelId, "invalid channel_type=0x401000, expected channel_type=standard"), peerConnection.ref.toClassic))
eventListener.expectMessageType[ChannelAborted]
}
}
test("don't spawn a channel if channel type is missing with the feature bit set", Tag(ChannelStateTestsTags.ChannelType)) { f =>
import f._
val open = createOpenChannelMessage()
openChannelInterceptor ! OpenChannelNonInitiator(remoteNodeId, Left(open), initFeatures().add(ChannelType, Optional), initFeatures().add(ChannelType, Optional), peerConnection.ref)
peer.expectMessage(OutgoingMessage(Error(open.temporaryChannelId, "option_channel_type was negotiated but channel_type is missing"), peerConnection.ref.toClassic))
eventListener.expectMessageType[ChannelAborted]
}
}

View file

@ -16,25 +16,24 @@
package fr.acinq.eclair.io
import akka.actor.Status.Failure
import akka.actor.{ActorContext, ActorRef, ActorSystem, FSM, PoisonPill, Status}
import akka.testkit.TestActor.KeepRunning
import akka.testkit.{TestFSMRef, TestKit, TestProbe}
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.bitcoin.scalacompat.{Block, Btc, ByteVector32, SatoshiLong, Script}
import fr.acinq.bitcoin.scalacompat.{Block, Btc, ByteVector32, SatoshiLong}
import fr.acinq.eclair.FeatureSupport.{Mandatory, Optional}
import fr.acinq.eclair.Features._
import fr.acinq.eclair.TestConstants._
import fr.acinq.eclair._
import fr.acinq.eclair.blockchain.DummyOnChainWallet
import fr.acinq.eclair.blockchain.fee.{FeeratePerKw, FeeratesPerKw}
import fr.acinq.eclair.channel.ChannelTypes.UnsupportedChannelType
import fr.acinq.eclair.channel._
import fr.acinq.eclair.channel.fsm.Channel
import fr.acinq.eclair.channel.states.ChannelStateTestsTags
import fr.acinq.eclair.io.Peer._
import fr.acinq.eclair.message.OnionMessages.{Recipient, buildMessage}
import fr.acinq.eclair.testutils.FixtureSpec
import fr.acinq.eclair.wire.internal.channel.ChannelCodecsSpec
import fr.acinq.eclair.wire.internal.channel.ChannelCodecsSpec.localParams
import fr.acinq.eclair.wire.protocol
import fr.acinq.eclair.wire.protocol._
import org.scalatest.{Tag, TestData}
@ -43,7 +42,6 @@ import scodec.bits.ByteVector
import java.net.InetSocketAddress
import java.nio.channels.ServerSocketChannel
import scala.concurrent.duration._
import scala.util.Success
class PeerSpec extends FixtureSpec {
@ -52,7 +50,7 @@ class PeerSpec extends FixtureSpec {
override implicit val patienceConfig: PatienceConfig = PatienceConfig(timeout = 30 seconds, interval = 1 second)
case class FixtureParam(nodeParams: NodeParams, remoteNodeId: PublicKey, system: ActorSystem, peer: TestFSMRef[Peer.State, Peer.Data, Peer], peerConnection: TestProbe, channel: TestProbe, switchboard: TestProbe) {
case class FixtureParam(nodeParams: NodeParams, remoteNodeId: PublicKey, system: ActorSystem, peer: TestFSMRef[Peer.State, Peer.Data, Peer], peerConnection: TestProbe, channel: TestProbe, switchboard: TestProbe, mockLimiter: ActorRef) {
implicit val implicitSystem: ActorSystem = system
def cleanup(): Unit = TestKit.shutdownActorSystem(system)
@ -74,7 +72,6 @@ class PeerSpec extends FixtureSpec {
.modify(_.features).setToIf(testData.tags.contains(ChannelStateTestsTags.AnchorOutputs))(Features(StaticRemoteKey -> Optional, AnchorOutputs -> Optional))
.modify(_.features).setToIf(testData.tags.contains(ChannelStateTestsTags.AnchorOutputsZeroFeeHtlcTxs))(Features(StaticRemoteKey -> Optional, AnchorOutputs -> Optional, AnchorOutputsZeroFeeHtlcTx -> Optional))
.modify(_.features).setToIf(testData.tags.contains(ChannelStateTestsTags.DualFunding))(Features(StaticRemoteKey -> Optional, AnchorOutputs -> Optional, AnchorOutputsZeroFeeHtlcTx -> Optional, DualFunding -> Optional))
.modify(_.channelConf.maxFundingSatoshis).setToIf(testData.tags.contains("high-max-funding-satoshis"))(Btc(0.9))
.modify(_.channelConf.maxHtlcValueInFlightMsat).setToIf(testData.tags.contains("max-htlc-value-in-flight-percent"))(100_000_000 msat)
.modify(_.channelConf.maxHtlcValueInFlightPercent).setToIf(testData.tags.contains("max-htlc-value-in-flight-percent"))(25)
.modify(_.autoReconnect).setToIf(testData.tags.contains("auto_reconnect"))(true)
@ -91,9 +88,20 @@ class PeerSpec extends FixtureSpec {
}
}
val peer: TestFSMRef[Peer.State, Peer.Data, Peer] = TestFSMRef(new Peer(aliceParams, remoteNodeId, wallet, FakeChannelFactory(channel), switchboard.ref))
val mockLimiter = TestProbe()
mockLimiter.setAutoPilot((_: ActorRef, msg: Any) => msg match {
case msg: PendingChannelsRateLimiter.AddOrRejectChannel if testData.tags.contains("rate_limited") =>
msg.replyTo ! PendingChannelsRateLimiter.ChannelRateLimited
KeepRunning
case msg: PendingChannelsRateLimiter.AddOrRejectChannel =>
msg.replyTo ! PendingChannelsRateLimiter.AcceptOpenChannel
KeepRunning
case _ => KeepRunning
})
FixtureParam(aliceParams, remoteNodeId, system, peer, peerConnection, channel, switchboard)
val peer: TestFSMRef[Peer.State, Peer.Data, Peer] = TestFSMRef(new Peer(aliceParams, remoteNodeId, wallet, FakeChannelFactory(channel), switchboard.ref, mockLimiter.ref))
FixtureParam(aliceParams, remoteNodeId, system, peer, peerConnection, channel, switchboard, mockLimiter.ref)
}
def cleanupFixture(fixture: FixtureParam): Unit = fixture.cleanup()
@ -337,89 +345,15 @@ class PeerSpec extends FixtureSpec {
assert(peer.stateData.channels.size == 1)
}
test("don't spawn a wumbo channel if wumbo feature isn't enabled") { f =>
import f._
val probe = TestProbe()
val fundingAmountBig = Channel.MAX_FUNDING + 10000.sat
system.eventStream.subscribe(probe.ref, classOf[ChannelCreated])
connect(remoteNodeId, peer, peerConnection, switchboard)
assert(peer.stateData.channels.isEmpty)
probe.send(peer, Peer.OpenChannel(remoteNodeId, fundingAmountBig, None, None, None, None, None))
assert(probe.expectMsgType[Failure].cause.getMessage == s"fundingAmount=$fundingAmountBig is too big, you must enable large channels support in 'eclair.features' to use funding above ${Channel.MAX_FUNDING} (see eclair.conf)")
}
test("don't spawn a wumbo channel if remote doesn't support wumbo", Tag(ChannelStateTestsTags.Wumbo)) { f =>
import f._
val probe = TestProbe()
val fundingAmountBig = Channel.MAX_FUNDING + 10000.sat
system.eventStream.subscribe(probe.ref, classOf[ChannelCreated])
connect(remoteNodeId, peer, peerConnection, switchboard) // Bob doesn't support wumbo, Alice does
assert(peer.stateData.channels.isEmpty)
probe.send(peer, Peer.OpenChannel(remoteNodeId, fundingAmountBig, None, None, None, None, None))
assert(probe.expectMsgType[Failure].cause.getMessage == s"fundingAmount=$fundingAmountBig is too big, the remote peer doesn't support wumbo")
}
test("don't spawn a channel if fundingSatoshis is greater than maxFundingSatoshis", Tag("high-max-funding-satoshis"), Tag(ChannelStateTestsTags.Wumbo)) { f =>
import f._
val probe = TestProbe()
val fundingAmountBig = Btc(1).toSatoshi
system.eventStream.subscribe(probe.ref, classOf[ChannelCreated])
connect(remoteNodeId, peer, peerConnection, switchboard, remoteInit = protocol.Init(Features(Wumbo -> Optional))) // Bob supports wumbo
assert(peer.stateData.channels.isEmpty)
probe.send(peer, Peer.OpenChannel(remoteNodeId, fundingAmountBig, None, None, None, None, None))
assert(probe.expectMsgType[Failure].cause.getMessage == s"fundingAmount=$fundingAmountBig is too big for the current settings, increase 'eclair.max-funding-satoshis' (see eclair.conf)")
}
test("don't spawn a channel if we don't support their channel type") { f =>
test("handle OpenChannelInterceptor spawning a user initiated open channel request ") { f =>
import f._
connect(remoteNodeId, peer, peerConnection, switchboard)
assert(peer.stateData.channels.isEmpty)
// They only support anchor outputs and we don't.
{
val open = createOpenChannelMessage(TlvStream[OpenChannelTlv](ChannelTlv.ChannelTypeTlv(ChannelTypes.AnchorOutputs())))
peerConnection.send(peer, open)
peerConnection.expectMsg(Error(open.temporaryChannelId, "invalid channel_type=anchor_outputs, expected channel_type=standard"))
}
// They only support anchor outputs with zero fee htlc txs and we don't.
{
val open = createOpenChannelMessage(TlvStream[OpenChannelTlv](ChannelTlv.ChannelTypeTlv(ChannelTypes.AnchorOutputsZeroFeeHtlcTx())))
peerConnection.send(peer, open)
peerConnection.expectMsg(Error(open.temporaryChannelId, "invalid channel_type=anchor_outputs_zero_fee_htlc_tx, expected channel_type=standard"))
}
// They want to use a channel type that doesn't exist in the spec.
{
val open = createOpenChannelMessage(TlvStream[OpenChannelTlv](ChannelTlv.ChannelTypeTlv(UnsupportedChannelType(Features(AnchorOutputs -> Optional)))))
peerConnection.send(peer, open)
peerConnection.expectMsg(Error(open.temporaryChannelId, "invalid channel_type=0x200000, expected channel_type=standard"))
}
// They want to use a channel type we don't support yet.
{
val open = createOpenChannelMessage(TlvStream[OpenChannelTlv](ChannelTlv.ChannelTypeTlv(UnsupportedChannelType(Features(Map(StaticRemoteKey -> Mandatory), unknown = Set(UnknownFeature(22)))))))
peerConnection.send(peer, open)
peerConnection.expectMsg(Error(open.temporaryChannelId, "invalid channel_type=0x401000, expected channel_type=standard"))
}
}
test("don't spawn a channel is channel type is missing with the feature bit set", Tag(ChannelStateTestsTags.ChannelType)) { f =>
import f._
val remoteInit = protocol.Init(Features(ChannelType -> Optional))
connect(remoteNodeId, peer, peerConnection, switchboard, remoteInit = remoteInit)
assert(peer.stateData.channels.isEmpty)
val open = createOpenChannelMessage()
val open = Peer.OpenChannel(remoteNodeId, 10000 sat, None, None, None, None, None)
peerConnection.send(peer, open)
peerConnection.expectMsg(Error(open.temporaryChannelId, "option_channel_type was negotiated but channel_type is missing"))
channel.expectMsgType[INPUT_INIT_CHANNEL_INITIATOR]
}
test("don't spawn a dual funded channel if not supported") { f =>
@ -493,6 +427,28 @@ class PeerSpec extends FixtureSpec {
assert(channel.expectMsgType[INPUT_INIT_CHANNEL_INITIATOR].channelType == ChannelTypes.AnchorOutputs())
}
test("handle OpenChannelInterceptor accepting an open channel message") { f =>
import f._
connect(remoteNodeId, peer, peerConnection, switchboard)
val open = createOpenChannelMessage()
peerConnection.send(peer, open)
assert(channel.expectMsgType[INPUT_INIT_CHANNEL_NON_INITIATOR].temporaryChannelId == open.temporaryChannelId)
channel.expectMsg(open)
}
test("handle OpenChannelInterceptor rejecting an open channel message", Tag("rate_limited")) { f =>
import f._
connect(remoteNodeId, peer, peerConnection, switchboard)
val open = createOpenChannelMessage()
peerConnection.send(peer, open)
peerConnection.expectMsg(Error(open.temporaryChannelId, "rate limit reached"))
assert(peer.stateData.channels.isEmpty)
}
test("use correct on-chain fee rates when spawning a channel (anchor outputs)", Tag(ChannelStateTestsTags.AnchorOutputs)) { f =>
import f._
@ -596,7 +552,7 @@ class PeerSpec extends FixtureSpec {
channel.ref
}
}
val peer = TestFSMRef(new Peer(TestConstants.Alice.nodeParams, remoteNodeId, new DummyOnChainWallet(), channelFactory, switchboard.ref))
val peer = TestFSMRef(new Peer(TestConstants.Alice.nodeParams, remoteNodeId, new DummyOnChainWallet(), channelFactory, switchboard.ref, mockLimiter.toTyped))
connect(remoteNodeId, peer, peerConnection, switchboard)
probe.send(peer, Peer.OpenChannel(remoteNodeId, 15000 sat, None, Some(100 msat), None, None, None))
val init = channel.expectMsgType[INPUT_INIT_CHANNEL_INITIATOR]
@ -666,6 +622,18 @@ class PeerSpec extends FixtureSpec {
probe.send(peer, Peer.RelayUnknownMessage(unknownMessage))
peerConnection.expectMsgType[UnknownMessage]
}
test("abort channel open request if peer reconnects before channel is accepted") { f =>
import f._
val probe = TestProbe()
val open = createOpenChannelMessage()
system.eventStream.subscribe(probe.ref, classOf[ChannelAborted])
connect(remoteNodeId, peer, peerConnection, switchboard)
peer ! SpawnChannelNonInitiator(Left(open), ChannelConfig.standard, ChannelTypes.Standard(), localParams, ActorRef.noSender)
val channelAborted = probe.expectMsgType[ChannelAborted]
assert(channelAborted.remoteNodeId == remoteNodeId)
assert(channelAborted.channelId == open.temporaryChannelId)
}
}
object PeerSpec {

View file

@ -0,0 +1,306 @@
/*
* Copyright 2023 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package fr.acinq.eclair.io
import akka.actor.testkit.typed.scaladsl.{ScalaTestWithActorTestKit, TestProbe}
import akka.actor.typed.ActorRef
import akka.actor.typed.eventstream.EventStream.Publish
import com.typesafe.config.ConfigFactory
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.bitcoin.scalacompat.{ByteVector32, SatoshiLong, Transaction, TxOut}
import fr.acinq.eclair.channel._
import fr.acinq.eclair.io.PendingChannelsRateLimiter.filterPendingChannels
import fr.acinq.eclair.router.Router
import fr.acinq.eclair.router.Router.{GetNode, PublicNode, UnknownNode}
import fr.acinq.eclair.transactions.Transactions.{ClosingTx, InputInfo}
import fr.acinq.eclair.wire.protocol._
import fr.acinq.eclair.{BlockHeight, Features, MilliSatoshiLong, NodeParams, ShortChannelId, TestConstants, TimestampSecondLong, randomBytes32, randomBytes64, randomKey}
import org.scalatest.Outcome
import org.scalatest.funsuite.FixtureAnyFunSuiteLike
import scodec.bits.{ByteVector, HexStringSyntax}
import scala.concurrent.duration.DurationInt
class PendingChannelsRateLimiterSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("application")) with FixtureAnyFunSuiteLike {
val channelIdBelowLimit1: ByteVector32 = ByteVector32(hex"0111111110000000000000000000000000000000000000000000000000000000")
val channelIdBelowLimit2: ByteVector32 = ByteVector32(hex"0222222220000000000000000000000000000000000000000000000000000000")
val newChannelId1: ByteVector32 = ByteVector32(hex"0333333330000000000000000000000000000000000000000000000000000000")
val newChannelId2: ByteVector32 = ByteVector32(hex"0444444440000000000000000000000000000000000000000000000000000000")
val channelIdPrivate1: ByteVector32 = ByteVector32(hex"0555555550000000000000000000000000000000000000000000000000000000")
val channelIdPrivate2: ByteVector32 = ByteVector32(hex"0666666660000000000000000000000000000000000000000000000000000000")
val newChannelIdPrivate1: ByteVector32 = ByteVector32(hex"077777770000000000000000000000000000000000000000000000000000000")
val channelIdAtLimit1: ByteVector32 = ByteVector32(hex"0888888880000000000000000000000000000000000000000000000000000000")
val channelIdAtLimit2: ByteVector32 = ByteVector32(hex"0999999990000000000000000000000000000000000000000000000000000000")
override protected def withFixture(test: OneArgTest): Outcome = {
val router = TestProbe[Router.GetNode]()
val probe = TestProbe[PendingChannelsRateLimiter.Response]()
val peerOnWhitelist = randomKey().publicKey
val peerOnWhitelistAtLimit = randomKey().publicKey
val nodeParams = TestConstants.Alice.nodeParams.copy(channelConf = TestConstants.Alice.nodeParams.channelConf.copy(maxPendingChannelsPerPeer = 2, maxTotalPendingChannelsPrivateNodes = 2, channelOpenerWhitelist = Set(peerOnWhitelist, peerOnWhitelistAtLimit)))
val tx = Transaction.read("010000000110f01d4a4228ef959681feb1465c2010d0135be88fd598135b2e09d5413bf6f1000000006a473044022074658623424cebdac8290488b76f893cfb17765b7a3805e773e6770b7b17200102202892cfa9dda662d5eac394ba36fcfd1ea6c0b8bb3230ab96220731967bbdb90101210372d437866d9e4ead3d362b01b615d24cc0d5152c740d51e3c55fb53f6d335d82ffffffff01408b0700000000001976a914678db9a7caa2aca887af1177eda6f3d0f702df0d88ac00000000")
val closingTx = ClosingTx(InputInfo(tx.txIn.head.outPoint, TxOut(10_000 sat, Nil), Nil), tx, None)
val channelsOnWhitelistAtLimit = Seq(
DATA_WAIT_FOR_FUNDING_CONFIRMED(commitments(peerOnWhitelistAtLimit, randomBytes32()), BlockHeight(0), None, Left(FundingCreated(randomBytes32(), ByteVector32.Zeroes, 3, randomBytes64()))),
DATA_WAIT_FOR_CHANNEL_READY(commitments(peerOnWhitelistAtLimit, randomBytes32()), ShortIds(RealScidStatus.Unknown, ShortChannelId.generateLocalAlias(), None)),
)
val peerAtLimit1 = randomKey().publicKey
val channelsAtLimit1 = Seq(
DATA_WAIT_FOR_FUNDING_CONFIRMED(commitments(peerAtLimit1, channelIdAtLimit1), BlockHeight(0), None, Left(FundingCreated(channelIdAtLimit1, ByteVector32.Zeroes, 3, randomBytes64()))),
DATA_WAIT_FOR_CHANNEL_READY(commitments(peerAtLimit1, randomBytes32()), ShortIds(RealScidStatus.Unknown, ShortChannelId.generateLocalAlias(), None)),
)
val peerAtLimit2 = randomKey().publicKey
val channelsAtLimit2 = Seq(
DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED(commitments(peerAtLimit2, channelIdAtLimit2), 0 msat, 0 msat, BlockHeight(0), BlockHeight(0), RbfStatus.NoRbf, None),
DATA_WAIT_FOR_DUAL_FUNDING_READY(commitments(peerAtLimit2, randomBytes32()), ShortIds(RealScidStatus.Unknown, ShortChannelId.generateLocalAlias(), None)),
)
val peerBelowLimit1 = randomKey().publicKey
val channelsBelowLimit1 = Seq(
DATA_WAIT_FOR_CHANNEL_READY(commitments(peerBelowLimit1, channelIdBelowLimit1), ShortIds(RealScidStatus.Unknown, ShortChannelId.generateLocalAlias(), None)),
)
val peerBelowLimit2 = randomKey().publicKey
val channelsBelowLimit2 = Seq(
DATA_WAIT_FOR_DUAL_FUNDING_READY(commitments(peerBelowLimit2, channelIdBelowLimit2), ShortIds(RealScidStatus.Unknown, ShortChannelId.generateLocalAlias(), None)),
DATA_NORMAL(commitments(peerBelowLimit2, randomBytes32()), ShortIds(RealScidStatus.Unknown, ShortChannelId.generateLocalAlias(), None), None, null, None, None, None),
DATA_SHUTDOWN(commitments(peerBelowLimit2, randomBytes32()), Shutdown(randomBytes32(), ByteVector.empty), Shutdown(randomBytes32(), ByteVector.empty), None),
DATA_CLOSING(commitments(peerBelowLimit2, randomBytes32()), BlockHeight(0), ByteVector.empty, List(), List(closingTx))
)
val privatePeer1 = randomKey().publicKey
val privatePeer2 = randomKey().publicKey
val privateChannels = Seq(
DATA_WAIT_FOR_DUAL_FUNDING_READY(commitments(privatePeer1, channelIdPrivate1), ShortIds(RealScidStatus.Unknown, ShortChannelId.generateLocalAlias(), None)),
DATA_NORMAL(commitments(privatePeer2, randomBytes32()), ShortIds(RealScidStatus.Unknown, ShortChannelId.generateLocalAlias(), None), None, null, None, None, None),
)
val publicChannels = channelsOnWhitelistAtLimit ++ channelsAtLimit1 ++ channelsAtLimit2 ++ channelsBelowLimit1 ++ channelsBelowLimit2
val publicPeers = publicChannels.map(_.metaCommitments.params.remoteNodeId).toSet
assert(Set(peerOnWhitelistAtLimit, peerAtLimit1, peerAtLimit2, peerBelowLimit1, peerBelowLimit2) == publicPeers)
val limiter = testKit.spawn(PendingChannelsRateLimiter(nodeParams, router.ref, publicChannels ++ privateChannels))
filterPendingChannels(publicChannels ++ privateChannels).foreach {
case p if publicPeers.contains(p._1) => router.expectMessageType[GetNode].replyTo ! PublicNode(announcement(p._1), 1, 1 sat)
case p => router.expectMessageType[GetNode].replyTo ! UnknownNode(p._1)
}
router.expectNoMessage(10 millis)
withFixture(test.toNoArgTest(FixtureParam(router, nodeParams, probe, limiter, Seq(peerAtLimit1, peerAtLimit2), Seq(peerBelowLimit1, peerBelowLimit2), Seq(peerOnWhitelist, peerOnWhitelistAtLimit), Seq(privatePeer1, privatePeer2))))
}
def announcement(nodeId: PublicKey): NodeAnnouncement = NodeAnnouncement(randomBytes64(), Features.empty, 1 unixsec, nodeId, Color(100.toByte, 200.toByte, 300.toByte), "node-alias", NodeAddress.fromParts("1.2.3.4", 42000).get :: Nil)
def commitments(remoteNodeId: PublicKey, channelId: ByteVector32): MetaCommitments = {
val commitments = CommitmentsSpec.makeCommitments(500_000 msat, 400_000 msat, TestConstants.Alice.nodeParams.nodeId, remoteNodeId, announceChannel = true)
commitments.copy(params = commitments.params.copy(channelId = channelId))
}
case class FixtureParam(router: TestProbe[Router.GetNode], nodeParams: NodeParams, probe: TestProbe[PendingChannelsRateLimiter.Response], limiter: ActorRef[PendingChannelsRateLimiter.Command], peersAtLimit: Seq[PublicKey], peersBelowLimit: Seq[PublicKey], peersOnWhitelist: Seq[PublicKey], privatePeers: Seq[PublicKey])
test("always accept requests from nodes on white list") { f =>
import f._
peersOnWhitelist.foreach { peer =>
for (_ <- 0 to nodeParams.channelConf.maxPendingChannelsPerPeer + nodeParams.channelConf.maxTotalPendingChannelsPrivateNodes) {
limiter ! PendingChannelsRateLimiter.AddOrRejectChannel(probe.ref, peer, randomBytes32())
router.expectNoMessage(10 millis)
probe.expectMessage(PendingChannelsRateLimiter.AcceptOpenChannel)
}
}
}
test("requests from public nodes are only accepted and tracked while under per node limit") { f =>
import f._
// peers at limit are rejected
peersAtLimit.foreach { peer =>
limiter ! PendingChannelsRateLimiter.AddOrRejectChannel(probe.ref, peer, randomBytes32())
router.expectMessageType[GetNode].replyTo ! PublicNode(announcement(peer), 1, 1 sat)
probe.expectMessage(PendingChannelsRateLimiter.ChannelRateLimited)
}
// peers below limit will accept and track one more channel request
peersBelowLimit.foreach { peer =>
limiter ! PendingChannelsRateLimiter.AddOrRejectChannel(probe.ref, peer, randomBytes32())
router.expectMessageType[GetNode].replyTo ! PublicNode(announcement(peer), 1, 1 sat)
probe.expectMessage(PendingChannelsRateLimiter.AcceptOpenChannel)
}
// peers now at limit reject and do not track additional channel requests
peersBelowLimit.foreach { peer =>
limiter ! PendingChannelsRateLimiter.AddOrRejectChannel(probe.ref, peer, randomBytes32())
router.expectMessageType[GetNode].replyTo ! PublicNode(announcement(peer), 1, 1 sat)
probe.expectMessage(PendingChannelsRateLimiter.ChannelRateLimited)
}
// peers initially at limit still reject channel requests
peersAtLimit.foreach { peer =>
limiter ! PendingChannelsRateLimiter.AddOrRejectChannel(probe.ref, peer, randomBytes32())
router.expectMessageType[GetNode].replyTo ! PublicNode(announcement(peer), 1, 1 sat)
probe.expectMessage(PendingChannelsRateLimiter.ChannelRateLimited)
}
// when new channel ids assigned, stop tracking the old channel id and only track the new one
system.eventStream ! Publish(ChannelIdAssigned(null, peersBelowLimit.head, channelIdBelowLimit1, newChannelId1))
system.eventStream ! Publish(ChannelIdAssigned(null, peersBelowLimit.last, channelIdBelowLimit2, newChannelId2))
// ignore channel id assignments for untracked channels
(peersBelowLimit ++ peersAtLimit).foreach { peer =>
system.eventStream ! Publish(ChannelIdAssigned(null, peer, randomBytes32(), randomBytes32()))
}
// ignore channel id assignments for private peers
system.eventStream ! Publish(ChannelIdAssigned(null, randomKey().publicKey, channelIdPrivate1, newChannelIdPrivate1))
// ignore confirm/close/abort events for channels not tracked for a public peer
system.eventStream ! Publish(ChannelOpened(null, peersAtLimit.head, newChannelId1))
system.eventStream ! Publish(ChannelClosed(null, channelIdAtLimit1, null, commitments(peersBelowLimit.head, randomBytes32())))
system.eventStream ! Publish(ChannelAborted(null, peersBelowLimit.last, randomBytes32()))
// after channel events for untracked channels, new channel requests for public peers are still rejected
(peersBelowLimit ++ peersAtLimit).foreach { peer =>
limiter ! PendingChannelsRateLimiter.AddOrRejectChannel(probe.ref, peer, randomBytes32())
router.expectMessageType[GetNode].replyTo ! PublicNode(announcement(peer), 1, 1 sat)
probe.expectMessage(PendingChannelsRateLimiter.ChannelRateLimited)
}
// stop tracking channels that are confirmed/closed/aborted for a public peer
system.eventStream ! Publish(ChannelOpened(null, peersAtLimit.head, channelIdAtLimit1))
system.eventStream ! Publish(ChannelClosed(null, newChannelId1, null, commitments(peersBelowLimit.head, newChannelId1)))
system.eventStream ! Publish(ChannelAborted(null, peersBelowLimit.last, newChannelId2))
// new channel requests for peers below limit are accepted after matching confirmed/closed/aborted
(peersBelowLimit :+ peersAtLimit.head).foreach { peer =>
limiter ! PendingChannelsRateLimiter.AddOrRejectChannel(probe.ref, peer, randomBytes32())
router.expectMessageType[GetNode].replyTo ! PublicNode(announcement(peer), 1, 1 sat)
probe.expectMessage(PendingChannelsRateLimiter.AcceptOpenChannel)
}
// new channels requests for untracked public peers does not change previously tracked peers
limiter ! PendingChannelsRateLimiter.AddOrRejectChannel(probe.ref, randomKey().publicKey, randomBytes32())
router.expectMessageType[GetNode].replyTo ! PublicNode(announcement(randomKey().publicKey), 1, 1 sat)
probe.expectMessage(PendingChannelsRateLimiter.AcceptOpenChannel)
// public peers at limit still reject channel requests
(peersBelowLimit ++ peersAtLimit).foreach { peer =>
limiter ! PendingChannelsRateLimiter.AddOrRejectChannel(probe.ref, peer, randomBytes32())
router.expectMessageType[GetNode].replyTo ! PublicNode(announcement(peer), 1, 1 sat)
probe.expectMessage(PendingChannelsRateLimiter.ChannelRateLimited)
}
}
test("requests from private nodes are only accepted and tracked while under global limit") { f =>
import f._
// channels requests are accepted when below private channels limit
limiter ! PendingChannelsRateLimiter.AddOrRejectChannel(probe.ref, privatePeers.last, channelIdPrivate2)
router.expectMessageType[GetNode].replyTo ! UnknownNode(privatePeers.last)
probe.expectMessage(PendingChannelsRateLimiter.AcceptOpenChannel)
// channels requests are rejected when at the private channels limit
for (_ <- 0 until 2) {
limiter ! PendingChannelsRateLimiter.AddOrRejectChannel(probe.ref, randomKey().publicKey, randomBytes32())
router.expectMessageType[GetNode].replyTo ! UnknownNode(randomKey().publicKey)
probe.expectMessage(PendingChannelsRateLimiter.ChannelRateLimited)
}
// when new channel ids assigned, stop tracking the old channel id and only track the new one
system.eventStream ! Publish(ChannelIdAssigned(null, privatePeers.head, channelIdPrivate1, newChannelIdPrivate1))
// ignore channel id assignments for untracked node/channel pairs
system.eventStream ! Publish(ChannelIdAssigned(null, randomKey().publicKey, channelIdPrivate1, randomBytes32()))
system.eventStream ! Publish(ChannelIdAssigned(null, privatePeers.head, randomBytes32(), randomBytes32()))
// ignore channel id assignments for public peer channels
system.eventStream ! Publish(ChannelIdAssigned(null, peersBelowLimit.head, channelIdBelowLimit1, newChannelId1))
system.eventStream ! Publish(ChannelIdAssigned(null, peersBelowLimit.last, channelIdBelowLimit2, newChannelId2))
// ignore confirm/close/abort events for node/channel pairs not tracked for a private peer
system.eventStream ! Publish(ChannelOpened(null, privatePeers.head, newChannelId1))
system.eventStream ! Publish(ChannelClosed(null, newChannelId1, null, commitments(privatePeers.last, newChannelId1)))
system.eventStream ! Publish(ChannelAborted(null, peersBelowLimit.last, newChannelIdPrivate1))
// after channel events for untracked channels, new channel requests for private peers are still rejected
limiter ! PendingChannelsRateLimiter.AddOrRejectChannel(probe.ref, randomKey().publicKey, randomBytes32())
router.expectMessageType[GetNode].replyTo ! UnknownNode(randomKey().publicKey)
probe.expectMessage(PendingChannelsRateLimiter.ChannelRateLimited)
// stop tracking channels that are confirmed/closed/aborted for a private peer
system.eventStream ! Publish(ChannelOpened(null, privatePeers.head, newChannelIdPrivate1))
system.eventStream ! Publish(ChannelClosed(null, channelIdPrivate2, null, commitments(privatePeers.last, channelIdPrivate2)))
// new channel requests for peers below limit are accepted after matching confirmed/closed/aborted
limiter ! PendingChannelsRateLimiter.AddOrRejectChannel(probe.ref, privatePeers.head, channelIdPrivate1)
router.expectMessageType[GetNode].replyTo ! UnknownNode(privatePeers.head)
probe.expectMessage(PendingChannelsRateLimiter.AcceptOpenChannel)
// second request from a different node but with the same channel id
limiter ! PendingChannelsRateLimiter.AddOrRejectChannel(probe.ref, randomKey().publicKey, channelIdPrivate1)
router.expectMessageType[GetNode].replyTo ! UnknownNode(randomKey().publicKey)
probe.expectMessage(PendingChannelsRateLimiter.AcceptOpenChannel)
// abort the reused channel id for one private node; private channels now under the limit by one
system.eventStream ! Publish(ChannelAborted(null, privatePeers.head, channelIdPrivate1))
// new channels requests for untracked public peers do not increase the limit for private peers
limiter ! PendingChannelsRateLimiter.AddOrRejectChannel(probe.ref, randomKey().publicKey, channelIdPrivate1)
router.expectMessageType[GetNode].replyTo ! PublicNode(announcement(randomKey().publicKey), 1, 1 sat)
probe.expectMessage(PendingChannelsRateLimiter.AcceptOpenChannel)
// add a new private node that reuses the channel id again; private channels will be at the limit again
limiter ! PendingChannelsRateLimiter.AddOrRejectChannel(probe.ref, randomKey().publicKey, channelIdPrivate1)
router.expectMessageType[GetNode].replyTo ! UnknownNode(randomKey().publicKey)
probe.expectMessage(PendingChannelsRateLimiter.AcceptOpenChannel)
// reject channel requests from private peers when at the limit
limiter ! PendingChannelsRateLimiter.AddOrRejectChannel(probe.ref, randomKey().publicKey, randomBytes32())
router.expectMessageType[GetNode].replyTo ! UnknownNode(randomKey().publicKey)
probe.expectMessage(PendingChannelsRateLimiter.ChannelRateLimited)
}
test("reject any requests that come in during the restore") { f =>
import f._
val channels = Seq(
DATA_WAIT_FOR_CHANNEL_READY(commitments(randomKey().publicKey, randomBytes32()), ShortIds(RealScidStatus.Unknown, ShortChannelId.generateLocalAlias(), None)),
DATA_WAIT_FOR_DUAL_FUNDING_READY(commitments(randomKey().publicKey, randomBytes32()), ShortIds(RealScidStatus.Unknown, ShortChannelId.generateLocalAlias(), None)),
DATA_NORMAL(commitments(randomKey().publicKey, randomBytes32()), ShortIds(RealScidStatus.Unknown, ShortChannelId.generateLocalAlias(), None), None, null, None, None, None),
DATA_SHUTDOWN(commitments(randomKey().publicKey, randomBytes32()), Shutdown(randomBytes32(), ByteVector.empty), Shutdown(randomBytes32(), ByteVector.empty), None),
DATA_WAIT_FOR_FUNDING_CONFIRMED(commitments(randomKey().publicKey, randomBytes32()), BlockHeight(0), None, Left(FundingCreated(randomBytes32(), ByteVector32.Zeroes, 3, randomBytes64()))),
)
val restoredLimiter = testKit.spawn(PendingChannelsRateLimiter(nodeParams, router.ref, channels))
// process one restored private channel
router.expectMessageType[GetNode].replyTo ! UnknownNode(randomKey().publicKey)
// handle a request that comes in during the restore
restoredLimiter ! PendingChannelsRateLimiter.AddOrRejectChannel(probe.ref, randomKey().publicKey, randomBytes32())
probe.expectMessage(PendingChannelsRateLimiter.ChannelRateLimited)
// process one restored public peer channel
router.expectMessageType[GetNode].replyTo ! PublicNode(announcement(randomKey().publicKey), 1, 1 sat)
// handle a request that comes in during the restore
restoredLimiter ! PendingChannelsRateLimiter.AddOrRejectChannel(probe.ref, randomKey().publicKey, randomBytes32())
probe.expectMessage(PendingChannelsRateLimiter.ChannelRateLimited)
// process last restored public peer channel
router.expectMessageType[GetNode].replyTo ! PublicNode(announcement(randomKey().publicKey), 1, 1 sat)
// handle new channel requests for a private peer
restoredLimiter ! PendingChannelsRateLimiter.AddOrRejectChannel(probe.ref, randomKey().publicKey, randomBytes32())
router.expectMessageType[GetNode].replyTo ! UnknownNode(randomKey().publicKey)
probe.expectMessage(PendingChannelsRateLimiter.AcceptOpenChannel)
}
}